• python写带ACL的kafka集群问题


    背景描述

    日常用python读写kafka验证问题,一直比较稳定。最近运维提供了带ACL的kafka集群,遇到写kafka异常的问题

    异常日志

    ---------------------------------------------------------------------------
    AssertionError                            Traceback (most recent call last)
     in 
          6              sasl_mechanism='SCRAM-SHA-512',
          7              sasl_plain_username='xxx',
    ----> 8              sasl_plain_password='xxx')
          9 
         10 producer.send('opcrm_usertask_likednormal_test', b'test_msg1')
    
    ~/miniconda3/envs/crm-py39/lib/python3.6/site-packages/kafka/producer/kafka.py in __init__(self, **configs)
        345 
        346         client = KafkaClient(metrics=self._metrics, metric_group_prefix='producer',
    --> 347                              **self.config)
        348 
        349         # Get auto-discovered version from client if necessary
    
    ~/miniconda3/envs/crm-py39/lib/python3.6/site-packages/kafka/client_async.py in __init__(self, **configs)
        214                                                self._conns)
        215 
    --> 216         self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
        217 
        218         # Check Broker Version if not set explicitly
    
    ~/miniconda3/envs/crm-py39/lib/python3.6/site-packages/kafka/client_async.py in _bootstrap(self, hosts)
        244                                          state_change_callback=cb,
        245                                          node_id='bootstrap',
    --> 246                                          **self.config)
        247             bootstrap.connect()
        248             while bootstrap.connecting():
    
    ~/miniconda3/envs/crm-py39/lib/python3.6/site-packages/kafka/conn.py in __init__(self, host, port, afi, **configs)
        223         if self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL'):
        224             assert self.config['sasl_mechanism'] in self.SASL_MECHANISMS, (
    --> 225                 'sasl_mechanism must be in ' + ', '.join(self.SASL_MECHANISMS))
        226             if self.config['sasl_mechanism'] == 'PLAIN':
        227                 assert self.config['sasl_plain_username'] is not None, 'sasl_plain_username required for PLAIN sasl'
    
    AssertionError: sasl_mechanism must be in PLAIN, GSSAPI

    问题分析

    看一些示例可以正常运行,另外python的kafka第三方包有几个,怀疑是python安装kafka第三方包或者版本问题

    (crm-py39) [xxx@ip-10-169-49-131 ~]$ pip list | grep kafka
    kafka                1.3.5

    解决方案

    卸载kafka 1.3.5的包,安装kafka-python 2.0.2的包,即可

    (crm-py39) [xxx@ip-10-169-49-131 ~]$ pip list | grep kafka
    kafka                1.3.5
    (crm-py39) [xxx@ip-10-169-49-131 ~]$ pip uninstall kafka
    Found existing installation: kafka 1.3.5
    Uninstalling kafka-1.3.5:
      Would remove:
        /ldap_home/xxx/miniconda3/envs/crm-py39/lib/python3.6/site-packages/kafka-1.3.5.dist-info/*
        /ldap_home/xxx/miniconda3/envs/crm-py39/lib/python3.6/site-packages/kafka/*
    Proceed (Y/n)? y
      Successfully uninstalled kafka-1.3.5
    (crm-py39) [xxx@ip-10-169-49-131 ~]$ pip install kafka-python
    Collecting kafka-python
      Using cached kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
    Installing collected packages: kafka-python
    Successfully installed kafka-python-2.0.2
    (crm-py39) [xxx@ip-10-169-49-131 ~]$ pip list | grep kafka
    kafka-python         2.0.2

  • 相关阅读:
    十、【VUE基础】样式绑定
    nginx两台负载均衡服务器之间使用keepalived实现高可用
    【洛谷 P1152】欢乐的跳 题解(枚举+位集合)
    值得学习的Linux内核锁(一)
    30天Python入门(第十八天:深入了解Python中的正则表达式)
    修炼k8s+flink+hdfs+dlink(七:flinkcdc)
    python自动化测试全栈之ApiFox批量运行脚本生成报告
    NativeBuferring,一种零分配的数据类型[下篇]
    漏洞练习环境搭建笔记
    HttpClient 在vivo内销浏览器的高并发实践优化
  • 原文地址:https://blog.csdn.net/L13763338360/article/details/126290793