更新时间:2024-11-29 GMT+08:00

FlinkServer对接Redis

操作场景

本章节介绍Redis作为sink表或者维表的DDL定义,以及创建表时使用的WITH参数和代码示例,并指导如何在FlinkServer作业管理页面操作。

本示例以安全模式Kafka为例。

前提条件

  • 集群中已安装HDFS、Yarn、Redis和Flink服务。
  • 包含Redis服务的客户端已安装,例如安装路径为:/opt/client
  • 参考创建FlinkServer角色创建一个具有FlinkServer管理员权限的用户用于访问Flink WebUI,如:flink_admin。

操作步骤

场景一:Redis作为sink表。

  1. 使用flink_admin登录Manager,选择“集群 > 服务 > Flink”,在“Flink WebUI”右侧,单击链接,访问Flink的WebUI。
  2. 参考新建作业,新建Flink SQL流作业,在作业开发界面进行作业开发,配置完成后启动作业。

    需勾选“基础参数”中的“开启CheckPoint”,“时间间隔(ms)”可设置为“60000”,“模式”可使用默认值。
    CREATE TABLE kafka_source (
      account varchar(10),
      costs int,
      ts AS PROCTIME()
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'user_source',
      'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json',
      'properties.sasl.kerberos.service.name' = 'kafka',
      'properties.security.protocol' = 'SASL_PLAINTEXT',
      'properties.kerberos.domain.name' = 'hadoop.系统域名'
    );
    CREATE table redis_sink(
      account varchar,
      costs int,
      PRIMARY KEY(account) NOT ENFORCED
    ) WITH (
      'connector' = 'redis',
      'deploy-mode'='cluster',
      'need-kerberos-auth' = 'true',
      'service-kerberos-name' = 'redis/hadoop.系统域名',
      'login-context-name' = 'Client',
      'host' = '10.10.10.169',
      'port' = '22400',
      'isSSLMode' = 'true',
      'data-type' = 'string',
      'namespace' = 'redis_table_2',
      'sink.batch.max-size' = '-1',--是否开启批写Redis并设置批写数量,'-1'表示不开启,若开启则需同步开启CheckPoint
      'sink.flush-buffer.timeout' = '1000'--开启批写Redis后可按照指定时间将队列里面的数据刷新到Redis,单位:ms
    
    );
    INSERT INTO
      redis_sink
    SELECT
      account,
      SUM(costs)
    FROM
      kafka_source
    GROUP BY
      TUMBLE(ts, INTERVAL '90' SECOND),
      --为了快速看到计算结果
      account;
    • Kafka Broker实例IP地址及端口号说明:
      • 服务的实例IP地址可通过登录FusionInsight Manager后,单击“集群 > 服务 > Kafka > 实例”,在实例列表页面中查询。
      • 集群已启用Kerberos认证(安全模式)时Broker端口为“sasl.port”参数的值,默认为“21007”。
      • 集群未启用Kerberos认证(普通模式)时Broker端口为“port”的值,默认为“9092”。如果配置端口号为9092,则需要配置“allow.everyone.if.no.acl.found”参数为true,具体操作如下:

        登录FusionInsight Manager系统,选择“集群 > 服务 > Kafka > 配置 > 全部配置”,搜索“allow.everyone.if.no.acl.found”配置,修改参数值为true,保存配置即可。

    • 系统域名:可登录FusionInsight Manager,选择“系统 > 权限 > 域和互信”,查看“本端域”参数,即为当前系统域名。
    • host、port:分别为Redis集群的其中一个实例IP(业务平面)和端口号。

      Redis实例的端口计算方式为:22400+该实例的ID-1。

      实例ID可以通过在FusionInsight Manager中选择“集群 > 待操作集群的名称 > 服务 > Redis > Redis管理”,单击Redis集群名称查看。

      例如Redis集群内角色R1对应的Redis实例的端口为22400+1-1=22400。

    • namespace:用于拼接Redis数据库的键,格式为“namespace的值:account的值”。如发送数据的account的值为“A1”,namespace的值为“redis_table_2”,那么此数据在Redis数据库中的键为redis_table_2:A1。
    • sink.batch.max-size:
      • 开启批写Redis并设置批写数量(正整数),单位:条。“-1”表示不开启批写Redis。

        开启该功能可提升大数据场景下性能表现,但不适合对实时性要求过高的场景,建议批写数量不超过30000。

      • 设置该参数需同步开启CheckPoint。
    • sink.flush-buffer.timeout:开启批写Redis后,可按照指定时间将队列里面的数据刷新到Redis。单位:ms。

  3. 查看作业管理界面,作业状态为“运行中”。
  4. 参考管理Kafka主题中的消息,执行以下命令查看Sink表中是否接收到数据,即5执行完成后查看Kafka topic是否正常写入数据。

    sh kafka-console-consumer.sh --topic 主题名称 --bootstrap-server Kafka的Broker实例业务IP:Kafka端口号 --consumer.config /opt/client/Kafka/kafka/config/consumer.properties

  5. 参考管理Kafka主题中的消息,查看Topic并向Kafka中写入数据,输入完成后可在4中的窗口查看执行结果。

    ./kafka-topics.sh --list --zookeeper ZooKeeper的quorumpeer实例业务IP:ZooKeeper客户端端口号/kafka

    sh kafka-console-producer.sh --broker-list Kafka角色实例所在节点的IP地址:Kafka端口号 --topic 主题名称 --producer.config 客户端目录/Kafka/kafka/config/producer.properties

    例如本示例使用主题名称为user_source:sh kafka-console-producer.sh --broker-list Kafka角色实例所在节点的IP地址:Kafka端口号 --topic user_source --producer.config /opt/client/Kafka/kafka/config/producer.properties

    输入消息内容:
    {"account": "A1","costs":"11"}
    {"account": "A1","costs":"22"}
    {"account": "A2","costs":"33"}
    {"account": "A3","costs":"44"}

    输入完成后按回车发送消息。

    • ZooKeeper的quorumpeer实例业务IP:

      ZooKeeper服务所有quorumpeer实例业务IP。登录FusionInsight Manager,选择“集群 > 服务 > ZooKeeper > 实例”,可查看所有quorumpeer实例所在主机业务IP地址。

    • ZooKeeper客户端端口号:

      登录FusionInsight Manager,选择“集群 > 服务 > ZooKeeper”,在“配置”页签查看“clientPort”的值。

  6. 执行以下命令登录Redis客户端查询结果,以查询“redis_table_2:A1”为例。

    redis-cli -c -h Redis集群中其中一个实例业务IP -p Redis端口号

    get redis_table_2:A1

