import org.springframework.context.annotation.Bean; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import java.util.concurrent.Executor; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Scheduled; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @Slf4j @Component public class UdpQueueServer{ //消息队列 public static BlockingQueue queue = new ArrayBlockingQueue<>(1024); //设置线程池信息 @Bean("taskExecutor") public Executor taskExecutor() { ThreadPoolTaskScheduler executor = new ThreadPoolTaskScheduler(); //最大线程数 executor.setPoolSize(20); //线程名称 executor.setThreadNamePrefix("处理数据-"); executor.setWaitForTasksToCompleteOnShutdown(true); executor.setAwaitTerminationSeconds(10); return executor; } @Async("taskExecutor") //与上方@Bean中值相同 @Scheduled(fixedDelay = 3000) //3秒启动一个线程 public void consumerMessage() throws Exception { boolean isRunning = true; while (isRunning) { try { String con = (String) queue.take(); //取出队列中的数据 if (con != null){ log.info("内容:{}",con); } else { log.info("取出内容为null"); } if(queue.remainingCapacity == 1024){ isRunning = false; } } catch (Exception e) { e.printStackTrace(); log.info("{}",e); } } } }
// Thread pool details: https://blog.csdn.net/weixin_43583693/article/details/115675877