Kafka Connect YugabyteDB
The Kafka Connect YugabyteDB source connector streams table updates from YugabyteDB to Kafka topics. It is based on YugabyteDB’s Change Data Capture (CDC) feature, which lets the connector subscribe to table changes and publish them to selected Kafka topics.
You can see the source connector in action on the CDC to Kafka page.
Kafka Connect YugabyteDB Sink Connector
To build and use this project, you need the following on your system:
- JDK 1.8+
- Maven 3.3+
- A clone of this repository in a local directory (the commands below assume ~/yb-kafka/yb-kafka-connector).
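The "JDK 1.8+" requirement is easy to misread because Java changed its version-string scheme: Java 8 reports itself as 1.8.0_..., while Java 9 and later put the major version first (for example 11.0.2). The following is a minimal sketch, with hypothetical helper names, of normalizing the quoted version string printed by java -version before comparing it to the requirement.

```python
import re


def java_major_version(version: str) -> int:
    """Extract the major Java version from a string like "1.8.0_292" or "11.0.2"."""
    match = re.match(r"(\d+)(?:\.(\d+))?", version)
    if match is None:
        raise ValueError(f"unrecognized Java version: {version!r}")
    first = int(match.group(1))
    # Java 8 and earlier use the legacy "1.N" form, where N is the major version.
    if first == 1 and match.group(2) is not None:
        return int(match.group(2))
    # Java 9 and later put the major version first.
    return first


def meets_jdk_requirement(version: str, minimum: int = 8) -> bool:
    # "JDK 1.8+" corresponds to major version 8 or newer.
    return java_major_version(version) >= minimum


for v in ["1.7.0_80", "1.8.0_292", "11.0.2", "17"]:
    print(v, meets_jdk_requirement(v))
```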
Setup and use
- Set up and start Kafka. Download the Apache Kafka tar file and extract it. Other Kafka versions can be used; the commands in this guide use kafka_2.11-2.0.0, extracted under ~/yb-kafka, as an example.
- Start ZooKeeper and the Kafka server:
$ ~/yb-kafka/kafka_2.11-2.0.0/bin/zookeeper-server-start.sh ~/yb-kafka/kafka_2.11-2.0.0/config/zookeeper.properties &
$ ~/yb-kafka/kafka_2.11-2.0.0/bin/kafka-server-start.sh ~/yb-kafka/kafka_2.11-2.0.0/config/server.properties &
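Both servers are started in the background with &, so the shell returns before they are ready to accept connections. A minimal sketch, assuming Python 3 on the same host, of polling a TCP port until something accepts connections; wait_for_port is a hypothetical helper, 2181 is the ZooKeeper port used in the topic command, and 9092 is an assumption (Kafka's default broker port).

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout_s: float = 30.0) -> bool:
    """Return True once a TCP connection to host:port succeeds, or False
    if none succeeds before timeout_s seconds have elapsed."""
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            # Connecting successfully means a server is accepting on the port.
            with socket.create_connection((host, port), timeout=1.0):
                return True
        except OSError:
            # Refused or timed out: give up at the deadline, else retry shortly.
            if time.monotonic() >= deadline:
                return False
            time.sleep(0.5)


# Example usage (2181 is ZooKeeper as used in this guide; 9092 is assumed to be
# the broker's listener, Kafka's default):
#   wait_for_port("localhost", 2181)
#   wait_for_port("localhost", 9092)
```

Polling the port only shows that a process is listening; it does not confirm that the broker has finished registering with ZooKeeper.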
- Create a Kafka topic named test:
$ ~/yb-kafka/kafka_2.11-2.0.0/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
This needs to be done only once.
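When scripting this step, passing the arguments as a list (for example to Python's subprocess.run) keeps every flag and its value as separate arguments, so a dropped space cannot merge two of them (for instance localhost:2181 and --replication-factor). A sketch, with create_topic_command as a hypothetical helper that only builds the argument list:

```python
import os
from pathlib import Path


def create_topic_command(kafka_home: str, topic: str,
                         zookeeper: str = "localhost:2181",
                         partitions: int = 1,
                         replication_factor: int = 1) -> list[str]:
    """Return the kafka-topics.sh --create invocation as an argument list."""
    script = Path(os.path.expanduser(kafka_home)) / "bin" / "kafka-topics.sh"
    # One list element per flag and per value: no shell word-splitting involved.
    return [
        str(script), "--create",
        "--zookeeper", zookeeper,
        "--replication-factor", str(replication_factor),
        "--partitions", str(partitions),
        "--topic", topic,
    ]


cmd = create_topic_command("~/yb-kafka/kafka_2.11-2.0.0", "test")
print(" ".join(cmd))
# To actually create the topic: subprocess.run(cmd, check=True)
```

The --zookeeper flag matches Kafka 2.0. Kafka 2.2 added --bootstrap-server to kafka-topics.sh and Kafka 3.0 removed --zookeeper, so with a newer Kafka version you would pass the broker address with --bootstrap-server instead.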
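- Produce the sample data. Start the Kafka console producer on the test topic (9092 is the broker's default port):
$ ~/yb-kafka/kafka_2.11-2.0.0/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Then paste the following lines at the producer's prompt: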
{"key" : "A", "value" : 1, "ts" : 1541559411000}
{"key" : "B", "value" : 2, "ts" : 1541559412000}
{"key" : "C", "value" : 3, "ts" : 1541559413000}
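The three records share one shape: a string key, an integer value, and a ts field holding epoch milliseconds, spaced one second apart. If you want to regenerate them or produce more test data, a small sketch like this one prints lines in the same format; BASE_TS_MS and the helper names are illustrative, not part of the project.

```python
import json

BASE_TS_MS = 1541559411000  # "ts" of the first record, in epoch milliseconds


def sample_records():
    """Yield the three sample records as dicts."""
    for i, key in enumerate(["A", "B", "C"]):
        # Values count up from 1; timestamps are one second (1000 ms) apart.
        yield {"key": key, "value": i + 1, "ts": BASE_TS_MS + i * 1000}


def to_line(record: dict) -> str:
    # separators=(", ", " : ") reproduces the exact spacing of the lines above.
    return json.dumps(record, separators=(", ", " : "))


for record in sample_records():
    print(to_line(record))
```

Because the console producer reads one message per line from standard input, you could pipe this script's output into kafka-console-producer.sh instead of pasting by hand.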
- Install YugabyteDB and create the database table.
Create a database and table by running the following commands in ysqlsh, which you can find in the bin subdirectory of the YugabyteDB installation folder:
yugabyte=# CREATE DATABASE demo;
yugabyte=# \c demo
demo=# CREATE TABLE test_table (key text, value bigint, ts timestamp, PRIMARY KEY (key));
- Set up and run the Kafka Connect Sink Connector.
Build the project with Maven and set up the JAR files that Kafka Connect needs to load the connector.
Finally, run the Kafka Connect YugabyteDB Sink Connector in standalone mode:
$ ~/yb-kafka/kafka_2.11-2.0.0/bin/connect-standalone.sh ~/yb-kafka/yb-kafka-connector/resources/examples/kafka.connect.properties ~/yb-kafka/yb-kafka-connector/resources/examples/yugabyte.sink.properties
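Both files passed to connect-standalone.sh are Java-style .properties files, and the sink file must agree with the earlier steps: its topics value (a comma-separated list in Kafka Connect sink configs) must include the test topic, and its database and table settings must name the table you created. The sketch below checks that before launching. It is simplified on purpose, and the key names database and tablename are assumptions taken from this guide's wording; the real file may use different keys.

```python
def parse_properties(text: str) -> dict[str, str]:
    """Parse simple Java-style .properties text into a dict.

    Simplified: handles key=value and key: value lines and '#' or '!'
    comments, but not line continuations or escape sequences.
    """
    props: dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line[0] in "#!":
            continue  # blank line or comment
        # The key ends at the first '=' or ':' separator.
        seps = [i for i in (line.find("="), line.find(":")) if i != -1]
        if not seps:
            props[line] = ""
            continue
        cut = min(seps)
        props[line[:cut].strip()] = line[cut + 1:].strip()
    return props


def check_sink_config(sink: dict[str, str], topic: str,
                      database: str, table: str) -> list[str]:
    """Return mismatches between the sink config and the setup steps.

    The keys "database" and "tablename" are assumed, not confirmed.
    """
    problems = []
    topics = [t.strip() for t in sink.get("topics", "").split(",")]
    if topic not in topics:
        problems.append(f"topics does not include {topic!r}")
    if sink.get("database") != database:
        problems.append(f"database is {sink.get('database')!r}, expected {database!r}")
    if sink.get("tablename") != table:
        problems.append(f"tablename is {sink.get('tablename')!r}, expected {table!r}")
    return problems


# Hypothetical file contents, for illustration only.
example = """
# sink settings
topics=test
database=demo
tablename=test_table
"""
print(check_sink_config(parse_properties(example), "test", "demo", "test_table"))  # []
```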
- To use any accessible existing Kafka cluster, set bootstrap.servers in the kafka.connect.properties file to that cluster's remote host:port list.
- The database and tablename values in the yugabyte.sink.properties file should match the database and table you created with ysqlsh (demo and test_table).
- The topics value should match the topic the producer writes to (test).
- Check the console output (optional). You should see something like the following on the console (relevant lines from YBSinkTask.java):
[2018-10-28 16:24:16,037] INFO Start with keyspace=demo, table=test_table(com.yb.connect.sink.YBSinkTask:79)
[2018-10-28 16:24:16,517] INFO Connected to cluster: cluster1(com.yb.connect.sink.YBSinkTask:155)
[2018-10-28 16:24:16,594] INFO Processing 3 records from Kafka.(com.yb.connect.sink.YBSinkTask:95)
[2018-10-28 16:24:16,602] INFO Insert INSERT INTO demo.test_table(key,ts,value) VALUES (?,?,? (com.yb.connect.sink.YBSinkTask:439)
[2018-10-28 16:24:16,612] INFO Prepare SinkRecord ...
[2018-10-28 16:24:16,618] INFO Bind 'ts' of type timestamp(com.yb.connect.sink.YBSinkTask:255)
...
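The log lines show the general pattern: the sink prepares one parameterized INSERT with ? placeholders and then binds each record field to it (Bind 'ts' of type timestamp). The sketch below illustrates that pattern for a single JSON record. It is not YBSinkTask's actual code, build_insert is a hypothetical name, and it assumes the table and column names are trusted identifiers, since they are interpolated rather than quoted. Sorting the field names happens to reproduce the column order shown in the log.

```python
from typing import Any


def build_insert(table: str, record: dict[str, Any]) -> tuple[str, list[Any]]:
    """Return (sql, params) for inserting one record with positional placeholders."""
    # Sort field names for a deterministic column order (key, ts, value).
    columns = sorted(record)
    placeholders = ",".join("?" for _ in columns)
    sql = f"INSERT INTO {table}({','.join(columns)}) VALUES ({placeholders})"
    # Parameters are listed in the same order as the columns they bind to.
    return sql, [record[c] for c in columns]


sql, params = build_insert("demo.test_table", {"key": "A", "value": 1, "ts": 1541559411000})
print(sql)     # INSERT INTO demo.test_table(key,ts,value) VALUES (?,?,?)
print(params)  # ['A', 1541559411000, 1]
```

Here ts is still the raw epoch-milliseconds integer; a sink binding it to a timestamp column has to convert it first, as the Bind 'ts' of type timestamp log line suggests.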
- Confirm that the rows are in the target table in the YugabyteDB cluster using ysqlsh:
key | value | ts
----+-------+---------------------------------
A | 1 | 2018-11-07 02:56:51.000000+0000
Note that the ts value, which the Kafka record carries as epoch milliseconds, is displayed in a human-readable date format.
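The conversion behind that display can be checked independently: ts counts milliseconds since the Unix epoch, and 1541559411000 ms is 2018-11-07 02:56:51 UTC. A minimal sketch (epoch_ms_to_utc is a hypothetical helper) that reproduces the value shown above, assuming the timestamp is interpreted in UTC:

```python
from datetime import datetime, timedelta, timezone

# The Unix epoch as a timezone-aware UTC datetime.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def epoch_ms_to_utc(ts_ms: int) -> datetime:
    """Convert epoch milliseconds (the "ts" field) to a UTC datetime."""
    # Integer timedelta arithmetic avoids floating-point rounding.
    return EPOCH + timedelta(milliseconds=ts_ms)


ts = epoch_ms_to_utc(1541559411000)
print(ts.strftime("%Y-%m-%d %H:%M:%S.%f%z"))  # 2018-11-07 02:56:51.000000+0000
```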