1
支持阻塞:当队列为空时,take() 操作会阻塞消费者线程;当队列满时,put() 操作会阻塞生产者线程。
线程安全:内部使用 ReentrantLock 和 Condition 实现,无需开发者手动加锁
2
// LinkedBlockingQueue 核心字段privatefinal ReentrantLock takeLock = new ReentrantLock(); // 消费锁privatefinal Condition notEmpty = takeLock.newCondition();privatefinal ReentrantLock putLock = new ReentrantLock(); // 生产锁privatefinal Condition notFull = putLock.newCondition();// ArrayBlockingQueue 核心字段privatefinal ReentrantLock lock = new ReentrantLock();privatefinal Condition notEmpty = lock.newCondition();privatefinal Condition notFull = lock.newCondition();SynchronousQueue<String> queue = new SynchronousQueue<>();// 下面的put会阻塞,直到有线程调用takenew Thread(() -> {try {queue.put("data"); } catch (InterruptedException e) {}}).start();// 主线程取走数据,上面阻塞解除String data = queue.take(); // 拿到 "data"定时任务调度
缓存过期清理
订单超时关闭
classTaskimplementsDelayed{private String name;privatelong startTime; // 执行时间戳@OverridepubliclonggetDelay(TimeUnit unit){return unit.convert(startTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); }@OverridepublicintcompareTo(Delayed o){return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS)); }}3
publicvoidput(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock;lock.lockInterruptibly(); // 可中断加锁try {while (count == items.length) // 队列满了 notFull.await(); // 阻塞等待 enqueue(e); // 入队 } finally {lock.unlock(); }}privatevoidenqueue(E x) { final Object[] items = this.items; items[putIndex] = x;if (++putIndex == items.length) putIndex = 0; // 环形数组 count++; notEmpty.signal(); // 唤醒等待的消费者}使用while循环检查而非if,防止虚假唤醒
入队后调用notEmpty.signal()唤醒消费者
public E take()throws InterruptedException { E x;int c = -1;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly();try {while (count.get() == 0) // 队列为空 notEmpty.await(); // 阻塞等待 x = dequeue(); // 出队 c = count.getAndDecrement();if (c > 1) // 出队后队列仍非空 notEmpty.signal(); // 唤醒其他消费者 } finally { takeLock.unlock(); }if (c == capacity) // 之前队列满了 signalNotFull(); // 唤醒生产者return x;}消费完成后,如果队列仍非空,唤醒其他消费者
如果消费前队列是满的,消费后唤醒生产者(通过signalNotFull())
4
publicclassProducerConsumerDemo{privatestaticfinal BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);staticclassProducerimplementsRunnable{@Overridepublicvoidrun(){int i = 0;while (true) {try { queue.put(i++); // 满了就阻塞 System.out.println("生产: " + i); Thread.sleep(500); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } }staticclassConsumerimplementsRunnable{@Overridepublicvoidrun(){while (true) {try { Integer val = queue.take(); // 空了就阻塞 System.out.println("消费: " + val); Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } }publicstaticvoidmain(String[] args){new Thread(new Producer()).start();new Thread(new Consumer()).start(); }}// 固定线程池采用 LinkedBlockingQueue(无界)ExecutorService pool = Executors.newFixedThreadPool(5);// 内部核心:return new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,// new LinkedBlockingQueue<Runnable>());// 缓存线程池采用 SynchronousQueue(直接传递)ExecutorService cachedPool = Executors.newCachedThreadPool();// 内部核心:return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,// new SynchronousQueue<Runnable>());publicclassOrderimplementsDelayed{private String orderId;privatelong expireTime;publicOrder(String orderId, long delayMillis){this.orderId = orderId;this.expireTime = System.currentTimeMillis() + delayMillis; }@OverridepubliclonggetDelay(TimeUnit unit){long diff = expireTime - System.currentTimeMillis();return unit.convert(diff, TimeUnit.MILLISECONDS); }@OverridepublicintcompareTo(Delayed o){return Long.compare(this.expireTime, ((Order) o).expireTime); }}// 使用DelayQueue<Order> queue = new DelayQueue<>();queue.put(new Order("001", 30 * 60 * 1000)); // 30分钟后关闭Order order = queue.take(); // 阻塞直到30分钟后closeOrder(order.getOrderId());5
固定大小缓冲区选择ArrayBlockingQueue
高吞吐量生产-消费选择LinkedBlockingQueue
需要处理优先级选择PriorityBlockingQueue
延迟任务选择DelayQueue
直接传递(无缓冲)选择SynchronousQueue
无界队列要严格监控内存使用,避免 OOM。
生产环境中建议设置合理容量和监控队列长度(queue.size())。
线程池的 LinkedBlockingQueue 默认是无界队列,建议改成有界或使用 SynchronousQueue。
停止时注意使用 poll(timeout) 避免永久阻塞。
往期推荐
分享
收藏
点赞
在看

夜雨聆风