TiCDC Open Protocol

    TiCDC Open Protocol 以 Event 为基本单位向下游复制数据变更事件,Event 分为三类:

    • Row Changed Event:代表一行的数据变化,在行发生变更时该 Event 被发出,包含变更后该行的相关信息。
    • DDL Event:代表 DDL 变更,在上游成功执行 DDL 后发出,DDL Event 会广播到每一个 MQ Partition 中。
    • Resolved Event:代表一个特殊的时间点,表示在这个时间点前的收到的 Event 是完整的。

    协议约束

    • 在绝大多数情况下,一个版本的 Row Changed Event 只会发出一次,但是特殊情况(节点故障、网络分区等)下,同一版本的 Row Changed Event 可能会多次发送。
    • 同一张表中的每一个版本第一次发出的 Row Changed Event 在 Event 流中一定是按 TS (timestamp) 顺序递增的。
    • Resolved Event 会被周期性的广播到各个 MQ Partition,Resolved Event 意味着任何 TS 小于 Resolved Event TS 的 Event 已经发送给下游。
    • DDL Event 将被广播到各个 MQ Partition。
    • 一行数据的多个 Row Changed Event 一定会被发送到同一个 MQ Partition 中。

    Message 格式定义

    一个 Message 中包含一个或多个 Event,按照以下格式排列:

    Key:

    Value:

    Offset(Byte) 0~7 8~(7+长度1)
    参数 长度1 Event Value1 长度N Event ValueN
    • 代表第 N 个 Key/Value 的长度
    • 长度及协议版本号均为大端序 int64 类型
    • 当前协议版本号为 1

    本部分介绍 Row Changed Event、DDL Event 和 Resolved Event 的格式定义。

    • Key:

      | 参数 | 类型 | 说明 | | :————— | :——- | :——————————- | | TS | Number | 造成 Row 变更的事务的 TS | | Schema Name | String | Row 所在的 Schema 的名字 | | Table Name | String | Row 所在的 Table 的名字 |

    • Value:

      Insert 事件,输出新增的行数据。

      1. {
      2. "u":{
      3. <Column Name>:{
      4. "t":<Column Type>,
      5. "h":<Where Handle>,
      6. "f":<Flag>,
      7. "v":<Column Value>
      8. },
      9. <Column Name>:{
      10. "t":<Column Type>,
      11. "h":<Where Handle>,
      12. "f":<Flag>,
      13. "v":<Column Value>
      14. }
      15. }
      16. }
      1. {
      2. "u":{
      3. <Column Name>:{
      4. "t":<Column Type>,
      5. "f":<Flag>,
      6. "v":<Column Value>
      7. },
      8. <Column Name>:{
      9. "t":<Column Type>,
      10. "h":<Where Handle>,
      11. "f":<Flag>,
      12. "v":<Column Value>
      13. }
      14. },
      15. <Column Name>:{
      16. "t":<Column Type>,
      17. "h":<Where Handle>,
      18. "f":<Flag>,
      19. "v":<Column Value>
      20. },
      21. <Column Name>:{
      22. "t":<Column Type>,
      23. "h":<Where Handle>,
      24. "f":<Flag>,
      25. "v":<Column Value>
      26. }
      27. }
      28. }

      Delete 事件,输出被删除的行数据。当 Old Value 特性开启时,Delete 事件中包含被删除的行数据中的所有列;当 Old Value 特性关闭时,Delete 事件中仅包含 列。

      1. {
      2. "d":{
      3. <Column Name>:{
      4. "t":<Column Type>,
      5. "h":<Where Handle>,
      6. "f":<Flag>,
      7. "v":<Column Value>
      8. },
      9. <Column Name>:{
      10. "t":<Column Type>,
      11. "h":<Where Handle>,
      12. "f":<Flag>,
      13. "v":<Column Value>
      14. }
      15. }
      16. }
    • Key:

      1. {
      2. "ts":<TS>,
      3. "t":2
      4. }

      | 参数 | 类型 | 说明 | | :————— | :——- | :————————————————— | | TS | Number | 进行 DDL 变更的事务的 TS | | Schema Name | String | DDL 变更的 Schema 的名字,可能为空字符串 | | Table Name | String | DDL 变更的 Table 的名字,可能为空字符串 |

    • Value:

      | 参数 | 类型 | 说明 | | :———— | :——- | :—————— | | DDL Query | String | DDL Query SQL | | DDL Type | String | DDL 类型,详见:DDL 的类型码 |

    • Key:

      1. {
      2. "ts":<TS>,
      3. "t":3
      4. }

      | 参数 | 类型 | 说明 | | :————— | :——- | :————————————————————— | | TS | Number | Resolved TS,任意小于该 TS 的 Event 已经发送完毕 |

    • Value: None

    Event 流的输出示例

    本部分展示并描述 Event 流的输出日志。

    1. CREATE TABLE test.t1(id int primary key, val varchar(16));

    如以下执行日志中的 Log 1、Log 3 所示,DDL Event 将被广播到所有 MQ Partition,Resolved Event 会被周期性地广播到各个 MQ Partition:

    1. 1. [partition=0] [key="{\"ts\":415508856908021766,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":2}"] [value="{\"q\":\"CREATE TABLE test.t1(id int primary key, val varchar(16))\",\"t\":3}"]
    2. 2. [partition=0] [key="{\"ts\":415508856908021766,\"t\":3}"] [value=]
    3. 3. [partition=1] [key="{\"ts\":415508856908021766,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":2}"] [value="{\"q\":\"CREATE TABLE test.t1(id int primary key, val varchar(16))\",\"t\":3}"]
    4. 4. [partition=1] [key="{\"ts\":415508856908021766,\"t\":3}"] [value=]

    在上游执行以下 SQL 语句:

    1. BEGIN;
    2. INSERT INTO test.t1(id, val) VALUES (1, 'aa');
    3. INSERT INTO test.t1(id, val) VALUES (2, 'aa');
    4. UPDATE test.t1 SET val = 'bb' WHERE id = 2;
    5. INSERT INTO test.t1(id, val) VALUES (3, 'cc');
    6. COMMIT;
    • 如以下执行日志中的 Log 5 和 Log 6 所示,同一张表内的 Row Changed Event 可能会根据主键被分派到不同的 Partition,但同一行的变更一定会分派到同一个 Partition,方便下游并发处理。
    • 如 Log 6 所示,在一个事务内对同一行进行多次修改,只会发出一个 Row Changed Event。
    • Log 8 是 Log 7 的重复 Event。Row Changed Event 可能重复,但每个版本的 Event 第一次发出的次序一定是有序的。

    在上游执行以下 SQL 语句:

    1. BEGIN;
    2. DELETE FROM test.t1 WHERE id = 1;
    3. UPDATE test.t1 SET val = 'dd' WHERE id = 3;
    4. UPDATE test.t1 SET id = 4, val = 'ee' WHERE id = 2;
    5. COMMIT;
    • Log 9 是 Delete 类型的 Row Changed Event,这种类型的 Event 只包含主键列或唯一索引列。
    • Log 13 和 Log 14 是 Resolved Event。Resolved Event 意味着在这个 Partition 中,任意小于 Resolved TS 的 Event(包括 Row Changed Event 和 DDL Event)已经发送完毕。
    1. 9. [partition=0] [key="{\"ts\":415508881418485761,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"d\":{\"id\":{\"t\":3,\"h\":true,\"v\":1}}}"]
    2. 10. [partition=1] [key="{\"ts\":415508881418485761,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"d\":{\"id\":{\"t\":3,\"h\":true,\"v\":2}}}"]
    3. 11. [partition=0] [key="{\"ts\":415508881418485761,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"u\":{\"id\":{\"t\":3,\"h\":true,\"v\":3},\"val\":{\"t\":15,\"v\":\"ZGQ=\"}}}"]
    4. 12. [partition=0] [key="{\"ts\":415508881418485761,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"u\":{\"id\":{\"t\":3,\"h\":true,\"v\":4},\"val\":{\"t\":15,\"v\":\"ZWU=\"}}}"]
    5. 13. [partition=0] [key="{\"ts\":415508881038376963,\"t\":3}"] [value=]
    6. 14. [partition=1] [key="{\"ts\":415508881038376963,\"t\":3}"] [value=]

    消费端协议解析

    目前 TiCDC 没有提供 Open Protocol 协议解析的标准实现,但是提供了 Golang 版本和 Java 版本的解析 demo。用户可以参考本文档提供的数据格式和以下 demo 实现消费端协议解析。

    Column 的类型码用于标识 Row Changed Event 中列的数据类型。

    类型 Code 输出示例 说明
    TINYINT/BOOL 1 {“t”:1,”v”:1}
    SMALLINT 2 {“t”:2,”v”:1}
    INT 3 {“t”:3,”v”:123}
    FLOAT 4 {“t”:4,”v”:153.123}
    DOUBLE 5 {“t”:5,”v”:153.123}
    NULL 6 {“t”:6,”v”:null}
    TIMESTAMP 7 {“t”:7,”v”:”1973-12-30 15:30:00”}
    BIGINT 8 {“t”:8,”v”:123}
    MEDIUMINT 9 {“t”:9,”v”:123}
    DATE 10/14 {“t”:10,”v”:”2000-01-01”}
    TIME 11 {“t”:11,”v”:”23:59:59”}
    DATETIME 12 {“t”:12,”v”:”2015-12-20 23:58:58”}
    YEAR 13 {“t”:13,”v”:1970}
    VARCHAR/VARBINARY 15/253 {“t”:15,”v”:”测试”} / {“t”:15,”v”:”\\x89PNG\\r\\n\\x1a\\n”} value 编码为 UTF-8;当上游类型为 VARBINARY 时,将对不可见的字符转义
    BIT 16 {“t”:16,”v”:81}
    JSON 245 {“t”:245,”v”:”{\“key1\“: \“value1\“}”}
    DECIMAL 246 {“t”:246,”v”:”129012.1230000”}
    ENUM 247 {“t”:247,”v”:1}
    SET 248 {“t”:248,”v”:3}
    TINYTEXT/TINYBLOB 249 {“t”:249,”v”:”5rWL6K+VdGV4dA==”} value 编码为 Base64
    MEDIUMTEXT/MEDIUMBLOB 250 {“t”:250,”v”:”5rWL6K+VdGV4dA==”} value 编码为 Base64
    LONGTEXT/LONGBLOB 251 {“t”:251,”v”:”5rWL6K+VdGV4dA==”} value 编码为 Base64
    TEXT/BLOB 252 {“t”:252,”v”:”5rWL6K+VdGV4dA==”} value 编码为 Base64
    CHAR/BINARY 254 {“t”:254,”v”:”测试”} / {“t”:254,”v”:”\\x89PNG\\r\\n\\x1a\\n”} value 编码为 UTF-8;当上游类型为 BINARY 时,将对不可见的字符转义
    GEOMETRY 255 尚不支持

    DDL 的类型码

    DDL 的类型码用于标识 DDL Event 中的 DDL 语句的类型。

    列标志位

    列标志位以 Bit flags 形式标记列的相关属性。

    位移 名称 说明
    1 0x01 BinaryFlag 该列是否为二进制编码列
    2 0x02 HandleKeyFlag 该列是否为 Handle 列
    3 0x04 GeneratedColumnFlag 该列是否为生成列
    4 0x08 PrimaryKeyFlag 该列是否为主键列
    5 0x10 UniqueKeyFlag 该列是否为唯一索引列
    6 0x20 MultipleKeyFlag 该列是否为组合索引列
    7 0x40 NullableFlag 该列是否为可空列
    8 0x80 UnsignedFlag 该列是否为无符号列

    示例:

    若某列 Flag 值为 85,则代表这一列为可空列、唯一索引列、生成列、二进制编码列。

    1. 85 == 0b_101_0101
    2. == NullableFlag | UniqueKeyFlag | GeneratedColumnFlag | BinaryFlag
    1. 46 == 0b_010_1110
    2. == MultipleKeyFlag | PrimaryKeyFlag | GeneratedColumnFlag | HandleKeyFlag