TiCDC Canal-JSON Protocol

    当使用 MQ (Message Queue) 作为下游 Sink 时,你可以在 sink-uri 中指定使用 Canal-JSON,TiCDC 将以 Event 为基本单位封装构造 Canal-JSON Message,向下游发送 TiDB 的数据变更事件。

    Event 分为三类:

    • DDL Event:代表 DDL 变更记录,在上游成功执行 DDL 语句后发出,DDL Event 会被发送到索引为 0 的 MQ Partition。
    • DML Event:代表一行数据变更记录,在行变更发生时该类 Event 被发出,包含变更后该行的相关信息。
    • WATERMARK Event:代表一个特殊的时间点,表示在这个时间点前收到的 Event 是完整的。仅适用于 TiDB 扩展字段,当你在 sink-uri 中设置 enable-tidb-extension=true 时生效。

    使用 Canal-JSON 时的配置样例如下所示:

    TiDB 扩展字段

    Canal-JSON 协议本是为 MySQL 设计的,其中并不包含 TiDB 专有的 CommitTS 事务唯一标识等重要字段。为了解决这个问题,TiCDC 在 Canal-JSON 协议格式中附加了 TiDB 扩展字段。在 sink-uri 中设置 enable-tidb-extensiontrue 后,TiCDC 生成 Canal-JSON 消息时的行为如下:

    • TiCDC 发送的 DML Event 和 DDL Event 类型消息中,将会含有一个名为 _tidb 的字段。
    • TiCDC 将会发送 WATERMARK Event 消息。·

    配置样例如下所示:

    1. cdc cli changefeed create --pd=http://127.0.0.1:2379 --changefeed-id="kafka-canal-json-enable-tidb-extension" --sink-uri="kafka://127.0.0.1:9092/topic-name?kafka-version=2.4.0&protocol=canal-json&enable-tidb-extension=true"

    enable-tidb-extension 默认为 false,仅当使用 Canal-JSON 时生效。

    下面介绍 DDL Event、DML Event 和 WATERMARK Event 的格式定义,以及消费端的数据解析。

    TiCDC 会把一个 DDL Event 编码成如下 Canal-JSON 格式:

    1. {
    2. "id": 0,
    3. "database": "test",
    4. "table": "",
    5. "pkNames": null,
    6. "isDdl": true,
    7. "type": "QUERY",
    8. "es": 1639633094670,
    9. "ts": 1639633095489,
    10. "sql": "drop database if exists test",
    11. "sqlType": null,
    12. "mysqlType": null,
    13. "data": null,
    14. "old": null,
    15. "_tidb": { // TiDB 的扩展字段
    16. "commitTs": 163963309467037594
    17. }
    18. }

    以上 JSON 数据的字段解释如下:

    DML Event

    对于一行 DML 数据变更事件,TiCDC 会将其编码成如下形式:

    1. {
    2. "id": 0,
    3. "database": "test",
    4. "table": "tp_int",
    5. "pkNames": [
    6. "id"
    7. ],
    8. "isDdl": false,
    9. "type": "INSERT",
    10. "es": 1639633141221,
    11. "ts": 1639633142960,
    12. "sql": "",
    13. "sqlType": {
    14. "c_bigint": -5,
    15. "c_int": 4,
    16. "c_mediumint": 4,
    17. "c_smallint": 5,
    18. "c_tinyint": -6,
    19. "id": 4
    20. },
    21. "mysqlType": {
    22. "c_bigint": "bigint",
    23. "c_int": "int",
    24. "c_mediumint": "mediumint",
    25. "c_smallint": "smallint",
    26. "c_tinyint": "tinyint",
    27. "id": "int"
    28. },
    29. "data": [
    30. {
    31. "c_bigint": "9223372036854775807",
    32. "c_int": "2147483647",
    33. "c_mediumint": "8388607",
    34. "c_smallint": "32767",
    35. "c_tinyint": "127",
    36. "id": "2"
    37. }
    38. ],
    39. "old": null,
    40. "_tidb": { // TiDB 的扩展字段
    41. "commitTs": 163963314122145239
    42. }

    WATERMARK Event

    当你收到一个该类型的事件,所有 commitTs 小于 watermarkTs 的事件均已发送完毕。因为 TiCDC 提供 At Least Once 语义,可能出现重复发送数据的情况。如果后续收到有 commitTs 小于 watermarkTs 的事件,可以忽略。

    WATERMARK Event 的示例如下:

    从上面的示例中可知,Canal-JSON 具有统一的数据格式,针对不同的事件类型,有不同的字段填充规则。消费者可以使用统一的方法对该 JSON 格式的数据进行解析,然后通过判断字段值的方式,来确定具体事件类型:

    • isDdl 为 true 时,该消息含有一条 DDL Event。
    • isDdl 为 false 时,需要对 type 字段加以判断。如果 typeTIDB_WATERMARK,可得知其为 WATERMARK Event,否则就是 DML Event。

    字段说明

    Canal-JSON 格式会在 mysqlType 字段和 sqlType 字段中记录对应的数据类型。

    MySQL Type 字段

    Canal-JSON 格式会在 字段中记录每一列的 MySQL Type 的字符串表示。相关详情可以参考 。

    SQL Type 字段

    Canal-JSON 格式会在 sqlType 字段中记录每一列的 Java SQL Type,即每条数据在 JDBC 中对应的数据类型,其值可以通过 MySQL Type 和具体数据值计算得到。具体对应关系如下:

    你需要考虑整数类型是否有 Unsigned 约束,以及当前取值大小,分别对应不同的 Java SQL Type Code。如下表所示。

    TiCDC 涉及的 Java SQL Type 及其 Code 映射关系如下表所示。

    想要了解 Java SQL Type 的更多信息,请参考 。

    TiCDC Canal-JSON 和 Canal 官方实现对比

    TiCDC 对 Canal-JSON 数据格式的实现,包括 Update 类型事件和 mysqlType 字段,和官方有些许不同。主要差异见下表。

    假设在上游 TiDB 按顺序执行如下 SQL 语句:

    1. create table tp_int
    2. (
    3. id int auto_increment,
    4. c_tinyint tinyint null,
    5. c_smallint smallint null,
    6. c_mediumint mediumint null,
    7. c_int int null,
    8. c_bigint bigint null,
    9. constraint pk
    10. primary key (id)
    11. );
    12. insert into tp_int(c_tinyint, c_smallint, c_mediumint, c_int, c_bigint)
    13. values (127, 32767, 8388607, 2147483647, 9223372036854775807);
    14. update tp_int set c_int = 0, c_tinyint = 0 where c_smallint = 32767;

    对于 update 语句,TiCDC 将会输出一条 typeUPDATE 的事件消息,如下所示。该 update 语句仅对 c_intc_tinyint 两列进行了修改。输出事件消息的 old 字段,则包含所有列数据。

    1. {
    2. "id": 0,
    3. ...
    4. "type": "UPDATE",
    5. ...
    6. "sqlType": {
    7. ...
    8. },
    9. "mysqlType": {
    10. ...
    11. },
    12. "data": [
    13. {
    14. "c_bigint": "9223372036854775807",
    15. "c_int": "0",
    16. "c_mediumint": "8388607",
    17. "c_smallint": "32767",
    18. "c_tinyint": "0",
    19. "id": "2"
    20. }
    21. ],
    22. "old": [ // TiCDC 输出事件消息的 `old` 字段,则包含所有列数据。
    23. {
    24. "c_bigint": "9223372036854775807",
    25. "c_int": "2147483647", // 修改的列
    26. "c_mediumint": "8388607",
    27. "c_smallint": "32767",
    28. "c_tinyint": "127", // 修改的列
    29. "id": "2"
    30. }
    31. ]
    32. }

    官方 Canal 输出事件消息的 old 字段仅包含被修改的列数据。示例如下。

    1. {
    2. "id": 0,
    3. ...
    4. "type": "UPDATE",
    5. ...
    6. "sqlType": {
    7. ...
    8. },
    9. "mysqlType": {
    10. ...
    11. },
    12. "data": [
    13. {
    14. "c_bigint": "9223372036854775807",
    15. "c_int": "0",
    16. "c_mediumint": "8388607",
    17. "c_smallint": "32767",
    18. "c_tinyint": "0",
    19. "id": "2"
    20. }
    21. ],
    22. "old": [ // Canal 输出事件消息的 `old` 字段,仅包含被修改的列的数据。
    23. {
    24. "c_int": "2147483647", // 修改的列
    25. "c_tinyint": "127", // 修改的列
    26. }
    27. ]

    mysqlType 字段

    对于 mysqlType 字段,Canal 官方实现中,对于含有参数的类型,会包含完整的参数信息,TiCDC 实现则没有类型参数信息。

    在下面示例的表定义 SQL 语句中,如 decimal / char / varchar / enum 等类型,都含有参数。对比 TiCDC 和 Canal 官方实现分别生成的 Canal-JSON 格式数据可知,在 mysqlType 字段中的数据,TiCDC 实现只包含基本 MySQL Type。如果业务需要类型参数信息,需要你自行通过其他方式实现。

    假设在上游数据库按顺序执行如下 SQL 语句:

    TiCDC 输出内容如下:

    1. {
    2. "id": 0,
    3. ...
    4. "isDdl": false,
    5. ...
    6. },
    7. "mysqlType": {
    8. "c_binary": "binary",
    9. "c_bit": "bit",
    10. "c_char": "char",
    11. "c_decimal": "decimal",
    12. "c_enum": "enum",
    13. "c_set": "set",
    14. "c_varbinary": "varbinary",
    15. "c_varchar": "varchar",
    16. "id": "int"
    17. },
    18. "data": [
    19. {
    20. ...
    21. }
    22. ],
    23. "old": null,
    24. }

    Canal 官方实现输出内容如下:

    1. {
    2. "id": 0,
    3. ...
    4. "isDdl": false,
    5. "sqlType": {
    6. ...
    7. },
    8. "mysqlType": {
    9. "c_binary": "binary(16)",
    10. "c_bit": "bit(64)",
    11. "c_char": "char(16)",
    12. "c_decimal": "decimal(10, 4)",
    13. "c_enum": "enum('a','b','c')",
    14. "c_set": "set('a','b','c')",
    15. "c_varbinary": "varbinary(16)",
    16. "c_varchar": "varchar(16)",
    17. "id": "int"
    18. },
    19. "data": [
    20. {
    21. ...
    22. }
    23. ],
    24. "old": null,
    25. }

    Delete 类型事件中 Old 字段的变化说明

    TiCDC 实现的 Canal-JSON 格式,v5.4.0 及以后版本的实现,和之前的有些许不同,具体如下:

    • Delete 类型事件,Old 字段的内容发生了变化。

    如下是一个 DELETE 事件的数据内容,在 v5.4.0 前的实现中,”old” 的内容和 “data” 相同,在 v5.4.0 及之后的实现中,”old” 将被设为 null。你可以通过 “data” 字段获取到被删除的数据。

    1. {
    2. "id": 0,
    3. "database": "test",
    4. ...
    5. "type": "DELETE",
    6. ...
    7. "sqlType": {
    8. ...
    9. },
    10. "mysqlType": {
    11. ...
    12. },
    13. "data": [
    14. {
    15. "c_bigint": "9223372036854775807",
    16. "c_int": "0",
    17. "c_mediumint": "8388607",
    18. "c_smallint": "32767",
    19. "c_tinyint": "0",
    20. "id": "2"
    21. }
    22. ],
    23. "old": null,
    24. // 以下示例是 v5.4.0 之前的实现,`old` 内容等同于 `data` 内容
    25. "old": [
    26. {
    27. "c_bigint": "9223372036854775807",
    28. "c_int": "0",
    29. "c_mediumint": "8388607",
    30. "c_smallint": "32767",
    31. "c_tinyint": "0",
    32. "id": "2"
    33. }
    34. ]
    35. }