⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 http://www.iocoder.cn/Elastic-Job/reg-center-zookeeper-listener/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文基于 Elastic-Job V2.1.5 版本分享


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

1. 概述

本文主要分享 Elastic-Job-Lite 注册中心监听器

建议前置阅读:

涉及到主要类的类图如下( 打开大图 ):

你行好事会因为得到赞赏而愉悦
同理,开源项目贡献者会因为 Star 而更加有动力
为 Elastic-Job 点赞!传送门

2. ListenerManager

ListenerManager,作业注册中心的监听器管理者。管理者两类组件:

  • 监听管理器
  • 注册中心连接状态监听器

其中监听管理器管理着自己的作业注册中心监听器。

一起从代码层面看看:

public final class ListenerManager {

private final JobNodeStorage jobNodeStorage;

private final ElectionListenerManager electionListenerManager;

private final ShardingListenerManager shardingListenerManager;

private final FailoverListenerManager failoverListenerManager;

private final MonitorExecutionListenerManager monitorExecutionListenerManager;

private final ShutdownListenerManager shutdownListenerManager;

private final TriggerListenerManager triggerListenerManager;

private final RescheduleListenerManager rescheduleListenerManager;

private final GuaranteeListenerManager guaranteeListenerManager;

private final RegistryCenterConnectionStateListener regCenterConnectionStateListener;
}

  • 第一类:electionListenerManager / shardingListenerManager / failoverListenerManager / MonitorExecutionListenerManager / shutdownListenerManager / triggerListenerManager / rescheduleListenerManager / guaranteeListenerManager 是不同服务的监听管理器,都继承作业注册中心的监听器管理者的抽象类( AbstractListenerManager )。我们以下一篇文章会涉及到的分片监听管理器( ShardingListenerManager ) 来瞅瞅内部整体实现:

    public final class ShardingListenerManager extends AbstractListenerManager {
    @Override
    public void start() {
    addDataListener(new ShardingTotalCountChangedJobListener());
    addDataListener(new ListenServersChangedJobListener());
    }

    class ShardingTotalCountChangedJobListener extends AbstractJobListener {
    // .... 省略方法
    }

    class ListenServersChangedJobListener extends AbstractJobListener {
    // .... 省略方法
    }
    }

    • ShardingListenerManager 内部管理了 ShardingTotalCountChangedJobListener / ListenServersChangedJobListener 两个作业注册中心监听器。具体作业注册中心监听器是什么,有什么用途,下文会详细解析。
  • 第二类:regCenterConnectionStateListener 是注册中心连接状态监听器。下文也会详细解析。

《Elastic-Job-Lite 源码分析 —— 作业初始化》「3.2.4」注册作业启动信息,我们看到作业初始化时,会开启所有注册中心监听器:

// SchedulerFacade.java
/**
* 注册作业启动信息.
*
* @param enabled 作业是否启用
*/
public void registerStartUpInfo(final boolean enabled) {
// 开启 所有监听器
listenerManager.startAllListeners();
// .... 省略方法
}

// ListenerManager.java
/**
* 开启所有监听器.
*/
public void startAllListeners() {
// 开启 不同服务监听管理器
electionListenerManager.start();
shardingListenerManager.start();
failoverListenerManager.start();
monitorExecutionListenerManager.start();
shutdownListenerManager.start();
triggerListenerManager.start();
rescheduleListenerManager.start();
guaranteeListenerManager.start();
// 开启 注册中心连接状态监听器
jobNodeStorage.addConnectionStateListener(regCenterConnectionStateListener);
}

3. AbstractListenerManager

AbstractListenerManager,作业注册中心的监听器管理者的抽象类

public abstract class AbstractListenerManager {

private final JobNodeStorage jobNodeStorage;

protected AbstractListenerManager(final CoordinatorRegistryCenter regCenter, final String jobName) {
jobNodeStorage = new JobNodeStorage(regCenter, jobName);
}

/**
* 开启监听器.
*/
public abstract void start();

/**
* 添加注册中心监听器
*
* @param listener 注册中心监听器
*/
protected void addDataListener(final TreeCacheListener listener) {
jobNodeStorage.addDataListener(listener);
}
}

  • #addDataListener(),将作业注册中心的监听器添加到注册中心 TreeCache 的监听者里。JobNodeStorage#addDataListener(...)《Elastic-Job-Lite 源码分析 —— 作业初始化》「2.2」缓存已经详细解析。

  • 子类实现 #start() 方法实现监听器初始化。目前所有子类的实现都是将自己管理的注册中心监听器调用 #addDataListener(...),还是以 ShardingListenerManager 举例子:

    public final class ShardingListenerManager extends AbstractListenerManager {

    @Override
    public void start() {
    addDataListener(new ShardingTotalCountChangedJobListener());
    addDataListener(new ListenServersChangedJobListener());
    }

    }

