• xxl-job源码解读(执行器注册调度器)


    一.执行器注册调度器时序图

     

     二.部分代码片段

       1.springboot扫描xxljob配置类初始化XxlJobSpringExecutor 

        

    // Spring configuration class: reads the xxl-job settings from properties (via @Value)
    // and exposes an XxlJobSpringExecutor bean populated with them.
    1. @Configuration
    2. public class XxlJobConfig {
    3. private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
    // Injected from xxl.job.admin.addresses.
    4. @Value("${xxl.job.admin.addresses}")
    5. private String adminAddresses;
    // Injected from xxl.job.accessToken.
    6. @Value("${xxl.job.accessToken}")
    7. private String accessToken;
    // Injected from xxl.job.executor.appname.
    8. @Value("${xxl.job.executor.appname}")
    9. private String appname;
    // Injected from xxl.job.executor.address.
    10. @Value("${xxl.job.executor.address}")
    11. private String address;
    // Injected from xxl.job.executor.ip.
    12. @Value("${xxl.job.executor.ip}")
    13. private String ip;
    // Injected from xxl.job.executor.port.
    14. @Value("${xxl.job.executor.port}")
    15. private int port;
    // Injected from xxl.job.executor.logpath.
    16. @Value("${xxl.job.executor.logpath}")
    17. private String logPath;
    // Injected from xxl.job.executor.logretentiondays.
    18. @Value("${xxl.job.executor.logretentiondays}")
    19. private int logRetentionDays;
    // Builds the executor bean: logs the init message, then copies every injected
    // property onto a new XxlJobSpringExecutor through its setters and returns it.
    20. @Bean
    21. public XxlJobSpringExecutor xxlJobExecutor() {
    22. logger.info(">>>>>>>>>>> xxl-job config init.");
    23. XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
    24. xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
    25. xxlJobSpringExecutor.setAppname(appname);
    26. xxlJobSpringExecutor.setAddress(address);
    27. xxlJobSpringExecutor.setIp(ip);
    28. xxlJobSpringExecutor.setPort(port);
    29. xxlJobSpringExecutor.setAccessToken(accessToken);
    30. xxlJobSpringExecutor.setLogPath(logPath);
    31. xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
    32. return xxlJobSpringExecutor;
    33. }
    34. }

    2. XxlJobSpringExecutor实现SmartInitializingSingleton接口执行afterSingletonsInstantiated方法

    // Spring-aware executor: extends XxlJobExecutor and starts it from the
    // SmartInitializingSingleton callback afterSingletonsInstantiated().
    1. public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean {
    2. // start
    3. @Override
    4. public void afterSingletonsInstantiated() {
    5. // init JobHandler Repository
    // (the class-based handler scan below is commented out in this listing)
    6. /*initJobHandlerRepository(applicationContext);*/
    7. // init JobHandler Repository (for method)
    8. initJobHandlerMethodRepository(applicationContext);
    9. // refresh GlueFactory
    10. GlueFactory.refreshInstance(1);
    11. // super start
    // Any exception thrown by super.start() is rethrown as a RuntimeException with the cause preserved.
    12. try {
    13. super.start();
    14. } catch (Exception e) {
    15. throw new RuntimeException(e);
    16. }
    17. }
    18. }

    3.EmbedServer内置服务类,启动netty服务,注册到调度器

    // Embedded server: start() configures a Netty ServerBootstrap, binds it to the given port,
    // then calls startRegistry(appname, address).
    // NOTE(review): this listing is abridged. bossGroup, workerGroup and bizThreadPool are used but
    // not declared here, `thread` is never assigned in the visible code, and the class's closing
    // brace is missing. The `});` near the end suggests the try/finally body was originally the
    // run() of a Runnable handed to a new Thread — confirm against the upstream source.
    1. public class EmbedServer {
    2. private static final Logger logger = LoggerFactory.getLogger(EmbedServer.class);
    3. private ExecutorBiz executorBiz;
    4. private Thread thread;
    // Starts the server for the given port and registers (appname, address) once the bind succeeds.
    5. public void start(final String address, final int port, final String appname, final String accessToken) {
    6. executorBiz = new ExecutorBizImpl();
    7. try {
    8. // start server
    9. ServerBootstrap bootstrap = new ServerBootstrap();
    10. bootstrap.group(bossGroup, workerGroup)
    11. .channel(NioServerSocketChannel.class)
    12. .childHandler(new ChannelInitializer() {
    13. @Override
    14. public void initChannel(SocketChannel channel) throws Exception {
    // Pipeline order: idle detection -> HTTP codec -> aggregation into full messages -> business handler.
    15. channel.pipeline()
    16. .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS)) // beat 3N, close if idle
    17. .addLast(new HttpServerCodec())
    18. .addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // merge request & response to FULL
    19. .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
    20. }
    21. })
    22. .childOption(ChannelOption.SO_KEEPALIVE, true);
    23. // bind
    24. ChannelFuture future = bootstrap.bind(port).sync();
    25. logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);
    26. // start registry
    27. startRegistry(appname, address);
    28. // wait until stop
    29. future.channel().closeFuture().sync();
    // An InterruptedException is logged at info level as a server stop; any other exception is logged as an error.
    30. } catch (InterruptedException e) {
    31. logger.info(">>>>>>>>>>> xxl-job remoting server stop.");
    32. } catch (Exception e) {
    33. logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);
    34. } finally {
    35. // stop
    // Both event loop groups are shut down on every exit path; a failure here is only logged.
    36. try {
    37. workerGroup.shutdownGracefully();
    38. bossGroup.shutdownGracefully();
    39. } catch (Exception e) {
    40. logger.error(e.getMessage(), e);
    41. }
    42. }
    43. }
    44. });
    45. thread.setDaemon(true); // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
    46. thread.start();
    47. }

    4.ExecutorRegistryThread注册线程类,另起一个守护线程发起http请求,完成注册

    // Registry thread holder: start() launches a daemon thread that registers the executor
    // (appname, address) with every AdminBiz while toStop is false, then removes the registration
    // once the loop exits.
    // NOTE(review): this listing is abridged. The inner `try` blocks (listing lines 13 and 26) have
    // no catch/finally, `logger` is not declared here, no pause between loop iterations is visible,
    // and the class's closing brace is missing — confirm against the upstream source.
    1. public class ExecutorRegistryThread {
    2. private Thread registryThread;
    // Loop-exit flag; volatile so a write from another thread is visible to the registry thread.
    3. private volatile boolean toStop = false;
    4. public void start(final String appname, final String address){
    5. registryThread = new Thread(new Runnable() {
    6. @Override
    7. public void run() {
    8. // registry
    9. while (!toStop) {
    10. try {
    11. RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
    // Call registry() on every AdminBiz returned by XxlJobExecutor.getAdminBizList().
    12. for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
    13. try {
    14. ReturnT registryResult = adminBiz.registry(registryParam);
    15. }
    16. } catch (Exception e) {
    // Errors are logged only while the thread is still supposed to run.
    17. if (!toStop) {
    18. logger.error(e.getMessage(), e);
    19. }
    20. }
    21. }
    22. // registry remove
    // Reached after the loop exits (toStop is true): call registryRemove() on every AdminBiz.
    23. try {
    24. RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
    25. for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
    26. try {
    27. ReturnT registryResult = adminBiz.registryRemove(registryParam);
    28. }
    29. } catch (Exception e) {
    30. if (!toStop) {
    31. logger.error(e.getMessage(), e);
    32. }
    33. }
    34. logger.info(">>>>>>>>>>> xxl-job, executor registry thread destroy.");
    35. }
    36. });
    // Mark as a daemon thread, give it a recognizable name, and start it.
    37. registryThread.setDaemon(true);
    38. registryThread.setName("xxl-job, executor ExecutorRegistryThread");
    39. registryThread.start();
    40. }

    三.调度服务代码片段

    1.JobApiController控制类,接收http请求

    // Controller mapped under /api: receives the HTTP request and forwards the
    // registry payload to AdminBiz.
    1. @Controller
    2. @RequestMapping("/api")
    3. public class JobApiController {
    4. @Resource
    5. private AdminBiz adminBiz;
    6. /**
    7. * api
    8. *
    9. * @param uri
    10. * @param data
    11. * @return
    12. */
    // Handles /api/{uri}: deserializes the request body into a RegistryParam with GsonTool
    // and returns the result of adminBiz.registry().
    // NOTE(review): in this listing neither `request` nor `uri` is used, and `data` may be absent
    // (required = false) — confirm how GsonTool.fromJson and registry() behave for a null body.
    13. @RequestMapping("/{uri}")
    14. @ResponseBody
    15. @PermissionLimit(limit=false)
    16. public ReturnT api(HttpServletRequest request, @PathVariable("uri") String uri, @RequestBody(required = false) String data) {
    17. // services mapping
    18. RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
    19. return adminBiz.registry(registryParam);
    20. }
    21. }

    2.JobRegistryHelper注册帮助类,校验参数后将注册信息异步存入数据库,并立即返回成功结果

    1. */
    // Registry helper: validates the registry parameters and persists them on a thread pool.
    // NOTE(review): registryOrRemoveThreadPool and freshGroupRegistryInfo are not declared in
    // this listing — presumably defined elsewhere in the class.
    2. public class JobRegistryHelper {
    // Returns a FAIL_CODE result when any of group/key/value has no text; otherwise submits the
    // database write to the thread pool and returns ReturnT.SUCCESS without waiting for it.
    3. public ReturnT registry(RegistryParam registryParam) {
    4. // valid
    5. if (!StringUtils.hasText(registryParam.getRegistryGroup())
    6. || !StringUtils.hasText(registryParam.getRegistryKey())
    7. || !StringUtils.hasText(registryParam.getRegistryValue())) {
    8. return new ReturnT(ReturnT.FAIL_CODE, "Illegal Argument.");
    9. }
    10. // async execute
    11. registryOrRemoveThreadPool.execute(new Runnable() {
    12. @Override
    13. public void run() {
    // Try registryUpdate first; when it returns less than 1, call registrySave and then
    // freshGroupRegistryInfo.
    14. int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
    15. if (ret < 1) {
    16. XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
    17. // fresh
    18. freshGroupRegistryInfo(registryParam);
    19. }
    20. }
    21. });
    22. return ReturnT.SUCCESS;
    23. }
    24. }

    四.代码说明

    a.ExecutorRegistryThread,内部启动一个线程registryThread
      1.setDaemon(true)将其设为守护线程,不会阻止JVM退出
      2.toStop为true时(程序关闭时),线程退出注册循环,随后调用registryRemove注销已注册的执行器,以免调度器后续调用无效执行器

    b.服务端接收到http请求后,将注册信息存入数据库,供后续调度器调度时使用

  • 相关阅读:
    [c语言]这c语言代码就像做完形填空一样
    附文献!艾美捷抗人IL-12/-23(p40)mAbs MT86/221研究
    前端学习笔记二.布局方式的选择与Flex布局示例
    postman接口测试工具发起webservice请求
    短信视频提取批量工具,免COOKIE,博主视频下载抓取,爬虫
    scrapy爬虫之网站图片爬取
    Kafka - 15 Kafka Offset | 自动和手动提交Offset | 指定Offset消费 | 漏消费和重复消费 | 消息积压
    【论文解读】针对生成任务的多模态图学习
    0.96OLED显示屏C51_IIC例程
    【OpenCV-Python】教程:4-3 Shi-Tomasi 角点检测
  • 原文地址:https://blog.csdn.net/zengliangxi/article/details/126895023