自定义线程池工具类

  作者:记性不好的阁主

自定义业务线程池


package com.bozhong.schedule.common.utils;

import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
public class ThreadUtil {

    // General-purpose pool: corePoolSize 5, maximumPoolSize 10, keep-alive 0 s, unbounded LinkedBlockingDeque.
    // NOTE(review): ThreadPoolExecutor only starts threads beyond corePoolSize when the work queue refuses a task;
    // an unbounded queue never refuses, so this pool is not expected to grow past 5 threads — confirm this is intended.
    private static final ExecutorService DEFAULT_EXECUTOR = new ScheduleThreadPoolExecutor(5, 10, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>(), new ScheduleThreadFactory(), new IntelligenceEngineRejectedExecutionHandler());
    // Pool dedicated to schedule-export work: corePoolSize 8, maximumPoolSize 15, keep-alive 0 s.
    // new LinkedBlockingDeque<>(100) bounds the work queue at 100 entries. When the queue is full and no further thread
    // can be added, the task is passed to IntelligenceEngineRejectedExecutionHandler — not to the JDK default AbortPolicy.
    private static final ExecutorService PROCESS_SCHEDULE_EXPORT_EXECUTOR = new ScheduleThreadPoolExecutor(8, 15, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>(100), new ProcessScheduleExportThreadFactory(), new IntelligenceEngineRejectedExecutionHandler());

    // Logger registered under the name "THREAD_POOL_LOGGER"; recordThreadPoolInfo() writes the pool statistics to it.
    protected static final Logger LOG = LoggerFactory.getLogger("THREAD_POOL_LOGGER");

    /**
     * Passes the given task to {@code DEFAULT_EXECUTOR} through {@link ExecutorService#execute(Runnable)}.
     *
     * @param runnable the task to execute
     */
    public static void run(Runnable runnable) {
        final ExecutorService pool = DEFAULT_EXECUTOR;
        pool.execute(runnable);
    }

    /**
     * Passes the given task to {@code PROCESS_SCHEDULE_EXPORT_EXECUTOR}, the pool reserved for schedule exports,
     * through {@link ExecutorService#execute(Runnable)}.
     *
     * @param runnable the export task to execute
     */
    public static void runForProcessScheduleExport(Runnable runnable) {
        final ExecutorService exportPool = PROCESS_SCHEDULE_EXPORT_EXECUTOR;
        exportPool.execute(runnable);
    }

    /**
     * {@link ThreadPoolExecutor} subclass used for every pool in this class. It adds no behaviour of its own;
     * its only constructor narrows the rejection handler to {@link IntelligenceEngineRejectedExecutionHandler}.
     */
    static class ScheduleThreadPoolExecutor extends ThreadPoolExecutor {

        /**
         * Forwards all arguments unchanged to the matching {@link ThreadPoolExecutor} constructor.
         *
         * @param corePoolSize    number of core threads
         * @param maximumPoolSize maximum number of threads
         * @param keepAliveTime   idle time after which threads above the core size terminate
         * @param unit            unit of {@code keepAliveTime}
         * @param workQueue       queue holding tasks before they are executed
         * @param threadFactory   factory that creates the worker threads
         * @param handler         handler invoked when a task cannot be accepted
         */
        public ScheduleThreadPoolExecutor(
                int corePoolSize,
                int maximumPoolSize,
                long keepAliveTime,
                TimeUnit unit,
                BlockingQueue<Runnable> workQueue,
                ThreadFactory threadFactory,
                IntelligenceEngineRejectedExecutionHandler handler) {
            super(
                    corePoolSize,
                    maximumPoolSize,
                    keepAliveTime,
                    unit,
                    workQueue,
                    threadFactory,
                    handler);
        }
    }

    /**
     * Rejection policy for the pools in this class.
     *
     * <p>A task is rejected when the pool is saturated (queue full and maximum thread count reached) or when the
     * pool has been shut down. Re-submitting the task with {@code executor.execute(r)} from inside the handler is
     * not safe: while the pool is still saturated or shut down the task is rejected again immediately, the handler
     * is re-entered, and the recursion ends in a {@link StackOverflowError}. This handler therefore never calls
     * back into {@code execute}.
     */
    static class IntelligenceEngineRejectedExecutionHandler implements RejectedExecutionHandler {

