特性 | ActiveMq | RabbitMq | RocketMQ | Kafka |
---|---|---|---|---|
成熟度 | 成熟 | 成熟 | 比较成熟 | 成熟的日志领域 |
时效性 | 微秒级 | 毫秒级 | 毫秒级 | |
社区活跃度 | 低 | 高 | 高 | 高 |
单机吞吐量 | 万级,吞吐量比RocketMQ和Kafka要低了一个数量级 | 万级,吞吐量比RocketMQ和Kafka要低了一个数量级 | 10万级,RocketMQ也是可以支撑高吞吐的一种MQ | 10万级别,这是kafka最大的优点,就是吞吐量高。一般配合大数据类的系统来进行实时数据计算、日志采集等场景 |
topic数量对吞吐量的影响 | topic可以达到几百,几千个的级别,吞吐量会有较小幅度的下降这是RocketMQ的一大优势,在同等机器下,可以支撑大量的topic | topic从几十个到几百个的时候,吞吐量会大幅度下降所以在同等机器下,kafka尽量保证topic数量不要过多。如果要支撑大规模topic,需要增加更多的机器资源 | ||
可用性 | 高,基于主从架构实现高可用性 | 高,基于主从架构实现高可用性 | 非常高,分布式架构 | 非常高,kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用 |
消息可靠性 | 有较低的概率丢失数据 | 经过参数优化配置,可以做到0丢失 | 经过参数优化配置,消息可以做到0丢失 | |
功能支持 | MQ领域的功能极其完备 | 基于erlang开发,所以并发能力很强,性能极其好,延时很低 | MQ功能较为完善,还是分布式的,扩展性好 | 功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用,是事实上的标准 |
发送方式 | 发送TPS | 发送结果反馈 | 是否有重试 | 可靠性 |
---|---|---|---|---|
同步发送 | 快 | 有 | 有 | 不丢失 |
异步发送 | 快 | 有 | 无 | 不丢失 |
单向发送 | 最快 | 无 | 无 | 可能丢失 |
package main
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
)
func main() {
p, err := rocketmq.NewProducer(producer.WithNameServer([]string{"192.168.124.51:9876"}))
if err != nil {
panic("生成producer失败")
}
if err = p.Start(); err != nil {
panic("启动producer失败")
}
res, err := p.SendSync(context.Background(), primitive.NewMessage("imooc1", []byte("this is imooc1")))
if err != nil {
fmt.Printf("发送失败: %s\n", err)
} else {
fmt.Printf("发送成功: %s\n", res.String())
}
if err = p.Shutdown(); err != nil {
panic("关闭producer失败")
}
}
package main
import (
"context"
"fmt"
"time"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
)
func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithNameServer([]string{"192.168.124.51:9876"}),
consumer.WithGroupName("mxshop"),
)
if err := c.Subscribe("imooc1", consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
for i := range msgs {
fmt.Printf("获取到值: %v \n", msgs[i])
}
return consumer.ConsumeSuccess, nil
}); err != nil {
fmt.Println("读取消息失败")
}
_ = c.Start()
//不能让主goroutine退出
time.Sleep(time.Hour)
_ = c.Shutdown()
}
package main
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
)
func main() {
p, err := rocketmq.NewProducer(producer.WithNameServer([]string{"192.168.124.51:9876"}))
if err != nil {
panic("生成producer失败")
}
if err = p.Start(); err != nil {
panic("启动producer失败")
}
msg := primitive.NewMessage("imooc1", []byte("this is delay message"))
msg.WithDelayTimeLevel(3)
res, err := p.SendSync(context.Background(), msg)
if err != nil {
fmt.Printf("发送失败: %s\n", err)
} else {
fmt.Printf("发送成功: %s\n", res.String())
}
if err = p.Shutdown(); err != nil {
panic("关闭producer失败")
}
//支付的时候, 淘宝, 12306, 购票, 超时归还 - 定时执行逻辑
//我可以去写一个轮询, 轮询的问题: 1. 多久执行一次轮询 30分钟
//在12:00执行过一次, 下一次执行就是在 12:30的时候 但是12:01的时候下了单, 12:31就应该超时 13:00时候才能超时
//那我1分钟执行一次啊, 比如我的订单量没有这么大,1分钟执行一次, 其中29次查询都是无用, 而且你还还会轮询mysql
//rocketmq的延迟消息, 1. 时间一到就执行, 2. 消息中包含了订单编号,你只查询这种订单编号
}
package main
import (
"context"
"fmt"
"time"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
)
type OrderListener struct{}
func (o *OrderListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {
//成功执行
fmt.Println("开始执行本地逻辑")
time.Sleep(time.Second * 3)
fmt.Println("执行本地逻辑成功")
return primitive.CommitMessageState
//执行失败回滚
//fmt.Println("开始执行本地逻辑")
//time.Sleep(time.Second * 3)
//fmt.Println("执行本地逻辑失败回滚")
//return primitive.RollbackMessageState
//执行失败
//fmt.Println("开始执行本地逻辑")
//time.Sleep(time.Second * 3)
//fmt.Println("执行本地逻辑失败")
本地执行逻辑无缘无故失败 代码异常 宕机
//return primitive.UnknowState
}
func (o *OrderListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState {
fmt.Println("rocketmq的消息回查")
time.Sleep(time.Second * 15)
return primitive.CommitMessageState
}
func main() {
p, err := rocketmq.NewTransactionProducer(
&OrderListener{},
producer.WithNameServer([]string{"192.168.124.51:9876"}),
)
if err != nil {
panic("生成producer失败")
}
if err = p.Start(); err != nil {
panic("启动producer失败")
}
res, err := p.SendMessageInTransaction(context.Background(), primitive.NewMessage("TransTopic", []byte("this is transaction message")))
if err != nil {
fmt.Printf("发送失败: %s\n", err)
} else {
fmt.Printf("发送成功: %s\n", res.String())
}
time.Sleep(time.Hour)
if err = p.Shutdown(); err != nil {
panic("关闭producer失败")
}
}