上一篇是同步调用,我们在中间加上MQ就可以实现异步调用,这种方式性能高,不易出现数据丢失问题,多源写入之间相互隔离,便于扩展更多的数据源写入。
同时也会带来一些问题,首先还是代码侵入强,其次系统复杂度会增加,因为引入了消息中间件
可能出现延时问题:MQ是异步消费模型,可能会造成延时。
这种方案也不是很推荐,简单了解学习一下就好。
下面通过SpringBoot项目演示一下,首先本地要有MQ,我这里使用RabbitMQ。若本地没有,可移步:Windows版Docker安装RabbitMQ
Linux的Docker也类似
对RabbitMQ还不是很了解的,可以打开我的主页查看RabbitMQ系列教程
这里只做最简单的MQ可靠性配置
全部依赖如下
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-data-elasticsearchartifactId>
dependency>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-jdbcartifactId>
dependency>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-webartifactId>
dependency>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-devtoolsartifactId>
<scope>runtimescope>
<optional>trueoptional>
dependency>
<dependency>
<groupId>com.baomidougroupId>
<artifactId>mybatis-plus-boot-starterartifactId>
<version>3.5.3.1version>
dependency>
<dependency>
<groupId>mysqlgroupId>
<artifactId>mysql-connector-javaartifactId>
<scope>runtimescope>
dependency>
<dependency>
<groupId>org.projectlombokgroupId>
<artifactId>lombokartifactId>
<optional>trueoptional>
dependency>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-testartifactId>
<scope>testscope>
dependency>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-amqpartifactId>
dependency>
注意修改Mysql,ES,rabbitmq地址及信息
# 端口号8080
server:
port: 8080
# 数据库名:mysql,用户名root,密码123456
spring:
datasource:
username: root
password: 123456
url: jdbc:mysql://mysql地址:3306/mysql?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai
driver-class-name: com.mysql.cj.jdbc.Driver
elasticsearch:
rest:
uris: ES地址:9200
rabbitmq:
host: rabbitmq地址
port: 5672
username: admin
password: admin
#确认消息已发送到交换机
publisher-confirm-type: correlated
#确认消息已发送到队列(Queue)
publisher-returns: true
# mybatis-plus配置
mybatis-plus:
# xml文件位置
mapper-locations: classpath:mapper/*.xml
/**
* mysql(user)与ES(user-demo)实体类
*/
@Data
@TableName(value = "user_t")
@Document(indexName = "user-demo")
public class User {
@Id
private String id;
private String userName;
private String address;
}
/**
* RabbitMQ交换机队列声明,绑定配置类
*/
@Configuration
public class Config {
//交换机名称
public static final String X_EXCHANGE = "X";
//队列名称
public static final String QUEUE_INSERT = "A";
public static final String QUEUE_DELETE = "B";
public static final String QUEUE_UPDATE = "C";
//声明交换机xExchange
@Bean("xExchange")
public DirectExchange xExchange() {
return new DirectExchange(X_EXCHANGE);
}
//声明队列A
@Bean("queueA")
public Queue queueInsert() {
return QueueBuilder.durable(QUEUE_INSERT).build();
}
//声明队列B
@Bean("queueB")
public Queue queueDelete() {
return QueueBuilder.durable(QUEUE_DELETE).build();
}
//声明队列C
@Bean("queueC")
public Queue queueUpdate() {
return QueueBuilder.durable(QUEUE_UPDATE).build();
}
//绑定交换机与队列
//A与X通过XA线路绑定
@Bean
public Binding queueInsertBindingX(@Qualifier("queueA") Queue queueA,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}
//B与X通过XB线路绑定
@Bean
public Binding queueDeleteBindingX(@Qualifier("queueB") Queue queueB,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueB).to(xExchange).with("XB");
}
//C与X通过XC线路绑定
@Bean
public Binding queueUpdateBindingX(@Qualifier("queueC") Queue queueC,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueC).to(xExchange).with("XC");
}
}
/**
* 回调接口
*/
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
//内部接口注入类中
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
/**
* 交换机确定回调方法
* 1.发消息 交换机接收到消息 回调
* 1.1 correlationData 保存回调消息的ID及相关信息
* 1.2 交换机收到消息 ack=true
* 1.3 cause null
* 2.发消息 交换机接受失败 回调
* 2.1 correlationData 保存回调消息的ID及相关信息
* 2.2 交换机收到消息 ack=false
* 2.3 cause 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData != null ? correlationData.getId() : "";
if (ack) {
log.info("交换机回报消息:收到id为:{}的消息", id);
} else {
log.info("交换机回报消息:未经收到id为:{}的消息,原因为:{}", id, cause);
}
}
/**
* 队列失败回报
* @param message 消息
* @param i 返回码
* @param s 返回信息
* @param s1 交换机
* @param s2 路由
*/
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
log.error("队列回报消息:消息被交换机:{}退回,路由key:{},退回原因:{}", s1,s2,s);
}
}
/**
* mysql user实体Mapper接口
*/
public interface UserMapper extends BaseMapper<User> {
}
/**
* ES user-demo实体Mapper接口
*/
@Repository
public interface UserEsMapper extends ElasticsearchRepository<User,String> {
}
此处Controller充当生产者,接到请求,先执行mysql操作,然后将消息按情况通过交换机转发到不同的队列,相应的消费者收到消息后对ES进行处理
/**
* 异步调用方式实现mysql与ES数据同步Controller/消息生产者
*/
@Slf4j
@RestController
@RequestMapping(value = "/asyn")
public class DataController {
@Resource
private IDataService dataService;
@Resource
private RabbitTemplate rabbitTemplate;
/**
* 同步更新mysql和ES的user信息
* @param user user实体
*/
@GetMapping("/update")
public void updateData(User user){
dataService.updateMysqlData(user);
String key = UUID.randomUUID().toString();
CorrelationData correlationData = new CorrelationData(key);
rabbitTemplate.convertAndSend("X","XA",user.getId(),correlationData);
log.info("Producer消息:已发送消息:{}到队列A中等待ES更新处理,消息ID:{}",user.getId(),key);
}
/**
* 查询user表信息
* @return user信息集合
*/
@GetMapping("/findData")
public List<User> findAllData(){
return dataService.findAllData();
}
/**
* 同步根据id删除mysql和ES中user对应的数据信息
* @param id 需要删除的信息id
*/
@GetMapping("/delete")
public void deleteDataById(String id){
dataService.deleteDataById(id);
String key = UUID.randomUUID().toString();
CorrelationData correlationData = new CorrelationData(key);
rabbitTemplate.convertAndSend("X","XB",id,correlationData);
log.info("Producer消息:已发送消息:{}到队列B中等待ES删除处理,消息ID:{}",id,key);
}
/**
* 同步新增mysql和ES的user数据
* @param user user实体
*/
@GetMapping("addData")
public void addData(User user){
dataService.addData(user);
String key = UUID.randomUUID().toString();
CorrelationData correlationData = new CorrelationData(key);
rabbitTemplate.convertAndSend("X","XA",user.getId(),correlationData);
log.info("Producer消息:已发送消息:{} 到队列A中等待ES新增处理,消息ID:{}",user.getId(),key);
}
/**
* 同步删除mysql和ES中所有user信息
*/
@GetMapping("deleteAll")
public void deleteAllData(){
dataService.deleteAllData();
dataService.esDeleteAllData();
}
/**
* 查询ES中所有user信息
*/
@GetMapping("findEs")
public Iterable<User> findEs(){
return dataService.findEs();
}
}
/**
* 异步调用方式实现mysql与ES数据同步Service
*/
public interface IDataService extends IService<User> {
/**
* 根据id更新mysql数据
* @param user 需要更新数据的user对象
*/
void updateMysqlData(User user);
/**
* 查询所有数据
* @return user对象集合
*/
List<User> findAllData();
/**
* mysql根据id删除信息
* @param id 需要删除信息的id
*/
void deleteDataById(String id);
/**
* mysql新增数据
* @param user 需要新增数据的对象
*/
void addData(User user);
/**
* ES根据ID删除数据
* @param id 需要删除信息的id
*/
void esDeleteDataById(String id);
/**
* ES新增/根据ID修改数据
* @param user 需要新增/根据ID修改数据的对象
*/
void esAddData (User user);
/**
* mysql删除user表所有数据
*/
void deleteAllData();
/**
* es删除index=user-demo中所有数据
*/
void esDeleteAllData();
/**
* 查询ES中所有数据信息
*/
Iterable<User> findEs();
}
/**
* 异步调用方式实现mysql与ES数据同步Service实现类
*/
@Service
public class DataServiceImpl extends ServiceImpl<UserMapper, User> implements IDataService {
@Resource
private UserMapper userMapper;
@Resource
private UserEsMapper userEsMapper;
/**
* 根据id更新mysql数据
* @param user 需要更新数据的user对象
*/
@Override
public void updateMysqlData(User user) {
userMapper.updateById(user);
}
/**
* 查询所有数据
* @return user对象集合
*/
@Override
public List<User> findAllData() {
return userMapper.selectList(null);
}
/**
* mysql根据id删除信息
* @param id 需要删除信息的id
*/
@Override
public void deleteDataById(String id) {
userMapper.deleteById(id);
}
/**
* mysql新增数据
* @param user 需要新增数据的对象
*/
@Override
public void addData(User user) {
userMapper.insert(user);
}
/**
* ES根据ID删除数据
* @param id 需要删除信息的id
*/
@Override
public void esDeleteDataById(String id) {
userEsMapper.deleteById(id);
}
/**
* ES新增/根据ID修改数据
* @param user 需要新增/根据ID修改数据的对象
*/
@Override
public void esAddData(User user) {
userEsMapper.save(user);
}
/**
* mysql删除user表所有数据
*/
@Override
public void deleteAllData() {
userMapper.delete(null);
}
/**
* es删除index=user-demo中所有数据
*/
@Override
public void esDeleteAllData() {
userEsMapper.deleteAll();
}
/**
* 查询ES中user所有信息
* @return 查询user信息集合
*/
@Override
public Iterable<User> findEs() {
return userEsMapper.findAll();
}
}
/**
* 异步调用方式实现mysql与ES数据同步消息消费者
*/
@Slf4j
@Component
public class Consumer {
@Resource
private IDataService dataService;
@Resource
private UserMapper userMapper;
//接收消息
@RabbitListener(queues="A")
public void addData(Message message){
log.info("Consumer消息:当前时间:{},收到A队列的消息:{},进行ES新增操作",new Date().toString(),new String(message.getBody()));
QueryWrapper<User> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("id",new String(message.getBody()));
User user = userMapper.selectOne(queryWrapper);
dataService.esAddData(user);
log.info("ES新增/更新数据为:{}",user);
}
@RabbitListener(queues = "B")
public void delete(Message message){
log.info("Consumer消息:当前时间:{},收到B队列的消息:{},进行ES删除操作",new Date().toString(),new String(message.getBody()));
dataService.esDeleteDataById(new String(message.getBody()));
}
@RabbitListener(queues="C")
public void update(Message message){
}
}
操作完成