parent
d51127a9b7
commit
1d2b10c2a5
@ -0,0 +1,99 @@
|
||||
package com.xxl.job.admin.core.complete;
|
||||
|
||||
import com.xxl.job.admin.core.conf.XxlJobAdminConfig;
|
||||
import com.xxl.job.admin.core.model.XxlJobInfo;
|
||||
import com.xxl.job.admin.core.model.XxlJobLog;
|
||||
import com.xxl.job.admin.core.thread.JobTriggerPoolHelper;
|
||||
import com.xxl.job.admin.core.trigger.TriggerTypeEnum;
|
||||
import com.xxl.job.admin.core.util.I18nUtil;
|
||||
import com.xxl.job.core.biz.model.ReturnT;
|
||||
import com.xxl.job.core.context.XxlJobContext;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.text.MessageFormat;
|
||||
|
||||
/**
|
||||
* @author xuxueli 2020-10-30 20:43:10
|
||||
*/
|
||||
public class XxlJobCompleter {
|
||||
private static Logger logger = LoggerFactory.getLogger(XxlJobCompleter.class);
|
||||
|
||||
/**
|
||||
* common fresh handle entrance (limit only once)
|
||||
*
|
||||
* @param xxlJobLog
|
||||
* @return
|
||||
*/
|
||||
public static int updateHandleInfoAndFinish(XxlJobLog xxlJobLog) {
|
||||
|
||||
// finish
|
||||
finishJob(xxlJobLog);
|
||||
|
||||
// text最大64kb 避免长度过长
|
||||
if (xxlJobLog.getHandleMsg().length() > 15000) {
|
||||
xxlJobLog.setHandleMsg( xxlJobLog.getHandleMsg().substring(0, 15000) );
|
||||
}
|
||||
|
||||
// fresh handle
|
||||
return XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateHandleInfo(xxlJobLog);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* do somethind to finish job
|
||||
*/
|
||||
private static void finishJob(XxlJobLog xxlJobLog){
|
||||
|
||||
// 1、handle success, to trigger child job
|
||||
String triggerChildMsg = null;
|
||||
if (XxlJobContext.HANDLE_CODE_SUCCESS == xxlJobLog.getHandleCode()) {
|
||||
XxlJobInfo xxlJobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(xxlJobLog.getJobId());
|
||||
if (xxlJobInfo!=null && xxlJobInfo.getChildJobId()!=null && xxlJobInfo.getChildJobId().trim().length()>0) {
|
||||
triggerChildMsg = "<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_child_run") +"<<<<<<<<<<< </span><br>";
|
||||
|
||||
String[] childJobIds = xxlJobInfo.getChildJobId().split(",");
|
||||
for (int i = 0; i < childJobIds.length; i++) {
|
||||
int childJobId = (childJobIds[i]!=null && childJobIds[i].trim().length()>0 && isNumeric(childJobIds[i]))?Integer.valueOf(childJobIds[i]):-1;
|
||||
if (childJobId > 0) {
|
||||
|
||||
JobTriggerPoolHelper.trigger(childJobId, TriggerTypeEnum.PARENT, -1, null, null, null);
|
||||
ReturnT<String> triggerChildResult = ReturnT.SUCCESS;
|
||||
|
||||
// add msg
|
||||
triggerChildMsg += MessageFormat.format(I18nUtil.getString("jobconf_callback_child_msg1"),
|
||||
(i+1),
|
||||
childJobIds.length,
|
||||
childJobIds[i],
|
||||
(triggerChildResult.getCode()==ReturnT.SUCCESS_CODE?I18nUtil.getString("system_success"):I18nUtil.getString("system_fail")),
|
||||
triggerChildResult.getMsg());
|
||||
} else {
|
||||
triggerChildMsg += MessageFormat.format(I18nUtil.getString("jobconf_callback_child_msg2"),
|
||||
(i+1),
|
||||
childJobIds.length,
|
||||
childJobIds[i]);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
if (triggerChildMsg != null) {
|
||||
xxlJobLog.setHandleMsg( xxlJobLog.getHandleMsg() + triggerChildMsg );
|
||||
}
|
||||
|
||||
// 2、fix_delay trigger next
|
||||
// on the way
|
||||
|
||||
}
|
||||
|
||||
private static boolean isNumeric(String str){
|
||||
try {
|
||||
int result = Integer.valueOf(str);
|
||||
return true;
|
||||
} catch (NumberFormatException e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,39 @@
|
||||
package com.xxl.job.admin.core.scheduler;
|
||||
|
||||
import com.xxl.job.admin.core.util.I18nUtil;
|
||||
|
||||
/**
|
||||
* @author xuxueli 2020-10-29 21:11:23
|
||||
*/
|
||||
public enum MisfireStrategyEnum {
|
||||
|
||||
/**
|
||||
* do nothing
|
||||
*/
|
||||
DO_NOTHING(I18nUtil.getString("misfire_strategy_do_nothing")),
|
||||
|
||||
/**
|
||||
* fire once now
|
||||
*/
|
||||
FIRE_ONCE_NOW(I18nUtil.getString("misfire_strategy_fire_once_now"));
|
||||
|
||||
private String title;
|
||||
|
||||
MisfireStrategyEnum(String title) {
|
||||
this.title = title;
|
||||
}
|
||||
|
||||
public String getTitle() {
|
||||
return title;
|
||||
}
|
||||
|
||||
public static MisfireStrategyEnum match(String name, MisfireStrategyEnum defaultItem){
|
||||
for (MisfireStrategyEnum item: MisfireStrategyEnum.values()) {
|
||||
if (item.name().equals(name)) {
|
||||
return item;
|
||||
}
|
||||
}
|
||||
return defaultItem;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,204 @@
|
||||
package com.xxl.job.admin.core.thread;
|
||||
|
||||
import com.xxl.job.admin.core.conf.XxlJobAdminConfig;
|
||||
import com.xxl.job.admin.core.model.XxlJobGroup;
|
||||
import com.xxl.job.admin.core.model.XxlJobRegistry;
|
||||
import com.xxl.job.core.biz.model.RegistryParam;
|
||||
import com.xxl.job.core.biz.model.ReturnT;
|
||||
import com.xxl.job.core.enums.RegistryConfig;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.*;
|
||||
|
||||
/**
|
||||
* job registry instance
|
||||
* @author xuxueli 2016-10-02 19:10:24
|
||||
*/
|
||||
public class JobRegistryHelper {
|
||||
private static Logger logger = LoggerFactory.getLogger(JobRegistryHelper.class);
|
||||
|
||||
private static JobRegistryHelper instance = new JobRegistryHelper();
|
||||
public static JobRegistryHelper getInstance(){
|
||||
return instance;
|
||||
}
|
||||
|
||||
private ThreadPoolExecutor registryOrRemoveThreadPool = null;
|
||||
private Thread registryMonitorThread;
|
||||
private volatile boolean toStop = false;
|
||||
|
||||
public void start(){
|
||||
|
||||
// for registry or remove
|
||||
registryOrRemoveThreadPool = new ThreadPoolExecutor(
|
||||
2,
|
||||
10,
|
||||
30L,
|
||||
TimeUnit.SECONDS,
|
||||
new LinkedBlockingQueue<Runnable>(2000),
|
||||
new ThreadFactory() {
|
||||
@Override
|
||||
public Thread newThread(Runnable r) {
|
||||
return new Thread(r, "xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThreadPool-" + r.hashCode());
|
||||
}
|
||||
},
|
||||
new RejectedExecutionHandler() {
|
||||
@Override
|
||||
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
|
||||
r.run();
|
||||
logger.warn(">>>>>>>>>>> xxl-job, registry or remove too fast, match threadpool rejected handler(run now).");
|
||||
}
|
||||
});
|
||||
|
||||
// for monitor
|
||||
registryMonitorThread = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
while (!toStop) {
|
||||
try {
|
||||
// auto registry group
|
||||
List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
|
||||
if (groupList!=null && !groupList.isEmpty()) {
|
||||
|
||||
// remove dead address (admin/executor)
|
||||
List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
|
||||
if (ids!=null && ids.size()>0) {
|
||||
XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
|
||||
}
|
||||
|
||||
// fresh online address (admin/executor)
|
||||
HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
|
||||
List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
|
||||
if (list != null) {
|
||||
for (XxlJobRegistry item: list) {
|
||||
if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
|
||||
String appname = item.getRegistryKey();
|
||||
List<String> registryList = appAddressMap.get(appname);
|
||||
if (registryList == null) {
|
||||
registryList = new ArrayList<String>();
|
||||
}
|
||||
|
||||
if (!registryList.contains(item.getRegistryValue())) {
|
||||
registryList.add(item.getRegistryValue());
|
||||
}
|
||||
appAddressMap.put(appname, registryList);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// fresh group address
|
||||
for (XxlJobGroup group: groupList) {
|
||||
List<String> registryList = appAddressMap.get(group.getAppname());
|
||||
String addressListStr = null;
|
||||
if (registryList!=null && !registryList.isEmpty()) {
|
||||
Collections.sort(registryList);
|
||||
StringBuilder addressListSB = new StringBuilder();
|
||||
for (String item:registryList) {
|
||||
addressListSB.append(item).append(",");
|
||||
}
|
||||
addressListStr = addressListSB.toString();
|
||||
addressListStr = addressListStr.substring(0, addressListStr.length()-1);
|
||||
}
|
||||
group.setAddressList(addressListStr);
|
||||
group.setUpdateTime(new Date());
|
||||
|
||||
XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
if (!toStop) {
|
||||
logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
|
||||
}
|
||||
}
|
||||
try {
|
||||
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
|
||||
} catch (InterruptedException e) {
|
||||
if (!toStop) {
|
||||
logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
|
||||
}
|
||||
});
|
||||
registryMonitorThread.setDaemon(true);
|
||||
registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread");
|
||||
registryMonitorThread.start();
|
||||
}
|
||||
|
||||
public void toStop(){
|
||||
toStop = true;
|
||||
|
||||
// stop registryOrRemoveThreadPool
|
||||
registryOrRemoveThreadPool.shutdownNow();
|
||||
|
||||
// stop monitir (interrupt and wait)
|
||||
registryMonitorThread.interrupt();
|
||||
try {
|
||||
registryMonitorThread.join();
|
||||
} catch (InterruptedException e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// ---------------------- helper ----------------------
|
||||
|
||||
public ReturnT<String> registry(RegistryParam registryParam) {
|
||||
|
||||
// valid
|
||||
if (!StringUtils.hasText(registryParam.getRegistryGroup())
|
||||
|| !StringUtils.hasText(registryParam.getRegistryKey())
|
||||
|| !StringUtils.hasText(registryParam.getRegistryValue())) {
|
||||
return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");
|
||||
}
|
||||
|
||||
// async execute
|
||||
registryOrRemoveThreadPool.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
|
||||
if (ret < 1) {
|
||||
XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
|
||||
|
||||
// fresh
|
||||
freshGroupRegistryInfo(registryParam);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
return ReturnT.SUCCESS;
|
||||
}
|
||||
|
||||
public ReturnT<String> registryRemove(RegistryParam registryParam) {
|
||||
|
||||
// valid
|
||||
if (!StringUtils.hasText(registryParam.getRegistryGroup())
|
||||
|| !StringUtils.hasText(registryParam.getRegistryKey())
|
||||
|| !StringUtils.hasText(registryParam.getRegistryValue())) {
|
||||
return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");
|
||||
}
|
||||
|
||||
// async execute
|
||||
registryOrRemoveThreadPool.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryDelete(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue());
|
||||
if (ret > 0) {
|
||||
// fresh
|
||||
freshGroupRegistryInfo(registryParam);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
return ReturnT.SUCCESS;
|
||||
}
|
||||
|
||||
private void freshGroupRegistryInfo(RegistryParam registryParam){
|
||||
// Under consideration, prevent affecting core tables
|
||||
}
|
||||
|
||||
|
||||
}
|
@ -1,111 +0,0 @@
|
||||
package com.xxl.job.admin.core.thread;
|
||||
|
||||
import com.xxl.job.admin.core.conf.XxlJobAdminConfig;
|
||||
import com.xxl.job.admin.core.model.XxlJobGroup;
|
||||
import com.xxl.job.admin.core.model.XxlJobRegistry;
|
||||
import com.xxl.job.core.enums.RegistryConfig;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* job registry instance
|
||||
* @author xuxueli 2016-10-02 19:10:24
|
||||
*/
|
||||
public class JobRegistryMonitorHelper {
|
||||
private static Logger logger = LoggerFactory.getLogger(JobRegistryMonitorHelper.class);
|
||||
|
||||
private static JobRegistryMonitorHelper instance = new JobRegistryMonitorHelper();
|
||||
public static JobRegistryMonitorHelper getInstance(){
|
||||
return instance;
|
||||
}
|
||||
|
||||
private Thread registryThread;
|
||||
private volatile boolean toStop = false;
|
||||
public void start(){
|
||||
registryThread = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
while (!toStop) {
|
||||
try {
|
||||
// auto registry group
|
||||
List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
|
||||
if (groupList!=null && !groupList.isEmpty()) {
|
||||
|
||||
// remove dead address (admin/executor)
|
||||
List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
|
||||
if (ids!=null && ids.size()>0) {
|
||||
XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
|
||||
}
|
||||
|
||||
// fresh online address (admin/executor)
|
||||
HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
|
||||
List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
|
||||
if (list != null) {
|
||||
for (XxlJobRegistry item: list) {
|
||||
if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
|
||||
String appname = item.getRegistryKey();
|
||||
List<String> registryList = appAddressMap.get(appname);
|
||||
if (registryList == null) {
|
||||
registryList = new ArrayList<String>();
|
||||
}
|
||||
|
||||
if (!registryList.contains(item.getRegistryValue())) {
|
||||
registryList.add(item.getRegistryValue());
|
||||
}
|
||||
appAddressMap.put(appname, registryList);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// fresh group address
|
||||
for (XxlJobGroup group: groupList) {
|
||||
List<String> registryList = appAddressMap.get(group.getAppname());
|
||||
String addressListStr = null;
|
||||
if (registryList!=null && !registryList.isEmpty()) {
|
||||
Collections.sort(registryList);
|
||||
addressListStr = "";
|
||||
for (String item:registryList) {
|
||||
addressListStr += item + ",";
|
||||
}
|
||||
addressListStr = addressListStr.substring(0, addressListStr.length()-1);
|
||||
}
|
||||
group.setAddressList(addressListStr);
|
||||
XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
if (!toStop) {
|
||||
logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
|
||||
}
|
||||
}
|
||||
try {
|
||||
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
|
||||
} catch (InterruptedException e) {
|
||||
if (!toStop) {
|
||||
logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
|
||||
}
|
||||
});
|
||||
registryThread.setDaemon(true);
|
||||
registryThread.setName("xxl-job, admin JobRegistryMonitorHelper");
|
||||
registryThread.start();
|
||||
}
|
||||
|
||||
public void toStop(){
|
||||
toStop = true;
|
||||
// interrupt and wait
|
||||
registryThread.interrupt();
|
||||
try {
|
||||
registryThread.join();
|
||||
} catch (InterruptedException e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
Loading…
Reference in new issue