Updated on 2022-07-04 GMT+08:00

Submitting a Flink SQL Job

Use DLI to submit Flink SQL jobs for real-time computing. The general procedure is as follows:

Step 1: Logging in to the Cloud

Step 2: Prepare Data Sources and Data Output Channels

Step 3: Create an OBS Bucket for Saving Outputs

Step 4: Logging In to the DLI Management Console

Step 5: Creating a Queue

Step 6: Creating an Enhanced Datasource Connection

Step 7: Creating a Datasource Authentication

Step 8: Creating a Flink SQL Job

The sample scenario creates a Flink SQL job that has one input stream and one output stream. The input stream reads data from DIS, and the output stream writes data to Kafka.

Step 1: Logging in to the Cloud

To use DLI, you need to log in to the cloud.

  1. Open the DLI home page.
  2. On the login page, enter the Username and Password, and click Login.

Step 2: Prepare Data Sources and Data Output Channels

DLI Flink jobs support other services as data sources and data output channels. For details, see Preparing Data.

In this example, assume that the job name is JobSample and DIS is used as the data source. To enable DIS, see Creating a DIS Stream in the Data Ingestion Service User Guide. Kafka is used as the data output channel, so you also need to create a Kafka platinum instance. For details, see Buying an Instance in the Distributed Message Service for Kafka User Guide.
  • Create a DIS stream for the job input stream.
    1. Log in to the DIS console.
    2. In the upper left corner of the management console, select the target region and project.
    3. On the homepage, click Buy Stream and configure stream parameters. The channel information is as follows:
      • Region: Select the region where DLI is located.
      • Stream Name: csinput
      • Stream Type: Common
      • Partitions: 1
      • Data Retention (hours): 24
      • Source Data Type: BLOB
      • Auto Scaling: disabled
      • Advanced Settings: Skip it.
    4. Click Buy Now. The Details page is displayed.
    5. Click Submit.
  • Create a Kafka platinum instance for the job output stream.
    1. Before creating a Kafka instance, you need to ensure the availability of resources, including a virtual private cloud (VPC), subnet, security group, and security group rules.
      • For details about how to create a VPC and subnet, see Creating a VPC in the Virtual Private Cloud User Guide. If you need to create and use a subnet in an existing VPC, see Creating a VPC in the Virtual Private Cloud User Guide.
        • The created VPC and the Kafka instance must be in the same region.
        • Retain the default settings unless otherwise specified.
      • For details about how to create a security group, see Creating a Security Group in the Virtual Private Cloud User Guide. For details about how to add rules to a security group, see Creating a Security Group in the Virtual Private Cloud User Guide.

      For more information, see Preparing Instance Dependency Resources in the Distributed Message Service for Kafka User Guide.

    2. Log in to the Kafka management console.
    3. Select a region in the upper left corner.
    4. On the Kafka Premium page, click Buy Kafka Instance in the upper right corner and set related parameters. The instance information is as follows:
      • Region: Select the region where DLI is located.
      • AZ: Keep the default value.
      • Instance Name: kafka-dliflink
      • Version: Keep the default value.
      • CPU Architecture: Keep the default value.
      • Bandwidth: Keep the default value.
      • Maximum Partitions: Keep the default value.
      • Storage Space: Keep the default value.
      • VPC: vpc-dli
      • Subnet: dli-subnet
      • Security Group: Keep the default value.
      • Capacity Threshold Policy: Keep the default value.
      • Manager Username: dliflink (used to log in to the instance management page)
      • Password: **** (The system cannot detect your password.)
      • Confirm Password: ****
      • More Settings: Do not configure this parameter.
    5. Click Buy Now. The Details page is displayed.
    6. Click Submit.

Step 3: Create an OBS Bucket for Saving Outputs

In this example, you need to enable OBS for job JobSample so that DLI Flink jobs can store checkpoints, save job logs, and keep test data used for debugging.

For details about how to create a bucket, see Creating a Bucket in the Object Storage Service Console Operation Guide.

  1. In the navigation pane on the OBS management console, select Object Storage.
  2. In the upper right corner of the page, click Create Bucket and set bucket parameters.
    • Region: Select the region where DLI is located.
    • Data Redundancy Policy: Single-AZ storage
    • Bucket Name: smoke-test
    • Storage Class: Standard
    • Bucket Policy: Private
    • Default Encryption: Disable
    • Direct Reading: Disable
    • Tags: Leave it blank.
  3. Click Create Now.

Step 4: Logging In to the DLI Management Console

  1. In the service list displayed, click Data Lake Insight in Enterprise Intelligence.
  2. The DLI management console page is displayed. If you log in to the DLI management console for the first time, you need to be authorized to access OBS.

Step 5: Creating a Queue

When creating a DLI Flink SQL job, you cannot use the existing default queue. Instead, you need to create a queue, for example, a queue named Flinktest. For details about how to create a queue, see Creating a Queue.

  1. On the Dashboard page of the DLI management console, click Buy Queue in the upper right corner.
  2. Configure parameters.
    • Billing Mode: Pay-per-use
    • Region: default region
    • Name: Flinktest
    • Queue Usage: For general purpose. Select Dedicated Resource Mode.
    • CPU Architecture: x86
    • CU Specifications: 16 CUs
    • Advanced Configuration: Custom
    • CIDR Block: The configured CIDR block cannot conflict with the Kafka subnet CIDR block.
  3. Click Buy to confirm the configuration.
  4. Confirm the configuration and submit the request.
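
The CIDR Block constraint above can be checked before you buy the queue. The following is a minimal sketch that uses the Python standard library. The CIDR values are hypothetical, because this guide does not state the CIDR block of dli-subnet or of the queue.

```python
import ipaddress


def cidr_blocks_overlap(queue_cidr: str, subnet_cidr: str) -> bool:
    """Return True if the two IPv4 CIDR blocks share at least one address."""
    queue_net = ipaddress.ip_network(queue_cidr)
    subnet_net = ipaddress.ip_network(subnet_cidr)
    return queue_net.overlaps(subnet_net)


# Hypothetical values: neither CIDR block appears in this guide.
kafka_subnet = "192.168.0.0/24"
for candidate in ("192.168.0.0/16", "172.16.0.0/18"):
    if cidr_blocks_overlap(candidate, kafka_subnet):
        print(f"{candidate} conflicts with {kafka_subnet}")
    else:
        print(f"{candidate} does not overlap {kafka_subnet}")
```

A candidate that overlaps the Kafka subnet should be rejected, and a different CIDR block should be entered for the queue.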

Step 6: Creating an Enhanced Datasource Connection

To create a DLI Flink job, you need to create an enhanced datasource connection. For details, see Creating a Connection.

  • The CIDR block of the DLI queue which is bound with a datasource connection cannot overlap with that of the data source.
  • Datasource connections cannot be created for the default queue.
  • To access a datasource connection table, you need to use the queue for which a datasource connection has been created.
  1. In the navigation pane of the DLI management console, choose Datasource Connections.
  2. Click the Enhanced tab and click Create in the upper left corner. Set the following parameters:
    • Connection Name: diskafka
    • Bind Queue: Flinktest
    • VPC: vpc-dli
    • Subnet: dli-subnet
  3. Click OK.
  4. On the Enhanced tab page, click the created connection diskafka to view its ID and status. If the connection status is Active, the connection is successful.

Step 7: Creating a Datasource Authentication

The Kafka instance must have SSL access enabled. Download the authentication certificate of the instance and upload it to your own OBS bucket. For details about how to create datasource authentication, see Datasource Authentication.

  1. On the DLI management console, click Datasource Connections.
  2. On the Datasource Authentication tab page, click Create to create the authentication information. Set the following parameters:
    • Authentication Certificate: Flink
    • Type: Kafka_SSL
    • Truststore Path: obs://smoke-test/client.truststore.jks
    • Truststore Password: ********

    You do not need to set other parameters.

  3. Click OK.
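
Because the Truststore Path must point to an object in the bucket created in Step 3, it can help to check the path before entering it. The sketch below splits an obs:// path into its bucket and object key. The helper name split_obs_path is hypothetical and is not part of any DLI or OBS SDK.

```python
from urllib.parse import urlsplit


def split_obs_path(obs_path: str) -> tuple[str, str]:
    """Split 'obs://bucket/key' into (bucket, key); raise ValueError otherwise."""
    parts = urlsplit(obs_path)
    key = parts.path.lstrip("/")
    if parts.scheme != "obs" or not parts.netloc or not key:
        raise ValueError(f"not an obs://bucket/object path: {obs_path!r}")
    return parts.netloc, key


bucket, key = split_obs_path("obs://smoke-test/client.truststore.jks")
print(f"bucket={bucket} key={key}")
```

If the bucket part differs from the bucket that holds the uploaded certificate, correct the path before you click OK.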

Step 8: Creating a Flink SQL Job

After the data source and data output channel are prepared, you can create a Flink SQL job.

  1. In the left navigation pane of the DLI management console, choose Job Management > Flink Jobs. The Flink Jobs page is displayed.
  2. In the upper right corner of the Flink Jobs page, click Create Job. Set the following parameters:
    • Type: Flink SQL
    • Name: DIS-Flink-Kafka
    • Editor: SQL editor
  3. Click OK to enter the Edit page.
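  4. Edit the Flink SQL job.

    Enter the SQL statements in the SQL editing area. The following script first writes ten sample records to the DIS stream csinput through a sink stream, then reads the records back through a source stream and forwards them to the Kafka topic testflink: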

    CREATE SINK STREAM car_info (
      a1 string,
      a2 string,
      a3 string,
      a4 INT
    )
    WITH (
      type = "dis",
      region = "",
      channel = "csinput",
      encode = "csv",
      FIELD_DELIMITER = ";"
    );
    
    insert into car_info select 'id','owner','brand',1;
    insert into car_info select 'id','owner','brand',2;
    insert into car_info select 'id','owner','brand',3;
    insert into car_info select 'id','owner','brand',4;
    insert into car_info select 'id','owner','brand',5;
    insert into car_info select 'id','owner','brand',6;
    insert into car_info select 'id','owner','brand',7;
    insert into car_info select 'id','owner','brand',8;
    insert into car_info select 'id','owner','brand',9;
    insert into car_info select 'id','owner','brand',10;
    
    CREATE SOURCE STREAM car_info (
      a1 string,
      a2 string,
      a3 string,
      a4 INT
    )
    
    WITH (
      type = "dis",
      region = "",
      channel = "csinput",
      encode = "csv",
      FIELD_DELIMITER = ";"
    );
    
    CREATE SINK STREAM kafka_sink (
      a1 string,
      a2 string,
      a3 string,
      a4 INT
    ) // Output Field
    
    WITH (
      type = "kafka",
      kafka_bootstrap_servers = "192.x.x.x:9093, 192.x.x.x:9093, 192.x.x.x:9093",
      kafka_topic = "testflink", // Written topic
      encode = "csv", // Encoding format, which can be JSON or CSV.
      kafka_certificate_name = "Flink",
      kafka_properties_delimiter = ",",
      kafka_properties = "sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxx\" password=\"xxx\";,sasl.mechanism=PLAIN,security.protocol=SASL_SSL"
    );
    
    INSERT INTO kafka_sink
    SELECT * FROM car_info;
  5. Click Check Semantics.
  6. Set job running parameters. Set the following parameters:
    • CUs: 2
    • Job Manager CUs: 1
    • Max Concurrent Jobs: 1
    • Dirty Data Dump Address: obs://smoke-test
    • Queue: Flinktest

    You do not need to set other parameters.

  7. Click Save.
  8. Click Start. On the displayed Start Flink Job page, confirm the job specifications and the price, and click Start Now to start the job.

    After the job is started, the system automatically switches to the Flink Jobs page, and the created job is displayed in the job list. You can view the job status in the Status column. After a job is successfully submitted, its status changes from Submitting to Running. After the execution is complete, the status changes to Completed.

    If Status of a job is Submission failed or Running exception, the job fails to be submitted or fails to run. In this case, you can move the cursor over the status icon in the Status column of the job list to view the error details. You can click to copy these details. After handling the fault based on the provided information, resubmit the job.

  9. After the job is complete, you can log in to the management console of Distributed Message Service for Kafka to view the corresponding Kafka premium instance. Click the instance name and click the Message Query tab to view the message body.
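
The kafka_properties value in the Step 8 script packs several Kafka client settings into one string, separated by the character set in kafka_properties_delimiter. As a rough illustration of how that string breaks down, the sketch below splits it into key-value pairs. This is an assumption about the intended structure, not a description of how DLI parses the value internally, and the function name split_kafka_properties is hypothetical.

```python
def split_kafka_properties(raw: str, delimiter: str = ",") -> dict[str, str]:
    """Split 'k1=v1,k2=v2' into a dict, cutting each entry at its first '='."""
    properties = {}
    for entry in raw.split(delimiter):
        key, sep, value = entry.partition("=")
        if not sep or not key.strip():
            raise ValueError(f"malformed property entry: {entry!r}")
        properties[key.strip()] = value.strip()
    return properties


# Same shape as the value in the script; "xxx" remains a placeholder.
raw = (
    "sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule "
    'required username="xxx" password="xxx";,'
    "sasl.mechanism=PLAIN,security.protocol=SASL_SSL"
)
for name, value in split_kafka_properties(raw).items():
    print(f"{name} -> {value}")
```

Each entry is cut at its first equals sign, so the equals signs inside the sasl.jaas.config value are preserved. In this sketch, a username or password that contains the delimiter character would be split incorrectly, so choose a delimiter that does not occur in any value.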