• pulsar自定义认证插件开发


    pulsar自定义认证插件开发

    pulsar的权限分为两部分:

    • 1 客户端连接时认证的权限

    • 2 客户端发送和订阅主题的权限

    pulsar开启认证功能

    默认情况下,pulsar是不会开启客户端连接认证的,即客户端到broker之间、broker到broker之间的访问都没有任何限制。但是在线上环境中,对于权限的控制往往是很重要的。

    修改配置,开启认证功能
    standalone.conf 或 broker.conf

    #开启认证
    authenticationEnabled=true
    
    #认证处理类: 可以提供N个处理认证的处理类,逗号分隔, 然后broker接收到客户端连接后就会调用此类的方法进行处理
    authenticationProviders=com.beyond.auth.AuthenticationProviderMysql
    
    # 这里是broker与broker之间连接时的认证,broker客户端用到的处理类。通常可以和客户端的认证处理类一样。
    # 正式项目中,需要在broker端判断是哪一种连接,分别做好权限认证。
    # broker与broker之间认证最好使用admin权限,它们之间可能需要同步数据等
    brokerClientAuthenticationPlugin=com.beyond.auth.AuthenticationMysql
    brokerClientAuthenticationParameters=e3592948c:admin:123456
    
    #配置超级权限
    superUserRoles=admin
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    自定义认证插件开发

    pulsar提供了Authentication等接口,我们可以通过实现这些接口来开发自定义认证插件。

    认证分为客户端和服务端,服务端即broker,客户端即我们自己编写的producer或者consumer。
    服务端需要实现org.apache.pulsar.broker.authentication.AuthenticationProvider接口,客户端需要实现org.apache.pulsar.client.api.Authentication。

    下面分别给出服务端和客户端的代码。

    服务端认证代码

    package com.beyond.auth;
    
    import org.apache.pulsar.broker.ServiceConfiguration;
    import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
    import org.apache.pulsar.broker.authentication.AuthenticationProvider;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import javax.naming.AuthenticationException;
    import java.io.IOException;
    import java.util.Set;
    
    /**
     * Broker-side authentication provider for the custom "mysql" auth method.
     *
     * <p>The broker calls {@link #authenticate(AuthenticationDataSource)} for each
     * incoming connection. The role token is read from the binary-protocol command
     * data when present, otherwise from the {@code auth} HTTP header, and is then
     * mapped to a role name by {@link #resolveRole(String)}.
     */
    public class AuthenticationProviderMysql implements AuthenticationProvider {

        private static final Logger log = LoggerFactory.getLogger(AuthenticationProviderMysql.class);

        private static final String methodName = "mysql";

        /** Name of the HTTP header that carries the role token. */
        private String header = "auth";

        /**
         * Logs the super-user roles configured on the broker; keeps no state.
         *
         * @param config broker configuration; may be {@code null}, in which case nothing is logged
         * @throws IOException declared by the interface; not thrown by this implementation
         */
        @Override
        public void initialize(ServiceConfiguration config) throws IOException {
            log.info("initialize##################" + methodName + "####################");
            if (config == null) {
                return;
            }

            Set<String> superRoles = config.getSuperUserRoles();
            if (superRoles == null) {
                return;
            }
            for (String role : superRoles) {
                log.info("initialize##################" + methodName + "#####222222#######" + role);
            }
        }

        /**
         * @return the auth method name, {@code "mysql"}
         */
        @Override
        public String getAuthMethodName() {
            log.info("getAuthMethodName##################" + methodName + "################");
            return methodName;
        }

        /**
         * Extracts the role token from the connection data and resolves it to a role.
         *
         * @param authData authentication data supplied by the connecting client
         * @return the role name associated with the presented token
         * @throws AuthenticationException if no token is supplied, the token is empty,
         *                                 or the token cannot be resolved to a role
         */
        @Override
        public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
            log.info("authenticate##################" + methodName + "#################");

            final String roleToken;
            if (authData.hasDataFromCommand()) {
                roleToken = authData.getCommandData();
            } else if (authData.hasDataFromHttp()) {
                roleToken = authData.getHttpHeader(header);
            } else {
                throw new AuthenticationException("Authentication data source does not have a role token############");
            }

            // A data source may report that it has data and still hand back nothing.
            if (roleToken == null || roleToken.trim().isEmpty()) {
                throw new AuthenticationException("Authentication role token is empty");
            }

            String role = resolveRole(roleToken);
            if (role == null || role.isEmpty()) {
                throw new AuthenticationException("Role token could not be resolved to a role");
            }

            // Log only the resolved role; the token itself is a credential and must not be logged.
            log.info("authenticate##################" + methodName + "####3333############" + role);
            return role;
        }

        /**
         * Maps a presented role token to the role it authenticates as.
         *
         * <p>NOTE(review): this is a placeholder. As written, every non-empty token is
         * accepted and used as its own role name, so a client presenting a super-user
         * role name as its token is treated as that super user. Replace it with a real
         * lookup (database query or remote call) before any non-demo use, for example
         * a parameterized query such as {@code select bbb from test where aaa = ?}
         * bound to the token. Return {@code null} when the token is unknown.
         *
         * @param roleToken non-empty token presented by the client
         * @return the role for the token, or {@code null} if the token is not recognized
         */
        private String resolveRole(String roleToken) {
            return roleToken;
        }

        /** Nothing to release; only logs that the provider is closing. */
        @Override
        public void close() {
            log.info("close##################" + methodName + "##############");
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70

    客户端连接认证代码

    package com.beyond.auth;
    
    import org.apache.pulsar.client.api.Authentication;
    import org.apache.pulsar.client.api.AuthenticationDataProvider;
    import org.apache.pulsar.client.api.EncodedAuthenticationParameterSupport;
    import org.apache.pulsar.client.api.PulsarClientException;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    import java.util.Map;
    
    /**
     * Client-side authentication plugin for the custom "mysql" auth method.
     *
     * <p>Every lifecycle callback only logs that it was invoked. The configuration
     * passed to either {@code configure} overload is not stored, and
     * {@link #getAuthData()} always hands back a fresh {@link AuthenticationDataMysql}.
     */
    public class AuthenticationMysql implements Authentication, EncodedAuthenticationParameterSupport {

        private static final Logger LOGGER = LoggerFactory.getLogger(AuthenticationMysql.class);

        /** Auth method name reported by this plugin. */
        private static final String AUTH_METHOD = "mysql";

        /**
         * Receives the encoded parameter string; it is logged and otherwise ignored.
         *
         * @param encodedAuthParamString raw authentication parameter string
         */
        @Override
        public void configure(String encodedAuthParamString) {
            LOGGER.info("configure $$$$$$$$$" + AUTH_METHOD + "@@@@@@@@" + encodedAuthParamString);
        }

        /**
         * Map-based configuration hook; intentionally a no-op.
         *
         * @param authParams authentication parameters as key/value pairs (unused)
         */
        @Override
        public void configure(Map<String, String> authParams) {
        }

        /** Logs that the plugin has been started. */
        @Override
        public void start() throws PulsarClientException {
            LOGGER.info("start $$$$$$$$$" + AUTH_METHOD + "@@@@@@@");
        }

        /**
         * @return the auth method name, {@code "mysql"}
         */
        @Override
        public String getAuthMethodName() {
            LOGGER.info("getAuthMethodName $$$$$$$$$" + AUTH_METHOD + "$$$$$$$$$$$$$$");
            return AUTH_METHOD;
        }

        /**
         * @return a newly created {@link AuthenticationDataMysql} on every call
         */
        @Override
        public AuthenticationDataProvider getAuthData() throws PulsarClientException {
            LOGGER.info("getAuthData $$$$$$$$$" + AUTH_METHOD + "$$$$$$$$$$$$$");
            AuthenticationDataProvider provider = new AuthenticationDataMysql();
            return provider;
        }

        /** Logs that the plugin is closing; there is nothing to release. */
        @Override
        public void close() throws IOException {
            LOGGER.info("close $$$$$$$$$" + AUTH_METHOD + "@@@@@@");
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    package com.beyond.auth;
    
    import org.apache.pulsar.client.api.AuthenticationDataProvider;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;
    
    /**
     * Authentication data sent by the client for the custom "mysql" auth method.
     *
     * <p>The same fixed token is exposed both as an HTTP header named {@code auth}
     * and as binary-protocol command data.
     */
    public class AuthenticationDataMysql implements AuthenticationDataProvider {

        private static final Logger LOGGER = LoggerFactory.getLogger(AuthenticationDataMysql.class);

        private static final String AUTH_METHOD = "mysql";

        /** HTTP header name under which the token is sent. */
        private final String headerName = "auth";

        /** Hard-coded token presented to the broker. */
        private final String tokenValue = "liang";

        /**
         * @return always {@code true}; HTTP header data is available
         */
        @Override
        public boolean hasDataForHttp() {
            LOGGER.info("hasDataForHttp@@@@@@@@" + AUTH_METHOD + "@@@@@@@@@@@@@@");
            return true;
        }

        /**
         * @return the entry set of a new mutable map holding the single token header
         */
        @Override
        public Set<Map.Entry<String, String>> getHttpHeaders() throws Exception {
            LOGGER.info("getHttpHeaders@@@@@@@@" + AUTH_METHOD + "@@@@@@@");
            Map<String, String> httpHeaders = new HashMap<>();
            httpHeaders.put(headerName, tokenValue);
            Set<Map.Entry<String, String>> entries = httpHeaders.entrySet();
            return entries;
        }

        /**
         * @return always {@code true}; command data is available
         */
        @Override
        public boolean hasDataFromCommand() {
            LOGGER.info("hasDataFromCommand@@@@@@@@" + AUTH_METHOD + "@@@@@@@@@@");
            return true;
        }

        /**
         * @return the token sent as command data
         */
        @Override
        public String getCommandData() {
            LOGGER.info("getCommandData@@@@@@@@" + AUTH_METHOD + "@@@@@@@@@" + tokenValue);
            return tokenValue;
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    打包部署

    编写好代码后用maven命令打包:
    mvn clean install -Dmaven.test.skip=true

    然后把打好的jar包放到pulsar的lib目录下,如:apache-pulsar-2.8.1/lib,
    重启broker组件即可生效。

    测试自定义认证功能

    maven引入pulsar-client依赖

            <dependency>
                <groupId>org.apache.pulsar</groupId>
                <artifactId>pulsar-client</artifactId>
                <version>2.8.0</version>
            </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    package com.beyond.auth;
    
    import org.apache.pulsar.client.api.*;
    
    /**
     * Manual smoke test for the custom authentication plugin: connects with
     * {@code AuthenticationMysql}, starts a background consumer, then publishes
     * 100 string messages one second apart.
     */
    public class TestPulsar {

        /**
         * Runs the publish/subscribe round trip against a local broker.
         *
         * @param args unused
         * @throws Exception if the client or producer cannot be created, or a send fails
         */
        public static void main(String[] args) throws Exception {

            PulsarClient client = PulsarClient.builder()
                    .serviceUrl("pulsar://127.0.0.1:6650")
                    .authentication("com.beyond.auth.AuthenticationMysql", "e3592948c:admin:123456")
                    .build();

            // Subscribe. The client is handed to the consumer thread, which keeps using it
            // after the publishing loop ends, so the client is deliberately not closed here.
            ConsumerThread consumerThread = new ConsumerThread(client);
            consumerThread.start();

            // Publish. try-with-resources closes the producer even if a send fails.
            try (Producer<String> stringProducer = client.newProducer(Schema.STRING)
                    .topic("persistent://my-tenant/my-namespace/aaa")
                    .create()) {
                for (int i = 0; i < 100; i++) {
                    Thread.sleep(1000);
                    stringProducer.send(i + "aaaaaaaa111112222333");
                }
            }
        }

    }
    
    /**
     * Background thread that subscribes to the test topic with a shared
     * subscription and prints every message it receives.
     */
    class ConsumerThread extends Thread {

        private final PulsarClient client;

        /**
         * @param client connected Pulsar client used to create the consumer
         * @throws Exception declared for compatibility with existing callers; not thrown here
         */
        public ConsumerThread(PulsarClient client) throws Exception {
            this.client = client;
        }

        /**
         * Receives and acknowledges messages until the thread is interrupted or the
         * consumer fails. The consumer is closed when the loop exits.
         */
        @Override
        public void run()  {
            // try-with-resources releases the subscription's client-side resources on any exit path.
            try (Consumer<byte[]> consumer = client.newConsumer()
                    .topicsPattern("persistent://my-tenant/my-namespace/aaa")
                    .subscriptionName("my-subscription")
                    .subscriptionType(SubscriptionType.Shared)
                    .subscribe()) {

                while (!Thread.currentThread().isInterrupted()) {
                    // Wait for a message
                    Message<byte[]> msg = consumer.receive();

                    try {
                        // Decode explicitly as UTF-8 rather than the platform default charset,
                        // so non-ASCII payloads are not garbled on hosts with another default.
                        String payload = new String(msg.getData(), java.nio.charset.StandardCharsets.UTF_8);
                        System.out.println(msg.getTopicName() + "222222Message received: " + payload);

                        // Acknowledge the message so that it can be deleted by the message broker
                        consumer.acknowledge(msg);
                    } catch (Exception e) {
                        // Message failed to process, redeliver later
                        consumer.negativeAcknowledge(msg);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }

        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71

    参考链接:
    https://blog.csdn.net/casuallc/article/details/119151573

  • 相关阅读:
    磁通量概述
    【VUE项目实战】65、配置路由懒加载
    python数据分析与展示--Matplotlib库入门
    服务器怎么关闭防火墙
    python自动化测试中装饰器@unpack、@json_file和@yaml_file源码解析和使用
    【解决方案】多租户技术架构设计入门(一)
    超140支爆款B站恰饭,2022年B站双11战报来了!
    【kafka】——Broker
    冷笑话-1
    C++中const引用的使用
  • 原文地址:https://blog.csdn.net/yinjl123/article/details/133777395