• Java并发编程指南:如何正确使用信号量和线程池熔断机制


    前言:

    在分布式系统中,选择合适的熔断机制是保护系统免受故障影响的关键。本文将介绍使用信号量和线程池两种常见的熔断机制,并提供Java和Spring Cloud Alibaba框架下的示例代码,帮助您深入理解和应用。

    1. 信号量熔断机制

    信号量熔断机制基于并发请求的数量进行熔断,可限制系统的并发访问量。它适用于资源有限且对请求响应时间要求较高的场景。下面是使用Java和Spring Cloud Alibaba框架实现信号量熔断机制的示例代码:

    配置类

    import org.springframework.cloud.circuitbreaker.resilience4j.Resilience4JCircuitBreakerFactory;
    import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
    
    @Configuration
    public class CircuitBreakerConfiguration {
        @Bean
        public Customizer<Resilience4JCircuitBreakerFactory> defaultCircuitBreakerCustomizer() {
            return factory -> {
                factory.configureDefault(configuration -> {
                    configuration.circuitBreakerConfig(CircuitBreakerConfig.custom()
                            .slidingWindowSize(10)
                            .failureRateThreshold(50)
                            .waitDurationInOpenState(Duration.ofMillis(1000))
                            .build());
                });
            };
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    业务类

    
    @Service
    public class MyService {
        private final CircuitBreaker circuitBreaker;
    
        public MyService(CircuitBreakerFactory circuitBreakerFactory) {
            this.circuitBreaker = circuitBreakerFactory.create("myService");
        }
    
        @CircuitBreakerFallback
        public String myMethod() {
            // 执行业务逻辑
            // ...
        }
    
        public String myMethodWithCircuitBreaker() {
            return this.circuitBreaker.run(this::myMethod, throwable -> {
                // 熔断发生时的处理逻辑,发送警报通知,拓展记录日志等
                sendAlertNotification();
    
                // 执行回退逻辑,抛出自定义异常
                throw new CustomFallbackException("服务熔断");
            });
        }
    
        private void sendAlertNotification() {
            // 发送警报通知的逻辑
            // ...
        }
    
        private static class CustomFallbackException extends RuntimeException {
            public CustomFallbackException(String message) {
                super(message);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    在上述代码中,我们使用Resilience4J库实现了信号量熔断机制。通过CircuitBreakerConfig对象的配置,我们设定了熔断的阈值、失败率和等待时间等参数。在MyServicemyMethodWithCircuitBreaker方法中,我们使用CircuitBreaker来调用myMethod方法,当发生熔断时,会执行处理逻辑。处理逻辑示例中,我们通过sendAlertNotification方法发送了警报通知。回退逻辑示例中,我们抛出了一个自定义的异常,以表明熔断发生。

    2. 线程池熔断机制

    线程池熔断机制基于线程数来进行熔断,可控制系统中的线程数量,保证系统的稳定性。它适用于请求处理时间较长且需要灵活配置的场景。以下是使用Java和Spring Cloud Alibaba框架实现线程池熔断机制的示例代码:

    import java.util.concurrent.*;
    import com.alibaba.csp.sentinel.annotation.SentinelResource;
    import com.alibaba.csp.sentinel.slots.block.BlockException;
    
    @Service
    public class MyService {
        private final ExecutorService executorService;
        private final Semaphore semaphore;
    
        public MyService() {
            // 创建线程池
            this.executorService = new ThreadPoolExecutor(
                10, 10,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(100),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()
            );
            
            this.semaphore = new Semaphore(10); // 设置信号量的限制为10
        }
    
        @SentinelResource(value = "myService", blockHandler = "handleBlockMethod")
        public String myMethod() {
            // 执行业务逻辑
            // ...
        }
    
        public String handleBlockMethod(BlockException ex) {
            // 熔断发生时的处理逻辑
            // ...
            return "Fallback";
        }
        
        public String myMethodWithCircuitBreaker() throws InterruptedException, ExecutionException {
            Future<String> future = executorService.submit(() -> {
                semaphore.acquire(); // 获取信号量许可
                try {
                    return myMethod(); // 执行业务逻辑
                } finally {
                    semaphore.release(); // 释放信号量许可
                }
            });
            
            try {
                return future.get(); // 等待业务方法执行结果
            } catch (InterruptedException | ExecutionException e) {
                // 熔断发生,执行回退逻辑
                return handleBlockMethod(e);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52

    在上述示例中,我们使用ThreadPoolExecutor来创建一个线程池,并设置相关参数,如核心线程数、最大线程数和线程空闲时间。在方法myMethodWithCircuitBreaker中,我们使用线程池来执行被保护的方法myMethod,并使用Semaphore来限制并发访问量,即限制同时执行的线程数量。在方法中,通过Semaphoreacquirerelease方法获取和释放信号量许可。如果发生熔断,我们执行处理逻辑handleBlockMethod作为回退逻辑。

    总结

    本文讲解了信号量熔断机制和线程池熔断机制,并给出了Java和Spring Cloud Alibaba框架下的示例代码。选择适合的熔断机制要考虑资源限制、请求处理时间和容错能力等因素。如果你的场景资源有限且需要快速响应请求,那信号量就是个不错的选择。如果你的任务处理时间较长,需要更灵活的配置,那线程池会更适合。要根据实际情况综合考虑,做出最佳的选择。

  • 相关阅读:
    SSR 实战:官网开发指南
    一键将CSDN博客文章如何转为Markdown
    React 中 keys 的作用是什么?
    打开word文档报错,提示HRESULT 0x80004005 位置: 部分: /word/comments.xml,行: 0,列: 0
    uniapp条件判断app,H5,微信小程序端
    【PAT甲级 - C++题解】1062 Talent and Virtue
    Hexagon_V65_Programmers_Reference_Manual(45)
    93. 复原 IP 地址
    MySQL配置
    【Express】服务端渲染(模板引擎 EJS)
  • 原文地址:https://blog.csdn.net/Da_zhenzai/article/details/134088722