Kafka

    插件已经正式合并进官方仓库,以下使用介绍基于logstash 1.4相关版本,1.5及以后版本的使用后续依照官方文档持续更新。

    插件本身内容非常简单,其主要依赖同一作者写的 jruby-kafka 模块。需要注意的是:该模块仅支持 Kafka-0.8 版本。如果是使用 0.7 版本 kafka 的,将无法直接使 jruby-kafka 该模块和 logstash-kafka 插件。

    • 安装按照官方文档完全自动化的安装.或是可以通过以下方式手动自己安装插件,不过重点注意的是 kafka 的版本,上面已经指出了。

    Input 配置示例

    以下配置可以实现对 kafka 读取端(consumer)的基本使用。

    消费端更多详细的配置请查看 http://kafka.apache.org/documentation.html#consumerconfigs kafka 官方文档的消费者部分配置文档。

    消费端的一些比较有用的配置项:

    • group_id

    消费者分组,可以通过组 ID 去指定,不同的组之间消费是相互不受影响的,相互隔离。

    • topic_id
    • reset_beginning

    logstash 启动后从什么位置开始读取数据,默认是结束位置,也就是说 logstash 进程会以从上次读取结束时的偏移量开始继续读取,如果之前没有消费过,那么就开始从头读取.如果你是要导入原有数据,把这个设定改成 “true”, logstash 进程就从头开始读取.有点类似 cat ,但是读到最后一行不会终止,而是变成 tail -F ,继续监听相应数据。

    • decorate_events

    在输出消息的时候会输出自身的信息包括:消费消息的大小, topic 来源以及 consumer 的 group 信息。

    • rebalance_max_retries

    当有新的 consumer(logstash) 加入到同一 group 时,将会 reblance ,此后将会有 partitions 的消费端迁移到新的 consumer 上,如果一个 consumer 获得了某个 partition 的消费权限,那么它将会向 zookeeper 注册, Partition Owner registry 节点信息,但是有可能此时旧的 尚没有释放此节点,此值用于控制,注册节点的重试次数。

    指定时间内没有消息到达就抛出异常,一般不需要改。

    以上是相对重要参数的使用示例,更多参数可以选项可以跟据 查看 input 默认参数。

    注意

    1.想要使用多个 logstash 端协同消费同一个 topic 的话,那么需要把两个或是多个 logstash 消费端配置成相同的 group_idtopic_id, 但是前提是要把相应的 topic 分多个 partitions (区),多个消费者消费是无法保证消息的消费顺序性的。

    总结:保证消息的顺序,那就用一个 partitionkafka 的每个 partition 只能同时被同一个 group 中的一个 consumer 消费

    以下配置可以实现对 kafka 写入端 (producer) 的基本使用。

    生产端更多详细的配置请查看 kafka 官方文档的生产者部分配置文档。

    Output 解释

    生产的可设置性还是很多的,设置其实更多,以下是更多的设置:

    • compression_codec

    消息的压缩模式,默认是 none,可以有 gzip 和 snappy (暂时还未测试开启压缩与不开启的性能,数据传输大小等对比)。

    • compressed_topics

    可以针对特定的 topic 进行压缩,设置这个参数为 topic ,表示此 topic 进行压缩。

    • request_required_acks
    • partitioner_class

    分区的策略,默认是 hash 取模

    • send_buffer_bytes

    socket 的缓存大小设置,其实就是缓冲区的大小

    消息模式相关

    • serializer_class

    消息体的系列化处理类,转化为字节流进行传输,请注意 encoder 必须和下面的 key_serializer_class 使用相同的类型

    • key_serializer_class

    默认的是与 serializer_class 相同

    • producer_type

    生产者的类型 async 异步执行消息的发送 sync 同步执行消息的发送

    • queue_buffering_max_ms

    异步模式下,那么就会在设置的时间缓存消息,并一次性发送

    • queue_buffering_max_messages

    异步的模式下,最长等待的消息数

    异步模式下,进入队列的等待时间,若是设置为0,那么要么进入队列,要么直接抛弃

    • batch_num_messages

    异步模式下,每次发送的最大消息数,前提是触发了 queue_buffering_max_messages 或是 queue_enqueue_timeout_ms 的限制

    以上是相对重要参数的使用示例,更多参数可以选项可以跟据 https://github.com/joekiller/logstash-kafka/blob/master/README.md 查看 output 默认参数。