• Kafka消息队列


    前言

    一、kafka核心概念

    • 概念
      Kafka是消息队列。简称:MQ。MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。其中较为成熟的MQ产品有IBM WEBSPHERE MQ等等…

    二、kafka应用场景

    在微服务系统中,微服务之间通信,主要是通过Http或者gRPC通信。由于http/gRPC通信方式是同步通信,如果遇到了高并发,同步通信就会导致微服务系统性能瓶颈,所以,为了解决微服务性能瓶颈问题。需要将同步通信换成异步通信方式。因此。就选用使用消息队列。
    消息队列的代表技术,就是Kafka。

    三、kafka微服务落地

    • 条件
    • 技术选型
      • RabbitMQ 适用于消息数据在10G以内
      • kafka 适用于用在海量消息场景中[数据达到TP级]
    • 步骤
      • 安装JDK

      • 安装kafkatool_64bit工具

      • 配置

            #在kafka根目录下的[/config/zookeeper.properties]
            dataDir=/tmp/zookeeper
            clientPort=2181
            maxClientCnxns=0
            admin.enableServer=false
        
        • 1
        • 2
        • 3
        • 4
        • 5
      • 运行zookeeper [注册中心]

            #在kafka根目录下的[/bin/windows]文件下启动zookeeper:
            zookeeper-server-start.bat  ../../config/zookeeper.properties
        
        • 1
        • 2

        运行结果如下:
        在这里插入图片描述

      • 运行kafka

        • 配置文件
            #在kafka根目录下的[/config/server.properties]配置持久化消息的文件
            log.dirs=/tmp/kafka-logs
            #其他配置为默认即可
          
          • 1
          • 2
          • 3
        • 运行 kafka
            #在kafka根目录下的[/bin/windows]文件下启动kafka:
            kafka-server-start.bat  ../../config/server.properties
          
          • 1
          • 2
          运行结果如下:
          在这里插入图片描述
      • 项目配置

        • nuget安装
             Confluent.Kafka
          
          • 1
        • 核心代码
          • 生产者代码

                   var producerConfig = new ProducerConfig
                    {
                        BootstrapServers = "127.0.0.1:9092",
                        MessageTimeoutMs = 50000,
                        //失败重试
                        EnableIdempotence = true 
                    };
                    var builder = new ProducerBuilder(producerConfig);
            
                    using (var producer = builder.Build())
                    {
                        try
                        {
                            //数据对象
                            var data = new { name = "小明", sex = "男", old = 12 };
                            string  dataJson = JsonConvert.SerializeObject(data);
                            //参数1:队列名称
                            //参数2:消息数据
                            var dr = producer.ProduceAsync("QueueName", new Message { Key = "data", Value = dataJson }).GetAwaiter().GetResult();
                            Console.WriteLine("------------消息发送成功!----------------");
                        }
                        catch (ProduceException ex)
                        {
                            Console.WriteLine($"消息发送失败!;data:" + ex.Error.Reason);
                        }
                   }
            
            • 1
            • 2
            • 3
            • 4
            • 5
            • 6
            • 7
            • 8
            • 9
            • 10
            • 11
            • 12
            • 13
            • 14
            • 15
            • 16
            • 17
            • 18
            • 19
            • 20
            • 21
            • 22
            • 23
            • 24
            • 25
            • 26
          • 算法
            //在生产者中设置算法
            builder.SetDefaultPartitioner([函数名称\委托]]);

            • 轮询算法
              static int requestCount = 0;
              public Partition RoundRobinPartitioner(string tipic,int partitionCount,ReadOnlySpankeyData,bool keyIsNull)
              {
              int partition = requestCount%partitionCount;
              requestCount++;
              return new Partition(partition);
              }
          • 创建分区
            public async Task CreatePartition(string topic,int partitionCount)
            {
            AdminClientConfig adminClientConfig = new AdminClientConfig(){
            BootstrapServers = “127.0.0.1:9092”
            };
            var build = new AdminClientBuild(adminClientConfig).Build();

                    build.CreatePartitionsAsync(new PartitionsSpecification[]{
                    new PartitionsSpecification(){Topic = topic,IncreaseTo = partitionCount}
                    }).Wait();
                    await  Task.CompletedTask;
                 } 
            
            • 1
            • 2
            • 3
            • 4
            • 5
          • 消费者代码

            • 备注
              建议使用关系型数据库存储分区、偏移量和主题名称,以下代码使用的是redis存储的便偏移量

              private IDistributedCache distributedCache;
              public 构造函数(IDistributedCache _distributedCache)
              {
                  distributedCache = _distributedCache;
              } 
              
                  //消费者代码
                    Console.WriteLine("------------消息消费中--------");
                    //偏移量定义
                    Offset offset = 0;
                    Task.Run(() => {
                    // 创建主题代码未加
                    CreateTopic("QueueName");
                     
                    var consumerConfig = new ConsumerConfig
                    {
                        BootstrapServers = "127.0.0.1:9092",
                        AutoOffsetReset = AutoOffsetReset.Earliest,
                        GroupId = "temp",
                        //消息确认   true:自动确认 false:手动确认
                        //手动确认
                        EnableAutoCommit = true
                    };
                    var builder = new ConsumerBuilder(consumerConfig);
                    using var consumer = builder.Build(); 
                    //订阅
                    consumer.Subscribe("QueueName");
                    //从Redis中获取偏移量
                    string offset = distributedCache.GetString("QueueName");
                    if (string.IsNullOrEmpty(offset))
                     {
                        offset = "0";
                    }
                    重置偏移量
                    TopicPartitionOffset对象参数:1、分区对象[队列明名称,分区] 3、偏移量
                    consumer.Assign(new TopicPartitionOffset(new TopicPartition("QueueName",0),int.Parse(offset)));
                    while (true)
                    {
                        try
                        {
                                //消费后  确认消息
                                var result = consumer.Consume(); 
                                //存储偏移量
                                distributedCache.SetString("QueueName",result.Offset.Value.ToString()); 
                                //业务
                                string key = result.Key;
                                string value = result.Value;
                                Console.WriteLine($"-----------key:{key},value :{value}");
                                Console.WriteLine("------------消费成功!--------------"); 
                            }
                            catch (Exception ex)
                            {
                                Console.WriteLine($"消息消费失败!;data:" + ex.Message);
                            }
                        }
              
                    });
                    Console.ReadLine();      
              
              • 1
              • 2
              • 3
              • 4
              • 5
              • 6
              • 7
              • 8
              • 9
              • 10
              • 11
              • 12
              • 13
              • 14
              • 15
              • 16
              • 17
              • 18
              • 19
              • 20
              • 21
              • 22
              • 23
              • 24
              • 25
              • 26
              • 27
              • 28
              • 29
              • 30
              • 31
              • 32
              • 33
              • 34
              • 35
              • 36
              • 37
              • 38
              • 39
              • 40
              • 41
              • 42
              • 43
              • 44
              • 45
              • 46
              • 47
              • 48
              • 49
              • 50
              • 51
              • 52
              • 53
              • 54
              • 55
              • 56
              • 57
              • 58
          • 创建主题

               public async Task CreateTopic(string topicName)
               {
                    AdminClientConfig adminClientConfig = new AdminClientConfig(){
                    BootstrapServers = "127.0.0.1:9092"
                    };
                    var  build = new  AdminClientBuild(adminClientConfig).Build();
                    build.CreateTopicsAsync(new TopicSpecification[]{
                    new TopicSpecification(){Name = topicName [主题名称]}
                    }).Wait();
                    await Task.CompletedTask;
               }
               
                
            
            • 1
            • 2
            • 3
            • 4
            • 5
            • 6
            • 7
            • 8
            • 9
            • 10
            • 11
            • 12
            • 13
    • 配置属性含义
      属性含义数据类型备注类型
      BootstrapServersstringkafka连接字符串地址生产者 消费者
      MessageTimeoutMsint超时时间生产者
      GroupIdstring组ID消费者
      EnableAutoCommitbooltrue:自动确认 false:手动确认消费者
      EnableIdempotencebool失败重试 true:重试 false:不重试生产者

    四、kafka核心功能落地

    • 手动确认消息

      • 场景
        当消费者端消费消息后,业务逻辑执行失败了,消息需要使用手动确认。
        如果使用自动确认,当消费者端消费消息后,自动去kafka确认消息,不管业务的成功与失败,这样会导致消息丢失。
      • 生产者
        • 实例代码
              var producerConfig = new ProducerConfig
              {
                  BootstrapServers = "127.0.0.1:9092",
                  MessageTimeoutMs = 50000
              };
              var builder = new ProducerBuilder\(producerConfig);
              using (var producer = builder.Build())
              {
                  try
                  {
                      //数据对象
                      var data = new { name = "小明", sex = "男", old = 12 };
                      string  dataJson = JsonConvert.SerializeObject(data);
                      var dr = producer.ProduceAsync("QueueName", new Message\ { Key = "data", Value = dataJson }).GetAwaiter().GetResult();
                      Console.WriteLine("------------消息发送成功!----------------");
                  }
                  catch (ProduceException\ ex)
                  {
                      Console.WriteLine(\$"消息发送失败!;data:" + ex.Error.Reason);
                  }
              }
          
          
          • 1
          • 2
          • 3
          • 4
          • 5
          • 6
          • 7
          • 8
          • 9
          • 10
          • 11
          • 12
          • 13
          • 14
          • 15
          • 16
          • 17
          • 18
          • 19
          • 20
          • 21
          • 22
      • 消费者
        • 代码
              var consumerConfig = new ConsumerConfig
              {
              ..........
              //消息确认   true:自动确认 false:手动确认
              //自动确认的
              EnableAutoCommit = false
              };
          *   实例代码
              Console.WriteLine("------------消息消费中--------");
              Task.Run(() => {
              var consumerConfig = new ConsumerConfig
              {
              BootstrapServers = "127.0.0.1:9092",
              AutoOffsetReset = AutoOffsetReset.Earliest,
              GroupId = "temp",
              //消息确认   true:自动确认 false:手动确认
              //自动确认的
              EnableAutoCommit = false
              };
              var builder = new ConsumerBuilder\(consumerConfig);
              using  var consumer = builder.Build();
              //订阅
              consumer.Subscribe("QueueName");
              while (true)
              {
              try
              {
              //消费后  确认消息
              var result = consumer.Consume();
              //业务
              string key = result.Key;
              string value = result.Value;
              Console.WriteLine(`$"-----------key:{key},value :{value}");
                          Console.WriteLine("------------消费成功!--------------");
                            //手动提交 
                          consumer.StoreOffset(result);
                        }
                        catch (Exception ex)
                        {
                            Console.WriteLine($`"消息消费失败!;data:" + ex.Message);
              }
              }
          
                  });
                  Console.ReadLine();
          
          • 1
          • 2
          • 3
          • 4
          • 5
          • 6
          • 7
          • 8
          • 9
          • 10
          • 11
          • 12
          • 13
          • 14
          • 15
          • 16
          • 17
          • 18
          • 19
          • 20
          • 21
          • 22
          • 23
          • 24
          • 25
          • 26
          • 27
          • 28
          • 29
          • 30
          • 31
          • 32
          • 33
          • 34
          • 35
          • 36
          • 37
          • 38
          • 39
          • 40
          • 41
          • 42
          • 43
          • 44
          • 45
    • 手动确认消息–偏移量[重复消费]

      • 什么是偏移量
        偏移量就是消息的序号。
      • 偏移量生成过程
        生产者第一次发送一个消息到kafka,kafka会给这个消息生成一个序号后,kafka再将消息发送给消费者端,消费者端收到消息消费后通过Commit或者StoreOffset函数提交给给kafka,kafka收到后给当前消息的序号加一在生成一个序号,再次等待生产者端第二次发送消息到kafka将消息放在新的序号里面,kafka再将消息发送给消费者端,消费者端收到消息消费后通过Commit或者StoreOffset函数提交给给kafka,kafka收到后给当前消息的序号加一在生成一个序号,以此类推…
      • 场景
        如果消费者端消息消费后,业务也执行成功,准备使用Commit或者StoreOffset函数提交给kafka,kafka突然宕机了,无法在当前的序号[比如:当前序号 1]加一,重启kafka后会造成[当前序号为1]的消息会重复消费。
      • 解决方案 [示例代码中未实现]
        • 将当前的偏移量记录下来,然后重置偏移量,如果使用重置偏移量的方式来解决消息重复消费的问题,可以不用使用手动提交,直接改为自动提交即可。
        • 使用Redis的方式来存储
          • 缺陷
            无法保证业务与数据库同时成功。
            • 方案
              使用数据库来存储偏移量[核心:通过数据库事务来操作。]。
      • 生产者
        • 实例代码
              var producerConfig = new ProducerConfig
              {
                  BootstrapServers = "127.0.0.1:9092",
                  MessageTimeoutMs = 50000
              };
              var builder = new ProducerBuilder\(producerConfig);
              using (var producer = builder.Build())
              {
                  try
                  {
                      //数据对象
                      var data = new { name = "小明", sex = "男", old = 12 };
                      string  dataJson = JsonConvert.SerializeObject(data);0
                      var dr = producer.ProduceAsync("QueueName", new Message\ { Key = "data", Value = dataJson }).GetAwaiter().GetResult();
                      Console.WriteLine("------------消息发送成功!----------------");
                  }
                  catch (ProduceException\ ex)
                  {
                      Console.WriteLine(\$"消息发送失败!;data:" + ex.Error.Reason);
                  }
              }
          
          • 1
          • 2
          • 3
          • 4
          • 5
          • 6
          • 7
          • 8
          • 9
          • 10
          • 11
          • 12
          • 13
          • 14
          • 15
          • 16
          • 17
          • 18
          • 19
          • 20
          • 21
      • 消费者
        • 实例代码
          • Nuget包
               Microsoft.Extensions.Caching.Redis
            
            • 1
          • 在Startup类中的ConfigureServices函数中注册Redis服务
                services.AddDistributedRedisCache(options=>{
                options.Configuration = "localhost:6379"
                });
            
            • 1
            • 2
            • 3
          • 控制代码
                private IDistributedCache distributedCache;
                public 构造函数(IDistributedCache \_distributedCache)
                {
                distributedCache = \_distributedCache;
                }
            
                            //消费者代码
                              Console.WriteLine("------------消息消费中--------");
                              //偏移量定义
                              Offset offset = 0;
                              Task.Run(() => {
                              var consumerConfig = new ConsumerConfig
                              {
                                  BootstrapServers = "127.0.0.1:9092",
                                  AutoOffsetReset = AutoOffsetReset.Earliest,
                                  GroupId = "temp",
                                  //消息确认   true:自动确认 false:手动确认
                                  //手动确认
                                  EnableAutoCommit = true
                              };
                              var builder = new ConsumerBuilder(consumerConfig);
                              using var consumer = builder.Build(); 
                              //订阅
                              consumer.Subscribe("QueueName");
                              //从Redis中获取偏移量
                              string offset = distributedCache.GetString("QueueName");
                              if (string.IsNullOrEmpty(offset))
                              {
                                  offset = "0";
                              }
                              //重置偏移量
                              //TopicPartitionOffset对象参数:1、分区对象[队列明名称,分区] 3、偏移量
                              consumer.Assign(new TopicPartitionOffset(new TopicPartition("QueueName",0),int.Parse(offset)));
                              while (true)
                              {
                                  try
                                  {
                                          //消费后  确认消息
                                          var result = consumer.Consume(); 
                                          ///存储偏移量
                                          distributedCache.SetString("QueueName",result.Offset.Value.ToString()); 
                                          //业务
                                          string key = result.Key;
                                          string value = result.Value;
                                          Console.WriteLine($"-----------key:{key},value :{value}");
                                          Console.WriteLine("------------消费成功!--------------"); 
                                      }
                                      catch (Exception ex)
                                      {
                                          Console.WriteLine($"消息消费失败!;data:" + ex.Message);
                                      }
                                  }
            
                              });
                              Console.ReadLine();
            
            • 1
            • 2
            • 3
            • 4
            • 5
            • 6
            • 7
            • 8
            • 9
            • 10
            • 11
            • 12
            • 13
            • 14
            • 15
            • 16
            • 17
            • 18
            • 19
            • 20
            • 21
            • 22
            • 23
            • 24
            • 25
            • 26
            • 27
            • 28
            • 29
            • 30
            • 31
            • 32
            • 33
            • 34
            • 35
            • 36
            • 37
            • 38
            • 39
            • 40
            • 41
            • 42
            • 43
            • 44
            • 45
            • 46
            • 47
            • 48
            • 49
            • 50
            • 51
            • 52
            • 53
            • 54
            • 55
    • kafka宕机后如何保证消息不丢失的

      • 默认配置是消息持久化,存储在磁盘中,存储路径:/tmp/kafka-logs/文件名.log[配置KafKa的配置文件[server.properties]]
    • 订阅发布[广播消费] [组 GroupId]

      • 使用场景
        生产者发送消息到队列,多个服务消费同一个消息。
      • 独立偏移量的组成
        • 主题+分区+groupId
      • 代码实现
        • 生产者
              var producerConfig = new ProducerConfig
              {
                  BootstrapServers = "127.0.0.1:9092",
                  MessageTimeoutMs = 50000
              };
              var builder = new ProducerBuilder\(producerConfig);
              using (var producer = builder.Build())
              {
                  try
                  {
                      //数据对象
                      var data = new { name = "小明", sex = "男", old = 12 };
                      string  dataJson = JsonConvert.SerializeObject(data);0
                      var dr = producer.ProduceAsync("QueueName", new Message\ { Key = "data", Value = dataJson }).GetAwaiter().GetResult();
                      Console.WriteLine("------------消息发送成功!----------------");
                  }
                  catch (ProduceException\ ex)
                  {
                      Console.WriteLine(\$"消息发送失败!;data:" + ex.Error.Reason);
                  }
              }
          
          • 1
          • 2
          • 3
          • 4
          • 5
          • 6
          • 7
          • 8
          • 9
          • 10
          • 11
          • 12
          • 13
          • 14
          • 15
          • 16
          • 17
          • 18
          • 19
          • 20
          • 21
        • 消费者
          • 代码
                var consumerConfig = new ConsumerConfig
                {
                ......
                GroupId = "temp",// 如果其他服务也想同时消费一个服务,其他服务在同一个主题之下设置不同的组ID即可。
                .....
                };
            
            • 1
            • 2
            • 3
            • 4
            • 5
            • 6
          • 实例代码
            • 订单服务
                  private IDistributedCache distributedCache;
                  public 构造函数(IDistributedCache \_distributedCache)
                  {
                  distributedCache = \_distributedCache;
                  }
              
                                //消费者代码
                                  Console.WriteLine("------------消息消费中--------");
                                  //偏移量定义
                                  Offset offset = 0;
                                  Task.Run(() => {
                                  var consumerConfig = new ConsumerConfig
                                  {
                                      BootstrapServers = "127.0.0.1:9092",
                                      AutoOffsetReset = AutoOffsetReset.Earliest,
                                      GroupId = "temp",
                                      //消息确认   true:自动确认 false:手动确认
                                      //手动确认
                                      EnableAutoCommit = true
                                  };
                                  var builder = new ConsumerBuilder(consumerConfig);
                                  using var consumer = builder.Build(); 
                                  //订阅
                                  consumer.Subscribe("QueueName");
                                  //从Redis中获取偏移量
                                  string offset = distributedCache.GetString("QueueName");
                                  if (string.IsNullOrEmpty(offset))
                                  {
                                      offset = "0";
                                  }
                                  //重置偏移量
                                  //TopicPartitionOffset对象参数:1、分区对象[队列明名称,分区] 3、偏移量
                                  consumer.Assign(new TopicPartitionOffset(new TopicPartition("QueueName",0),int.Parse(offset)));
                                  while (true)
                                  {
                                      try
                                      {
                                              //消费后  确认消息
                                              var result = consumer.Consume(); 
                                              ///存储偏移量
                                              distributedCache.SetString("QueueName",result.Offset.Value.ToString()); 
                                              //业务
                                              string key = result.Key;
                                              string value = result.Value;
                                              Console.WriteLine($"-----------key:{key},value :{value}");
                                              Console.WriteLine("------------消费成功!--------------"); 
                                          }
                                          catch (Exception ex)
                                          {
                                              Console.WriteLine($"消息消费失败!;data:" + ex.Message);
                                          }
                                      }
              
                                  });
                                  Console.ReadLine(); 
              
              • 1
              • 2
              • 3
              • 4
              • 5
              • 6
              • 7
              • 8
              • 9
              • 10
              • 11
              • 12
              • 13
              • 14
              • 15
              • 16
              • 17
              • 18
              • 19
              • 20
              • 21
              • 22
              • 23
              • 24
              • 25
              • 26
              • 27
              • 28
              • 29
              • 30
              • 31
              • 32
              • 33
              • 34
              • 35
              • 36
              • 37
              • 38
              • 39
              • 40
              • 41
              • 42
              • 43
              • 44
              • 45
              • 46
              • 47
              • 48
              • 49
              • 50
              • 51
              • 52
              • 53
              • 54
              • 55
            • 短信服务
                  private IDistributedCache distributedCache;
                  public 构造函数(IDistributedCache \_distributedCache)
                  {
                  distributedCache = \_distributedCache;
                  }
              
                                //消费者代码
                                  Console.WriteLine("------------消息消费中--------");
                                  //偏移量定义
                                  Offset offset = 0;
                                  Task.Run(() => {
                                  var consumerConfig = new ConsumerConfig
                                  {
                                      BootstrapServers = "127.0.0.1:9092",
                                      AutoOffsetReset = AutoOffsetReset.Earliest,
                                      GroupId = "sms",
                                      //消息确认   true:自动确认 false:手动确认
                                      //手动确认
                                      EnableAutoCommit = true
                                  };
                                  var builder = new ConsumerBuilder(consumerConfig);
                                  using var consumer = builder.Build(); 
                                  //订阅
                                  consumer.Subscribe("QueueName");
                                  //从Redis中获取偏移量
                                  string offset = distributedCache.GetString("QueueName");
                                  if (string.IsNullOrEmpty(offset))
                                  {
                                      offset = "0";
                                  }
                                  //重置偏移量
                                  //TopicPartitionOffset对象参数:1、分区对象[队列明名称,分区] 3、偏移量
                                  consumer.Assign(new TopicPartitionOffset(new TopicPartition("QueueName",0),int.Parse(offset)));
                                  while (true)
                                  {
                                      try
                                      {
                                              //消费后  确认消息
                                              var result = consumer.Consume(); 
                                              ///存储偏移量
                                              distributedCache.SetString("QueueName",result.Offset.Value.ToString()); 
                                              //业务
                                              string key = result.Key;
                                              string value = result.Value;
                                              Console.WriteLine($"-----------key:{key},value :{value}");
                                              Console.WriteLine("------------消费成功!--------------"); 
                                          }
                                          catch (Exception ex)
                                          {
                                              Console.WriteLine($"消息消费失败!;data:" + ex.Message);
                                          }
                                      }
              
                                  });
                                  Console.ReadLine();      
              
              • 1
              • 2
              • 3
              • 4
              • 5
              • 6
              • 7
              • 8
              • 9
              • 10
              • 11
              • 12
              • 13
              • 14
              • 15
              • 16
              • 17
              • 18
              • 19
              • 20
              • 21
              • 22
              • 23
              • 24
              • 25
              • 26
              • 27
              • 28
              • 29
              • 30
              • 31
              • 32
              • 33
              • 34
              • 35
              • 36
              • 37
              • 38
              • 39
              • 40
              • 41
              • 42
              • 43
              • 44
              • 45
              • 46
              • 47
              • 48
              • 49
              • 50
              • 51
              • 52
              • 53
              • 54
              • 55

    五、kafka分区落地

    • 分区
      一个消费者只能对应一个分区。
    • 场景
      • 解决大量消息堆积的问题。
    • 作用
      • 解决消息堆积的问题
      • 存储海量的数据
    • 代码实现
      • 创建分区
            public  async Task CreatePartition(string topic,int partitionCount)
            {
                AdminClientConfig adminClientConfig = new AdminClientConfig(){
                BootstrapServers = "127.0.0.1:9092"
            };
                var  build = new  AdminClientBuild(adminClientConfig).Build();
                build.CreatePartitionsAsync(new PartitionsSpecification\[]{
                new PartitionsSpecification(){Topic = topic,IncreaseTo = partitionCount}
                }).Wait();
                await Task.CompletedTask;
            }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
      • 轮询算法
            static int requestCount = 0;
            public   Partition  RoundRobinPartitioner(string tipic,int partitionCount,ReadOnlySpan<byte>keyData,bool keyIsNull)
            {
                int partition = requestCount%partitionCount;
                requestCount++;
                return new Partition(partition);
            }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
      • 生产者
        • 代码
              //设置轮询算法
              builder.SetDefaultPartitioner(\[算法 委托函数]);
          
          • 1
          • 2
        • 实例代码
              var producerConfig = new ProducerConfig
              {
                  BootstrapServers = "127.0.0.1:9092",
                  MessageTimeoutMs = 50000
              };
              var builder = new ProducerBuilder\(producerConfig);
              //设置轮询算法
              builder.SetDefaultPartitioner(RoundRobinPartitioner);
              using (var producer = builder.Build())
              {
                  try
                  {
                  //数据对象
                      var data = new { name = "小明", sex = "男", old = 12 };
                      string  dataJson = JsonConvert.SerializeObject(data);0
                      var dr = producer.ProduceAsync("QueueName", new Message\ { Key = "data", Value = dataJson }).GetAwaiter().GetResult();
                      Console.WriteLine("------------消息发送成功!----------------");
                  }
                  catch (ProduceException\ ex)
                  {
                      Console.WriteLine(\$"消息发送失败!;data:" + ex.Error.Reason);
                  }
              }
          
          • 1
          • 2
          • 3
          • 4
          • 5
          • 6
          • 7
          • 8
          • 9
          • 10
          • 11
          • 12
          • 13
          • 14
          • 15
          • 16
          • 17
          • 18
          • 19
          • 20
          • 21
          • 22
          • 23
      • 消费者
        • 订单服务 [5001]
              private IDistributedCache distributedCache;
              public 构造函数(IDistributedCache \_distributedCache)
              {
              distributedCache = \_distributedCache;
              }
          
                            //消费者代码
                              Console.WriteLine("------------消息消费中--------");
                              //偏移量定义
                              Offset offset = 0;
                              Task.Run(() => {
                              var consumerConfig = new ConsumerConfig
                              {
                                  BootstrapServers = "127.0.0.1:9092",
                                  AutoOffsetReset = AutoOffsetReset.Earliest,
                                  GroupId = "temp",
                                  //消息确认   true:自动确认 false:手动确认
                                  //手动确认
                                  EnableAutoCommit = true
                              };
                              var builder = new ConsumerBuilder(consumerConfig);
                              using var consumer = builder.Build(); 
                              //订阅
                              consumer.Subscribe("QueueName");
                              //从Redis中获取偏移量
                              string offset = distributedCache.GetString("QueueName");
                              if (string.IsNullOrEmpty(offset))
                              {
                                  offset = "0";
                              }
                              //获取分区 
                              ..... [代码未加]
                              //重置偏移量
                              //TopicPartitionOffset对象参数:1、分区对象[队列明名称,分区] 3、偏移量
                              //consumer.Assign(new TopicPartitionOffset(new TopicPartition("QueueName",0[分区动态从数据库获取]),int.Parse(offset)));
                              while (true)
                              {
                                  try
                                  {
                                          //消费后  确认消息
                                          var result = consumer.Consume(); 
                                          //存储分区,代码未加
                                          ........
                                          ///存储偏移量
                                          //distributedCache.SetString("QueueName",result.Offset.Value.ToString()); 
                                          //业务
                                          string key = result.Key;
                                          string value = result.Value;
                                          Console.WriteLine($"-----------key:{key},value :{value}");
                                          Console.WriteLine("------------消费成功!--------------"); 
                                      }
                                      catch (Exception ex)
                                      {
                                          Console.WriteLine($"消息消费失败!;data:" + ex.Message);
                                      }
                                  }
          
                              });
                              Console.ReadLine();  
          
          • 1
          • 2
          • 3
          • 4
          • 5
          • 6
          • 7
          • 8
          • 9
          • 10
          • 11
          • 12
          • 13
          • 14
          • 15
          • 16
          • 17
          • 18
          • 19
          • 20
          • 21
          • 22
          • 23
          • 24
          • 25
          • 26
          • 27
          • 28
          • 29
          • 30
          • 31
          • 32
          • 33
          • 34
          • 35
          • 36
          • 37
          • 38
          • 39
          • 40
          • 41
          • 42
          • 43
          • 44
          • 45
          • 46
          • 47
          • 48
          • 49
          • 50
          • 51
          • 52
          • 53
          • 54
          • 55
          • 56
          • 57
          • 58
          • 59
        • 订单服务 [5002]
              private IDistributedCache distributedCache;
              public 构造函数(IDistributedCache \_distributedCache)
              {
              distributedCache = \_distributedCache;
              }
          
                            //消费者代码
                              Console.WriteLine("------------消息消费中--------");
                              //偏移量定义
                              Offset offset = 0;
                              Task.Run(() => {
                              var consumerConfig = new ConsumerConfig
                              {
                                  BootstrapServers = "127.0.0.1:9092",
                                  AutoOffsetReset = AutoOffsetReset.Earliest,
                                  GroupId = "temp",
                                  //消息确认   true:自动确认 false:手动确认
                                  //手动确认
                                  EnableAutoCommit = true
                              };
                              var builder = new ConsumerBuilder(consumerConfig);
                              using var consumer = builder.Build(); 
                              //订阅
                              consumer.Subscribe("QueueName");
                              //从Redis中获取偏移量
                              string offset = distributedCache.GetString("QueueName");
                              if (string.IsNullOrEmpty(offset))
                              {
                                  offset = "0";
                              }
                              //获取分区 
                              ..... [代码未加]
                              //重置偏移量
                              //TopicPartitionOffset对象参数:1、分区对象[队列明名称,分区] 3、偏移量
                              //consumer.Assign(new TopicPartitionOffset(new TopicPartition("QueueName",0[分区动态从数据库获取]),int.Parse(offset)));
                              while (true)
                              {
                                  try
                                  {
                                          //消费后  确认消息
                                          var result = consumer.Consume(); 
                                          //存储分区,代码未加
                                          ........
                                          ///存储偏移量
                                          //distributedCache.SetString("QueueName",result.Offset.Value.ToString()); 
                                          //业务
                                          string key = result.Key;
                                          string value = result.Value;
                                          Console.WriteLine($"-----------key:{key},value :{value}");
                                          Console.WriteLine("------------消费成功!--------------"); 
                                      }
                                      catch (Exception ex)
                                      {
                                          Console.WriteLine($"消息消费失败!;data:" + ex.Message);
                                      }
                                  }
          
                              });
                              Console.ReadLine();     
          
          • 1
          • 2
          • 3
          • 4
          • 5
          • 6
          • 7
          • 8
          • 9
          • 10
          • 11
          • 12
          • 13
          • 14
          • 15
          • 16
          • 17
          • 18
          • 19
          • 20
          • 21
          • 22
          • 23
          • 24
          • 25
          • 26
          • 27
          • 28
          • 29
          • 30
          • 31
          • 32
          • 33
          • 34
          • 35
          • 36
          • 37
          • 38
          • 39
          • 40
          • 41
          • 42
          • 43
          • 44
          • 45
          • 46
          • 47
          • 48
          • 49
          • 50
          • 51
          • 52
          • 53
          • 54
          • 55
          • 56
          • 57
          • 58
          • 59
    • 指定的分区消费消息
      • 生产者
            var producerConfig = new ProducerConfig
            {
                BootstrapServers = "127.0.0.1:9092",
                MessageTimeoutMs = 50000
            };
            var builder = new ProducerBuilder\(producerConfig);
            using (var producer = builder.Build())
            {
                try
                {
                    //数据对象
                    var data = new { name = "小明", sex = "男", old = 12 };
                    string  dataJson = JsonConvert.SerializeObject(data);0
                    var dr = producer.ProduceAsync("QueueName", new Message\ { Key = "data", Value = dataJson }).GetAwaiter().GetResult();
                    Console.WriteLine("------------消息发送成功!----------------");
                }
                catch (ProduceException\ ex)
                {
                     Console.WriteLine(\$"消息发送失败!;data:" + ex.Error.Reason);
                }
            }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
        • 14
        • 15
        • 16
        • 17
        • 18
        • 19
        • 20
        • 21
      • 消费者
        • 代码
              //重置偏移量可以指定分区
              consumer.Assign(new TopicPartitionOffset(new TopicPartition("\[主题名称]",\[分区]),\[偏移量]);
          
          • 1
          • 2
        • 示例代码
                          private IDistributedCache distributedCache;
                          public 构造函数(IDistributedCache \_distributedCache)
                          {
                                   distributedCache = \_distributedCache;
                          }
          
                            //消费者代码
                              Console.WriteLine("------------消息消费中--------");
                              //偏移量定义
                              Offset offset = 0;
                              Task.Run(() => {
                              var consumerConfig = new ConsumerConfig
                              {
                                  BootstrapServers = "127.0.0.1:9092",
                                  AutoOffsetReset = AutoOffsetReset.Earliest,
                                  GroupId = "temp",
                                  //消息确认   true:自动确认 false:手动确认
                                  //手动确认
                                  EnableAutoCommit = true
                              };
                              var builder = new ConsumerBuilder(consumerConfig);
                              using var consumer = builder.Build(); 
                              //订阅
                              consumer.Subscribe("QueueName");
                              //从Redis中获取偏移量
                              string offset = distributedCache.GetString("QueueName");
                              if (string.IsNullOrEmpty(offset))
                              {
                                  offset = "0";
                              }
                              //重置偏移量
                              //TopicPartitionOffset对象参数:1、分区对象[队列明名称,分区] 3、偏移量
                              consumer.Assign(new TopicPartitionOffset(new TopicPartition("QueueName",0),int.Parse(offset)));
                              while (true)
                              {
                                  try
                                  {
                                          //消费后  确认消息
                                          var result = consumer.Consume(); 
                                          //存储分区
                                          .............
                                          ///存储偏移量
                                          distributedCache.SetString("QueueName",result.Offset.Value.ToString()); 
                                          //业务
                                          string key = result.Key;
                                          string value = result.Value;
                                          Console.WriteLine($"-----------key:{key},value :{value}");
                                          Console.WriteLine("------------消费成功!--------------"); 
                                      }
                                      catch (Exception ex)
                                      {
                                          Console.WriteLine($"消息消费失败!;data:" + ex.Message);
                                      }
                                  }
          
                              });
                              Console.ReadLine();    
          
          • 1
          • 2
          • 3
          • 4
          • 5
          • 6
          • 7
          • 8
          • 9
          • 10
          • 11
          • 12
          • 13
          • 14
          • 15
          • 16
          • 17
          • 18
          • 19
          • 20
          • 21
          • 22
          • 23
          • 24
          • 25
          • 26
          • 27
          • 28
          • 29
          • 30
          • 31
          • 32
          • 33
          • 34
          • 35
          • 36
          • 37
          • 38
          • 39
          • 40
          • 41
          • 42
          • 43
          • 44
          • 45
          • 46
          • 47
          • 48
          • 49
          • 50
          • 51
          • 52
          • 53
          • 54
          • 55
          • 56
          • 57
      • 指定的分区发送消息
        • 生产者
          • 代码
            //参数1:队列名称
            //参数2:分区
            TopicPartition topicPartition = new TopicPartition("\[队列名称]",\[分区]);
            
            • 1
          • 实例代码
            var producerConfig = new ProducerConfig
            {
            BootstrapServers = "127.0.0.1:9092",
            MessageTimeoutMs = 50000
            };
            var builder = new ProducerBuilder\(producerConfig);
            using (var producer = builder.Build())
            {
            try
            {
            //数据对象
            var data = new { name = "小明", sex = "男", old = 12 };
            string  dataJson = JsonConvert.SerializeObject(data);
            //参数1:队列名称
            //参数2:分区
            TopicPartition topicPartition = new TopicPartition("QueueName",1);
            var dr = producer.ProduceAsync(topicPartition, new Message\ { Key = "data", Value = dataJson }).GetAwaiter().GetResult();
            Console.WriteLine("------------消息发送成功!----------------");
            }
            catch (ProduceException\ ex)
            {
            Console.WriteLine(\$"消息发送失败!;data:" + ex.Error.Reason);
            }
            }
            
            • 1
            • 2
            • 3
            • 4
            • 5
            • 6
            • 7
            • 8
            • 9
            • 10
            • 11
            • 12
            • 13
            • 14
            • 15
            • 16
            • 17
            • 18
            • 19
            • 20
            • 21
            • 22
            • 23
            • 24
          • 消费者
            private IDistributedCache distributedCache;
            public 构造函数(IDistributedCache \_distributedCache)
            {
            distributedCache = \_distributedCache;
            }
            
                            //消费者代码
                              Console.WriteLine("------------消息消费中--------");
                              //偏移量定义
                              Offset offset = 0;
                              Task.Run(() => {
                              var consumerConfig = new ConsumerConfig
                              {
                                  BootstrapServers = "127.0.0.1:9092",
                                  AutoOffsetReset = AutoOffsetReset.Earliest,
                                  GroupId = "temp",
                                  //消息确认   true:自动确认 false:手动确认
                                  //手动确认
                                  EnableAutoCommit = true
                              };
                              var builder = new ConsumerBuilder(consumerConfig);
                              using var consumer = builder.Build(); 
                              //订阅
                              consumer.Subscribe("QueueName");
                              //从Redis中获取偏移量
                              string offset = distributedCache.GetString("QueueName");
                              if (string.IsNullOrEmpty(offset))
                              {
                                  offset = "0";
                              }
                              //重置偏移量
                              TopicPartitionOffset对象参数:1、分区对象[队列明名称,分区] 3、偏移量
                              consumer.Assign(new TopicPartitionOffset(new TopicPartition("QueueName",0),int.Parse(offset)));
                              while (true)
                              {
                                  try
                                  {
                                          //消费后  确认消息
                                          var result = consumer.Consume(); 
                                          ///存储偏移量
                                          distributedCache.SetString("QueueName",result.Offset.Value.ToString()); 
                                          //业务
                                          string key = result.Key;
                                          string value = result.Value;
                                          Console.WriteLine($"-----------key:{key},value :{value}");
                                          Console.WriteLine("------------消费成功!--------------"); 
                                      }
                                      catch (Exception ex)
                                      {
                                          Console.WriteLine($"消息消费失败!;data:" + ex.Message);
                                      }
                                  }
            
                              });
                              Console.ReadLine();      
            
            • 1
            • 2
            • 3
            • 4
            • 5
            • 6
            • 7
            • 8
            • 9
            • 10
            • 11
            • 12
            • 13
            • 14
            • 15
            • 16
            • 17
            • 18
            • 19
            • 20
            • 21
            • 22
            • 23
            • 24
            • 25
            • 26
            • 27
            • 28
            • 29
            • 30
            • 31
            • 32
            • 33
            • 34
            • 35
            • 36
            • 37
            • 38
            • 39
            • 40
            • 41
            • 42
            • 43
            • 44
            • 45
            • 46
            • 47
            • 48
            • 49
            • 50
            • 51
            • 52
            • 53
            • 54
            • 55

        六、kafka延迟消费落地

        • 场景
          自动取消订单。
        • 实现
          • 生产者
                var producerConfig = new ProducerConfig
                {
                    BootstrapServers = "127.0.0.1:9092",
                    MessageTimeoutMs = 50000
                };
                var builder = new ProducerBuilder\(producerConfig);
                using (var producer = builder.Build())
                {
                    try
                    {
                        //数据对象
                        var data = new { name = "小明", sex = "男", old = 12 };
                        string  dataJson = JsonConvert.SerializeObject(data);
                        //参数1:队列名称
                        //参数2:消息数据
                        var dr = producer.ProduceAsync("QueueName", new Message\ { Key = "data", Value = dataJson }).GetAwaiter().GetResult();
                        Console.WriteLine("------------消息发送成功!----------------");
                    }
                    catch (ProduceException\ ex)
                    {
                            Console.WriteLine(\$"消息发送失败!;data:" + ex.Error.Reason);
                    }
                }
            
            • 1
            • 2
            • 3
            • 4
            • 5
            • 6
            • 7
            • 8
            • 9
            • 10
            • 11
            • 12
            • 13
            • 14
            • 15
            • 16
            • 17
            • 18
            • 19
            • 20
            • 21
            • 22
            • 23
          • 消费者
            • 代码
              //恢复消费
              new  Timer((s)=>{
              consumer.Resume(new List<TopicPartition>{new TopicPartition(\[主题名称],分区) },null,Timeout.Infinite,Timeout.Infinite).Change(\[延迟时间\[单位:毫秒]],\[延迟时间\[单位:毫秒]]);
              });
              //停止消息
              consumer.Pause(new List<TopicPartition>{new TopicPartition(\[主题名称],分区) });
              
              • 1
              • 2
              • 3
              • 4
              • 5
              • 6
            • 实例代码
              private IDistributedCache distributedCache;
              public 构造函数(IDistributedCache \_distributedCache)
              {
              distributedCache = \_distributedCache;
              }
              
                    //消费者代码
                      Console.WriteLine("------------消息消费中--------");
                      //偏移量定义
                      Offset offset = 0;
                      Task.Run(() => {
                      // 创建主题代码未加
                      CreateTopic("QueueName");
                       
                      var consumerConfig = new ConsumerConfig
                      {
                          BootstrapServers = "127.0.0.1:9092",
                          AutoOffsetReset = AutoOffsetReset.Earliest,
                          GroupId = "temp",
                          //消息确认   true:自动确认 false:手动确认
                          //手动确认
                          EnableAutoCommit = true,
                          //批量消费获取最小值
                          FetchMinBytes = 0,
                          //批量消费的最大值
                          FetchMaxBytes = 3000
                      };
                      var builder = new ConsumerBuilder(consumerConfig);
                      using var consumer = builder.Build(); 
                      //订阅
                      consumer.Subscribe("QueueName");
                      //从Redis中获取偏移量
                      string offset = distributedCache.GetString("QueueName");
                      if (string.IsNullOrEmpty(offset))
                       {
                          offset = "0";
                      }
                      //重置偏移量
                      TopicPartitionOffset对象参数:1、分区对象[队列明名称,分区] 3、偏移量
                      consumer.Assign(new TopicPartitionOffset(new TopicPartition("QueueName",0),int.Parse(offset)));
                      while (true)
                      {
                          try
                          {
                                //恢复消费
                                 new  Timer((s)=>{
                                     consumer.Resume(new List{new TopicPartition("QueueName",0) },null,Timeout.Infinite,Timeout.Infinite).Change(5000,5000); 
                                 });
                                 //停止消息
                                 consumer.Pause(new List{new TopicPartition("QueueName",0) });
                                  //批量获取消息消费后在确认消息
                                  var result = consumer.Consume();   
                                  try
                                  {
                                      ///存储偏移量
                                      distributedCache.SetString("QueueName",result.Offset.Value.ToString()); 
                                      //业务
                                      string key = result.Key;
                                      string value = result.Value;
                                      Console.WriteLine($"-----------key:{key},value :{value}");
                                      Console.WriteLine("------------消费成功!--------------"); 
                                  }
                                  cache (Exception ex)
                                  {
                                      
                                  } 
                                  finally 
                                  {
                                       consumer.Pause(new List{new TopicPartition("QueueName",0) });
                                  }
                              }
                              catch (Exception ex)
                              {
                                  Console.WriteLine($"消息消费失败!;data:" + ex.Message);
                              }
                          }
                    
                      });
                      Console.ReadLine();          
              
              • 1
              • 2
              • 3
              • 4
              • 5
              • 6
              • 7
              • 8
              • 9
              • 10
              • 11
              • 12
              • 13
              • 14
              • 15
              • 16
              • 17
              • 18
              • 19
              • 20
              • 21
              • 22
              • 23
              • 24
              • 25
              • 26
              • 27
              • 28
              • 29
              • 30
              • 31
              • 32
              • 33
              • 34
              • 35
              • 36
              • 37
              • 38
              • 39
              • 40
              • 41
              • 42
              • 43
              • 44
              • 45
              • 46
              • 47
              • 48
              • 49
              • 50
              • 51
              • 52
              • 53
              • 54
              • 55
              • 56
              • 57
              • 58
              • 59
              • 60
              • 61
              • 62
              • 63
              • 64
              • 65
              • 66
              • 67
              • 68
              • 69
              • 70
              • 71
              • 72
              • 73
              • 74
              • 75
              • 76
              • 77
              • 78
              • 79

        七 批量发送消息到队列

        • 场景
          假如发送100条消息到消息队列中,如何办证消息全部发送成功或全部发送失败,那就是使用事务来处理。
        • 代码
          producer.InitTransactions(TimeSpan.FormSeconds(60));
          .....
          producer.BeginTransaction();
          ....
          producer.CommitTransaction();
          
          • 1
          • 2
          • 3
          • 4
          • 5
        • 生产者
          var producerConfig = new ProducerConfig
          {
          BootstrapServers = "127.0.0.1:9092",
          MessageTimeoutMs = 50000,
          //失败重试
          EnableIdempotence = true,
          TransactionalId= ""
          };
          var builder = new ProducerBuilder\(producerConfig);
          using (var producer = builder.Build())
          {
          producer.InitTransactions(TimeSpan.FormSeconds(60));
          try
          {
          //数据对象
          var data = new { name = "小明", sex = "男", old = 12 };
          string  dataJson = JsonConvert.SerializeObject(data);
          //参数1:队列名称
          //参数2:消息数据
          producer.BeginTransaction();
          for(int i= 0;i<=100;i++)
          {
          var dr = producer.ProduceAsync("QueueName", new Message\ { Key = "data", Value = dataJson }).GetAwaiter().GetResult();
          Console.WriteLine("------------消息发送成功!----------------");
          }
          producer.CommitTransaction();
          }
          catch (ProduceException\ ex)
          {
          Console.WriteLine(\$"消息发送失败!;data:" + ex.Error.Reason);
          }
          }
          
          • 1
          • 2
          • 3
          • 4
          • 5
          • 6
          • 7
          • 8
          • 9
          • 10
          • 11
          • 12
          • 13
          • 14
          • 15
          • 16
          • 17
          • 18
          • 19
          • 20
          • 21
          • 22
          • 23
          • 24
          • 25
          • 26
          • 27
          • 28
          • 29
          • 30
          • 31
          • 32
        • 消费者
          private IDistributedCache distributedCache;
          public 构造函数(IDistributedCache \_distributedCache)
          {
          distributedCache = \_distributedCache;
          }
          
                            //消费者代码
                              Console.WriteLine("------------消息消费中--------");
                              //偏移量定义
                              Offset offset = 0;
                              Task.Run(() => {
                              var consumerConfig = new ConsumerConfig
                              {
                                  BootstrapServers = "127.0.0.1:9092",
                                  AutoOffsetReset = AutoOffsetReset.Earliest,
                                  GroupId = "temp",
                                  //消息确认   true:自动确认 false:手动确认
                                  //手动确认
                                  EnableAutoCommit = true
                              };
                              var builder = new ConsumerBuilder(consumerConfig);
                              using var consumer = builder.Build(); 
                              //订阅
                              consumer.Subscribe("QueueName");
                              //从Redis中获取偏移量
                              string offset = distributedCache.GetString("QueueName");
                              if (string.IsNullOrEmpty(offset))
                              {
                                  offset = "0";
                              }
                              //重置偏移量
                              //TopicPartitionOffset对象参数:1、分区对象[队列明名称,分区] 3、偏移量
                              consumer.Assign(new TopicPartitionOffset(new TopicPartition("QueueName",0[从redis获取分区]),int.Parse(offset)));
                              while (true)
                              {
                                  try
                                  {
                                          //消费后  确认消息
                                          var result = consumer.Consume(); 
                                          //存储分区
                                          .............
                                          ///存储偏移量
                                          distributedCache.SetString("QueueName",result.Offset.Value.ToString()); 
                                          //业务
                                          string key = result.Key;
                                          string value = result.Value;
                                          Console.WriteLine($"-----------key:{key},value :{value}");
                                          Console.WriteLine("------------消费成功!--------------"); 
                                      }
                                      catch (Exception ex)
                                      {
                                          Console.WriteLine($"消息消费失败!;data:" + ex.Message);
                                      }
                                  }
                
                              });
                              Console.ReadLine();    
          
          • 1
          • 2
          • 3
          • 4
          • 5
          • 6
          • 7
          • 8
          • 9
          • 10
          • 11
          • 12
          • 13
          • 14
          • 15
          • 16
          • 17
          • 18
          • 19
          • 20
          • 21
          • 22
          • 23
          • 24
          • 25
          • 26
          • 27
          • 28
          • 29
          • 30
          • 31
          • 32
          • 33
          • 34
          • 35
          • 36
          • 37
          • 38
          • 39
          • 40
          • 41
          • 42
          • 43
          • 44
          • 45
          • 46
          • 47
          • 48
          • 49
          • 50
          • 51
          • 52
          • 53
          • 54
          • 55
          • 56
          • 57
      • 相关阅读:
        基于粒子群优化二维Otsu的肺CT图像分割算法
        面试中经常问到的几个问题,快来看看能答对几道吧
        C++ Reference: Standard C++ Library reference: C Library: cmath: hypot
        【工作杂记】groupBy排序-操作word
        【Git】Git 学习笔记_操作本地仓库
        MyBatis 源码分析之 Mapper 接口代理对象生成及方法执行
        Lotusscript中的Base64处理
        1.5.C++项目:仿muduo库实现并发服务器之socket模块的设计
        深度学习阿丘科技AIDI标注工具使用(2.3版本)
        【NodeJs-5天学习】第一天篇④ ——了解NodeJs回调函数和事件驱动机制
      • 原文地址:https://blog.csdn.net/Fu_Shi_rong/article/details/126888175