社区
https://www.xuxueli.com/xxl-job/
Gitee
https://gitee.com/xuxueli0323/xxl-job
设计思想
将调度行为抽象形成 “调度中心” 公共平台,而平台自身并不承担业务逻辑,“调度中心” 负责发起调度请求
将任务抽象成分散的 JobHandler,交由 “执行器” 统一管理,“执行器” 负责接收调度请求并执行对应的 JobHandler 中业务逻辑
因此,“调度” 和 “任务” 两部分可以相互解耦,提高系统整体稳定性和扩展性
部署调度中心
源码编译打包
根目录下 mvn clean package
xxl-job-admin/target/xxl-job-admin-2.3.1-SNAPSHOT.jar
运行 java -jar xxl-job-admin-2.3.1-SNAPSHOT.jar
Docker
Docker 镜像方式搭建调度中心:
1
| // Docker地址:https://hub.docker.com/r/xuxueli/xxl-job-admin/ (建议指定版本号)docker pull xuxueli/xxl-job-admin
|
1
| docker run -p 8080:8080 -v /tmp:/data/applogs --name xxl-job-admin -d xuxueli/xxl-job-admin:{指定版本}/*** 如需自定义 mysql 等配置,可通过 "-e PARAMS" 指定,参数格式 PARAMS="--key=value --key2=value2" ;* 配置项参考文件:/xxl-job/xxl-job-admin/src/main/resources/application.properties* 如需自定义 JVM内存参数 等配置,可通过 "-e JAVA_OPTS" 指定,参数格式 JAVA_OPTS="-Xmx512m" ;*/docker run -e PARAMS="--spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai" -p 8080:8080 -v /tmp:/data/applogs --name xxl-job-admin -d xuxueli/xxl-job-admin:{指定版本}
|
执行器-SpringBoot
参考源码中 xxl-job-executor-samples/xxl-job-executor-sample-springboot
pom.xml
1 2 3 4 5 6
| <dependency> <groupId>com.xuxueli</groupId> <artifactId>xxl-job-core</artifactId> <version>2.1.1-SNAPSHOT</version> </dependency>
|
logback.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| <?xml version="1.0" encoding="UTF-8"?> <configuration debug="false" scan="true" scanPeriod="1 seconds">
<contextName>logback</contextName> <property name="log.path" value="./xxl-job-executor-sample-springboot.log"/>
<appender name="console" class="ch.qos.logback.core.ConsoleAppender"> <encoder> <pattern>%d{HH:mm:ss.SSS} %contextName [%thread] %-5level %logger{36} - %msg%n</pattern> </encoder> </appender>
<appender name="file" class="ch.qos.logback.core.rolling.RollingFileAppender"> <file>${log.path}</file> <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> <fileNamePattern>${log.path}.%d{yyyy-MM-dd}.zip</fileNamePattern> </rollingPolicy> <encoder> <pattern>%date %level [%thread] %logger{36} [%file : %line] %msg%n </pattern> </encoder> </appender>
<root level="info"> <appender-ref ref="console"/> <appender-ref ref="file"/> </root>
</configuration>
|
application.properties
加入配置,需修改或自定义
- xxl-job admin 地址
- xxl.job.executor.appname 自定义名称,后台配置必须对应
- xxl.job.executor.ip 当前电脑 Ip,或部署项目的电脑 Ip
- xxl.job.executor.port 端口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| server.port=8081
logging.config=classpath:logback.xml
xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
xxl.job.accessToken=
xxl.job.executor.appname=xxl-job-executor-sample
xxl.job.executor.address=
xxl.job.executor.ip=
xxl.job.executor.port=9999
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
xxl.job.executor.logretentiondays=30
|
XxlJobConfig.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
| package com.xxl.job.executor.core.config;
import com.xxl.job.core.executor.impl.XxlJobSpringExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class XxlJobConfig { private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
@Value("${xxl.job.admin.addresses}") private String adminAddresses;
@Value("${xxl.job.accessToken}") private String accessToken;
@Value("${xxl.job.executor.appname}") private String appname;
@Value("${xxl.job.executor.address}") private String address;
@Value("${xxl.job.executor.ip}") private String ip;
@Value("${xxl.job.executor.port}") private int port;
@Value("${xxl.job.executor.logpath}") private String logPath;
@Value("${xxl.job.executor.logretentiondays}") private int logRetentionDays;
@Bean public XxlJobSpringExecutor xxlJobExecutor() { logger.info(">>>>>>>>>>> xxl-job config init."); XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor(); xxlJobSpringExecutor.setAdminAddresses(adminAddresses); xxlJobSpringExecutor.setAppname(appname); xxlJobSpringExecutor.setAddress(address); xxlJobSpringExecutor.setIp(ip); xxlJobSpringExecutor.setPort(port); xxlJobSpringExecutor.setAccessToken(accessToken); xxlJobSpringExecutor.setLogPath(logPath); xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor; }
}
|
SampleXxlJob.java - 任务 job
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251
| package com.xxl.job.executor.service.jobhandler;
import com.xxl.job.core.context.XxlJobHelper; import com.xxl.job.core.handler.annotation.XxlJob; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component;
import java.io.BufferedInputStream; import java.io.BufferedReader; import java.io.DataOutputStream; import java.io.InputStreamReader; import java.net.HttpURLConnection; import java.net.URL; import java.util.Arrays; import java.util.concurrent.TimeUnit;
@Component public class SampleXxlJob { private static Logger logger = LoggerFactory.getLogger(SampleXxlJob.class);
@XxlJob("demoJobHandler") public void demoJobHandler() throws Exception { XxlJobHelper.log("XXL-JOB, Hello World.");
for (int i = 0; i < 5; i++) { XxlJobHelper.log("beat at:" + i); TimeUnit.SECONDS.sleep(2); } }
@XxlJob("shardingJobHandler") public void shardingJobHandler() throws Exception {
int shardIndex = XxlJobHelper.getShardIndex(); int shardTotal = XxlJobHelper.getShardTotal();
XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
for (int i = 0; i < shardTotal; i++) { if (i == shardIndex) { XxlJobHelper.log("第 {} 片, 命中分片开始处理", i); } else { XxlJobHelper.log("第 {} 片, 忽略", i); } }
}
@XxlJob("commandJobHandler") public void commandJobHandler() throws Exception { String command = XxlJobHelper.getJobParam(); int exitValue = -1;
BufferedReader bufferedReader = null; try { ProcessBuilder processBuilder = new ProcessBuilder(); processBuilder.command(command); processBuilder.redirectErrorStream(true);
Process process = processBuilder.start();
BufferedInputStream bufferedInputStream = new BufferedInputStream(process.getInputStream()); bufferedReader = new BufferedReader(new InputStreamReader(bufferedInputStream));
String line; while ((line = bufferedReader.readLine()) != null) { XxlJobHelper.log(line); }
process.waitFor(); exitValue = process.exitValue(); } catch (Exception e) { XxlJobHelper.log(e); } finally { if (bufferedReader != null) { bufferedReader.close(); } }
if (exitValue == 0) { } else { XxlJobHelper.handleFail("command exit value("+exitValue+") is failed"); }
}
@XxlJob("httpJobHandler") public void httpJobHandler() throws Exception {
String param = XxlJobHelper.getJobParam(); if (param==null || param.trim().length()==0) { XxlJobHelper.log("param["+ param +"] invalid.");
XxlJobHelper.handleFail(); return; }
String[] httpParams = param.split("\n"); String url = null; String method = null; String data = null; for (String httpParam: httpParams) { if (httpParam.startsWith("url:")) { url = httpParam.substring(httpParam.indexOf("url:") + 4).trim(); } if (httpParam.startsWith("method:")) { method = httpParam.substring(httpParam.indexOf("method:") + 7).trim().toUpperCase(); } if (httpParam.startsWith("data:")) { data = httpParam.substring(httpParam.indexOf("data:") + 5).trim(); } }
if (url==null || url.trim().length()==0) { XxlJobHelper.log("url["+ url +"] invalid.");
XxlJobHelper.handleFail(); return; } if (method==null || !Arrays.asList("GET", "POST").contains(method)) { XxlJobHelper.log("method["+ method +"] invalid.");
XxlJobHelper.handleFail(); return; } boolean isPostMethod = method.equals("POST");
HttpURLConnection connection = null; BufferedReader bufferedReader = null; try { URL realUrl = new URL(url); connection = (HttpURLConnection) realUrl.openConnection();
connection.setRequestMethod(method); connection.setDoOutput(isPostMethod); connection.setDoInput(true); connection.setUseCaches(false); connection.setReadTimeout(5 * 1000); connection.setConnectTimeout(3 * 1000); connection.setRequestProperty("connection", "Keep-Alive"); connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8"); connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8");
connection.connect();
if (isPostMethod && data!=null && data.trim().length()>0) { DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream()); dataOutputStream.write(data.getBytes("UTF-8")); dataOutputStream.flush(); dataOutputStream.close(); }
int statusCode = connection.getResponseCode(); if (statusCode != 200) { throw new RuntimeException("Http Request StatusCode(" + statusCode + ") Invalid."); }
bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8")); StringBuilder result = new StringBuilder(); String line; while ((line = bufferedReader.readLine()) != null) { result.append(line); } String responseMsg = result.toString();
XxlJobHelper.log(responseMsg);
return; } catch (Exception e) { XxlJobHelper.log(e);
XxlJobHelper.handleFail(); return; } finally { try { if (bufferedReader != null) { bufferedReader.close(); } if (connection != null) { connection.disconnect(); } } catch (Exception e2) { XxlJobHelper.log(e2); } }
}
@XxlJob(value = "demoJobHandler2", init = "init", destroy = "destroy") public void demoJobHandler2() throws Exception { XxlJobHelper.log("XXL-JOB, Hello World."); } public void init(){ logger.info("init"); } public void destroy(){ logger.info("destory"); } }
|
创建任务并执行
登录调度中心
http://localhost:8081/xxl-job-admin/ admin/123456
创建执行器:一个微服务可以对应一个执行器,各微服务执行器做下区分
任务管理-新增任务
启动