Semaphore:限流神器
Semaphore(信号量)是Java并发包中的限流利器,它通过控制同时访问共享资源的线程数,实现优雅的流量控制和资源保护。相比其他限流方案,Semaphore轻量、灵活、易用。
🎯 Semaphore的核心概念
工作原理
Semaphore semaphore = new Semaphore(3); // 3个许可证线程1 → 请求许可证 → 获得 → 执行业务 → 释放线程2 → 请求许可证 → 获得 → 执行业务 → 释放线程3 → 请求许可证 → 获得 → 执行业务 → 释放线程4 → 请求许可证 → 等待...(直到有线程释放)
两种模式
|
|
|
|
|---|---|---|
| 公平模式 |
|
|
| 非公平模式 |
|
|
💻 基础用法
1. 基础示例
public class SemaphoreBasics {// 创建信号量,初始3个许可证,非公平模式(默认)private static Semaphore semaphore = new Semaphore(3);// 公平模式// private static Semaphore semaphore = new Semaphore(3, true);publicstaticvoidmain(String[] args) {for (int i = 0; i < 10; i++) {new Thread(new Worker()).start();}}static class Worker implements Runnable {@Overridepublicvoidrun() {try {// 1. 获取许可证(可中断)semaphore.acquire();System.out.println(Thread.currentThread().getName()+ " 获得许可证,开始工作");// 模拟业务处理Thread.sleep(2000);System.out.println(Thread.currentThread().getName()+ " 工作完成,释放许可证");// 2. 释放许可证semaphore.release();} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}}
2. 常用API
public class SemaphoreAPI {private Semaphore semaphore = new Semaphore(5);// 获取许可证(阻塞,可中断)publicvoidacquireDemo() throws InterruptedException {semaphore.acquire(); // 获取1个semaphore.acquire(2); // 获取2个semaphore.acquireUninterruptibly(); // 不可中断}// 尝试获取(非阻塞)publicvoidtryAcquireDemo() {boolean acquired = semaphore.tryAcquire(); // 立即返回boolean acquiredWithWait = semaphore.tryAcquire(3, TimeUnit.SECONDS); // 等待3秒if (acquired) {try {// 执行业务} finally {semaphore.release();}}}// 获取当前状态publicvoidstatusDemo() {int available = semaphore.availablePermits(); // 可用许可证数int queueLength = semaphore.getQueueLength(); // 等待线程数boolean hasQueued = semaphore.hasQueuedThreads(); // 是否有等待线程System.out.printf("可用: %d, 等待: %d%n", available, queueLength);}// 批量释放publicvoidbatchRelease() {semaphore.release(); // 释放1个semaphore.release(3); // 释放3个}// 清空许可证(谨慎使用)publicvoiddrainPermits() {int drained = semaphore.drainPermits(); // 获取并清空所有可用许可证System.out.println("清空了 " + drained + " 个许可证");}}
🚀 实战场景
场景1:数据库连接池限流
@Componentpublic class DatabaseConnectionPool {private final Semaphore semaphore;private final List<Connection> connections;publicDatabaseConnectionPool(int poolSize) {// 初始化连接池this.connections = new ArrayList<>(poolSize);for (int i = 0; i < poolSize; i++) {connections.add(createConnection());}// 初始化信号量,许可证数量等于连接数this.semaphore = new Semaphore(poolSize, true); // 公平模式}/*** 获取数据库连接*/public Connection getConnection() throws InterruptedException {// 获取许可证,如果没有可用则阻塞semaphore.acquire();// 从池中取出一个连接(这里简化了,实际需要线程安全)return borrowConnection();}/*** 归还连接*/publicvoidreturnConnection(Connection conn) {// 归还连接到池中returnConnectionToPool(conn);// 释放许可证semaphore.release();}/*** 带超时的获取*/public Optional<Connection> getConnectionWithTimeout(long timeout, TimeUnit unit)throws InterruptedException {boolean acquired = semaphore.tryAcquire(timeout, unit);if (acquired) {return Optional.of(borrowConnection());}return Optional.empty(); // 获取超时}private synchronized Connection borrowConnection() {// 实际应该使用BlockingQueue,这里简化return connections.remove(0);}private synchronized voidreturnConnectionToPool(Connection conn) {connections.add(conn);}}
场景2:接口限流保护
@RestControllerpublic class ApiController {// 每秒最多处理10个请求private final Semaphore rateLimiter = new Semaphore(10);@GetMapping("/api/business")public ResponseEntity<String> handleRequest() {// 尝试获取许可证,立即失败if (!rateLimiter.tryAcquire()) {return ResponseEntity.status(429).body("系统繁忙,请稍后重试");}try {// 处理业务return ResponseEntity.ok(processBusiness());} finally {// 释放许可证rateLimiter.release();}}// 动态调整限流阈值private final Semaphore dynamicLimiter = new Semaphore(100);@Scheduled(fixedDelay = 5000)public void adjustLimit() {int currentLoad = getCurrentSystemLoad();if (currentLoad > 80) {// 系统负载高,降低阈值int current = dynamicLimiter.availablePermits();if (current > 50) {dynamicLimiter.drainPermits(); // 清空多余的// 重新设置50个许可证(需要重新创建Semaphore)dynamicLimiter = new Semaphore(50);}}}}
场景3:多资源申请(哲学家就餐问题)
public class DiningPhilosophers {// 5个筷子,每个筷子是一个信号量private static final Semaphore[] chopsticks = new Semaphore[5];static {for (int i = 0; i < 5; i++) {chopsticks[i] = new Semaphore(1); // 每个筷子只能被一个人使用}}// 同时拿起筷子的限制(避免死锁)private static final Semaphore eatLimit = new Semaphore(4); // 最多4人同时吃static class Philosopher implements Runnable {private final int id;private final Semaphore leftChopstick;private final Semaphore rightChopstick;public Philosopher(int id) {this.id = id;this.leftChopstick = chopsticks[id];this.rightChopstick = chopsticks[(id + 1) % 5];}@Overridepublic void run() {try {while (true) {think();eat();}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}private void think() throws InterruptedException {System.out.println("哲学家 " + id + " 在思考");Thread.sleep((long) (Math.random() * 1000));}private void eat() throws InterruptedException {// 使用eatLimit限制同时吃饭的人数eatLimit.acquire();try {// 拿起左边筷子leftChopstick.acquire();System.out.println("哲学家 " + id + " 拿起左边筷子");// 尝试拿起右边筷子(带超时)if (rightChopstick.tryAcquire(1, TimeUnit.SECONDS)) {try {System.out.println("哲学家 " + id + " 拿起右边筷子,开始吃饭");Thread.sleep((long) (Math.random() * 1000));} finally {rightChopstick.release();System.out.println("哲学家 " + id + " 放下右边筷子");}} else {// 拿不到右边筷子,放弃System.out.println("哲学家 " + id + " 拿不到右边筷子,放弃");}} finally {leftChopstick.release();System.out.println("哲学家 " + id + " 放下左边筷子");eatLimit.release();}}}}
📊 与限流器对比
|
|
|
|
|
|
|---|---|---|---|---|
| 控制维度 |
|
|
|
|
| 精度 |
|
|
|
|
| 突发流量 |
|
|
|
|
| 等待队列 |
|
|
|
|
| 适用场景 |
|
|
|
|
⚠️ 常见陷阱与最佳实践
1. 正确释放许可证
// ❌ 错误:忘记释放public void badExample() {try {semaphore.acquire();// 业务逻辑// 忘记释放,导致许可证泄漏} catch (InterruptedException e) {Thread.currentThread().interrupt();}}// ✅ 正确:finally确保释放public void goodExample() {boolean acquired = false;try {semaphore.acquire();acquired = true;// 业务逻辑} catch (InterruptedException e) {Thread.currentThread().interrupt();} finally {if (acquired) {semaphore.release();}}}// ✅ 更简洁的写法(Java 7+ try-with-resource)public void tryWithResource() throws InterruptedException {try (SemaphoreResource resource = new SemaphoreResource(semaphore)) {// 业务逻辑,自动释放}}// 自定义AutoCloseable包装public class SemaphoreResource implements AutoCloseable {private final Semaphore semaphore;private final boolean acquired;public SemaphoreResource(Semaphore semaphore) throws InterruptedException {this.semaphore = semaphore;this.semaphore.acquire();this.acquired = true;}@Overridepublic void close() {if (acquired) {semaphore.release();}}}
2. 避免死锁
// ❌ 可能死锁:A拿资源1等资源2,B拿资源2等资源1publicvoiddeadlockRisk() throws InterruptedException {// 线程Asemaphore1.acquire();semaphore2.acquire(); // 可能死锁// 线程Bsemaphore2.acquire();semaphore1.acquire(); // 可能死锁}// ✅ 解决方案1:固定顺序获取publicvoidfixedOrder() throws InterruptedException {// 总是先获取semaphore1,再获取semaphore2semaphore1.acquire();semaphore2.acquire();}// ✅ 解决方案2:使用tryAcquire带超时publicvoidsafeAcquire() throws InterruptedException {semaphore1.acquire();try {if (!semaphore2.tryAcquire(1, TimeUnit.SECONDS)) {// 获取失败,释放已获得的资源semaphore1.release();// 等待后重试或返回return;}} catch (InterruptedException e) {semaphore1.release();throw e;}}
3. 监控与告警
@Componentpublic class MonitoredSemaphore extends Semaphore {private final String name;private final AtomicInteger peakWaiters = new AtomicInteger(0);private final AtomicLong totalAcquireTime = new AtomicLong(0);public MonitoredSemaphore(String name, intpermits) {super(permits);this.name = name;// 启动监控线程startMonitor();}@Overridepublic void acquire() throws InterruptedException {long start = System.nanoTime();int beforeQueue = getQueueLength();super.acquire();long cost = System.nanoTime() - start;totalAcquireTime.addAndGet(cost);// 更新峰值等待数int currentQueue = beforeQueue;peakWaiters.updateAndGet(peak -> Math.max(peak, currentQueue));}private void startMonitor() {ScheduledExecutorService scheduler =Executors.newScheduledThreadPool(1);scheduler.scheduleAtFixedRate(() -> {int available = this.availablePermits();int queueLength = this.getQueueLength();int peak = peakWaiters.getAndSet(0);long avgTime = totalAcquireTime.getAndSet(0) / 1_000_000; // 转为毫秒log.info("Semaphore[{}] 可用:{}, 等待:{}, 峰值等待:{}, 平均获取时间:{}ms",name, available, queueLength, peak, avgTime);// 告警阈值if (queueLength > 100) {alertService.sendAlert(name + "等待队列过长: " + queueLength);}}, 10, 10, TimeUnit.SECONDS);}}
📈 性能测试
public class SemaphoreBenchmark {private static final int THREADS = 100;private static final int PERMITS = 20;public static void main(String[] args) throws InterruptedException {Semaphore fair = new Semaphore(PERMITS, true);Semaphore unfair = new Semaphore(PERMITS, false);System.out.println("公平模式测试:");runTest(fair);System.out.println("\n非公平模式测试:");runTest(unfair);}private static void runTest(Semaphore semaphore)throws InterruptedException {CountDownLatch latch = new CountDownLatch(THREADS);long start = System.currentTimeMillis();for (int i = 0; i < THREADS; i++) {new Thread(() -> {try {semaphore.acquire();Thread.sleep(10); // 模拟业务semaphore.release();} catch (InterruptedException e) {e.printStackTrace();} finally {latch.countDown();}}).start();}latch.await();long cost = System.currentTimeMillis() - start;System.out.println("总耗时: " + cost + "ms");System.out.println("平均每个请求: " + (cost / THREADS) + "ms");}}
🎯 总结
|
|
|
|
|---|---|---|
| 易用性 |
|
|
| 灵活性 |
|
|
| 性能 |
|
|
| 功能 |
|
|
| 适用场景 |
|
|
最佳实践口诀:
-
获取释放要成对,finally里不能忘
-
公平模式保顺序,非公平模式提性能
-
多资源时要小心,固定顺序避死锁
-
监控指标要跟上,异常告警早发现
Semaphore虽小,用好了能解决很多并发控制难题。记住:它控制的是”同时能有多少人”,而不是”每秒能来多少人”。
夜雨聆风