Binlog Slave Client 用户文档

    修改 Drainer 的配置文件,设置输出为 Kafka,相关配置如下:

    自定义开发

    首先需要了解 Drainer 写入到 Kafka 中的数据格式:

    1. message Column {
    2. // 数据是否为 null
    3. optional bool is_null = 1 [ default = false ];
    4. // 保存 int 类型的数据
    5. optional int64 int64_value = 2;
    6. // 保存 uint、enum, set 类型的数据
    7. optional uint64 uint64_value = 3;
    8. // 保存 float、double 类型的数据
    9. optional double double_value = 4;
    10. // 保存 bit、blob、binary、json 类型的数据
    11. optional bytes bytes_value = 5;
    12. // 保存 date、time、decimal、text、char 类型的数据
    13. optional string string_value = 6;
    14. }
    15. // ColumnInfo 保存列的信息,包括列名、类型、是否为主键
    16. message ColumnInfo {
    17. optional string name = 1 [ (gogoproto.nullable) = false ];
    18. // MySQL 中小写的列字段类型
    19. // https://dev.mysql.com/doc/refman/8.0/en/data-types.html
    20. // numeric 类型:int bigint smallint tinyint float double decimal bit
    21. // string 类型:text longtext mediumtext char tinytext varchar
    22. // blob longblob mediumblob binary tinyblob varbinary
    23. // enum set
    24. // json 类型:json
    25. optional string mysql_type = 2 [ (gogoproto.nullable) = false ];
    26. optional bool is_primary_key = 3 [ (gogoproto.nullable) = false ];
    27. // Row 保存一行的具体数据
    28. message Row { repeated Column columns = 1; }
    29. // MutationType 表示 DML 的类型
    30. enum MutationType {
    31. Insert = 0;
    32. Update = 1;
    33. Delete = 2;
    34. }
    35. // Table 包含一个表的数据变更
    36. message Table {
    37. optional string schema_name = 1;
    38. optional string table_name = 2;
    39. repeated ColumnInfo column_info = 3;
    40. repeated TableMutation mutations = 4;
    41. }
    42. // TableMutation 保存一行数据的变更
    43. message TableMutation {
    44. required MutationType type = 1;
    45. // 修改后的数据
    46. required Row row = 2;
    47. // 修改前的数据,只对 Update MutationType 有效
    48. optional Row change_row = 3;
    49. }
    50. // DMLData 保存一个事务所有的 DML 造成的数据变更
    51. message DMLData {
    52. // `tables` 包含事务中所有表的数据变更
    53. repeated Table tables = 1;
    54. }
    55. message DDLData {
    56. // 当前使用的数据库
    57. optional string schema_name = 1;
    58. // 相关表
    59. optional string table_name = 2;
    60. // `ddl_query` 是原始的 DDL 语句 query
    61. optional bytes ddl_query = 3;
    62. }
    63. // BinlogType 为 Binlog 的类型,分为 DML 和 DDL
    64. enum BinlogType {
    65. DML = 0; // Has `dml_data`
    66. DDL = 1; // Has `ddl_query`
    67. }
    68. // Binlog 保存一个事务所有的变更,Kafka 中保存的数据为该结构数据序列化后的结果
    69. message Binlog {
    70. optional BinlogType type = 1 [ (gogoproto.nullable) = false ];
    71. optional int64 commit_ts = 2 [ (gogoproto.nullable) = false ];
    72. optional DMLData dml_data = 3;
    73. }

    Driver

    TiDB-Tools 项目提供了用于读取 Kafka 中 binlog 数据的 Driver,具有如下功能:

    • 读取 Kafka 的数据
    • 根据 commit ts 查找 binlog 在 kafka 中的储存位置

    使用该 Driver 时,用户需要配置如下信息:

    • KafkaAddr:Kafka 集群的地址
    • CommitTS:从哪个 commit ts 开始读取 binlog
    • Offset:从 Kafka 哪个 offset 开始读取,如果设置了 CommitTS 就不用配置该参数
    • ClusterID:TiDB 集群的 cluster ID
    • Topic: Kafka Topic 名称,如果 Topic 名称为空,将会使用 drainer _obinlog 中的默认名称
    • 使用该 Driver 将数据同步到 MySQL,该示例包含将 binlog 转化为 SQL 的具体方法
    • 使用该 Driver 将数据打印出来

    Driver 项目地址:Binlog Slave Driver