• Spring 框架下如何调用kafka


    1、Spring 项目代码结构如下:

    2、数据库资源配置文件如下:

    #sql配置文件
    spring.datasource.driver-class-name=com.microsoft.sqlserver.jdbc.SQLServerDriver
    #.19為測試地址,.13為正式地址
    spring.datasource.url=jdbc:sqlserver://172.12.100.19:1433;DatabaseName=data
    spring.datasource.username=wms1234
    spring.datasource.password=wms1234
    spring.datasource.hikari.connection-timeout=20000
    spring.datasource.hikari.connection-test-query=SELECT 1 
    #mybatis配置
    mybatis.mapper-locations=classpath:mapper/*.xml
    #配置映射的实体类
    mybatis.type-aliases-package=com.device.search.dao 

    3、Kafaka实现类源码 

    1. package com.device.search.kafka;
    2. import java.io.File;
    3. import java.io.IOException;
    4. import java.security.Timestamp;
    5. import java.util.ArrayList;
    6. import java.util.Date;
    7. import java.util.List;
    8. import java.util.Properties;
    9. import com.aliyun.api.internal.mapping.Converter;
    10. import com.aliyun.api.internal.parser.json.JsonConverter;
    11. import com.device.search.dao.AimerBoxlistDao;
    12. import com.device.search.dao.AimerPackboxDao;
    13. import com.device.search.dao.AimerToWms2BDao;
    14. import com.device.search.model.EdiOutErpData;
    15. import com.device.search.model.PackListData;
    16. import com.device.search.service.AimerBoxlistService;
    17. import com.device.search.service.AimerPackboxService;
    18. import com.device.search.service.AimerToWms2BService;
    19. import com.device.search.utils.TimeUtil;
    20. import com.fasterxml.jackson.databind.JsonSerializer;
    21. import com.fasterxml.jackson.databind.annotation.JsonSerialize;
    22. import com.google.gson.Gson;
    23. import com.qimen.api.request.DeliveryorderBatchcreateRequest;
    24. import org.apache.kafka.clients.producer.Callback;
    25. import org.apache.kafka.clients.producer.KafkaProducer;
    26. import org.apache.kafka.clients.producer.ProducerRecord;
    27. import org.apache.kafka.clients.producer.RecordMetadata;
    28. import org.slf4j.Logger;
    29. import org.slf4j.LoggerFactory;
    30. import com.device.search.model.SendData;
    31. public class PackListSendThread implements Runnable {
    32. private static Logger logger = LoggerFactory.getLogger(PackListSendThread.class);
    33. private final KafkaProducer producer;
    34. private final String topic;
    35. public static String packListBack_flag="0";
    36. private AimerPackboxService aimerPackboxService;
    37. private AimerBoxlistService aimerBoxlistService;
    38. private AimerToWms2BService aimerToWms2BService;
    39. private final String log_path="d:\\log\\PackListBack\\";
    40. private final String err_log="PackListBack_err_";
    41. private final String rsp_log="PackListBack_rsp_";
    42. private final String req_log="PackListBack_req_";
    43. public PackListSendThread(String brokers, String topic, AimerPackboxService aimerPackboxService, AimerBoxlistService aimerBoxlistService, AimerToWms2BService aimerToWms2BService)
    44. {
    45. Properties prop = createProducerConfig(brokers);
    46. this.producer = new KafkaProducer(prop);
    47. this.topic = topic;
    48. this.aimerBoxlistService=aimerBoxlistService;
    49. this.aimerPackboxService=aimerPackboxService;
    50. this.aimerToWms2BService=aimerToWms2BService;
    51. }
    52. private static Properties createProducerConfig(String brokers) {
    53. Properties props = new Properties();
    54. props.put("bootstrap.servers", brokers);
    55. props.put("acks", "all");
    56. props.put("retries", 0);
    57. props.put("batch.size", 16384);
    58. props.put("linger.ms", 1);
    59. props.put("buffer.memory", 33554432);
    60. props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    61. props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    62. return props;
    63. }
    64. @Override
    65. public void run() {
    66. System.out.println("Produces 3 messages");
    67. while (true) {
    68. try {
    69. // if (packListBack_flag == "0") {
    70. // continue;
    71. // }
    72. synchronized (this) {
    73. if (packListBack_flag == "0") {
    74. continue;
    75. }
    76. }
    77. List listPackbox = aimerPackboxService.findbyid("N");
    78. for (int i = 0; i < listPackbox.size(); i++)
    79. {
    80. String finalBox_no = listPackbox.get(i).getBoxCode();
    81. String sotype="SC";
    82. PackListData packListData = new PackListData();
    83. packListData.setWarehouseID(listPackbox.get(i).getWhsId());
    84. packListData.setCARTONID(finalBox_no);
    85. packListData.setConsigneeid(listPackbox.get(i).getCustom());
    86. packListData.setWaveno(listPackbox.get(i).getTicno());
    87. packListData.setGrossweight(listPackbox.get(i).getWeight());
    88. packListData.setCartontype(listPackbox.get(i).getBoxType());
    89. packListData.setPackno(listPackbox.get(i).getPackPort());
    90. packListData.setPackwho(listPackbox.get(i).getOperator());
    91. packListData.setStarttime(listPackbox.get(i).getCreateDate());
    92. packListData.setEndtime(listPackbox.get(i).getDoneDate()); //20220428
    93. //2022.04.08 toC箱单增加运单号和承运商
    94. packListData.setCarrierid(listPackbox.get(i).getCysname());
    95. packListData.setDeliverno(listPackbox.get(i).getMailno());
    96. List listBoxlist = aimerBoxlistService.findbyboxno(finalBox_no);
    97. long shipqty=0;
    98. if (listBoxlist.size()<=0)//没有箱明细报错
    99. {
    100. logger.error("回传箱明细:没有箱明细数据"+finalBox_no);
    101. aimerPackboxService.update_boxno_err(finalBox_no,finalBox_no+"回传箱明细:没有箱明细数据");
    102. continue;
    103. }
    104. String ordno=listBoxlist.get(0).getOrdno();
    105. List listAimerToWms2BDao = aimerToWms2BService.find2bOrdnoUsr02(ordno,"2");
    106. if (listAimerToWms2BDao.size()<=0)//没有listAimerToWms2BDao
    107. {
    108. logger.error("Aimer_to_wms明细:没有出库数据"+finalBox_no);
    109. aimerPackboxService.update_boxno_err(finalBox_no,finalBox_no+":"+ordno+"Aimer_to_wms明细:没有出库数据");
    110. continue;
    111. }
    112. List listReturnBoxlist = new ArrayList();
    113. for (AimerBoxlistDao aimerBoxlistDao :listBoxlist)
    114. {
    115. //20220402 循环查询箱号对应每个单据的操作号,有一个箱号里多个单子。
    116. List aimerToWms2BDaoList = aimerToWms2BService.find2bOrdnoUsr02(aimerBoxlistDao.getOrdno(),"2");
    117. if (aimerToWms2BDaoList.size()<=0)//没有listAimerToWms2BDao
    118. {
    119. logger.error("Aimer_to_wms明细:没有出库数据11"+finalBox_no);
    120. aimerPackboxService.update_boxno_err(finalBox_no,finalBox_no+":"+aimerBoxlistDao.getOrdno()+"Aimer_to_wms明细1:没有出库数据");
    121. continue;
    122. }
    123. shipqty = (long) (shipqty + Double.parseDouble(aimerBoxlistDao.getBoxQty()));
    124. EdiOutErpData ediOutErpData = new EdiOutErpData();
    125. ediOutErpData.setSku(aimerBoxlistDao.getBarCode());
    126. ediOutErpData.setQty(aimerBoxlistDao.getBoxQty());
    127. ediOutErpData.setSoreference1(aimerToWms2BDaoList.get(0).getUsr02());
    128. //toC出库箱单明细操作好通过KAFKA回传 --2022.04.08
    129. if(listAimerToWms2BDao.get(0).getOtype().equals("全渠道O2O") || listAimerToWms2BDao.get(0).getOtype().equals("全渠道o2o")){
    130. ediOutErpData.setSoreference1(aimerToWms2BDaoList.get(0).getOrdno());
    131. }
    132. ediOutErpData.setNotes(aimerToWms2BDaoList.get(0).getOrdno());
    133. listReturnBoxlist.add(ediOutErpData);
    134. }
    135. if (listAimerToWms2BDao.get(0).getUsr01()!=null)//202203029
    136. {
    137. if (listAimerToWms2BDao.get(0).getUsr01().equals("B2BCK"))
    138. {
    139. sotype = "SC";
    140. }
    141. if (listAimerToWms2BDao.get(0).getUsr01().equals("DBCK"))
    142. {
    143. sotype = "SC";
    144. }
    145. if (listAimerToWms2BDao.get(0).getUsr01().equals("SQDCK"))
    146. {
    147. sotype = "OT";
    148. }
    149. }
    150. //toC出库箱单明细通过KAFKA回传 --2022.04.08
    151. if(listAimerToWms2BDao.get(0).getOtype().equals("全渠道O2O") || listAimerToWms2BDao.get(0).getOtype().equals("全渠道o2o")){
    152. sotype = "SO";
    153. }
    154. packListData.setSOType(sotype);
    155. packListData.setShippedQty(shipqty);
    156. packListData.setItemData(listReturnBoxlist);
    157. SendData sendData = new SendData();
    158. sendData.setopr_type("add");
    159. long time = System.currentTimeMillis();
    160. sendData.settimestamp(Long.toString(time));
    161. sendData.setPackListData(packListData);
    162. String msg = new Gson().toJson(sendData);
    163. File file=new File(log_path+ TimeUtil.convertDay(new Date()));
    164. if(!file.exists()){//如果文件夹不存在
    165. file.mkdirs();//创建文件夹
    166. }
    167. String req_filename=log_path+TimeUtil.convertDay(new Date())+"\\"+ req_log+packListData.getCARTONID() +"_" +TimeUtil.convertDate(new Date())+".xml" ;
    168. TimeUtil.OutJson(msg,req_filename);
    169. producer.send(new ProducerRecord(topic, msg), new Callback()
    170. {
    171. public void onCompletion(RecordMetadata metadata, Exception e)
    172. {
    173. String resString = new Gson().toJson(metadata);
    174. if (e != null)
    175. {
    176. String req_filename=log_path+TimeUtil.convertDay(new Date())+"\\"+ err_log+packListData.getCARTONID() +"_" +TimeUtil.convertDate(new Date())+".xml" ;
    177. try {
    178. TimeUtil.OutJson(resString,req_filename);
    179. } catch (IOException ioException) {
    180. ioException.printStackTrace();
    181. }
    182. e.printStackTrace();
    183. }
    184. String req_filename=log_path+TimeUtil.convertDay(new Date())+"\\"+ rsp_log+packListData.getCARTONID() +"_" +TimeUtil.convertDate(new Date())+".xml" ;
    185. try {
    186. TimeUtil.OutJson(resString,req_filename);
    187. } catch (IOException ioException) {
    188. ioException.printStackTrace();
    189. }
    190. System.out.println("Sent:" + msg + ", Partition: " + metadata.partition() + ", Offset: "
    191. + metadata.offset());
    192. aimerPackboxService.update_boxno(finalBox_no);
    193. }
    194. });
    195. }
    196. // closes producer
    197. //producer.close();
    198. Thread.sleep(3000);
    199. }catch (Exception ex)
    200. {
    201. String req_filename=log_path+TimeUtil.convertDay(new Date())+"\\"+ rsp_log+"eRR_" +TimeUtil.convertDate(new Date())+".xml" ;
    202. try {
    203. TimeUtil.OutJson(ex.getMessage(),req_filename);
    204. } catch (IOException e) {
    205. e.printStackTrace();
    206. }
    207. continue;
    208. }
    209. }
    210. }
    211. }

  • 相关阅读:
    if消除术之 Map + Function
    DevExpress开发WPF应用实现对话框总结
    SpringCloud和SpringBoot版本对应关系
    【C++进阶】:C++类型转换
    UE4 Sequence添加基础动画效果 (04-在序列中使用粒子效果)
    《IEEE Transactions on Robotics》发表!北京大学研究团队推出具有多种运动模态的软体两栖机器人
    SpringBoot整合knife 4j
    基于SOCKET编程多人聊天软件HTML+JSON报文
    什么是集成测试?集成测试方法有哪些?
    ChatGPT突然上线APP!iPhone可用、速度更快,GPT-4用量限制疑似取消
  • 原文地址:https://blog.csdn.net/xdpcxq/article/details/128182638