GuardObject
不同,不需要产生结果和消费结果的线程一一对应阻塞队列
,采用的就是这种模式资源类
final class Message { private int id; private Object message; public int getId() { return id; } public Object getMessage() { return message; } public Message(int id, Object message) { this.id = id; this.message = message; } @Override public String toString() { return "Message{" + "id=" + id + ", message=" + message + '}'; } }
消息队列
class MessageQueueV1 { //消息队列集合 private LinkedList<Message> list = new LinkedList<>(); //消息容量 private int capacity; public MessageQueueV1(int capacity) { this.capacity = capacity; } //获取消息 public Message take() { synchronized (list) { while (list.isEmpty()) { try { log.debug("队列为空,消费者线程等待"); list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } //从队列头部获取消息 Message message = list.removeFirst(); log.debug("已消费消息:message = {}", message); //唤醒存入消息的线程 list.notifyAll(); return message; } } //存入消息 public void put(Message message) { synchronized (list) { while (list.size() == capacity) { try { log.debug("队列已经满了,生产者线程等待"); list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } //将消息从尾部入队列 list.addLast(message); log.debug("已生产消息:message = {}", message); //唤醒获取消息的线程 list.notifyAll(); } } }
测试
public static void main(String[] args) { MessageQueueV1 queue = new MessageQueueV1(2); for (int i = 1; i <= 3; i++) { int id = i; new Thread(() -> { queue.put(new Message(id, "值:" + id)); }, "生产者" + i).start(); } for (int i = 1; i <= 1; i++) { new Thread(() -> { while (true) { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } Message message = queue.take(); } }, "消费者" + i).start(); } }