同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回 ack。 由于 send 方法返回的是一个 Future 对象,根据 Futrue 对象的特点,我们也可以实现同 步发送的效果,只需在调用 Future 对象的 get 方发即可。
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.ProducerConfig;
- import org.apache.kafka.clients.producer.ProducerRecord;
-
- import java.util.Properties;
- import java.util.concurrent.ExecutionException;
-
- public class CustomProducerSync {
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- // 1. 创建kafka生产者的配置对象
- Properties properties = new Properties();
- // 2. 给kafka配置对象添加配置信息:bootstrap.servers
- properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
- // key,value序列化(必须):
- properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
- properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
- // 3. 创建kafka生产者对象
- KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);
- // 4. 调用send方法,发送消息
- for (int i = 0; i < 10; i++) {
- // 默认为异步发送
- kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i));
- // 末尾加get为同步发送
- kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i)).get();
- }
-
- // 5. 关闭资源
- kafkaProducer.close();
- }
- }
异步消息有两种:
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.ProducerConfig;
- import org.apache.kafka.clients.producer.ProducerRecord;
-
- import java.util.Properties;
-
- public class CustomProducer {
- public static void main(String[] args) {
- // 1. 创建kafka生产者的配置对象
- Properties properties = new Properties();
- // 2. 给kafka配置对象添加配置信息:bootstrap.servers
- properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
- // key,value序列化(必须):
- properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
- properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
- // 3. 创建kafka生产者对象
- KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);
- // 4. 调用send方法,发送消息
- for (int i = 0; i < 10; i++) {
- kafkaProducer.send(new ProducerRecord<>("first", "wtyy"));
- }
- // 5. 关闭资源
- kafkaProducer.close();
- }
- }
回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是 RecordMetadata 和 Exception,如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。
注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。
- import org.apache.kafka.clients.producer.*;
-
- import java.util.Properties;
-
- public class CustomProducerCallBack {
- public static void main(String[] args) {
- // 1. 创建kafka生产者的配置对象
- Properties properties = new Properties();
- // 2. 给kafka配置对象添加配置信息:bootstrap.servers
- properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
- // key,value序列化(必须):
- // 序列化器的serialization是一个接口,找到他的实现类
- // 我们一般都是使用String
- properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
- properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
- // 3. 创建kafka生产者对象
- KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);
- // 4. 调用send方法,发送消息
- for (int i = 0; i < 10; i++) {
- kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i),
- new Callback() {
- @Override
- public void onCompletion(RecordMetadata metadata, Exception exception) {
- //(1)消息发送成功 exception == null 接受到服务端ack消息 调用该方法
- //(2)消息发送失败 exception != null 也会调用该方法
- if (exception == null) {
- System.out.println(metadata);//使用打印演示
- }else{
- exception.printStackTrace();//打印异常信息
- }
- }
- });
- }
- // 5. 关闭资源
- kafkaProducer.close();
- }
- }
以订单为例,

- @RestController
- public class OrderController {
-
- @Autowired
- private KafkaTemplate<String, String> kafkaTemplate;
-
- @GetMapping("/send")
- public String send() throws InterruptedException {
- int size = 1000;
- for (int i = 0; i < size; i++) {
- OrderDto orderDto = new InterOrderDto();
- orderDto.setOrderNo(i + "");
- orderDto.setPayStatus(getStatus(0));
- orderDto.setTimestamp(System.currentTimeMillis());
- //相同的key发送到相同的分区
- kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));
- TimeUnit.MILLISECONDS.sleep(10);
- orderDto.setPayStatus(getStatus(1));
- orderDto.setTimestamp(System.currentTimeMillis());
- kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));
- TimeUnit.MILLISECONDS.sleep(10);
- orderDto.setPayStatus(getStatus(2));
- orderDto.setTimestamp(System.currentTimeMillis());
- kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));
- }
- return "success";
- }
-
- private String getStatus(int status){
- return status == 0 ? "待支付" : status == 1 ? "已支付" : "支付失败";
- }
- }