接入动态线程池

master
3y 3 years ago
parent ac554a08b1
commit c1639ef82e

@ -1,6 +1,9 @@
package com.java3y.austin.handler.pending; package com.java3y.austin.handler.pending;
import com.java3y.austin.handler.config.ThreadPoolConfig; import com.dtp.common.em.QueueTypeEnum;
import com.dtp.core.DtpRegistry;
import com.dtp.core.thread.DtpExecutor;
import com.dtp.core.thread.ThreadPoolBuilder;
import com.java3y.austin.handler.utils.GroupIdMappingUtils; import com.java3y.austin.handler.utils.GroupIdMappingUtils;
import com.java3y.austin.support.config.ThreadPoolExecutorShutdownDefinition; import com.java3y.austin.support.config.ThreadPoolExecutorShutdownDefinition;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -11,6 +14,7 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
/** /**
@ -22,6 +26,14 @@ public class TaskPendingHolder {
@Autowired @Autowired
private ThreadPoolExecutorShutdownDefinition threadPoolExecutorShutdownDefinition; private ThreadPoolExecutorShutdownDefinition threadPoolExecutorShutdownDefinition;
@Autowired
private SpringUtils springUtils;
@Autowired
private DtpRegistry dtpRegistry;
/** /**
* 线 * 线
*/ */
@ -43,9 +55,17 @@ public class TaskPendingHolder {
@PostConstruct @PostConstruct
public void init() { public void init() {
for (String groupId : groupIds) { for (String groupId : groupIds) {
ExecutorService threadPool = ThreadPoolConfig.getThreadPool(coreSize, maxSize, queueSize); DtpExecutor dtpExecutor = ThreadPoolBuilder.newBuilder()
threadPoolExecutorShutdownDefinition.registryExecutor(threadPool); .threadPoolName("austin-" + groupId)
taskPendingHolder.put(groupId, threadPool); .corePoolSize(10)
.maximumPoolSize(15)
.keepAliveTime(15000)
.timeUnit(TimeUnit.MILLISECONDS)
.workQueue(QueueTypeEnum.SYNCHRONOUS_QUEUE.getName(), null, false)
.buildDynamic();
DtpRegistry.register(dtpExecutor, "beanPostProcessor");
threadPoolExecutorShutdownDefinition.registryExecutor(dtpExecutor);
taskPendingHolder.put(groupId, dtpExecutor);
} }
} }
/** /**

@ -78,6 +78,11 @@
<groupId>de.siegmar</groupId> <groupId>de.siegmar</groupId>
<artifactId>logback-gelf</artifactId> <artifactId>logback-gelf</artifactId>
</dependency> </dependency>
<dependency>
<groupId>io.github.lyh200</groupId>
<artifactId>dynamic-tp-spring-boot-starter-apollo</artifactId>
</dependency>
</dependencies> </dependencies>
</project> </project>

@ -0,0 +1,36 @@
package com.java3y.austin.support.config;
import com.dtp.common.em.QueueTypeEnum;
import com.dtp.core.support.ThreadPoolCreator;
import com.dtp.core.thread.DtpExecutor;
import com.dtp.core.thread.ThreadPoolBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author Redick01
*/
@Configuration
public class ThreadPoolConfiguration {
@Bean
public DtpExecutor dtpExecutor() {
return ThreadPoolCreator.createDynamicFast("dynamic-tp-test-1");
}
@Bean
public ThreadPoolExecutor threadPoolExecutor() {
return ThreadPoolBuilder.newBuilder()
.threadPoolName("dynamic-tp-test-2")
.corePoolSize(10)
.maximumPoolSize(15)
.keepAliveTime(15000)
.timeUnit(TimeUnit.MILLISECONDS)
.workQueue(QueueTypeEnum.SYNCHRONOUS_QUEUE.getName(), null, false)
.buildDynamic();
}
}

@ -0,0 +1,22 @@
package com.java3y.austin.web.controller;
import com.dtp.core.DtpRegistry;
import com.dtp.core.thread.DtpExecutor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class ThreadPoolTest {
@GetMapping("/tp")
public void send() {
DtpExecutor dtpExecutor1 = DtpRegistry.getExecutor("austin-im.notice");
DtpExecutor dtpExecutor2 = DtpRegistry.getExecutor("dynamic-tp-test-2");
System.out.println(dtpExecutor1);
System.out.println(dtpExecutor2);
}
}

@ -0,0 +1,52 @@
# 动态线程池配置文件建议单独开一个文件放到配置中心字段详解看readme介绍
spring:
dynamic:
tp:
enabled: true
enabledBanner: true # 是否开启banner打印默认true
enabledCollect: true # 是否开启监控指标采集默认false
collectorType: micrometer # 监控数据采集器类型JsonLog | MicroMeter默认logging
monitorInterval: 5 # 监控时间间隔报警判断、指标采集默认5s
apollo: # apollo配置不配置默认拿apollo配置第一个namespace
namespace: dynamic-tp-apollo-dtp.yml
configType: yml
platforms:
- platform: wechat
urlKey: 38aa7eff500-1287
receivers: apollo
- platform: ding
urlKey: f80dad441fcd65bac48473d4a88dcd6a
secret: SECb544445a6a34f0315d08b17de41
receivers: 18888888888
executors:
- threadPoolName: dynamic-tp-test-1
corePoolSize: 5
maximumPoolSize: 8
keepAliveTime: 40
queueType: VariableLinkedBlockingQueue
queueCapacity: 500
rejectedHandlerType: CallerRunsPolicy
threadNamePrefix: test-1
- threadPoolName: dynamic-tp-test-2
corePoolSize: 3
maximumPoolSize: 4
keepAliveTime: 50
queueType: VariableLinkedBlockingQueue
queueCapacity: 5000
threadNamePrefix: test2
notifyItems: # 报警项,不配置自动会配置(变更通知、容量报警、活性报警、拒绝报警)
- type: capacity # 报警项类型,查看源码 NotifyTypeEnum枚举类
enabled: true
threshold: 80 # 报警阈值
platforms: [ding,wechat] # 可选配置不配置默认拿上层platforms配置的所以平台
interval: 120 # 报警间隔单位s
- type: change
enabled: true
- type: liveness
enabled: true
threshold: 80
- type: reject
enabled: true
threshold: 1

@ -160,6 +160,13 @@
<version>${weixin-java-mp}</version> <version>${weixin-java-mp}</version>
</dependency> </dependency>
<!--动态线程池引入-->
<dependency>
<groupId>io.github.lyh200</groupId>
<artifactId>dynamic-tp-spring-boot-starter-apollo</artifactId>
<version>1.0.1</version>
</dependency>
</dependencies> </dependencies>
</dependencyManagement> </dependencyManagement>

Loading…
Cancel
Save