博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Alibaba Canal Manager Model 配置管理实现
阅读量:5924 次
发布时间:2019-06-19

本文共 9751 字,大约阅读时间需要 32 分钟。

Alibaba Canal Manager Model 配置管理实现

Alibaba Canal 用于增量订阅消费 mysql 数据库 binlog 日志,详细介绍请见 。

其中 Server 端配置有两种管理方式: Spring 和 Manager。其中 Spring 方式是基于spring xml + properties 进行定义构建 spring 配置, Manager 方式则可以对接 Web console/manager 系统。本文主要记录一下 Manager 方式的对接逻辑,源码在 canal-deployer 模块,相对比较简单。

源码入口

版本:canal-1.0.24

查看 Canal Server 端的脚本 ./bin/startup.sh,可以找到启动入口类是 CanalLauncher。该类 main 方法首先加载了配置文件到内存用于启动参数,./conf/canal.properties 文件。将参数传递给 final 类 CanalController,所以我们主要查看 CanalController 类。

配置 Manager 方式

要使用 Manager 方式,需要对启动参数进行设置。由前文可知启动时会先读取 canal.properties 文件,所以先需要在该文件增加以下配置

# 配置方式canal.instance.global.mode=manager# 是否开启自动扫描canal.auto.scan=true# 自动扫描间隔,单位秒canal.auto.scan.interval=5# 全局的manager配置方式的链接信息,用于标识该 Servercanal.instance.global.manager.address = 127.0.0.1:1099

这里需要简单说明几个概念

Server: 表示由 ./bin/startup.sh 脚本启动的程序,即一个 JVM;
Instance: 对应一个 Mysql 实例,代码为 CanalInstance 接口;
Destination: 字符串类型,对应一个 Instance;

CanalServerWithEmbedded 类: 连接 mysql master,管理多个 CanalInstance;

CanalServerWithNetty 类: 基于 netty 网络服务的 server 实现,用于与 Client 通讯;
CanalConfigClient 类: 存放配置相关信息;
CanalInstanceGenerator 接口: canal 实例生产者,根据 destination 以及 InstanceConfig 生产 CanalInstance;

其 Server 架构如下图

Server
启动一个 Server,里面可有多个 Instance,一个 Instance 读取一个 Mysql 实例的binlog 日志,Destination 则是对一个 Instance 实例的描述字符串,在该 Server 中唯一。

Manager 实现

Canal-Server 配置加载方式

加载方式

配置初始化

查看 CanalController 类

在其构造 函数可以看到 CanalInstanceGenerator 实例作为 CanalServerWithEmbedded 实例的组件。