4. AbstractJobListener

AbstractJobListener,作业注册中心的监听器抽象类

public abstract class AbstractJobListener implements TreeCacheListener {

@Override
public final void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception {
ChildData childData = event.getData();
// 忽略掉非数据变化的事件,例如 event.type 为 CONNECTION_SUSPENDED、CONNECTION_RECONNECTED、CONNECTION_LOST、INITIALIZED 事件
if (null == childData) {
return;
}
String path = childData.getPath();
if (path.isEmpty()) {
return;
}
dataChanged(path, event.getType(), null == childData.getData() ? "" : new String(childData.getData(), Charsets.UTF_8));
}

/**
* 节点数据变化
*
* @param path 节点路径
* @param eventType 事件类型
* @param data 数据
*/
protected abstract void dataChanged(final String path, final Type eventType, final String data);
}

  • 作业注册中心的监听器实现类实现 #dataChanged(...),对节点数据变化进行处理。
  • #childEvent(...) 屏蔽掉非节点数据变化事件,例如:CONNECTION_SUSPENDED、CONNECTION_RECONNECTED、CONNECTION_LOST、INITIALIZED 事件,只处理 NODE_ADDED、NODE_UPDATED、NODE_REMOVED 事件。

我们再拿 ShardingListenerManager 举例子:

public final class ShardingListenerManager extends AbstractListenerManager {

class ShardingTotalCountChangedJobListener extends AbstractJobListener {

@Override
protected void dataChanged(final String path, final Type eventType, final String data) {
if (configNode.isConfigPath(path) && 0 != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
int newShardingTotalCount = LiteJobConfigurationGsonFactory.fromJson(data).getTypeConfig().getCoreConfig().getShardingTotalCount();
if (newShardingTotalCount != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
shardingService.setReshardingFlag();
JobRegistry.getInstance().setCurrentShardingTotalCount(jobName, newShardingTotalCount);
}
}
}
}

class ListenServersChangedJobListener extends AbstractJobListener {

@Override
protected void dataChanged(final String path, final Type eventType, final String data) {
if (!JobRegistry.getInstance().isShutdown(jobName) && (isInstanceChange(eventType, path) || isServerChange(path))) {
shardingService.setReshardingFlag();
}
}

private boolean isInstanceChange(final Type eventType, final String path) {
return instanceNode.isInstancePath(path) && Type.NODE_UPDATED != eventType;
}

private boolean isServerChange(final String path) {
return serverNode.isServerPath(path);
}
}

}

5. RegistryCenterConnectionStateListener

RegistryCenterConnectionStateListener,实现 Curator ConnectionStateListener 接口,注册中心连接状态监听器。

public final class RegistryCenterConnectionStateListener implements ConnectionStateListener {

@Override
public void stateChanged(final CuratorFramework client, final ConnectionState newState) {
if (JobRegistry.getInstance().isShutdown(jobName)) {
return;
}
JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
if (ConnectionState.SUSPENDED == newState || ConnectionState.LOST == newState) { // Zookeeper 连接终端 或 连接丢失
// 暂停作业调度
jobScheduleController.pauseJob();
} else if (ConnectionState.RECONNECTED == newState) { // Zookeeper 重新连上
// 持久化作业服务器上线信息
serverService.persistOnline(serverService.isEnableServer(JobRegistry.getInstance().getJobInstance(jobName).getIp()));
// 持久化作业运行实例上线相关信息
instanceService.persistOnline();
// 清除本地分配的作业分片项运行中的标记
executionService.clearRunningInfo(shardingService.getLocalShardingItems());
// 恢复作业调度
jobScheduleController.resumeJob();
}
}

}

  • 当注册中心连接 SUSPENDED 或 LOST 时,暂停本地作业调度:

    // JobScheduleController.java
    public synchronized void pauseJob() {
    try {
    if (!scheduler.isShutdown()) {
    scheduler.pauseAll();
    }
    } catch (final SchedulerException ex) {
    throw new JobSystemException(ex);
    }
    }

  • 当注册中心重新连接成功( RECONNECTED ),恢复本地作业调度:

    /**
    * 恢复作业.
    */
    public synchronized void resumeJob() {
    try {
    if (!scheduler.isShutdown()) {
    scheduler.resumeAll();
    }
    } catch (final SchedulerException ex) {
    throw new JobSystemException(ex);
    }
    }

666. 彩蛋

知识星球

旁白君:芋道君,你又水更了!
芋道君:是是是,是是是!

道友,赶紧上车,分享一波朋友圈!

文章目录
  1. 1. 1. 概述
  2. 2. 2. ListenerManager
  3. 3. 3. AbstractListenerManager
  4. 4. 4. AbstractJobListener
  5. 5. 5. RegistryCenterConnectionStateListener
  6. 6. 666. 彩蛋