安全集群中使用Python3.x对接Kafka
用户问题
通过Python3.x环境如何对接开启Kerberos认证的Kafka集群?
问题现象
客户想使用Python3.x的环境对接开启Kerberos认证的Kafka的集群。
处理步骤
- 登录Master节点,执行如下命令,配置华为云欧拉镜像源。
wget http://mirrors.myhuaweicloud.com/repo/mirrors_source.sh && sh mirrors_source.sh
- 执行如下命令,编译Python3.x。
yum groupinstall "Development tools" -y
yum -y install zlib zlib-devel
yum -y install bzip2 bzip2-devel
yum -y install ncurses ncurses-devel
yum -y install readline readline-devel
yum -y install openssl openssl-devel
yum -y install openssl-static
yum -y install xz lzma xz-devel
yum -y install sqlite sqlite-devel
yum -y install gdbm gdbm-devel
yum -y install tk tk-devel
yum -y install libffi libffi-devel
- 编译成功后,执行如下命令,下载解压Python3.x的tgz包。
wget https://www.python.org/ftp/python/3.6.7/Python-3.6.7.tgz
tar -zxvf Python-3.6.7.tgz
cd Python-3.6.7
Python3.x的tgz包也可以去Python官网下载。推荐使用Python-3.6.X版本,3.7版本无法使用rdd的take函数。
- 执行如下命令,设置Python3.x的配置信息及编译安装,安装到/opt/Bigdata/python3目录下。
./configure --prefix=/opt/Bigdata/python3 --enable-shared CFLAGS=-fPIC
make && make install
安装的目录可以自定义。
- 执行如下命令,配置Python3.x变量。
echo "/opt/Bigdata/python3/lib" >> /etc/ld.so.conf
ldconfig
ln -s /opt/Bigdata/python3/bin/python3 /usr/bin/python3
ln -s /opt/Bigdata/python3/bin/pip3 /usr/bin/pip3
配置变量的目录需要跟4中安装的目录保持一致。
- 配置成功后,执行如下命令,在Python3.x环境中安装Kafka组件。
cp /usr/include/gssapi/* /home/omm/kerberos/include/gssapi/
pip3 install kafka-python
pip3 install gssapi
- 安装成功后,执行以下命令配置环境变量。
source 客户端安装目录/bigdata_env
- 执行以下命令认证当前用户。
kinit Kafka用户
kinit命令使用的用户为登录Manager的用户,此用户需要具有Kafka用户组相关权限。
- 执行Python3.x脚本样例。
脚本样例:
producer: from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers=["broker_ip:21007"], security_protocol="SASL_PLAINTEXT", sasl_mechanism="GSSAPI", sasl_kerberos_service_name="kafka", sasl_kerberos_domain_name="hadoop.hadoop.com") for _ in range(100): response = producer.send("test-topic", b"testmessage") result = response.get(timeout=50) print(result) consumer: from kafka import KafkaConsumer consumer = KafkaConsumer("test-topic", bootstrap_servers=["broker_ip:21007"], group_id="test-group", enable_auto_commit="true", security_protocol="SASL_PLAINTEXT", sasl_mechanism="GSSAPI", sasl_kerberos_service_name="kafka", sasl_kerberos_domain_name="hadoop.hadoop.com") for message in consumer: print(message)