Query data with Pulsar SQL

    1. Install Pulsar.
    2. Install Pulsar .

    To query data in Pulsar with Pulsar SQL, complete the following steps.

    1. Start a Pulsar standalone cluster.
    1. Start a Pulsar SQL worker.
    1. ./bin/pulsar sql-worker run
    1. After initializing Pulsar standalone cluster and the SQL worker, run SQL CLI.
    1. Test with SQL commands.
    1. presto> show catalogs;
    2. Catalog
    3. ---------
    4. pulsar
    5. system
    6. (2 rows)
    7. Query 20180829_211752_00004_7qpwh, FINISHED, 1 node
    8. Splits: 19 total, 19 done (100.00%)
    9. 0:00 [0 rows, 0B] [0 rows/s, 0B/s]
    10. presto> show schemas in pulsar;
    11. Schema
    12. -----------------------
    13. information_schema
    14. public/default
    15. public/functions
    16. sample/standalone/ns1
    17. (4 rows)
    18. Query 20180829_211818_00005_7qpwh, FINISHED, 1 node
    19. Splits: 19 total, 19 done (100.00%)
    20. presto> show tables in pulsar."public/default";
    21. Table
    22. -------
    23. Query 20180829_211839_00006_7qpwh, FINISHED, 1 node
    24. Splits: 19 total, 19 done (100.00%)
    25. 0:00 [0 rows, 0B] [0 rows/s, 0B/s]
    1. Start the built-in connector DataGeneratorSource and ingest some mock data.

    And then you can query a topic in the namespace “public/default”.

    1. presto> show tables in pulsar."public/default";
    2. Table
    3. ----------------
    4. generator_test
    5. (1 row)
    6. Query 20180829_213202_00000_csyeu, FINISHED, 1 node
    7. Splits: 19 total, 19 done (100.00%)
    8. 0:02 [1 rows, 38B] [0 rows/s, 17B/s]

    You can query the mock data.

    1. public class TestProducer {
    2. public static class Foo {
    3. private int field1 = 1;
    4. private String field2;
    5. private long field3;
    6. public Foo() {
    7. }
    8. public int getField1() {
    9. return field1;
    10. }
    11. public void setField1(int field1) {
    12. this.field1 = field1;
    13. public String getField2() {
    14. return field2;
    15. }
    16. public void setField2(String field2) {
    17. this.field2 = field2;
    18. }
    19. public long getField3() {
    20. return field3;
    21. }
    22. public void setField3(long field3) {
    23. this.field3 = field3;
    24. }
    25. }
    26. public static void main(String[] args) throws Exception {
    27. PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
    28. Producer<Foo> producer = pulsarClient.newProducer(AvroSchema.of(Foo.class)).topic("test_topic").create();
    29. for (int i = 0; i < 1000; i++) {
    30. Foo foo = new Foo();
    31. foo.setField1(i);
    32. foo.setField2("foo" + i);
    33. foo.setField3(System.currentTimeMillis());
    34. producer.newMessage().value(foo).send();
    35. }
    36. producer.close();
    37. pulsarClient.close();