• Pulsar IO实战


    一、引言

    今天跟着 官方文档 基于docker玩一把Pulsar IO吧

    二、概要

    • 在用户能够轻松的将消息队列跟其他系统(数据库、其他消息系统)一起使用时,消息队列的作用才是最强大的。而Pulsar IO connectors可以让你很轻松的创建、部署以及管理这些跟外部系统的连接,例如mysql、kafka、cassandra等。

    • Pulsar connector分为Source和Sink两种,Source connector会将数据从外部系统喂给Pulsar,而Sink connector负责将数据从Pulsar喂给外部系统。

    • Pulsar connector是一种特殊的Function,只不过这个Function持有其他系统的客户端作为pulsar与其他系统的桥梁,它在处理保证上跟Function是一致的,分别是最多一次、至少一次、精准一次。处理保证不仅依靠Pulsar,还跟外部系统相关以及实现逻辑相关。

      • 最多一次:发给connector的消息最多处理一次或者不做处理
      • 至少一次:发给connector的消息处理一次或者多次
      • 精准一次:发给connector的消息只处理一次

    三、实战

    1.安装connector

    1. 这里 下载对应的connector,先选择对应的版本,在点进 connectors 目录选择对应的source或者sink
      在这里插入图片描述

    2. 将下载的nar文件放到pulsar安装地址的connectors 目录下(没有则需要创建)
      在这里插入图片描述

    3. 启动Pulsar

    4. 通过指令查看服务connector信息,先输出下面这样的信息就说明connector已经注册到Pulsar上面了

      curl -s http://localhost:8080/admin/v2/functions/connectors
      
      • 1

      在这里插入图片描述

    2. 安装Cassandra

    1. 基于 brew install --cask --appdir=/Applications docker 安装docker(仅针对mac环境)

    2. 基于docker运行 cassandra,成功运行后通过 docker ps可以看到Cassandra服务已经起来了

      docker run -d --rm --name=cassandra -p 9042:9042 cassandra:3.11
      
      • 1

    在这里插入图片描述

    1. 通过 docker exec -ti cassandra cqlsh localhost 进入Cassandra服务的容器,并通过以下指令进行库表的初始化

      CREATE KEYSPACE pulsar_test_keyspace WITH replication = {'class':'SimpleStrategy', 'replication_factor':1};
      
      USE pulsar_test_keyspace;
      
      CREATE TABLE pulsar_test_table (key text PRIMARY KEY, col text);
      
      • 1
      • 2
      • 3
      • 4
      • 5
    2. 先查询该表确保没有数据 select * from pulsar_test_table;
      在这里插入图片描述

    3. 功能验证

    1. 写配置文件cassandra-sink.yml

      configs:
          roots: "localhost:9042"
          keyspace: "pulsar_test_keyspace"
          columnFamily: "pulsar_test_table"
          keyname: "key"
          columnName: "col"
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
    2. 启动写Cassandra的sink,启动后通过指令查看显示sink已经正常启动

    pulsar-admin sinks create \
        --tenant public \
        --namespace default \
        --name cassandra-test-sink \
        --sink-type cassandra \
        --sink-config-file examples/cassandra-sink.yml \
        --inputs test_cassandra
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    在这里插入图片描述

    1. 执行命令批量往pulsar中写入数据,看是否会正常输出到Cassandra中

      for i in {0..9}; do pulsar-client produce -m "key-$i" -n 1 test_cassandra; done
      
      • 1
    2. 由于上面的操作是有延迟的,所以不断的查询Cassandra的表是可以看到数据在逐步的增加,并最终写满十条数据
      在这里插入图片描述
      在这里插入图片描述

    四、总结

    纸上得来终觉浅,绝知此事要躬行。 学习不能仅仅停留在纸面上或者理论,脱离使用去探讨设计或者源码都是不切实际的。因此今天一起体验了一把Pulsar IO,除此之外Pulsar还提供了非常丰富的跟其他系统交互的Connector,详细可以看上面发的下载地址并尝试使用自己感兴趣的Connector感受下实操的快乐~

  • 相关阅读:
    高速电路设计笔记----第一章
    解决https页面加载http资源报错
    《计算机体系结构》1.4 技术趋势
    (万文)最全、最细前端面试问题总结(答题思路分析、答案解析)
    对于xshell连接不上虚拟机的一些解答
    数据结构学习笔记——查找算法
    架构师书籍推荐
    JDK8新特性--函数式接口--(Consumer的概念理解,模拟练习,企业实战)全流程彻底搞懂
    聊聊JedisFactory
    数据库异常恢复2-备份文件恢复(快速恢复的手动启动方式)
  • 原文地址:https://blog.csdn.net/linchonghui888/article/details/136689470