自定义业务线程池
package com.bozhong.schedule.common.utils;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class ThreadUtil {
private static final ExecutorService DEFAULT_EXECUTOR = new ScheduleThreadPoolExecutor(5, 10, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>(), new ScheduleThreadFactory(), new IntelligenceEngineRejectedExecutionHandler());
// 专门处理排班导出的线程池
// new LinkedBlockingDeque<>(100) 最大队列数100,超过100则AbortPolicy(默认):当线程池无法处理新任务时,直接抛出RejectedExecutionException异常。
private static final ExecutorService PROCESS_SCHEDULE_EXPORT_EXECUTOR = new ScheduleThreadPoolExecutor(8, 15, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>(100), new ProcessScheduleExportThreadFactory(), new IntelligenceEngineRejectedExecutionHandler());
protected static final Logger LOG = LoggerFactory.getLogger("THREAD_POOL_LOGGER");
public static void run(Runnable runnable) {
DEFAULT_EXECUTOR.execute(runnable);
}
public static void runForProcessScheduleExport(Runnable runnable) {
PROCESS_SCHEDULE_EXPORT_EXECUTOR.execute(runnable);
}
static class ScheduleThreadPoolExecutor extends ThreadPoolExecutor {
public ScheduleThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, IntelligenceEngineRejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
}
static class IntelligenceEngineRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
executor.execute(r);
}
}
static class ScheduleThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
ScheduleThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "ScheduleThreadFactory-" +
poolNumber.getAndIncrement() +
"-thread-";
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon()) {
t.setDaemon(false);
}
if (t.getPriority() != Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);
}
return t;
}
}
static class ProcessScheduleExportThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
ProcessScheduleExportThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "ProcessScheduleExportThreadFactory-" +
poolNumber.getAndIncrement() +
"-thread-";
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon()) {
t.setDaemon(false);
}
if (t.getPriority() != Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);
}
return t;
}
}
/**
* 记录线程池的当前信息
*/
public static void recordThreadPoolInfo(){
// ThreadPoolExecutor statisticsExportExecutor = (ThreadPoolExecutor)STATISTICS_EXPORT_EXECUTOR;
ThreadPoolExecutor defaultExecutor = (ThreadPoolExecutor)DEFAULT_EXECUTOR;
ThreadPoolExecutor processScheduleExportExecutor = (ThreadPoolExecutor)PROCESS_SCHEDULE_EXPORT_EXECUTOR;
// ThreadPoolExecutor serviceExecutor = (ThreadPoolExecutor)SERVICE_EXECUTOR;
// ThreadPoolExecutor standardStatisticsExportExecutor = (ThreadPoolExecutor)STANDARD_STATISTICS_EXPORT_EXECUTOR;
List<ThreadPoolExecutor> threadPoolExecutorList = new ArrayList<>();
// threadPoolExecutorList.add(statisticsExportExecutor);
threadPoolExecutorList.add(defaultExecutor);
threadPoolExecutorList.add(processScheduleExportExecutor);
// threadPoolExecutorList.add(serviceExecutor);
// threadPoolExecutorList.add(standardStatisticsExportExecutor);
for(ThreadPoolExecutor executor : threadPoolExecutorList){
LOG.info("线程池名称:{}, 线程池需要执行的任务数:{}, 在运行过程中已完成的任务数:{}, 曾经创建过的最大线程数:{}, 线程池里的线程数量:{}, 活跃的线程数量:{}, 当前队列里的任务数:{}",
executor.getThreadFactory().toString(), executor.getTaskCount(), executor.getCompletedTaskCount(),
executor.getLargestPoolSize(), executor.getPoolSize(), executor.getActiveCount(),executor.getQueue().size());
}
}
public void init(){
Executors.newScheduledThreadPool(1).scheduleAtFixedRate(ThreadUtil::recordThreadPoolInfo,5,10,TimeUnit.SECONDS);
}
}
使用方式
ThreadUtil.runForProcessScheduleExport(() -> {
// 执行排班明细导出逻辑
});