文档首页/ MapReduce服务 MRS/ 组件操作指南(普通版)/ 使用Kafka/ Kafka常见问题/ 如何使用Python3.x对接开启Kerberos认证的Kafka集群
更新时间:2025-12-26 GMT+08:00
分享

如何使用Python3.x对接开启Kerberos认证的Kafka集群

问题描述

通过Python3.x环境如何对接开启Kerberos认证的Kafka集群?

处理步骤

  1. 登录MRS集群Master节点,执行如下命令,配置华为云欧拉镜像源。

    wget http://mirrors.myhuaweicloud.com/repo/mirrors_source.sh && sh mirrors_source.sh

  2. 执行如下命令,编译Python3.x。

    yum groupinstall "Development tools" -y
    yum -y install zlib zlib-devel
    yum -y install bzip2 bzip2-devel
    yum -y install ncurses ncurses-devel
    yum -y install readline readline-devel
    yum -y install openssl openssl-devel
    yum -y install openssl-static
    yum -y install xz lzma xz-devel
    yum -y install sqlite sqlite-devel
    yum -y install gdbm gdbm-devel
    yum -y install tk tk-devel
    yum -y install libffi libffi-devel

  3. 编译成功后,执行如下命令,下载解压Python3.x的tgz包。

    wget https://www.python.org/ftp/python/3.6.7/Python-3.6.7.tgz
    tar -zxvf Python-3.6.7.tgz
    cd Python-3.6.7

    Python3.x的tgz包也可以去Python官网下载。推荐使用Python-3.6.X版本,3.7版本无法使用rdd的take函数。

  4. 执行如下命令,设置Python3.x的配置信息及编译安装,例如安装到/opt/Bigdata/python3目录下安装的目录可以自定义

    ./configure --prefix=/opt/Bigdata/python3 --enable-shared CFLAGS=-fPIC
    make && make install

  5. 执行如下命令,配置Python3.x变量。配置变量的目录需要跟4中安装的目录保持一致。

    echo "/opt/Bigdata/python3/lib" >> /etc/ld.so.conf
    ldconfig
    ln -s /opt/Bigdata/python3/bin/python3 /usr/bin/python3
    ln -s /opt/Bigdata/python3/bin/pip3 /usr/bin/pip3

  6. 配置成功后,执行如下命令,在Python3.x环境中安装Kafka组件。

    cp /usr/include/gssapi/* /home/omm/kerberos/include/gssapi/
    pip3 install kafka-python
    pip3 install gssapi

  7. 安装成功后,执行以下命令配置环境变量

    source 客户端安装目录/bigdata_env

  8. 执行以下命令认证当前用户。kinit命令使用的用户为登录Manager的用户,此用户需要具有Kafka用户组相关权限。

    kinit Kafka用户

  9. 执行Python3.x脚本样例。

    脚本样例:

    producer:
    from kafka import KafkaProducer
    producer = KafkaProducer(bootstrap_servers=["broker_ip:21007"],
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="GSSAPI",
    sasl_kerberos_service_name="kafka",
    sasl_kerberos_domain_name="hadoop.hadoop.com")
    for _ in range(100):
    response = producer.send("test-topic", b"testmessage")
    result = response.get(timeout=50)
    print(result)
    
    consumer:
    from kafka import KafkaConsumer
    consumer = KafkaConsumer("test-topic",
    bootstrap_servers=["broker_ip:21007"],
    group_id="test-group",
    enable_auto_commit="true",
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="GSSAPI",
    sasl_kerberos_service_name="kafka",
    sasl_kerberos_domain_name="hadoop.hadoop.com")
    for message in consumer:
    print(message)

相关文档