Flink Doris Connector

    Flink Doris Connector 可以支持通过 Flink 操作(读取、插入、修改、删除) Doris 中存储的数据。

    代码库地址:https://github.com/apache/doris-flink-connector

    • 可以将 表映射为 DataStream 或者 Table

    注意:

    1. 修改和删除只支持在 Unique Key 模型上
    2. 目前的删除是支持 Flink CDC 的方式接入数据实现自动删除,如果是其他数据接入的方式删除需要自己实现。Flink CDC 的数据删除使用方式参照本文档最后一节

    编译与安装

    准备工作

    1.修改custom_env.sh.tpl文件,重命名为custom_env.sh

    2.指定thrift安装目录

    安装 thrift 0.13.0 版本(注意:Doris 0.15 和最新的版本基于 thrift 0.13.0 构建, 之前的版本依然使用thrift 0.9.3 构建) Windows: 1.下载:http://archive.apache.org/dist/thrift/0.13.0/thrift-0.13.0.exe(下载目录自己指定) 2.修改thrift-0.13.0.exe 为 thrift

    MacOS:

    1. 1. 下载:`brew install thrift@0.13.0`
    2. 2. 默认下载地址:/opt/homebrew/Cellar/thrift@0.13.0/0.13.0/bin/thrift

    注:MacOS执行 brew install thrift@0.13.0 可能会报找不到版本的错误,解决方法如下,在终端执行:

    1. 1. `brew tap-new $USER/local-tap`
    2. 2. `brew extract --version='0.13.0' thrift $USER/local-tap`
    3. 3. `brew install thrift@0.13.0`

    参考链接: https://gist.github.com/tonydeng/02e571f273d6cce4230dc8d5f394493c

    在源码目录下执行:

    1. sh build.sh
    2. Usage:
    3. build.sh --flink version # specify flink version (after flink-doris-connector v1.2 and flink-1.15, there is no need to provide scala version)
    4. build.sh --tag # this is a build from tag
    5. e.g.:
    6. build.sh --flink 1.16.0
    7. build.sh --tag

    然后按照你需要版本执行命令编译即可,例如: sh build.sh --flink 1.16.0

    编译成功后,会在 target/ 目录下生成文件,如:flink-doris-connector-1.16-1.3.0-SNAPSHOT.jar 。将此文件复制到 Flinkclasspath 中即可使用 Flink-Doris-Connector 。例如, Local 模式运行的 Flink ,将此文件放入 lib/ 文件夹下。 Yarn 集群模式运行的 Flink ,则将此文件放入预部署包中。

    备注

    1. Doris FE 要在配置中配置启用 http v2

    ​ conf/fe.conf

    使用 Maven 管理

    添加 flink-doris-connector

    1. <!-- flink-doris-connector -->
    2. <dependency>
    3. <groupId>org.apache.doris</groupId>
    4. <artifactId>flink-doris-connector-1.16</artifactId>
    5. <version>1.3.0</version>
    6. </dependency>

    备注

    1.请根据不同的 Flink 版本替换对应的 Connector 和 Flink 依赖版本。

    2.也可从这里下载相关版本jar包。

    Flink 读写 Doris 数据主要有两种方式

    • SQL
    • DataStream
    • SQL 使用 WITH 参数 sink.properties. 配置
    • DataStream 使用方法DorisExecutionOptions.builder().setStreamLoadProp(Properties)配置

    SQL

    • Source
    1. CREATE TABLE flink_doris_source (
    2. name STRING,
    3. age INT,
    4. price DECIMAL(5,2),
    5. sale DOUBLE
    6. )
    7. WITH (
    8. 'fenodes' = 'FE_IP:8030',
    9. 'table.identifier' = 'database.table',
    10. 'username' = 'root',
    11. 'password' = 'password'
    12. );
    • Sink
    1. -- enable checkpoint
    2. SET 'execution.checkpointing.interval' = '10s';
    3. CREATE TABLE flink_doris_sink (
    4. name STRING,
    5. age INT,
    6. sale DOUBLE
    7. )
    8. WITH (
    9. 'connector' = 'doris',
    10. 'fenodes' = 'FE_IP:8030',
    11. 'table.identifier' = 'db.table',
    12. 'username' = 'root',
    13. 'password' = 'password',
    14. 'sink.label-prefix' = 'doris_label'
    15. );
    • Insert
    • Source
    1. DorisOptions.Builder builder = DorisOptions.builder()
    2. .setFenodes("FE_IP:8030")
    3. .setTableIdentifier("db.table")
    4. .setUsername("root")
    5. .setPassword("password");
    6. DorisSource<List<?>> dorisSource = DorisSourceBuilder.<List<?>>builder()
    7. .setDorisOptions(builder.build())
    8. .setDorisReadOptions(DorisReadOptions.builder().build())
    9. .setDeserializer(new SimpleListDeserializationSchema())
    10. .build();
    11. env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris source").print();
    • Sink

    String 数据流

    1. // enable checkpoint
    2. env.enableCheckpointing(10000);
    3. // using batch mode for bounded data
    4. env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    5. DorisSink.Builder<String> builder = DorisSink.builder();
    6. DorisOptions.Builder dorisBuilder = DorisOptions.builder();
    7. dorisBuilder.setFenodes("FE_IP:8030")
    8. .setTableIdentifier("db.table")
    9. .setUsername("root")
    10. .setPassword("password");
    11. DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
    12. executionBuilder.setLabelPrefix("label-doris"); //streamload label prefix
    13. builder.setDorisReadOptions(DorisReadOptions.builder().build())
    14. .setDorisExecutionOptions(executionBuilder.build())
    15. .setSerializer(new SimpleStringSerializer()) //serialize according to string
    16. .setDorisOptions(dorisBuilder.build());
    17. //mock string source
    18. List<Tuple2<String, Integer>> data = new ArrayList<>();
    19. data.add(new Tuple2<>("doris",1));
    20. DataStreamSource<Tuple2<String, Integer>> source = env.fromCollection(data);
    21. source.map((MapFunction<Tuple2<String, Integer>, String>) t -> t.f0 + "\t" + t.f1)

    RowData 数据流

    1. // enable checkpoint
    2. env.enableCheckpointing(10000);
    3. // using batch mode for bounded data
    4. env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    5. //doris sink option
    6. DorisSink.Builder<RowData> builder = DorisSink.builder();
    7. DorisOptions.Builder dorisBuilder = DorisOptions.builder();
    8. .setTableIdentifier("db.table")
    9. .setUsername("root")
    10. .setPassword("password");
    11. // json format to streamload
    12. Properties properties = new Properties();
    13. properties.setProperty("format", "json");
    14. properties.setProperty("read_json_by_line", "true");
    15. DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
    16. executionBuilder.setLabelPrefix("label-doris") //streamload label prefix
    17. .setStreamLoadProp(properties); //streamload params
    18. //flink rowdata‘s schema
    19. String[] fields = {"city", "longitude", "latitude", "destroy_date"};
    20. DataType[] types = {DataTypes.VARCHAR(256), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DATE()};
    21. builder.setDorisReadOptions(DorisReadOptions.builder().build())
    22. .setDorisExecutionOptions(executionBuilder.build())
    23. .setSerializer(RowDataSerializer.builder() //serialize according to rowdata
    24. .setFieldNames(fields)
    25. .setType("json") //json format
    26. .setFieldType(types).build())
    27. .setDorisOptions(dorisBuilder.build());
    28. //mock rowdata source
    29. DataStream<RowData> source = env.fromElements("")
    30. .map(new MapFunction<String, RowData>() {
    31. @Override
    32. public RowData map(String value) throws Exception {
    33. GenericRowData genericRowData = new GenericRowData(4);
    34. genericRowData.setField(0, StringData.fromString("beijing"));
    35. genericRowData.setField(1, 116.405419);
    36. genericRowData.setField(2, 39.916927);
    37. genericRowData.setField(3, LocalDate.now().toEpochDay());
    38. return genericRowData;
    39. }
    40. });
    41. source.sinkTo(builder.build());

    配置

    通用配置项

    Java示例

    samples/doris-demo/ 下提供了 Java 版本的示例,可供参考,查看点击这里

    最佳实践

    使用 Flink Doris Connector最适合的场景就是实时/批次同步源数据(Mysql,Oracle,PostgreSQL等)到Doris,使用Flink对Doris中的数据和其他数据源进行联合分析,也可以使用Flink Doris Connector。

    其他

    1. Flink Doris Connector主要是依赖Checkpoint进行流式写入,所以Checkpoint的间隔即为数据的可见延迟时间。
    2. 为了保证Flink的Exactly Once语义,Flink Doris Connector 默认开启两阶段提交,Doris在1.1版本后默认开启两阶段提交。1.0可通过修改BE参数开启,可参考two_phase_commit
    1. Doris Source在数据读取完成后,流为什么就结束了?

    目前Doris Source是有界流,不支持CDC方式读取。

    1. Flink读取Doris可以进行条件下推吗?

    通过配置doris.filter.query参数,详情参考配置小节。

    1. 如何写入Bitmap类型?
    1. CREATE TABLE bitmap_sink (
    2. dt int,
    3. page string,
    4. user_id int
    5. )
    6. WITH (
    7. 'connector' = 'doris',
    8. 'fenodes' = '127.0.0.1:8030',
    9. 'table.identifier' = 'test.bitmap_test',
    10. 'username' = 'root',
    11. 'password' = '',
    12. 'sink.label-prefix' = 'doris_label',
    13. 'sink.properties.columns' = 'dt,page,user_id,user_id=to_bitmap(user_id)'
    1. errCode = 2, detailMessage = Label [label_0_1] has already been used, relate to txn [19650]

    Exactly-Once场景下,Flink Job重启时必须从最新的Checkpoint/Savepoint启动,否则会报如上错误。 不要求Exactly-Once时,也可通过关闭2PC提交(sink.enable-2pc=false) 或更换不同的sink.label-prefix解决。

    1. errCode = 2, detailMessage = transaction [19650] not found

    发生在Commit阶段,checkpoint里面记录的事务ID,在FE侧已经过期,此时再次commit就会出现上述错误。 此时无法从checkpoint启动,后续可通过修改fe.conf的streaming_label_keep_max_second配置来延长过期时间,默认12小时。

    1. errCode = 2, detailMessage = current running txns on db 10006 is 100, larger than limit 100

    这是因为同一个库并发导入超过了100,可通过调整 fe.conf的参数 max_running_txn_num_per_db 来解决。具体可参考 max_running_txn_num_per_db

    1. Flink写入Uniq模型时,如何保证一批数据的有序性?

    可以添加sequence列配置来保证,具体可参考

    1. Flink任务没报错,但是无法同步数据?

    通常发生在Connector1.1.0之前,是由于写入频率过快,导致版本过多。可以通过设置sink.batch.size 和 sink.batch.interval参数来降低Streamload的频率。