//连接的服务器
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092");
//指定对应的key和value的序列化类型
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
//关联自定义分区器
//properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.ljr.kafka.producer.MyPartitioner");
键入new KafkaProducer<>(),光标置于括号内CTRL+P可以显示需要对象为properties;
键入new Properties().var 回车,键入new KafkaProducer<>(properties).var 回车,选择变量名
键入KafkaProducer.send(),提示需要对象ProducerRecord;键入topic名(order)和要发送的信息(“0000”+i),new Callback()回车会弹出需要重写的抽象类,补全返回条件、需要返回的信息即可实现抽象类;
e == null 表示消息全部发送完毕;
KafkaProducer.close();
运行:
可以看到有返回信息;
另开窗口查看发送结果
kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092 --topic order
信息发送成功;
- package com.ljr.kafka.producer;
-
- import org.apache.kafka.clients.producer.*;
- import org.apache.kafka.common.serialization.StringSerializer;
-
- import java.util.Properties;
-
- public class CustomProducerCallback {
- public static void main(String[] args) {
-
- Properties properties = new Properties();
-
- properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092");
- properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
- /关联自定义分区器
- // properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.ljr.kafka.producer.MyPartitioner");
-
- KafkaProducer
kafkaProducer = new KafkaProducer<>(properties); -
- for(int i =0; i < 3; i++){
- kafkaProducer.send(new ProducerRecord<>("customers", "LiSi" + i), new Callback() {
- @Override
- public void onCompletion(RecordMetadata recordMetadata, Exception e) {
- if (e == null) {
- System.out.println("topic:" + recordMetadata.topic() + " partition:" + recordMetadata.partition());
- }
- }
- });
- }
- kafkaProducer.close();
- }
- }