下载地址:https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.9.4/rocketmq-all-4.9.4-bin-release.zip
安装:
unzip rocketmq-all-4.9.4-bin-release.zip
cd rocketmq-all-4.9.4-bin-release/
启动Name Server
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log

修改broker配置(默认JVM堆内存为8g,对测试机来说太大;把runbroker.sh里JAVA_OPT中的 -Xms8g -Xmx8g 调小,例如 -Xms512m -Xmx512m)
vim ./bin/runbroker.sh

启动Broker
nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log

创建stock主题
./bin/mqadmin updateTopic -n localhost:9876 -t stock -c DefaultCluster

application.properties配置
mq.nameserver.addr=192.168.174.128:9876
mq.topicname=stock
生产者发送消息:
import com.alibaba.fastjson.JSON;
import com.imooc.miaoshaproject.dao.StockLogDOMapper;
import com.imooc.miaoshaproject.dataobject.StockLogDO;
import com.imooc.miaoshaproject.service.OrderService;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
/**
 * Sends the transactional "reduce stock" message to RocketMQ.
 *
 * <p>Created by hzllb on 2019/2/23.
 *
 * <p>Order of events for one call to
 * {@link #transactionAsyncReduceStock(Integer, Integer, Integer, Integer, String)}:
 * <ol>
 *   <li>the message is sent to the broker in a transaction;</li>
 *   <li>the listener's {@code executeLocalTransaction} creates the order;</li>
 *   <li>if the broker does not learn the outcome, it calls
 *       {@code checkLocalTransaction}, which answers from the stock-log row.</li>
 * </ol>
 */
@Component
public class MqProducer {

    /** Charset used both to encode outgoing bodies and to decode them on check-back. */
    private static final Charset MESSAGE_CHARSET = java.nio.charset.StandardCharsets.UTF_8;

    // Stock-log status values. The names reflect how this class maps each value to a
    // transaction state; NOTE(review): confirm them against the stock-log table definition.
    private static final int STOCK_LOG_STATUS_PENDING = 1;
    private static final int STOCK_LOG_STATUS_COMMITTED = 2;
    private static final int STOCK_LOG_STATUS_ROLLED_BACK = 3;

    private DefaultMQProducer producer; // plain (non-transactional) producer
    private TransactionMQProducer transactionMQProducer; // transactional producer

    @Value("${mq.nameserver.addr}")
    private String nameAddr;

    @Value("${mq.topicname}")
    private String topicName;

    @Autowired
    private OrderService orderService;

    @Autowired
    private StockLogDOMapper stockLogDOMapper;

    /**
     * Creates and starts both producers.
     *
     * @throws MQClientException if either producer fails to start
     */
    @PostConstruct
    public void init() throws MQClientException {
        producer = new DefaultMQProducer("producer_group");
        producer.setNamesrvAddr(nameAddr);
        producer.start();

        transactionMQProducer = new TransactionMQProducer("transaction_producer_group");
        transactionMQProducer.setNamesrvAddr(nameAddr);
        // Register the listener BEFORE start(): once started, the producer can receive
        // broker check-backs, and those must never find the listener missing.
        transactionMQProducer.setTransactionListener(new StockTransactionListener());
        transactionMQProducer.start();
    }

    /**
     * Sends the stock-reduction message in a transaction (step 1) and reports whether the
     * local transaction committed.
     *
     * @param userId     buyer id, passed on to order creation
     * @param itemId     item whose stock is reduced
     * @param promoId    promotion id, passed on to order creation
     * @param amount     quantity to deduct
     * @param stockLogId primary key of the stock-log row tracking this operation
     * @return {@code true} only when the local transaction state is {@code COMMIT_MESSAGE};
     *         {@code false} on rollback, unknown state, or a send failure
     */
    public boolean transactionAsyncReduceStock(Integer userId, Integer itemId, Integer promoId,
                                               Integer amount, String stockLogId) {
        // Message body: what the consumer and the check-back need.
        Map<String, Object> bodyMap = new HashMap<>();
        bodyMap.put("itemId", itemId);
        bodyMap.put("amount", amount);
        bodyMap.put("stockLogId", stockLogId);

        // Listener argument: everything needed to create the order locally.
        Map<String, Object> argsMap = new HashMap<>();
        argsMap.put("itemId", itemId);
        argsMap.put("amount", amount);
        argsMap.put("userId", userId);
        argsMap.put("promoId", promoId);
        argsMap.put("stockLogId", stockLogId);

        Message message = new Message(topicName, "increase",
                JSON.toJSON(bodyMap).toString().getBytes(MESSAGE_CHARSET));

        TransactionSendResult sendResult;
        try {
            sendResult = transactionMQProducer.sendMessageInTransaction(message, argsMap);
        } catch (MQClientException e) {
            e.printStackTrace();
            return false;
        }
        // Anything other than an explicit commit (rollback, unknown, no result) is a failure.
        return sendResult != null
                && sendResult.getLocalTransactionState() == LocalTransactionState.COMMIT_MESSAGE;
    }

