• pulsar简介


    pulsar简介

    pulsar是和kafak同类型的消息处理平台,这两年开始走入大众视野。相比于老牌kafka,新秀pulsar带来了一些让我喜欢的新功能:

    1. 多种订阅方式: 独占、灾备、共享、Key_Shared
    2. 延迟发送: 消息发送到topic后,consumer过一段时间才消费到这条消息
    3. 易用: 不需要像kafka那样去计算分区、副本数量等,pulsar不够用了直接无脑扩broker

    详细的说明就不赘述了,我们来快速使用体验下。

    安装

    在安装前需要有java环境,本次使用的系统是Ubuntu 20.04, java版本是17.02。

    本次使用的pulsar版本是2.10.1。

    1. 获取安装包
    $ wget https://archive.apache.org/dist/pulsar/pulsar-2.10.1/apache-pulsar-2.10.1-bin.tar.gz
    
    • 1
    1. 解压
    $ tar xvfz apache-pulsar-2.10.1-bin.tar.gz
    $ cd apache-pulsar-2.10.1
    
    • 1
    • 2
    1. 启动服务端
    $ ./bin/pulsar standalone
    
    • 1

    看到如下信息就是启动成功了:

    2022-08-13T21:34:46,652+0800 [worker-scheduler-0] INFO  org.apache.pulsar.functions.worker.SchedulerManager - Schedule summary - execution time: 0.033926031 sec | total unassigned: 0 | stats: {"Added": 0, "Updated": 0, "removed": 0}
    {
      "c-standalone-fw-localhost-8080" : {
        "originalNumAssignments" : 0,
        "finalNumAssignments" : 0,
        "instancesAdded" : 0,
        "instancesRemoved" : 0,
        "instancesUpdated" : 0,
        "alive" : true
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    如果想后台启动可以执行如下命令:

    $ ./bin/pulsar-daemon start standalone
    
    • 1

    使用

    1. 查看健康状态
    $ ./bin/pulsar-admin brokers healthcheck
    ok
    
    • 1
    • 2
    1. 发送一个消息
    $ ./bin/pulsar-client produce k-topic --messages "hello test"
    
    • 1

    如果正常发送的话可以看到如下提示:

    2022-08-13T21:43:50,559+0800 [main] INFO  org.apache.pulsar.client.cli.PulsarClientTool - 1 messages successfully produced
    
    • 1
    1. 消费一个消息
    $ ./bin/pulsar-client consume j-topic -s "first-subscription"
    
    • 1

    如果正常发送的话可以看到刚才发的那个消息:

    ----- got message -----
    key:[null], properties:[], content:hello test
    
    • 1
    • 2

    可以看到整个过程我们不需要预先做其他的设置,不需要手动去处理zookeeper,启动pulsar后就可以直接使用了。

    Go使用

    pulsar官方客户端除了有java之外,还有Python、go、c++、nodejs、C#,还支持websocket、以及REST api
    来发送消息,可以说是很方便了。

    我们来用go试试。

    1. 准备好相关扩展

    创建一个测试目录,并初始化go mod:

    $ mkdir test_dir && cd test_dir
    $ go mod init test_dir 
    $ go mod tidy 
    
    • 1
    • 2
    • 3

    安装pulsar客户端:

    $ go get -u "github.com/apache/pulsar-client-go/pulsar"
    
    • 1
    1. 创建生产者
    package main
    
    import (
    	"context"
    	"fmt"
    	"github.com/apache/pulsar-client-go/pulsar"
    	"log"
    	"time"
    )
    
    func main() {
    
    	// 创建客户端
    	client, err := pulsar.NewClient(pulsar.ClientOptions{
    		URL:               "pulsar://localhost:6650",
    		OperationTimeout:  30 * time.Second,
    		ConnectionTimeout: 30 * time.Second,
    	})
    	if err != nil {
    		log.Fatalf("Could not instantiate Pulsar client: %v", err)
    	}
    
    	// 创建生产者
    	producer, err := client.CreateProducer(pulsar.ProducerOptions{
    		Topic: "go-topic",
    	})
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer client.Close()
    
    	// 每隔3秒发一次消息
    	num := 0
    	for {
    		msg := fmt.Sprintf("hello %d", num)
    		_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
    			Payload: []byte(msg),
    		})
    		num += 1
    		time.Sleep(time.Duration(3) * time.Second)
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    1. 创建消费者
    package main
    
    import (
    	"context"
    	"fmt"
    	"github.com/apache/pulsar-client-go/pulsar"
    	"log"
    	"time"
    )
    
    func main() {
    
    	// 创建客户端
    	client, err := pulsar.NewClient(pulsar.ClientOptions{
    		URL:               "pulsar://localhost:6650",
    		OperationTimeout:  30 * time.Second,
    		ConnectionTimeout: 30 * time.Second,
    	})
    	if err != nil {
    		log.Fatalf("Could not instantiate Pulsar client: %v", err)
    	}
    
    	// 创建消费者
    	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
    		Topic:            "go-topic",
    		SubscriptionName: "my-sub",
    		Type:             pulsar.Shared,
    	})
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer consumer.Close()
    
    	// 消费消息
    	for {
    		msg, err := consumer.Receive(context.Background())
    		if err != nil {
    			log.Fatal(err)
    		}
    
    		fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
    			msg.ID(), string(msg.Payload()))
    
    		consumer.Ack(msg)
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    1. 打包测试
    $ go build -o consumer consumer.go
    $ go build -o produce produce.go
    $ chmod +x consumer produce
    
    • 1
    • 2
    • 3

    启动一个生产者

    $ ./produce 
    INFO[0000] [Connecting to broker]                        remote_addr="pulsar://localhost:6650"
    INFO[0000] [TCP connection established]                  local_addr="127.0.0.1:33210" remote_addr="pulsar://localhost:6650"
    INFO[0000] [Connection is ready]                         local_addr="127.0.0.1:33210" remote_addr="pulsar://localhost:6650"
    INFO[0000] [Created producer]                            cnx="127.0.0.1:33210 -> 127.0.0.1:6650" producerID=1 producer_name=standalone-2-9 topic="persistent://public/default/go-topic"
    
    • 1
    • 2
    • 3
    • 4
    • 5

    再启动一个消费者,获取消息

    $ ./consumer 
    INFO[0000] [Connecting to broker]                        remote_addr="pulsar://localhost:6650"
    INFO[0000] [TCP connection established]                  local_addr="127.0.0.1:33212" remote_addr="pulsar://localhost:6650"
    INFO[0000] [Connection is ready]                         local_addr="127.0.0.1:33212" remote_addr="pulsar://localhost:6650"
    INFO[0000] [Connected consumer]                          consumerID=1 name=xgqua subscription=my-sub topic="persistent://public/default/go-topic"
    INFO[0000] [Created consumer]                            consumerID=1 name=xgqua subscription=my-sub topic="persistent://public/default/go-topic"
    Received message msgId: pulsar.trackingMessageID{messageID:pulsar.messageID{ledgerID:242, entryID:5, batchIdx:0, partitionIdx:0}, tracker:(*pulsar.ackTracker)(nil), consumer:(*pulsar.partitionConsumer)(0xc000274ea0), receivedTime:time.Time{wall:0xc0b60af1958c2114, ext:2499737025, loc:(*time.Location)(0x1024400)}} -- content: 'hello 4'
    Received message msgId: pulsar.trackingMessageID{messageID:pulsar.messageID{ledgerID:242, entryID:6, batchIdx:0, partitionIdx:0}, tracker:(*pulsar.ackTracker)(nil), consumer:(*pulsar.partitionConsumer)(0xc000274ea0), receivedTime:time.Time{wall:0xc0b60af255a034ea, ext:5501052821, loc:(*time.Location)(0x1024400)}} -- content: 'hello 5'
    Received message msgId: pulsar.trackingMessageID{messageID:pulsar.messageID{ledgerID:242, entryID:7, batchIdx:0, partitionIdx:0}, tracker:(*pulsar.ackTracker)(nil), consumer:(*pulsar.partitionConsumer)(0xc000274ea0), receivedTime:time.Time{wall:0xc0b60af315c8fc87, ext:8503725362, loc:(*time.Location)(0x1024400)}} -- content: 'hello 6'
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    今天的介绍就到这里了,感谢阅读。

  • 相关阅读:
    依概率收敛和依分布收敛(附一道例题)
    3分钟从零解读Transformer的Encoder
    nginx下载安装与反向代理及负载均衡应用
    我复现的第一个神经网络: LeNet
    腾讯云安全2022年度产品发布会:“3+1”一体化防护体系 助力企业实现云上安全“最优解”
    【森城市】GIS数据漫谈(一)
    汽车soa架构介绍
    SPI协议讲解与总结
    论如何直接用EF Core实现创建更新时间、用户审计,自动化乐观并发、软删除和树形查询(中)
    下半年软考报名时间发布,你准备好了吗?
  • 原文地址:https://blog.csdn.net/u012375924/article/details/126325321