
Creating and Submitting a Flink SQL Job

You can use DLI to submit Flink SQL jobs for real-time computing. The general procedure is as follows:

Step 1: Logging In to the Cloud Platform

Step 2: Preparing Data Sources and Data Output Channels

Step 3: Creating an OBS Bucket for Saving Outputs

Step 4: Logging In to the DLI Management Console

Step 5: Creating a Queue

Step 6: Creating an Enhanced Datasource Connection

Step 7: Creating a Datasource Authentication

Step 8: Configuring Security Group Rules and Testing Address Connectivity

Step 9: Creating a Flink SQL Job

The Flink SQL job you will create has an input and an output stream. The input stream is used to read data from DIS, and the output stream is used to write data to Kafka.

Step 1: Logging In to the Cloud Platform

To use DLI, you need to log in to the cloud platform.

  1. Open the DLI home page.
  2. On the login page, enter the username and password, and click Log In.

Step 2: Preparing Data Sources and Data Output Channels

DLI Flink jobs can use other cloud services as data sources and data output channels. For details, see "Preparing Data".

In this example, assume that the job name is JobSample and DIS is used as the data source. To create a DIS stream, see "Creating a DIS Stream" in the Data Ingestion Service User Guide. To use Kafka as the data output channel, you need to create a Kafka premium instance. For details, see "Buying an Instance" in the Distributed Message Service for Kafka User Guide.
  • Create a DIS stream for the job input stream.
    1. Log in to the DIS console.
    2. In the upper left corner of the management console, select the target region and project.
    3. On the Overview page, click Buy Stream and configure stream parameters. The stream information is as follows:
      • Region: Select the region where DLI is located.
      • Stream Name: csinput
      • Stream Type: Common
      • Partitions: 1
      • Data Retention (hours): 24
      • Source Data Type: BLOB
      • Auto Scaling: disabled
      • Enterprise project: default
      • Advanced Settings: Skip it.
    4. Click Buy Now. The Details page is displayed.
    5. Click Submit.
  • Create a Kafka premium instance for the job output stream.
    1. Before creating a Kafka instance, ensure the availability of resources, including a virtual private cloud (VPC), subnet, security group, and security group rules.
      • For details about how to create a VPC and subnet, see "Creating a VPC and Subnet" in the Virtual Private Cloud User Guide. For details about how to create and use a subnet in an existing VPC, see "Creating a Subnet for the VPC" in the Virtual Private Cloud User Guide.
        • The created VPC and the Kafka instance that will be created must be in the same region.
        • Retain the default settings unless otherwise specified.
      • For details about how to create a security group, see "Creating a Security Group" in the Virtual Private Cloud User Guide. For details about how to add rules to a security group, see "Adding a Security Group Rule" in the Virtual Private Cloud User Guide.

      For more information, see Managing Kafka Premium Instances > Preparing the Environment in the Distributed Message Service User Guide.

    2. Log in to the DMS (for Kafka) console.
    3. Select a region in the upper left corner.
    4. On the Kafka Instances page, click Buy Instance in the upper right corner and set related parameters. The instance information is as follows:
      • Region: Select the region where DLI is located.
      • Project: Keep the default value.
      • AZ: Keep the default value.
      • Instance Name: kafka-dliflink
      • Enterprise Project: default
      • Version: Keep the default value.
      • CPU Architecture: Keep the default value.
      • Specifications: Select the specification as needed.
      • Brokers: Keep the default value.
      • Storage Space: Keep the default value.
      • Capacity Threshold Policy: Keep the default value.
      • VPC and Subnet: Select the VPC and subnet created in 1.
      • Security Group: Select the security group created in 1.
      • Manager Username: dliflink (used to log in to the instance management page)
      • Password: **** (The system cannot retrieve your password, so keep it secure.)
      • Confirm Password: ****
      • Advanced Settings: Enable Kafka SASL_SSL and configure the username and password for SSL authentication as prompted. You do not need to set other parameters.
    5. Click Buy. The confirmation page is displayed.
    6. Click Submit.
    7. On the DMS for Kafka console, click Kafka Premium and click the name of the Kafka instance, for example, kafka-dliflink. The instance details page is displayed.
    8. Locate SSL Certificate under Basic Information > Advanced Settings and click Download. Download the package to your local PC and decompress it to obtain the client certificate file client.truststore.jks, which Kafka clients use for SASL_SSL connections to this instance (see the client configuration sketch after this list).
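
The SASL_SSL settings configured above (the SASL username and password, and the downloaded client.truststore.jks) are the same values the Flink SQL job references later. If you want to verify the instance and credentials outside DLI, a standard Kafka Java client can connect with them. The following is only a minimal sketch; the broker addresses, truststore path, and credentials are placeholders that you must replace with your own values.

  import java.util.Properties;
  import java.util.Set;
  import org.apache.kafka.clients.CommonClientConfigs;
  import org.apache.kafka.clients.admin.AdminClient;
  import org.apache.kafka.common.config.SaslConfigs;
  import org.apache.kafka.common.config.SslConfigs;

  public class KafkaSaslSslCheck {
      public static void main(String[] args) throws Exception {
          Properties props = new Properties();
          // Broker addresses of the Kafka premium instance (placeholders).
          props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
                  "192.x.x.x:9093,192.x.x.x:9093,192.x.x.x:9093");
          // SASL_SSL with the PLAIN mechanism, matching the settings enabled on the instance.
          props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
          props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
          props.put(SaslConfigs.SASL_JAAS_CONFIG,
                  "org.apache.kafka.common.security.plain.PlainLoginModule required "
                  + "username=\"xxx\" password=\"xxx\";");
          // Truststore downloaded from the instance and its password.
          props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/client.truststore.jks");
          props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "dms@kafka");
          // Assumption: disable hostname verification if the certificate does not match the broker IP addresses.
          props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");

          try (AdminClient admin = AdminClient.create(props)) {
              // Listing topics succeeds only if the address, certificate, and credentials are all correct.
              Set<String> topics = admin.listTopics().names().get();
              System.out.println("Connected. Topics: " + topics);
          }
      }
  }

The same values (bootstrap servers, SASL credentials, truststore file, and truststore password) are reused when creating the datasource authentication and the Flink SQL job later in this procedure.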

Step 3: Creating an OBS Bucket for Saving Outputs

In this example, you need to enable OBS for the job JobSample so that the DLI Flink job can use OBS for checkpointing, saving job logs, and storing test data for debugging.

For details about how to create a bucket, see "Creating a Bucket" in the Object Storage Service Console Operation Guide.

  1. In the navigation pane on the OBS management console, select Object Storage.
  2. In the upper right corner of the page, click Create Bucket and set bucket parameters.
    • Region: Select the region where DLI is located.
    • Bucket Name: Enter a bucket name. In this example, smoke-test is used.
    • Default Storage Class: Standard
    • Bucket Policy: Private
    • Default Encryption: Do not enable
    • Direct Reading: Do not enable
    • Enterprise project: default
    • Tags: Leave it blank.
  3. Click Create Now.

Step 4: Logging In to the DLI Management Console

  1. Log in to the DLI management console.
  2. The DLI management console page is displayed. If you log in to the DLI management console for the first time, you need to authorize DLI to access OBS.

Step 5: Creating a Queue

You cannot create a DLI Flink SQL job on the existing default queue. Instead, you need to create a queue, for example, a queue named Flinktest. For details, see Creating a Queue.

  1. On the Overview page of the DLI management console, click Buy Queue in the upper right corner.
  2. Configure the following parameters:
    • Name: Flinktest
    • Queue Usage: Select For general purpose and Dedicated Resource Mode.
    • CU Specifications: 16 CUs
    • Enterprise project: default
    • Description: Leave it blank.
    • Advanced Settings: Custom
    • CIDR Block: The configured CIDR block cannot conflict with the Kafka subnet CIDR block.
  3. Click Buy. The configuration confirmation page is displayed.
  4. Confirm the configuration and submit the request.

Step 6: Creating an Enhanced Datasource Connection

You need to create an enhanced datasource connection for the Flink job. For details, see "Creating an Enhanced Datasource Connection".

  • The CIDR block of the DLI queue bound with a datasource connection cannot overlap with the CIDR block of the data source.
  • Datasource connections cannot be created for the default queue.
  • To access a table across data sources, you need to use a queue bound to a datasource connection.
  1. In the navigation pane of the DLI management console, choose Datasource Connections.
  2. Click the Enhanced tab and click Create. Set the following parameters:

    • Connection Name: diskafka
    • Bind Queue: Flinktest
    • VPC: vpc-dli
    • Subnet: dli-subnet

      The VPC and subnet must be the same as those of the Kafka instance.

  3. Click OK.
  4. On the Enhanced tab page, click the created connection diskafka to view its VPC Peering ID and Connection Status. If the status is Active, the connection is successful.

Step 7: Creating a Datasource Authentication

For details about how to create datasource authentication, see "Datasource Authentication".

  1. Upload the Kafka authentication file client.truststore.jks obtained in Step 2: Preparing Data Sources and Data Output Channels to the OBS bucket smoke-test created in Step 3: Creating an OBS Bucket for Saving Outputs (see the upload sketch after this list).
  2. On the DLI management console, click Datasource Connections.
  3. On the Datasource Authentication tab page, click Create to add authentication information. Set the following parameters:
    • Authentication Certificate: Flink
    • Type: Kafka_SSL
    • Truststore Path: obs://smoke-test/client.truststore.jks
    • Truststore password: dms@kafka

    You do not need to set other parameters.

  4. Click OK.
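
The upload in 1 can also be scripted instead of using the OBS console. Below is a minimal sketch that assumes the OBS Java SDK (esdk-obs-java); the endpoint, access keys, and local file path are placeholders, and the bucket and object names match the Truststore Path obs://smoke-test/client.truststore.jks configured above.

  import java.io.File;
  import com.obs.services.ObsClient;

  public class UploadTruststore {
      public static void main(String[] args) throws Exception {
          // OBS endpoint of the region where the bucket resides, and your access keys (placeholders).
          String endPoint = "https://obs.example-region.example.com";
          String ak = "YOUR_ACCESS_KEY";
          String sk = "YOUR_SECRET_KEY";

          ObsClient obsClient = new ObsClient(ak, sk, endPoint);
          try {
              // Upload client.truststore.jks to the smoke-test bucket. The resulting object is
              // addressed as obs://smoke-test/client.truststore.jks in the datasource authentication.
              obsClient.putObject("smoke-test", "client.truststore.jks",
                      new File("/path/to/client.truststore.jks"));
          } finally {
              obsClient.close();
          }
      }
  }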

Step 8: Configuring Security Group Rules and Testing Address Connectivity

  1. On the DLI management console, choose Resources > Queue Management, locate the bound queue, and click the arrow next to the queue name to view the CIDR block of the queue.
  2. Log in to the DMS for Kafka console, click Kafka Premium, and click the name of the created Kafka instance, for example, kafka-dliflink. The instance information page is displayed.
  3. On the Basic Information page, obtain the Kafka connection address and port number in the Connection Address area.

  4. On the Basic Information page, click the security group name in the Security Group area.
  5. On the security group configuration page of the Kafka instance, choose Inbound Rules > Add Rule. Set Protocol to TCP, Port to 9093, and Source to the CIDR block of the DLI queue. Click OK.
  6. Log in to the DLI management console and choose Resources > Queue Management. In the row containing the Flink queue, choose More > Test Address Connectivity. In the Address text box, enter the Kafka connection address and port number in the format IP address:port number and click Test. You can proceed to the subsequent operations only if the address is reachable. Note that each address must be tested separately.
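
The connectivity test above runs from the DLI queue's network and is the check that matters for the job. If you also want a quick local sanity check that each broker address accepts TCP connections on port 9093 (for example, from an ECS in the same VPC as the Kafka instance), a plain socket probe is enough. This sketch is only a convenience and does not replace the queue-side test; the addresses are placeholders.

  import java.io.IOException;
  import java.net.InetSocketAddress;
  import java.net.Socket;

  public class TcpReachabilityCheck {
      public static void main(String[] args) {
          // Broker addresses of the Kafka instance (placeholders); 9093 is the SASL_SSL port opened in the security group rule.
          String[] endpoints = {"192.x.x.x:9093", "192.x.x.x:9093", "192.x.x.x:9093"};
          for (String endpoint : endpoints) {
              String[] parts = endpoint.split(":");
              try (Socket socket = new Socket()) {
                  // Try to establish a TCP connection with a 3-second timeout.
                  socket.connect(new InetSocketAddress(parts[0], Integer.parseInt(parts[1])), 3000);
                  System.out.println(endpoint + " is reachable");
              } catch (IOException e) {
                  System.out.println(endpoint + " is NOT reachable: " + e.getMessage());
              }
          }
      }
  }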

Step 9: Creating a Flink SQL Job

After the data source and data output channel are prepared, you can create a Flink SQL job.

  1. In the left navigation pane of the DLI management console, choose Job Management > Flink Jobs. The Flink Jobs page is displayed.
  2. In the upper right corner of the Flink Jobs page, click Create Job. Set the following parameters:
    • Type: Flink SQL
    • Name: DIS-Flink-Kafka
    • Description: Leave it blank.
    • Template Name: Do not select any template.
  3. Click OK to enter the editing page.
  4. Edit the Flink SQL job.

    Enter SQL statements in the editing window. The example statements are as follows. Note that the parameter values flagged in the comments (for example, the region, Kafka connection address, and SASL username and password) must be changed to match your environment.

    CREATE SOURCE STREAM car_info (
      a1 string,
      a2 string,
      a3 string,
      a4 INT
    )
    WITH (
      type = "dis",
      region = "xxx",// Region where the current DLI queue is located
      channel = "csinput",
      encode = "csv",
      FIELD_DELIMITER = ";"
    );
    
    CREATE SINK STREAM kafka_sink (
      a1 string,
      a2 string,
      a3 string,
      a4 INT
    ) // Output fields
    WITH (
      type = "kafka",
      kafka_bootstrap_servers = "192.x.x.x:9093, 192.x.x.x:9093, 192.x.x.x:9093", // Connection address of the Kafka instance
      kafka_topic = "testflink", // Topic to be written to Kafka. Log in to the Kafka console, click the name of the created Kafka instance, and view the topic name on the Topic Management page.
      encode = "csv", // Encoding format, which can be JSON or CSV
      kafka_certificate_name = "Flink", // Name of the Kafka datasource authentication created in Step 7
      kafka_properties_delimiter = ",",
      // Replace xxx in username and password in kafka_properties with the username and password set when SASL_SSL was enabled for the Kafka instance in Step 2.
      kafka_properties = "sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxx\" password=\"xxx\";,sasl.mechanism=PLAIN,security.protocol=SASL_SSL"
    );
    
    INSERT INTO kafka_sink
    SELECT * FROM car_info;
    
    CREATE SINK STREAM car_info1 (
      a1 string,
      a2 string,
      a3 string,
      a4 INT
    )
    WITH (
      type = "dis",
      region = "xxx", // Region where the current DLI queue is located
      channel = "csinput",
      encode = "csv",
      FIELD_DELIMITER = ";"
    );

    INSERT INTO car_info1 SELECT 'id','owner','brand',1;
    INSERT INTO car_info1 SELECT 'id','owner','brand',2;
    INSERT INTO car_info1 SELECT 'id','owner','brand',3;
    INSERT INTO car_info1 SELECT 'id','owner','brand',4;
    INSERT INTO car_info1 SELECT 'id','owner','brand',5;
    INSERT INTO car_info1 SELECT 'id','owner','brand',6;
    INSERT INTO car_info1 SELECT 'id','owner','brand',7;
    INSERT INTO car_info1 SELECT 'id','owner','brand',8;
    INSERT INTO car_info1 SELECT 'id','owner','brand',9;
    INSERT INTO car_info1 SELECT 'id','owner','brand',10;
  5. Click Check Semantics.
  6. Set job running parameters. The mandatory parameters are as follows:
    • Queue: Flinktest
    • CUs: 2
    • Job Manager CUs: 1
    • Parallelism: 1
    • Save Job Log: selected
    • OBS Bucket: Select the OBS bucket for storing job logs. You need the permissions to access this bucket.

    You do not need to set other parameters.

  7. Click Save.
  8. Click Start. On the displayed Start Flink Job page, confirm the job specifications and the price, and click Start Now to start the job.

    After the job is started, the system automatically switches to the Flink Jobs page, and the created job is displayed in the job list. You can view the job status in the Status column. After a job is successfully submitted, Status of the job will change from Submitting to Running.

    If Status of a job is Submission failed or Running exception, the job failed to be submitted or failed to run. In this case, move the cursor over the status icon to view the error details, and copy them if needed. After handling the fault based on the provided information, resubmit the job.

  9. After the job starts running and data has been written to Kafka, you can log in to the management console of Distributed Message Service for Kafka to view the corresponding Kafka premium instance. Click the instance name, click the Message Query tab, select the Kafka topic written by the Flink SQL job, and click Search. In the Operation column, click View Message Body to view the written message content.
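
If you prefer to verify the output outside the console, the records written to the testflink topic can also be read with a standard Kafka Java consumer that uses the same SASL_SSL settings as the job. This is a minimal sketch; broker addresses, credentials, and the truststore path are placeholders.

  import java.time.Duration;
  import java.util.Collections;
  import java.util.Properties;
  import org.apache.kafka.clients.CommonClientConfigs;
  import org.apache.kafka.clients.consumer.ConsumerConfig;
  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.apache.kafka.clients.consumer.ConsumerRecords;
  import org.apache.kafka.clients.consumer.KafkaConsumer;
  import org.apache.kafka.common.config.SaslConfigs;
  import org.apache.kafka.common.config.SslConfigs;

  public class VerifyTestflinkTopic {
      public static void main(String[] args) {
          Properties props = new Properties();
          props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                  "192.x.x.x:9093,192.x.x.x:9093,192.x.x.x:9093");
          props.put(ConsumerConfig.GROUP_ID_CONFIG, "dli-verify");
          props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
          props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");
          props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");
          // Same SASL_SSL settings as in kafka_properties of the Flink SQL job.
          props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
          props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
          props.put(SaslConfigs.SASL_JAAS_CONFIG,
                  "org.apache.kafka.common.security.plain.PlainLoginModule required "
                  + "username=\"xxx\" password=\"xxx\";");
          props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/client.truststore.jks");
          props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "dms@kafka");
          // Assumption: disable hostname verification if the certificate does not match the broker IP addresses.
          props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");

          try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
              consumer.subscribe(Collections.singletonList("testflink"));
              // Poll a few times and print the CSV lines written by the job.
              for (int i = 0; i < 5; i++) {
                  ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
                  for (ConsumerRecord<String, String> record : records) {
                      System.out.println(record.value());
                  }
              }
          }
      }
  }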