    /** Marks the stock-log row as rolled back; does nothing when the row does not exist. */
    private void markStockLogRolledBack(String stockLogId) {
        StockLogDO stockLogDO = stockLogDOMapper.selectByPrimaryKey(stockLogId);
        if (stockLogDO == null) {
            return;
        }
        stockLogDO.setStatus(STOCK_LOG_STATUS_ROLLED_BACK);
        stockLogDOMapper.updateByPrimaryKeySelective(stockLogDO);
    }

    /** Transaction listener: creates the order locally and answers broker check-backs. */
    private class StockTransactionListener implements TransactionListener {

        /**
         * Step 2: runs the local transaction (order creation) for a message that has been
         * sent to the broker.
         *
         * @param msg the message that was sent
         * @param arg the argument map built by {@code transactionAsyncReduceStock}
         * @return {@code COMMIT_MESSAGE} when the order was created, otherwise
         *         {@code ROLLBACK_MESSAGE}
         */
        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            Map<?, ?> args = (Map<?, ?>) arg;
            Integer itemId = (Integer) args.get("itemId");
            Integer promoId = (Integer) args.get("promoId");
            Integer userId = (Integer) args.get("userId");
            Integer amount = (Integer) args.get("amount");
            String stockLogId = (String) args.get("stockLogId");
            try {
                orderService.createOrder(userId, itemId, promoId, amount, stockLogId);
            } catch (Exception e) {
                e.printStackTrace();
                // Record the rollback so a later check-back can answer from the stock log.
                markStockLogRolledBack(stockLogId);
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
            return LocalTransactionState.COMMIT_MESSAGE;
        }

        /**
         * Check-back: called by the broker when it does not know the outcome of step 2.
         * The answer is derived from the status of the stock-log row named in the body.
         *
         * @param msg the message whose transaction state is being queried
         * @return {@code COMMIT_MESSAGE} for a committed row, {@code UNKNOW} when the row
         *         or its status is missing or still pending, otherwise
         *         {@code ROLLBACK_MESSAGE}
         */
        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            String jsonString = new String(msg.getBody(), MESSAGE_CHARSET);
            Map<?, ?> body = JSON.parseObject(jsonString, Map.class);
            String stockLogId = (String) body.get("stockLogId");

            StockLogDO stockLogDO = stockLogDOMapper.selectByPrimaryKey(stockLogId);
            if (stockLogDO == null || stockLogDO.getStatus() == null) {
                // Nothing conclusive yet; let the broker ask again later.
                return LocalTransactionState.UNKNOW;
            }
            int status = stockLogDO.getStatus().intValue();
            if (status == STOCK_LOG_STATUS_COMMITTED) {
                return LocalTransactionState.COMMIT_MESSAGE;
            }
            if (status == STOCK_LOG_STATUS_PENDING) {
                return LocalTransactionState.UNKNOW;
            }
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }
}
消费者
import com.alibaba.fastjson.JSON;
import com.imooc.miaoshaproject.dao.ItemStockDOMapper;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.List;
import java.util.Map;
/**
 * Consumes stock-reduction messages and applies the deduction through
 * {@code ItemStockDOMapper.decreaseStock}.
 *
 * <p>Created by hzllb on 2019/2/23.
 */
@Component
public class MqConsumer {

    private DefaultMQPushConsumer consumer;

    @Value("${mq.nameserver.addr}")
    private String nameAddr;

    @Value("${mq.topicname}")
    private String topicName;

    @Autowired
    private ItemStockDOMapper itemStockDOMapper;

    /**
     * Creates the push consumer, subscribes it to every tag of the configured topic and
     * starts it.
     *
     * @throws MQClientException if subscribing or starting the consumer fails
     */
    @PostConstruct
    public void init() throws MQClientException {
        consumer = new DefaultMQPushConsumer("stock_consumer_group");
        consumer.setNamesrvAddr(nameAddr);
        consumer.subscribe(topicName, "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                // Process every message in the batch, not only the first one; otherwise
                // the rest would be acknowledged without their stock being deducted.
                for (int i = 0; i < msgs.size(); i++) {
                    try {
                        applyStockDecrease(msgs.get(i));
                    } catch (RuntimeException e) {
                        e.printStackTrace();
                        if (i == 0) {
                            // Nothing was applied: ask for the whole batch again.
                            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                        }
                        // Acknowledge only the messages already applied (0..i-1) so that
                        // they are not redelivered and deducted a second time.
                        context.setAckIndex(i - 1);
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }

    /** Decodes one message body as UTF-8 JSON and deducts the stock it describes. */
    private void applyStockDecrease(MessageExt msg) {
        String jsonString = new String(msg.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        Map<?, ?> body = JSON.parseObject(jsonString, Map.class);
        Integer itemId = (Integer) body.get("itemId");
        Integer amount = (Integer) body.get("amount");
        itemStockDOMapper.decreaseStock(itemId, amount);
    }
}