• springboot整合TDengine实现数据订阅——多线程快速消费


    一、TDengine数据订阅服务

    为了帮助应用实时获取写入 TDengine 的数据,或者以事件到达顺序处理数据,TDengine 提供了类似消息队列产品的数据订阅、消费接口。这样在很多场景下,采用 TDengine 的时序数据处理系统不再需要集成消息队列产品,比如 kafka, 从而简化系统设计的复杂度,降低运营维护成本。

    关于TDengine数据订阅服务的详细使用文档可参考官网:TDengine—数据订阅

    这里不再过多叙述,本文主要内容:在物联网背景下,实时数据监控是很常见的业务功能,面对PB级的数据量我们如何高效地获取到实时数据,TDengine 作为一款优秀的时序数据库,集成了类似Kafka消息队列一样的服务。消息队列可以起到异步解耦消峰的作用,但是通常来说数据的发送速度是远高于数据消费速度的(因为有针对业务的消费逻辑),于是乎数据堆积发生的可能性非常大,那么提高消费速度自然就是重中之重了。

    二、多线程批量消费

    2.0 准备

    创建数据库:tmqdb

    创建超级表:

    CREATE TABLE meters (ts TIMESTAMP, current FLOAT, voltage INT)
    TAGS (groupid INT, location BINARY(16))

    创建子表d0和d1:INSERT INTO d1 USING meters TAGS(1, ‘San Francisco’) values(now - 9s, 10.1, 119)
    INSERT INTO d0 values(now - 8s, NULL, NULL)

    创建主题:create topic topic_name as select * from meters

    依赖:

        <dependency>
                <groupId>org.projectlombokgroupId>
                <artifactId>lombokartifactId>
                <optional>trueoptional>
            dependency>
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-testartifactId>
                <scope>testscope>
            dependency>
            
            <dependency>
                <groupId>com.taosdata.jdbcgroupId>
                <artifactId>taos-jdbcdriverartifactId>
                <version>3.0.0version>
         dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    2.1 超级表实体类

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public class Meters {
    
        //动态属性
        private Timestamp ts;
        private float current;
        private int voltage;
        //静态属性
        private int groupid;
        private String location;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    2.2 模拟数据插入

    
    /**
     * 模拟写数据
     *
     * @author zhmsky
     * @date 2022/9/12 17:18
     */
    public class WriteData {
    
        private static int count = 0;
    
        public static void main(String[] args) {
            TaosCrudUtils taosCrudUtils = new TaosCrudUtils();
            //一次性插入两万数据
            while (count < 20000) {
                Random random = new Random();
                int i = random.nextInt(235);
                String sql = "INSERT INTO tmqdb.d1 VALUES(now, " + (float) i + ", " + i + ");";
                taosCrudUtils.insert(sql);
                count++;
            }
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    2.3 配置文件

    #  服务器主机
    taos.hostName=localdomain.com:6030
    # 消费组
    taos.groupId=test
    # 主题名
    taos.topicName=topic_name
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    2.4 消费者多线程批量消费

    package com.zhmsky.springboottdengine.数据订阅.消费者多线程消费;
    
    import com.taosdata.jdbc.tmq.ConsumerRecords;
    import com.taosdata.jdbc.tmq.TMQConstants;
    import com.taosdata.jdbc.tmq.TaosConsumer;
    import com.zhmsky.springboottdengine.数据订阅.pojo.Meters;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.PostConstruct;
    import java.io.IOException;
    import java.io.InputStream;
    import java.sql.SQLException;
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author zhmsky
     * @date 2022/9/12 19:58
     */
    @Component
    public class ConsumerHandler {
    
        private static String HOST_NAME;
        private static String GROUP_ID;
        private static String TOPICNAME;
    
        private TaosConsumer<Meters> consumer;
        private ExecutorService executors;
    
        //消息队列消息拉取是否开启
        private boolean active = true;
    
        public static Properties initConfig() {
            //获取配置文件
            InputStream is = Thread.currentThread().getContextClassLoader().getResourceAsStream("application.properties");
            Properties fileProperties = new Properties();
            try {
                //读取配置文件
                fileProperties.load(is);
                HOST_NAME = fileProperties.getProperty("taos.hostName");
                GROUP_ID = fileProperties.getProperty("taos.groupId");
                TOPICNAME = fileProperties.getProperty("taos.topicName");
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
    
            //消费者配置
            Properties properties = new Properties();
            //连接地址
            properties.setProperty(TMQConstants.BOOTSTRAP_SERVERS, HOST_NAME);
            //允许从消息中解析表名
            properties.setProperty(TMQConstants.MSG_WITH_TABLE_NAME, "true");
            //开启自动提交
            properties.setProperty(TMQConstants.ENABLE_AUTO_COMMIT, "true");
            properties.setProperty(TMQConstants.GROUP_ID, GROUP_ID);
            //值解析方法,需要实现com.taosdata.jdbc.tmq.Deserializer 接口或继承 com.taosdata.jdbc.tmq.ReferenceDeserializer 类
            properties.setProperty(TMQConstants.VALUE_DESERIALIZER,
                    "com.zhmsky.springboottdengine.数据订阅.MetersDeserializer");
            return properties;
        }
    
        /**
         * 项目启动时完成初始化配置
         */
        @PostConstruct
        public void initTaosConfig() {
            Properties properties = initConfig();
            try {
                //创建消费者实例
                consumer = new TaosConsumer<>(properties);
                //订阅主题
                consumer.subscribe(Collections.singletonList(TOPICNAME));
            } catch (SQLException e) {
                throw new RuntimeException(e);
            }
    
        }
    
        /**
         * 多线程批量消费(执行这个方法即可循环拉取消息)
         *
         * @param workerNum
         */
        public void execute(int workerNum) {
            executors = new ThreadPoolExecutor(workerNum, 20, 10,
                    TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), new ThreadPoolExecutor.CallerRunsPolicy());
            while (active) {
                try {
                    ConsumerRecords<Meters> records = consumer.poll(Duration.ofMillis(100));
                    if (!records.isEmpty()) {
                        //将消息交给线程池认领
                        executors.submit(new TaskWorker(records));
                    }
                } catch (SQLException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    
        /**
         * 停止拉取消息
         */
        public void stopTaosPoll() {
            this.active = false;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111

    2.5 自定义线程类(处理真正的消费逻辑)

    /**
     * @author zhmsky
     * @date 2022/9/12 20:29
     */
    @Slf4j
    public class TaskWorker implements Runnable {
    
        private ConsumerRecords<Meters> consumerRecords;
    
        public TaskWorker(ConsumerRecords<Meters> records) {
            this.consumerRecords = records;
        }
    
        /**
         * 线程处理逻辑(正真的消息消费逻辑)
         */
        @Override
        public void run() {
            //TODO 真实的消费处理逻辑
            for (Meters consumerRecord : consumerRecords) {
                log.info(Thread.currentThread().getName() + "::" + consumerRecord.getTs() + " " + consumerRecord.getCurrent() + " "
                        + consumerRecord.getVoltage() + " " + consumerRecord.getLocation() + " " + consumerRecord.getGroupid());
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    2.6 值解析

    /**
     * 值解析方法
     *
     * @author zhmsky
     * @date 2022/9/12 16:43
     */
    public class MetersDeserializer extends ReferenceDeserializer<Meters> {
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    2.7 测试

    @SpringBootApplication
    @EnableScheduling
    public class SpringbootTDengineApplication {
        @Autowired
        private ConsumerHandler consumers;
    
        public static void main(String[] args) {
            SpringApplication.run(SpringbootTDengineApplication.class, args);
        }
    
        //定时任务启动
        @Scheduled(cron = "* 3 21 * * ? ")
        public void test() {
            consumers.execute(10);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    在这里插入图片描述

  • 相关阅读:
    英语——分享篇——每日200词——1001-1200
    “媒体+”时代正当时,ATEN以前瞻解决方案助推媒体融合纵深发展
    使用GoQuery实现头条新闻采集
    CobalStrike(CS)上线隐藏IP和流量
    一招教你控制python多线程的线程数量
    nacos
    记一次 .NET 某电厂Web系统 内存泄漏分析
    ②⑩ 【MySQL Log】详解MySQL日志:错误日志、二进制日志、查询日志、慢查询日志
    英语单词: truncate;截断警告
    苹果iPhone手机发抖音短视频如何挂载词令抖音小程序?
  • 原文地址:https://blog.csdn.net/weixin_42194695/article/details/126823147