

- @Configuration
- public class XxlJobConfig {
- private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
-
- @Value("${xxl.job.admin.addresses}")
- private String adminAddresses;
-
- @Value("${xxl.job.accessToken}")
- private String accessToken;
-
- @Value("${xxl.job.executor.appname}")
- private String appname;
-
- @Value("${xxl.job.executor.address}")
- private String address;
-
- @Value("${xxl.job.executor.ip}")
- private String ip;
-
- @Value("${xxl.job.executor.port}")
- private int port;
-
- @Value("${xxl.job.executor.logpath}")
- private String logPath;
-
- @Value("${xxl.job.executor.logretentiondays}")
- private int logRetentionDays;
-
-
- @Bean
- public XxlJobSpringExecutor xxlJobExecutor() {
- logger.info(">>>>>>>>>>> xxl-job config init.");
- XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
- xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
- xxlJobSpringExecutor.setAppname(appname);
- xxlJobSpringExecutor.setAddress(address);
- xxlJobSpringExecutor.setIp(ip);
- xxlJobSpringExecutor.setPort(port);
- xxlJobSpringExecutor.setAccessToken(accessToken);
- xxlJobSpringExecutor.setLogPath(logPath);
- xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
-
- return xxlJobSpringExecutor;
- }
- }
- public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean {
-
-
- // start
- @Override
- public void afterSingletonsInstantiated() {
-
- // init JobHandler Repository
- /*initJobHandlerRepository(applicationContext);*/
-
- // init JobHandler Repository (for method)
- initJobHandlerMethodRepository(applicationContext);
-
- // refresh GlueFactory
- GlueFactory.refreshInstance(1);
-
- // super start
- try {
- super.start();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- }
- public class EmbedServer {
- private static final Logger logger = LoggerFactory.getLogger(EmbedServer.class);
-
- private ExecutorBiz executorBiz;
- private Thread thread;
-
- public void start(final String address, final int port, final String appname, final String accessToken) {
- executorBiz = new ExecutorBizImpl();
-
- try {
- // start server
- ServerBootstrap bootstrap = new ServerBootstrap();
- bootstrap.group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class)
- .childHandler(new ChannelInitializer
() { - @Override
- public void initChannel(SocketChannel channel) throws Exception {
- channel.pipeline()
- .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS)) // beat 3N, close if idle
- .addLast(new HttpServerCodec())
- .addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // merge request & reponse to FULL
- .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
- }
- })
- .childOption(ChannelOption.SO_KEEPALIVE, true);
-
- // bind
- ChannelFuture future = bootstrap.bind(port).sync();
-
- logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);
-
- // start registry
- startRegistry(appname, address);
-
- // wait util stop
- future.channel().closeFuture().sync();
-
- } catch (InterruptedException e) {
- logger.info(">>>>>>>>>>> xxl-job remoting server stop.");
- } catch (Exception e) {
- logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);
- } finally {
- // stop
- try {
- workerGroup.shutdownGracefully();
- bossGroup.shutdownGracefully();
- } catch (Exception e) {
- logger.error(e.getMessage(), e);
- }
- }
- }
- });
- thread.setDaemon(true); // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
- thread.start();
- }
- public class ExecutorRegistryThread {
-
- private Thread registryThread;
- private volatile boolean toStop = false;
- public void start(final String appname, final String address){
-
- registryThread = new Thread(new Runnable() {
- @Override
- public void run() {
-
- // registry
- while (!toStop) {
- try {
- RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
- for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
- try {
- ReturnT
registryResult = adminBiz.registry(registryParam); -
- }
- } catch (Exception e) {
- if (!toStop) {
- logger.error(e.getMessage(), e);
- }
-
- }
-
-
- }
-
- // registry remove
- try {
- RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
- for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
- try {
- ReturnT
registryResult = adminBiz.registryRemove(registryParam); -
-
- }
- } catch (Exception e) {
- if (!toStop) {
- logger.error(e.getMessage(), e);
- }
- }
- logger.info(">>>>>>>>>>> xxl-job, executor registry thread destroy.");
-
- }
- });
- registryThread.setDaemon(true);
- registryThread.setName("xxl-job, executor ExecutorRegistryThread");
- registryThread.start();
- }
- @Controller
- @RequestMapping("/api")
- public class JobApiController {
-
- @Resource
- private AdminBiz adminBiz;
- /**
- * api
- *
- * @param uri
- * @param data
- * @return
- */
- @RequestMapping("/{uri}")
- @ResponseBody
- @PermissionLimit(limit=false)
- public ReturnT
api(HttpServletRequest request, @PathVariable("uri") String uri, @RequestBody(required = false) String data) { - // services mapping
- RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
- return adminBiz.registry(registryParam);
-
- }
-
- }
- */
- public class JobRegistryHelper {
-
- public ReturnT
registry(RegistryParam registryParam) { -
- // valid
- if (!StringUtils.hasText(registryParam.getRegistryGroup())
- || !StringUtils.hasText(registryParam.getRegistryKey())
- || !StringUtils.hasText(registryParam.getRegistryValue())) {
- return new ReturnT
(ReturnT.FAIL_CODE, "Illegal Argument."); - }
-
- // async execute
- registryOrRemoveThreadPool.execute(new Runnable() {
- @Override
- public void run() {
- int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
- if (ret < 1) {
- XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
-
- // fresh
- freshGroupRegistryInfo(registryParam);
- }
- }
- });
-
- return ReturnT.SUCCESS;
- }
-
- }
a.ExecutorRegistryThread,内部启动一个线程registryThread
1.setDaemon(ture)为守护线程,程序关闭不受影响
2.toStop在程序关闭的时候,注销注册的执行器,以免后续调度器调用无效执行器
b.服务端,接收到http请求,将注册信息,存入到数据库,供后面,调度器调度