kafka golang生产者客户端
下载安装kafka Golang客户端
go get github.com/Shopify/sarama
使用Golang创建消息生产者
同步消息模式
package main
import (
"fmt"
"log"
"os"
"time"
"github.com/Shopify/sarama"
)
var Adddress = []string{
"192.168.74.138:9092"}
func main() {
syncProducer(Adddress)
}
func syncProducer(address []string) {
config := sarama.NewConfig()
config.Producer.Return.Successes = true
config.Producer.Timeout = 5 * time.Second
p, err := sarama.NewSyncProducer(address, config)
if err != nil {
log.Printf("sarama.NewSyncProducer err, message = %s", err)
return
}
defer p.Close()
topic := "topic1"
srcValue := "sync: this is message. index = %d"
for i := 0; i < 10; i++ {
value := fmt.Sprintf(srcValue, i)
msg := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.ByteEncoder(value),
}
part, offset, err := p.SendMessage(msg)
if err != nil {
log.Printf("send message(%s) err = %s\n", value, err)
} el
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58