场景二:Redis作为维表。

  1. 使用flink_admin登录Manager,选择“集群 > 服务 > Flink”,在“Flink WebUI”右侧,单击链接,访问Flink的WebUI。
  2. 参考新建作业,新建Flink SQL流作业,在作业开发界面进行作业开发,配置完成后启动作业。

    需勾选“基础参数”中的“开启CheckPoint”,“时间间隔(ms)”可设置为“60000”,“模式”可使用默认值。

    CREATE TABLE KafkaSource ( -- Kafka作为source表
      `user_id` VARCHAR,
      `user_name` VARCHAR,
      `age` double,
       proctime as proctime()
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'user_source',
      'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'latest-offset',
      'value.format' = 'csv',
      'properties.sasl.kerberos.service.name' = 'kafka',
      'properties.security.protocol' = 'SASL_PLAINTEXT',
      'properties.kerberos.domain.name' = 'hadoop.系统域名'
    );
    CREATE TABLE KafkaSink (  -- Kafka作为sink表
      `user_id` VARCHAR,
      `user_name` VARCHAR,
      `age` double,
      `phone_number` VARCHAR,
      `address` VARCHAR
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'user_sink',
      'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'latest-offset',
      'value.format' = 'csv',
      'properties.sasl.kerberos.service.name' = 'kafka',
      'properties.security.protocol' = 'SASL_PLAINTEXT',
      'properties.kerberos.domain.name' = 'hadoop.系统域名'
    );
    CREATE TABLE RedisTable ( -- Redis作为维表
      user_name VARCHAR,
      score double,
      phone_number VARCHAR,
      address VARCHAR
    ) WITH (
      'connector' = 'redis',
      'deploy-mode'='cluster',
      'need-kerberos-auth' = 'true',
      'service-kerberos-name' = 'redis/hadoop.系统域名',
      'login-context-name' = 'Client',
      'zset-score-column' = 'score',
      'host' = '10.10.10.169',
      'port' = '22400',
      'isSSLMode' = 'true',
      'key-ttl-mode' = 'no-ttl',
      'data-type' = 'sorted-set',
      'namespace' = 'redis_zset',
      'zset-delimiter' = ',',
      'key-column' = 'user_name',
      'schema-syntax' = 'concatenate-fields'
    );
    
    INSERT INTO
      KafkaSink
    SELECT
      t.user_id,
      t.user_name,
      t.age,
      d.phone_number,
      d.address
    FROM
      KafkaSource as t
      JOIN RedisTable FOR SYSTEM_TIME AS OF t.proctime as d ON t.user_name = d.user_name;
      -- 必须加上FOR SYSTEM_TIME AS OF t.proctime,表示JOIN维表当前时刻所看到的每条数据

  3. 执行以下命令向Redis维表中写入测试数据。

    cd /opt/client/Redis/bin

    ./redis-cli -h 10.10.10.11 -p 22400 -c

    输入消息内容:

    ZADD redis_zset:zhangsan 80 153xxxx1111,city1
    ZADD redis_zset:lisi 70 153xxxx2222,city2
    ZADD redis_zset:wangwu 90 153xxxx3333,city3

    若Redis启用通道加密,使用命令:./redis-cli -h 10.10.10.11 -p 22400 --tls -c

    登录Manager,选择“集群 > 服务 > Redis > 配置 > 全部配置”,搜索“REDIS_SSL_ON”,将参数“值”设置为“true”,Redis启用SSL通道加密。通道加密在数据传输中对数据进行加密保护,会损耗性能,Redis中无重要或敏感信息不建议开启。

  4. 生产数据,写入Kafka Source表中。

    sh kafka-console-producer.sh --broker-list Kafka角色实例所在节点的IP地址:Kafka端口号 --topic 主题名称 --producer.config 客户端目录/Kafka/kafka/config/producer.properties

    输入消息内容:
    1,zhangsan,20
    2,lisi,25
    3,wangwu,28

  5. 执行以下命令查看Sink表中是否接收到数据,即查看Kafka topic是否正常写入数据。

    sh kafka-console-consumer.sh --topic 主题名称 --bootstrap-server Kafka的Broker实例业务IP:Kafka端口号 --consumer.config 客户端目录/Kafka/kafka/config/consumer.properties

    结果如下:

    1,zhangsan,20,153xxxx1111,city1
    2,lisi,25,153xxxx2222,city2
    3,wangwu,28,153xxxx3333,city3

