diff --git a/austin-common/src/main/java/com/java3y/austin/common/constant/ThreadPoolConstant.java b/austin-common/src/main/java/com/java3y/austin/common/constant/ThreadPoolConstant.java new file mode 100644 index 0000000..e413860 --- /dev/null +++ b/austin-common/src/main/java/com/java3y/austin/common/constant/ThreadPoolConstant.java @@ -0,0 +1,37 @@ +package com.java3y.austin.common.constant; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * 线程池常见的常量信息 + * (仅供初始化的使用,代码里的线程池配置有可能被apollo动态修改) + * + * @author 3y + */ +public class ThreadPoolConstant { + + + /** + * small + */ + public static final Integer SINGLE_CORE_POOL_SIZE = 1; + public static final Integer SINGLE_MAX_POOL_SIZE = 1; + public static final Integer SMALL_KEEP_LIVE_TIME = 10; + + /** + * medium + */ + public static final Integer COMMON_CORE_POOL_SIZE = 2; + public static final Integer COMMON_MAX_POOL_SIZE = 2; + public static final Integer COMMON_KEEP_LIVE_TIME = 60; + public static final Integer COMMON_QUEUE_SIZE = 20; + + + /** + * big + */ + public static final Integer BIG_QUEUE_SIZE = 1024; + public static final BlockingQueue BIG_BLOCKING_QUEUE = new LinkedBlockingQueue(BIG_QUEUE_SIZE); + +} diff --git a/austin-cron/src/main/java/com/java3y/austin/cron/config/AsyncConfiguration.java b/austin-cron/src/main/java/com/java3y/austin/cron/config/AsyncConfiguration.java deleted file mode 100644 index b2063a1..0000000 --- a/austin-cron/src/main/java/com/java3y/austin/cron/config/AsyncConfiguration.java +++ /dev/null @@ -1,49 +0,0 @@ -package com.java3y.austin.cron.config; - -import com.google.common.base.Throwables; -import lombok.extern.slf4j.Slf4j; -import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler; -import org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Primary; -import org.springframework.scheduling.annotation.AsyncConfigurer; -import org.springframework.scheduling.annotation.EnableAsync; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; - -/** - * 处理定时任务的线程池配置信息,为@Async注解服务 - * 自定义线程池配置 - * - * @author 3y - * @see TaskExecutionAutoConfiguration - */ -@Slf4j -@Configuration -@EnableAsync -@EnableConfigurationProperties(AsyncExecutionProperties.class) -public class AsyncConfiguration implements AsyncConfigurer { - @Bean("austinExecutor") - @Primary - public ThreadPoolTaskExecutor executor(AsyncExecutionProperties properties) { - ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); - executor.setCorePoolSize(properties.getCoreSize()); - executor.setMaxPoolSize(properties.getMaxSize()); - executor.setKeepAliveSeconds(properties.getKeepAlive()); - executor.setQueueCapacity(properties.getQueueCapacity()); - executor.setThreadNamePrefix(properties.getThreadNamePrefix()); - executor.setRejectedExecutionHandler(properties.getRejectedHandler().getHandler()); - executor.setAllowCoreThreadTimeOut(properties.isAllowCoreThreadTimeout()); - executor.setWaitForTasksToCompleteOnShutdown(properties.isWaitForTasksToCompleteOnShutDown()); - executor.setAwaitTerminationSeconds(properties.getAwaitTerminationSeconds()); - executor.initialize(); - return executor; - } - - - @Override - public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { - return (ex, method, params) -> log.error("austinExecutor execute fail!method:{},params:{},ex:{}", method, params, Throwables.getStackTraceAsString(ex)); - } -} diff --git a/austin-cron/src/main/java/com/java3y/austin/cron/config/AsyncExecutionProperties.java b/austin-cron/src/main/java/com/java3y/austin/cron/config/AsyncExecutionProperties.java deleted file mode 100644 index 9d0aa4f..0000000 --- a/austin-cron/src/main/java/com/java3y/austin/cron/config/AsyncExecutionProperties.java +++ /dev/null @@ -1,116 +0,0 @@ -package com.java3y.austin.cron.config; - -import lombok.Data; -import org.springframework.boot.autoconfigure.task.TaskExecutionProperties; -import org.springframework.boot.context.properties.ConfigurationProperties; - -import javax.annotation.PostConstruct; -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.ThreadPoolExecutor; - -/** - * @program: austin - * @description: spring 自定义线程池配置类 - * @author: WhyWhatHow - * @create: 2022-02-27 09:41 - * @see TaskExecutionProperties - **/ -@Data -@ConfigurationProperties("austin.async.task") -public class AsyncExecutionProperties { - /** - * 核心线程数,默认数量当前cpu核心线程数 - */ - int coreSize; - /** - * 最大线程数 ,默认coreSize*2 - */ - int maxSize; - /** - * 线程名前缀 eg: "austinAsyncExecutor-" - */ - private String threadNamePrefix = "austinAsyncExecutor-"; - - /** - * queue capacity - */ - private int queueCapacity = 1000; - - /** - * 线程最大存活时间,单位s - */ - private int keepAlive = 60; - - /** - * 是否允许核心线程超时 - */ - private boolean allowCoreThreadTimeout = false; - - /** - * 拒绝策略 ,默认callRun - */ - private RejectedEnum rejectedHandler = RejectedEnum.CALLRUNSPOLICY; - - - /** - * 是否在关机时等待任务完成 ,默认为true - */ - private boolean waitForTasksToCompleteOnShutDown = true; - - /** - * 阻止关机的最大秒数 ,默认10s - */ - private int awaitTerminationSeconds = 10; - - /** - * 初始化 核心线程数, 最大线程数, 以用户配置为主 - */ - @PostConstruct - void init() { - if (coreSize <= 0) { - this.coreSize = Runtime.getRuntime().availableProcessors(); - } - if (maxSize <= 0) { - this.maxSize = coreSize << 1; - } - } - - /** - * 拒绝策略枚举 - */ - public enum RejectedEnum { - /** - * 直接抛出异常 - */ - ABORTPOLICY(new ThreadPoolExecutor.AbortPolicy()), - /** - * 交给当前run_thread 运行 - */ - CALLRUNSPOLICY(new ThreadPoolExecutor.CallerRunsPolicy()), - /*** - * 直接丢掉 - */ - DISCARDPOLICY(new ThreadPoolExecutor.DiscardPolicy()), - /** - * 丢掉队列中排队时间最久的任务 - */ - DISCARDOLDESTPOLICY(new ThreadPoolExecutor.DiscardOldestPolicy()); - /** - * 线程池默认拒绝策略 - */ - private RejectedExecutionHandler handler; - - RejectedEnum(RejectedExecutionHandler handler) { - this.handler = handler; - } - - - public RejectedExecutionHandler getHandler() { - return handler; - } - - public void setHandler(RejectedExecutionHandler handler) { - this.handler = handler; - } - } -} diff --git a/austin-cron/src/main/java/com/java3y/austin/cron/config/CronAsyncThreadPoolConfig.java b/austin-cron/src/main/java/com/java3y/austin/cron/config/CronAsyncThreadPoolConfig.java new file mode 100644 index 0000000..d98563f --- /dev/null +++ b/austin-cron/src/main/java/com/java3y/austin/cron/config/CronAsyncThreadPoolConfig.java @@ -0,0 +1,64 @@ +package com.java3y.austin.cron.config; + +import cn.hutool.core.thread.ExecutorBuilder; +import com.dtp.common.em.QueueTypeEnum; +import com.dtp.common.em.RejectedTypeEnum; +import com.dtp.core.thread.DtpExecutor; +import com.dtp.core.thread.ThreadPoolBuilder; +import com.java3y.austin.common.constant.ThreadPoolConstant; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * @author 3y + * 动态线程池配置。实际的具体配置以apollo的为准!实际的具体配置以apollo的为准!实际的具体配置以apollo的为准 + */ +public class CronAsyncThreadPoolConfig { + + /** + * 接收到xxl-job请求的线程池名 + */ + public static final String EXECUTE_XXL_THREAD_POOL_NAME = "execute-xxl-thread-pool"; + + + /** + * 业务:消费pending队列实际的线程池 + * 配置:核心线程可以被回收,当线程池无被引用且无核心线程数,应当被回收 + * 动态线程池且被Spring管理:false + * @return + */ + public static ExecutorService getConsumePendingThreadPool() { + return ExecutorBuilder.create() + .setCorePoolSize(ThreadPoolConstant.COMMON_CORE_POOL_SIZE) + .setMaxPoolSize(ThreadPoolConstant.COMMON_MAX_POOL_SIZE) + .setWorkQueue(ThreadPoolConstant.BIG_BLOCKING_QUEUE) + .setHandler(new ThreadPoolExecutor.CallerRunsPolicy()) + .setAllowCoreThreadTimeOut(true) + .setKeepAliveTime(ThreadPoolConstant.SMALL_KEEP_LIVE_TIME, TimeUnit.SECONDS) + .build(); + } + + + /** + * 业务:接收到xxl-job请求的线程池 + * 配置:不丢弃消息,核心线程数不会随着keepAliveTime而减少(不会被回收) + * 动态线程池且被Spring管理:true + * + * @return + */ + public static DtpExecutor getXxlCronExecutor() { + return ThreadPoolBuilder.newBuilder() + .threadPoolName(EXECUTE_XXL_THREAD_POOL_NAME) + .corePoolSize(ThreadPoolConstant.COMMON_CORE_POOL_SIZE) + .maximumPoolSize(ThreadPoolConstant.COMMON_MAX_POOL_SIZE) + .keepAliveTime(ThreadPoolConstant.COMMON_KEEP_LIVE_TIME) + .timeUnit(TimeUnit.SECONDS) + .rejectedExecutionHandler(RejectedTypeEnum.CALLER_RUNS_POLICY.getName()) + .allowCoreThreadTimeOut(false) + .workQueue(QueueTypeEnum.VARIABLE_LINKED_BLOCKING_QUEUE.getName(), ThreadPoolConstant.COMMON_QUEUE_SIZE, false) + .buildDynamic(); + } + +} diff --git a/austin-cron/src/main/java/com/java3y/austin/cron/constants/PendingConstant.java b/austin-cron/src/main/java/com/java3y/austin/cron/constants/PendingConstant.java index e0a4dfb..9c87aa0 100644 --- a/austin-cron/src/main/java/com/java3y/austin/cron/constants/PendingConstant.java +++ b/austin-cron/src/main/java/com/java3y/austin/cron/constants/PendingConstant.java @@ -1,8 +1,5 @@ package com.java3y.austin.cron.constants; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - /** * @author 3y * @date 2022/2/13 @@ -25,12 +22,5 @@ public class PendingConstant { */ public static final Long TIME_THRESHOLD = 1000L; - /** - * 真正消费线程池配置的信息 - */ - public static final Integer CORE_POOL_SIZE = 2; - public static final Integer MAX_POOL_SIZE = 2; - public static final Integer KEEP_LIVE_TIME = 20; - public static final BlockingQueue BLOCKING_QUEUE = new LinkedBlockingQueue<>(5); } diff --git a/austin-cron/src/main/java/com/java3y/austin/cron/csv/CountFileRowHandler.java b/austin-cron/src/main/java/com/java3y/austin/cron/csv/CountFileRowHandler.java new file mode 100644 index 0000000..3554121 --- /dev/null +++ b/austin-cron/src/main/java/com/java3y/austin/cron/csv/CountFileRowHandler.java @@ -0,0 +1,25 @@ +package com.java3y.austin.cron.csv; + +import cn.hutool.core.text.csv.CsvRow; +import cn.hutool.core.text.csv.CsvRowHandler; +import lombok.Data; + +/** + * @author 3y + * @date 2022/3/10 + * 统计当前文件有多少行 + */ +@Data +public class CountFileRowHandler implements CsvRowHandler { + + private long rowSize; + + @Override + public void handle(CsvRow row) { + rowSize++; + } + + public long getRowSize() { + return rowSize; + } +} diff --git a/austin-cron/src/main/java/com/java3y/austin/cron/handler/CronTaskHandler.java b/austin-cron/src/main/java/com/java3y/austin/cron/handler/CronTaskHandler.java index e0b11b3..c3ec149 100644 --- a/austin-cron/src/main/java/com/java3y/austin/cron/handler/CronTaskHandler.java +++ b/austin-cron/src/main/java/com/java3y/austin/cron/handler/CronTaskHandler.java @@ -1,6 +1,9 @@ package com.java3y.austin.cron.handler; +import com.dtp.core.thread.DtpExecutor; +import com.java3y.austin.cron.config.CronAsyncThreadPoolConfig; import com.java3y.austin.cron.service.TaskHandler; +import com.java3y.austin.support.utils.ThreadPoolUtils; import com.xxl.job.core.context.XxlJobHelper; import com.xxl.job.core.handler.annotation.XxlJob; import lombok.extern.slf4j.Slf4j; @@ -19,14 +22,21 @@ public class CronTaskHandler { @Autowired private TaskHandler taskHandler; + @Autowired + private ThreadPoolUtils threadPoolUtils; + private DtpExecutor dtpExecutor = CronAsyncThreadPoolConfig.getXxlCronExecutor(); + /** * 处理所有的 austin 定时任务消息 */ @XxlJob("austinJob") public void execute() { log.info("CronTaskHandler#execute messageTemplateId:{} cron exec!", XxlJobHelper.getJobParam()); + threadPoolUtils.register(dtpExecutor); + Long messageTemplateId = Long.valueOf(XxlJobHelper.getJobParam()); - taskHandler.handle(messageTemplateId); + dtpExecutor.execute(() -> taskHandler.handle(messageTemplateId)); + } } diff --git a/austin-cron/src/main/java/com/java3y/austin/cron/pending/CrowdBatchTaskPending.java b/austin-cron/src/main/java/com/java3y/austin/cron/pending/CrowdBatchTaskPending.java index bfa8a22..50339ab 100644 --- a/austin-cron/src/main/java/com/java3y/austin/cron/pending/CrowdBatchTaskPending.java +++ b/austin-cron/src/main/java/com/java3y/austin/cron/pending/CrowdBatchTaskPending.java @@ -2,9 +2,9 @@ package com.java3y.austin.cron.pending; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.map.MapUtil; -import cn.hutool.core.thread.ExecutorBuilder; import cn.hutool.core.util.StrUtil; import com.google.common.collect.Lists; +import com.java3y.austin.cron.config.CronAsyncThreadPoolConfig; import com.java3y.austin.cron.constants.PendingConstant; import com.java3y.austin.cron.vo.CrowdInfoVo; import com.java3y.austin.service.api.domain.BatchSendRequest; @@ -23,8 +23,6 @@ import org.springframework.stereotype.Component; import java.util.List; import java.util.Map; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; /** * 延迟批量处理人群信息 @@ -45,15 +43,7 @@ public class CrowdBatchTaskPending extends AbstractLazyPending { pendingParam.setNumThreshold(PendingConstant.NUM_THRESHOLD) .setQueue(new LinkedBlockingQueue(PendingConstant.QUEUE_SIZE)) .setTimeThreshold(PendingConstant.TIME_THRESHOLD) - .setExecutorService(ExecutorBuilder.create() - .setCorePoolSize(PendingConstant.CORE_POOL_SIZE) - .setMaxPoolSize(PendingConstant.MAX_POOL_SIZE) - .setWorkQueue(PendingConstant.BLOCKING_QUEUE) - .setHandler(new ThreadPoolExecutor.CallerRunsPolicy()) - .setAllowCoreThreadTimeOut(true) - .setKeepAliveTime(PendingConstant.KEEP_LIVE_TIME, TimeUnit.SECONDS) - .build()); - + .setExecutorService(CronAsyncThreadPoolConfig.getConsumePendingThreadPool()); this.pendingParam = pendingParam; } @@ -89,4 +79,5 @@ public class CrowdBatchTaskPending extends AbstractLazyPending { .build(); sendService.batchSend(batchSendRequest); } + } diff --git a/austin-cron/src/main/java/com/java3y/austin/cron/service/impl/TaskHandlerImpl.java b/austin-cron/src/main/java/com/java3y/austin/cron/service/impl/TaskHandlerImpl.java index 974e434..9c52acb 100644 --- a/austin-cron/src/main/java/com/java3y/austin/cron/service/impl/TaskHandlerImpl.java +++ b/austin-cron/src/main/java/com/java3y/austin/cron/service/impl/TaskHandlerImpl.java @@ -1,17 +1,19 @@ package com.java3y.austin.cron.service.impl; import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.text.csv.CsvRow; import cn.hutool.core.util.StrUtil; +import com.java3y.austin.cron.csv.CountFileRowHandler; import com.java3y.austin.cron.pending.CrowdBatchTaskPending; import com.java3y.austin.cron.service.TaskHandler; import com.java3y.austin.cron.utils.ReadFileUtils; import com.java3y.austin.cron.vo.CrowdInfoVo; import com.java3y.austin.support.dao.MessageTemplateDao; import com.java3y.austin.support.domain.MessageTemplate; +import com.java3y.austin.support.pending.AbstractLazyPending; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; -import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import java.util.HashMap; @@ -29,10 +31,9 @@ public class TaskHandlerImpl implements TaskHandler { @Autowired private ApplicationContext context; + @Override - @Async public void handle(Long messageTemplateId) { - log.info("TaskHandler handle:{}", Thread.currentThread().getName()); MessageTemplate messageTemplate = messageTemplateDao.findById(messageTemplateId).get(); if (messageTemplate == null || StrUtil.isBlank(messageTemplate.getCronCrowdPath())) { @@ -40,19 +41,42 @@ public class TaskHandlerImpl implements TaskHandler { return; } + // 1. 获取文件行数大小 + long countCsvRow = ReadFileUtils.countCsvRow(messageTemplate.getCronCrowdPath(), new CountFileRowHandler()); + + // 2. 读取文件得到每一行记录给到队列做lazy batch处理 CrowdBatchTaskPending crowdBatchTaskPending = context.getBean(CrowdBatchTaskPending.class); - // 读取文件得到每一行记录给到队列做lazy batch处理 ReadFileUtils.getCsvRow(messageTemplate.getCronCrowdPath(), row -> { if (CollUtil.isEmpty(row.getFieldMap()) || StrUtil.isBlank(row.getFieldMap().get(ReadFileUtils.RECEIVER_KEY))) { return; } + + // 3. 每一行处理交给LazyPending HashMap params = ReadFileUtils.getParamFromLine(row.getFieldMap()); CrowdInfoVo crowdInfoVo = CrowdInfoVo.builder().receiver(row.getFieldMap().get(ReadFileUtils.RECEIVER_KEY)) .params(params).messageTemplateId(messageTemplateId).build(); crowdBatchTaskPending.pending(crowdInfoVo); - }); + // 4. 判断是否读取文件完成回收资源且更改状态 + onComplete(row, countCsvRow, crowdBatchTaskPending, messageTemplateId); + }); } + /** + * 文件遍历结束时 + * 1. 暂停单线程池消费(最后会回收线程池资源) + * 2. 更改消息模板的状态(暂未实现) + * + * @param row + * @param countCsvRow + * @param crowdBatchTaskPending + * @param messageTemplateId + */ + private void onComplete(CsvRow row, long countCsvRow, AbstractLazyPending crowdBatchTaskPending, Long messageTemplateId) { + if (row.getOriginalLineNumber() == countCsvRow) { + crowdBatchTaskPending.setStop(true); + log.info("messageTemplate:[{}] read csv file complete!", messageTemplateId); + } + } } diff --git a/austin-cron/src/main/java/com/java3y/austin/cron/utils/ReadFileUtils.java b/austin-cron/src/main/java/com/java3y/austin/cron/utils/ReadFileUtils.java index 4aae38a..39d21f8 100644 --- a/austin-cron/src/main/java/com/java3y/austin/cron/utils/ReadFileUtils.java +++ b/austin-cron/src/main/java/com/java3y/austin/cron/utils/ReadFileUtils.java @@ -4,6 +4,7 @@ import cn.hutool.core.io.FileUtil; import cn.hutool.core.map.MapUtil; import cn.hutool.core.text.csv.*; import com.google.common.base.Throwables; +import com.java3y.austin.cron.csv.CountFileRowHandler; import com.java3y.austin.cron.vo.CrowdInfoVo; import lombok.extern.slf4j.Slf4j; @@ -45,9 +46,28 @@ public class ReadFileUtils { } } + /** + * 读取csv文件,获取文件里的行数 + * + * @param path + * @param countFileRowHandler + */ + public static long countCsvRow(String path, CountFileRowHandler countFileRowHandler) { + try { + // 把首行当做是标题,获取reader + CsvReader reader = CsvUtil.getReader(new FileReader(path), + new CsvReadConfig().setContainsHeader(true)); + reader.read(countFileRowHandler); + } catch (Exception e) { + log.error("ReadFileUtils#getCsvRow fail!{}", Throwables.getStackTraceAsString(e)); + } + return countFileRowHandler.getRowSize(); + } + /** * 从文件的每一行数据获取到params信息 * [{key:value},{key:value}] + * * @param fieldMap * @return */ diff --git a/austin-support/src/main/java/com/java3y/austin/support/config/SupportThreadPoolConfig.java b/austin-support/src/main/java/com/java3y/austin/support/config/SupportThreadPoolConfig.java new file mode 100644 index 0000000..9474209 --- /dev/null +++ b/austin-support/src/main/java/com/java3y/austin/support/config/SupportThreadPoolConfig.java @@ -0,0 +1,33 @@ +package com.java3y.austin.support.config; + +import cn.hutool.core.thread.ExecutorBuilder; +import com.java3y.austin.common.constant.ThreadPoolConstant; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * @author 3y + * support 线程池配置类 + * + */ +public class SupportThreadPoolConfig { + + /** + * 业务:实现pending队列的单线程池 + * 配置:核心线程可以被回收,当线程池无被引用且无核心线程数,应当被回收 + * 动态线程池且被Spring管理:false + */ + public static ExecutorService getPendingSingleThreadPool() { + return ExecutorBuilder.create() + .setCorePoolSize(ThreadPoolConstant.SINGLE_CORE_POOL_SIZE) + .setMaxPoolSize(ThreadPoolConstant.SINGLE_MAX_POOL_SIZE) + .setWorkQueue(ThreadPoolConstant.BIG_BLOCKING_QUEUE) + .setHandler(new ThreadPoolExecutor.CallerRunsPolicy()) + .setAllowCoreThreadTimeOut(true) + .setKeepAliveTime(ThreadPoolConstant.SMALL_KEEP_LIVE_TIME, TimeUnit.SECONDS) + .build(); + } + +} diff --git a/austin-support/src/main/java/com/java3y/austin/support/config/ThreadPoolConfiguration.java b/austin-support/src/main/java/com/java3y/austin/support/config/ThreadPoolConfiguration.java deleted file mode 100644 index 9b5f76d..0000000 --- a/austin-support/src/main/java/com/java3y/austin/support/config/ThreadPoolConfiguration.java +++ /dev/null @@ -1,36 +0,0 @@ -package com.java3y.austin.support.config; - -import com.dtp.common.em.QueueTypeEnum; -import com.dtp.core.support.ThreadPoolCreator; -import com.dtp.core.thread.DtpExecutor; -import com.dtp.core.thread.ThreadPoolBuilder; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -/** - * @author Redick01 - */ -@Configuration -public class ThreadPoolConfiguration { - - @Bean - public DtpExecutor dtpExecutor() { - - return ThreadPoolCreator.createDynamicFast("dynamic-tp-test-1"); - } - - @Bean - public ThreadPoolExecutor threadPoolExecutor() { - return ThreadPoolBuilder.newBuilder() - .threadPoolName("dynamic-tp-test-2") - .corePoolSize(10) - .maximumPoolSize(15) - .keepAliveTime(15000) - .timeUnit(TimeUnit.MILLISECONDS) - .workQueue(QueueTypeEnum.SYNCHRONOUS_QUEUE.getName(), null, false) - .buildDynamic(); - } -} diff --git a/austin-support/src/main/java/com/java3y/austin/support/pending/AbstractLazyPending.java b/austin-support/src/main/java/com/java3y/austin/support/pending/AbstractLazyPending.java index 6098815..f3cc326 100644 --- a/austin-support/src/main/java/com/java3y/austin/support/pending/AbstractLazyPending.java +++ b/austin-support/src/main/java/com/java3y/austin/support/pending/AbstractLazyPending.java @@ -1,15 +1,16 @@ package com.java3y.austin.support.pending; import cn.hutool.core.collection.CollUtil; -import cn.hutool.core.thread.ThreadUtil; import com.google.common.base.Throwables; import com.google.common.collect.Lists; +import com.java3y.austin.support.config.SupportThreadPoolConfig; import lombok.Data; import lombok.extern.slf4j.Slf4j; import javax.annotation.PostConstruct; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; /** @@ -36,12 +37,18 @@ public abstract class AbstractLazyPending { */ private Long lastHandleTime = System.currentTimeMillis(); + /** + * 是否终止线程 + */ + private Boolean stop = false; + /** * 单线程消费 阻塞队列的数据 */ @PostConstruct public void initConsumePending() { - ThreadUtil.newSingleExecutor().execute(() -> { + ExecutorService executorService = SupportThreadPoolConfig.getPendingSingleThreadPool(); + executorService.execute(() -> { while (true) { try { T obj = pendingParam.getQueue().poll(pendingParam.getTimeThreshold(), TimeUnit.MILLISECONDS); @@ -58,16 +65,23 @@ public abstract class AbstractLazyPending { // 具体执行逻辑 pendingParam.getExecutorService().execute(() -> this.handle(taskRef)); } + + // 判断是否停止当前线程 + if (stop && CollUtil.isEmpty(tasks)) { + break; + } } catch (Exception e) { log.error("Pending#initConsumePending failed:{}", Throwables.getStackTraceAsString(e)); } } }); + executorService.shutdown(); } /** * 1. 数量超限 * 2. 时间超限 + * * @return */ private boolean dataReady() { @@ -110,4 +124,5 @@ public abstract class AbstractLazyPending { * @param list */ public abstract void doHandle(List list); + } diff --git a/austin-support/src/main/java/com/java3y/austin/support/utils/ThreadPoolUtils.java b/austin-support/src/main/java/com/java3y/austin/support/utils/ThreadPoolUtils.java new file mode 100644 index 0000000..65c258d --- /dev/null +++ b/austin-support/src/main/java/com/java3y/austin/support/utils/ThreadPoolUtils.java @@ -0,0 +1,31 @@ +package com.java3y.austin.support.utils; + +import com.dtp.core.DtpRegistry; +import com.dtp.core.thread.DtpExecutor; +import com.java3y.austin.support.config.ThreadPoolExecutorShutdownDefinition; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/** + * 线程池工具类 + * + * @author 3y + */ +@Component +public class ThreadPoolUtils { + + @Autowired + private ThreadPoolExecutorShutdownDefinition shutdownDefinition; + + private static final String SOURCE_NAME = "austin"; + + + /** + * 1. 将当前线程池 加入到 动态线程池内 + * 2. 注册 线程池 被Spring管理,优雅关闭 + */ + public void register(DtpExecutor dtpExecutor) { + DtpRegistry.register(dtpExecutor, SOURCE_NAME); + shutdownDefinition.registryExecutor(dtpExecutor); + } +} diff --git a/austin-web/src/main/java/com/java3y/austin/web/controller/ThreadPoolTest.java b/austin-web/src/main/java/com/java3y/austin/web/controller/ThreadPoolTest.java index beb6243..c302b59 100644 --- a/austin-web/src/main/java/com/java3y/austin/web/controller/ThreadPoolTest.java +++ b/austin-web/src/main/java/com/java3y/austin/web/controller/ThreadPoolTest.java @@ -13,10 +13,12 @@ public class ThreadPoolTest { @GetMapping("/tp") public void send() { DtpExecutor dtpExecutor1 = DtpRegistry.getExecutor("austin-im.notice"); - DtpExecutor dtpExecutor2 = DtpRegistry.getExecutor("dynamic-tp-test-2"); + DtpExecutor dtpExecutor2 = DtpRegistry.getExecutor("execute-xxl-thread-pool"); + DtpExecutor dtpExecutor3 = DtpRegistry.getExecutor("dynamic-tp-test-2"); System.out.println(dtpExecutor1); System.out.println(dtpExecutor2); + System.out.println(dtpExecutor3); } } diff --git a/austin-web/src/main/resources/application.properties b/austin-web/src/main/resources/application.properties index a1a4da6..722c629 100644 --- a/austin-web/src/main/resources/application.properties +++ b/austin-web/src/main/resources/application.properties @@ -52,18 +52,6 @@ austin.business.log.topic.name=austinLog austin.business.graylog.ip=${austin-grayLog-ip} # TODO if windows os ,replace path ! austin.business.upload.crowd.path=/Users/3y/temp - -##################### business cron async properties ##################### -austin.async.task.thread-name-prefix=austinAsyncExecutor- -austin.async.task.max-size=2 -austin.async.task.core-size=2 -austin.async.task.queue-capacity=20 -austin.async.task.keep-alive=60 -austin.async.task.rejected-handler=callrunspolicy -austin.async.task.allow-core-thread-timeout=false -austin.async.task.await-termination-seconds=10 -austin.async.task.wait-for-tasks-to-complete-on-shut-down=true - ##################### xxl properties ##################### xxl.job.admin.addresses=http://${austin-xxl-job-ip}:${austin-xxl-job-port}/xxl-job-admin diff --git a/austin-web/src/main/resources/dynamic-tp-apollo-dtp.yml b/austin-web/src/main/resources/dynamic-tp-apollo-dtp.yml index ef3a3da..d48e744 100644 --- a/austin-web/src/main/resources/dynamic-tp-apollo-dtp.yml +++ b/austin-web/src/main/resources/dynamic-tp-apollo-dtp.yml @@ -19,22 +19,24 @@ spring: secret: SECb544445a6a34f0315d08b17de41 receivers: 18888888888 executors: - - threadPoolName: dynamic-tp-test-1 - corePoolSize: 5 + - threadPoolName: austin-im.notice + corePoolSize: 6 maximumPoolSize: 8 - keepAliveTime: 40 - queueType: VariableLinkedBlockingQueue - queueCapacity: 500 - rejectedHandlerType: CallerRunsPolicy - threadNamePrefix: test-1 - - - threadPoolName: dynamic-tp-test-2 + queueCapacity: 200 + queueType: VariableLinkedBlockingQueue # 任务队列,查看源码QueueTypeEnum枚举类 + rejectedHandlerType: CallerRunsPolicy # 拒绝策略,查看RejectedTypeEnum枚举类 + keepAliveTime: 50 + allowCoreThreadTimeOut: false + threadNamePrefix: austin- # 线程名前缀 + - threadPoolName: execute-xxl-thread-pool corePoolSize: 3 - maximumPoolSize: 4 + maximumPoolSize: 3 + queueCapacity: 200 + queueType: VariableLinkedBlockingQueue # 任务队列,查看源码QueueTypeEnum枚举类 + rejectedHandlerType: CallerRunsPolicy # 拒绝策略,查看RejectedTypeEnum枚举类 keepAliveTime: 50 - queueType: VariableLinkedBlockingQueue - queueCapacity: 5000 - threadNamePrefix: test2 + allowCoreThreadTimeOut: false + threadNamePrefix: austin- # 线程名前缀 notifyItems: # 报警项,不配置自动会配置(变更通知、容量报警、活性报警、拒绝报警) - type: capacity # 报警项类型,查看源码 NotifyTypeEnum枚举类 enabled: true @@ -48,5 +50,4 @@ spring: threshold: 80 - type: reject enabled: true - threshold: 1 - + threshold: 1 \ No newline at end of file diff --git a/pom.xml b/pom.xml index 48aba5f..0e3f6a0 100644 --- a/pom.xml +++ b/pom.xml @@ -164,7 +164,7 @@ io.github.lyh200 dynamic-tp-spring-boot-starter-apollo - 1.0.1 + 1.0.2