        /**
         * Handles a task the pool could not accept.
         *
         * <p>If the pool is still running, the task is executed synchronously on the submitting thread. No task
         * is dropped, and the submitter is slowed down until the pool has capacity again. If the pool has been
         * shut down, the task is not run and the rejection is reported to the caller.
         *
         * @param r        the task that was rejected
         * @param executor the pool that rejected the task
         * @throws RejectedExecutionException if {@code executor} has been shut down
         */
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            if (executor.isShutdown()) {
                throw new RejectedExecutionException(
                        "Task " + r + " rejected from " + executor + ": executor has been shut down");
            }
            // Run in the caller's thread instead of re-entering execute(), which would recurse without bound.
            r.run();
        }
    }

    /**
     * Creates the worker threads of the default pool.
     *
     * <p>Threads are named {@code ScheduleThreadFactory-<factory number>-thread-<thread number>}; both counters
     * start at 1. New threads are forced to be non-daemon and to run at {@link Thread#NORM_PRIORITY}.
     */
    static class ScheduleThreadFactory implements ThreadFactory {

        /** Numbers the factory instances; shared by all instances of this class. */
        private static final AtomicInteger FACTORY_SEQUENCE = new AtomicInteger(1);
        /** Group every thread of this factory is placed in. */
        private final ThreadGroup threadGroup;
        /** Numbers the threads created by this factory instance. */
        private final AtomicInteger threadSequence = new AtomicInteger(1);
        /** Common leading part of all thread names produced by this instance. */
        private final String threadNamePrefix;

        /**
         * Picks the thread group (the security manager's group when one is installed, otherwise the group of
         * the constructing thread) and reserves the next factory number for the name prefix.
         */
        ScheduleThreadFactory() {
            SecurityManager securityManager = System.getSecurityManager();
            if (securityManager == null) {
                threadGroup = Thread.currentThread().getThreadGroup();
            } else {
                threadGroup = securityManager.getThreadGroup();
            }
            int factoryId = FACTORY_SEQUENCE.getAndIncrement();
            threadNamePrefix = "ScheduleThreadFactory-" + factoryId + "-thread-";
        }

        /**
         * Builds a new, not yet started thread for the given task.
         *
         * @param r the task the thread will run
         * @return a non-daemon thread with normal priority and the next sequential name
         */
        @Override
        public Thread newThread(Runnable r) {
            String threadName = threadNamePrefix + threadSequence.getAndIncrement();
            // Stack size 0 leaves the choice of stack size to the JVM.
            Thread worker = new Thread(threadGroup, r, threadName, 0);
            // A thread inherits daemon status and priority from its creator; reset both when they differ.
            boolean inheritedDaemon = worker.isDaemon();
            if (inheritedDaemon) {
                worker.setDaemon(false);
            }
            boolean normalPriority = worker.getPriority() == Thread.NORM_PRIORITY;
            if (!normalPriority) {
                worker.setPriority(Thread.NORM_PRIORITY);
            }
            return worker;
        }
    }

    /**
     * Creates the worker threads of the schedule-export pool.
     *
     * <p>Threads are named {@code ProcessScheduleExportThreadFactory-<factory number>-thread-<thread number>};
     * both counters start at 1. New threads are forced to be non-daemon and to run at
     * {@link Thread#NORM_PRIORITY}.
     */
    static class ProcessScheduleExportThreadFactory implements ThreadFactory {

        /** Numbers the factory instances; shared by all instances of this class. */
        private static final AtomicInteger FACTORY_SEQUENCE = new AtomicInteger(1);
        /** Group every thread of this factory is placed in. */
        private final ThreadGroup threadGroup;
        /** Numbers the threads created by this factory instance. */
        private final AtomicInteger threadSequence = new AtomicInteger(1);
        /** Common leading part of all thread names produced by this instance. */
        private final String threadNamePrefix;

        /**
         * Picks the thread group (the security manager's group when one is installed, otherwise the group of
         * the constructing thread) and reserves the next factory number for the name prefix.
         */
        ProcessScheduleExportThreadFactory() {
            SecurityManager securityManager = System.getSecurityManager();
            if (securityManager == null) {
                threadGroup = Thread.currentThread().getThreadGroup();
            } else {
                threadGroup = securityManager.getThreadGroup();
            }
            int factoryId = FACTORY_SEQUENCE.getAndIncrement();
            threadNamePrefix = "ProcessScheduleExportThreadFactory-" + factoryId + "-thread-";
        }

        /**
         * Builds a new, not yet started thread for the given task.
         *
         * @param r the task the thread will run
         * @return a non-daemon thread with normal priority and the next sequential name
         */
        @Override
        public Thread newThread(Runnable r) {
            String threadName = threadNamePrefix + threadSequence.getAndIncrement();
            // Stack size 0 leaves the choice of stack size to the JVM.
            Thread worker = new Thread(threadGroup, r, threadName, 0);
            // A thread inherits daemon status and priority from its creator; reset both when they differ.
            boolean inheritedDaemon = worker.isDaemon();
            if (inheritedDaemon) {
                worker.setDaemon(false);
            }
            boolean normalPriority = worker.getPriority() == Thread.NORM_PRIORITY;
            if (!normalPriority) {
                worker.setPriority(Thread.NORM_PRIORITY);
            }
            return worker;
        }
    }


    /**
     * Logs the current statistics of every pool managed by this class.
     *
     * <p>One INFO line per pool is written to {@code LOG}. The value logged as the pool name is the
     * {@code toString()} of the pool's thread factory. It is followed by the task count, completed task count,
     * largest pool size, current pool size, active thread count and current queue length.
     */
    public static void recordThreadPoolInfo() {
        final ThreadPoolExecutor[] monitoredPools = {
                (ThreadPoolExecutor) DEFAULT_EXECUTOR,
                (ThreadPoolExecutor) PROCESS_SCHEDULE_EXPORT_EXECUTOR
        };

        for (ThreadPoolExecutor pool : monitoredPools) {
            // Values are read in the order in which they appear in the log line.
            final String poolLabel = pool.getThreadFactory().toString();
            final long scheduledTasks = pool.getTaskCount();
            final long completedTasks = pool.getCompletedTaskCount();
            final int largestPoolSize = pool.getLargestPoolSize();
            final int currentPoolSize = pool.getPoolSize();
            final int activeThreads = pool.getActiveCount();
            final int queuedTasks = pool.getQueue().size();

            LOG.info("线程池名称:{}, 线程池需要执行的任务数:{}, 在运行过程中已完成的任务数:{}, 曾经创建过的最大线程数:{}, 线程池里的线程数量:{}, 活跃的线程数量:{}, 当前队列里的任务数:{}",
                    poolLabel, scheduledTasks, completedTasks,
                    largestPoolSize, currentPoolSize, activeThreads, queuedTasks);
        }
    }

    /** Scheduler driving the periodic pool report; {@code null} until the first {@link #init()} call. */
    private static ScheduledExecutorService monitorScheduler;

    /**
     * Starts the periodic pool report: {@link #recordThreadPoolInfo()} runs after an initial delay of
     * 5 seconds and then every 10 seconds.
     *
     * <p>The call is idempotent. Only the first invocation, on any instance, creates the scheduler; later
     * invocations return immediately. Without this guard every call would create another single-thread
     * scheduler that is never shut down, leaking one thread per call and duplicating the log output.
     */
    public void init() {
        synchronized (ThreadUtil.class) {
            if (monitorScheduler != null) {
                return;
            }
            ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
            scheduler.scheduleAtFixedRate(() -> {
                try {
                    recordThreadPoolInfo();
                } catch (RuntimeException e) {
                    // scheduleAtFixedRate cancels all future runs once the task throws; keep the report alive.
                    LOG.error("Failed to record thread pool info", e);
                }
            }, 5, 10, TimeUnit.SECONDS);
            monitorScheduler = scheduler;
        }
    }

}

使用方式


ThreadUtil.runForProcessScheduleExport(() -> {
    // 执行排班明细导出逻辑
});


相关推荐

评论 抢沙发

表情

分类选择