本文共 9751 字,大约阅读时间需要 32 分钟。
Alibaba Canal 用于增量订阅消费 mysql 数据库 binlog 日志,详细介绍请见 。
其中 Server 端配置有两种管理方式: Spring 和 Manager。其中 Spring 方式是基于spring xml + properties 进行定义构建 spring 配置, Manager 方式则可以对接 Web console/manager 系统。本文主要记录一下 Manager 方式的对接逻辑,源码在 canal-deployer 模块,相对比较简单。版本:canal-1.0.24
查看 Canal Server 端的脚本 ./bin/startup.sh,可以找到启动入口类是 CanalLauncher。该类 main 方法首先加载了配置文件到内存用于启动参数,./conf/canal.properties 文件。将参数传递给 final 类 CanalController,所以我们主要查看 CanalController 类。要使用 Manager 方式,需要对启动参数进行设置。由前文可知启动时会先读取 canal.properties 文件,所以先需要在该文件增加以下配置
# 配置方式canal.instance.global.mode=manager# 是否开启自动扫描canal.auto.scan=true# 自动扫描间隔,单位秒canal.auto.scan.interval=5# 全局的manager配置方式的链接信息,用于标识该 Servercanal.instance.global.manager.address = 127.0.0.1:1099
这里需要简单说明几个概念
Server: 表示由 ./bin/startup.sh 脚本启动的程序,即一个 JVM;Instance: 对应一个 Mysql 实例,代码为 CanalInstance 接口;Destination: 字符串类型,对应一个 Instance;CanalServerWithEmbedded 类: 连接 mysql master,管理多个 CanalInstance;
CanalServerWithNetty 类: 基于 netty 网络服务的 server 实现,用于与 Client 通讯;CanalConfigClient 类: 存放配置相关信息;CanalInstanceGenerator 接口: canal 实例生产者,根据 destination 以及 InstanceConfig 生产 CanalInstance;其 Server 架构如下图
启动一个 Server,里面可有多个 Instance,一个 Instance 读取一个 Mysql 实例的binlog 日志,Destination 则是对一个 Instance 实例的描述字符串,在该 Server 中唯一。Canal-Server 配置加载方式
查看 CanalController 类
在其构造 函数可以看到 CanalInstanceGenerator 实例作为 CanalServerWithEmbedded 实例的组件。public CanalController(final Properties properties) { managerClients = MigrateMap.makeComputingMap(new Function() { public CanalConfigClient apply(String managerAddress) { return getManagerClient(managerAddress); } }); // 初始化全局参数设置 globalInstanceConfig = initGlobalConfig(properties); instanceConfigs = new MapMaker().makeMap(); // 初始化instance config initInstanceConfig(properties); // 准备canal server cid = Long.valueOf(getProperty(properties, CanalConstants.CANAL_ID)); ip = getProperty(properties, CanalConstants.CANAL_IP); port = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_PORT)); embededCanalServer = CanalServerWithEmbedded.instance(); embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGenerator canalServer = CanalServerWithNetty.instance(); canalServer.setIp(ip); canalServer.setPort(port); ......}
查看 CanalInstanceGenerator 实例的具体实现,在初始化全局配置 initGlobalConfig 方法中
instanceGenerator = new CanalInstanceGenerator() { public CanalInstance generate(String destination) { InstanceConfig config = instanceConfigs.get(destination); if (config == null) { throw new CanalServerException("can't find destination:{}"); } logger.info("CanalInstanceGenerator generate mode[{}]", config.getMode()); if (config.getMode().isManager()) { ManagerCanalInstanceGenerator instanceGenerator = new ManagerCanalInstanceGenerator(); instanceGenerator.setCanalConfigClient(managerClients.get(config.getManagerAddress())); return instanceGenerator.generate(destination); } else if (config.getMode().isSpring()) { SpringCanalInstanceGenerator instanceGenerator = new SpringCanalInstanceGenerator(); synchronized (this) { try { // 设置当前正在加载的通道,加载spring查找文件时会用到该变量 System.setProperty(CanalConstants.CANAL_DESTINATION_PROPERTY, destination); instanceGenerator.setBeanFactory(getBeanFactory(config.getSpringXml())); return instanceGenerator.generate(destination); } catch (Throwable e) { logger.error("generator instance failed.", e); throw new CanalException(e); } finally { System.setProperty(CanalConstants.CANAL_DESTINATION_PROPERTY, ""); } } } else { throw new UnsupportedOperationException("unknow mode :" + config.getMode()); } }};
可以看到 Manger 方式的配置主要是 CanalConfigClient,从 managerClients 集合中获取。该集合实现主要在构造器中,调用 getManagerClient 方法。所以我们主要通过该方法,构造 CanalConfigClient 实例返回即可。查看 CanalConfigClient 类主要有两个方法 findCanal, findFilter,可以确定 CanalConfigClient 表示整个 Server 的配置。getManagerClient 方法参数 managerAddress 则是在配置文件 canal.properties 中 destination 对应的 ip 地址,若没有配置则使用本机ip。这里比较奇怪的是,既然是通过 destination 获取的 ip,然后构建 CanalConfigClient 实例,为啥不让 CanalConfigClient 实例表示一个 destination 的配置而却是整个 Sever 的配置。
实现 CanalConfigClient 类,主要是一些参数的配置(如 Mysql 连接用户名、密码等), Canal 和字符串 Filter 的构建。Canal 实例的构建可以参考一下 CanalInstanceWithManager 类所用到的一些参数。
Canal canal = new Canal(); CanalParameter canalParameter = new CanalParameter(); canalParameter.setSlaveId(dc.getSlaveId()); canalParameter.setDbUsername(dc.getUsername()); canalParameter.setDbPassword(dc.getPassword()); canalParameter.setIndexMode(CanalParameter.IndexMode.MEMORY); ListdbAddresses = new ArrayList<>(); dbAddresses.add(new InetSocketAddress(dc.getHost(), dc.getPort())); canalParameter.setDbAddresses(dbAddresses); canal.setCanalParameter(canalParameter); canal.setName(destination);
到此可以说已经完成 Manger 方式配置的初始化,那如何更新配置呢,主要是用到了 ManagerInstanceConfigMonitor 类,该类在 CanalController 构造器中初始化。
CanalContoller 构造器中
...... // 初始化monitor机制 autoScan = BooleanUtils.toBoolean(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN)); if (autoScan) { defaultAction = new InstanceAction() { public void start(String destination) { InstanceConfig config = instanceConfigs.get(destination); if (config == null) { // 重新读取一下instance config config = parseInstanceConfig(properties, destination); instanceConfigs.put(destination, config); } if (!embededCanalServer.isStart(destination)) { // HA机制启动 ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination); if (!config.getLazy() && !runningMonitor.isStart()) { runningMonitor.start(); } } } public void stop(String destination) { // 此处的stop,代表强制退出,非HA机制,所以需要退出HA的monitor和配置信息 InstanceConfig config = instanceConfigs.remove(destination); if (config != null) { embededCanalServer.stop(destination); ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination); if (runningMonitor.isStart()) { runningMonitor.stop(); } } } public void reload(String destination) { // 目前任何配置变化,直接重启,简单处理 stop(destination); start(destination); } }; instanceConfigMonitors = MigrateMap.makeComputingMap(new Function() { public InstanceConfigMonitor apply(InstanceMode mode) { int scanInterval = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN_INTERVAL)); if (mode.isSpring()) { SpringInstanceConfigMonitor monitor = new SpringInstanceConfigMonitor(); monitor.setScanIntervalInSecond(scanInterval); monitor.setDefaultAction(defaultAction); // 设置conf目录,默认是user.dir + conf目录组成 String rootDir = getProperty(properties, CanalConstants.CANAL_CONF_DIR); if (StringUtils.isEmpty(rootDir)) { rootDir = "../conf"; } if (StringUtils.equals("otter-canal", System.getProperty("appName"))) { monitor.setRootConf(rootDir); } else { // eclipse debug模式 monitor.setRootConf("src/main/resources/"); } return monitor; } else if (mode.isManager()) { // 配置更新,实现 ManagerInstanceConfigMonitor 参考 SpringInstanceConfigMonitor, 使用上面的 defaultAction 即可 ManagerInstanceConfigMonitor monitor = new ManagerInstanceConfigMonitor(); monitor.setScanIntervalInSecond(scanInterval); monitor.setDefaultAction(defaultAction); monitor.setIp(ip); return monitor; } else { throw new UnsupportedOperationException("unknow mode :" + mode + " for monitor"); } } }); }.....
查看 ManagerInstanceConfigMonitor 类,主要是实现 scan 方法,若配置有更新的话,回调 InstanceAction。这里还有个地方比较奇怪,可以看到 InstanceAction 回调方法参数是字符串 destination?我们在 scan 方法中调用后端服务器接口,已经获取到新的配置,却只能回调字符串 destination,然后在 CanalController.getManagerClient 方法根据 destination 再去调服务器获取具体配置?这里多了一次调用服务器接口,感觉比较奇怪。
Manager 模式实现步骤
1. 在 canal.properties 配置文件设置 `canal.instance.global.mode=manager`等;2. 在 CanalController 类构建 CanalConfigClient 实例,根据 ManagerAddress 从 managerClients 获取 CanalConfigClient (根据destination 获取 Canal, Filter);3. 实现 ManagerInstanceConfigMonitor,启用定时线程刷新配置,使用 InstanceAction 当作回调与 CanalServerWithEmbedded 通信,
配置的过滤 Filter 传递到 EventParser 中的 BinlogParser (实现类 LogEventConvert),所以感觉这里的 Filter 没什么意义,倒不如让 Client 消费所有数据,然后下发再做过滤。
转载地址:http://pvovx.baihongyu.com/