/**
 * Minimal demo of creating and starting a thread with an explicit name.
 */
public class CustomThread {

    /** Name given to the demo thread. */
    private static final String THREAD_NAME = "custom-thread-1";

    /**
     * Starts one named thread that prints a marker line followed by its own name.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // The task reports the name of whichever thread executes it.
        Runnable task = () -> {
            System.out.println("Custom Run");
            System.out.println(Thread.currentThread().getName());
        };

        Thread worker = new Thread(task, THREAD_NAME);
        worker.start();
    }
}
1、Executors 工具类:由 Doug Lea 编写并随 JDK 提供,供开发者直接使用。
/**
 * {@link ThreadPoolExecutor} whose worker threads are created by a
 * {@code CustomThreadFactory} configured with the supplied pool name.
 */
public class ConstomThreadPool extends ThreadPoolExecutor {

    /**
     * Creates a pool with the given sizing and a pool-specific thread factory.
     *
     * @param corePoolSize    number of core threads
     * @param maximumPoolSize maximum number of threads in the pool
     * @param keepAliveTime   how long idle threads are kept alive
     * @param unit            time unit of {@code keepAliveTime}
     * @param workQueue       queue holding tasks before execution; the caller chooses its capacity
     * @param poolName        name handed to the thread factory for naming worker threads
     */
    public ConstomThreadPool(int corePoolSize,
                             int maximumPoolSize,
                             long keepAliveTime,
                             TimeUnit unit,
                             BlockingQueue<Runnable> workQueue,
                             String poolName) {
        // Pass the factory to the superclass constructor instead of calling the
        // overridable setThreadFactory(...) from inside this constructor; this also
        // avoids building a default factory only to replace it immediately.
        // The second argument (false) is the factory's daemon flag.
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                new CustomThreadFactory(poolName, false));
    }
}
/** * 自定义线程工厂 */ public class CustomThreadFactory implements ThreadFactory { /** * 线程前缀,采用AtomicInteger实现线程编号线程安全自增 */ private final AtomicInteger atomicInteger = new AtomicInteger(1); /** * 线程命名前缀 */ private final String namePrefix; /** * 线程工厂创建的线程是否是守护线程 */ private final boolean isDaemon; public CustomThreadFactory(String prefix, boolean daemin) { if (StringUtils.isNoneBlank(prefix)) { this.namePrefix = prefix; } else { this.namePrefix = "thread_pool"; } // 是否是守护线程 isDaemon = daemin; } @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r, namePrefix + "-" + atomicInteger.getAndIncrement()); thread.setDaemon(isDaemon); // 设置线程优先级 if (thread.getPriority() != Thread.NORM_PRIORITY) { thread.setPriority(Thread.NORM_PRIORITY); } return thread; } }
/**
 * {@link ThreadFactory} adapter: every request for a new thread is forwarded to
 * {@code createThread(Runnable)} inherited from {@code CustomizableThreadCreator}.
 */
@SuppressWarnings("serial")
public class CustomizableThreadFactory extends CustomizableThreadCreator implements ThreadFactory {

    /**
     * Builds a factory using the superclass defaults (per the original
     * documentation, a default thread name prefix).
     */
    public CustomizableThreadFactory() {
    }

    /**
     * Builds a factory whose threads use the given name prefix.
     *
     * @param threadNamePrefix prefix for the names of newly created threads
     */
    public CustomizableThreadFactory(String threadNamePrefix) {
        super(threadNamePrefix);
    }

    /**
     * Produces a thread for the task by delegating to the inherited creator.
     *
     * @param runnable task the new thread will run
     * @return the thread returned by {@code createThread(runnable)}
     */
    @Override
    public Thread newThread(Runnable runnable) {
        Thread created = createThread(runnable);
        return created;
    }
}
/**
 * Declares two separately named task executors with identical sizing, so that
 * each service gets its own pool and its own thread name prefix.
 */
@Configuration
public class ThreadPoolConfig {

    /**
     * Executor registered as {@code serviceTaskA}; threads are prefixed {@code service-a}.
     *
     * @return ThreadPoolTaskExecutor
     */
    @Bean("serviceTaskA")
    public ThreadPoolTaskExecutor serviceTaskA() {
        return newServiceExecutor("service-a");
    }

    /**
     * Executor registered as {@code serviceTaskB}; threads are prefixed {@code service-b}.
     *
     * @return ThreadPoolTaskExecutor
     */
    @Bean("serviceTaskB")
    public ThreadPoolTaskExecutor serviceTaskB() {
        return newServiceExecutor("service-b");
    }

    /**
     * Shared recipe for both beans: core and max size 2, queue capacity 10,
     * 60 seconds keep-alive, and caller-runs as the rejection policy.
     *
     * @param threadNamePrefix prefix applied to the executor's thread names
     * @return a configured executor
     */
    private static ThreadPoolTaskExecutor newServiceExecutor(String threadNamePrefix) {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(2);
        taskExecutor.setMaxPoolSize(2);
        taskExecutor.setQueueCapacity(10);
        taskExecutor.setKeepAliveSeconds(60);
        taskExecutor.setThreadNamePrefix(threadNamePrefix);
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return taskExecutor;
    }
}
// Fill a small sample list, then print each element from a parallel stream
// together with the name of the thread that handled it.
List<String> list = new ArrayList<>(4);
for (String letter : new String[] {"A", "B", "C", "D"}) {
    list.add(letter);
}

list.parallelStream().forEach(element -> {
    // Build the suffixed value in a fresh local rather than reassigning the lambda parameter.
    String suffixed = element + "paralleStream";
    System.out.println(Thread.currentThread().getName() + ":-> " + suffixed);
});
public final class CustomExecutors { /** * 核心线程数大小 */ private static final int CORE_POOL_SIZE=5; /** * 核心线程池大小 */ private static final int MAX_POOL_SIZE=10; /** * 线程存活时间 */ private static final int KEEP_ALIVE_TIME=60; /** * 工作队列大小 */ private static final LinkedBlockingQueue queue=new LinkedBlockingQueue(100); /** * 自定义线程池名前缀 */ private static final String POOL_PREFIX_NAME="Custom-Common-Pool"; private CustomExecutors(){ //throw new XXXXException("un support create pool!"); } private static ConstomThreadPool constomThreadPool; /** * 静态块初始化只执行一次,不关闭,整个系统公用一个线程池 */ static { constomThreadPool=new ConstomThreadPool(CORE_POOL_SIZE,MAX_POOL_SIZE,KEEP_ALIVE_TIME,TimeUnit.SECONDS,queue,POOL_PREFIX_NAME); } /** * 单例模式获取线程池 * @return ExecutorService */ private static ExecutorService getInstance(){ return constomThreadPool; } private static Future<?> submit(Runnable task){ return constomThreadPool.submit(task); } private static <T> Future<T> submit(Runnable task, T result){ return constomThreadPool.submit(task,result); } private static <T> Future<T> submit(Callable<T> task){ return constomThreadPool.submit(task); } private static void execute(Runnable task){ constomThreadPool.execute(task); } }
public class JobTriggerPoolHelper { private static Logger logger = LoggerFactory.getLogger(JobTriggerPoolHelper.class); // ---------------------- trigger pool ---------------------- // fast/slow thread pool private ThreadPoolExecutor fastTriggerPool = null; private ThreadPoolExecutor slowTriggerPool = null; public void start(){ fastTriggerPool = new ThreadPoolExecutor( 10, XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(), 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(1000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode()); } }); slowTriggerPool = new ThreadPoolExecutor( 10, XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(), 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(2000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode()); } }); } public void stop() { //triggerPool.shutdown(); fastTriggerPool.shutdownNow(); slowTriggerPool.shutdownNow(); logger.info(">>>>>>>>> xxl-job trigger thread pool shutdown success."); } // job timeout count private volatile long minTim = System.currentTimeMillis()/60000; // ms > min private volatile ConcurrentMap<Integer, AtomicInteger> jobTimeoutCountMap = new ConcurrentHashMap<>(); /** * add trigger */ public void addTrigger(final int jobId, final TriggerTypeEnum triggerType, final int failRetryCount, final String executorShardingParam, final String executorParam, final String addressList) { // choose thread pool ThreadPoolExecutor triggerPool_ = fastTriggerPool; AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId); if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) { // job-timeout 10 times in 1 min triggerPool_ = slowTriggerPool; } // trigger triggerPool_.execute(new Runnable() { @Override public void run() { long start = System.currentTimeMillis(); try { // 
do trigger XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList); } catch (Exception e) { logger.error(e.getMessage(), e); } finally { // check timeout-count-map long minTim_now = System.currentTimeMillis()/60000; if (minTim != minTim_now) { minTim = minTim_now; jobTimeoutCountMap.clear(); } // incr timeout-count-map long cost = System.currentTimeMillis()-start; if (cost > 500) { // ob-timeout threshold 500ms AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1)); if (timeoutCount != null) { timeoutCount.incrementAndGet(); } } } } }); } // ---------------------- helper ---------------------- private static JobTriggerPoolHelper helper = new JobTriggerPoolHelper(); public static void toStart() { helper.start(); } public static void toStop() { helper.stop(); } /** * @param jobId * @param triggerType * @param failRetryCount * >=0: use this param * <0: use param from job info config * @param executorShardingParam * @param executorParam * null: use job param * not null: cover job param */ public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) { helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList); } }