• 【starters】springboot-starter整合阿里云datahub


    文章目录

    DataHub 类似于传统大数据解决方案中 Kafka 的角色,提供了一个数据队列功能。
    DataHub 除了供了一个缓冲的队列作用。同时由于 DataHub 提供了各种与其他阿里云
    上下游产品的对接功能,所以 DataHub 又扮演了一个数据的分发枢纽工作。

    在这里插入图片描述
    datahub提供了开发者生产和消费的sdk,在平时的开发中往往会写很多重复的代码,我们可以利用springboot为我们提供的自定义starter的方式,模仿springboot官方的starter组件实现方式,来封装一个更高效简单易用的starter组件,实现开箱即用。

    本文仅提供核心思路实现供学习使用,应根据自己所在公司开发习惯做定制开发

    1. 功能介绍

    1.无需关心DataHub底层如何操作,安心编写业务代码即可进行数据的获取和上传,

    2.类似RabbitMQ的starter,通过注解方式,Listener和Handler方式进行队列消费

    3.支持游标的上次记忆功能

    
          cry-starters-projects
          cn.com.cry.starters
          2022-1.0.0
    
    
    • 1
    • 2
    • 3
    • 4
    • 5

    2.快速开始

    2.1 启动客户端

    配置阿里云DataHub的endpoint以及AK信息

    aliyun:
      datahub:
      	# 开启功能
      	havingValue: true
        #是否为私有云
        isPrivate: false
        accessId: xxx
        accessKey: xxx
        endpoint: xxx
        #连接DataHub客户端超时时间
        conn-timeout: 10000
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    启动SpringBoot,你会发现datahub客户端已经启动完毕

    2.2 获取DataHub客户端

    DatahubClient datahubClient=DataHubTemplate.getDataHubClient();
    
    • 1

    2.3 写数据

    public int write(@RequestParam("id") Integer shardId) {
        List datas = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            Student s = new Student();
            s.setAge(i);
            s.setName("name-" + i);
            s.setAddress("address-" + i);
            datas.add(s);
        }
        int successNumbers = DataHubTemplate.write("my_test", "student", datas, shardId);
        return successNumbers;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    以上示例代码表示往 projectName为my_test, topicName为student, shardId 为N的hub里写数据,并且返回插入成功的条数

    2.4 读数据

    读数据开发的逻辑类似RabbitMq的starter,使用@DataHubListener和@DataHubHandler处理器注解进行使用

    @Component
    @DataHubListener(projectName = "my_test")
    public class ReadServiceImpl {
    
        @DataHubHandler(topicName = "student", shardId = 0, cursorType = CursorTypeWrapper.LATEST)
        public void handler(Message message) {
            System.out.println("读取到shardId=0的消息");
            System.out.println(message.getData());
            System.out.println(message.getCreateTsime());
            System.out.println(message.getSize());
            System.out.println(message.getConfig());
            System.out.println(message.getMessageId());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    以上代码说明: 通过LATEST游标的方式,监听 project=my_test ,topicName=student,shardId=0 ,最终通过Message的包装类拿到dataHub实时写入的数据。

    这边可以设置多种游标类型,例如根据最新的系统时间、最早录入的序号等

    3. 核心代码

    首先需要一个DataHubClient增强类,在SpringBoot启动时开启一个线程来监听对应的project-topic-shardingId,根据游标规则来读取当前的cursor进行数据的读取。

    public class DataHubClientWrapper implements InitializingBean, DisposableBean {
    
        @Autowired
        private AliyunAccountProperties properties;
    
        @Autowired
        private ApplicationContext context;
    
        private DatahubClient datahubClient;
    
    
        public DataHubClientWrapper() {
    
        }
    
        /**
         * 执行销毁方法
         *
         * @throws Exception
         */
        @Override
        public void destroy() throws Exception {
            WorkerResourceExecutor.shutdown();
        }
    
        @Override
        public void afterPropertiesSet() throws Exception {
    
            /**
             * 创建DataHubClient
             */
            this.datahubClient = DataHubClientFactory.create(properties);
    
            /**
             * 打印Banner
             */
            BannerUtil.printBanner();
    
            /**
             * 赋值Template的静态对象dataHubClient
             */
            DataHubTemplate.setDataHubClient(datahubClient);
    
            /**
             * 初始化Worker线程
             */
            WorkerResourceExecutor.initWorkerResource(context);
            /**
             * 启动Worker线程
             */
            WorkerResourceExecutor.start();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53

    写数据,构建了一个类似RedisDataTemplate的模板类,封装了write的逻辑,调用时只需要用DataHubTemplate.write调用

    public class DataHubTemplate {
    
        private static DatahubClient dataHubClient;
    
        private final static Logger logger = LoggerFactory.getLogger(DataHubTemplate.class);
    
        /**
         * 默认不开启重试机制
         *
         * @param projectName
         * @param topicName
         * @param datas
         * @param shardId
         * @return
         */
        public static int write(String projectName, String topicName, List datas, Integer shardId) {
            return write(projectName, topicName, datas, shardId, false);
        }
    
        /**
         * 往指定的projectName以及topic和shard下面写数据
         *
         * @param projectName
         * @param topicName
         * @param datas
         * @param shardId
         * @param retry
         * @return
         */
        private static int write(String projectName, String topicName, List datas, Integer shardId, boolean retry) {
            RecordSchema recordSchema = dataHubClient.getTopic(projectName, topicName).getRecordSchema();
            List recordEntries = new ArrayList<>();
            for (Object o : datas) {
                RecordEntry entry = new RecordEntry();
                Map data = BeanUtil.beanToMap(o);
                TupleRecordData tupleRecordData = new TupleRecordData(recordSchema);
                for (String key : data.keySet()) {
                    tupleRecordData.setField(key, data.get(key));
                }
                entry.setRecordData(tupleRecordData);
                entry.setShardId(String.valueOf(shardId));
                recordEntries.add(entry);
            }
            PutRecordsResult result = dataHubClient.putRecords(projectName, topicName, recordEntries);
            int failedRecordCount = result.getFailedRecordCount();
            if (failedRecordCount > 0 && retry) {
                retry(dataHubClient, result.getFailedRecords(), 1, projectName, topicName);
            }
            return datas.size() - failedRecordCount;
        }
    
        /**
         * @param client
         * @param records
         * @param retryTimes
         * @param project
         * @param topic
         */
        private static void retry(DatahubClient client, List records, int retryTimes, String project, String topic) {
            boolean suc = false;
            List failedRecords = records;
            while (retryTimes != 0) {
                logger.info("the time to send message has [{}] records failed, is starting retry", records.size());
                retryTimes = retryTimes - 1;
                PutRecordsResult result = client.putRecords(project, topic, failedRecords);
                int failedNum = result.getFailedRecordCount();
                if (failedNum > 0) {
                    failedRecords = result.getFailedRecords();
                    continue;
                }
                suc = true;
                break;
            }
            if (!suc) {
                logger.error("DataHub send message retry failure");
            }
        }
    
        public static DatahubClient getDataHubClient() {
            return dataHubClient;
        }
    
        public static void setDataHubClient(DatahubClient dataHubClient) {
            DataHubTemplate.dataHubClient = dataHubClient;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86

    读数据,需要在Spring启动时开启一个监听线程DataListenerWorkerThread,执行一个死循环不停轮询DataHub下的对应通道。

    public class DataListenerWorkerThread extends Thread {
        private final static Logger logger = LoggerFactory.getLogger(DataListenerWorkerThread.class);
        private volatile boolean init = false;
        private DatahubConfig config;
        private String workerKey;
        private int recordLimits;
        private int sleep;
        private RecordSchema recordSchema;
        private RecordHandler recordHandler;
        private CursorHandler cursorHandler;
    
        public DataListenerWorkerThread(String projectName, String topicName, int shardId, CursorTypeWrapper cursorType, int recordLimits, int sleep, int sequenceOffset, String startTime, StringRedisTemplate redisTemplate) {
            this.config = new DatahubConfig(projectName, topicName, shardId);
            this.workerKey = projectName + "-" + topicName + "-" + shardId;
            this.cursorHandler = new CursorHandler(cursorType, sequenceOffset, startTime, redisTemplate, workerKey);
            this.recordLimits = recordLimits;
            this.sleep = sleep;
            this.setName("DataHub-Worker");
            this.setDaemon(true);
        }
    
        @Override
        public void run() {
            initRecordSchema();
            String cursor = cursorHandler.positioningCursor(config);
            for (; ; ) {
                try {
                    GetRecordsResult result = DataHubTemplate.getDataHubClient().getRecords(config.getProjectName(), config.getTopicName(), String.valueOf(config.getShardId()), recordSchema, cursor, recordLimits);
                    if (result.getRecordCount() <= 0) {
                        // 无数据,sleep后读取
                        Thread.sleep(sleep);
                        continue;
                    }
                    List> dataMap = recordHandler.convert2List(result.getRecords());
                    logger.info("receive [{}] records from project:[{}] topic:[{}] shard:[{}]", dataMap.size(), config.getProjectName(), config.getTopicName(), config.getShardId());
                    // 拿到下一个游标
                    cursor = cursorHandler.nextCursor(result);
                    //执行方法
                    WorkerResourceExecutor.invokeMethod(workerKey, JsonUtils.toJson(dataMap), dataMap.size(), config, cursor);
                } catch (InvalidParameterException ex) {
                    //非法游标或游标已过期,建议重新定位后开始消费
                    cursor = cursorHandler.resetCursor(config);
                    logger.error("get Cursor error and reset cursor localtion ,errorMessage:{}", ex.getErrorMessage());
                } catch (DatahubClientException e) {
                    logger.error("DataHubException:{}", e.getErrorMessage());
                    this.interrupt();
                } catch (InterruptedException e) {
                    logger.info("daemon thread {}-{} interrupted", this.getName(), this.getId());
                } catch (Exception e) {
                    this.interrupt();
                    logger.error("receive DataHub records cry.exception:{}", e, e);
                }
            }
        }
    
        /**
         * 终止
         */
        public void shutdown() {
            if (!interrupted()) {
                interrupt();
            }
        }
    
        /**
         * 初始化topic字段以及recordSchema
         */
        private void initRecordSchema() {
            try {
                if (!init) {
                    recordSchema = DataHubTemplate.getDataHubClient().getTopic(config.getProjectName(), config.getTopicName()).getRecordSchema();
                    List fields = recordSchema.getFields();
                    this.recordHandler = new RecordHandler(fields);
                    init = true;
                }
            } catch (Exception e) {
                logger.error("initRecordSchema error:{}", e, e);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80

    read的时候结合了注解开发,通过定义类注解DataHubListener和方法注解DataHubHandler内置属性,来动态的控制需要在哪些方法中处理监听到的数据的逻辑:

    DataHubHandler

    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    public @interface DataHubHandler {
        /**
         * 话题名称
         *
         * @return
         */
        String topicName();
    
        /**
         * shardId
         *
         * @return
         */
        int shardId();
    
        /**
         * 最大数据量限制
         *
         * @return
         */
        int recordLimit() default 1000;
    
        /**
         * 游标类型
         *
         * @return
         */
        CursorTypeWrapper cursorType() default CursorTypeWrapper.LATEST;
    
        /**
         * 若未监听到数据添加,休眠时间 ms
         *
         * @return
         */
        int sleep() default 10000;
    
        /**
         * 使用CursorType.SYSTEM_TIME的时候配置 时间偏移量
         *
         * @return
         */
        String startTime() default "";
    
        /**
         * 使用使用CursorType.SEQUENCE的时候配置,偏移量,必须是正整数
         *
         * @return
         */
        int sequenceOffset() default 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53

    DataHubListener

    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    public @interface DataHubListener {
        String projectName();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    最后我们需要启动SpringBootStarter的EnableConfigurationProperties 功能,通过配置文件来控制default-bean的开启或者关闭。

    启动类:

    @Configuration
    @EnableConfigurationProperties(value = {AliyunAccountProperties.class})
    public class DataHubClientAutoConfiguration {
        /**
         * 初始化dataHub装饰bean
         *
         * @return
         */
        @Bean
        public DataHubClientWrapper dataHubWrapper() {
            return new DataHubClientWrapper();
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    属性配置类

    @ConditionalOnProperty(prefix = "aliyun.datahub",havingValue = "true")
    @Data
    public class AliyunAccountProperties implements Properties{
    
        /**
         * http://xxx.aliyuncs.com
         */
        private String endpoint;
    
        /**
         * account
         */
        private String accessId;
    
        /**
         * password
         */
        private String accessKey;
    
        /**
         * private cloud || public cloud
         */
        private boolean isPrivate;
    
        /**
         * unit: ms
         */
        private Integer connTimeout = 10000;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    最后记得要做成一个starter,在resources下新建一个META-INF文件夹,新建一个spring.factories文件,

    org.springframework.boot.autoconfigure.EnableAutoConfiguration= 
    
    • 1

    cry.starter.datahub.DataHubClientAutoConfiguration

    大体逻辑就是这样了,你学会了吗? hhhhhhhhh~

    先自我介绍一下,小编13年上师交大毕业,曾经在小公司待过,去过华为OPPO等大厂,18年进入阿里,直到现在。深知大多数初中级java工程师,想要升技能,往往是需要自己摸索成长或是报班学习,但对于培训机构动则近万元的学费,着实压力不小。自己不成体系的自学效率很低又漫长,而且容易碰到天花板技术停止不前。因此我收集了一份《java开发全套学习资料》送给大家,初衷也很简单,就是希望帮助到想自学又不知道该从何学起的朋友,同时减轻大家的负担。添加下方名片,即可获取全套学习资料哦

  • 相关阅读:
    Java学习到面试所遇的小小问题
    C++语法基础(1)——编程入门
    Python数据分析实战-实现卡方检验(附源码和实现效果)
    百题千解计划【CSDN每日一练】订班服(附解析+多种实现方法:Python、Java、C、C++、C#、Go、JavaScript)
    股票推荐系统,并查集
    java commons-io类库常用方法
    登录页面案例
    【超图+CESIUM】【基础API使用示例】49、超图|CESIUM -自定义按钮操作视角上下左右东西南北移动|修改覆盖罗盘的上下左右东西南北的视角移动
    linux部署SpringBoot项目
    Dijkstra算法基础详解,附有练习题
  • 原文地址:https://blog.csdn.net/m0_67391401/article/details/126080689