在使用大数据进行数据计算的时候,首先我们需要获取到数据。如果是从MySQL获取数据的话,可以选择阿里的开源组件canal,它将自己伪装成MySQL的slave来接收数据。
开启MySQL的binlog设置
首先我们查看MySQL是否打开了binlog复制的功能
mysql> show variables like 'log_bin';+---------------+-------+| Variable_name | Value |+---------------+-------+| log_bin | OFF |+---------------+-------+1 row in set (0.00 sec)
如果没有打开,就编辑MySQL的配置文件/etc/my.cnf
,添加如下配置
# binlog文件名log-bin=mysql-bin# 选择row模式binlog_format=ROW# mysql实例id,不能和canal的slaveId重复server_id=1
然后使用命令systemctl restart mysqld
重启MySQL,之后再次查看配置,此时binlog复制功能已经开启了
mysql> show variables like 'log_bin';+---------------+-------+| Variable_name | Value |+---------------+-------+| log_bin | ON |+---------------+-------+1 row in set (0.00 sec)
安装canal
开启了MySQL的binlog功能之后我们下载canal.deployer文件,下载完毕进行解压
mkdir canaltar -zxvf canal.deployer-1.1.4.tar.gz -C canal
之后修改文件conf/example/instance.properties
,修改如下配置
# slaveId,和之前的server_id不能一样canal.instance.mysql.slaveId = 2# 数据库的地址端口canal.instance.master.address = 172.19.34.19:3306# 数据库的用户名canal.instance.dbUsername = root# 数据库的密码canal.instance.dbPassword = 1234
还要修改文件conf/canal.properties
,添加ZooKeeper的地址设置
canal.zkServers = 172.19.65.196:2181,172.19.72.108:2181,172.19.72.112:2181
在另一台机器上面也下载解压canal文件,同样进行如上的配置。唯一的区别就是将这台机器的slaveId设置和mysql的server_id以及之前那台canal机器的slaveId不一样,这里我设置这台机器的slaveId为3。
配置好了之后使用./bin/startup.sh
启动两台机器的canal服务,随后可以在MySQL上面执行show master status;
查看binlog的位点。连接ZK之后我们也能看到我们所创建的两个canal服务地址
[zk] ls /otter/canal/cluster[172.19.65.136:11111, 172.19.65.228:11111]
创建从canal获取数据的客户端
启动好了canal之后我们创建一个maven项目,并且添加依赖
1 2 3 4 5 6 7 8 9 10 11
|
<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.4</version> </dependency> <dependency> <!-- 可选,其实canal已经依赖了该包,但是我在执行时总是会报包不存在的错误,所以此处手动添加了依赖 --> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>3.6.1</version> </dependency>
|
使用如下代码从canal中获取MySQL的数据变更
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
|
import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry.*; import com.alibaba.otter.canal.protocol.Message;
import java.util.List;
public class Test {
public static void main(String[] args) { CanalConnector connector = CanalConnectors.newClusterConnector("172.19.65.196:2181,172.19.72.108:2181,172.19.72.112:2181", "example", "", ""); try { // 打开连接 connector.connect(); // 订阅数据库表,全部表 connector.subscribe(".*\\..*"); // 回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿 connector.rollback(); while (true) { // 获取指定数量的数据 Message message = connector.getWithoutAck(1000); // 获取批量ID long batchId = message.getId(); // 获取批量的数量 int size = message.getEntries().size(); //如果没有数据 if (batchId == -1 || size == 0) { try { // 线程休眠2秒 Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } else { // 如果有数据,处理数据 printEntry(message.getEntries()); } // 进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。 connector.ack(batchId); } } catch (Exception e) { e.printStackTrace(); } finally { connector.disconnect(); } }
private static void printEntry(List<Entry> entrys) { for (Entry entry : entrys) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { // 开启/关闭事务的实体类型,跳过 continue; } // RowChange对象,包含了一行数据变化的所有特征 // 比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等 RowChange rowChage; try { rowChage = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); } // 获取操作类型:insert/update/delete类型 EventType eventType = rowChage.getEventType(); // 打印Header信息 System.out.printf("================》; binlog[%s:%s] , name[%s,%s] , eventType : %s%n", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType); // 判断是否是DDL语句 if (rowChage.getIsDdl()) { System.out.println("================》;isDdl: true,sql:" + rowChage.getSql()); } // 获取RowChange对象里的每一行数据,打印出来 for (RowData rowData : rowChage.getRowDatasList()) { // 如果是删除语句 if (eventType == EventType.DELETE) { printColumn(rowData.getBeforeColumnsList()); // 如果是新增语句 } else if (eventType == EventType.INSERT) { printColumn(rowData.getAfterColumnsList()); // 如果是更新的语句 } else { // 变更前的数据 System.out.println("------->; before"); printColumn(rowData.getBeforeColumnsList()); // 变更后的数据 System.out.println("------->; after"); printColumn(rowData.getAfterColumnsList()); } } } }
private static void printColumn(List<Column> columns) { for (Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); } }
}
|
如上代码在我们对MySQL数据进行修改的时候会获取到相应的修改内容并且打印出来,如果两台canal的其中一台挂了,我们也还是能正常获取数据实现高可用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294
|
================》; binlog[mysql-bin.000001:2113] , name[bugatti,tapd_bugs] , eventType : INSERT workspace_id : 1 update=true id : 222 update=true title : 测试测试测试 update=true priority : update=true severity : update=true status : update=true iteration_id : update=true module : update=true version_report : update=true version_test : update=true version_fix : update=true version_close : update=true baseline_find : update=true baseline_join : update=true baseline_test : update=true baseline_close : update=true current_owner : update=true current_owner_no : update=true cc : update=true reporter : update=true reporter_no : update=true participator : update=true te : update=true de : update=true auditer : update=true confirmer : update=true fixer : update=true fixer_no : update=true closer : update=true lastmodify : update=true created : update=true in_progress_time : update=true resolved : update=true verity_time : update=true closed : update=true reject_time : update=true modified : update=true begin : update=true due : update=true deadline : update=true os : update=true platform : update=true testmode : update=true testphase : update=true testtype : update=true source : update=true bugtype : update=true frequency : update=true originphase : update=true sourcephase : update=true resolution : update=true description : update=true ================》; binlog[mysql-bin.000001:2565] , name[bugatti,tapd_bugs] , eventType : DELETE workspace_id : 1 update=false id : 222 update=false title : 测试测试测试 update=false priority : update=false severity : update=false status : update=false iteration_id : update=false module : update=false version_report : update=false version_test : update=false version_fix : update=false version_close : update=false baseline_find : update=false baseline_join : update=false baseline_test : update=false baseline_close : update=false current_owner : update=false current_owner_no : update=false cc : update=false reporter : update=false reporter_no : update=false participator : update=false te : update=false de : update=false auditer : update=false confirmer : update=false fixer : update=false fixer_no : update=false closer : update=false lastmodify : update=false created : update=false in_progress_time : update=false resolved : update=false verity_time : update=false closed : update=false reject_time : update=false modified : update=false begin : update=false due : update=false deadline : update=false os : update=false platform : update=false testmode : update=false testphase : update=false testtype : update=false source : update=false bugtype : update=false frequency : update=false originphase : update=false sourcephase : update=false resolution : update=false description : update=false 2022-04-26 10:24:55:732 WARN --- [ main] c.a.o.c.c.i.ClusterCanalConnector : something goes wrong when getWithoutAck data from server:null com.alibaba.otter.canal.protocol.exception.CanalClientException: java.io.IOException: end of stream when reading header at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.getWithoutAck(SimpleCanalConnector.java:325) ~[canal.client-1.1.4.jar:na] at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.getWithoutAck(SimpleCanalConnector.java:295) ~[canal.client-1.1.4.jar:na] at com.alibaba.otter.canal.client.impl.ClusterCanalConnector.getWithoutAck(ClusterCanalConnector.java:183) ~[canal.client-1.1.4.jar:na] at me.hourui.canal.Test.main(Test.java:26) [classes/:na] Caused by: java.io.IOException: end of stream when reading header at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.read(SimpleCanalConnector.java:413) ~[canal.client-1.1.4.jar:na] at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.readNextPacket(SimpleCanalConnector.java:401) ~[canal.client-1.1.4.jar:na] at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.readNextPacket(SimpleCanalConnector.java:385) ~[canal.client-1.1.4.jar:na] at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.receiveMessages(SimpleCanalConnector.java:330) ~[canal.client-1.1.4.jar:na] at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.getWithoutAck(SimpleCanalConnector.java:323) ~[canal.client-1.1.4.jar:na] ... 3 common frames omitted 2022-04-26 10:25:01:109 ERROR --- [ main] c.a.o.c.c.i.r.ClientRunningMonitor : There is an error when execute initRunning method, with destination [example]. com.alibaba.otter.canal.protocol.exception.CanalClientException: failed to subscribe with reason: something goes wrong with channel:[id: 0x5aa611b9, /172.19.6.8:61485 => /172.19.65.228:11111], exception=com.alibaba.otter.canal.server.exception.CanalServerException: destination:example should start first
at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.subscribe(SimpleCanalConnector.java:249) [canal.client-1.1.4.jar:na] at com.alibaba.otter.canal.client.impl.SimpleCanalConnector$1.processActiveEnter(SimpleCanalConnector.java:434) ~[canal.client-1.1.4.jar:na] at com.alibaba.otter.canal.client.impl.running.ClientRunningMonitor.processActiveEnter(ClientRunningMonitor.java:221) [canal.client-1.1.4.jar:na] at com.alibaba.otter.canal.client.impl.running.ClientRunningMonitor.initRunning(ClientRunningMonitor.java:123) [canal.client-1.1.4.jar:na] at com.alibaba.otter.canal.client.impl.running.ClientRunningMonitor.start(ClientRunningMonitor.java:93) [canal.client-1.1.4.jar:na] at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.connect(SimpleCanalConnector.java:108) [canal.client-1.1.4.jar:na] at com.alibaba.otter.canal.client.impl.ClusterCanalConnector.connect(ClusterCanalConnector.java:64) [canal.client-1.1.4.jar:na] at com.alibaba.otter.canal.client.impl.ClusterCanalConnector.restart(ClusterCanalConnector.java:273) [canal.client-1.1.4.jar:na] at com.alibaba.otter.canal.client.impl.ClusterCanalConnector.getWithoutAck(ClusterCanalConnector.java:189) [canal.client-1.1.4.jar:na] at me.hourui.canal.Test.main(Test.java:26) [classes/:na] 2022-04-26 10:25:01:139 WARN --- [ main] c.a.o.c.c.i.ClusterCanalConnector : failed to connect to:/172.19.65.228:11111 after retry 0 times 2022-04-26 10:25:01:153 WARN --- [ main] c.a.o.c.c.i.r.ClientRunningMonitor : canal is not run any in node 2022-04-26 10:25:06:498 INFO --- [ main] c.a.o.c.c.i.ClusterCanalConnector : restart the connector for next round retry. ================》; binlog[mysql-bin.000001:2565] , name[bugatti,tapd_bugs] , eventType : DELETE workspace_id : 1 update=false id : 222 update=false title : 测试测试测试 update=false priority : update=false severity : update=false status : update=false iteration_id : update=false module : update=false version_report : update=false version_test : update=false version_fix : update=false version_close : update=false baseline_find : update=false baseline_join : update=false baseline_test : update=false baseline_close : update=false current_owner : update=false current_owner_no : update=false cc : update=false reporter : update=false reporter_no : update=false participator : update=false te : update=false de : update=false auditer : update=false confirmer : update=false fixer : update=false fixer_no : update=false closer : update=false lastmodify : update=false created : update=false in_progress_time : update=false resolved : update=false verity_time : update=false closed : update=false reject_time : update=false modified : update=false begin : update=false due : update=false deadline : update=false os : update=false platform : update=false testmode : update=false testphase : update=false testtype : update=false source : update=false bugtype : update=false frequency : update=false originphase : update=false sourcephase : update=false resolution : update=false description : update=false ================》; binlog[mysql-bin.000001:3017] , name[bugatti,tapd_bugs] , eventType : INSERT workspace_id : 1111 update=true id : 2222 update=true title : 333测试1坻崿1 update=true priority : update=true severity : update=true status : update=true iteration_id : update=true module : update=true version_report : update=true version_test : update=true version_fix : update=true version_close : update=true baseline_find : update=true baseline_join : update=true baseline_test : update=true baseline_close : update=true current_owner : update=true current_owner_no : update=true cc : update=true reporter : update=true reporter_no : update=true participator : update=true te : update=true de : update=true auditer : update=true confirmer : update=true fixer : update=true fixer_no : update=true closer : update=true lastmodify : update=true created : update=true in_progress_time : update=true resolved : update=true verity_time : update=true closed : update=true reject_time : update=true modified : update=true begin : update=true due : update=true deadline : update=true os : update=true platform : update=true testmode : update=true testphase : update=true testtype : update=true source : update=true bugtype : update=true frequency : update=true originphase : update=true sourcephase : update=true resolution : update=true description : update=true ================》; binlog[mysql-bin.000001:3472] , name[bugatti,tapd_bugs] , eventType : DELETE workspace_id : 1111 update=false id : 2222 update=false title : 333测试1坻崿1 update=false priority : update=false severity : update=false status : update=false iteration_id : update=false module : update=false version_report : update=false version_test : update=false version_fix : update=false version_close : update=false baseline_find : update=false baseline_join : update=false baseline_test : update=false baseline_close : update=false current_owner : update=false current_owner_no : update=false cc : update=false reporter : update=false reporter_no : update=false participator : update=false te : update=false de : update=false auditer : update=false confirmer : update=false fixer : update=false fixer_no : update=false closer : update=false lastmodify : update=false created : update=false in_progress_time : update=false resolved : update=false verity_time : update=false closed : update=false reject_time : update=false modified : update=false begin : update=false due : update=false deadline : update=false os : update=false platform : update=false testmode : update=false testphase : update=false testtype : update=false source : update=false bugtype : update=false frequency : update=false originphase : update=false sourcephase : update=false resolution : update=false description : update=false
|
本文转自: http://www.nosuchfield.com/2022/04/25/Simple-use-of-canal/
本站仅做收录,版权归原作者所有。
Post Views: 3