• xxl-job+rabbitmq 进行定时的微信消息推送


    下载xxl-job的源码

    https://gitee.com/xuxueli0323/xxl-job

    项目结构

    |_doc:项目文档,包含数据库初始化 sql 和架构图等
    |_xxl-job-admin:调度中心
    |_xxl-job-core:公共依赖
    |_xxl-job-executor-samples:执行器Sample示例(选择合适的版本执行器,可直接使用,也可以参考其并将现有项目改造成执行器)
        |__xxl-job-executor-sample-springboot:Springboot版本,通过Springboot管理执行器,推荐这种方式;
        |__xxl-job-executor-sample-frameless:无框架版本;
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    部署调度中心

    • 首先初始化数据库,在MySQL中创建名为 xxl_job 的数据库,执行 ./doc/db/tables_xxl_job.sql 文件

    • 其次修改 xxl-job-admin 项目的配置文件 ./xxl-job-admin/src/main/resources/application.properties 将数据库配置改成自己本地即可。

    • 最后启动调度中心项目,访问项目地址:http://localhost:8080/xxl-job-admin/,默认账户:admin + 123456

    • 后续部署时可以直接将项目进行打包,然后部署到服务器上
      在这里插入图片描述

    添加XXL-JOB依赖和配置

            <!-- xxl-job-core -->
            <dependency>
                <groupId>com.xuxueli</groupId>
                <artifactId>xxl-job-core</artifactId>
                <version>2.3.0</version>
            </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    @Configuration
    public class XxlJobConfig {
        private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
    
    //调度中心部署跟地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔。
    //执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";为空则关闭自动注册;
        @Value("${xxl.job.admin.addresses}")
        private String adminAddresses;
        
    // 执行器通讯TOKEN [选填]:非空时启用;
        @Value("${xxl.job.accessToken}")
        private String accessToken;
        
    //执行器AppName [选填]:执行器心跳注册分组依据;为空则关闭自动注册
        @Value("${xxl.job.executor.appname}")
        private String appname;
    
        @Value("${xxl.job.executor.address}")
        private String address;
    //执行器IP [选填]:默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用;
    //地址信息用于 "执行器注册" 和 "调度中心请求并触发任务";
        @Value("${xxl.job.executor.ip}")
        private String ip;
    //### 执行器端口号 [选填]:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口;
        @Value("${xxl.job.executor.port}")
        private int port;
    //### 执行器运行日志文件存储磁盘路径 [选填] :需要对该路径拥有读写权限;为空则使用默认路径;
        @Value("${xxl.job.executor.logpath}")
        private String logPath;
    // 执行器日志保存天数 [选填] :值大于3时生效,启用执行器Log文件定期清理功能,否则不生效;
        @Value("${xxl.job.executor.logretentiondays}")
        private int logRetentionDays;
    
    
        @Bean
        public XxlJobSpringExecutor xxlJobExecutor() {
            logger.info(">>>>>>>>>>> xxl-job config init.");
            XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
            xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
            xxlJobSpringExecutor.setAppname(appname);
            xxlJobSpringExecutor.setAddress(address);
            xxlJobSpringExecutor.setIp(ip);
            xxlJobSpringExecutor.setPort(port);
            xxlJobSpringExecutor.setAccessToken(accessToken);
            xxlJobSpringExecutor.setLogPath(logPath);
            xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
    
            return xxlJobSpringExecutor;
        }
    
        /**
         * 针对多网卡、容器内部署等情况,可借助 "spring-cloud-commons" 提供的 "InetUtils" 组件灵活定制注册IP;
         *
         *      1、引入依赖:
         *          
         *             org.springframework.cloud
         *             spring-cloud-commons
         *             ${version}
         *         
         *
         *      2、配置文件,或者容器启动变量
         *          spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'
         *
         *      3、获取IP
         *          String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
         */
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68

    修改执行器项目配置文件

    ./xxl-job-executor-sample-springboot/src/main/resources/application.properties 中配置执行器的相关参数。

    ### 调度中心部署跟地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册""任务结果回调";为空则关闭自动注册;
    xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
    ### 执行器通讯TOKEN [选填]:非空时启用;
    xxl.job.accessToken=
    ### 执行器AppName [选填]:执行器心跳注册分组依据;为空则关闭自动注册
    xxl.job.executor.appname=xxl-job-executor-sample
    ### 执行器注册 [选填]:优先使用该配置作为注册地址,为空时使用内嵌服务 ”IP:PORT“ 作为注册地址。从而更灵活的支持容器类型执行器动态IP和动态映射端口问题。
    xxl.job.executor.address=
    ### 执行器IP [选填]:默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用;地址信息用于 "执行器注册""调度中心请求并触发任务";
    xxl.job.executor.ip=
    ### 执行器端口号 [选填]:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口;
    xxl.job.executor.port=9999
    ### 执行器运行日志文件存储磁盘路径 [选填] :需要对该路径拥有读写权限;为空则使用默认路径;
    xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
    ### 执行器日志文件保存天数 [选填] : 过期日志自动清理, 限制值大于等于3时生效; 否则,-1, 关闭自动清理功能;
    xxl.job.executor.logretentiondays=30
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    调度中心添加定时任务配置

    添加执行器

    在这里插入图片描述

    添加定时任务

    在这里插入图片描述

    编写定时任务代码

    @Slf4j
    @Component
    public class WechatRemindXxlJob {
    	//可以使用 XxlJobHelper.log() 打印日志,在调度中心平台上可以直接看到日志详情。
        private static Logger logger = LoggerFactory.getLogger(SampleXxlJob.class);
    
        @Autowired
        private AppointLogService appointLogService;
    
    
        // @XxlJob("")值为调度中心新增任务时的控制器名称
        @XxlJob("startRemindHandler")
        public void startRemind(){
            try {
            	//XxlJobHelper.getJobParam():获取定时任务中的参数,多个时可用“,”分割
                bean.setUserId(XxlJobHelper.getJobParam());
                XxlJobHelper.log("XXL-JOB, Start AppointLogWechatRemindSix 6点开启");
                appointLogService.startRemind(bean);
            } catch (Exception e) {
                e.printStackTrace();
                XxlJobHelper.log("定时活动消息提示失败"+e.getMessage());
            }
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    真正执行的代码

        @Override
        public void startRemindSix(bean) throws ParseException {
    
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
            SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            String time = sdf.format(new Date()) + " 00:00:00";
            List<AppointLog> todayLogs = AppointLogRepository.findByStartTime(sdf2.parse(time));
    
            //今天没有预约数据
            if (CollectionUtils.isEmpty(todayLogs)){
                return;
            }
            //将查询的预约数据发送到mq
            todayLogs.forEach(log -> rabbitTemplate.convertAndSend(MQConstant.RECORD_NOTICE,  JSONObject.toJSONString(log)));
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    mq的接听者监听到进行消费并发送微信推送

        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(value = MQConstant.RECORD_NOTICE, durable = "true"),
                exchange = @Exchange(value = MQConstant.RECORD_NOTICE))
        )
        public void saveOrUpdate(String json) {
            AppointLog bean = JSON.parseObject(json, AppointLog.class);
    
            SimpleDateFormat sdf = new SimpleDateFormat("MM-dd HH:mm");
            //每条要发送的数据内容
            String typeName = SuperviseType.getTypeName(bean.getType());
            String time = sdf.format(bean.getStartTime()) + "~" + sdf.format(bean.getEndTime());
            String address = bean.getPlaceName();
            WechatSend(bean.getUserId(), time, address, typeName, bean.getId());
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    微信消息推送

    要想实现消息订阅及发送总体有几个步骤,一、选择消息模板,二、提醒用户订阅模板,三、给用户发送订阅消息。

    首先,在微信公众平台开通消息推送功能,并添加消息模板。可以从模板库选择模板也可以创建一个模板,模板添加之后,模板ID我们接下来要用的。
    发送模板消息需要用到accesstoken、formId和openID。formID就是消息模板ID,openID我们最好在获取用户信息或用户登录时储存到全局变量里。

    参考链接: 参考文章

    创建模板

    在这里插入图片描述

    在这里插入图片描述
    在这里插入图片描述

    调用小程序推送接口

        /**
         * 微信小程序通知发送
         *
         * @param userId
         * @param time
         * @param address
         * @param typeName
         */
        private void WechatSend(String userId, String time, String address, String typeName, String recordId) {
    
    
    //构建要发送的消息内容  一个TemplateField代表一个列
            List<TemplateField> list = new ArrayList<>();
    
            TemplateField templateField1 = new TemplateField();
            templateField1.setName("thing1");
            templateField1.setValue("xxxx");
            list.add(templateField1);
    
    
            TemplateField templateField2 = new TemplateField();
            templateField2.setName("character_string2");
            templateField2.setValue(time);
            list.add(templateField2);
    
            TemplateField templateField3 = new TemplateField();
            templateField3.setName("thing3");
            templateField3.setValue(address);
            list.add(templateField3);
    
            TemplateField templateField4 = new TemplateField();
            templateField4.setName("thing4");
            templateField4.setValue("请前往签到或取消");
            list.add(templateField4);
    
            TemplateField templateField5 = new TemplateField();
            templateField5.setName("thing5");
            templateField5.setValue(typeName);
            list.add(templateField5);
    
            WxMsgDTO wxMsgDTO = new WxMsgDTO();
            wxMsgDTO.setUserId(userId);
            wxMsgDTO.setTemplateId(sendTemplateId);
            wxMsgDTO.setLink(pages+recordId);
            wxMsgDTO.setFields(list);
            iWechatService.Send(wxMsgDTO);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
        @Override
        public Result Send(WxMsg bean) {
            Messages messages = wechatAccountUtil.getWechat(bean.getTargetScid()).msg();
    
            WxUserinfo wxUserinfo = wxUserinfoRepository.findFirstByUidAndTypeAndDeletedAtIsNull(bean.getUserId(),2);
            Long messageLong = messages.subscribeSendTemplate(wxUserinfo.getOpenid(), bean.getTemplateId(), bean.getLink(), bean.getFields(), null);
            return success(messageLong);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
        public Result subscribeSend(WxMsg bean) {
            WxUserinfo wxUserinfo = wxUserinfoRepository.findFirstByUidAndTypeAndDeletedAtIsNull(bean.getUserId(),2);
            Long messageLong = messages.subscribeSendTemplate(wxUserinfo.getOpenid(), bean.getTemplateId(), bean.getLink(), bean.getFields(), null);
            return success(messageLong);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
        public Long subscribeSendTemplate(String accessToken, String openId, String templateId, String link, List<TemplateField> fields, MiniProgram miniProgram, String requestUrl) {
            checkNotNullAndEmpty(accessToken, "accessToken");
            checkNotNullAndEmpty(openId, "openId");
            checkNotNullAndEmpty(templateId, "templateId");
    
            String url = requestUrl + accessToken;
            Map<String, Object> params = buildSubscribeParams(openId, templateId, link, fields, miniProgram);
    
            Map<String, Object> resp = doPost(url, params);
            Object msgId = resp.get("msgid");
            return msgId instanceof Long ? (Long) msgId : ((Integer) msgId).longValue();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    构建参数

    private Map<String, Object> buildSubscribeParams(String openId, String templateId, String link, List<TemplateField> fields, MiniProgram miniProgram) {
            Map<String, Object> params = Maps.newHashMapWithExpectedSize(4);
            params.put("touser", openId);
            params.put("template_id", templateId);
            if (!Strings.isNullOrEmpty(link)) {
                params.put("page", link);
            }
            if (!ObjectUtils.isEmpty(miniProgram)) {
                params.put("miniprogram", miniProgram);
            }
            if (fields != null && !fields.isEmpty()) {
                Map<String, Map<String, String>> data = Maps.newHashMapWithExpectedSize(fields.size());
                Map<String, String> dataItem;
                for (TemplateField field : fields) {
                    dataItem = Maps.newHashMapWithExpectedSize(2);
                    dataItem.put("value", field.getValue());
                    dataItem.put("color", field.getColor());
                    data.put(field.getName(), dataItem);
                }
                params.put("data", data);
            }
            return params;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
  • 相关阅读:
    程序媛的mac修炼手册-- 2024如何彻底卸载Python
    [数据集][VOC]眼睛佩戴数据集VOC格式6000张
    ASML大举向中国出口光刻机,或在于忧虑中国光刻机技术取得突破
    【Edge】微软Edge每次启动自动导入Chrome收藏夹,无法取消“每次启动浏览器时导入浏览数据”功能的解决方法(202311)
    ThinkPHP+基于ThinkPHP的图书馆管理系统 毕业设计-附源码311833
    只有真正将产业互联网看成是一种嬗变的过程,才能把握其精髓和原始奥义
    迅为RK3399开发板创建android工程
    Python自动化必不可少的测试框架 — pytest
    expdp导出分区表缓慢排查(Streams AQ: waiting for messages in the queue )
    11.10 校招 实习 内推 面经
  • 原文地址:https://blog.csdn.net/weixin_45081813/article/details/126491969