乐于分享
好东西不私藏

Semaphore:限流神器

Semaphore:限流神器

Semaphore(信号量)是Java并发包中的限流利器,它通过控制同时访问共享资源的线程数,实现优雅的流量控制和资源保护。相比其他限流方案,Semaphore轻量、灵活、易用。

🎯 Semaphore的核心概念

工作原理

Semaphore semaphore = new Semaphore(3); // 3个许可证线程1 → 请求许可证 → 获得 → 执行业务 → 释放线程2 → 请求许可证 → 获得 → 执行业务 → 释放  线程3 → 请求许可证 → 获得 → 执行业务 → 释放线程4 → 请求许可证 → 等待...(直到有线程释放)

两种模式

模式
特点
适用场景
公平模式
FIFO顺序,防止线程饥饿
对等待时间敏感的业务
非公平模式
允许插队,吞吐量更高
追求性能、可接受短暂不公平

💻 基础用法

1. 基础示例

public class SemaphoreBasics {    // 创建信号量,初始3个许可证,非公平模式(默认)    private static Semaphore semaphore = new Semaphore(3);    // 公平模式    // private static Semaphore semaphore = new Semaphore(3, true);    publicstaticvoidmain(String[] args) {        for (int i = 0; i < 10; i++) {            new Thread(new Worker()).start();        }    }    static class Worker implements Runnable {        @Override        publicvoidrun() {            try {                // 1. 获取许可证(可中断)                semaphore.acquire();                System.out.println(Thread.currentThread().getName()                     + " 获得许可证,开始工作");                // 模拟业务处理                Thread.sleep(2000);                System.out.println(Thread.currentThread().getName()                     + " 工作完成,释放许可证");                // 2. 释放许可证                semaphore.release();            } catch (InterruptedException e) {                Thread.currentThread().interrupt();            }        }    }}

2. 常用API

public class SemaphoreAPI {    private Semaphore semaphore = new Semaphore(5);    // 获取许可证(阻塞,可中断)    publicvoidacquireDemo() throws InterruptedException {        semaphore.acquire();           // 获取1个        semaphore.acquire(2);          // 获取2个        semaphore.acquireUninterruptibly(); // 不可中断    }    // 尝试获取(非阻塞)    publicvoidtryAcquireDemo() {        boolean acquired = semaphore.tryAcquire();  // 立即返回        boolean acquiredWithWait = semaphore.tryAcquire(3, TimeUnit.SECONDS); // 等待3秒        if (acquired) {            try {                // 执行业务            } finally {                semaphore.release();            }        }    }    // 获取当前状态    publicvoidstatusDemo() {        int available = semaphore.availablePermits();  // 可用许可证数        int queueLength = semaphore.getQueueLength();  // 等待线程数        boolean hasQueued = semaphore.hasQueuedThreads(); // 是否有等待线程        System.out.printf("可用: %d, 等待: %d%n", available, queueLength);    }    // 批量释放    publicvoidbatchRelease() {        semaphore.release();      // 释放1个        semaphore.release(3);     // 释放3个    }    // 清空许可证(谨慎使用)    publicvoiddrainPermits() {        int drained = semaphore.drainPermits(); // 获取并清空所有可用许可证        System.out.println("清空了 " + drained + " 个许可证");    }}

🚀 实战场景

场景1:数据库连接池限流

@Componentpublic class DatabaseConnectionPool {    private final Semaphore semaphore;    private final List<Connection> connections;    publicDatabaseConnectionPool(int poolSize) {        // 初始化连接池        this.connections = new ArrayList<>(poolSize);        for (int i = 0; i < poolSize; i++) {            connections.add(createConnection());        }        // 初始化信号量,许可证数量等于连接数        this.semaphore = new Semaphore(poolSize, true); // 公平模式    }    /**     * 获取数据库连接     */    public Connection getConnection() throws InterruptedException {        // 获取许可证,如果没有可用则阻塞        semaphore.acquire();        // 从池中取出一个连接(这里简化了,实际需要线程安全)        return borrowConnection();    }    /**     * 归还连接     */    publicvoidreturnConnection(Connection conn) {        // 归还连接到池中        returnConnectionToPool(conn);        // 释放许可证        semaphore.release();    }    /**     * 带超时的获取     */    public Optional<Connection> getConnectionWithTimeout(long timeout, TimeUnit unit            throws InterruptedException {        boolean acquired = semaphore.tryAcquire(timeout, unit);        if (acquired) {            return Optional.of(borrowConnection());        }        return Optional.empty(); // 获取超时    }    private synchronized Connection borrowConnection() {        // 实际应该使用BlockingQueue,这里简化        return connections.remove(0);    }    private synchronized voidreturnConnectionToPool(Connection conn) {        connections.add(conn);    }}

场景2:接口限流保护

@RestControllerpublic class ApiController {    // 每秒最多处理10个请求    private final Semaphore rateLimiter = new Semaphore(10);    @GetMapping("/api/business")    public ResponseEntity<String> handleRequest() {        // 尝试获取许可证,立即失败        if (!rateLimiter.tryAcquire()) {            return ResponseEntity.status(429)                .body("系统繁忙,请稍后重试");        }        try {            // 处理业务            return ResponseEntity.ok(processBusiness());        } finally {            // 释放许可证            rateLimiter.release();        }    }    // 动态调整限流阈值    private final Semaphore dynamicLimiter = new Semaphore(100);    @Scheduled(fixedDelay = 5000)    public void adjustLimit() {        int currentLoad = getCurrentSystemLoad();        if (currentLoad > 80) {            // 系统负载高,降低阈值            int current = dynamicLimiter.availablePermits();            if (current > 50) {                dynamicLimiter.drainPermits(); // 清空多余的                // 重新设置50个许可证(需要重新创建Semaphore)                dynamicLimiter = new Semaphore(50);            }        }    }}

场景3:多资源申请(哲学家就餐问题)

public class DiningPhilosophers {    // 5个筷子,每个筷子是一个信号量    private static final Semaphore[] chopsticks = new Semaphore[5];    static {        for (int i = 0; i < 5; i++) {            chopsticks[i] = new Semaphore(1); // 每个筷子只能被一个人使用        }    }    // 同时拿起筷子的限制(避免死锁)    private static final Semaphore eatLimit = new Semaphore(4); // 最多4人同时吃    static class Philosopher implements Runnable {        private final int id;        private final Semaphore leftChopstick;        private final Semaphore rightChopstick;        public Philosopher(int id) {            this.id = id;            this.leftChopstick = chopsticks[id];            this.rightChopstick = chopsticks[(id + 1) % 5];        }        @Override        public void run() {            try {                while (true) {                    think();                    eat();                }            } catch (InterruptedException e) {                Thread.currentThread().interrupt();            }        }        private void think() throws InterruptedException {            System.out.println("哲学家 " + id + " 在思考");            Thread.sleep((long) (Math.random() * 1000));        }        private void eat() throws InterruptedException {            // 使用eatLimit限制同时吃饭的人数            eatLimit.acquire();            try {                // 拿起左边筷子                leftChopstick.acquire();                System.out.println("哲学家 " + id + " 拿起左边筷子");                // 尝试拿起右边筷子(带超时)                if (rightChopstick.tryAcquire(1, TimeUnit.SECONDS)) {                    try {                        System.out.println("哲学家 " + id + " 拿起右边筷子,开始吃饭");                        Thread.sleep((long) (Math.random() * 1000));                    } finally {                        rightChopstick.release();                        System.out.println("哲学家 " + id + " 放下右边筷子");                    }                } else {                    // 拿不到右边筷子,放弃                    System.out.println("哲学家 " + id + " 拿不到右边筷子,放弃");                }            } finally {                leftChopstick.release();                System.out.println("哲学家 " + id + " 放下左边筷子");                eatLimit.release();            }        }    }}

📊 与限流器对比

特性
Semaphore
RateLimiter(Guava)
滑动窗口
漏桶/令牌桶
控制维度
并发数
速率(QPS)
速率
速率+并发
精度
线程级
时间级
时间级
时间级
突发流量
不支持
支持
不支持
支持
等待队列
适用场景
资源池、并发控制
API限流
统计
流量整形

⚠️ 常见陷阱与最佳实践

1. 正确释放许可证

// ❌ 错误:忘记释放public void badExample() {    try {        semaphore.acquire();        // 业务逻辑        // 忘记释放,导致许可证泄漏    } catch (InterruptedException e) {        Thread.currentThread().interrupt();    }}// ✅ 正确:finally确保释放public void goodExample() {    boolean acquired = false;    try {        semaphore.acquire();        acquired = true;        // 业务逻辑    } catch (InterruptedException e) {        Thread.currentThread().interrupt();    } finally {        if (acquired) {            semaphore.release();        }    }}// ✅ 更简洁的写法(Java 7+ try-with-resource)public void tryWithResource() throws InterruptedException {    try (SemaphoreResource resource = new SemaphoreResource(semaphore)) {        // 业务逻辑,自动释放    }}// 自定义AutoCloseable包装public class SemaphoreResource implements AutoCloseable {    private final Semaphore semaphore;    private final boolean acquired;    public SemaphoreResource(Semaphore semaphore) throws InterruptedException {        this.semaphore = semaphore;        this.semaphore.acquire();        this.acquired = true;    }    @Override    public void close() {        if (acquired) {            semaphore.release();        }    }}

2. 避免死锁

// ❌ 可能死锁:A拿资源1等资源2,B拿资源2等资源1publicvoiddeadlockRisk() throws InterruptedException {    // 线程A    semaphore1.acquire();    semaphore2.acquire(); // 可能死锁    // 线程B    semaphore2.acquire();    semaphore1.acquire(); // 可能死锁}// ✅ 解决方案1:固定顺序获取publicvoidfixedOrder() throws InterruptedException {    // 总是先获取semaphore1,再获取semaphore2    semaphore1.acquire();    semaphore2.acquire();}// ✅ 解决方案2:使用tryAcquire带超时publicvoidsafeAcquire() throws InterruptedException {    semaphore1.acquire();    try {        if (!semaphore2.tryAcquire(1, TimeUnit.SECONDS)) {            // 获取失败,释放已获得的资源            semaphore1.release();            // 等待后重试或返回            return;        }    } catch (InterruptedException e) {        semaphore1.release();        throw e;    }}

3. 监控与告警

@Componentpublic class MonitoredSemaphore extends Semaphore {    private final String name;    private final AtomicInteger peakWaiters = new AtomicInteger(0);    private final AtomicLong totalAcquireTime = new AtomicLong(0);    public MonitoredSemaphore(String name, intpermits) {        super(permits);        this.name = name;        // 启动监控线程        startMonitor();    }    @Override    public void acquire() throws InterruptedException {        long start = System.nanoTime();        int beforeQueue = getQueueLength();        super.acquire();        long cost = System.nanoTime() - start;        totalAcquireTime.addAndGet(cost);        // 更新峰值等待数        int currentQueue = beforeQueue;        peakWaiters.updateAndGet(peak -> Math.max(peak, currentQueue));    }    private void startMonitor() {        ScheduledExecutorService scheduler =             Executors.newScheduledThreadPool(1);        scheduler.scheduleAtFixedRate(() -> {            int available = this.availablePermits();            int queueLength = this.getQueueLength();            int peak = peakWaiters.getAndSet(0);            long avgTime = totalAcquireTime.getAndSet(0) / 1_000_000// 转为毫秒            log.info("Semaphore[{}] 可用:{}, 等待:{}, 峰值等待:{}, 平均获取时间:{}ms",                name, available, queueLength, peak, avgTime);            // 告警阈值            if (queueLength > 100) {                alertService.sendAlert(name + "等待队列过长: " + queueLength);            }        }, 1010, TimeUnit.SECONDS);    }}

📈 性能测试

public class SemaphoreBenchmark {    private static final int THREADS = 100;    private static final int PERMITS = 20;    public static void main(String[] args) throws InterruptedException {        Semaphore fair = new Semaphore(PERMITS, true);        Semaphore unfair = new Semaphore(PERMITS, false);        System.out.println("公平模式测试:");        runTest(fair);        System.out.println("\n非公平模式测试:");        runTest(unfair);    }    private static void runTest(Semaphore semaphore)             throws InterruptedException {        CountDownLatch latch = new CountDownLatch(THREADS);        long start = System.currentTimeMillis();        for (int i = 0; i < THREADS; i++) {            new Thread(() -> {                try {                    semaphore.acquire();                    Thread.sleep(10); // 模拟业务                    semaphore.release();                } catch (InterruptedException e) {                    e.printStackTrace();                } finally {                    latch.countDown();                }            }).start();        }        latch.await();        long cost = System.currentTimeMillis() - start;        System.out.println("总耗时: " + cost + "ms");        System.out.println("平均每个请求: " + (cost / THREADS) + "ms");    }}

🎯 总结

维度
评价
说明
易用性
⭐⭐⭐⭐⭐
API简单,易于理解和使用
灵活性
⭐⭐⭐⭐
支持公平/非公平、超时、批量操作
性能
⭐⭐⭐⭐
非公平模式性能很高
功能
⭐⭐⭐
仅控制并发数,不控制速率
适用场景
连接池限流、资源保护、并发控制

最佳实践口诀

  • 获取释放要成对,finally里不能忘

  • 公平模式保顺序,非公平模式提性能

  • 多资源时要小心,固定顺序避死锁

  • 监控指标要跟上,异常告警早发现

Semaphore虽小,用好了能解决很多并发控制难题。记住:它控制的是”同时能有多少人”,而不是”每秒能来多少人”

本站文章均为手工撰写未经允许谢绝转载:夜雨聆风 » Semaphore:限流神器

猜你喜欢

  • 暂无文章