Kafka

    Kafka 特性:

    • 发布或者订阅数据流。
    • 容错存储机制。
    • 处理流数据。

    老版格式:

    新版格式:

    1. kafka_broker_list = 'localhost:9092',
    2. kafka_topic_list = 'topic1,topic2',
    3. kafka_group_name = 'group1',
    4. kafka_format = 'JSONEachRow',
    5. kafka_row_delimiter = '\n',
    6. kafka_schema = '',
    7. kafka_num_consumers = 2

    必要参数:

    • kafka_broker_list – 以逗号分隔的 brokers 列表 (localhost:9092)。
    • kafka_topic_list – topic 列表 (my_topic)。
    • kafka_group_name – Kafka 消费组名称 (group1)。如果不希望消息在集群中重复,请在每个分片中使用相同的组名。
    • kafka_format – 消息体格式。使用与 SQL 部分的 FORMAT 函数相同表示方法,例如 JSONEachRow。了解详细信息,请参考 Formats 部分。

    可选参数:

    • kafka_schema – 如果解析格式需要一个 schema 时,此参数必填。例如,普罗托船长 需要 schema 文件路径以及根对象 的名字。
    • kafka_num_consumers – 单个表的消费者数量。默认值是:1,如果一个消费者的吞吐量不足,则指定更多的消费者。消费者的总数不应该超过 topic 中分区的数量,因为每个分区只能分配一个消费者。

    消费的消息会被自动追踪,因此每个消息在不同的消费组里只会记录一次。如果希望获得两次数据,则使用另一个组名创建副本。

    消费组可以灵活配置并且在集群之间同步。例如,如果群集中有10个主题和5个表副本,则每个副本将获得2个主题。 如果副本数量发生变化,主题将自动在副本中重新分配。了解更多信息请访问 。

    SELECT 查询对于读取消息并不是很有用(调试除外),因为每条消息只能被读取一次。使用物化视图创建实时线程更实用。您可以这样做:

    1. 使用引擎创建一个 Kafka 消费者并作为一条数据流。
    2. 创建一个结构表。
    3. 创建物化视图,改视图会在后台转换引擎中的数据并将其放入之前创建的表中。

    MATERIALIZED VIEW 添加至引擎,它将会在后台收集数据。可以持续不断地从 Kafka 收集数据并通过 SELECT 将数据转换为所需要的格式。

    示例:

    1. CREATE TABLE queue (
    2. timestamp UInt64,
    3. level String,
    4. message String
    5. ) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');
    6. CREATE TABLE daily (
    7. day Date,
    8. level String,
    9. total UInt64
    10. ) ENGINE = SummingMergeTree(day, (day, level), 8192);
    11. CREATE MATERIALIZED VIEW consumer TO daily
    12. AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as total
    13. SELECT level, sum(total) FROM daily GROUP BY level;

    停止接收主题数据或更改转换逻辑,请 detach 物化视图:

    如果使用 ALTER 更改目标表,为了避免目标表与视图中的数据之间存在差异,推荐停止物化视图。

    GraphiteMergeTree 类似,Kafka 引擎支持使用ClickHouse配置文件进行扩展配置。可以使用两个配置键:全局 (kafka) 和 主题级别 (kafka_*)。首先应用全局配置,然后应用主题级配置(如果存在)。

    1. <!-- Global configuration options for all tables of Kafka engine type -->
    2. <kafka>
    3. <debug>cgrp</debug>
    4. <auto_offset_reset>smallest</auto_offset_reset>
    5. </kafka>
    6. <!-- Configuration specific for topic "logs" -->
    7. <kafka_logs>
    8. <retry_backoff_ms>250</retry_backoff_ms>
    9. <fetch_min_bytes>100000</fetch_min_bytes>
    10. </kafka_logs>

    有关详细配置选项列表,请参阅 librdkafka配置参考。在 ClickHouse 配置中使用下划线 (_) ,并不是使用点 (.)。例如, 将是 <check_crcs>true</check_crcs>