High availability for canal with ZooKeeper

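canal is an important part of a data platform, and a single point of failure in it has a large impact on the whole backend. Experience shows that critical services need failover to keep the system available. This article walks through an HA setup for canal with ZooKeeper.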

Environment setup


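Download and extract canal.deployer-1.0.22.tar.gz.

Rename the directory to canal.deployer-1.0.22_master. This is the canal server that serves first.

Make a copy named canal.deployer-1.0.22_standby. This is the backup server that takes over when the canal master fails.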

canal master configuration

vim canal.deployer-1.0.22_master/conf/canal.properties

canal.zkServers=127.0.0.1:2181
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

vim canal.deployer-1.0.22_master/conf/example/instance.properties

canal.instance.mysql.slaveId = 1234
canal.instance.master.journal.name = mysql-bin.000117
canal.instance.master.position = 154

canal standby configuration

vim canal.deployer-1.0.22_standby/conf/canal.properties

canal.id=2
canal.zkServers=127.0.0.1:2181
canal.port= 11112
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

vim canal.deployer-1.0.22_standby/conf/example/instance.properties

canal.instance.mysql.slaveId = 1235
canal.instance.master.journal.name = mysql-bin.000117
canal.instance.master.position = 154
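Because both servers run on one host here, the two configurations differ in only a few keys, and a copy-paste slip is easy to make. The sketch below is a hypothetical sanity check, not part of canal: it expects the same canal.zkServers on both sides and different canal.id, canal.port and canal.instance.mysql.slaveId. The class name HaConfigCheck is made up for this example. The fallback values 1 and 11111 for the master are an assumption, based on the cid and port that the master reports in ZooKeeper later in this article.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper (not part of canal): compares the settings of two servers.
class HaConfigCheck {

    // Parse properties text; canal.properties and instance.properties may be concatenated.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException("cannot parse properties", e);
        }
        return props;
    }

    private static String value(Properties props, String key, String fallback) {
        return props.getProperty(key, fallback).trim();
    }

    // Returns a list of problems; an empty list means the pair looks consistent.
    static List<String> check(Properties master, Properties standby) {
        List<String> problems = new ArrayList<String>();
        String zkMaster = value(master, "canal.zkServers", "");
        String zkStandby = value(standby, "canal.zkServers", "");
        if (zkMaster.isEmpty() || !zkMaster.equals(zkStandby)) {
            problems.add("canal.zkServers must be set and identical on both servers");
        }
        // Assumed fallbacks: "1" and "11111" when the key is absent.
        requireDifferent(problems, master, standby, "canal.id", "1");
        requireDifferent(problems, master, standby, "canal.port", "11111");
        requireDifferent(problems, master, standby, "canal.instance.mysql.slaveId", "");
        return problems;
    }

    private static void requireDifferent(List<String> problems, Properties a, Properties b,
                                         String key, String fallback) {
        if (value(a, key, fallback).equals(value(b, key, fallback))) {
            problems.add(key + " must differ between master and standby");
        }
    }

    public static void main(String[] args) {
        Properties master = parse("canal.zkServers=127.0.0.1:2181\n"
                + "canal.instance.mysql.slaveId = 1234\n");
        Properties standby = parse("canal.id=2\ncanal.zkServers=127.0.0.1:2181\n"
                + "canal.port= 11112\ncanal.instance.mysql.slaveId = 1235\n");
        List<String> problems = check(master, standby);
        System.out.println(problems.isEmpty() ? "configuration looks consistent" : problems.toString());
    }
}
```

The port only has to differ because both servers share a host in this walkthrough; on separate machines that check could be dropped.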

Start ZooKeeper

Run bin/zkServer.sh start. The ZooKeeper configuration itself is not covered here.

Start canal

Start the canal master

$ bin/startup.sh
$ cat logs/example/example.log

2016-11-06 23:11:00.218 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2016-11-06 23:11:00.221 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2016-11-06 23:11:00.266 [main] WARN org.springframework.beans.TypeConverterDelegate - PropertyEditor [com.sun.beans.editors.EnumEditor] found through deprecated global PropertyEditorManager fallback - consider using a more isolated form of registration, e.g. on the BeanWrapper/BeanFactory!
2016-11-06 23:11:00.333 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2016-11-06 23:11:00.344 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2016-11-06 23:11:00.359 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.otter.canal.parse.inbound.mysql.MysqlEventParser - prepare to find start position mysql-bin.000001:1684:

The log under example shows that canal is serving.

Start the canal standby

$ bin/startup.sh
$ cat logs/canal/canal.log

2016-11-06 23:13:22.311 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
log4j:WARN No appenders could be found for logger (org.I0Itec.zkclient.ZkEventThread).
log4j:WARN Please initialize the log4j system properly.
2016-11-06 23:13:22.442 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.59.1:11112]
2016-11-06 23:13:22.569 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

After startup there is no example log, because only the canal master is serving at this point:

$ ll logs/
total 12
drwxrwxrwx 3 niko niko 4096 Nov 6 23:13 ./
drwxrwxrwx 6 niko niko 4096 Nov 6 10:44 ../
drwxrwxr-x 2 niko niko 4096 Nov 6 23:13 canal/

Check the running node in ZooKeeper


[zk: localhost:2181(CONNECTED) ] get /otter/canal/destinations/example/running
{"active":true,"address":"192.168.156.1:11111","cid":1}

The cid is 1, so the canal master is serving.
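To make the content of the running node concrete, here is a minimal sketch that pulls the active flag, address and cid out of the JSON shown above. It uses plain regular expressions only to stay dependency-free. RunningNode is a hypothetical class written for this article, and a real tool would more likely use a JSON library.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical value class for the JSON stored in .../example/running.
class RunningNode {
    private static final Pattern ACTIVE = Pattern.compile("\"active\"\\s*:\\s*(true|false)");
    private static final Pattern ADDRESS = Pattern.compile("\"address\"\\s*:\\s*\"([^\"]+)\"");
    private static final Pattern CID = Pattern.compile("\"cid\"\\s*:\\s*(\\d+)");

    final boolean active;
    final String host;
    final int port;
    final long cid;

    private RunningNode(boolean active, String host, int port, long cid) {
        this.active = active;
        this.host = host;
        this.port = port;
        this.cid = cid;
    }

    // Extract the three fields; throws IllegalArgumentException if one is missing.
    static RunningNode parse(String json) {
        String address = group(ADDRESS, json);
        int colon = address.lastIndexOf(':');
        if (colon < 0) {
            throw new IllegalArgumentException("address has no port: " + address);
        }
        return new RunningNode(
                Boolean.parseBoolean(group(ACTIVE, json)),
                address.substring(0, colon),
                Integer.parseInt(address.substring(colon + 1)),
                Long.parseLong(group(CID, json)));
    }

    private static String group(Pattern pattern, String json) {
        Matcher m = pattern.matcher(json);
        if (!m.find()) {
            throw new IllegalArgumentException("field not found: " + pattern.pattern());
        }
        return m.group(1);
    }

    public static void main(String[] args) {
        RunningNode node = parse("{\"active\":true,\"address\":\"192.168.156.1:11111\",\"cid\":1}");
        System.out.println("cid=" + node.cid + " host=" + node.host + " port=" + node.port);
    }
}
```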

Consuming data from the client

canal client code:

CanalConnector connector = CanalConnectors.newClusterConnector("127.0.0.1:2181", destination, "", "");
......

After the client connects to the canal server, its log shows the changed data:

2016-11-07 09:02:12.527 [main] INFO  org.apache.zookeeper.ZooKeeper - Client environment:java.library.path=.::/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
2016-11-07 09:02:12.527 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.io.tmpdir=/tmp
2016-11-07 09:02:12.527 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.compiler=<NA>
2016-11-07 09:02:12.527 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:os.name=Linux
2016-11-07 09:02:12.527 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:os.arch=amd64
2016-11-07 09:02:12.527 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:os.version=3.13.0-96-generic
2016-11-07 09:02:12.527 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:user.name=niko
2016-11-07 09:02:12.527 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:user.home=/home/niko
2016-11-07 09:02:12.527 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:user.dir=/home/niko/mount/hsb_ssd_1/niko/dev/code/git_repos/java/MavenProj/OpenSources/canal/official/example
2016-11-07 09:02:12.527 [main] INFO org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=127.0.0.1:2181 sessionTimeout=90000 watcher=com.alibaba.otter.canal.common.zookeeper.ZkClientx@351d00c0
2016-11-07 09:02:12.557 [main-SendThread(a.niko:2181)] INFO org.apache.zookeeper.ClientCnxn - Opening socket connection to server a.niko/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2016-11-07 09:02:12.709 [main-SendThread(a.niko:2181)] INFO org.apache.zookeeper.ClientCnxn - Socket connection established to a.niko/127.0.0.1:2181, initiating session
2016-11-07 09:02:12.732 [main-SendThread(a.niko:2181)] INFO org.apache.zookeeper.ClientCnxn - Session establishment complete on server a.niko/127.0.0.1:2181, sessionid = 0x1583c459dd40002, negotiated timeout = 40000

****************************************************
* Batch Id: [1] ,count : [3] , memsize : [219] , Time : 2016-11-07 09:02:40
* Start : [mysql-bin.000117:219:1478480560000(2016-11-07 09:02:40)]
* End : [mysql-bin.000117:473:1478480560000(2016-11-07 09:02:40)]
****************************************************

================> binlog[mysql-bin.000117:219] , executeTime : 1478480560000 , delay : 788ms
BEGIN ----> Thread id: 8
----------------> binlog[mysql-bin.000117:365] , name[foo,city] , eventType : UPDATE , executeTime : 1478480560000 , delay : 791ms
id : 1 type=int(11)
province_id : 1 type=int(11)
name : 北京市3 type=varchar(50) update=true
----------------
END ----> transaction id: 28
================> binlog[mysql-bin.000117:473] , executeTime : 1478480560000 , delay : 8353ms

Once the client is connected, its information is visible in ZooKeeper:

[zk: localhost:2181(CONNECTED) 0] get /otter/canal/destinations/example/1001/running
{"active":true,"address":"192.168.156.1:41823","clientId":1001}
cZxid = 0xb2
ctime = Mon Nov 07 09:02:13 HKT 2016
mZxid = 0xb3
mtime = Mon Nov 07 09:02:13 HKT 2016
pZxid = 0xb2
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x1583c459dd40002
dataLength = 63
numChildren = 0

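After data has been consumed successfully, the canal server records in ZooKeeper the binlog position of the last successful consumption, so that a restarted client can resume from that position.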

[zk: localhost:2181(CONNECTED) 1] get /otter/canal/destinations/example/1001/cursor
{"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","identity":{"slaveId":-1,"sourceAddress":{"address":"127.0.0.1","port":3306}},"postion":{"included":false,"journalName":"mysql-bin.000117","position":473,"timestamp":1478480560000}}
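The relationship between fetching, acknowledging and the stored cursor can be sketched with a small in-memory model. This is a simplified illustration and not canal's implementation: CursorSketch is a hypothetical class, the method names only mirror the client calls getWithoutAck, ack and rollback, and acknowledgement by batch id is left out. The point it shows is that the cursor moves only on ack, so anything fetched but not acknowledged is delivered again.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical in-memory model of a cursor that advances only on ack.
class CursorSketch {
    private final List<String> events; // stand-in for binlog events
    private int cursor = 0;            // first event not yet acknowledged (the stored position)
    private int fetched = 0;           // first event not yet handed out

    CursorSketch(List<String> events) {
        this.events = new ArrayList<String>(events);
    }

    // Hand out the next batch without moving the cursor.
    List<String> getWithoutAck(int batchSize) {
        int end = Math.min(events.size(), fetched + batchSize);
        List<String> batch = new ArrayList<String>(events.subList(fetched, end));
        fetched = end;
        return batch;
    }

    // Consumption succeeded: persist everything handed out so far.
    void ack() {
        cursor = fetched;
    }

    // Consumption failed or the client restarted: resume from the cursor.
    void rollback() {
        fetched = cursor;
    }

    int cursor() {
        return cursor;
    }

    public static void main(String[] args) {
        CursorSketch server = new CursorSketch(
                Arrays.asList("mysql-bin.000117:219", "mysql-bin.000117:365", "mysql-bin.000117:473"));
        System.out.println("first fetch:    " + server.getWithoutAck(2));
        server.rollback(); // the client died before acknowledging
        System.out.println("after rollback: " + server.getWithoutAck(2));
        server.ack();
        System.out.println("cursor index:   " + server.cursor());
    }
}
```

A consequence of this design is at-least-once delivery, so a consumer should tolerate seeing the same event twice.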

canal master failure test

Stop the canal master:

$ bin/stop.sh
niko-ub1404: stopping canal 7662 ...
Oook! cost:1

After the canal server is stopped, the client log changes:


......
================> binlog[mysql-bin.000117:473] , executeTime : 1478480560000 , delay : 8353ms
2016-11-07 09:13:14.485 [Thread-2] WARN c.alibaba.otter.canal.client.impl.ClusterCanalConnector - something goes wrong when getWithoutAck data from server:null
com.alibaba.otter.canal.protocol.exception.CanalClientException: java.io.IOException: Connection reset by peer
at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.getWithoutAck(SimpleCanalConnector.java:281)
at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.getWithoutAck(SimpleCanalConnector.java:252)
at com.alibaba.otter.canal.client.impl.ClusterCanalConnector.getWithoutAck(ClusterCanalConnector.java:180)
at com.alibaba.otter.canal.example.AbstractCanalClientTest.process(AbstractCanalClientTest.java:113)
at com.alibaba.otter.canal.example.AbstractCanalClientTest$2.run(AbstractCanalClientTest.java:80)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:197)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.read(SimpleCanalConnector.java:376)
at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.readNextPacket(SimpleCanalConnector.java:366)
at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.receiveMessages(SimpleCanalConnector.java:286)
at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.getWithoutAck(SimpleCanalConnector.java:279)
... 5 more

2016-11-07 09:13:20.001 [Thread-2] INFO c.alibaba.otter.canal.client.impl.ClusterCanalConnector - restart the connector for next round retry.

****************************************************
* Batch Id: [1] ,count : [1] , memsize : [31] , Time : 2016-11-07 09:13:20
* Start : [mysql-bin.000117:473:1478480560000(2016-11-07 09:02:40)]
* End : [mysql-bin.000117:473:1478480560000(2016-11-07 09:02:40)]
****************************************************
----------------
END ----> transaction id: 28
================> binlog[mysql-bin.000117:473] , executeTime : 1478480560000 , delay : 640062ms

As the canal server switches over, the client fetches the address of the new active canal server from ZooKeeper, connects to it, and continues consuming data.
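The reconnect behaviour seen in the log ("restart the connector for next round retry") can be pictured as a retry loop that looks up the active address again before every attempt. The sketch below is an illustration under assumptions, not the code of ClusterCanalConnector: FailoverRetry, AddressLookup and Fetch are hypothetical names, and the lookup stands in for reading the running node from ZooKeeper.

```java
import java.io.IOException;

// Hypothetical retry loop: re-resolve the active server before each attempt.
class FailoverRetry {

    // Stand-in for reading the address field of the running node.
    interface AddressLookup {
        String activeAddress();
    }

    // Stand-in for fetching one batch from the server at the given address.
    interface Fetch {
        String fetchFrom(String address) throws IOException;
    }

    static String fetchWithFailover(AddressLookup lookup, Fetch fetch,
                                    int maxAttempts, long retryIntervalMs) {
        IOException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            String address = lookup.activeAddress(); // may have changed since the last attempt
            try {
                return fetch.fetchFrom(address);
            } catch (IOException e) {
                last = e;
                try {
                    Thread.sleep(retryIntervalMs); // give the standby time to take over
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        throw new IllegalStateException("no canal server reachable after " + maxAttempts + " attempts", last);
    }

    public static void main(String[] args) {
        final String[] active = {"192.168.156.1:11111"};
        String batch = fetchWithFailover(
                new AddressLookup() {
                    public String activeAddress() {
                        return active[0];
                    }
                },
                new Fetch() {
                    public String fetchFrom(String address) throws IOException {
                        if (address.endsWith(":11111")) {
                            // Simulate the master going away and the standby taking over.
                            active[0] = "192.168.156.1:11112";
                            throw new IOException("Connection reset by peer");
                        }
                        return "batch from " + address;
                    }
                },
                3, 10L);
        System.out.println(batch);
    }
}
```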

Next, check the running node again:

[zk: localhost:2181(CONNECTED) 3] get /otter/canal/destinations/example/running
{"active":true,"address":"192.168.156.1:11112","cid":2}
cZxid = 0xbc
ctime = Mon Nov 07 09:13:19 HKT 2016
mZxid = 0xbc
mtime = Mon Nov 07 09:13:19 HKT 2016
pZxid = 0xbc
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x1583c459dd40001
dataLength = 55
numChildren = 0

The cid is now 2, so the canal standby is serving and the switchover in ZooKeeper is complete.

Now modify data in the database again to verify that the canal standby works correctly:

================> binlog[mysql-bin.000117:473] , executeTime : 1478480560000 , delay : 640062ms

****************************************************
* Batch Id: [2] ,count : [3] , memsize : [219] , Time : 2016-11-07 09:21:52
* Start : [mysql-bin.000117:569:1478481712000(2016-11-07 09:21:52)]
* End : [mysql-bin.000117:823:1478481712000(2016-11-07 09:21:52)]
****************************************************

================> binlog[mysql-bin.000117:569] , executeTime : 1478481712000 , delay : 634ms
BEGIN ----> Thread id: 8
----------------> binlog[mysql-bin.000117:715] , name[foo,city] , eventType : UPDATE , executeTime : 1478481712000 , delay : 634ms
id : 1 type=int(11)
province_id : 1 type=int(11)
name : 北京市4 type=varchar(50) update=true
----------------
END ----> transaction id: 41
================> binlog[mysql-bin.000117:823] , executeTime : 1478481712000 , delay : 634ms

The canal standby pushed the data change to the client, and logs/example on the standby now contains a log as well:

canal.deployer-1.0.22-SNAPSHOT_standby$ cat logs/example/example.log
2016-11-07 09:13:19.772 [pool-1-thread-1] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2016-11-07 09:13:19.776 [pool-1-thread-1] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2016-11-07 09:13:19.819 [pool-1-thread-1] WARN org.springframework.beans.TypeConverterDelegate - PropertyEditor [com.sun.beans.editors.EnumEditor] found through deprecated global PropertyEditorManager fallback - consider using a more isolated form of registration, e.g. on the BeanWrapper/BeanFactory!
2016-11-07 09:13:19.907 [pool-1-thread-1] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2016-11-07 09:13:19.921 [pool-1-thread-1] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2016-11-07 09:13:19.980 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.otter.canal.parse.inbound.mysql.MysqlEventParser - prepare to find start position just last position

This shows that a new canal instance has been started and has taken over the service.

Failure scenarios

There are, of course, some failure scenarios to keep in mind:

Release time

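If the canal server's JVM crashes abnormally, the running node (EPHEMERAL type) is released only after the corresponding ZooKeeper session expires, so how quickly it is released depends on the session timeout.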

canal server network outage

If the network of the canal server goes down, ZooKeeper treats the session as expired and releases the running node, even though the canal server's JVM has not actually exited.

To avoid redistributing instances because of a momentary loss of the running node, canal applies the following strategy:

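When a canal server sees the running node released, it waits for a delay before trying to take it. The previous owner of the running node does not have to wait and can reclaim the node first, which avoids needlessly releasing resources when the server was only briefly unresponsive. The default delay is currently 5 seconds, so the protection window for a briefly unresponsive server is 5 seconds.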
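The rule above can be written down as a tiny decision function. This is a sketch of the stated rule only, not canal's code: RunningPreemption and its method names are hypothetical, time is counted from the moment the running node is released, and the model considers a single standby.

```java
// Hypothetical model of the delayed-preemption rule described above.
class RunningPreemption {
    static final long PROTECTION_MS = 5000L; // default delay stated in the article

    // The previous owner may retry immediately; every other server waits.
    static long delayBeforePreempt(String self, String previousOwner, long protectionMs) {
        return self.equals(previousOwner) ? 0L : protectionMs;
    }

    // Which server ends up owning running, if the previous owner is able to
    // retry recoveryMs after the node was released.
    static String winner(String previousOwner, long recoveryMs, String standby, long protectionMs) {
        long ownerAttemptAt = recoveryMs + delayBeforePreempt(previousOwner, previousOwner, protectionMs);
        long standbyAttemptAt = delayBeforePreempt(standby, previousOwner, protectionMs);
        return ownerAttemptAt < standbyAttemptAt ? previousOwner : standby;
    }

    public static void main(String[] args) {
        // A 2 second hiccup: the previous owner is back inside the protection window.
        System.out.println(winner("master", 2000L, "standby", PROTECTION_MS));
        // An 8 second outage: the standby's delayed attempt comes first.
        System.out.println(winner("master", 8000L, "standby", PROTECTION_MS));
    }
}
```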

References


https://github.com/alibaba/canal/wiki/AdminGuide