public CanalController(final Properties properties) {        managerClients = MigrateMap.makeComputingMap(new Function
() { public CanalConfigClient apply(String managerAddress) { return getManagerClient(managerAddress); } }); // 初始化全局参数设置 globalInstanceConfig = initGlobalConfig(properties); instanceConfigs = new MapMaker().makeMap(); // 初始化instance config initInstanceConfig(properties); // 准备canal server cid = Long.valueOf(getProperty(properties, CanalConstants.CANAL_ID)); ip = getProperty(properties, CanalConstants.CANAL_IP); port = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_PORT)); embededCanalServer = CanalServerWithEmbedded.instance(); embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGenerator canalServer = CanalServerWithNetty.instance(); canalServer.setIp(ip); canalServer.setPort(port); ......}

查看 CanalInstanceGenerator 实例的具体实现,在初始化全局配置 initGlobalConfig 方法中

instanceGenerator = new CanalInstanceGenerator() {    public CanalInstance generate(String destination) {        InstanceConfig config = instanceConfigs.get(destination);        if (config == null) {            throw new CanalServerException("can't find destination:{}");        }        logger.info("CanalInstanceGenerator generate mode[{}]", config.getMode());        if (config.getMode().isManager()) {            ManagerCanalInstanceGenerator instanceGenerator = new ManagerCanalInstanceGenerator();            instanceGenerator.setCanalConfigClient(managerClients.get(config.getManagerAddress()));            return instanceGenerator.generate(destination);        } else if (config.getMode().isSpring()) {            SpringCanalInstanceGenerator instanceGenerator = new SpringCanalInstanceGenerator();            synchronized (this) {                try {                    // 设置当前正在加载的通道,加载spring查找文件时会用到该变量                    System.setProperty(CanalConstants.CANAL_DESTINATION_PROPERTY, destination);                    instanceGenerator.setBeanFactory(getBeanFactory(config.getSpringXml()));                    return instanceGenerator.generate(destination);                } catch (Throwable e) {                    logger.error("generator instance failed.", e);                    throw new CanalException(e);                } finally {                    System.setProperty(CanalConstants.CANAL_DESTINATION_PROPERTY, "");                }            }        } else {            throw new UnsupportedOperationException("unknow mode :" + config.getMode());        }    }};

可以看到 Manger 方式的配置主要是 CanalConfigClient,从 managerClients 集合中获取。该集合实现主要在构造器中,调用 getManagerClient 方法。所以我们主要通过该方法,构造 CanalConfigClient 实例返回即可。查看 CanalConfigClient 类主要有两个方法 findCanal, findFilter,可以确定 CanalConfigClient 表示整个 Server 的配置。getManagerClient 方法参数 managerAddress 则是在配置文件 canal.properties 中 destination 对应的 ip 地址,若没有配置则使用本机ip。这里比较奇怪的是,既然是通过 destination 获取的 ip,然后构建 CanalConfigClient 实例,为啥不让 CanalConfigClient 实例表示一个 destination 的配置而却是整个 Sever 的配置。

实现 CanalConfigClient 类,主要是一些参数的配置(如 Mysql 连接用户名、密码等), Canal 和字符串 Filter 的构建。Canal 实例的构建可以参考一下 CanalInstanceWithManager 类所用到的一些参数。

Canal canal = new Canal();        CanalParameter canalParameter = new CanalParameter();        canalParameter.setSlaveId(dc.getSlaveId());        canalParameter.setDbUsername(dc.getUsername());        canalParameter.setDbPassword(dc.getPassword());        canalParameter.setIndexMode(CanalParameter.IndexMode.MEMORY);        List
dbAddresses = new ArrayList<>(); dbAddresses.add(new InetSocketAddress(dc.getHost(), dc.getPort())); canalParameter.setDbAddresses(dbAddresses); canal.setCanalParameter(canalParameter); canal.setName(destination);

到此可以说已经完成 Manger 方式配置的初始化,那如何更新配置呢,主要是用到了 ManagerInstanceConfigMonitor 类,该类在 CanalController 构造器中初始化。

配置刷新

CanalContoller 构造器中

......    // 初始化monitor机制        autoScan = BooleanUtils.toBoolean(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN));        if (autoScan) {            defaultAction = new InstanceAction() {                public void start(String destination) {                    InstanceConfig config = instanceConfigs.get(destination);                    if (config == null) {                        // 重新读取一下instance config                        config = parseInstanceConfig(properties, destination);                        instanceConfigs.put(destination, config);                    }                    if (!embededCanalServer.isStart(destination)) {                        // HA机制启动                        ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);                        if (!config.getLazy() && !runningMonitor.isStart()) {                            runningMonitor.start();                        }                    }                }                public void stop(String destination) {                    // 此处的stop,代表强制退出,非HA机制,所以需要退出HA的monitor和配置信息                    InstanceConfig config = instanceConfigs.remove(destination);                    if (config != null) {                        embededCanalServer.stop(destination);                        ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);                        if (runningMonitor.isStart()) {                            runningMonitor.stop();                        }                    }                }                public void reload(String destination) {                    // 目前任何配置变化,直接重启,简单处理                    stop(destination);                    start(destination);                }            };            instanceConfigMonitors = MigrateMap.makeComputingMap(new Function
() { public InstanceConfigMonitor apply(InstanceMode mode) { int scanInterval = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN_INTERVAL)); if (mode.isSpring()) { SpringInstanceConfigMonitor monitor = new SpringInstanceConfigMonitor(); monitor.setScanIntervalInSecond(scanInterval); monitor.setDefaultAction(defaultAction); // 设置conf目录,默认是user.dir + conf目录组成 String rootDir = getProperty(properties, CanalConstants.CANAL_CONF_DIR); if (StringUtils.isEmpty(rootDir)) { rootDir = "../conf"; } if (StringUtils.equals("otter-canal", System.getProperty("appName"))) { monitor.setRootConf(rootDir); } else { // eclipse debug模式 monitor.setRootConf("src/main/resources/"); } return monitor; } else if (mode.isManager()) { // 配置更新,实现 ManagerInstanceConfigMonitor 参考 SpringInstanceConfigMonitor, 使用上面的 defaultAction 即可 ManagerInstanceConfigMonitor monitor = new ManagerInstanceConfigMonitor(); monitor.setScanIntervalInSecond(scanInterval); monitor.setDefaultAction(defaultAction); monitor.setIp(ip); return monitor; } else { throw new UnsupportedOperationException("unknow mode :" + mode + " for monitor"); } } }); }.....

查看 ManagerInstanceConfigMonitor 类,主要是实现 scan 方法,若配置有更新的话,回调 InstanceAction。这里还有个地方比较奇怪,可以看到 InstanceAction 回调方法参数是字符串 destination?我们在 scan 方法中调用后端服务器接口,已经获取到新的配置,却只能回调字符串 destination,然后在 CanalController.getManagerClient 方法根据 destination 再去调服务器获取具体配置?这里多了一次调用服务器接口,感觉比较奇怪。

总结

Manager 模式实现步骤

1. 在 canal.properties 配置文件设置 `canal.instance.global.mode=manager`等;2. 在 CanalController 类构建 CanalConfigClient 实例,根据 ManagerAddress 从 managerClients 获取 CanalConfigClient (根据destination 获取 Canal, Filter);3. 实现 ManagerInstanceConfigMonitor,启用定时线程刷新配置,使用 InstanceAction 当作回调与 CanalServerWithEmbedded 通信,
配置的过滤 Filter 传递到 EventParser 中的 BinlogParser (实现类 LogEventConvert),所以感觉这里的 Filter 没什么意义,倒不如让 Client 消费所有数据,然后下发再做过滤。

转载地址:http://pvovx.baihongyu.com/

你可能感兴趣的文章
网游的服务器瓶颈
查看>>
win32的一个售票程序,收获有非常的多
查看>>
erlang 编译之 to_core
查看>>
做移动互联网App,你的测试用例足够吗?
查看>>
Perl的第二纪
查看>>
在Android应用中使用Pull解析XML文件(传智播客视频笔记)
查看>>
常用Jsp命令
查看>>
堆和栈概念整理
查看>>
cmd.exe启动参数说明
查看>>
在PowerDesigner中设计物理模型1——表和主外键
查看>>
Win10系统修改MAC地址
查看>>
Radius 中 与Response Authernticator 与 Message-Authenticator的计算
查看>>
phpstrom 编辑器设置
查看>>
JavaScript中“javascript:void(0) ”是什么意思
查看>>
重要的ui组件——Behavior
查看>>
flash文件制作笔记
查看>>
Java工程转换为Maven工程-b
查看>>
inode
查看>>
最近对latin-1这个字符集产生了不少好感
查看>>
JS 无法清除Cookie的解决方法
查看>>