{"id":1679,"date":"2023-03-25T22:33:30","date_gmt":"2023-03-25T14:33:30","guid":{"rendered":"https:\/\/www.appblog.cn\/?p=1679"},"modified":"2023-04-23T21:48:51","modified_gmt":"2023-04-23T13:48:51","slug":"alibaba-canal-client-access","status":"publish","type":"post","link":"https:\/\/www.appblog.cn\/index.php\/2023\/03\/25\/alibaba-canal-client-access\/","title":{"rendered":"\u963f\u91cc\u5df4\u5df4Canal Client\u63a5\u5165"},"content":{"rendered":"<p>\u53c2\u8003\uff1a<a target=\"_blank\" rel=\"noopener\" href=\"https:\/\/github.com\/alibaba\/canal\/wiki\">https:\/\/github.com\/alibaba\/canal\/wiki<\/a><\/p>\n<blockquote>\n<p>\u9996\u5148\u542f\u52a8Canal Server<\/p>\n<\/blockquote>\n<h3>\u521b\u5efamvn\u6807\u51c6\u5de5\u7a0b<\/h3>\n<pre><code>mvn archetype:generate -DgroupId=com.alibaba.otter -DartifactId=canal.sample<\/code><\/pre>\n<p><!-- more --><\/p>\n<h3>pom.xml\u4f9d\u8d56\u914d\u7f6e<\/h3>\n<pre><code class=\"language-xml\">&lt;dependency&gt;\n    &lt;groupId&gt;com.alibaba.otter&lt;\/groupId&gt;\n    &lt;artifactId&gt;canal.client&lt;\/artifactId&gt;\n    &lt;version&gt;1.1.4&lt;\/version&gt;\n&lt;\/dependency&gt;<\/code><\/pre>\n<h3>ClientSample\u4ee3\u7801<\/h3>\n<pre><code class=\"language-java\">package com.alibaba.otter.canal.sample;\n\nimport java.net.InetSocketAddress;\nimport java.util.List;\n\nimport com.alibaba.otter.canal.client.CanalConnectors;\nimport com.alibaba.otter.canal.client.CanalConnector;\nimport com.alibaba.otter.canal.common.utils.AddressUtils;\nimport com.alibaba.otter.canal.protocol.Message;\nimport com.alibaba.otter.canal.protocol.CanalEntry.Column;\nimport com.alibaba.otter.canal.protocol.CanalEntry.Entry;\nimport com.alibaba.otter.canal.protocol.CanalEntry.EntryType;\nimport com.alibaba.otter.canal.protocol.CanalEntry.EventType;\nimport com.alibaba.otter.canal.protocol.CanalEntry.RowChange;\nimport com.alibaba.otter.canal.protocol.CanalEntry.RowData;\n\npublic class SimpleCanalClientExample {\n\n    public static void main(String args[]) {\n        \/\/ \u521b\u5efa\u94fe\u63a5\n        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),\n                                                                                            11111), &quot;test&quot;, &quot;&quot;, &quot;&quot;);\n        int batchSize = 1000;\n        int emptyCount = 0;\n        try {\n            connector.connect();\n            connector.subscribe(&quot;.*\\\\..*&quot;);\n            connector.rollback();\n            int totalEmptyCount = 120;\n            while (emptyCount &lt; totalEmptyCount) {\n                Message message = connector.getWithoutAck(batchSize); \/\/ \u83b7\u53d6\u6307\u5b9a\u6570\u91cf\u7684\u6570\u636e\n                long batchId = message.getId();\n                int size = message.getEntries().size();\n                if (batchId == -1 || size == 0) {\n                    emptyCount++;\n                    System.out.println(&quot;empty count : &quot; + emptyCount);\n                    try {\n                        Thread.sleep(1000);\n                    } catch (InterruptedException e) {\n                    }\n                } else {\n                    emptyCount = 0;\n                    \/\/ System.out.printf(&quot;message[batchId=%s,size=%s] \\n&quot;, batchId, size);\n                    printEntry(message.getEntries());\n                }\n\n                connector.ack(batchId); \/\/ \u63d0\u4ea4\u786e\u8ba4\n                \/\/ connector.rollback(batchId); \/\/ \u5904\u7406\u5931\u8d25, \u56de\u6eda\u6570\u636e\n            }\n\n            System.out.println(&quot;empty too many times, exit&quot;);\n        } finally {\n            connector.disconnect();\n        }\n    }\n\n    private static void printEntry(List&lt;Entry&gt; entrys) {\n        for (Entry entry : entrys) {\n            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {\n                continue;\n            }\n\n            RowChange rowChage = null;\n            try {\n                rowChage = RowChange.parseFrom(entry.getStoreValue());\n            } catch (Exception e) {\n                throw new RuntimeException(&quot;ERROR ## parser of eromanga-event has an error , data:&quot; + entry.toString(),\n                                           e);\n            }\n\n            EventType eventType = rowChage.getEventType();\n            System.out.println(String.format(&quot;================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s&quot;,\n                                             entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),\n                                             entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),\n                                             eventType));\n\n            for (RowData rowData : rowChage.getRowDatasList()) {\n                if (eventType == EventType.DELETE) {\n                    printColumn(rowData.getBeforeColumnsList());\n                } else if (eventType == EventType.INSERT) {\n                    printColumn(rowData.getAfterColumnsList());\n                } else {\n                    System.out.println(&quot;-------&gt; before&quot;);\n                    printColumn(rowData.getBeforeColumnsList());\n                    System.out.println(&quot;-------&gt; after&quot;);\n                    printColumn(rowData.getAfterColumnsList());\n                }\n            }\n        }\n    }\n\n    private static void printColumn(List&lt;Column&gt; columns) {\n        for (Column column : columns) {\n            System.out.println(column.getName() + &quot; : &quot; + column.getValue() + &quot;    update=&quot; + column.getUpdated());\n        }\n    }\n}<\/code><\/pre>\n<h3>\u8fd0\u884cClient<\/h3>\n<p>\u542f\u52a8Canal Client\u540e\uff0c\u53ef\u4ee5\u4ece\u63a7\u5236\u53f0\u770b\u5230\u7c7b\u4f3c\u6d88\u606f\uff1a<\/p>\n<pre><code>empty count : 1\nempty count : 2\nempty count : 3\nempty count : 4<\/code><\/pre>\n<p>\u6b64\u65f6\u4ee3\u8868\u5f53\u524d\u6570\u636e\u5e93\u65e0\u53d8\u66f4\u6570\u636e<\/p>\n<h3>\u89e6\u53d1\u6570\u636e\u5e93\u53d8\u66f4<\/h3>\n<pre><code class=\"language-sql\">mysql&gt; use test;\nDatabase changed\nmysql&gt; CREATE TABLE `xdual` (\n    -&gt;   `ID` int(11) NOT NULL AUTO_INCREMENT,\n    -&gt;   `X` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,\n    -&gt;   PRIMARY KEY (`ID`)\n    -&gt; ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 ;\nQuery OK, 0 rows affected (0.06 sec)\nmysql&gt; insert into xdual(id,x) values(null,now());Query OK, 1 row affected (0.06 sec)<\/code><\/pre>\n<p>\u53ef\u4ee5\u4ece\u63a7\u5236\u53f0\u4e2d\u770b\u5230\uff1a<\/p>\n<pre><code>empty count : 1\nempty count : 2\nempty count : 3\nempty count : 4\n================&gt; binlog[mysql-bin.001946:313661577] , name[test,xdual] , eventType : INSERT\nID : 4    update=true\nX : 2013-02-05 23:29:46    update=true<\/code><\/pre>\n<h3>\u66f4\u591a\u80fd\u529b<\/h3>\n<p>\u5982\u679c\u9700\u8981\u66f4\u8be6\u7ec6\u7684exmpale\u4f8b\u5b50\uff0c\u8bf7\u4e0b\u8f7dcanal\u5f53\u524d\u6700\u65b0\u6e90\u7801\u5305\uff0c\u91cc\u9762\u6709\u4e2aexample\u5de5\u7a0b<\/p>\n<ul>\n<li>Simple\u5ba2\u6237\u7aef\u4f8b\u5b50\uff1a<a target=\"_blank\" rel=\"noopener\" href=\"https:\/\/github.com\/alibaba\/canal\/blob\/master\/example\/src\/main\/java\/com\/alibaba\/otter\/canal\/example\/SimpleCanalClientTest.java\" title=\"SimpleCanalClientTest\">SimpleCanalClientTest<\/a><\/li>\n<li>Cluster\u5ba2\u6237\u7aef\u4f8b\u5b50\uff1a<a target=\"_blank\" rel=\"noopener\" href=\"https:\/\/github.com\/alibaba\/canal\/blob\/master\/example\/src\/main\/java\/com\/alibaba\/otter\/canal\/example\/ClusterCanalClientTest.java\" title=\"ClusterCanalClientTest\">ClusterCanalClientTest<\/a><\/li>\n<\/ul>\n<h3>server\/client\u4ea4\u4e92\u534f\u8bae<\/h3>\n<p><code>get\/ack\/rollback<\/code>\u534f\u8bae\u4ecb\u7ecd\uff1a<\/p>\n<p>\uff081\uff09<code>Message getWithoutAck(int batchSize)<\/code><\/p>\n<p>\u5141\u8bb8\u6307\u5b9a<code>batchSize<\/code>\uff0c\u4e00\u6b21\u53ef\u4ee5\u83b7\u53d6\u591a\u6761\uff0c\u6bcf\u6b21\u8fd4\u56de\u7684\u5bf9\u8c61\u4e3a<code>Message<\/code>\uff0c\u5305\u542b\u7684\u5185\u5bb9\u4e3a\uff1a<\/p>\n<ul>\n<li>batch id \u552f\u4e00\u6807\u8bc6<\/li>\n<li>entries \u5177\u4f53\u7684\u6570\u636e\u5bf9\u8c61<\/li>\n<\/ul>\n<p>\uff082\uff09<code>getWithoutAck(int batchSize, Long timeout, TimeUnit unit)<\/code><\/p>\n<p>\u76f8\u6bd4\u4e8e<code>getWithoutAck(int batchSize)<\/code>\uff0c\u5141\u8bb8\u8bbe\u5b9a\u83b7\u53d6\u6570\u636e\u7684timeout\u8d85\u65f6\u65f6\u95f4<\/p>\n<ul>\n<li>\u62ff\u591f<code>batchSize<\/code>\u6761\u8bb0\u5f55\u6216\u8005\u8d85\u8fc7<code>timeout<\/code>\u65f6\u95f4<\/li>\n<li><code>timeout=0<\/code>\uff0c\u963b\u585e\u7b49\u5230\u8db3\u591f\u7684<code>batchSize<\/code><\/li>\n<\/ul>\n<p>\uff083\uff09<code>void rollback(long batchId)<\/code><\/p>\n<p>\u987e\u547d\u601d\u8bae\uff0c\u56de\u6eda\u4e0a\u6b21\u7684get\u8bf7\u6c42\uff0c\u91cd\u65b0\u83b7\u53d6\u6570\u636e\u3002\u57fa\u4e8eget\u83b7\u53d6\u7684batchId\u8fdb\u884c\u63d0\u4ea4\uff0c\u907f\u514d\u8bef\u64cd\u4f5c<\/p>\n<p>\uff084\uff09<code>void ack(long batchId)<\/code><\/p>\n<p>\u987e\u547d\u601d\u8bae\uff0c\u786e\u8ba4\u5df2\u7ecf\u6d88\u8d39\u6210\u529f\uff0c\u901a\u77e5server\u5220\u9664\u6570\u636e\u3002\u57fa\u4e8eget\u83b7\u53d6\u7684batchId\u8fdb\u884c\u63d0\u4ea4\uff0c\u907f\u514d\u8bef\u64cd\u4f5c<\/p>\n<p>canal\u7684<code>get\/ack\/rollback<\/code>\u534f\u8bae\u548c\u5e38\u89c4\u7684jms\u534f\u8bae\u6709\u6240\u4e0d\u540c\uff0c\u5141\u8bb8<code>get\/ack<\/code>\u5f02\u6b65\u5904\u7406\uff0c\u6bd4\u5982\u53ef\u4ee5\u8fde\u7eed\u8c03\u7528get\u591a\u6b21\uff0c\u540e\u7eed\u5f02\u6b65\u6309\u987a\u5e8f\u63d0\u4ea4<code>ack\/rollback<\/code>\uff0c\u9879\u76ee\u4e2d\u79f0\u4e4b\u4e3a\u6d41\u5f0fapi<\/p>\n<h3>\u6570\u636e\u5bf9\u8c61\u683c\u5f0f<\/h3>\n<pre><code>Entry  \n    Header  \n        logfileName   [binlog\u6587\u4ef6\u540d]  \n        logfileOffset [binlog position]  \n        executeTime   [binlog\u91cc\u8bb0\u5f55\u53d8\u66f4\u53d1\u751f\u7684\u65f6\u95f4\u6233,\u7cbe\u786e\u5230\u79d2]  \n        schemaName   \n        tableName  \n        eventType [insert\/update\/delete\u7c7b\u578b]  \n    entryType     [\u4e8b\u52a1\u5934BEGIN\/\u4e8b\u52a1\u5c3eEND\/\u6570\u636eROWDATA]  \n    storeValue    [byte\u6570\u636e,\u53ef\u5c55\u5f00\uff0c\u5bf9\u5e94\u7684\u7c7b\u578b\u4e3aRowChange]  \n\nRowChange\n\nisDdl       [\u662f\u5426\u662fddl\u53d8\u66f4\u64cd\u4f5c\uff0c\u6bd4\u5982create table\/drop table]\n\nsql         [\u5177\u4f53\u7684ddl sql]\n\nrowDatas    [\u5177\u4f53insert\/update\/delete\u7684\u53d8\u66f4\u6570\u636e\uff0c\u53ef\u4e3a\u591a\u6761\uff0c1\u4e2abinlog event\u4e8b\u4ef6\u53ef\u5bf9\u5e94\u591a\u6761\u53d8\u66f4\uff0c\u6bd4\u5982\u6279\u5904\u7406]\n\nbeforeColumns [Column\u7c7b\u578b\u7684\u6570\u7ec4\uff0c\u53d8\u66f4\u524d\u7684\u6570\u636e\u5b57\u6bb5]\n\nafterColumns  [Column\u7c7b\u578b\u7684\u6570\u7ec4\uff0c\u53d8\u66f4\u540e\u7684\u6570\u636e\u5b57\u6bb5]\n\nColumn\n\nindex\n\nsqlType     [jdbc type]\n\nname        [column name]\n\nisKey       [\u662f\u5426\u4e3a\u4e3b\u952e]\n\nupdated     [\u662f\u5426\u53d1\u751f\u8fc7\u53d8\u66f4]\n\nisNull      [\u503c\u662f\u5426\u4e3anull]\n\nvalue       [\u5177\u4f53\u7684\u5185\u5bb9\uff0c\u6ce8\u610f\u4e3astring\u6587\u672c]  <\/code><\/pre>\n<p>\u8bf4\u660e\uff1a<\/p>\n<ul>\n<li>\u53ef\u4ee5\u63d0\u4f9b\u6570\u636e\u5e93\u53d8\u66f4\u524d\u548c\u53d8\u66f4\u540e\u7684\u5b57\u6bb5\u5185\u5bb9\uff0c\u9488\u5bf9<code>binlog<\/code>\u4e2d\u6ca1\u6709\u7684<code>name<\/code>,<code>isKey<\/code>\u7b49\u4fe1\u606f\u8fdb\u884c\u8865\u5168<\/li>\n<li>\u53ef\u4ee5\u63d0\u4f9b<code>ddl<\/code>\u7684\u53d8\u66f4\u8bed\u53e5<\/li>\n<li><code>insert<\/code>\u53ea\u6709<code>after columns<\/code>\uff0c<code>delete<\/code>\u53ea\u6709<code>before columns<\/code>\uff0c\u800c<code>update<\/code>\u5219\u4f1a\u6709<code>before\/after columns<\/code>\u6570\u636e<\/li>\n<\/ul>\n<h3>\u521b\u5efaConnector<\/h3>\n<ul>\n<li>\u521b\u5efa<code>SimpleCanalConnector<\/code>(\u76f4\u8fdeip\uff0c\u4e0d\u652f\u6301server\/client\u7684failover\u673a\u5236)<\/li>\n<\/ul>\n<pre><code class=\"language-java\">CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),11111), destination, &quot;&quot;, &quot;&quot;);<\/code><\/pre>\n<ul>\n<li>\u521b\u5efa<code>ClusterCanalConnector<\/code>(\u57fa\u4e8ezookeeper\u83b7\u53d6canal server ip\uff0c\u652f\u6301server\/client\u7684failover\u673a\u5236)<\/li>\n<\/ul>\n<pre><code class=\"language-java\">CanalConnector connector = CanalConnectors.newClusterConnector(&quot;10.20.144.51:2181&quot;, destination, &quot;&quot;, &quot;&quot;);<\/code><\/pre>\n<ul>\n<li>\u521b\u5efa<code>ClusterCanalConnector<\/code>(\u57fa\u4e8e\u56fa\u5b9acanal server\u7684\u5730\u5740\uff0c\u652f\u6301\u56fa\u5b9a\u7684server ip\u7684failover\u673a\u5236\uff0c\u4e0d\u652f\u6301client\u7684failover\u673a\u5236<\/li>\n<\/ul>\n<pre><code class=\"language-java\">CanalConnector connector = CanalConnectors.newClusterConnector(Arrays.asList(new InetSocketAddress(AddressUtils.getHostIp(),11111)), destination,&quot;&quot;, &quot;&quot;);<\/code><\/pre>\n","protected":false},"excerpt":{"rendered":"<p>\u53c2\u8003\uff1ahttps:\/\/github.com\/alibaba\/canal\/wiki \u9996\u5148\u542f\u52a8Canal Serv [&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[417],"tags":[],"class_list":["post-1679","post","type-post","status-publish","format-standard","hentry","category-canal"],"_links":{"self":[{"href":"https:\/\/www.appblog.cn\/index.php\/wp-json\/wp\/v2\/posts\/1679","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/www.appblog.cn\/index.php\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/www.appblog.cn\/index.php\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/www.appblog.cn\/index.php\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/www.appblog.cn\/index.php\/wp-json\/wp\/v2\/comments?post=1679"}],"version-history":[{"count":0,"href":"https:\/\/www.appblog.cn\/index.php\/wp-json\/wp\/v2\/posts\/1679\/revisions"}],"wp:attachment":[{"href":"https:\/\/www.appblog.cn\/index.php\/wp-json\/wp\/v2\/media?parent=1679"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/www.appblog.cn\/index.php\/wp-json\/wp\/v2\/categories?post=1679"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/www.appblog.cn\/index.php\/wp-json\/wp\/v2\/tags?post=1679"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}