Updated on 2024-06-13 GMT+08:00

Reading Data from Kafka and Writing Data to Elasticsearch

This guide provides reference for Flink 1.12 only.

Description

This example analyzes product purchase data and collects statistics on the records that meet specific conditions. The purchase data is read from a Kafka source table, and the analysis result is written to Elasticsearch.

For example, enter the following sample data:

{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}

{"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0002", "user_name":"Jason", "area_id":"330106"}

DLI reads data from Kafka and writes the data to Elasticsearch. You can view the result in Kibana of the Elasticsearch cluster.

Prerequisites

  1. You have created a DMS for Kafka instance.

    When you create a DMS Kafka instance, do not enable Kafka SASL_SSL.

  2. You have created a CSS Elasticsearch cluster.

    In this example, the version of the created CSS cluster is 7.6.2, and security mode is disabled for the cluster.

Step 1: Create a Queue

  1. Log in to the DLI console. In the navigation pane on the left, choose Resources > Queue Management.
  2. On the displayed page, click Buy Queue in the upper right corner.
  3. On the Buy Queue page, set queue parameters as follows:
    • Billing Mode: .
    • Region and Project: Retain the default values.
    • Name: Enter a queue name.

      The queue name can contain only digits, letters, and underscores (_), but cannot contain only digits or start with an underscore (_). The name must contain 1 to 128 characters.

      The queue name is case-insensitive. Uppercase letters will be automatically converted to lowercase letters.

    • Type: Select For general purpose. Select the Dedicated Resource Mode.
    • AZ Mode and Specifications: Retain the default values.
    • Enterprise Project: Select default.
    • Advanced Settings: Select Custom.
    • CIDR Block: Specify the queue network segment. For example, 10.0.0.0/16.

      The CIDR block of a queue cannot overlap with the CIDR blocks of the DMS Kafka instance and the CSS cluster. Otherwise, datasource connections will fail to be created.

    • Set other parameters as required.
  4. Click Buy. Confirm the configuration and click Submit.
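
The naming and CIDR rules in step 3 can be checked before you submit the order. The following is a minimal Python sketch, not part of the DLI console or SDK: the function names normalize_queue_name and cidrs_overlap are hypothetical, and the subnets used for the Kafka instance and the CSS cluster are assumptions for illustration only.

```python
import ipaddress
import re

# Digits, letters, and underscores only; 1 to 128 characters;
# must not start with an underscore or consist only of digits.
_QUEUE_NAME = re.compile(r"(?!_)(?![0-9]+$)[A-Za-z0-9_]{1,128}")


def normalize_queue_name(name: str) -> str:
    """Return the lowercase form of a valid queue name, or raise ValueError."""
    if not _QUEUE_NAME.fullmatch(name):
        raise ValueError(f"invalid queue name: {name!r}")
    # Queue names are case-insensitive and converted to lowercase.
    return name.lower()


def cidrs_overlap(first: str, second: str) -> bool:
    """Return True if two CIDR blocks share at least one address."""
    return ipaddress.ip_network(first).overlaps(ipaddress.ip_network(second))


if __name__ == "__main__":
    print(normalize_queue_name("Flink_Queue1"))
    # Hypothetical subnets of the Kafka instance and the CSS cluster.
    for subnet in ("10.128.0.0/24", "192.168.168.0/24"):
        print(subnet, "overlaps queue CIDR:", cidrs_overlap("10.0.0.0/16", subnet))
```

If cidrs_overlap returns True for either subnet, choose a different CIDR block for the queue before you buy it.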

Step 2: Create a Kafka Topic

  1. On the Kafka management console, click an instance name on the DMS for Kafka page. Basic information of the Kafka instance is displayed.
  2. Choose Topics in the navigation pane on the left. On the displayed page, click Create Topic. Configure the following parameters:
    • Topic Name: For this example, enter testkafkatopic.
    • Partitions: Set the value to 1.
    • Replicas: Set the value to 1.

    Retain default values for other parameters.

Step 3: Create an Elasticsearch Index

  1. Log in to the CSS management console and choose Clusters > Elasticsearch from the navigation pane on the left.
  2. On the Clusters page, click Access Kibana in the Operation column of the created CSS cluster.
  3. On the displayed page, choose Dev Tools in the navigation pane on the left. The Console page is displayed.
  4. On the displayed page, run the following command to create index shoporders:
    PUT /shoporders
    {
      "settings": {
        "number_of_shards": 1
      },
      "mappings": {
        "properties": {
          "order_id": {
            "type": "text"
          },
          "order_channel": {
            "type": "text"
          },
          "order_time": {
            "type": "text"
          },
          "pay_amount": {
            "type": "double"
          },
          "real_pay": {
            "type": "double"
          },
          "pay_time": {
            "type": "text"
          },
          "user_id": {
            "type": "text"
          },
          "user_name": {
            "type": "text"
          },
          "area_id": {
            "type": "text"
          }
        }
      }
    }
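
The request body above can also be generated from a field-to-type table, which helps keep the index mapping in line with the columns of the Flink tables used later in this example. The sketch below is an illustration in Python; FIELD_TYPES and build_index_body are hypothetical names, and the printed JSON is meant to be pasted after PUT /shoporders in the Kibana console.

```python
import json

# Field names and Elasticsearch types copied from the shoporders mapping.
FIELD_TYPES = {
    "order_id": "text",
    "order_channel": "text",
    "order_time": "text",
    "pay_amount": "double",
    "real_pay": "double",
    "pay_time": "text",
    "user_id": "text",
    "user_name": "text",
    "area_id": "text",
}


def build_index_body(field_types: dict, shards: int = 1) -> dict:
    """Build the body of a create-index request from a field-to-type table."""
    return {
        "settings": {"number_of_shards": shards},
        "mappings": {
            "properties": {
                name: {"type": es_type} for name, es_type in field_types.items()
            }
        },
    }


if __name__ == "__main__":
    print(json.dumps(build_index_body(FIELD_TYPES), indent=2))
```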

Step 4: Create an Enhanced Datasource Connection

  • Connecting DLI to Kafka
    1. On the Kafka management console, click an instance name on the DMS for Kafka page. Basic information of the Kafka instance is displayed.
    2. In the Connection pane, obtain the Instance Address (Private Network). In the Network pane, obtain the VPC and subnet of the instance.
    3. Click the security group name in the Network pane. On the displayed page, click the Inbound Rules tab and add a rule to allow access from DLI queues. For example, if the CIDR block of the queue is 10.0.0.0/16, set Priority to 1, Action to Allow, Protocol to TCP, Type to IPv4, Source to 10.0.0.0/16, and click OK.
    4. Log in to the DLI management console. In the navigation pane on the left, choose Datasource Connections. On the displayed page, click Create in the Enhanced tab.
    5. In the displayed dialog box, set the following parameters:
      • Connection Name: Enter a name for the enhanced datasource connection. For this example, enter dli_kafka.
      • Resource Pool: Select the name of the queue created in Step 1: Create a Queue.
      • VPC: Select the VPC of the Kafka instance.
      • Subnet: Select the subnet of the Kafka instance.
      • Set other parameters as you need.

      Click OK. Click the name of the created datasource connection to view its status. You can perform subsequent steps only after the connection status changes to Active.

    6. Choose Resources > Queue Management from the navigation pane and locate the queue you created in Step 1: Create a Queue. In the Operation column, click More > Test Address Connectivity.
    7. In the displayed dialog box, enter Kafka instance address (private network):port in the Address box and click Test to check whether the instance is reachable.
  • Connecting DLI to CSS
    1. On the CSS management console, choose Clusters > Elasticsearch. On the displayed page, click the name of the created CSS cluster to view basic information.
    2. On the Cluster Information page, obtain the Private Network Address, VPC, and Subnet.
    3. Click the security group name. On the displayed page, click the Inbound Rules tab and add a rule to allow access from DLI queues. For example, if the CIDR block of the queue is 10.0.0.0/16, set Priority to 1, Action to Allow, Protocol to TCP, Type to IPv4, Source to 10.0.0.0/16, and click OK.
    4. Check whether the Kafka instance and Elasticsearch instance are in the same VPC and subnet.
      1. If they are, go to step 7. You do not need to create an enhanced datasource connection again.
      2. If they are not, go to step 5. Create an enhanced datasource connection to connect DLI to the subnet where the Elasticsearch instance is located.
    5. Log in to the DLI management console. In the navigation pane on the left, choose Datasource Connections. On the displayed page, click Create in the Enhanced tab.
    6. In the displayed dialog box, set the following parameters:
      • Connection Name: Enter a name for the enhanced datasource connection. For this example, enter dli_css.
      • Resource Pool: Select the name of the queue created in Step 1: Create a Queue.
      • VPC: Select the VPC of the Elasticsearch instance.
      • Subnet: Select the subnet of the Elasticsearch instance.
      • Set other parameters as you need.

      Click OK. Click the name of the created datasource connection to view its status. You can perform subsequent steps only after the connection status changes to Active.

    7. Choose Resources > Queue Management from the navigation pane and locate the queue you created in Step 1: Create a Queue. In the Operation column, click More > Test Address Connectivity.
    8. In the displayed dialog box, enter the private network address and port of the Elasticsearch cluster obtained in step 2, in the format address:port, in the Address box and click Test to check whether the cluster is reachable.
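
The Test Address Connectivity dialog box expects addresses in the address:port format. As a rough local counterpart, the Python sketch below splits such addresses and can attempt a TCP connection to them. Treat it as an illustration under stated assumptions: parse_address, parse_bootstrap_servers, and is_reachable are hypothetical helpers, the result only reflects reachability from the host that runs the script (for example, a server in the same VPC), and it does not replace the console test, which is run from the DLI queue.

```python
import socket
from typing import List, Tuple


def parse_address(address: str) -> Tuple[str, int]:
    """Split 'host:port' into (host, port), raising ValueError if malformed."""
    host, sep, port = address.strip().rpartition(":")
    if not sep or not host or not port.isdigit():
        raise ValueError(f"expected address:port, got {address!r}")
    port_number = int(port)
    if not 0 < port_number < 65536:
        raise ValueError(f"port out of range: {port_number}")
    return host, port_number


def parse_bootstrap_servers(servers: str) -> List[Tuple[str, int]]:
    """Parse a comma-separated list such as a Kafka bootstrap server string."""
    return [parse_address(item) for item in servers.split(",") if item.strip()]


def is_reachable(address: str, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to address succeeds within timeout."""
    host, port = parse_address(address)
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # Addresses taken from the SQL example in this guide; replace with your own.
    kafka = "10.128.0.120:9092,10.128.0.89:9092,10.128.0.83:9092"
    for host, port in parse_bootstrap_servers(kafka):
        print("Kafka broker:", host, port)
    print("Elasticsearch:", parse_address("192.168.168.125:9200"))
```

To probe an endpoint from a host inside the VPC, call is_reachable("192.168.168.125:9200") with your own address.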

Step 5: Run a Job

  1. On the DLI management console, choose Job Management > Flink Jobs. On the Flink Jobs page, click Create Job.
  2. In the Create Job dialog box, set Type to Flink OpenSource SQL and Name to FlinkKafkaES. Click OK.
  3. On the job editing page, set the following parameters and retain the default values of other parameters.
    • Queue: Select the queue created in Step 1: Create a Queue.
    • Flink Version: Select 1.12.
    • Save Job Log: Enable this function.
    • OBS Bucket: Select an OBS bucket for storing job logs and grant access permissions of the OBS bucket as prompted.
    • Enable Checkpointing: Enable this function.
    • Enter a SQL statement in the editing pane. The following is an example. Modify the parameter values, such as the Kafka and Elasticsearch addresses, to match your environment.

      In this example, the syntax version of Flink OpenSource SQL is 1.12. The data source is Kafka, and the result data is written to Elasticsearch.

    • Create a Kafka source table and connect DLI to the Kafka data source.
      CREATE TABLE kafkaSource (
        order_id string,
        order_channel string,
        order_time string,
        pay_amount double,
        real_pay double,
        pay_time string,
        user_id string,
        user_name string,
        area_id string
      ) WITH (
        'connector' = 'kafka',
        'properties.bootstrap.servers' = '10.128.0.120:9092,10.128.0.89:9092,10.128.0.83:9092', --Internal network address and port number of the Kafka instance
        'properties.group.id' = 'click',
        'topic' = 'testkafkatopic', --Created Kafka topic
        'format' = 'json',
        'scan.startup.mode' = 'latest-offset'
      );
    • Create an Elasticsearch result table to display the data analyzed by DLI.
      CREATE TABLE elasticsearchSink (
        order_id string,
        order_channel string,
        order_time string,
        pay_amount double,
        real_pay double,
        pay_time string,
        user_id string,
        user_name string,
        area_id string
      ) WITH (
        'connector' = 'elasticsearch-7',
        'hosts' = '192.168.168.125:9200', --Private IP address and port of the CSS cluster
        'index' = 'shoporders' --Created Elasticsearch index
      );
      --Write Kafka data to the Elasticsearch index
      insert into
        elasticsearchSink
      select
        *
      from
        kafkaSource;
  4. Click Check Semantic and ensure that the SQL statement passes the check. Click Save. Click Start, confirm the job parameters, and click Start Now to execute the job. Wait until the job status changes to Running.
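
Note that the sample messages carry pay_amount and real_pay as JSON strings, while both tables declare these columns as double, and the query result in Step 6 shows them as numbers. The Python sketch below mimics that conversion for a single message so you can preview the shape of the resulting document. It is an illustration only, not a description of how the Flink JSON format is implemented; COLUMN_TYPES and to_row are hypothetical names.

```python
import json

# Column names and types mirroring the kafkaSource table definition.
COLUMN_TYPES = {
    "order_id": str,
    "order_channel": str,
    "order_time": str,
    "pay_amount": float,
    "real_pay": float,
    "pay_time": str,
    "user_id": str,
    "user_name": str,
    "area_id": str,
}

SAMPLE = (
    '{"order_id":"202103241000000001", "order_channel":"webShop", '
    '"order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", '
    '"real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", '
    '"user_id":"0001", "user_name":"Alice", "area_id":"330106"}'
)


def to_row(message: str) -> dict:
    """Parse one JSON message and cast each field to its declared column type."""
    record = json.loads(message)
    row = {}
    for column, cast in COLUMN_TYPES.items():
        value = record.get(column)
        # Missing fields are kept as None instead of failing the whole row.
        row[column] = None if value is None else cast(value)
    return row


if __name__ == "__main__":
    print(json.dumps(to_row(SAMPLE), indent=2))
```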

Step 6: Send Data and Query Results

  1. Send data to Kafka.

    Use the Kafka client to send data to the topic created in Step 2: Create a Kafka Topic to simulate real-time data streams.

    The sample data is as follows:

    {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
    
    {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0002", "user_name":"Jason", "area_id":"330106"}
  2. View the data processing result on Elasticsearch.
    After the message is sent to Kafka, run the following statement in Kibana for the CSS cluster and check the result:
    GET shoporders/_search
    The query result is as follows:
    {
      "took" : 0,
      "timed_out" : false,
      "_shards" : {
        "total" : 1,
        "successful" : 1,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : {
          "value" : 2,
          "relation" : "eq"
        },
        "max_score" : 1.0,
        "hits" : [
          {
            "_index" : "shoporders",
            "_type" : "_doc",
            "_id" : "6fswzIAByVjqg3_qAyM1",
            "_score" : 1.0,
            "_source" : {
              "order_id" : "202103241000000001",
              "order_channel" : "webShop",
              "order_time" : "2021-03-24 10:00:00",
              "pay_amount" : 100.0,
              "real_pay" : 100.0,
              "pay_time" : "2021-03-24 10:02:03",
              "user_id" : "0001",
              "user_name" : "Alice",
              "area_id" : "330106"
            }
          },
          {
            "_index" : "shoporders",
            "_type" : "_doc",
            "_id" : "6vs1zIAByVjqg3_qyyPp",
            "_score" : 1.0,
            "_source" : {
              "order_id" : "202103241606060001",
              "order_channel" : "appShop",
              "order_time" : "2021-03-24 16:06:06",
              "pay_amount" : 200.0,
              "real_pay" : 180.0,
              "pay_time" : "2021-03-24 16:10:06",
              "user_id" : "0002",
              "user_name" : "Jason",
              "area_id" : "330106"
            }
          }
        ]
      }
    }
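
To compare the indexed documents with the messages you sent, it can help to reduce the search response to a few fields. The following Python sketch assumes the response has been saved as a JSON string (for example, copied from the Kibana console); summarize_hits is a hypothetical helper, and the embedded response is a shortened copy of the result above.

```python
import json

# Shortened copy of the search response shown above.
RESPONSE = """
{
  "hits": {
    "total": {"value": 2, "relation": "eq"},
    "hits": [
      {"_id": "6fswzIAByVjqg3_qAyM1",
       "_source": {"order_id": "202103241000000001",
                   "user_name": "Alice", "real_pay": 100.0}},
      {"_id": "6vs1zIAByVjqg3_qyyPp",
       "_source": {"order_id": "202103241606060001",
                   "user_name": "Jason", "real_pay": 180.0}}
    ]
  }
}
"""


def summarize_hits(response_text: str):
    """Return the total hit count and (order_id, user_name, real_pay) per hit."""
    hits = json.loads(response_text)["hits"]
    rows = [
        (
            hit["_source"]["order_id"],
            hit["_source"]["user_name"],
            hit["_source"]["real_pay"],
        )
        for hit in hits["hits"]
    ]
    return hits["total"]["value"], rows


if __name__ == "__main__":
    total, rows = summarize_hits(RESPONSE)
    print("total hits:", total)
    for order_id, user_name, real_pay in rows:
        print(order_id, user_name, real_pay)
```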