WITH主要参数说明

表1 WITH主要参数说明

配置项

是否必选

类型

描述

zSetScoreColumn

可选

String

Redis作为维表时,ZSet格式score字段对应的列名

hashKeyColumn

可选

String

Hash格式,Hash字段对应的列名

host

必选

String

Redis集群连接IP,为Redis集群的实例IP(业务平面)

port

必选

String

端口为对应的Redis实例的端口

Redis实例的端口计算方式为:22400+该实例的ID-1

实例ID可以通过在FusionInsight Manager中选择“集群 > 待操作集群的名称 > 服务 > Redis > Redis管理”,单击Redis集群名称查看

例如Redis集群内角色R1对应的Redis实例的端口为22400+1-1=22400

separator

可选

String

Redis作为维表时,value中的字段分割符,示例:“(,)”、“(\u200b)”

key-ttl-mode

可选

String

Redis数据过期策略:

  • no-ttl:数据不过期
  • expire-msec: 指定多长时间之后数据过期,以毫秒为单位
  • expire-at-date:到指定时间数据过期,精确到秒
  • expire-at-timestamp:到指定时间数据过期,精确到毫秒

key-ttl

可选

String

配置“key-ttl-mode”参数为非“no-ttl”时需设置该值,该值不需要带单位。

isSSLMode

可选

String

是否开启SSL模式

  • true:开启SSL模式
  • false:不开启SSL模式

keyPrefix

可选

String

Redis key的前缀