Leader Election for Distributed Scheduled Tasks with Redisson

笔记哥 / 04-15
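In a Spring Cloud microservice architecture, leader election and heartbeat monitoring for distributed scheduled tasks can be built on Redisson as follows. The approach combines Redisson's distributed lock (for electing the leader) with its publish/subscribe support (for pushing heartbeats), aiming at high reliability and timely notification.

### I. Overall Architecture

```mermaid
graph TD
    A[Service instance 1] -->|competes for lock| B(Redisson)
    C[Service instance 2] -->|competes for lock| B
    D[Service instance 3] -->|competes for lock| B
    B -->|heartbeat events| E[Redis Pub/Sub]
    E --> A
    E --> C
    E --> D
```

### II. Core Implementation Steps

#### 1. Add the dependency

```xml
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.23.2</version>
</dependency>
```

#### 2. Leader election service

Two details matter here.

- **Thread ownership.** Redisson locks are owned by the thread that acquired them. One dedicated thread therefore acquires, checks and releases the lock.
- **Watchdog renewal.** The lock is taken without an explicit lease time. Redisson's watchdog then keeps renewing it for as long as the leader is alive.

```java
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct; // Spring Boot 3: jakarta.annotation.*
import javax.annotation.PreDestroy;

@Service
public class LeaderElectionService {

    private static final String LEADER_LOCK_KEY = "scheduler:leader:lock";
    private static final String HEARTBEAT_CHANNEL = "scheduler:heartbeat";
    private static final String HEARTBEAT_KEY = "scheduler:leader:heartbeat"; // read by HealthCheckService

    @Autowired
    private RedissonClient redisson;

    private volatile boolean isLeader = false;
    private RLock leaderLock;
    private Thread electionThread;

    @PostConstruct
    public void init() {
        leaderLock = redisson.getLock(LEADER_LOCK_KEY);
        startHeartbeatListener();
        // One thread owns the lock: it acquires, checks and releases it, and sends heartbeats while leader
        electionThread = new Thread(this::electionLoop, "leader-election");
        electionThread.setDaemon(true);
        electionThread.start();
    }

    private void electionLoop() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                runElectionRound();
            } catch (RuntimeException e) {
                // e.g. Redis temporarily unreachable: keep the current state, retry next round
                System.out.println("Election round failed: " + e.getMessage());
            }
            try {
                // Leader: heartbeat every 10 s; follower: retry the election every 5 s
                Thread.sleep(isLeader ? 10_000 : 5_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        // Shutting down: clear the interrupt flag so the final Redis call is not aborted,
        // then release the lock from the thread that owns it
        Thread.interrupted();
        try {
            if (leaderLock.isHeldByCurrentThread()) {
                leaderLock.unlock();
            }
        } finally {
            isLeader = false;
        }
    }

    private void runElectionRound() {
        if (!isLeader) {
            // tryLock() without a leaseTime enables the watchdog, which keeps renewing the lock
            // (lockWatchdogTimeout, default 30 s). tryLock(0, 30, SECONDS) would fix the lease
            // at 30 s and DISABLE the watchdog, so leadership would silently expire.
            if (leaderLock.tryLock()) {
                isLeader = true;
                System.out.println("This node was elected leader");
            }
        } else if (!leaderLock.isHeldByCurrentThread()) {
            // Lock expired (e.g. renewals failed during a Redis outage): step down and re-elect
            isLeader = false;
            System.out.println("Leadership lost, rejoining the election");
        }
        if (isLeader) {
            long now = System.currentTimeMillis();
            redisson.getBucket(HEARTBEAT_KEY).set(now);        // polled by HealthCheckService
            redisson.getTopic(HEARTBEAT_CHANNEL).publish(now);  // pushed to all subscribers
        }
    }

    private void startHeartbeatListener() {
        // Listen for the leader's heartbeats
        redisson.getTopic(HEARTBEAT_CHANNEL)
                .addListener(Long.class, (channel, heartbeatTime) -> {
                    System.out.println("Received leader heartbeat: " + heartbeatTime);
                    // The last heartbeat time can be recorded here
                });
    }

    @PreDestroy
    public void shutdown() throws InterruptedException {
        // Do not call unlock() here: this container thread does not own the lock.
        // Interrupt the election thread and let it release the lock itself.
        electionThread.interrupt();
        electionThread.join(5_000);
    }

    public boolean isLeader() {
        return isLeader;
    }
}
```

#### 3. Enhanced health check

The leader writes its latest heartbeat timestamp to the `scheduler:leader:heartbeat` key. Every node polls that key as a second, pull-based signal alongside Pub/Sub.

```java
import org.redisson.api.RBucket;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

@Service
public class HealthCheckService {

    private static final long HEARTBEAT_TIMEOUT_MS = 30_000;

    @Autowired
    private RedissonClient redisson;

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private volatile long lastHeartbeatTime = 0;

    @PostConstruct
    public void init() {
        // Check the leader's status every 5 seconds
        scheduler.scheduleAtFixedRate(this::checkLeaderStatus, 0, 5, TimeUnit.SECONDS);
    }

    private void checkLeaderStatus() {
        // An exception escaping a scheduleAtFixedRate task cancels all later runs, so catch it
        try {
            RBucket<Long> bucket = redisson.getBucket("scheduler:leader:heartbeat");
            Long heartbeat = bucket.get();
            if (heartbeat != null) {
                lastHeartbeatTime = heartbeat;
            }
            // No heartbeat for more than 30 s: treat the leader as failed
            // (compares the leader's clock with ours, so node clocks must be NTP-synced)
            if (System.currentTimeMillis() - lastHeartbeatTime > HEARTBEAT_TIMEOUT_MS) {
                System.out.println("Leader may be down, triggering re-election");
                // Active lock-grabbing logic can be triggered here
            }
        } catch (RuntimeException e) {
            System.out.println("Leader status check failed: " + e.getMessage());
        }
    }

    @PreDestroy
    public void shutdown() {
        scheduler.shutdownNow();
    }
}
```

### III. Key Optimizations

#### 1. Multi-level failure detection

| Detection mechanism | Trigger | Recovery action |
| --- | --- | --- |
| Redisson watchdog timeout | Lock renewal fails (default 30 s) | The lock expires automatically; other nodes can compete for it |
| Active heartbeat timeout | Custom threshold (e.g. 30 s) | Force-release the lock and re-elect |
| Redis connection lost | Connection-loss event (e.g. from a Redisson `ConnectionListener`) | Pause the election until the connection is restored |

#### 2. Election performance tuning

These are settings of Redisson's own configuration, not top-level `application.yml` keys. With the Spring Boot starter they can go, for example, into a `redisson.yaml` referenced via `spring.redis.redisson.file`.

```yaml
# redisson.yaml (server settings such as singleServerConfig omitted)
lockWatchdogTimeout: 30000   # watchdog timeout (ms)
threads: 16                  # threads for listeners and event handling
nettyThreads: 32             # Netty worker threads
```

#### 3. Split-brain protection

```java
// Use Redisson's MultiLock to hold the leader lock on several independent Redis nodes.
// redissonClient1 and redissonClient2 are clients connected to different Redis instances.
RLock lock1 = redissonClient1.getLock(LEADER_LOCK_KEY);
RLock lock2 = redissonClient2.getLock(LEADER_LOCK_KEY);
// The MultiLock succeeds only if every underlying lock is acquired
RLock multiLock = redissonClient1.getMultiLock(lock1, lock2);
// No leaseTime: the watchdog renews each underlying lock, as in the election service
boolean acquired = multiLock.tryLock(0, TimeUnit.SECONDS);
```

### IV. Production Deployment Recommendations

#### 1. Choosing a Redis topology

| Deployment mode | Use case | Recommended setup |
| --- | --- | --- |
| Sentinel | High availability required | 3 Sentinels + 3 Redis instances |
| Cluster | Large data volume and high throughput | At least 6 nodes (3 masters, 3 replicas) |
| Single node | Development and testing only | Not recommended for production |

#### 2. Monitoring

```java
// Expose Redisson metrics (together with Spring Boot Actuator).
// RedissonMetricsBinder is hypothetical: a Micrometer MeterBinder you implement yourself,
// not a class shipped with the open-source Redisson starter.
@Bean
public RedissonMetricsBinder redissonMetrics(RedissonClient redisson) {
    return new RedissonMetricsBinder(redisson);
}
```

Key metrics to watch (illustrative names; the actual names depend on the binder):

- `redisson.executor.active_threads`: number of active threads
- `redisson.pubsub.subscriptions`: number of subscriptions
- `redisson.connections.active`: number of active connections

#### 3. Disaster recovery

- **Active-active data centers**: configure endpoints in several regions on the `RedissonClient`:

```java
Config config = new Config();
// Seed nodes of one Redis Cluster that spans both data centers
config.useClusterServers()
      .addNodeAddress("redis://dc1-node1:6379")
      .addNodeAddress("redis://dc2-node1:6379");
```

- **Degradation strategy**: cache the last known state locally:

```java
@Bean
@Primary
public LeaderService fallbackLeaderService() {
    // LeaderService, FallbackLeaderService, redisLeaderService and localCache are
    // application-specific (hypothetical) types and beans, not Redisson or Spring classes
    return new FallbackLeaderService(redisLeaderService, localCache);
}
```

### V. Spring Cloud Integration

#### 1. Controlling scheduled tasks

```java
// Requires @EnableScheduling on a configuration class
@Scheduled(fixedRate = 5000)
public void scheduledTask() {
    if (leaderElectionService.isLeader()) {
        // Logic executed only by the leader
        processBatchData(); // application-specific
    }
}
```

#### 2. Dynamic configuration updates

```java
@RefreshScope
@RestController
@RequestMapping("/leader")
public class LeaderController {

    @Value("${election.timeout:30000}")
    private long electionTimeout;

    @Autowired
    private LeaderElectionService electionService;

    @PostMapping("/timeout")
    public void updateTimeout(@RequestParam long timeout) {
        // Adjust the election timeout at runtime.
        // setElectionTimeout is hypothetical: LeaderElectionService above does not define it yet
        electionService.setElectionTimeout(timeout);
    }
}
```

### VI. Summary of Advantages

1. **Real-time notification**: heartbeats are pushed over Redis Pub/Sub. Failure *detection* is still bounded by the heartbeat timeout and the lock lease, which means tens of seconds with the 30 s values used here, not sub-second.
2. **Automatic failover**: Redisson's watchdog lets the lock expire when the leader dies, so another node can take over.
3. **Elastic scaling**: service instances can be added or removed dynamically.
4. **Minimal dependencies**: only Redis is needed, with no extra coordination component.
5. **Spring integration**: works directly with `@Scheduled` and other Spring components.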
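The watchdog point in `LeaderElectionService` can be checked without a Redis server. The sketch below is a hypothetical in-memory model of one lock key with a TTL. `LeaseLock` and `LeaseDemo` are illustrative names, and the model ignores Redisson's reentrancy counter and Lua scripts. It shows how a fixed 30 s lease silently hands leadership to another node, while renewing every 10 s (the watchdog's default rhythm) keeps it until the holder stops renewing.

```java
/** In-memory stand-in for one Redis lock key with a TTL (hypothetical; not Redisson code). */
final class LeaseLock {
    private String owner;     // node holding the lock, or null
    private long expiresAtMs; // absolute expiry of the current lease

    /** Succeeds only if the key is free or its lease has expired (similar in spirit to SET NX PX). */
    boolean tryAcquire(String node, long nowMs, long leaseMs) {
        if (owner != null && nowMs < expiresAtMs) {
            return false;
        }
        owner = node;
        expiresAtMs = nowMs + leaseMs;
        return true;
    }

    /** What a watchdog renewal does: extend the lease, but only for the live owner. */
    boolean renew(String node, long nowMs, long leaseMs) {
        if (!isHeldBy(node, nowMs)) {
            return false;
        }
        expiresAtMs = nowMs + leaseMs;
        return true;
    }

    /** The check a leader must repeat before acting, like isHeldByCurrentThread(). */
    boolean isHeldBy(String node, long nowMs) {
        return node.equals(owner) && nowMs < expiresAtMs;
    }
}

public class LeaseDemo {
    public static void main(String[] args) {
        long lease = 30_000;

        // Fixed 30 s lease, never renewed: node-1 loses the lock at t = 30 s without noticing
        LeaseLock fixed = new LeaseLock();
        fixed.tryAcquire("node-1", 0, lease);
        System.out.println(fixed.tryAcquire("node-2", 35_000, lease)); // true: node-2 takes over
        System.out.println(fixed.isHeldBy("node-1", 35_000));          // false: node-1 must step down

        // Renewed every 10 s: node-1 stays leader
        LeaseLock renewed = new LeaseLock();
        renewed.tryAcquire("node-1", 0, lease);
        for (long t = 10_000; t <= 60_000; t += 10_000) {
            renewed.renew("node-1", t, lease);
        }
        System.out.println(renewed.tryAcquire("node-2", 65_000, lease)); // false: still held

        // node-1 crashes after its renewal at t = 60 s: the lease runs out at t = 90 s
        System.out.println(renewed.tryAcquire("node-2", 90_000, lease)); // true: failover
    }
}
```

The last two lines are the failover path in miniature. Renewal protects a live leader, and the TTL is what eventually frees the lock of a dead one.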
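The staleness rule in `HealthCheckService`, together with the detection latency it implies, fits in a few lines of plain Java. `HeartbeatMonitor` is a hypothetical helper. The bounds assume the values used above: a heartbeat every 10 s, a 30 s timeout and a check every 5 s.

- **Latest detection.** A crash right after a heartbeat is reported at most timeout + check period = 35 s later.
- **Earliest detection.** A crash just before the next heartbeat cannot be reported earlier than timeout - interval = 20 s later.

```java
/** Staleness rule from HealthCheckService, without Redis (hypothetical helper). */
final class HeartbeatMonitor {
    private final long timeoutMs;

    HeartbeatMonitor(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    /** A null heartbeat means no leader has written one yet. */
    boolean leaderSuspected(Long lastHeartbeatMs, long nowMs) {
        return lastHeartbeatMs == null || nowMs - lastHeartbeatMs > timeoutMs;
    }

    /** Crash right after a heartbeat: wait the full timeout, then up to one check period. */
    long latestDetectionMs(long checkPeriodMs) {
        return timeoutMs + checkPeriodMs;
    }

    /** Crash just before the next heartbeat: the last one is already one interval old. */
    long earliestDetectionMs(long heartbeatIntervalMs) {
        return timeoutMs - heartbeatIntervalMs;
    }
}

public class HeartbeatDemo {
    public static void main(String[] args) {
        HeartbeatMonitor monitor = new HeartbeatMonitor(30_000);
        System.out.println(monitor.leaderSuspected(null, 1_000));       // true: no leader yet
        System.out.println(monitor.leaderSuspected(100_000L, 125_000)); // false: heartbeat 25 s old
        System.out.println(monitor.leaderSuspected(100_000L, 131_000)); // true: heartbeat 31 s old
        System.out.println(monitor.earliestDetectionMs(10_000) + " - "
                + monitor.latestDetectionMs(5_000) + " ms");            // 20000 - 35000 ms
    }
}
```

The lock-based failover path lands in a similar range. Redisson's watchdog renews every `lockWatchdogTimeout / 3` (10 s by default), so a dead leader's lock expires 20 to 30 s after the crash. A follower retrying every 5 s then takes over within about 35 s.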
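The split-brain snippet relies on the MultiLock's all-or-nothing rule: a node becomes leader only if it locks every Redis instance, and partial acquisitions are rolled back. The model below sketches that rule with hypothetical `NodeLock` and `AllOrNothingLock` classes and no Redis involved. It also makes the trade-off visible. With a MultiLock, a single unreachable or stale instance blocks the election entirely, which is the price paid for never having two leaders.

```java
import java.util.ArrayList;
import java.util.List;

/** One independent Redis instance's copy of the leader key (hypothetical in-memory model). */
final class NodeLock {
    private String owner;

    boolean tryLock(String node) {
        if (owner != null) {
            return false; // already held (non-reentrant for simplicity)
        }
        owner = node;
        return true;
    }

    void unlock(String node) {
        if (node.equals(owner)) {
            owner = null;
        }
    }

    String owner() {
        return owner;
    }
}

/** All-or-nothing acquisition across instances: the rule a MultiLock enforces. */
final class AllOrNothingLock {
    private final List<NodeLock> locks;

    AllOrNothingLock(List<NodeLock> locks) {
        this.locks = locks;
    }

    boolean tryLock(String node) {
        List<NodeLock> acquired = new ArrayList<>();
        for (NodeLock lock : locks) {
            if (!lock.tryLock(node)) {
                // Roll back partial acquisitions so no instance stays half-locked
                for (NodeLock held : acquired) {
                    held.unlock(node);
                }
                return false;
            }
            acquired.add(lock);
        }
        return true;
    }
}

public class MultiLockDemo {
    public static void main(String[] args) {
        NodeLock redisA = new NodeLock();
        NodeLock redisB = new NodeLock();
        AllOrNothingLock leaderLock = new AllOrNothingLock(List.of(redisA, redisB));
        System.out.println(leaderLock.tryLock("node-1")); // true: both instances locked
        System.out.println(leaderLock.tryLock("node-2")); // false: no second leader

        // Instance D holds a stale entry for node-2 (e.g. left over from a partition)
        NodeLock redisC = new NodeLock();
        NodeLock redisD = new NodeLock();
        redisD.tryLock("node-2");
        AllOrNothingLock other = new AllOrNothingLock(List.of(redisC, redisD));
        System.out.println(other.tryLock("node-1")); // false: D is taken
        System.out.println(redisC.owner());          // null: C was rolled back
    }
}
```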
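The `@Scheduled` pattern reduces to a leader check in front of each run. The sketch below uses only the JDK. `LeaderGatedTask` is a hypothetical name, and its `BooleanSupplier` stands in for `leaderElectionService::isLeader`. It also guards against a pitfall that applies to the `scheduleAtFixedRate` call in `HealthCheckService`: an exception escaping a periodic task suppresses all of its later runs.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

/** Runs the job only on the leader (hypothetical JDK-only stand-in for the @Scheduled check). */
final class LeaderGatedTask implements Runnable {
    private final BooleanSupplier isLeader;
    private final Runnable job;

    LeaderGatedTask(BooleanSupplier isLeader, Runnable job) {
        this.isLeader = isLeader;
        this.job = job;
    }

    @Override
    public void run() {
        if (!isLeader.getAsBoolean()) {
            return; // followers skip this round
        }
        try {
            job.run();
        } catch (RuntimeException e) {
            // An exception escaping a scheduleAtFixedRate task suppresses all later runs,
            // so log it and keep the schedule alive
            System.out.println("Job failed: " + e.getMessage());
        }
    }
}

public class GatedTaskDemo {
    public static void main(String[] args) {
        AtomicBoolean leader = new AtomicBoolean(false);
        AtomicInteger runs = new AtomicInteger();
        LeaderGatedTask task = new LeaderGatedTask(leader::get, runs::incrementAndGet);

        task.run();                     // follower: skipped
        leader.set(true);
        task.run();                     // leader: job runs
        System.out.println(runs.get()); // 1

        // A failing job does not propagate out of run()
        new LeaderGatedTask(() -> true, () -> { throw new IllegalStateException("boom"); }).run();
    }
}
```

In a plain JDK setup, the task would be passed to `scheduleAtFixedRate`. In the Spring version, the body of the `@Scheduled` method plays the role of `job`.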