1、Spring 项目代码结构如下:

2、数据库资源配置文件如下:
#sql配置文件
spring.datasource.driver-class-name=com.microsoft.sqlserver.jdbc.SQLServerDriver
#.19為測試地址,.13為正式地址
spring.datasource.url=jdbc:sqlserver://172.12.100.19:1433;DatabaseName=data
spring.datasource.username=wms1234
spring.datasource.password=wms1234
spring.datasource.hikari.connection-timeout=20000
spring.datasource.hikari.connection-test-query=SELECT 1
#mybatis配置
mybatis.mapper-locations=classpath:mapper/*.xml
#配置映射的实体类
mybatis.type-aliases-package=com.device.search.dao
3、Kafaka实现类源码
package com.device.search.kafka;
import java.io.IOException;
import java.security.Timestamp;
import java.util.ArrayList;
import java.util.Properties;
import com.aliyun.api.internal.mapping.Converter;
import com.aliyun.api.internal.parser.json.JsonConverter;
import com.device.search.dao.AimerBoxlistDao;
import com.device.search.dao.AimerPackboxDao;
import com.device.search.dao.AimerToWms2BDao;
import com.device.search.model.EdiOutErpData;
import com.device.search.model.PackListData;
import com.device.search.service.AimerBoxlistService;
import com.device.search.service.AimerPackboxService;
import com.device.search.service.AimerToWms2BService;
import com.device.search.utils.TimeUtil;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.google.gson.Gson;
import com.qimen.api.request.DeliveryorderBatchcreateRequest;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.LoggerFactory;
import com.device.search.model.SendData;
public class PackListSendThread implements Runnable {
private static Logger logger = LoggerFactory.getLogger(PackListSendThread.class);
private final KafkaProducer producer;
private final String topic;
public static String packListBack_flag="0";
private AimerPackboxService aimerPackboxService;
private AimerBoxlistService aimerBoxlistService;
private AimerToWms2BService aimerToWms2BService;
private final String log_path="d:\\log\\PackListBack\\";
private final String err_log="PackListBack_err_";
private final String rsp_log="PackListBack_rsp_";
private final String req_log="PackListBack_req_";
public PackListSendThread(String brokers, String topic, AimerPackboxService aimerPackboxService, AimerBoxlistService aimerBoxlistService, AimerToWms2BService aimerToWms2BService)
Properties prop = createProducerConfig(brokers);
this.producer = new KafkaProducer(prop);
this.aimerBoxlistService=aimerBoxlistService;
this.aimerPackboxService=aimerPackboxService;
this.aimerToWms2BService=aimerToWms2BService;
private static Properties createProducerConfig(String brokers) {
Properties props = new Properties();
props.put("bootstrap.servers", brokers);
props.put("acks", "all");
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
System.out.println("Produces 3 messages");
if (packListBack_flag == "0") {
List
listPackbox = aimerPackboxService.findbyid("N"); for (int i = 0; i < listPackbox.size(); i++)
String finalBox_no = listPackbox.get(i).getBoxCode();
PackListData packListData = new PackListData();
packListData.setWarehouseID(listPackbox.get(i).getWhsId());
packListData.setCARTONID(finalBox_no);
packListData.setConsigneeid(listPackbox.get(i).getCustom());
packListData.setWaveno(listPackbox.get(i).getTicno());
packListData.setGrossweight(listPackbox.get(i).getWeight());
packListData.setCartontype(listPackbox.get(i).getBoxType());
packListData.setPackno(listPackbox.get(i).getPackPort());
packListData.setPackwho(listPackbox.get(i).getOperator());
packListData.setStarttime(listPackbox.get(i).getCreateDate());
packListData.setEndtime(listPackbox.get(i).getDoneDate());
packListData.setCarrierid(listPackbox.get(i).getCysname());
packListData.setDeliverno(listPackbox.get(i).getMailno());
List
listBoxlist = aimerBoxlistService.findbyboxno(finalBox_no); if (listBoxlist.size()<=0)
logger.error("回传箱明细:没有箱明细数据"+finalBox_no);
aimerPackboxService.update_boxno_err(finalBox_no,finalBox_no+"回传箱明细:没有箱明细数据");
String ordno=listBoxlist.get(0).getOrdno();
List
listAimerToWms2BDao = aimerToWms2BService.find2bOrdnoUsr02(ordno,"2"); if (listAimerToWms2BDao.size()<=0)
logger.error("Aimer_to_wms明细:没有出库数据"+finalBox_no);
aimerPackboxService.update_boxno_err(finalBox_no,finalBox_no+":"+ordno+"Aimer_to_wms明细:没有出库数据");
List listReturnBoxlist = new ArrayList();
for (AimerBoxlistDao aimerBoxlistDao :listBoxlist)
List
aimerToWms2BDaoList = aimerToWms2BService.find2bOrdnoUsr02(aimerBoxlistDao.getOrdno(),"2"); if (aimerToWms2BDaoList.size()<=0)
logger.error("Aimer_to_wms明细:没有出库数据11"+finalBox_no);
aimerPackboxService.update_boxno_err(finalBox_no,finalBox_no+":"+aimerBoxlistDao.getOrdno()+"Aimer_to_wms明细1:没有出库数据");
shipqty = (long) (shipqty + Double.parseDouble(aimerBoxlistDao.getBoxQty()));
EdiOutErpData ediOutErpData = new EdiOutErpData();
ediOutErpData.setSku(aimerBoxlistDao.getBarCode());
ediOutErpData.setQty(aimerBoxlistDao.getBoxQty());
ediOutErpData.setSoreference1(aimerToWms2BDaoList.get(0).getUsr02());
if(listAimerToWms2BDao.get(0).getOtype().equals("全渠道O2O") || listAimerToWms2BDao.get(0).getOtype().equals("全渠道o2o")){
ediOutErpData.setSoreference1(aimerToWms2BDaoList.get(0).getOrdno());
ediOutErpData.setNotes(aimerToWms2BDaoList.get(0).getOrdno());
listReturnBoxlist.add(ediOutErpData);
if (listAimerToWms2BDao.get(0).getUsr01()!=null)
if (listAimerToWms2BDao.get(0).getUsr01().equals("B2BCK"))
if (listAimerToWms2BDao.get(0).getUsr01().equals("DBCK"))
if (listAimerToWms2BDao.get(0).getUsr01().equals("SQDCK"))
if(listAimerToWms2BDao.get(0).getOtype().equals("全渠道O2O") || listAimerToWms2BDao.get(0).getOtype().equals("全渠道o2o")){
packListData.setSOType(sotype);
packListData.setShippedQty(shipqty);
packListData.setItemData(listReturnBoxlist);
SendData sendData = new SendData();
sendData.setopr_type("add");
long time = System.currentTimeMillis();
sendData.settimestamp(Long.toString(time));
sendData.setPackListData(packListData);
String msg = new Gson().toJson(sendData);
File file=new File(log_path+ TimeUtil.convertDay(new Date()));
String req_filename=log_path+TimeUtil.convertDay(new Date())+"\\"+ req_log+packListData.getCARTONID() +"_" +TimeUtil.convertDate(new Date())+".xml" ;
TimeUtil.OutJson(msg,req_filename);
producer.send(new ProducerRecord(topic, msg), new Callback()
public void onCompletion(RecordMetadata metadata, Exception e)
String resString = new Gson().toJson(metadata);
String req_filename=log_path+TimeUtil.convertDay(new Date())+"\\"+ err_log+packListData.getCARTONID() +"_" +TimeUtil.convertDate(new Date())+".xml" ;
TimeUtil.OutJson(resString,req_filename);
} catch (IOException ioException) {
ioException.printStackTrace();
String req_filename=log_path+TimeUtil.convertDay(new Date())+"\\"+ rsp_log+packListData.getCARTONID() +"_" +TimeUtil.convertDate(new Date())+".xml" ;
TimeUtil.OutJson(resString,req_filename);
} catch (IOException ioException) {
ioException.printStackTrace();
System.out.println("Sent:" + msg + ", Partition: " + metadata.partition() + ", Offset: "
aimerPackboxService.update_boxno(finalBox_no);
String req_filename=log_path+TimeUtil.convertDay(new Date())+"\\"+ rsp_log+"eRR_" +TimeUtil.convertDate(new Date())+".xml" ;
TimeUtil.OutJson(ex.getMessage(),req_filename);
} catch (IOException e) {
