6.10. Kafka Connector Tutorial
Installation
This tutorial assumes familiarity with Presto and a working local Presto installation (see the deployment documentation). It will focus on setting up Apache Kafka and integrating it with Presto.
Step 1: Install Apache Kafka
Download and extract Apache Kafka.
Note
This tutorial was tested with Apache Kafka 0.8.1. It should work with any 0.8.x version of Apache Kafka.
Start ZooKeeper and the Kafka server:
- $ bin/zookeeper-server-start.sh config/zookeeper.properties
- ...
- $ bin/kafka-server-start.sh config/server.properties
- [2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
- [2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
- ...
This will start ZooKeeper on port 2181 and Kafka on port 9092.
Step 2: Load data
Download the tpch-kafka loader from Maven central:
- $ curl -o kafka-tpch https://repo1.maven.org/maven2/de/softwareforge/kafka_tpch_0811/1.0/kafka_tpch_0811-1.0.sh
- $ chmod 755 kafka-tpch
Now run the kafka-tpch program to preload a number of topics with tpch data:
- $ ./kafka-tpch load --brokers localhost:9092 --prefix tpch. --tpch-type tiny
- 2014-07-28T17:17:07.594-0700 INFO main com.facebook.airlift.log.Logging Logging to stderr
- 2014-07-28T17:17:07.623-0700 INFO main de.softwareforge.kafka.LoadCommand Processing tables: [customer, orders, lineitem, part, partsupp, supplier, nation, region]
- 2014-07-28T17:17:07.981-0700 INFO pool-1-thread-1 de.softwareforge.kafka.LoadCommand Loading table 'customer' into topic 'tpch.customer'...
- 2014-07-28T17:17:07.981-0700 INFO pool-1-thread-2 de.softwareforge.kafka.LoadCommand Loading table 'orders' into topic 'tpch.orders'...
- 2014-07-28T17:17:07.981-0700 INFO pool-1-thread-3 de.softwareforge.kafka.LoadCommand Loading table 'lineitem' into topic 'tpch.lineitem'...
- 2014-07-28T17:17:07.982-0700 INFO pool-1-thread-4 de.softwareforge.kafka.LoadCommand Loading table 'part' into topic 'tpch.part'...
- 2014-07-28T17:17:07.982-0700 INFO pool-1-thread-5 de.softwareforge.kafka.LoadCommand Loading table 'partsupp' into topic 'tpch.partsupp'...
- 2014-07-28T17:17:07.982-0700 INFO pool-1-thread-6 de.softwareforge.kafka.LoadCommand Loading table 'supplier' into topic 'tpch.supplier'...
- 2014-07-28T17:17:07.982-0700 INFO pool-1-thread-7 de.softwareforge.kafka.LoadCommand Loading table 'nation' into topic 'tpch.nation'...
- 2014-07-28T17:17:07.982-0700 INFO pool-1-thread-8 de.softwareforge.kafka.LoadCommand Loading table 'region' into topic 'tpch.region'...
- 2014-07-28T17:17:10.612-0700 ERROR pool-1-thread-8 kafka.producer.async.DefaultEventHandler Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: tpch.region
- 2014-07-28T17:17:10.781-0700 INFO pool-1-thread-8 de.softwareforge.kafka.LoadCommand Generated 5 rows for table 'region'.
- 2014-07-28T17:17:10.797-0700 ERROR pool-1-thread-3 kafka.producer.async.DefaultEventHandler Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: tpch.lineitem
- 2014-07-28T17:17:10.932-0700 ERROR pool-1-thread-1 kafka.producer.async.DefaultEventHandler Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: tpch.customer
- 2014-07-28T17:17:11.068-0700 ERROR pool-1-thread-2 kafka.producer.async.DefaultEventHandler Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: tpch.orders
- 2014-07-28T17:17:11.200-0700 ERROR pool-1-thread-6 kafka.producer.async.DefaultEventHandler Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: tpch.supplier
- 2014-07-28T17:17:11.319-0700 INFO pool-1-thread-6 de.softwareforge.kafka.LoadCommand Generated 100 rows for table 'supplier'.
- 2014-07-28T17:17:11.333-0700 ERROR pool-1-thread-4 kafka.producer.async.DefaultEventHandler Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: tpch.part
- 2014-07-28T17:17:11.466-0700 ERROR pool-1-thread-5 kafka.producer.async.DefaultEventHandler Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: tpch.partsupp
- 2014-07-28T17:17:11.597-0700 ERROR pool-1-thread-7 kafka.producer.async.DefaultEventHandler Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: tpch.nation
- 2014-07-28T17:17:11.706-0700 INFO pool-1-thread-7 de.softwareforge.kafka.LoadCommand Generated 25 rows for table 'nation'.
- 2014-07-28T17:17:12.180-0700 INFO pool-1-thread-1 de.softwareforge.kafka.LoadCommand Generated 1500 rows for table 'customer'.
- 2014-07-28T17:17:12.251-0700 INFO pool-1-thread-4 de.softwareforge.kafka.LoadCommand Generated 2000 rows for table 'part'.
- 2014-07-28T17:17:12.905-0700 INFO pool-1-thread-2 de.softwareforge.kafka.LoadCommand Generated 15000 rows for table 'orders'.
- 2014-07-28T17:17:12.919-0700 INFO pool-1-thread-5 de.softwareforge.kafka.LoadCommand Generated 8000 rows for table 'partsupp'.
- 2014-07-28T17:17:13.877-0700 INFO pool-1-thread-3 de.softwareforge.kafka.LoadCommand Generated 60175 rows for table 'lineitem'.
Kafka now has a number of topics that are preloaded with data to query.
Step 3: Make the Kafka topics known to Presto
In your Presto installation, add a catalog properties file etc/catalog/kafka.properties for the Kafka connector. This file lists the Kafka nodes and topics:
- connector.name=kafka
- kafka.nodes=localhost:9092
- kafka.table-names=tpch.customer,tpch.orders,tpch.lineitem,tpch.part,tpch.partsupp,tpch.supplier,tpch.nation,tpch.region
- kafka.hide-internal-columns=false
Now start Presto:
- $ bin/launcher start
Start the Presto CLI:
- $ ./presto --catalog kafka --schema tpch
List the tables to verify that things are working:
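A SHOW TABLES call should report the eight preloaded tpch topics, assuming the catalog file above was picked up:
- presto:tpch> SHOW TABLES;
- Table
- ----------
- customer
- lineitem
- nation
- orders
- part
- partsupp
- region
- supplier
- (8 rows)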
Step 4: Basic data querying
Kafka data is unstructured and it has no metadata to describe the format of the messages. Without further configuration, the Kafka connector can access the data and map it in raw form, but there are no actual columns besides the built-in ones:
- presto:tpch> DESCRIBE customer;
- Column | Type | Extra | Comment
- -------------------+---------+-------+---------------------------------------------
- _partition_id | bigint | | Partition Id
- _partition_offset | bigint | | Offset for the message within the partition
- _segment_start | bigint | | Segment start offset
- _segment_end | bigint | | Segment end offset
- _segment_count | bigint | | Running message count per segment
- _key | varchar | | Key text
- _key_corrupt | boolean | | Key data is corrupt
- _key_length | bigint | | Total number of key bytes
- _message | varchar | | Message text
- _message_corrupt | boolean | | Message data is corrupt
- _message_length | bigint | | Total number of message bytes
- (11 rows)
- presto:tpch> SELECT count(*) FROM customer;
- _col0
- -------
- 1500
- (1 row)
- presto:tpch> SELECT _message FROM customer LIMIT 5;
- _message
- --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
- {"rowNumber":1,"customerKey":1,"name":"Customer#000000001","address":"IVhzIApeRb ot,c,E","nationKey":15,"phone":"25-989-741-2988","accountBalance":711.56,"marketSegment":"BUILDING","comment":"to the even, regular platelets. regular, ironic epitaphs nag e"}
- {"rowNumber":3,"customerKey":3,"name":"Customer#000000003","address":"MG9kdTD2WBHm","nationKey":1,"phone":"11-719-748-3364","accountBalance":7498.12,"marketSegment":"AUTOMOBILE","comment":" deposits eat slyly ironic, even instructions. express foxes detect slyly. blithel
- {"rowNumber":5,"customerKey":5,"name":"Customer#000000005","address":"KvpyuHCplrB84WgAiGV6sYpZq7Tj","nationKey":3,"phone":"13-750-942-6364","accountBalance":794.47,"marketSegment":"HOUSEHOLD","comment":"n accounts will have to unwind. foxes cajole accor"}
- {"rowNumber":7,"customerKey":7,"name":"Customer#000000007","address":"TcGe5gaZNgVePxU5kRrvXBfkasDTea","nationKey":18,"phone":"28-190-982-9759","accountBalance":9561.95,"marketSegment":"AUTOMOBILE","comment":"ainst the ironic, express theodolites. express, even pinto bean
- {"rowNumber":9,"customerKey":9,"name":"Customer#000000009","address":"xKiAFTjUsCuxfeleNqefumTrjS","nationKey":8,"phone":"18-338-906-3675","accountBalance":8324.07,"marketSegment":"FURNITURE","comment":"r theodolites according to the requests wake thinly excuses: pending
- (5 rows)
- presto:tpch> SELECT sum(cast(json_extract_scalar(_message, '$.accountBalance') AS double)) FROM customer LIMIT 10;
- _col0
- ------------
- 6681865.59
- (1 row)
The data from Kafka can be queried using Presto but it is not yet in actual table shape. The raw data is available through the _message and _key columns, but it is not decoded into columns. As the sample data is in JSON format, the JSON Functions and Operators built into Presto can be used to slice the data.
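For example, individual fields can already be pulled out of the raw message with json_extract_scalar; the field names used below are taken from the sample rows above (a sketch, output omitted):
- presto:tpch> SELECT json_extract_scalar(_message, '$.name') AS name,
- ->   cast(json_extract_scalar(_message, '$.accountBalance') AS double) AS account_balance
- -> FROM customer LIMIT 3;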
Step 5: Add a topic description file
The Kafka connector supports topic description files to turn raw data into table format. These files are located in the etc/kafka folder in the Presto installation and must end with .json. It is recommended that the file name matches the table name, but this is not necessary.
Add the following file as etc/kafka/tpch.customer.json and restart Presto:
- {
- "tableName": "customer",
- "schemaName": "tpch",
- "topicName": "tpch.customer",
- "key": {
- "dataFormat": "raw",
- "fields": [
- {
- "name": "kafka_key",
- "dataFormat": "LONG",
- "type": "BIGINT",
- "hidden": "false"
- }
- ]
- }
- }
The customer table now has an additional column: kafka_key.
- presto:tpch> DESCRIBE customer;
- Column | Type | Extra | Comment
- -------------------+---------+-------+---------------------------------------------
- kafka_key | bigint | |
- _partition_id | bigint | | Partition Id
- _partition_offset | bigint | | Offset for the message within the partition
- _segment_start | bigint | | Segment start offset
- _segment_end | bigint | | Segment end offset
- _segment_count | bigint | | Running message count per segment
- _key | varchar | | Key text
- _key_corrupt | boolean | | Key data is corrupt
- _key_length | bigint | | Total number of key bytes
- _message | varchar | | Message text
- _message_corrupt | boolean | | Message data is corrupt
- _message_length | bigint | | Total number of message bytes
- (12 rows)
- presto:tpch> SELECT kafka_key FROM customer LIMIT 10;
- kafka_key
- -----------
- 0
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- (10 rows)
The topic definition file maps the internal Kafka key (which is a raw long in eight bytes) onto a Presto BIGINT column.
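A quick way to see the key decoding at work is to select the new column alongside the _key_length internal column, which should report eight raw key bytes per row (a sketch, output omitted):
- presto:tpch> SELECT kafka_key, _key_length FROM customer LIMIT 3;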
Step 6: Map all the values from the topic message onto columns
Update the etc/kafka/tpch.customer.json file to add fields for the message and restart Presto. As the fields in the message are JSON, it uses the json data format. This is an example where different data formats are used for the key and the message.
- {
- "tableName": "customer",
- "schemaName": "tpch",
- "topicName": "tpch.customer",
- "key": {
- "dataFormat": "raw",
- "fields": [
- {
- "name": "kafka_key",
- "dataFormat": "LONG",
- "type": "BIGINT",
- "hidden": "false"
- }
- ]
- },
- "message": {
- "dataFormat": "json",
- "fields": [
- {
- "name": "row_number",
- "mapping": "rowNumber",
- "type": "BIGINT"
- },
- {
- "name": "customer_key",
- "mapping": "customerKey",
- "type": "BIGINT"
- },
- {
- "name": "name",
- "mapping": "name",
- "type": "VARCHAR"
- },
- {
- "name": "address",
- "mapping": "address",
- "type": "VARCHAR"
- },
- {
- "name": "nation_key",
- "mapping": "nationKey",
- "type": "BIGINT"
- },
- {
- "name": "phone",
- "mapping": "phone",
- "type": "VARCHAR"
- },
- {
- "name": "account_balance",
- "mapping": "accountBalance",
- "type": "DOUBLE"
- },
- {
- "name": "market_segment",
- "mapping": "marketSegment",
- "type": "VARCHAR"
- },
- {
- "name": "comment",
- "mapping": "comment",
- "type": "VARCHAR"
- }
- ]
- }
- }
Now for all the fields in the JSON of the message, columns are defined and the sum query from earlier can operate on the account_balance column directly:
- presto:tpch> DESCRIBE customer;
- Column | Type | Extra | Comment
- -------------------+---------+-------+---------------------------------------------
- kafka_key | bigint | |
- row_number | bigint | |
- customer_key | bigint | |
- name | varchar | |
- address | varchar | |
- nation_key | bigint | |
- phone | varchar | |
- account_balance | double | |
- market_segment | varchar | |
- comment | varchar | |
- _partition_id | bigint | | Partition Id
- _partition_offset | bigint | | Offset for the message within the partition
- _segment_start | bigint | | Segment start offset
- _segment_end | bigint | | Segment end offset
- _segment_count | bigint | | Running message count per segment
- _key | varchar | | Key text
- _key_corrupt | boolean | | Key data is corrupt
- _key_length | bigint | | Total number of key bytes
- _message | varchar | | Message text
- _message_corrupt | boolean | | Message data is corrupt
- _message_length | bigint | | Total number of message bytes
- (21 rows)
- presto:tpch> SELECT * FROM customer LIMIT 5;
- kafka_key | row_number | customer_key | name | address | nation_key | phone | account_balance | market_segment | comment
- -----------+------------+--------------+--------------------+---------------------------------------+------------+-----------------+-----------------+----------------+---------------------------------------------------------------------------------------------------------
- 1 | 2 | 2 | Customer#000000002 | XSTf4,NCwDVaWNe6tEgvwfmRchLXak | 13 | 23-768-687-3665 | 121.65 | AUTOMOBILE | l accounts. blithely ironic theodolites integrate boldly: caref
- 3 | 4 | 4 | Customer#000000004 | XxVSJsLAGtn | 4 | 14-128-190-5944 | 2866.83 | MACHINERY | requests. final, regular ideas sleep final accou
- 5 | 6 | 6 | Customer#000000006 | sKZz0CsnMD7mp4Xd0YrBvx,LREYKUWAh yVn | 20 | 30-114-968-4951 | 7638.57 | AUTOMOBILE | tions. even deposits boost according to the slyly bold packages. final accounts cajole requests. furious
- 7 | 8 | 8 | Customer#000000008 | I0B10bB0AymmC, 0PrRYBCP1yGJ8xcBPmWhl5 | 17 | 27-147-574-9335 | 6819.74 | BUILDING | among the slyly regular theodolites kindle blithely courts. carefully even theodolites haggle slyly alon
- 9 | 10 | 10 | Customer#000000010 | 6LrEaV6KR6PLVcgl2ArL Q3rqzLzcT1 v2 | 5 | 15-741-346-9870 | 2753.54 | HOUSEHOLD | es regular deposits haggle. fur
- (5 rows)
- presto:tpch> SELECT sum(account_balance) FROM customer LIMIT 10;
- _col0
- ------------
- 6681865.59
- (1 row)
Now all the fields from the topic messages are available as Presto table columns.
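With the message fields mapped, ordinary analytical SQL can be written directly against the topic; for example, a per-segment aggregation (a sketch, output omitted):
- presto:tpch> SELECT market_segment, count(*) AS customers, round(avg(account_balance), 2) AS avg_balance
- -> FROM customer
- -> GROUP BY market_segment
- -> ORDER BY customers DESC;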
Step 7: Use live data
Setup a live Twitter feed
- Download the twistr tool:
- $ curl -o twistr https://repo1.maven.org/maven2/de/softwareforge/twistr_kafka_0811/1.2/twistr_kafka_0811-1.2.sh
- $ chmod 755 twistr
- Create a Twitter developer account and set up an access and consumer token.
- Create a twistr.properties file and put the access and consumer key and secrets into it:
Create a tweets table on Presto
Add the tweets table to the etc/catalog/kafka.properties file:
- connector.name=kafka
- kafka.nodes=localhost:9092
- kafka.table-names=tpch.customer,tpch.orders,tpch.lineitem,tpch.part,tpch.partsupp,tpch.supplier,tpch.nation,tpch.region,tweets
Add a topic definition file for the Twitter feed as etc/kafka/tweets.json:
- {
- "tableName": "tweets",
- "topicName": "twitter_feed",
- "dataFormat": "json",
- "key": {
- "dataFormat": "raw",
- "fields": [
- {
- "name": "kafka_key",
- "dataFormat": "LONG",
- "type": "BIGINT",
- "hidden": "false"
- }
- ]
- },
- "message": {
- "dataFormat":"json",
- "fields": [
- {
- "name": "text",
- "mapping": "text",
- "type": "VARCHAR"
- },
- {
- "name": "user_name",
- "mapping": "user/screen_name",
- "type": "VARCHAR"
- },
- {
- "name": "lang",
- "mapping": "lang",
- "type": "VARCHAR"
- },
- {
- "name": "created_at",
- "mapping": "created_at",
- "type": "TIMESTAMP",
- "dataFormat": "rfc2822"
- },
- {
- "name": "favorite_count",
- "mapping": "favorite_count",
- "type": "BIGINT"
- },
- {
- "name": "retweet_count",
- "mapping": "retweet_count",
- "type": "BIGINT"
- },
- {
- "name": "favorited",
- "mapping": "favorited",
- "type": "BOOLEAN"
- },
- {
- "name": "id",
- "mapping": "id_str",
- "type": "VARCHAR"
- },
- {
- "name": "in_reply_to_screen_name",
- "mapping": "in_reply_to_screen_name",
- "type": "VARCHAR"
- },
- {
- "name": "place_name",
- "mapping": "place/full_name",
- "type": "VARCHAR"
- }
- ]
- }
- }
As this table does not have an explicit schema name, it will be placed into the default schema.
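The table can therefore be addressed by its fully qualified name from any schema, for example (a sketch, assuming the catalog is named kafka as configured above and Presto has been restarted so it picks up the new table):
- presto:tpch> DESCRIBE kafka.default.tweets;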
Feed live data
Start the twistr tool:
- $ java -Dness.config.location=file:$(pwd) -Dness.config=twistr -jar ./twistr
twistr connects to the Twitter API and feeds the “sample tweet” feed into a Kafka topic called twitter_feed.
Now run queries against live data:
- $ ./presto-cli --catalog kafka --schema default
- presto:default> SELECT count(*) FROM tweets;
- _col0
- -------
- 4467
- (1 row)
- presto:default> SELECT count(*) FROM tweets;
- _col0
- -------
- 4517
- (1 row)
- presto:default> SELECT count(*) FROM tweets;
- _col0
- -------
- 4572
- (1 row)
- presto:default> SELECT kafka_key, user_name, lang, created_at FROM tweets LIMIT 10;
- kafka_key | user_name | lang | created_at
- --------------------+-----------------+------+-------------------------
- 494227746231685121 | burncaniff | en | 2014-07-29 14:07:31.000
- 494227746214535169 | gu8tn | ja | 2014-07-29 14:07:31.000
- 494227746219126785 | pequitamedicen | es | 2014-07-29 14:07:31.000
- 494227746201931777 | josnyS | ht | 2014-07-29 14:07:31.000
- 494227746219110401 | Cafe510 | en | 2014-07-29 14:07:31.000
- 494227746210332673 | Da_JuanAnd_Only | en | 2014-07-29 14:07:31.000
- 494227746193956865 | Smile_Kidrauhl6 | pt | 2014-07-29 14:07:31.000
- 494227750426017793 | CashforeverCD | en | 2014-07-29 14:07:32.000
- 494227750396653569 | FilmArsivimiz | tr | 2014-07-29 14:07:32.000
- 494227750388256769 | jmolas | es | 2014-07-29 14:07:32.000
- (10 rows)
There is now a live feed into Kafka which can be queried using Presto.
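Aggregations work the same way on the live topic; for example, a breakdown of tweets by language (a sketch, output omitted since it changes as the feed runs):
- presto:default> SELECT lang, count(*) AS tweets
- -> FROM tweets
- -> GROUP BY lang
- -> ORDER BY tweets DESC
- -> LIMIT 5;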
Epilogue: Time stamps
The tweets feed that was set up in the last step contains a time stamp in RFC 2822 format as the created_at attribute in each tweet.
- presto:default> SELECT DISTINCT json_extract_scalar(_message, '$.created_at') AS raw_date
- -> FROM tweets LIMIT 5;
- raw_date
- --------------------------------
- Tue Jul 29 21:07:31 +0000 2014
- Tue Jul 29 21:07:32 +0000 2014
- Tue Jul 29 21:07:33 +0000 2014
- Tue Jul 29 21:07:34 +0000 2014
- Tue Jul 29 21:07:35 +0000 2014
- (5 rows)
The topic definition file for the tweets table contains a mapping onto a timestamp using the rfc2822 converter:
- ...
- {
- "name": "created_at",
- "mapping": "created_at",
- "type": "TIMESTAMP",
- "dataFormat": "rfc2822"
- },
- ...
This allows the raw data to be mapped onto a Presto timestamp column.
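For example, the decoded column can be selected next to the raw attribute (a sketch; output omitted, as it depends on the live feed):
- presto:default> SELECT created_at, json_extract_scalar(_message, '$.created_at') AS raw_date
- -> FROM tweets
- -> LIMIT 5;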
The Kafka connector contains converters for the ISO 8601 and RFC 2822 text formats and for number-based timestamps using seconds or milliseconds since the epoch. There is also a generic, text-based formatter which uses Joda-Time format strings to parse text columns.