• Pulsar IO实战


    一、引言

    今天跟着 官方文档 基于docker玩一把Pulsar IO吧

    二、概要

    • 在用户能够轻松的将消息队列跟其他系统(数据库、其他消息系统)一起使用时,消息队列的作用才是最强大的。而Pulsar IO connectors可以让你很轻松的创建、部署以及管理这些跟外部系统的连接,例如mysql、kafka、cassandra等。

    • Pulsar connector分为Source和Sink两种,Source connector会将数据从外部系统喂给Pulsar,而Sink connector负责将数据从Pulsar喂给外部系统。

    • Pulsar connector是一种特殊的Function,只不过这个Function持有其他系统的客户端作为pulsar与其他系统的桥梁,它在处理保证上跟Function是一致的,分别是最多一次、至少一次、精准一次。处理保证不仅依靠Pulsar,还跟外部系统相关以及实现逻辑相关。

      • 最多一次:发给connector的消息最多处理一次或者不做处理
      • 至少一次:发给connector的消息处理一次或者多次
      • 精准一次:发给connector的消息只处理一次

    三、实战

    1.安装connector

    1. 这里 下载对应的connector,先选择对应的版本,在点进 connectors 目录选择对应的source或者sink
      在这里插入图片描述

    2. 将下载的nar文件放到pulsar安装地址的connectors 目录下(没有则需要创建)
      在这里插入图片描述

    3. 启动Pulsar

    4. 通过指令查看服务connector信息,先输出下面这样的信息就说明connector已经注册到Pulsar上面了

      curl -s http://localhost:8080/admin/v2/functions/connectors
      
      • 1

      在这里插入图片描述

    2. 安装Cassandra

    1. 基于 brew install --cask --appdir=/Applications docker 安装docker(仅针对mac环境)

    2. 基于docker运行 cassandra,成功运行后通过 docker ps可以看到Cassandra服务已经起来了

      docker run -d --rm --name=cassandra -p 9042:9042 cassandra:3.11
      
      • 1

    在这里插入图片描述

    1. 通过 docker exec -ti cassandra cqlsh localhost 进入Cassandra服务的容器,并通过以下指令进行库表的初始化

      CREATE KEYSPACE pulsar_test_keyspace WITH replication = {'class':'SimpleStrategy', 'replication_factor':1};
      
      USE pulsar_test_keyspace;
      
      CREATE TABLE pulsar_test_table (key text PRIMARY KEY, col text);
      
      • 1
      • 2
      • 3
      • 4
      • 5
    2. 先查询该表确保没有数据 select * from pulsar_test_table;
      在这里插入图片描述

    3. 功能验证

    1. 写配置文件cassandra-sink.yml

      configs:
          roots: "localhost:9042"
          keyspace: "pulsar_test_keyspace"
          columnFamily: "pulsar_test_table"
          keyname: "key"
          columnName: "col"
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
    2. 启动写Cassandra的sink,启动后通过指令查看显示sink已经正常启动

    pulsar-admin sinks create \
        --tenant public \
        --namespace default \
        --name cassandra-test-sink \
        --sink-type cassandra \
        --sink-config-file examples/cassandra-sink.yml \
        --inputs test_cassandra
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    在这里插入图片描述

    1. 执行命令批量往pulsar中写入数据,看是否会正常输出到Cassandra中

      for i in {0..9}; do pulsar-client produce -m "key-$i" -n 1 test_cassandra; done
      
      • 1
    2. 由于上面的操作是有延迟的,所以不断的查询Cassandra的表是可以看到数据在逐步的增加,并最终写满十条数据
      在这里插入图片描述
      在这里插入图片描述

    四、总结

    纸上得来终觉浅,绝知此事要躬行。 学习不能仅仅停留在纸面上或者理论,脱离使用去探讨设计或者源码都是不切实际的。因此今天一起体验了一把Pulsar IO,除此之外Pulsar还提供了非常丰富的跟其他系统交互的Connector,详细可以看上面发的下载地址并尝试使用自己感兴趣的Connector感受下实操的快乐~

  • 相关阅读:
    java教学互动系统计算机毕业设计MyBatis+系统+LW文档+源码+调试部署
    BI-SQL丨MEGRE
    webpack 面试题
    C#使用Spire.Pdf包对PDF文档进行数字签名
    【C++笔试强训】第十三天
    Java 栈和队列的基本使用
    java毕业设计大学生心愿墙系统Mybatis+系统+数据库+调试部署
    站在巨人的肩膀上谈-计算机视觉走向未来
    靠谱的车【华为OD机试-JAVA&Python&C++&JS】
    前端优化-前端性能优化
  • 原文地址:https://blog.csdn.net/linchonghui888/article/details/136689470