
Interconnecting FlinkServer with Redis

Scenario

This section describes the data definition language (DDL) for using Redis as a sink or dimension table, the related WITH parameters, and example code for creating tables, and explains how to perform these operations on the FlinkServer job management page.

Kafka in security mode is used as an example.

Prerequisites

  • The HDFS, Yarn, Redis, and Flink services have been installed in a cluster.
  • A client containing the Redis service has been installed in a directory, for example, /opt/client.
  • You have created a user assigned with the FlinkServer Admin Privilege (for example, flink_admin) for accessing the Flink web UI by referring to Creating a FlinkServer Role.

Procedure

Scenario 1: Redis functions as a sink table.

  1. Log in to Manager as user flink_admin and choose Cluster > Services > Flink. In the Basic Information area, click the link on the right of Flink WebUI to access the Flink web UI.
  2. Create a Flink SQL job by referring to Creating a Job. On the job development page, configure the job parameters as follows and start the job.

    In Basic Parameter, select Enable CheckPoint, set Time Interval(ms) to 60000, and retain the default value for Mode.
    CREATE TABLE kafka_source (
      account varchar(10),
      costs int,
      ts AS PROCTIME()
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'user_source',
      'properties.bootstrap.servers' = 'IP address of the Kafka broker instance:Kafka port number',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json',
      'properties.sasl.kerberos.service.name' = 'kafka',
      'properties.security.protocol' = 'SASL_PLAINTEXT',
      'properties.kerberos.domain.name' = 'hadoop.System domain name'
    );
    CREATE TABLE redis_sink (
      account varchar,
      costs int,
      PRIMARY KEY(account) NOT ENFORCED
    ) WITH (
      'connector' = 'redis',
      'deploy-mode' = 'cluster',
      'need-kerberos-auth' = 'true',
      'service-kerberos-name' = 'redis/hadoop.System domain name',
      'login-context-name' = 'Client',
      'host' = '10.10.10.169',
      'port' = '22400',
      'isSSLMode' = 'true',
      'data-type' = 'string',
      'namespace' = 'redis_table_2',
      'sink.batch.max-size' = '-1', -- Whether to write to Redis in batches and how many records to write per batch. The value -1 disables batch write. Batch write requires checkpointing to be enabled.
      'sink.flush-buffer.timeout' = '1000' -- Interval, in milliseconds, at which buffered data is flushed to Redis after batch write is enabled.
    );
    INSERT INTO
      redis_sink
    SELECT
      account,
      SUM(costs)
    FROM
      kafka_source
    GROUP BY
      TUMBLE(ts, INTERVAL '90' SECOND), -- The 90-second tumbling window lets you view the aggregation result quickly.
      account;
    • Obtain the IP address and port number of the Kafka broker instance as follows:
      • To obtain the instance IP address, log in to FusionInsight Manager, choose Cluster > Services > Kafka, click Instance, and query the instance IP address on the instance list page.
      • If Kerberos authentication is enabled for the cluster (the cluster is in security mode), the Broker port number is the value of sasl.port. The default value is 21007.
      • If Kerberos authentication is disabled for the cluster (the cluster is in normal mode), the broker port number is the value of port. The default value is 9092. If the port number is set to 9092, set allow.everyone.if.no.acl.found to true. The procedure is as follows:

        Log in to FusionInsight Manager and choose Cluster > Services > Kafka. Click Configurations and then All Configurations. On the page that is displayed, search for allow.everyone.if.no.acl.found, set it to true, and click Save.

    • System domain name: You can log in to FusionInsight Manager, choose System > Permission > Domain and Mutual Trust, and check the value of Local Domain.
    • host and port indicate the IP address and port number of an instance in the Redis cluster, respectively.

      The formula for calculating the port number of the Redis instance is: 22400 + Instance ID – 1.

      To view the instance ID, choose Cluster > Name of the desired cluster > Services > Redis > Redis Manager on FusionInsight Manager and click the target Redis cluster name.

      For example, in the Redis cluster, the port number of the Redis instance that corresponds to the role R1 port is 22400 (22400 + 1 – 1 = 22400).

    • namespace: Prefix used to construct the key in the Redis database. The key is in the format Namespace value:Account value. For example, if the account value is A1 and the namespace value is redis_table_2, the key in the Redis database is redis_table_2:A1.
    • sink.batch.max-size:
      • Set a positive integer to enable batch write to Redis and specify the number of records written in a batch. The value -1 disables batch write.

        Batch write improves performance in big data scenarios but is not suitable for scenarios with strict real-time requirements. A batch size of no more than 30,000 records is recommended.

      • To use batch write, checkpointing must be enabled.
    • sink.flush-buffer.timeout: Interval, in milliseconds, at which data buffered in the queue is flushed to Redis after batch write is enabled. A configuration sketch follows this list.
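    For example, to enable batch write, the two options above might be set as follows (a sketch; the values are illustrative):

    'sink.batch.max-size' = '10000', -- write up to 10,000 records per batch; requires checkpointing
    'sink.flush-buffer.timeout' = '2000' -- flush buffered data to Redis every 2 seconds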

  3. On the job management page, check whether the job status is Running.
  4. Run the following command to start a consumer and check whether data is properly written to the Kafka topic after step 5 is performed. For details, see Managing Messages in Kafka Topics.

    sh kafka-console-consumer.sh --topic Topic name --bootstrap-server Service IP address of the Kafka broker instance:Kafka port number --consumer.config /opt/client/Kafka/kafka/config/consumer.properties
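    For example, assuming the topic user_source and a broker at 10.10.10.12:21007 (illustrative values), the command might be:

    sh kafka-console-consumer.sh --topic user_source --bootstrap-server 10.10.10.12:21007 --consumer.config /opt/client/Kafka/kafka/config/consumer.properties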

  5. View the topic and write data to the Kafka topic by referring to Managing Messages in Kafka Topics. After the data is written, view the execution result in the consumer window opened in step 4.

    ./kafka-topics.sh --list --zookeeper IP address of the ZooKeeper quorumpeer instance:ZooKeeper port number/kafka

    sh kafka-console-producer.sh --broker-list IP address of the node where Kafka instances reside:Kafka port number --topic Topic name --producer.config Client directory/Kafka/kafka/config/producer.properties

    For example, if the topic name is user_source, the command is sh kafka-console-producer.sh --broker-list IP address of the node where the Kafka instance resides:Kafka port number --topic user_source --producer.config /opt/client/Kafka/kafka/config/producer.properties

    Enter the message content.
    {"account": "A1","costs":"11"}
    {"account": "A1","costs":"22"}
    {"account": "A2","costs":"33"}
    {"account": "A3","costs":"44"}

    Press Enter to send the message.

    • IP address of the ZooKeeper quorumpeer instance

      To obtain the IP addresses of all ZooKeeper quorumpeer instances, log in to FusionInsight Manager and choose Cluster > Services > ZooKeeper. On the displayed page, click Instance and view the IP addresses of all hosts where the quorumpeer instances are located.

    • Port number of the ZooKeeper client

      Log in to FusionInsight Manager and choose Cluster > Services > ZooKeeper. On the displayed page, click Configurations and check the value of clientPort.

  6. Run the following commands to connect to the Redis cluster using the Redis client and query the result. The key redis_table_2:A1 is used as an example.

    redis-cli -c -h Service IP address of one instance in the Redis cluster -p Redis port number

    get redis_table_2:A1
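    If both A1 records fall into the same 90-second window, the expected value is the sum of their costs (a sketch of the expected output):

    "33"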

Scenario 2: Redis functions as a dimension table.

  1. Log in to Manager as user flink_admin and choose Cluster > Services > Flink. In the Basic Information area, click the link on the right of Flink WebUI to access the Flink web UI.
  2. Create a Flink SQL job by referring to Creating a Job. On the job development page, configure the job parameters as follows and start the job.

    In Basic Parameter, select Enable CheckPoint, set Time Interval(ms) to 60000, and retain the default value for Mode.

    CREATE TABLE KafkaSource ( -- Kafka functions as a source table.
      `user_id` VARCHAR,
      `user_name` VARCHAR,
      `age` double,
      proctime AS PROCTIME()
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'user_source',
      'properties.bootstrap.servers' = 'IP address of the Kafka broker instance:Kafka port',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'latest-offset',
      'value.format' = 'csv',
      'properties.sasl.kerberos.service.name' = 'kafka',
      'properties.security.protocol' = 'SASL_PLAINTEXT',
      'properties.kerberos.domain.name' = 'hadoop.System domain name'
    );
    CREATE TABLE KafkaSink ( -- Kafka functions as a sink table.
      `user_id` VARCHAR,
      `user_name` VARCHAR,
      `age` double,
      `phone_number` VARCHAR,
      `address` VARCHAR
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'user_sink',
      'properties.bootstrap.servers' = 'IP address of the Kafka broker instance:Kafka port',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'latest-offset',
      'value.format' = 'csv',
      'properties.sasl.kerberos.service.name' = 'kafka',
      'properties.security.protocol' = 'SASL_PLAINTEXT',
      'properties.kerberos.domain.name' = 'hadoop.System domain name'
    );
    CREATE TABLE RedisTable ( -- Redis functions as a dimension table.
      user_name VARCHAR,
      score double,
      phone_number VARCHAR,
      address VARCHAR
    ) WITH (
      'connector' = 'redis',
      'deploy-mode'='cluster',
      'need-kerberos-auth' = 'true',
      'service-kerberos-name' = 'redis/hadoop.System domain name',
      'login-context-name' = 'Client',
      'zset-score-column' = 'score',
      'host' = '10.10.10.169',
      'port' = '22400',
      'isSSLMode' = 'true',
      'key-ttl-mode' = 'no-ttl',
      'data-type' = 'sorted-set',
      'namespace' = 'redis_zset',
      'zset-delimiter' = ',',
      'key-column' = 'user_name',
      'schema-syntax' = 'concatenate-fields'
    );
    
    INSERT INTO
      KafkaSink
    SELECT
      t.user_id,
      t.user_name,
      t.age,
      d.phone_number,
      d.address
    FROM
      KafkaSource as t
      JOIN RedisTable FOR SYSTEM_TIME AS OF t.proctime as d ON t.user_name = d.user_name;
      -- FOR SYSTEM_TIME AS OF t.proctime is required; it joins each record with the dimension table data current at processing time.

    In this example, the dimension lookup key is built as namespace:key-column value (for example, redis_zset:zhangsan), the ZSET score maps to the score column, and the ZSET member is split by zset-delimiter into phone_number and address.

  3. Run the following commands to write test data to the Redis dimension table:

    cd /opt/client/Redis/bin

    ./redis-cli -h 10.10.10.11 -p 22400 -c

    Write the following dimension data:

    ZADD redis_zset:zhangsan 80 153xxxx1111,city1
    ZADD redis_zset:lisi 70 153xxxx2222,city2
    ZADD redis_zset:wangwu 90 153xxxx3333,city3

    If channel encryption is enabled for Redis, replace the second command with ./redis-cli -h 10.10.10.11 -p 22400 --tls -c.

    To enable SSL channel encryption for Redis, log in to FusionInsight Manager and choose Cluster > Services > Redis. On the displayed page, click Configurations and then All Configurations, search for REDIS_SSL_ON, and set it to true. Channel encryption protects data during transfer but affects performance. If Redis does not contain important or sensitive data, do not enable this function (set REDIS_SSL_ON to false).
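    You can verify the written dimension data with the standard ZRANGE command; the key below matches the example data:

    ZRANGE redis_zset:zhangsan 0 -1 WITHSCORES

    The expected output is the member 153xxxx1111,city1 with score 80.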

  4. Run the following command to write test data to the Kafka source table:

    sh kafka-console-producer.sh --broker-list IP address of the node where Kafka instances reside:Kafka port number --topic Topic name --producer.config Client directory/Kafka/kafka/config/producer.properties

    Enter the message content.
    1,zhangsan,20
    2,lisi,25
    3,wangwu,28

  5. Run the following commands to check whether data is written from the Kafka topic to the sink table:

    sh kafka-console-consumer.sh --topic Topic name --bootstrap-server Service IP address of the Kafka broker instance:Kafka port number --consumer.config Client directory/Kafka/kafka/config/consumer.properties

    The output is as follows:

    1,zhangsan,20,153xxxx1111,city1
    2,lisi,25,153xxxx2222,city2
    3,wangwu,28,153xxxx3333,city3

WITH Parameters

Table 1 WITH parameters

  • zSetScoreColumn (optional, String): Column name corresponding to the score field when Redis functions as a dimension table in ZSET format.

  • hashKeyColumn (optional, String): Column name corresponding to the hash field when data is in hash format.

  • host (mandatory, String): IP address for connecting to the Redis cluster, which is the service-plane IP address of a Redis cluster instance.

  • port (mandatory, String): Port number of the Redis instance, calculated as 22400 + Instance ID – 1.

    To view the instance ID, log in to FusionInsight Manager and choose Cluster > Name of the desired cluster > Services. On the page that is displayed, choose Redis > Redis Manager and click the target Redis cluster name.

    For example, the Redis instance that corresponds to the role R1 port uses port 22400 (22400 + 1 – 1 = 22400).

  • separator (optional, String): Separator between the fields in a value when Redis is used as a dimension table, for example, (,) and (\u200b).

  • key-ttl-mode (optional, String): Redis data expiration policy. Value options are as follows:
    • no-ttl: Data does not expire.
    • expire-msec: Data expires after a specified period, in milliseconds.
    • expire-at-date: Data expires at a specified time, accurate to seconds.
    • expire-at-timestamp: Data expires at a specified time, accurate to milliseconds.

  • key-ttl (optional, String): Mandatory when key-ttl-mode is set to a value other than no-ttl. The value does not contain a unit.

  • isSSLMode (optional, String): Whether to enable SSL. true enables the SSL mode; false disables it.

  • keyPrefix (optional, String): Prefix of the Redis key.
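For example, to make keys expire one hour after they are written, the TTL options might be set as follows (a sketch; the other options are as in the scenarios above):

  'key-ttl-mode' = 'expire-msec',
  'key-ttl' = '3600000' -- 1 hour in milliseconds; the value contains no unit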