Using Flink SQL to Analyze Logs in Real Time

Scenario Overview

In this practice, you create a Flink SQL job on the CS console that reads log data from DIS, analyzes it in real time, and outputs the result to OBS.

The procedure is as follows:

  1. Creating a DIS Stream and an OBS Bucket
  2. Creating a Flink SQL Job
  3. Sending DIS Data and Viewing the Result

Creating a DIS Stream and an OBS Bucket

In this practice, data is read from DIS and then written to OBS after stream processing. Therefore, you need to create a DIS stream and OBS bucket first.

  1. Create a DIS stream.

    1. Log in to the management console.
    2. Click Service List and choose EI Enterprise Intelligence > Data Ingestion Service to access the DIS console.
    3. On the DIS console, click Buy Stream.
      • Region: Select the same region for DIS, OBS, and CS. CN North-Beijing1 is used as an example.
      • Stream Name: input-dis
      • Source Data Type: CSV
      • Retain the default values for other parameters.
      Figure 1 Creating a DIS stream
    4. Click Next, confirm stream specifications, and click Submit.
    5. Click Back to Stream Management. The created stream input-dis is displayed. This stream is used as the source stream of CS.

  2. Create an OBS bucket.

    1. Click Service List and choose Storage > Object Storage Service.
    2. Click Create Bucket. On the displayed page, configure related parameters.
      • Region: Select CN North-Beijing1.
      • Bucket Name: output-obs
      • Retain the default values for other parameters.
      Figure 2 Creating an OBS bucket
    3. Click Create Now. The new bucket is displayed in the bucket list.
    4. Click the bucket name output-obs. In the navigation pane, choose Objects. Click Create Folder and enter logInfos in the Folder Name field. The logInfos folder is used to store the output.
      Figure 3 Creating a folder

Creating a Flink SQL Job

  1. Click Service List and choose EI Enterprise Intelligence > Cloud Stream Service to access the CS console.

    If this is your first time logging in to the CS console, apply for and authorize CS as prompted.

  2. Click Create Job. In the displayed Create Job dialog box, specify job information.

    • Type: Select Flink Streaming SQL Job.
    • Name: test
    • Template: Select [Ecosystem]DIS-CS-OBS_SAMPLE_TEMPLATE.
    • Retain the default values for other parameters.
    Figure 4 Creating a Flink SQL job

  3. Click OK. On the displayed page, edit the job.

    [Ecosystem]DIS-CS-OBS_SAMPLE_TEMPLATE is used as an example.

    The SQL editor contains the following parts:

    • Source stream: Configure it in the WITH clause to interconnect with the input-dis stream of DIS so that CS can obtain data from the stream in real time (see the sketch after the query example below). Set the following parameters:
      • type = "dis"
      • region = "cn-north-1". cn-north-1 indicates CN North-Beijing1.
      • channel = "input-dis" (DIS stream name)
      • partition_count = "1" (number of partitions of the DIS stream)
      • encode = "csv" (data encoding format)
      • field_delimiter = "\\|\\|" (delimiter between attributes if the encoding format is CSV)
      • quote = "\u005c\u0022" (quote character). Attribute delimiters between two quote characters are treated as ordinary characters. If double quotation marks are used as the quote character, set this parameter to \u005c\u0022, the escaped form of the double quotation mark.
      • offset = "0". This indicates that CS starts to process data from data record 0 in DIS.

        For details, see DIS Source Stream.

    • Sink stream: Configure it in the WITH clause to interconnect with the OBS bucket so that CS can write the result to the bucket (see the sketch after the query example below). Set the following parameters:
      • type = "obs"
      • region = "cn-north-1". cn-north-1 indicates CN North-Beijing1.
      • encode = "csv" (data encoding format)
      • field_delimiter = "\\|\\|" (delimiter between attributes if the encoding format is CSV)
      • row_delimiter = "\n" (row separator)
      • obs_dir = "output-obs/logInfos" (file storage directory). The format is {bucket name}/{folder name}.
      • file_prefix = "log_out" (prefix of the exported file name). The default value is temp.
      • rolling_size = "100m" (maximum size of a file)

      For details, see OBS Sink Stream.

    • The following is a sample SQL query:
      INSERT INTO log_out
      SELECT
        http_host,
        forward_ip,
        cast(cast(msec * 1000 as bigint) + 28800000 as timestamp),
        status,
        request_length,
        bytes_sent,
        string_to_array(request, '\\ ')[1],
        string_to_array(request, '\\ ')[2],
        http_referer,
        http_user_agent,
        upstream_cache_status,
        upstream_status,
        request_time,
        cookie_DedeUserID_cookie_sid_sent_http_logdata,
        upstream_response_time,
        upstream_addr,
        case IP_TO_PROVINCE(forward_ip)
          when "Guangxi" then "Guangxi Zhuang Autonomous Region"
          when "Ningxia" then "Ningxia Hui Autonomous Region"
          when "Taiwan" then "Taiwan Province"
          when "Macao" then "Macau"
          else IP_TO_PROVINCE(forward_ip) end,
        case
          when http_user_agent like "%Chrome%" then "Chrome"
          when http_user_agent like "%Firefox%" then "Firefox"
          when http_user_agent like "%Safari%" then "Safari"
          else "Others" end
      FROM log_infos;
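    The template already contains complete CREATE SOURCE STREAM and CREATE SINK STREAM statements. The following minimal sketch only illustrates how the parameters listed above fit into those statements; the column names and types are assumptions inferred from the sample query and data, so keep the definitions generated by the template.

      CREATE SOURCE STREAM log_infos (
        http_host STRING,
        forward_ip STRING,
        msec DOUBLE,   -- assumed numeric; the query multiplies it by 1000
        status STRING,
        request_length STRING,
        bytes_sent STRING,
        request STRING,
        http_referer STRING,
        http_user_agent STRING,
        upstream_cache_status STRING,
        upstream_status STRING,
        request_time STRING,
        cookie_DedeUserID_cookie_sid_sent_http_logdata STRING,
        upstream_response_time STRING,
        upstream_addr STRING
      )
      WITH (
        type = "dis",
        region = "cn-north-1",
        channel = "input-dis",
        partition_count = "1",
        encode = "csv",
        field_delimiter = "\\|\\|",
        quote = "\u005c\u0022",
        offset = "0"
      );

      CREATE SINK STREAM log_out (
        http_host STRING,
        forward_ip STRING,
        access_time TIMESTAMP,   -- hypothetical name for the converted msec value
        status STRING,
        request_length STRING,
        bytes_sent STRING,
        request_method STRING,   -- hypothetical name for string_to_array(request, '\\ ')[1]
        request_uri STRING,      -- hypothetical name for string_to_array(request, '\\ ')[2]
        http_referer STRING,
        http_user_agent STRING,
        upstream_cache_status STRING,
        upstream_status STRING,
        request_time STRING,
        cookie_DedeUserID_cookie_sid_sent_http_logdata STRING,
        upstream_response_time STRING,
        upstream_addr STRING,
        province STRING,         -- hypothetical name for the IP_TO_PROVINCE result
        browser STRING           -- hypothetical name for the user-agent classification
      )
      WITH (
        type = "obs",
        region = "cn-north-1",
        encode = "csv",
        field_delimiter = "\\|\\|",
        row_delimiter = "\n",
        obs_dir = "output-obs/logInfos",
        file_prefix = "log_out",
        rolling_size = "100m"
      );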

  4. Set job running parameters.

    • SPUs: A Stream Processing Unit (SPU) is the unit of stream processing capacity, consisting of 1 vCPU and 4 GB of memory. One SPU costs ¥0.5 per hour, and a job requires at least two SPUs, so the minimum configuration costs ¥1 per hour.
    • Parallelism: number of parallel operator instances in the Flink job. The default value is 1.
    • Enable Checkpointing: whether to enable Flink snapshots (checkpoints)
    • Save Job Log: whether to save job logs to the OBS bucket
    • Alarm Generation upon Job Exception: whether to send an email and SMS notification when a job exception occurs
    Figure 5 Setting job running parameters

  5. Click Check Semantics. You can debug, submit, and start a job only after the semantic check succeeds.
  6. Click Submit. Review job configurations and then click OK.

    The system automatically switches to the Job Management page, and the created job is displayed in the job list with its status shown in the Status column. If the job is successfully submitted, its status changes to Running.

Sending DIS Data and Viewing the Result

The DIS Agent is used to upload CSV data to the DIS stream. Similar to Flume, the DIS Agent is a local agent that monitors local file changes. Once new data is added to a file, the data is immediately uploaded to the DIS stream.

For details about how to use DIS Agent, see Uploading Data by Using Agent.

  1. Start DIS Agent.

    1. Download DIS Agent from https://dis-publish.obs-website.cn-north-1.myhuaweicloud.com/dis-agent-1.1.0.zip.
    2. Decompress the downloaded DIS Agent package.
    3. Modify the conf/agent.yml file.
      ---
      # Keep unchanged.
      region: cn-north-1
      # User AK. To obtain it, log in to the management console, hover the cursor over the username in the upper right corner, and choose My Credentials > Access Keys > Create Access Key.
      ak: Enter your AK.
      # User SK, obtained together with the AK.
      sk: Enter your SK.
      # User project ID. On the My Credentials > Projects page, locate the row containing cn-north-1 and use the value of Project ID.
      projectId: Enter your project ID.
      # Keep unchanged.
      endpoint: https://dis.cn-north-1.myhuaweicloud.com:20004
      # Configure each flow to monitor a file.
      flows:
        # Enter the name of the DIS stream you created.
        - DISStream: input-dis
          # Enter the directory and file pattern of the data files to monitor.
          filePattern: D:/disagent-cw/dis-agent-1.1.0/data/*.log
      # Keep unchanged.
    4. Start DIS Agent.
      • Linux operating system: bin/start-dis-agent.sh
      • Windows operating system: bin/start-dis-agent.bat

  2. Send DIS data.

    Place your data file in the directory configured by filePattern in agent.yml. If you use a program to append data to the file, such as the following Python script, place the program in that directory so that the file it writes is monitored by DIS Agent.

    # Append two sample log records to test.log every 60 seconds.
    import time

    for idx in range(10000):
        with open("test.log", mode="a+") as f:
            f.write("api.huaweicloud.com||45.249.212.44||15421010072.675||200||651||228||POST /x/report/heartbeat HTTP/1.1||-||Mozilla/5.0 (Windows NT 6.0; rv:34.0) Gecko/20100101 Firefox/34.0||-||200||0.033||-.918nw0fj-||0.033||140.206.227.10:80" + "\n" + "api.huaweicloud.com||45.249.212.52||15421010072.875||200||651||228||POST /details/jobs HTTP/1.1||-||Mozilla/5.0 (Windows NT 6.0; rv:34.0) Gecko/20100101 Firefox/34.0||-||200||0.033||-.918nw0fj-||0.033||140.206.227.10:80" + "\n")
        time.sleep(60)

  3. Log in to the OBS console, go to the logInfos folder of the output-obs bucket, click Download, and view the output. The result files are named with the log_out prefix, and each row contains the fields selected by the query, separated by ||.