Kafka Connector Tutorial

    Installation

    This tutorial assumes familiarity with openLooKeng and a working local openLooKeng installation (see the openLooKeng installation documentation). It focuses on setting up Apache Kafka and integrating it with openLooKeng.

    Step 1: Install Apache Kafka

    Download and extract Apache Kafka.

    Note

    This tutorial was tested with Apache Kafka 0.8.1. It should work with any 0.8.x version of Apache Kafka.

    Start ZooKeeper and the Kafka server:

    [2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
    [2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
    ...

    This starts ZooKeeper on port 2181 and Kafka on port 9092.
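
    The start commands themselves are not shown above. With a standard Kafka 0.8.x distribution they are typically run from the Kafka installation directory, roughly as follows (a sketch assuming the default configuration files that ship with Kafka):

    $ bin/zookeeper-server-start.sh config/zookeeper.properties
    $ bin/kafka-server-start.sh config/server.properties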

    Step 2: Load data

    Download the tpch-kafka loader from Maven Central:

    $ curl -o kafka-tpch https://repo1.maven.org/maven2/de/softwareforge/kafka_tpch_0811/1.0/kafka_tpch_0811-1.0.sh
    $ chmod 755 kafka-tpch

    Now run the kafka-tpch program to preload a number of topics with tpch data:

    $ ./kafka-tpch load --brokers localhost:9092 --prefix tpch. --tpch-type tiny
    2014-07-28T17:17:07.594-0700 INFO main io.airlift.log.Logging Logging to stderr
    2014-07-28T17:17:07.623-0700 INFO main de.softwareforge.kafka.LoadCommand Processing tables: [customer, orders, lineitem, part, partsupp, supplier, nation, region]
    2014-07-28T17:17:07.981-0700 INFO pool-1-thread-1 de.softwareforge.kafka.LoadCommand Loading table 'customer' into topic 'tpch.customer'...
    2014-07-28T17:17:07.981-0700 INFO pool-1-thread-2 de.softwareforge.kafka.LoadCommand Loading table 'orders' into topic 'tpch.orders'...
    2014-07-28T17:17:07.981-0700 INFO pool-1-thread-3 de.softwareforge.kafka.LoadCommand Loading table 'lineitem' into topic 'tpch.lineitem'...
    2014-07-28T17:17:07.982-0700 INFO pool-1-thread-4 de.softwareforge.kafka.LoadCommand Loading table 'part' into topic 'tpch.part'...
    2014-07-28T17:17:07.982-0700 INFO pool-1-thread-5 de.softwareforge.kafka.LoadCommand Loading table 'partsupp' into topic 'tpch.partsupp'...
    2014-07-28T17:17:07.982-0700 INFO pool-1-thread-6 de.softwareforge.kafka.LoadCommand Loading table 'supplier' into topic 'tpch.supplier'...
    2014-07-28T17:17:07.982-0700 INFO pool-1-thread-7 de.softwareforge.kafka.LoadCommand Loading table 'nation' into topic 'tpch.nation'...
    2014-07-28T17:17:07.982-0700 INFO pool-1-thread-8 de.softwareforge.kafka.LoadCommand Loading table 'region' into topic 'tpch.region'...
    2014-07-28T17:17:10.612-0700 ERROR pool-1-thread-8 kafka.producer.async.DefaultEventHandler Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: tpch.region
    2014-07-28T17:17:10.781-0700 INFO pool-1-thread-8 de.softwareforge.kafka.LoadCommand Generated 5 rows for table 'region'.
    2014-07-28T17:17:10.797-0700 ERROR pool-1-thread-3 kafka.producer.async.DefaultEventHandler Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: tpch.lineitem
    2014-07-28T17:17:10.932-0700 ERROR pool-1-thread-1 kafka.producer.async.DefaultEventHandler Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: tpch.customer
    2014-07-28T17:17:11.068-0700 ERROR pool-1-thread-2 kafka.producer.async.DefaultEventHandler Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: tpch.orders
    2014-07-28T17:17:11.200-0700 ERROR pool-1-thread-6 kafka.producer.async.DefaultEventHandler Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: tpch.supplier
    2014-07-28T17:17:11.319-0700 INFO pool-1-thread-6 de.softwareforge.kafka.LoadCommand Generated 100 rows for table 'supplier'.
    2014-07-28T17:17:11.333-0700 ERROR pool-1-thread-4 kafka.producer.async.DefaultEventHandler Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: tpch.part
    2014-07-28T17:17:11.466-0700 ERROR pool-1-thread-5 kafka.producer.async.DefaultEventHandler Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: tpch.partsupp
    2014-07-28T17:17:11.597-0700 ERROR pool-1-thread-7 kafka.producer.async.DefaultEventHandler Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: tpch.nation
    2014-07-28T17:17:11.706-0700 INFO pool-1-thread-7 de.softwareforge.kafka.LoadCommand Generated 25 rows for table 'nation'.
    2014-07-28T17:17:12.180-0700 INFO pool-1-thread-1 de.softwareforge.kafka.LoadCommand Generated 1500 rows for table 'customer'.
    2014-07-28T17:17:12.251-0700 INFO pool-1-thread-4 de.softwareforge.kafka.LoadCommand Generated 2000 rows for table 'part'.
    2014-07-28T17:17:12.905-0700 INFO pool-1-thread-2 de.softwareforge.kafka.LoadCommand Generated 15000 rows for table 'orders'.
    2014-07-28T17:17:12.919-0700 INFO pool-1-thread-5 de.softwareforge.kafka.LoadCommand Generated 8000 rows for table 'partsupp'.
    2014-07-28T17:17:13.877-0700 INFO pool-1-thread-3 de.softwareforge.kafka.LoadCommand Generated 60175 rows for table 'lineitem'.

    Kafka now has a number of topics preloaded with data ready to be queried.
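
    Optionally, the topics can be listed to confirm the load. A minimal check, assuming the bin/kafka-topics.sh script shipped with Kafka 0.8.1 and the local ZooKeeper started earlier:

    $ bin/kafka-topics.sh --list --zookeeper localhost:2181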

    Step 3: Make the Kafka topics known to openLooKeng

    In the openLooKeng installation, add a catalog properties file etc/catalog/kafka.properties for the Kafka connector. This file lists the Kafka nodes and topics:

    connector.name=kafka
    kafka.nodes=localhost:9092
    kafka.table-names=tpch.customer,tpch.orders,tpch.lineitem,tpch.part,tpch.partsupp,tpch.supplier,tpch.nation,tpch.region
    kafka.hide-internal-columns=false

    Now start openLooKeng:

    $ bin/launcher start

    Start the openLooKeng CLI:

    $ ./openlk-cli --catalog kafka --schema tpch

    List the tables to verify that things are working:
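
    The listing itself is not reproduced in this copy of the tutorial; a minimal check against the catalog configured above (a sketch, assuming the CLI session started in the previous step) is:

    lk:tpch> SHOW TABLES;

    The eight tables named in kafka.table-names (customer, lineitem, nation, orders, part, partsupp, region, supplier) should appear.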

    Step 4: Basic data querying

    Kafka data is unstructured, and it has no metadata to describe the format of the messages. Without further configuration, the Kafka connector can access the data and map it in raw form, but there are no actual columns besides the built-in ones:

    lk:tpch> DESCRIBE customer;
    Column | Type | Extra | Comment
    -------------------+---------+-------+---------------------------------------------
    _partition_id | bigint | | Partition Id
    _partition_offset | bigint | | Offset for the message within the partition
    _segment_start | bigint | | Segment start offset
    _segment_end | bigint | | Segment end offset
    _segment_count | bigint | | Running message count per segment
    _key | varchar | | Key text
    _key_corrupt | boolean | | Key data is corrupt
    _key_length | bigint | | Total number of key bytes
    _message | varchar | | Message text
    _message_corrupt | boolean | | Message data is corrupt
    _message_length | bigint | | Total number of message bytes
    (11 rows)
    lk:tpch> SELECT count(*) FROM customer;
    _col0
    -------
    1500
    (1 row)
    lk:tpch> SELECT _message FROM customer LIMIT 5;
    _message
    --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
    {"rowNumber":1,"customerKey":1,"name":"Customer#000000001","address":"IVhzIApeRb ot,c,E","nationKey":15,"phone":"25-989-741-2988","accountBalance":711.56,"marketSegment":"BUILDING","comment":"to the even, regular platelets. regular, ironic epitaphs nag e"}
    {"rowNumber":3,"customerKey":3,"name":"Customer#000000003","address":"MG9kdTD2WBHm","nationKey":1,"phone":"11-719-748-3364","accountBalance":7498.12,"marketSegment":"AUTOMOBILE","comment":" deposits eat slyly ironic, even instructions. express foxes detect slyly. blithel
    {"rowNumber":5,"customerKey":5,"name":"Customer#000000005","address":"KvpyuHCplrB84WgAiGV6sYpZq7Tj","nationKey":3,"phone":"13-750-942-6364","accountBalance":794.47,"marketSegment":"HOUSEHOLD","comment":"n accounts will have to unwind. foxes cajole accor"}
    {"rowNumber":7,"customerKey":7,"name":"Customer#000000007","address":"TcGe5gaZNgVePxU5kRrvXBfkasDTea","nationKey":18,"phone":"28-190-982-9759","accountBalance":9561.95,"marketSegment":"AUTOMOBILE","comment":"ainst the ironic, express theodolites. express, even pinto bean
    {"rowNumber":9,"customerKey":9,"name":"Customer#000000009","address":"xKiAFTjUsCuxfeleNqefumTrjS","nationKey":8,"phone":"18-338-906-3675","accountBalance":8324.07,"marketSegment":"FURNITURE","comment":"r theodolites according to the requests wake thinly excuses: pending
    (5 rows)
    lk:tpch> SELECT sum(cast(json_extract_scalar(_message, '$.accountBalance') AS double)) FROM customer LIMIT 10;
    _col0
    ------------
    6681865.59
    (1 row)

    The data from Kafka can now be queried with openLooKeng, but it is not yet in actual table shape. The raw data is available through the _message and _key columns, but it is not decoded into columns. Since the sample data is in JSON format, the JSON functions built into openLooKeng can be used to slice the data.

    Step 5: Add a topic description file

    The Kafka connector supports topic description files that turn raw data into table format. These files live in the etc/kafka folder of the openLooKeng installation and must end with .json. It is recommended that the file name matches the table name, but this is not required.

    Add the following file as etc/kafka/tpch.customer.json and restart openLooKeng.

    {
        "tableName": "customer",
        "schemaName": "tpch",
        "topicName": "tpch.customer",
        "key": {
            "dataFormat": "raw",
            "fields": [
                {
                    "name": "kafka_key",
                    "dataFormat": "LONG",
                    "type": "BIGINT",
                    "hidden": "false"
                }
            ]
        }
    }

    The customer table now has an additional column: kafka_key.

    lk:tpch> DESCRIBE customer;
    Column | Type | Extra | Comment
    -------------------+---------+-------+---------------------------------------------
    kafka_key | bigint | |
    _partition_id | bigint | | Partition Id
    _partition_offset | bigint | | Offset for the message within the partition
    _segment_start | bigint | | Segment start offset
    _segment_end | bigint | | Segment end offset
    _segment_count | bigint | | Running message count per segment
    _key | varchar | | Key text
    _key_corrupt | boolean | | Key data is corrupt
    _key_length | bigint | | Total number of key bytes
    _message | varchar | | Message text
    _message_corrupt | boolean | | Message data is corrupt
    _message_length | bigint | | Total number of message bytes
    (12 rows)
    lk:tpch> SELECT kafka_key FROM customer ORDER BY kafka_key LIMIT 10;
    kafka_key
    -----------
    0
    1
    2
    3
    4
    5
    6
    7
    8
    9
    (10 rows)

    The topic definition file maps the internal Kafka key, which is a raw 8-byte long, onto an openLooKeng BIGINT column.

    Step 6: Map all values from the topic message onto columns

    Update the etc/kafka/tpch.customer.json file to add fields for the message, and restart openLooKeng. Since the fields in the message are JSON, the json data format is used. This is an example where different data formats are used for the key and the message.

    {
        "tableName": "customer",
        "schemaName": "tpch",
        "topicName": "tpch.customer",
        "key": {
            "dataFormat": "raw",
            "fields": [
                {
                    "name": "kafka_key",
                    "dataFormat": "LONG",
                    "type": "BIGINT",
                    "hidden": "false"
                }
            ]
        },
        "message": {
            "dataFormat": "json",
            "fields": [
                {
                    "name": "row_number",
                    "mapping": "rowNumber",
                    "type": "BIGINT"
                },
                {
                    "name": "customer_key",
                    "mapping": "customerKey",
                    "type": "BIGINT"
                },
                {
                    "name": "name",
                    "mapping": "name",
                    "type": "VARCHAR"
                },
                {
                    "name": "address",
                    "mapping": "address",
                    "type": "VARCHAR"
                },
                {
                    "name": "nation_key",
                    "mapping": "nationKey",
                    "type": "BIGINT"
                },
                {
                    "name": "phone",
                    "mapping": "phone",
                    "type": "VARCHAR"
                },
                {
                    "name": "account_balance",
                    "mapping": "accountBalance",
                    "type": "DOUBLE"
                },
                {
                    "name": "market_segment",
                    "mapping": "marketSegment",
                    "type": "VARCHAR"
                },
                {
                    "name": "comment",
                    "mapping": "comment",
                    "type": "VARCHAR"
                }
            ]
        }
    }

    Now columns are defined for all the fields of the message's JSON, and the sum query from earlier can operate directly on the account_balance column:

    lk:tpch> DESCRIBE customer;
    Column | Type | Extra | Comment
    -------------------+---------+-------+---------------------------------------------
    kafka_key | bigint | |
    row_number | bigint | |
    customer_key | bigint | |
    name | varchar | |
    address | varchar | |
    nation_key | bigint | |
    phone | varchar | |
    account_balance | double | |
    market_segment | varchar | |
    comment | varchar | |
    _partition_id | bigint | | Partition Id
    _partition_offset | bigint | | Offset for the message within the partition
    _segment_start | bigint | | Segment start offset
    _segment_end | bigint | | Segment end offset
    _segment_count | bigint | | Running message count per segment
    _key | varchar | | Key text
    _key_corrupt | boolean | | Key data is corrupt
    _key_length | bigint | | Total number of key bytes
    _message | varchar | | Message text
    _message_corrupt | boolean | | Message data is corrupt
    _message_length | bigint | | Total number of message bytes
    (21 rows)
    lk:tpch> SELECT * FROM customer LIMIT 5;
    kafka_key | row_number | customer_key | name | address | nation_key | phone | account_balance | market_segment | comment
    -----------+------------+--------------+--------------------+---------------------------------------+------------+-----------------+-----------------+----------------+---------------------------------------------------------------------------------------------------------
    1 | 2 | 2 | Customer#000000002 | XSTf4,NCwDVaWNe6tEgvwfmRchLXak | 13 | 23-768-687-3665 | 121.65 | AUTOMOBILE | l accounts. blithely ironic theodolites integrate boldly: caref
    3 | 4 | 4 | Customer#000000004 | XxVSJsLAGtn | 4 | 14-128-190-5944 | 2866.83 | MACHINERY | requests. final, regular ideas sleep final accou
    5 | 6 | 6 | Customer#000000006 | sKZz0CsnMD7mp4Xd0YrBvx,LREYKUWAh yVn | 20 | 30-114-968-4951 | 7638.57 | AUTOMOBILE | tions. even deposits boost according to the slyly bold packages. final accounts cajole requests. furious
    7 | 8 | 8 | Customer#000000008 | I0B10bB0AymmC, 0PrRYBCP1yGJ8xcBPmWhl5 | 17 | 27-147-574-9335 | 6819.74 | BUILDING | among the slyly regular theodolites kindle blithely courts. carefully even theodolites haggle slyly alon
    9 | 10 | 10 | Customer#000000010 | 6LrEaV6KR6PLVcgl2ArL Q3rqzLzcT1 v2 | 5 | 15-741-346-9870 | 2753.54 | HOUSEHOLD | es regular deposits haggle. fur
    (5 rows)
    lk:tpch> SELECT sum(account_balance) FROM customer LIMIT 10;
    _col0
    ------------
    6681865.59
    (1 row)

    All fields from the customer topic messages are now available as openLooKeng table columns.
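
    With the mapped columns in place, ordinary SQL works directly against the message fields. As an additional illustration (not part of the original tutorial output), customers can be grouped by market segment:

    lk:tpch> SELECT market_segment, count(*) AS customers, round(avg(account_balance), 2) AS avg_balance
          -> FROM customer
          -> GROUP BY market_segment
          -> ORDER BY customers DESC;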

    Step 7: Use live data

    Set up a live Twitter feed

    • Download the twistr tool:
    $ curl -o twistr https://repo1.maven.org/maven2/de/softwareforge/twistr_kafka_0811/1.2/twistr_kafka_0811-1.2.sh
    $ chmod 755 twistr
    • Create a developer account with Twitter and set up an access and consumer token.
    • Create a twistr.properties file and put the access and consumer key/secret values into it:
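
    The contents of that file are not shown in this copy of the tutorial. Based on the upstream version of this tutorial, it looks roughly as follows; the property names should be verified against the twistr release you downloaded, and the ... placeholders stand for your own Twitter credentials:

    twistr.access-token-key=...
    twistr.access-token-secret=...
    twistr.consumer-key=...
    twistr.consumer-secret=...
    kafka.brokers=localhost:9092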

    Create a tweets table in openLooKeng

    Add the tweets table to the etc/catalog/kafka.properties file:

    connector.name=kafka
    kafka.nodes=localhost:9092
    kafka.table-names=tpch.customer,tpch.orders,tpch.lineitem,tpch.part,tpch.partsupp,tpch.supplier,tpch.nation,tpch.region,tweets
    kafka.hide-internal-columns=false

    Add a topic definition file for the Twitter feed as etc/kafka/tweets.json:

    {
        "tableName": "tweets",
        "topicName": "twitter_feed",
        "dataFormat": "json",
        "key": {
            "dataFormat": "raw",
            "fields": [
                {
                    "name": "kafka_key",
                    "dataFormat": "LONG",
                    "type": "BIGINT",
                    "hidden": "false"
                }
            ]
        },
        "message": {
            "dataFormat": "json",
            "fields": [
                {
                    "name": "text",
                    "mapping": "text",
                    "type": "VARCHAR"
                },
                {
                    "name": "user_name",
                    "mapping": "user/screen_name",
                    "type": "VARCHAR"
                },
                {
                    "name": "lang",
                    "mapping": "lang",
                    "type": "VARCHAR"
                },
                {
                    "name": "created_at",
                    "mapping": "created_at",
                    "type": "TIMESTAMP",
                    "dataFormat": "rfc2822"
                },
                {
                    "name": "favorite_count",
                    "mapping": "favorite_count",
                    "type": "BIGINT"
                },
                {
                    "name": "retweet_count",
                    "mapping": "retweet_count",
                    "type": "BIGINT"
                },
                {
                    "name": "favorited",
                    "mapping": "favorited",
                    "type": "BOOLEAN"
                },
                {
                    "name": "id",
                    "mapping": "id_str",
                    "type": "VARCHAR"
                },
                {
                    "name": "in_reply_to_screen_name",
                    "mapping": "in_reply_to_screen_name",
                    "type": "VARCHAR"
                },
                {
                    "name": "place_name",
                    "mapping": "place/full_name",
                    "type": "VARCHAR"
                }
            ]
        }
    }

    Because this table does not have an explicit schema name, it will be placed into the default schema.

    Feed live data

    Start the twistr tool:

    $ java -Dness.config.location=file:$(pwd) -Dness.config=twistr -jar ./twistr

    twistr connects to the Twitter API and feeds the "sample tweet" stream into the Kafka topic named twitter_feed.

    Now run queries against the live data:

    $ ./openlk-cli --catalog kafka --schema default
    lk:default> SELECT count(*) FROM tweets;
    _col0
    -------
    4467
    (1 row)
    lk:default> SELECT count(*) FROM tweets;
    _col0
    -------
    4517
    (1 row)
    lk:default> SELECT count(*) FROM tweets;
    _col0
    -------
    4572
    (1 row)
    lk:default> SELECT kafka_key, user_name, lang, created_at FROM tweets LIMIT 10;
    kafka_key | user_name | lang | created_at
    --------------------+-----------------+------+-------------------------
    494227746231685121 | burncaniff | en | 2014-07-29 14:07:31.000
    494227746214535169 | gu8tn | ja | 2014-07-29 14:07:31.000
    494227746219126785 | pequitamedicen | es | 2014-07-29 14:07:31.000
    494227746201931777 | josnyS | ht | 2014-07-29 14:07:31.000
    494227746219110401 | Cafe510 | en | 2014-07-29 14:07:31.000
    494227746210332673 | Da_JuanAnd_Only | en | 2014-07-29 14:07:31.000
    494227746193956865 | Smile_Kidrauhl6 | pt | 2014-07-29 14:07:31.000
    494227750426017793 | CashforeverCD | en | 2014-07-29 14:07:32.000
    494227750396653569 | FilmArsivimiz | tr | 2014-07-29 14:07:32.000
    494227750388256769 | jmolas | es | 2014-07-29 14:07:32.000
    (10 rows)

    There is now a live feed into Kafka that can be queried using openLooKeng.
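
    As a further illustration (not part of the original output), the mapped columns allow simple aggregations over the live stream, for example counting tweets per language:

    lk:default> SELECT lang, count(*) AS tweets
             -> FROM tweets
             -> GROUP BY lang
             -> ORDER BY tweets DESC
             -> LIMIT 10;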

    Epilogue: Time stamps

    The tweets feed that was set up in the previous step contains an RFC 2822 timestamp as the created_at attribute in each tweet.

    lk:default> SELECT DISTINCT json_extract_scalar(_message, '$.created_at') AS raw_date
             -> FROM tweets LIMIT 5;
    raw_date
    --------------------------------
    Tue Jul 29 21:07:31 +0000 2014
    Tue Jul 29 21:07:32 +0000 2014
    Tue Jul 29 21:07:33 +0000 2014
    Tue Jul 29 21:07:34 +0000 2014
    Tue Jul 29 21:07:35 +0000 2014
    (5 rows)

    The topic definition file for the tweets table contains a mapping onto a timestamp using the rfc2822 converter:

    ...
    {
        "name": "created_at",
        "mapping": "created_at",
        "type": "TIMESTAMP",
        "dataFormat": "rfc2822"
    },
    ...

    This allows the raw data to be mapped onto an openLooKeng timestamp column:
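
    The query output for this step is missing from this copy; a minimal query in the spirit of the tutorial, comparing the mapped column with the raw JSON value, would be:

    lk:default> SELECT created_at, raw_date FROM (
             ->   SELECT created_at, json_extract_scalar(_message, '$.created_at') AS raw_date
             ->   FROM tweets)
             -> GROUP BY 1, 2 LIMIT 5;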

    The Kafka connector contains converters for ISO 8601 and RFC 2822 text formats and for timestamps given as seconds or milliseconds since the epoch. There is also a generic, text-based formatter that uses Joda-Time format strings to parse text columns.
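
    As a sketch of the generic formatter (the custom-date-time data format name and the formatHint key are assumptions based on the connector documentation and should be checked against your openLooKeng version), a field definition using a Joda-Time pattern might look like:

    {
        "name": "event_time",
        "mapping": "event_time",
        "type": "TIMESTAMP",
        "dataFormat": "custom-date-time",
        "formatHint": "yyyy-MM-dd HH:mm:ss.SSS"
    }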