前置条件:
- MySQL 开启 binlog 日志(canal 要求 binlog_format=ROW)
- 部署canal服务
Springboot代码:
接口工具类:
import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import java.util.HashMap;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
/**
 * HTTP helper that issues a POST via Hutool's {@code HttpRequest} and reads the
 * {@code code} field from the JSON response body.
 *
 * NOTE(review): this listing is incomplete. The {@code HttpRequest.post(url)} builder
 * chains are not terminated, no {@code return} statement or closing braces are visible,
 * and {@code Map} is used without a visible {@code java.util.Map} import. Restore the
 * missing lines before compiling.
 */
public class HttpMethodUtil {
/**
 * Posts to {@code url} and extracts the {@code code} field of the JSON reply.
 *
 * @param url     endpoint to POST to
 * @param Content request payload; presumably attached as the request body in lines
 *                not shown here (TODO confirm)
 * @return declared as {@code int}; presumably the extracted {@code code}
 *         (the return statement is not visible)
 */
public int httpPost(String url,String Content) {
// Header map declaring a UTF-8 JSON body; presumably attached to the request in the
// missing builder lines (TODO confirm).
Map heads = new HashMap<>();
heads.put("Content-Type", "application/json;charset=UTF-8");
// NOTE(review): the builder chain below is cut off (no header/body/execute calls, no ';').
HttpResponse response = HttpRequest.post(url)
// Parse the response body as a JSON object.
JSONObject jsonObject = JSON.parseObject(response.body());
// Unboxing cast: throws if "code" is absent (NullPointerException) or is not an Integer
// (ClassCastException).
int code = (int) jsonObject.get("code");
// The same request/parse/extract sequence is repeated. NOTE(review): presumably a retry
// guarded by a condition that is not visible here -- confirm.
response = HttpRequest.post(url)
jsonObject = JSON.parseObject(response.body());
code = (int) jsonObject.get("code");
canal工具类:
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.Message;
import java.net.InetSocketAddress;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
// Configuration and collaborators of the canal monitor component.
// NOTE(review): the enclosing class header is not visible. The field that should follow
// the hostname @Value below (canalMonitorHost, which startMonitorSQL reads) is missing,
// so two @Value annotations end up stacked on canalMonitorPort.

// Bound to property canal-monitor-mysql.hostname.
@Value("${canal-monitor-mysql.hostname}")
// Bound to property canal-monitor-mysql.port.
@Value("${canal-monitor-mysql.port}")
Integer canalMonitorPort;
// Bound to property canal-monitor-mysql.database; used to build the subscribe filter.
@Value("${canal-monitor-mysql.database}")
String canalMonitorDatabaseName;
// Maximum number of entries requested per getWithoutAck call.
private final static int BATCH_SIZE = 100;
// Used to forward row changes over HTTP. NOTE(review): presumably injected (e.g. via the
// imported @Resource), but no injection annotation is visible on this field -- confirm.
HttpMethodUtil httpMethodUtil;
/**
 * Opens a canal connector, subscribes to one table of the configured database and
 * hands fetched entries to {@code handleDATAChange}.
 *
 * NOTE(review): this listing is incomplete. No loop, {@code try}, {@code connect()},
 * {@code ack}/{@code rollback}, {@code else} or closing braces are visible, so the
 * exact control flow cannot be confirmed from here.
 */
public void startMonitorSQL() {
// Single-node connector to the canal server: destination "example", empty username and password.
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalMonitorHost, canalMonitorPort), "example", "", "");
// NOTE(review): this success message is logged with no connect() call visible before it.
log.info("数据库检测连接成功!" + canalMonitorDatabaseName);
// Filter: table jyz_jyzqgdwa in the configured database (the dot is regex-escaped).
connector.subscribe(canalMonitorDatabaseName+ "\\.jyz_jyzqgdwa");
// Fetch up to BATCH_SIZE entries without acknowledging them.
Message message = connector.getWithoutAck(BATCH_SIZE);
long batchId = message.getId();
int size = message.getEntries().size();
// Empty-batch check (id -1 or no entries).
// NOTE(review): the handler call below sits directly under this condition because the
// idle branch body and the `else` are missing; presumably it belongs to the non-empty
// branch -- confirm.
if (batchId == -1 || size == 0) {
handleDATAChange(message.getEntries());
// NOTE(review): presumably part of a disconnect/reconnect path whose surrounding lines are missing.
log.error("成功断开监测连接!尝试重连");
// NOTE(review): dangling catch -- the matching `try` and the handler body are not visible.
} catch (InterruptedException e) {
/**
 * Walks a batch of canal entries, logs each row change and forwards the after-image of
 * changed rows as JSON through {@code httpMethodUtil.httpPost}.
 *
 * NOTE(review): this listing is incomplete. The {@code try}/{@code catch} around the
 * parse, the {@code continue}/{@code else} lines and the closing braces are not visible.
 * The raw {@code List} parameter is iterated as {@code CanalEntry.Entry}; the generic
 * type argument was presumably lost in extraction. {@code java.util.List} also has no
 * visible import.
 *
 * @param entrys entries taken from a canal {@code Message}
 */
private void handleDATAChange(List entrys) {
for (CanalEntry.Entry entry : entrys) {
// Transaction begin/end markers; presumably skipped via a `continue` that is missing here.
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
CanalEntry.RowChange rowChage;
// Decode the entry's stored bytes into a RowChange.
rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
// Wraps the parse failure `e` as an unchecked exception (the catch clause declaring `e` is missing).
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);
CanalEntry.EventType eventType = rowChage.getEventType();
// Logs binlog file/offset and schema/table.
// NOTE(review): the call is cut off -- the eventType argument and the closing ");" are missing.
log.info("================》; binlog[{} : {}] , name[{}, {}] , eventType : {}",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
// DDL statements: only the SQL text is logged.
if (rowChage.getIsDdl()) {
log.info("================》;isDdl: true,sql:{}", rowChage.getSql());
// Handle each changed row according to the event type.
for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
if (eventType == CanalEntry.EventType.DELETE) {
// DELETE: log the before-image columns; nothing is posted in the visible lines.
log.info(">>>>>>>>>> 删除 >>>>>>>>>>");
log.info("DELETE:{}", rowData.getBeforeColumnsList());
} else if (eventType == CanalEntry.EventType.INSERT) {
// INSERT: build a JSON object of column name -> value from the after-image.
log.info(">>>>>>>>>> 新增 >>>>>>>>>>");
JSONObject json = new JSONObject();
for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
json.put(column.getName(), column.getValue());
log.info(json.toJSONString());
// NOTE(review): hard-coded endpoint; "wirte" may be a typo of "write" -- verify against the receiving service.
String url = "http://192.168.21.11:8000/api/wirte";
int code = httpMethodUtil.httpPost(url,json.toJSONString());
// NOTE(review): presumably info on success and error on failure, selected by `code`;
// the condition is not visible.
log.info(json.toJSONString());
log.error(json.toJSONString());
// Remaining event types (logged as "更新"/update); the `else` line is missing.
log.info(">>>>>>>>>> 更新 >>>>>>>>>>");
// NOTE(review): logs a "before" marker, but the loop below reads the after-image columns.
log.info("------->; before");
JSONObject json = new JSONObject();
for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
json.put(column.getName(), column.getValue());
log.info(json.toJSONString());
// Same hard-coded endpoint as the INSERT branch (same "wirte" caveat).
String url = "http://192.168.21.11:8000/api/wirte";
int code = httpMethodUtil.httpPost(url,json.toJSONString());
// NOTE(review): same missing success/failure condition as in the INSERT branch.
log.info(json.toJSONString());
log.error(json.toJSONString());
application.properties配置文件:
canal-monitor-mysql.hostname=192.168.21.11
canal-monitor-mysql.port=11111
canal-monitor-mysql.database=database_name
logging.file.name=./log/rizhi.log
# 设置日期格式化
logging.pattern.dateformat=yyyy-MM-dd
logging.logback.rollingpolicy.max-history=30