• kafka(四)消息类型


    一、同步消息

    1、生产者

    同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回 ack。 由于 send 方法返回的是一个 Future 对象,根据 Futrue 对象的特点,我们也可以实现同 步发送的效果,只需在调用 Future 对象的 get 方发即可。

    1. import org.apache.kafka.clients.producer.KafkaProducer;
    2. import org.apache.kafka.clients.producer.ProducerConfig;
    3. import org.apache.kafka.clients.producer.ProducerRecord;
    4. import java.util.Properties;
    5. import java.util.concurrent.ExecutionException;
    6. public class CustomProducerSync {
    7. public static void main(String[] args) throws ExecutionException, InterruptedException {
    8. // 1. 创建kafka生产者的配置对象
    9. Properties properties = new Properties();
    10. // 2. 给kafka配置对象添加配置信息:bootstrap.servers
    11. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
    12. // key,value序列化(必须):
    13. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    14. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    15. // 3. 创建kafka生产者对象
    16. KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);
    17. // 4. 调用send方法,发送消息
    18. for (int i = 0; i < 10; i++) {
    19. // 默认为异步发送
    20. kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i));
    21. // 末尾加get为同步发送
    22. kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i)).get();
    23. }
    24. // 5. 关闭资源
    25. kafkaProducer.close();
    26. }
    27. }

    二、异步消息

    1、生产者

    异步消息有两种:

    1.1、普通异步
    1. import org.apache.kafka.clients.producer.KafkaProducer;
    2. import org.apache.kafka.clients.producer.ProducerConfig;
    3. import org.apache.kafka.clients.producer.ProducerRecord;
    4. import java.util.Properties;
    5. public class CustomProducer {
    6. public static void main(String[] args) {
    7. // 1. 创建kafka生产者的配置对象
    8. Properties properties = new Properties();
    9. // 2. 给kafka配置对象添加配置信息:bootstrap.servers
    10. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
    11. // key,value序列化(必须):
    12. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    13. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    14. // 3. 创建kafka生产者对象
    15. KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);
    16. // 4. 调用send方法,发送消息
    17. for (int i = 0; i < 10; i++) {
    18. kafkaProducer.send(new ProducerRecord<>("first", "wtyy"));
    19. }
    20. // 5. 关闭资源
    21. kafkaProducer.close();
    22. }
    23. }
    1.2、带回调函数的异步发送

    回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是 RecordMetadata 和 Exception,如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。

    注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

    1. import org.apache.kafka.clients.producer.*;
    2. import java.util.Properties;
    3. public class CustomProducerCallBack {
    4. public static void main(String[] args) {
    5. // 1. 创建kafka生产者的配置对象
    6. Properties properties = new Properties();
    7. // 2. 给kafka配置对象添加配置信息:bootstrap.servers
    8. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
    9. // key,value序列化(必须):
    10. // 序列化器的serialization是一个接口,找到他的实现类
    11. // 我们一般都是使用String
    12. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    13. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    14. // 3. 创建kafka生产者对象
    15. KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);
    16. // 4. 调用send方法,发送消息
    17. for (int i = 0; i < 10; i++) {
    18. kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i),
    19. new Callback() {
    20. @Override
    21. public void onCompletion(RecordMetadata metadata, Exception exception) {
    22. //(1)消息发送成功 exception == null 接受到服务端ack消息 调用该方法
    23. //(2)消息发送失败 exception != null 也会调用该方法
    24. if (exception == null) {
    25. System.out.println(metadata);//使用打印演示
    26. }else{
    27. exception.printStackTrace();//打印异常信息
    28. }
    29. }
    30. });
    31. }
    32. // 5. 关闭资源
    33. kafkaProducer.close();
    34. }
    35. }

    三、顺序消息

    以订单为例,

    • 生产者将相同的key的订单状态事件推送到kafka的同一分区
    • kafka 消费者接收消息
    • 消费者将消息提交给线程池
    • 线程池根据接收到的消息,将订单状态事件使用路由策略选择其中一个线程,将具有相同路由key的事件发送到同一个线程的阻塞队列中
    • 单个线程不停的从阻塞队列获取订单状态消息消费

    1. @RestController
    2. public class OrderController {
    3. @Autowired
    4. private KafkaTemplate<String, String> kafkaTemplate;
    5. @GetMapping("/send")
    6. public String send() throws InterruptedException {
    7. int size = 1000;
    8. for (int i = 0; i < size; i++) {
    9. OrderDto orderDto = new InterOrderDto();
    10. orderDto.setOrderNo(i + "");
    11. orderDto.setPayStatus(getStatus(0));
    12. orderDto.setTimestamp(System.currentTimeMillis());
    13. //相同的key发送到相同的分区
    14. kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));
    15. TimeUnit.MILLISECONDS.sleep(10);
    16. orderDto.setPayStatus(getStatus(1));
    17. orderDto.setTimestamp(System.currentTimeMillis());
    18. kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));
    19. TimeUnit.MILLISECONDS.sleep(10);
    20. orderDto.setPayStatus(getStatus(2));
    21. orderDto.setTimestamp(System.currentTimeMillis());
    22. kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));
    23. }
    24. return "success";
    25. }
    26. private String getStatus(int status){
    27. return status == 0 ? "待支付" : status == 1 ? "已支付" : "支付失败";
    28. }
    29. }

  • 相关阅读:
    腾讯云数据库公有云市场稳居TOP 2!
    【服务器】无法进行ssh连接的问题逐一排查以及解决方法
    C++日期类实现(联系类和对象)
    C++项目实战——基于多设计模式下的同步&异步日志系统-⑫-日志宏&全局接口设计(代理模式)
    设计模式之单例设计模式
    架构与思维:互联网高性能Web架构
    Leetcode 剑指 Offer II 030. 插入、删除和随机访问都是 O(1) 的容器
    UE5 官方案例Lyra 全特性详解 9.蓝图消息系统
    elasticsearch 官方优化建议
    怎么按要求对PDF文件进行拆分?PDF拆分教程来了
  • 原文地址:https://blog.csdn.net/w_t_y_y/article/details/139879392