配置对接Kafka
Flink样例工程的数据存储在Kafka组件中。向Kafka组件发送数据(需要有Kafka权限用户),并从Kafka组件接收数据。
- 确保集群安装完成,包括HDFS、Yarn、Flink和Kafka。
- 创建Topic。
- 用户使用Linux命令行创建topic,执行命令前需要使用kinit命令进行人机认证,如kinit flinkuser。
flinkuser需要用户自己创建,并拥有创建Kafka的topic权限。
创建topic的命令格式:{zkQuorum}表示ZooKeeper集群信息,格式为IP:port。{Topic}表示Topic名称。
bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --replication-factor 1 --partitions 5 --topic {Topic}
例如此处以topic1的数据为例:/opt/client/Kafka/kafka/bin/kafka-topics.sh --create --zookeeper 10.96.101.32:2181,10.96.101.251:2181,10.96.101.177:2181,10.91.8.160:2181/kafka --replication-factor 1 --partitions 5 --topic topic1
- 服务端topic权限配置。
将Kafka的Broker配置参数“allow.everyone.if.no.acl.found”的值修改为“true”。
- 用户使用Linux命令行创建topic,执行命令前需要使用kinit命令进行人机认证,如kinit flinkuser。
- 安全认证。
安全认证的方式有三种:Kerberos认证、SSL加密认证和Kerberos+SSL模式认证,用户在使用的时候可任选其中一种方式进行认证。
- Kerberos认证配置
- 客户端配置。
在Flink配置文件“flink-conf.yaml”中,增加kerberos认证相关配置(主要在“contexts”项中增加“KafkaClient”),示例如下:
security.kerberos.login.keytab: /home/demo//keytab/flinkuser.keytab security.kerberos.login.principal: flinkuser security.kerberos.login.contexts: Client,KafkaClient security.kerberos.login.use-ticket-cache: false
- 运行参数。
关于“SASL_PLAINTEXT”协议的运行参数示例如下:
--topic topic1 --bootstrap.servers 10.96.101.32:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka //10.96.101.32:21007表示kafka服务器的IP:port
- 客户端配置。
- SSL加密配置
- 服务端配置
登录FusionInsight Manager页面,选择“集群 > 服务 > Kafka > 配置”,参数类别设置为“全部配置”,搜索“ssl.mode.enable”并配置为“true”。
- 客户端配置
- 登录集群的FusionInsight Manager,选择“集群 > 待操作的集群名称 > 服务 > Kafka > 更多 > 下载客户端”,下载客户端压缩文件到本地机器。
- 使用客户端根目录中的“ca.crt”证书文件生成客户端的“truststore”。
执行命令如下:
keytool -noprompt -import -alias myservercert -file ca.crt -keystore truststore.jks
命令执行结果查看:
- 运行参数。
“ssl.truststore.password”参数内容需要跟创建“truststore”时输入的密码保持一致,执行以下命令运行参数。
--topic topic1 --bootstrap.servers 10.96.101.32:9093 --security.protocol SSL --ssl.truststore.location /home/zgd/software/FusionInsight_Kafka_ClientConfig/truststore.jks --ssl.truststore.password XXX
- 服务端配置
- Kerberos+SSL模式配置
完成上文中Kerberos和SSL各自的服务端和客户端配置后,只需要修改运行参数中的端口号和协议类型即可启动Kerberos+SSL模式。
--topic topic1 --bootstrap.servers 10.96.101.32:21009 --security.protocol SASL_SSL --sasl.kerberos.service.name kafka --ssl.truststore.location /home/zgd/software/FusionInsight_Kafka_ClientConfig/truststore.jks --ssl.truststore.password XXX
- Kerberos认证配置