在微服务系统中,微服务之间通信,主要是通过Http或者gRPC通信。由于http/gRPC通信方式是同步通信,如果遇到了高并发,同步通信就会导致微服务系统性能瓶颈,所以,为了解决微服务性能瓶颈问题。需要将同步通信换成异步通信方式。因此。就选用使用消息队列。
消息队列的代表技术,就是Kafka。
安装kafkatool_64bit工具
配置
#在kafka根目录下的[/config/zookeeper.properties]
dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0
admin.enableServer=false
运行zookeeper [注册中心]
#在kafka根目录下的[/bin/windows]文件下启动zookeeper:
zookeeper-server-start.bat ../../config/zookeeper.properties
运行结果如下:

运行kafka
#在kafka根目录下的[/config/server.properties]配置持久化消息的文件
log.dirs=/tmp/kafka-logs
#其他配置为默认即可
#在kafka根目录下的[/bin/windows]文件下启动kafka:
kafka-server-start.bat ../../config/server.properties

项目配置
Confluent.Kafka
生产者代码
var producerConfig = new ProducerConfig
{
BootstrapServers = "127.0.0.1:9092",
MessageTimeoutMs = 50000,
//失败重试
EnableIdempotence = true
};
var builder = new ProducerBuilder(producerConfig);
using (var producer = builder.Build())
{
try
{
//数据对象
var data = new { name = "小明", sex = "男", old = 12 };
string dataJson = JsonConvert.SerializeObject(data);
//参数1:队列名称
//参数2:消息数据
var dr = producer.ProduceAsync("QueueName", new Message { Key = "data", Value = dataJson }).GetAwaiter().GetResult();
Console.WriteLine("------------消息发送成功!----------------");
}
catch (ProduceException ex)
{
Console.WriteLine($"消息发送失败!;data:" + ex.Error.Reason);
}
}
算法
//在生产者中设置算法
builder.SetDefaultPartitioner([函数名称\委托]]);
创建分区
public async Task CreatePartition(string topic,int partitionCount)
{
AdminClientConfig adminClientConfig = new AdminClientConfig(){
BootstrapServers = “127.0.0.1:9092”
};
var build = new AdminClientBuild(adminClientConfig).Build();
build.CreatePartitionsAsync(new PartitionsSpecification[]{
new PartitionsSpecification(){Topic = topic,IncreaseTo = partitionCount}
}).Wait();
await Task.CompletedTask;
}
消费者代码
备注
建议使用关系型数据库存储分区、偏移量和主题名称,以下代码使用的是redis存储的便偏移量
private IDistributedCache distributedCache;
public 构造函数(IDistributedCache _distributedCache)
{
distributedCache = _distributedCache;
}
//消费者代码
Console.WriteLine("------------消息消费中--------");
//偏移量定义
Offset offset = 0;
Task.Run(() => {
// 创建主题代码未加
CreateTopic("QueueName");
var consumerConfig = new ConsumerConfig
{
BootstrapServers = "127.0.0.1:9092",
AutoOffsetReset = AutoOffsetReset.Earliest,
GroupId = "temp",
//消息确认 true:自动确认 false:手动确认
//手动确认
EnableAutoCommit = true
};
var builder = new ConsumerBuilder(consumerConfig);
using var consumer = builder.Build();
//订阅
consumer.Subscribe("QueueName");
//从Redis中获取偏移量
string offset = distributedCache.GetString("QueueName");
if (string.IsNullOrEmpty(offset))
{
offset = "0";
}
重置偏移量
TopicPartitionOffset对象参数:1、分区对象[队列明名称,分区] 3、偏移量
consumer.Assign(new TopicPartitionOffset(new TopicPartition("QueueName",0),int.Parse(offset)));
while (true)
{
try
{
//消费后 确认消息
var result = consumer.Consume();
//存储偏移量
distributedCache.SetString("QueueName",result.Offset.Value.ToString());
//业务
string key = result.Key;
string value = result.Value;
Console.WriteLine($"-----------key:{key},value :{value}");
Console.WriteLine("------------消费成功!--------------");
}
catch (Exception ex)
{
Console.WriteLine($"消息消费失败!;data:" + ex.Message);
}
}
});
Console.ReadLine();
创建主题
public async Task CreateTopic(string topicName)
{
AdminClientConfig adminClientConfig = new AdminClientConfig(){
BootstrapServers = "127.0.0.1:9092"
};
var build = new AdminClientBuild(adminClientConfig).Build();
build.CreateTopicsAsync(new TopicSpecification[]{
new TopicSpecification(){Name = topicName [主题名称]}
}).Wait();
await Task.CompletedTask;
}
| 属性含义 | 数据类型 | 备注 | 类型 |
|---|---|---|---|
| BootstrapServers | string | kafka连接字符串地址 | 生产者 消费者 |
| MessageTimeoutMs | int | 超时时间 | 生产者 |
| GroupId | string | 组ID | 消费者 |
| EnableAutoCommit | bool | true:自动确认 false:手动确认 | 消费者 |
| EnableIdempotence | bool | 失败重试 true:重试 false:不重试 | 生产者 |
手动确认消息
var producerConfig = new ProducerConfig
{
BootstrapServers = "127.0.0.1:9092",
MessageTimeoutMs = 50000
};
var builder = new ProducerBuilder\(producerConfig);
using (var producer = builder.Build())
{
try
{
//数据对象
var data = new { name = "小明", sex = "男", old = 12 };
string dataJson = JsonConvert.SerializeObject(data);
var dr = producer.ProduceAsync("QueueName", new Message\ { Key = "data", Value = dataJson }).GetAwaiter().GetResult();
Console.WriteLine("------------消息发送成功!----------------");
}
catch (ProduceException\ ex)
{
Console.WriteLine(\$"消息发送失败!;data:" + ex.Error.Reason);
}
}
var consumerConfig = new ConsumerConfig
{
..........
//消息确认 true:自动确认 false:手动确认
//自动确认的
EnableAutoCommit = false
};
* 实例代码
Console.WriteLine("------------消息消费中--------");
Task.Run(() => {
var consumerConfig = new ConsumerConfig
{
BootstrapServers = "127.0.0.1:9092",
AutoOffsetReset = AutoOffsetReset.Earliest,
GroupId = "temp",
//消息确认 true:自动确认 false:手动确认
//自动确认的
EnableAutoCommit = false
};
var builder = new ConsumerBuilder\(consumerConfig);
using var consumer = builder.Build();
//订阅
consumer.Subscribe("QueueName");
while (true)
{
try
{
//消费后 确认消息
var result = consumer.Consume();
//业务
string key = result.Key;
string value = result.Value;
Console.WriteLine(`$"-----------key:{key},value :{value}");
Console.WriteLine("------------消费成功!--------------");
//手动提交
consumer.StoreOffset(result);
}
catch (Exception ex)
{
Console.WriteLine($`"消息消费失败!;data:" + ex.Message);
}
}
});
Console.ReadLine();
手动确认消息–偏移量[重复消费]
var producerConfig = new ProducerConfig
{
BootstrapServers = "127.0.0.1:9092",
MessageTimeoutMs = 50000
};
var builder = new ProducerBuilder\(producerConfig);
using (var producer = builder.Build())
{
try
{
//数据对象
var data = new { name = "小明", sex = "男", old = 12 };
string dataJson = JsonConvert.SerializeObject(data);0
var dr = producer.ProduceAsync("QueueName", new Message\ { Key = "data", Value = dataJson }).GetAwaiter().GetResult();
Console.WriteLine("------------消息发送成功!----------------");
}
catch (ProduceException\ ex)
{
Console.WriteLine(\$"消息发送失败!;data:" + ex.Error.Reason);
}
}
Microsoft.Extensions.Caching.Redis
services.AddDistributedRedisCache(options=>{
options.Configuration = "localhost:6379"
});
private IDistributedCache distributedCache;
public 构造函数(IDistributedCache \_distributedCache)
{
distributedCache = \_distributedCache;
}
//消费者代码
Console.WriteLine("------------消息消费中--------");
//偏移量定义
Offset offset = 0;
Task.Run(() => {
var consumerConfig = new ConsumerConfig
{
BootstrapServers = "127.0.0.1:9092",
AutoOffsetReset = AutoOffsetReset.Earliest,
GroupId = "temp",
//消息确认 true:自动确认 false:手动确认
//手动确认
EnableAutoCommit = true
};
var builder = new ConsumerBuilder(consumerConfig);
using var consumer = builder.Build();
//订阅
consumer.Subscribe("QueueName");
//从Redis中获取偏移量
string offset = distributedCache.GetString("QueueName");
if (string.IsNullOrEmpty(offset))
{
offset = "0";
}
//重置偏移量
//TopicPartitionOffset对象参数:1、分区对象[队列明名称,分区] 3、偏移量
consumer.Assign(new TopicPartitionOffset(new TopicPartition("QueueName",0),int.Parse(offset)));
while (true)
{
try
{
//消费后 确认消息
var result = consumer.Consume();
///存储偏移量
distributedCache.SetString("QueueName",result.Offset.Value.ToString());
//业务
string key = result.Key;
string value = result.Value;
Console.WriteLine($"-----------key:{key},value :{value}");
Console.WriteLine("------------消费成功!--------------");
}
catch (Exception ex)
{
Console.WriteLine($"消息消费失败!;data:" + ex.Message);
}
}
});
Console.ReadLine();
kafka宕机后如何保证消息不丢失的
订阅发布[广播消费] [组 GroupId]
var producerConfig = new ProducerConfig
{
BootstrapServers = "127.0.0.1:9092",
MessageTimeoutMs = 50000
};
var builder = new ProducerBuilder\(producerConfig);
using (var producer = builder.Build())
{
try
{
//数据对象
var data = new { name = "小明", sex = "男", old = 12 };
string dataJson = JsonConvert.SerializeObject(data);0
var dr = producer.ProduceAsync("QueueName", new Message\ { Key = "data", Value = dataJson }).GetAwaiter().GetResult();
Console.WriteLine("------------消息发送成功!----------------");
}
catch (ProduceException\ ex)
{
Console.WriteLine(\$"消息发送失败!;data:" + ex.Error.Reason);
}
}
var consumerConfig = new ConsumerConfig
{
......
GroupId = "temp",// 如果其他服务也想同时消费一个服务,其他服务在同一个主题之下设置不同的组ID即可。
.....
};
private IDistributedCache distributedCache;
public 构造函数(IDistributedCache \_distributedCache)
{
distributedCache = \_distributedCache;
}
//消费者代码
Console.WriteLine("------------消息消费中--------");
//偏移量定义
Offset offset = 0;
Task.Run(() => {
var consumerConfig = new ConsumerConfig
{
BootstrapServers = "127.0.0.1:9092",
AutoOffsetReset = AutoOffsetReset.Earliest,
GroupId = "temp",
//消息确认 true:自动确认 false:手动确认
//手动确认
EnableAutoCommit = true
};
var builder = new ConsumerBuilder(consumerConfig);
using var consumer = builder.Build();
//订阅
consumer.Subscribe("QueueName");
//从Redis中获取偏移量
string offset = distributedCache.GetString("QueueName");
if (string.IsNullOrEmpty(offset))
{
offset = "0";
}
//重置偏移量
//TopicPartitionOffset对象参数:1、分区对象[队列明名称,分区] 3、偏移量
consumer.Assign(new TopicPartitionOffset(new TopicPartition("QueueName",0),int.Parse(offset)));
while (true)
{
try
{
//消费后 确认消息
var result = consumer.Consume();
///存储偏移量
distributedCache.SetString("QueueName",result.Offset.Value.ToString());
//业务
string key = result.Key;
string value = result.Value;
Console.WriteLine($"-----------key:{key},value :{value}");
Console.WriteLine("------------消费成功!--------------");
}
catch (Exception ex)
{
Console.WriteLine($"消息消费失败!;data:" + ex.Message);
}
}
});
Console.ReadLine();
private IDistributedCache distributedCache;
public 构造函数(IDistributedCache \_distributedCache)
{
distributedCache = \_distributedCache;
}
//消费者代码
Console.WriteLine("------------消息消费中--------");
//偏移量定义
Offset offset = 0;
Task.Run(() => {
var consumerConfig = new ConsumerConfig
{
BootstrapServers = "127.0.0.1:9092",
AutoOffsetReset = AutoOffsetReset.Earliest,
GroupId = "sms",
//消息确认 true:自动确认 false:手动确认
//手动确认
EnableAutoCommit = true
};
var builder = new ConsumerBuilder(consumerConfig);
using var consumer = builder.Build();
//订阅
consumer.Subscribe("QueueName");
//从Redis中获取偏移量
string offset = distributedCache.GetString("QueueName");
if (string.IsNullOrEmpty(offset))
{
offset = "0";
}
//重置偏移量
//TopicPartitionOffset对象参数:1、分区对象[队列明名称,分区] 3、偏移量
consumer.Assign(new TopicPartitionOffset(new TopicPartition("QueueName",0),int.Parse(offset)));
while (true)
{
try
{
//消费后 确认消息
var result = consumer.Consume();
///存储偏移量
distributedCache.SetString("QueueName",result.Offset.Value.ToString());
//业务
string key = result.Key;
string value = result.Value;
Console.WriteLine($"-----------key:{key},value :{value}");
Console.WriteLine("------------消费成功!--------------");
}
catch (Exception ex)
{
Console.WriteLine($"消息消费失败!;data:" + ex.Message);
}
}
});
Console.ReadLine();
public async Task CreatePartition(string topic,int partitionCount)
{
AdminClientConfig adminClientConfig = new AdminClientConfig(){
BootstrapServers = "127.0.0.1:9092"
};
var build = new AdminClientBuild(adminClientConfig).Build();
build.CreatePartitionsAsync(new PartitionsSpecification\[]{
new PartitionsSpecification(){Topic = topic,IncreaseTo = partitionCount}
}).Wait();
await Task.CompletedTask;
}
static int requestCount = 0;
public Partition RoundRobinPartitioner(string tipic,int partitionCount,ReadOnlySpan<byte>keyData,bool keyIsNull)
{
int partition = requestCount%partitionCount;
requestCount++;
return new Partition(partition);
}
//设置轮询算法
builder.SetDefaultPartitioner(\[算法 委托函数]);
var producerConfig = new ProducerConfig
{
BootstrapServers = "127.0.0.1:9092",
MessageTimeoutMs = 50000
};
var builder = new ProducerBuilder\(producerConfig);
//设置轮询算法
builder.SetDefaultPartitioner(RoundRobinPartitioner);
using (var producer = builder.Build())
{
try
{
//数据对象
var data = new { name = "小明", sex = "男", old = 12 };
string dataJson = JsonConvert.SerializeObject(data);0
var dr = producer.ProduceAsync("QueueName", new Message\ { Key = "data", Value = dataJson }).GetAwaiter().GetResult();
Console.WriteLine("------------消息发送成功!----------------");
}
catch (ProduceException\ ex)
{
Console.WriteLine(\$"消息发送失败!;data:" + ex.Error.Reason);
}
}
private IDistributedCache distributedCache;
public 构造函数(IDistributedCache \_distributedCache)
{
distributedCache = \_distributedCache;
}
//消费者代码
Console.WriteLine("------------消息消费中--------");
//偏移量定义
Offset offset = 0;
Task.Run(() => {
var consumerConfig = new ConsumerConfig
{
BootstrapServers = "127.0.0.1:9092",
AutoOffsetReset = AutoOffsetReset.Earliest,
GroupId = "temp",
//消息确认 true:自动确认 false:手动确认
//手动确认
EnableAutoCommit = true
};
var builder = new ConsumerBuilder(consumerConfig);
using var consumer = builder.Build();
//订阅
consumer.Subscribe("QueueName");
//从Redis中获取偏移量
string offset = distributedCache.GetString("QueueName");
if (string.IsNullOrEmpty(offset))
{
offset = "0";
}
//获取分区
..... [代码未加]
//重置偏移量
//TopicPartitionOffset对象参数:1、分区对象[队列明名称,分区] 3、偏移量
//consumer.Assign(new TopicPartitionOffset(new TopicPartition("QueueName",0[分区动态从数据库获取]),int.Parse(offset)));
while (true)
{
try
{
//消费后 确认消息
var result = consumer.Consume();
//存储分区,代码未加
........
///存储偏移量
//distributedCache.SetString("QueueName",result.Offset.Value.ToString());
//业务
string key = result.Key;
string value = result.Value;
Console.WriteLine($"-----------key:{key},value :{value}");
Console.WriteLine("------------消费成功!--------------");
}
catch (Exception ex)
{
Console.WriteLine($"消息消费失败!;data:" + ex.Message);
}
}
});
Console.ReadLine();
private IDistributedCache distributedCache;
public 构造函数(IDistributedCache \_distributedCache)
{
distributedCache = \_distributedCache;
}
//消费者代码
Console.WriteLine("------------消息消费中--------");
//偏移量定义
Offset offset = 0;
Task.Run(() => {
var consumerConfig = new ConsumerConfig
{
BootstrapServers = "127.0.0.1:9092",
AutoOffsetReset = AutoOffsetReset.Earliest,
GroupId = "temp",
//消息确认 true:自动确认 false:手动确认
//手动确认
EnableAutoCommit = true
};
var builder = new ConsumerBuilder(consumerConfig);
using var consumer = builder.Build();
//订阅
consumer.Subscribe("QueueName");
//从Redis中获取偏移量
string offset = distributedCache.GetString("QueueName");
if (string.IsNullOrEmpty(offset))
{
offset = "0";
}
//获取分区
..... [代码未加]
//重置偏移量
//TopicPartitionOffset对象参数:1、分区对象[队列明名称,分区] 3、偏移量
//consumer.Assign(new TopicPartitionOffset(new TopicPartition("QueueName",0[分区动态从数据库获取]),int.Parse(offset)));
while (true)
{
try
{
//消费后 确认消息
var result = consumer.Consume();
//存储分区,代码未加
........
///存储偏移量
//distributedCache.SetString("QueueName",result.Offset.Value.ToString());
//业务
string key = result.Key;
string value = result.Value;
Console.WriteLine($"-----------key:{key},value :{value}");
Console.WriteLine("------------消费成功!--------------");
}
catch (Exception ex)
{
Console.WriteLine($"消息消费失败!;data:" + ex.Message);
}
}
});
Console.ReadLine();
var producerConfig = new ProducerConfig
{
BootstrapServers = "127.0.0.1:9092",
MessageTimeoutMs = 50000
};
var builder = new ProducerBuilder\(producerConfig);
using (var producer = builder.Build())
{
try
{
//数据对象
var data = new { name = "小明", sex = "男", old = 12 };
string dataJson = JsonConvert.SerializeObject(data);0
var dr = producer.ProduceAsync("QueueName", new Message\ { Key = "data", Value = dataJson }).GetAwaiter().GetResult();
Console.WriteLine("------------消息发送成功!----------------");
}
catch (ProduceException\ ex)
{
Console.WriteLine(\$"消息发送失败!;data:" + ex.Error.Reason);
}
}
//重置偏移量可以指定分区
consumer.Assign(new TopicPartitionOffset(new TopicPartition("\[主题名称]",\[分区]),\[偏移量]);
private IDistributedCache distributedCache;
public 构造函数(IDistributedCache \_distributedCache)
{
distributedCache = \_distributedCache;
}
//消费者代码
Console.WriteLine("------------消息消费中--------");
//偏移量定义
Offset offset = 0;
Task.Run(() => {
var consumerConfig = new ConsumerConfig
{
BootstrapServers = "127.0.0.1:9092",
AutoOffsetReset = AutoOffsetReset.Earliest,
GroupId = "temp",
//消息确认 true:自动确认 false:手动确认
//手动确认
EnableAutoCommit = true
};
var builder = new ConsumerBuilder(consumerConfig);
using var consumer = builder.Build();
//订阅
consumer.Subscribe("QueueName");
//从Redis中获取偏移量
string offset = distributedCache.GetString("QueueName");
if (string.IsNullOrEmpty(offset))
{
offset = "0";
}
//重置偏移量
//TopicPartitionOffset对象参数:1、分区对象[队列明名称,分区] 3、偏移量
consumer.Assign(new TopicPartitionOffset(new TopicPartition("QueueName",0),int.Parse(offset)));
while (true)
{
try
{
//消费后 确认消息
var result = consumer.Consume();
//存储分区
.............
///存储偏移量
distributedCache.SetString("QueueName",result.Offset.Value.ToString());
//业务
string key = result.Key;
string value = result.Value;
Console.WriteLine($"-----------key:{key},value :{value}");
Console.WriteLine("------------消费成功!--------------");
}
catch (Exception ex)
{
Console.WriteLine($"消息消费失败!;data:" + ex.Message);
}
}
});
Console.ReadLine();
TopicPartition topicPartition = new TopicPartition("\[队列名称]",\[分区]);
var producerConfig = new ProducerConfig
{
BootstrapServers = "127.0.0.1:9092",
MessageTimeoutMs = 50000
};
var builder = new ProducerBuilder\(producerConfig);
using (var producer = builder.Build())
{
try
{
//数据对象
var data = new { name = "小明", sex = "男", old = 12 };
string dataJson = JsonConvert.SerializeObject(data);
//参数1:队列名称
//参数2:分区
TopicPartition topicPartition = new TopicPartition("QueueName",1);
var dr = producer.ProduceAsync(topicPartition, new Message\ { Key = "data", Value = dataJson }).GetAwaiter().GetResult();
Console.WriteLine("------------消息发送成功!----------------");
}
catch (ProduceException\ ex)
{
Console.WriteLine(\$"消息发送失败!;data:" + ex.Error.Reason);
}
}
private IDistributedCache distributedCache;
public 构造函数(IDistributedCache \_distributedCache)
{
distributedCache = \_distributedCache;
}
//消费者代码
Console.WriteLine("------------消息消费中--------");
//偏移量定义
Offset offset = 0;
Task.Run(() => {
var consumerConfig = new ConsumerConfig
{
BootstrapServers = "127.0.0.1:9092",
AutoOffsetReset = AutoOffsetReset.Earliest,
GroupId = "temp",
//消息确认 true:自动确认 false:手动确认
//手动确认
EnableAutoCommit = true
};
var builder = new ConsumerBuilder(consumerConfig);
using var consumer = builder.Build();
//订阅
consumer.Subscribe("QueueName");
//从Redis中获取偏移量
string offset = distributedCache.GetString("QueueName");
if (string.IsNullOrEmpty(offset))
{
offset = "0";
}
//重置偏移量
TopicPartitionOffset对象参数:1、分区对象[队列明名称,分区] 3、偏移量
consumer.Assign(new TopicPartitionOffset(new TopicPartition("QueueName",0),int.Parse(offset)));
while (true)
{
try
{
//消费后 确认消息
var result = consumer.Consume();
///存储偏移量
distributedCache.SetString("QueueName",result.Offset.Value.ToString());
//业务
string key = result.Key;
string value = result.Value;
Console.WriteLine($"-----------key:{key},value :{value}");
Console.WriteLine("------------消费成功!--------------");
}
catch (Exception ex)
{
Console.WriteLine($"消息消费失败!;data:" + ex.Message);
}
}
});
Console.ReadLine();
var producerConfig = new ProducerConfig
{
BootstrapServers = "127.0.0.1:9092",
MessageTimeoutMs = 50000
};
var builder = new ProducerBuilder\(producerConfig);
using (var producer = builder.Build())
{
try
{
//数据对象
var data = new { name = "小明", sex = "男", old = 12 };
string dataJson = JsonConvert.SerializeObject(data);
//参数1:队列名称
//参数2:消息数据
var dr = producer.ProduceAsync("QueueName", new Message\ { Key = "data", Value = dataJson }).GetAwaiter().GetResult();
Console.WriteLine("------------消息发送成功!----------------");
}
catch (ProduceException\ ex)
{
Console.WriteLine(\$"消息发送失败!;data:" + ex.Error.Reason);
}
}
//恢复消费
new Timer((s)=>{
consumer.Resume(new List<TopicPartition>{new TopicPartition(\[主题名称],分区) },null,Timeout.Infinite,Timeout.Infinite).Change(\[延迟时间\[单位:毫秒]],\[延迟时间\[单位:毫秒]]);
});
//停止消息
consumer.Pause(new List<TopicPartition>{new TopicPartition(\[主题名称],分区) });
private IDistributedCache distributedCache;
public 构造函数(IDistributedCache \_distributedCache)
{
distributedCache = \_distributedCache;
}
//消费者代码
Console.WriteLine("------------消息消费中--------");
//偏移量定义
Offset offset = 0;
Task.Run(() => {
// 创建主题代码未加
CreateTopic("QueueName");
var consumerConfig = new ConsumerConfig
{
BootstrapServers = "127.0.0.1:9092",
AutoOffsetReset = AutoOffsetReset.Earliest,
GroupId = "temp",
//消息确认 true:自动确认 false:手动确认
//手动确认
EnableAutoCommit = true,
//批量消费获取最小值
FetchMinBytes = 0,
//批量消费的最大值
FetchMaxBytes = 3000
};
var builder = new ConsumerBuilder(consumerConfig);
using var consumer = builder.Build();
//订阅
consumer.Subscribe("QueueName");
//从Redis中获取偏移量
string offset = distributedCache.GetString("QueueName");
if (string.IsNullOrEmpty(offset))
{
offset = "0";
}
//重置偏移量
TopicPartitionOffset对象参数:1、分区对象[队列明名称,分区] 3、偏移量
consumer.Assign(new TopicPartitionOffset(new TopicPartition("QueueName",0),int.Parse(offset)));
while (true)
{
try
{
//恢复消费
new Timer((s)=>{
consumer.Resume(new List{new TopicPartition("QueueName",0) },null,Timeout.Infinite,Timeout.Infinite).Change(5000,5000);
});
//停止消息
consumer.Pause(new List{new TopicPartition("QueueName",0) });
//批量获取消息消费后在确认消息
var result = consumer.Consume();
try
{
///存储偏移量
distributedCache.SetString("QueueName",result.Offset.Value.ToString());
//业务
string key = result.Key;
string value = result.Value;
Console.WriteLine($"-----------key:{key},value :{value}");
Console.WriteLine("------------消费成功!--------------");
}
cache (Exception ex)
{
}
finally
{
consumer.Pause(new List{new TopicPartition("QueueName",0) });
}
}
catch (Exception ex)
{
Console.WriteLine($"消息消费失败!;data:" + ex.Message);
}
}
});
Console.ReadLine();
producer.InitTransactions(TimeSpan.FormSeconds(60));
.....
producer.BeginTransaction();
....
producer.CommitTransaction();
var producerConfig = new ProducerConfig
{
BootstrapServers = "127.0.0.1:9092",
MessageTimeoutMs = 50000,
//失败重试
EnableIdempotence = true,
TransactionalId= ""
};
var builder = new ProducerBuilder\(producerConfig);
using (var producer = builder.Build())
{
producer.InitTransactions(TimeSpan.FormSeconds(60));
try
{
//数据对象
var data = new { name = "小明", sex = "男", old = 12 };
string dataJson = JsonConvert.SerializeObject(data);
//参数1:队列名称
//参数2:消息数据
producer.BeginTransaction();
for(int i= 0;i<=100;i++)
{
var dr = producer.ProduceAsync("QueueName", new Message\ { Key = "data", Value = dataJson }).GetAwaiter().GetResult();
Console.WriteLine("------------消息发送成功!----------------");
}
producer.CommitTransaction();
}
catch (ProduceException\ ex)
{
Console.WriteLine(\$"消息发送失败!;data:" + ex.Error.Reason);
}
}
private IDistributedCache distributedCache;
public 构造函数(IDistributedCache \_distributedCache)
{
distributedCache = \_distributedCache;
}
//消费者代码
Console.WriteLine("------------消息消费中--------");
//偏移量定义
Offset offset = 0;
Task.Run(() => {
var consumerConfig = new ConsumerConfig
{
BootstrapServers = "127.0.0.1:9092",
AutoOffsetReset = AutoOffsetReset.Earliest,
GroupId = "temp",
//消息确认 true:自动确认 false:手动确认
//手动确认
EnableAutoCommit = true
};
var builder = new ConsumerBuilder(consumerConfig);
using var consumer = builder.Build();
//订阅
consumer.Subscribe("QueueName");
//从Redis中获取偏移量
string offset = distributedCache.GetString("QueueName");
if (string.IsNullOrEmpty(offset))
{
offset = "0";
}
//重置偏移量
//TopicPartitionOffset对象参数:1、分区对象[队列明名称,分区] 3、偏移量
consumer.Assign(new TopicPartitionOffset(new TopicPartition("QueueName",0[从redis获取分区]),int.Parse(offset)));
while (true)
{
try
{
//消费后 确认消息
var result = consumer.Consume();
//存储分区
.............
///存储偏移量
distributedCache.SetString("QueueName",result.Offset.Value.ToString());
//业务
string key = result.Key;
string value = result.Value;
Console.WriteLine($"-----------key:{key},value :{value}");
Console.WriteLine("------------消费成功!--------------");
}
catch (Exception ex)
{
Console.WriteLine($"消息消费失败!;data:" + ex.Message);
}
}
});
Console.ReadLine();