• Kafka多语言版本


    Installation

    curl -sSL https://raw.githubusercontent.com/bitnami/containers/main/bitnami/kafka/docker-compose.yml > docker-compose.yml
    docker-compose up -d
    
    • 1
    • 2

    Rust

    这里使用的是 rdkafka:

    producer.rs:

    use std::time::Duration;
    
    use clap::{App, Arg};
    use log::info;
    
    use rdkafka::config::ClientConfig;
    use rdkafka::message::{Header, OwnedHeaders};
    use rdkafka::producer::{FutureProducer, FutureRecord};
    use rdkafka::util::get_rdkafka_version;
    
    use crate::example_utils::setup_logger;
    
    mod example_utils;
    
    /// Sends five messages to `topic_name` through the Kafka cluster at `brokers`.
    ///
    /// Each payload is the wall-clock send time in milliseconds since the Unix epoch,
    /// rendered as a decimal string, so that a consumer can derive the end-to-end
    /// latency. Keys are `"Key <i>"` and every message carries one `header_key` header.
    ///
    /// # Panics
    ///
    /// Panics if the producer cannot be created or if the system clock reports a time
    /// before the Unix epoch.
    async fn produce(brokers: &str, topic_name: &str) {
        let producer: &FutureProducer = &ClientConfig::new()
            .set("bootstrap.servers", brokers)
            .set("message.timeout.ms", "5000")
            .create()
            .expect("Producer creation error");
    
        // Async blocks are lazy: building these futures sends nothing. Each message is
        // sent only when the loop below awaits its future.
        let futures = (0..5)
            .map(|i| async move {
                // Take the timestamp right before sending, so it reflects this message's
                // own send time rather than the moment the batch was set up.
                let send_time = std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .expect("Time went backwards")
                    .as_millis();
                let payload = send_time.to_string();
                let key = format!("Key {}", i);
    
                // The send operation on the topic returns a future, which will be
                // completed once the result or failure from Kafka is received.
                let delivery_status = producer
                    .send(
                        FutureRecord::to(topic_name)
                            .payload(&payload)
                            .key(&key)
                            .headers(OwnedHeaders::new().insert(Header {
                                key: "header_key",
                                value: Some("header_value"),
                            })),
                        Duration::from_secs(0),
                    )
                    .await;
    
                // This will be executed when the result is received.
                info!("Delivery status for message {} received", i);
                delivery_status
            })
            .collect::<Vec<_>>();
    
        // Drive the futures one at a time and log each delivery status.
        for future in futures {
            info!("Future completed. Result: {:?}", future.await);
        }
    }
    
    /// Command-line entry point of the producer example.
    ///
    /// Parses `--brokers`, `--log-conf` and `--topic`, calls `setup_logger`, logs the
    /// librdkafka version and then runs [`produce`].
    #[tokio::main]
    async fn main() {
        let brokers_arg = Arg::with_name("brokers")
            .short("b")
            .long("brokers")
            .help("Broker list in kafka format")
            .takes_value(true)
            .default_value("localhost:9092");
    
        let log_conf_arg = Arg::with_name("log-conf")
            .long("log-conf")
            .help("Configure the logging format (example: 'rdkafka=trace')")
            .takes_value(true);
    
        let topic_arg = Arg::with_name("topic")
            .short("t")
            .long("topic")
            .help("Destination topic")
            .takes_value(true)
            .required(true);
    
        // Arguments are registered in the same order as before so the help output
        // is unchanged.
        let cli = App::new("producer example")
            .version(option_env!("CARGO_PKG_VERSION").unwrap_or(""))
            .about("Simple command line producer")
            .arg(brokers_arg)
            .arg(log_conf_arg)
            .arg(topic_arg)
            .get_matches();
    
        setup_logger(true, cli.value_of("log-conf"));
    
        let (version_number, version_text) = get_rdkafka_version();
        info!("rd_kafka_version: 0x{:08x}, {}", version_number, version_text);
    
        // `topic` is required and `brokers` has a default value, so both lookups
        // always yield a value.
        let destination = cli.value_of("topic").unwrap();
        let broker_list = cli.value_of("brokers").unwrap();
    
        produce(broker_list, destination).await;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96

    consumer.rs:

    use clap::{App, Arg};
    use log::{info, warn};
    
    use rdkafka::client::ClientContext;
    use rdkafka::config::{ClientConfig, RDKafkaLogLevel};
    use rdkafka::consumer::stream_consumer::StreamConsumer;
    use rdkafka::consumer::{CommitMode, Consumer, ConsumerContext, Rebalance};
    use rdkafka::error::KafkaResult;
    use rdkafka::message::{Headers, Message};
    use rdkafka::topic_partition_list::TopicPartitionList;
    use rdkafka::util::get_rdkafka_version;
    
    use crate::example_utils::setup_logger;
    
    mod example_utils;
    
    /// A context can be used to change the behavior of producers and consumers by adding
    /// callbacks that will be executed by librdkafka.
    ///
    /// This particular context sets up custom callbacks to log rebalancing events.
    struct CustomContext;
    
    // No `ClientContext` methods are overridden here; the trait's defaults apply.
    impl ClientContext for CustomContext {}
    
    /// Consumer callbacks that only log what they are handed.
    impl ConsumerContext for CustomContext {
        /// Logs the rebalance event passed to the pre-rebalance hook.
        fn pre_rebalance(&self, event: &Rebalance) {
            info!("Pre rebalance {:?}", event);
        }
    
        /// Logs the rebalance event passed to the post-rebalance hook.
        fn post_rebalance(&self, event: &Rebalance) {
            info!("Post rebalance {:?}", event);
        }
    
        /// Logs the result handed to the commit callback; the offsets list is ignored.
        fn commit_callback(&self, outcome: KafkaResult<()>, _committed: &TopicPartitionList) {
            info!("Committing offsets: {:?}", outcome);
        }
    }
    
    /// Convenience alias: a `StreamConsumer` that uses [`CustomContext`] as its context.
    type LoggingConsumer = StreamConsumer<CustomContext>;
    
    /// Extracts the producer's send time (milliseconds since the Unix epoch) from a payload.
    ///
    /// Accepts both `"<text> with timestamp <millis>"` and a bare `"<millis>"` payload.
    /// Returns `None` when the timestamp part is not a valid unsigned integer.
    fn parse_send_time_ms(payload: &str) -> Option<u128> {
        let raw = payload
            .rsplit_once(" with timestamp ")
            .map_or(payload, |(_, millis)| millis);
        raw.trim().parse().ok()
    }
    
    /// Subscribes to `topics` as a member of `group_id` and logs every received message.
    ///
    /// For each message the function prints the payload, logs how long the `recv` call
    /// took, logs the end-to-end latency when the payload carries a send timestamp
    /// (see [`parse_send_time_ms`]), logs key/topic/partition/offset/headers and then
    /// asks for an asynchronous offset commit. The loop never terminates on its own.
    ///
    /// # Panics
    ///
    /// Panics if the consumer cannot be created, if the subscription fails, or if the
    /// system clock reports a time before the Unix epoch.
    async fn consume_and_print(brokers: &str, group_id: &str, topics: &[&str]) {
        let context = CustomContext;
    
        let consumer: LoggingConsumer = ClientConfig::new()
            .set("group.id", group_id)
            .set("bootstrap.servers", brokers)
            .set("enable.partition.eof", "false")
            .set("session.timeout.ms", "6000")
            .set("enable.auto.commit", "true")
            //.set("statistics.interval.ms", "30000")
            //.set("auto.offset.reset", "smallest")
            .set_log_level(RDKafkaLogLevel::Debug)
            .create_with_context(context)
            .expect("Consumer creation failed");
    
        consumer
            .subscribe(topics)
            .expect("Can't subscribe to specified topics");
    
        loop {
            let start = std::time::Instant::now();
            match consumer.recv().await {
                Err(e) => warn!("Kafka error: {}", e),
                Ok(m) => {
                    info!("Start time is: {:?}", start);
                    let duration = start.elapsed();
                    info!("Time elapsed receiving the message is: {:?}", duration);
                    let payload = match m.payload_view::<str>() {
                        None => "",
                        Some(Ok(s)) => s,
                        Some(Err(e)) => {
                            warn!("Error while deserializing message payload: {:?}", e);
                            ""
                        }
                    };
                    println!("pay load is {}", payload);
    
                    // Only report latency when the payload really carries a timestamp;
                    // an unparsable payload is skipped instead of being treated as 0.
                    if let Some(send_time) = parse_send_time_ms(payload) {
                        let receive_time = std::time::SystemTime::now()
                            .duration_since(std::time::UNIX_EPOCH)
                            .expect("Time went backwards")
                            .as_millis();
                        info!("Received time: {}  {}", receive_time, send_time);
                        // `checked_sub` guards against the producer's clock being ahead
                        // of ours, which would otherwise underflow the unsigned value.
                        match receive_time.checked_sub(send_time) {
                            Some(elapsed_time) => info!("Elapsed time: {} ms", elapsed_time),
                            None => warn!(
                                "Send time {} is later than receive time {}; clocks may be skewed",
                                send_time, receive_time
                            ),
                        }
                    }
    
                    info!("key: '{:?}', payload: '{}', topic: {}, partition: {}, offset: {}, timestamp: {:?}",
                          m.key(), payload, m.topic(), m.partition(), m.offset(), m.timestamp());
                    if let Some(headers) = m.headers() {
                        for header in headers.iter() {
                            info!("  Header {:#?}: {:?}", header.key, header.value);
                        }
                    }
                    // A failed commit request is logged rather than aborting the loop.
                    if let Err(e) = consumer.commit_message(&m, CommitMode::Async) {
                        warn!("Failed to commit offset {}: {}", m.offset(), e);
                    }
                }
            };
        }
    }
    
    #[tokio::main]
    async fn main() {
        let matches = App::new("consumer example")
            .version(option_env!("CARGO_PKG_VERSION").unwrap_or(""))
            .about("Simple command line consumer")
            .arg(
                Arg::with_name("brokers")
                    .short("b")
                    .long("brokers")
                    .help("Broker list in kafka format")
                    .takes_value(true)
                    .default_value("localhost:9092"),
            )
            .arg(
                Arg::with_name("group-id")
                    .short("g")
                    .long("group-id")
                    .help("Consumer group id")
                    .takes_value(true)
                    .default_value("example_consumer_group_id"),
            )
            .arg(
                Arg::with_name("log-conf")
                    .long("log-conf")
                    .help("Configure the logging format (example: 'rdkafka=trace')")
                    .takes_value(true),
            )
            .arg(
                Arg::with_name("topics")
                    .short("t")
                    .long("topics")
                    .help("Topic list")
                    .takes_value(true)
                    .multiple(true)
                    .required(true),
            )
            .get_matches();
    
        setup_logger(true, matches.value_of("log-conf"));
    
        let (version_n, version_s) = get_rdkafka_version();
        // info!("rd_kafka_version: 0x{:08x}, {}", version_n, version_s);
    
        let topics = matches.values_of("topics").unwrap().collect::<Vec<&str>>();
        let brokers = matches.value_of("brokers").unwrap();
        let group_id = matches.value_of("group-id").unwrap();
    
        consume_and_print(brokers, group_id, &topics).await
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152

    Golang

    这里使用的是confluent-kafka-go

    producer.go:

    package main
    
    import (
    	"fmt"
    	"math/rand"
    	"os"
    	"strconv"
    	"time"
    
    	"github.com/confluentinc/confluent-kafka-go/kafka"
    )
    
    // main produces 50 messages to the "purchases" topic on localhost:9092.
    // Each message has a randomly chosen user name as key and the send time in
    // milliseconds since the Unix epoch as value, so a consumer can compute latency.
    func main() {
    	config := &kafka.ConfigMap{
    		"bootstrap.servers": "localhost:9092",
    		"client.id":         "go-producer",
    	}
    
    	topic := "purchases"
    	p, err := kafka.NewProducer(config)
    	if err != nil {
    		fmt.Printf("Failed to create producer: %s\n", err)
    		os.Exit(1)
    	}
    	defer p.Close()
    
    	// Go-routine to handle message delivery reports and
    	// possibly other event types (errors, stats, etc)
    	go func() {
    		for e := range p.Events() {
    			switch ev := e.(type) {
    			case *kafka.Message:
    				if ev.TopicPartition.Error != nil {
    					fmt.Printf("Failed to deliver message: %v\n", ev.TopicPartition)
    				} else {
    					fmt.Printf("Produced event to topic %s: key = %-10s value = %s\n",
    						*ev.TopicPartition.Topic, string(ev.Key), string(ev.Value))
    				}
    			}
    		}
    	}()
    
    	users := [...]string{"eabara", "jsmith", "sgarcia", "jbernard", "htanaka", "awalther"}
    
    	for n := 0; n < 50; n++ {
    		key := users[rand.Intn(len(users))]
    
    		// The value is only the current time in milliseconds; the consumer
    		// parses the whole value as an integer.
    		currentTimeMillis := time.Now().UnixNano() / 1e6
    		data := strconv.FormatInt(currentTimeMillis, 10)
    
    		err := p.Produce(&kafka.Message{
    			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
    			Key:            []byte(key),
    			Value:          []byte(data),
    		}, nil)
    		if err != nil {
    			// Produce only enqueues; report a message that could not be queued
    			// instead of dropping it silently.
    			fmt.Printf("Failed to enqueue message for key %s: %v\n", key, err)
    		}
    	}
    
    	// Wait up to 15 seconds for delivery; Flush returns how many events are
    	// still outstanding when it gives up.
    	if remaining := p.Flush(15 * 1000); remaining > 0 {
    		fmt.Printf("%d message(s) still undelivered after flush timeout\n", remaining)
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65

    consumer.go:

    package main
    
    import (
    	"fmt"
    	"os"
    	"os/signal"
    	"strconv"
    	"syscall"
    	"time"
    
    	"github.com/confluentinc/confluent-kafka-go/kafka"
    )
    
    // main consumes the "purchases" topic from localhost:9092 until it receives
    // SIGINT or SIGTERM. Every message value is expected to be the producer's
    // send time in milliseconds since the Unix epoch; the latency to now is printed.
    func main() {
    	conf := kafka.ConfigMap{
    		"bootstrap.servers": "localhost:9092",
    		// Was "go-producer", copied from the producer example.
    		"client.id":         "go-consumer",
    		"group.id":          "kafka-go-getting-started",
    		"auto.offset.reset": "earliest",
    	}
    
    	c, err := kafka.NewConsumer(&conf)
    	if err != nil {
    		fmt.Printf("Failed to create consumer: %s\n", err)
    		os.Exit(1)
    	}
    
    	topic := "purchases"
    	if err := c.SubscribeTopics([]string{topic}, nil); err != nil {
    		fmt.Printf("Failed to subscribe to topic %s: %s\n", topic, err)
    		c.Close()
    		os.Exit(1)
    	}
    
    	// Set up a channel for handling Ctrl-C, etc
    	sigchan := make(chan os.Signal, 1)
    	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
    
    	// Process messages
    	run := true
    	for run {
    		select {
    		case sig := <-sigchan:
    			fmt.Printf("Caught signal %v: terminating\n", sig)
    			run = false
    		default:
    			ev, err := c.ReadMessage(100 * time.Millisecond)
    			if err != nil {
    				// A poll timeout just means no message arrived in time; any
    				// other error is reported before polling again.
    				if kerr, ok := err.(kafka.Error); !ok || kerr.Code() != kafka.ErrTimedOut {
    					fmt.Printf("Consumer error: %v\n", err)
    				}
    				continue
    			}
    			fmt.Printf("Consumed event from topic %s: key = %-10s value = %s\n",
    				*ev.TopicPartition.Topic, string(ev.Key), string(ev.Value))
    
    			// The whole value is the send timestamp as a decimal string.
    			timestampStr := string(ev.Value)
    			sentTimestamp, err := strconv.ParseInt(timestampStr, 10, 64)
    			if err != nil {
    				fmt.Println("Could not parse timestamp. Skipping.")
    				continue
    			}
    
    			receivedTimestamp := time.Now().UnixNano() / 1e6 // current time in milliseconds
    
    			latency := receivedTimestamp - sentTimestamp
    			fmt.Printf("Consumed event from topic %s: key = %-10s  latency = %d ms\n",
    				*ev.TopicPartition.Topic, string(ev.Key), latency)
    		}
    	}
    
    	if err := c.Close(); err != nil {
    		fmt.Printf("Failed to close consumer: %s\n", err)
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83

    Python

    这里使用的是confluent_kafka

    producer.py:

    import socket
    from typing import Optional
    
    from confluent_kafka import KafkaError, Message, Producer
    
    
    class KafkaProducer:
        """Small wrapper around ``confluent_kafka.Producer``.
    
        The wrapper stores the client configuration in ``self.conf``, creates the
        underlying producer and registers :meth:`acked` as the delivery callback
        of every message handed to :meth:`produce`.
        """
    
        def __init__(self, bootstrap_servers: str, client_id: str) -> None:
            """Create the producer for the given broker list and client id."""
            self.conf = {"bootstrap.servers": bootstrap_servers, "client.id": client_id}
            self.producer = Producer(self.conf)
    
        def acked(self, err: "Optional[KafkaError]", msg: "Message") -> None:
            """Delivery callback: print where the message landed or why it failed.
    
            Args:
                err: ``None`` on success, otherwise the delivery error.
                msg: The message the report refers to.
            """
            if err is not None:
                print(f"Failed to deliver message to {msg.topic()}: {err}")
            else:
                # Print the coordinates instead of the message object, whose
                # default representation carries no useful information.
                print(
                    f"Message produced: {msg.topic()} "
                    f"[{msg.partition()}] @ offset {msg.offset()}"
                )
    
        def produce(
            self, topic: str, key: Optional[str] = None, value: Optional[str] = None
        ) -> None:
            """Queue one message for ``topic``; delivery is reported to ``acked``."""
            self.producer.produce(topic, key=key, value=value, callback=self.acked)
    
        def poll(self, timeout: float) -> None:
            """Forward to the underlying producer's ``poll`` with ``timeout``."""
            self.producer.poll(timeout)
    
        def flush(self) -> None:
            """Forward to the underlying producer's ``flush``."""
            self.producer.flush()
    
    
    if __name__ == "__main__":
        # Demo: send a single key/value message to the "test" topic.
        producer = KafkaProducer(
            bootstrap_servers="kafka:9092", client_id=socket.gethostname()
        )
        topic = "test"
        producer.produce(topic, key="key", value="value")
        # produce() only queues the message. Flush before exiting so the process
        # waits for delivery instead of possibly quitting with the message still
        # queued, which a single poll(1) did not guarantee.
        producer.flush()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    consumer.py:

    import sys
    from typing import List, Optional
    
    from confluent_kafka import Consumer, KafkaError, KafkaException
    
    
    class KafkaConsumer:
        """Small wrapper around ``confluent_kafka.Consumer`` with a blocking poll loop."""
    
        def __init__(self, bootstrap_servers: str, group_id: str, auto_offset_reset: str = 'smallest') -> None:
            """Create the consumer for the given brokers, group and offset-reset policy."""
            self.conf = {
                'bootstrap.servers': bootstrap_servers,
                'group.id': group_id,
                'auto.offset.reset': auto_offset_reset
            }
            self.consumer = Consumer(self.conf)
            # Cleared by shutdown() to make basic_consume_loop() return.
            self.running = True
    
        def basic_consume_loop(self, topics: List[str], timeout: float = 1.0) -> None:
            """Subscribe to ``topics`` and print every message value until stopped.
    
            Args:
                topics: Topic names to subscribe to.
                timeout: Value passed to each ``poll`` call.
    
            Raises:
                KafkaException: For any message error other than end-of-partition.
            """
            try:
                self.consumer.subscribe(topics)
    
                while self.running:
                    msg = self.consumer.poll(timeout=timeout)
                    if msg is None:
                        continue
    
                    err = msg.error()
                    if err is not None:
                        if err.code() == KafkaError._PARTITION_EOF:
                            # End of partition event
                            sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                             (msg.topic(), msg.partition(), msg.offset()))
                            continue
                        raise KafkaException(err)
    
                    payload = msg.value()
                    if payload is None:
                        # A message without a value has nothing to decode;
                        # report it instead of failing on ``None.decode``.
                        sys.stderr.write('%% %s [%d] null payload at offset %d\n' %
                                         (msg.topic(), msg.partition(), msg.offset()))
                        continue
                    print(payload.decode('utf-8'))
                    # do something with msg.value()
            finally:
                # Close down consumer to commit final offsets.
                self.consumer.close()
    
        def shutdown(self) -> None:
            """Ask the consume loop to stop after the current poll."""
            self.running = False
    
    if __name__ == "__main__":
        # Demo: print every message of the "test" topic until interrupted.
        consumer = KafkaConsumer(bootstrap_servers="kafka:9092", group_id="foo", auto_offset_reset="smallest")
        topics = ["test"]
        try:
            consumer.basic_consume_loop(topics)
        except KeyboardInterrupt:
            # Ctrl-C is the expected way to stop the demo: mark the consumer as
            # stopped and exit without a traceback.
            consumer.shutdown()
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
  • 相关阅读:
    自然语言处理(NLP)-spacy简介以及安装指南(语言库zh_core_web_sm)
    C++最佳实践之常用关键字
    rand()函数产生随机数详解
    U-Net: Convolutional Networks for Biomedical Images Segmentation
    五、Spring Boot(1)
    【附源码】Python计算机毕业设计社区住户信息管理系统
    网络安全(黑客技术)—自学
    es-head连接Elasticsearch没反应和新建索引没反应(406)
    Spring 源码(11)Spring Bean 的创建过程(2)
    第三章 多维随机变量及其分布
  • 原文地址:https://blog.csdn.net/majiayu000/article/details/132721052