7.11. Kudu Connector

    The connector is compatible with all Apache Kudu versions starting from 1.0.

    If the connector uses features that are not available on the target server, an error will be returned. Apache Kudu 1.8.0 is currently used for testing.

    Configuration

    To configure the Kudu connector, create a catalog properties file etc/catalog/kudu.properties with the following contents, replacing the properties as appropriate:
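
    The file contents are not reproduced in this section. A minimal sketch, assuming a single Kudu master reachable at localhost (a placeholder address), might look as follows. The property names connector.name and kudu.client.master-addresses are not given in this section and should be checked against the connector version in use; the two schema emulation properties are the ones named later in this section and are shown commented out.

    ```properties
    connector.name=kudu

    # Comma-separated list of Kudu master addresses; localhost is a placeholder.
    kudu.client.master-addresses=localhost

    # Schema emulation is disabled by default; the prefix only matters when it is enabled.
    #kudu.schema-emulation.enabled=false
    #kudu.schema-emulation.prefix=
    ```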

    Apache Kudu does not support schemas, i.e. namespaces for tables. The connector can optionally emulate schemas by table naming conventions.

    The emulation of schemas is disabled by default. In this case all Kudu tables are part of the default schema.

    For example, a Kudu table named orders can be queried in Presto with SELECT * FROM kudu.default.orders, or simply with SELECT * FROM orders if the catalog and schema are set to kudu and default respectively.

    Kudu table names can contain any characters. Names that contain special characters must be enclosed in double quotes. For example, to query a Kudu table named special.table!, use SELECT * FROM kudu.default."special.table!".

    Example

    • Create a users table in the default schema with
      CREATE TABLE kudu.default.users (
        user_id int WITH (primary_key = true),
        first_name varchar,
        last_name varchar
      ) WITH (
        partition_by_hash_columns = ARRAY['user_id'],
        partition_by_hash_buckets = 2
      );

    When creating a Kudu table, you must specify the primary key and the hash or range partitioning, and you can optionally specify the encoding and compression of columns. See the Create Table section for details.

    • The table can be described using
      DESCRIBE kudu.default.users;

    The output should look similar to:

         Column   |  Type   |                      Extra                      | Comment
      ------------+---------+-------------------------------------------------+---------
       user_id    | integer | primary_key, encoding=auto, compression=default |
       first_name | varchar | nullable, encoding=auto, compression=default    |
       last_name  | varchar | nullable, encoding=auto, compression=default    |
      (3 rows)
    • Insert some data with
    • Select the inserted data
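
    The statements for the last two steps are not reproduced here. A minimal sketch against the users table created above, with made-up sample rows:

    ```sql
    -- Sample rows are illustrative only.
    INSERT INTO kudu.default.users VALUES (1, 'Ada', 'Lovelace'), (2, 'Alan', 'Turing');

    SELECT * FROM kudu.default.users;
    ```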

      If schema emulation has been enabled in the connector properties file, i.e. etc/catalog/kudu.properties, tables are mapped to schemas according to naming conventions.

      • With kudu.schema-emulation.enabled=true and kudu.schema-emulation.prefix=, the mapping works like:

        As schemas are not directly supported by Kudu, a special table named $schemas is created for managing the schemas.

      • With kudu.schema-emulation.enabled=true and kudu.schema-emulation.prefix=presto::, the mapping works like:

        Kudu Table Name     | Presto Qualified Name
        --------------------+----------------------------
        orders              | kudu.default.orders
        part1.part2         | kudu.default."part1.part2"
        x.y.z               | kudu.default."x.y.z"
        presto::part1.part2 | kudu.part1.part2
        presto::x.y.z       | kudu.x."y.z"

        As schemas are not directly supported by Kudu, a special table named presto::$schemas is created for managing the schemas.
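
        With the presto:: prefix mapping tabulated above, queries could address the underlying Kudu tables as sketched below. The table names are the illustrative ones from the mapping; the schema name analytics is hypothetical.

        ```sql
        -- Reads the Kudu table named presto::part1.part2.
        SELECT * FROM kudu.part1.part2;

        -- Reads the Kudu table named x.y.z, which lacks the prefix and so stays in the default schema.
        SELECT * FROM kudu.default."x.y.z";

        -- Schema statements are only allowed with emulation enabled (hypothetical schema name).
        CREATE SCHEMA kudu.analytics;
        SHOW SCHEMAS FROM kudu;
        ```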

      Data Type Mapping

      The data types of Presto and Kudu are mapped as far as possible:

      [1] On performing CREATE TABLE … AS … from a Presto table to Kudu, the optional maximum length is lost.

      Presto SQL statement                  | Comment
      --------------------------------------+----------------------------------------------------------------
      SELECT                                |
      INSERT INTO … VALUES                  | Behaves like upsert
      INSERT INTO … SELECT …                | Behaves like upsert
      DELETE                                |
      CREATE SCHEMA                         | Only allowed if schema emulation is enabled
      DROP SCHEMA                           | Only allowed if schema emulation is enabled
      CREATE TABLE                          | See Create Table
      CREATE TABLE … AS                     |
      DROP TABLE                            |
      ALTER TABLE … RENAME TO …             |
      ALTER TABLE … RENAME COLUMN …         | Only allowed if the column is not part of the primary key
      ALTER TABLE … ADD COLUMN …            | See
      ALTER TABLE … DROP COLUMN …           | Only allowed if the column is not part of the primary key
      SHOW SCHEMAS                          |
      SHOW TABLES                           |
      SHOW CREATE TABLE                     |
      SHOW COLUMNS FROM                     |
      DESCRIBE                              | Same as SHOW COLUMNS FROM
      CALL kudu.system.add_range_partition  | Adds a range partition to a table. See Managing range partitions
      CALL kudu.system.drop_range_partition | Drops a range partition from a table. See

      ALTER SCHEMA ... RENAME TO ... is not supported.
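
      The upsert behavior of INSERT noted in the table can be illustrated with a short sketch. It assumes the kudu.default.users table from the earlier example; the row values are made up.

      ```sql
      -- First insert creates the row with primary key 1.
      INSERT INTO kudu.default.users VALUES (1, 'Ada', 'Byron');

      -- A second insert with the same primary key is expected to overwrite the row
      -- rather than fail, given that INSERT behaves like upsert.
      INSERT INTO kudu.default.users VALUES (1, 'Ada', 'Lovelace');

      SELECT * FROM kudu.default.users WHERE user_id = 1;
      ```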

      Create Table

      When creating a Kudu table, you must provide the columns and their types. In addition, Kudu requires information about partitioning and optionally accepts column encoding and compression settings.

        CREATE TABLE user_events (
          user_id int WITH (primary_key = true),
          event_name varchar WITH (primary_key = true),
          message varchar,
          details varchar WITH (nullable = true, encoding = 'plain')
        ) WITH (
          partition_by_hash_columns = ARRAY['user_id'],
          partition_by_hash_buckets = 5,
          number_of_replicas = 3
        );

      The primary key consists of user_id and event_name. The table is partitioned into five partitions by hash values of the column user_id, and number_of_replicas is explicitly set to 3.

      The primary key columns must always be the first columns of the column list. All columns used in partitions must be part of the primary key.

      The table property number_of_replicas is optional. It defines the number of tablet replicas and must be an odd number. If it is not specified, the default replication factor from the Kudu master configuration is used.

      Kudu supports two different kinds of partitioning: hash and range partitioning. Hash partitioning distributes rows by hash value into one of many buckets. Range partitioning distributes rows using a totally-ordered range partition key. The concrete range partitions must be created explicitly. Kudu also supports multi-level partitioning. A table must have at least one partitioning (either hash or range). It can have at most one range partitioning, but multiple hash partitioning ‘levels’.

      For more details see .

      Besides column name and type, you can specify some more properties of a column.

      Example

        CREATE TABLE mytable (
          name varchar WITH (primary_key = true, encoding = 'dictionary', compression = 'snappy'),
          index bigint WITH (nullable = true, encoding = 'runlength', compression = 'lz4'),
          comment varchar WITH (nullable = true, encoding = 'plain', compression = 'default'),
          ...
        ) WITH (...);

      A table must have at least one partitioning (either hash or range). It can have at most one range partitioning, but multiple hash partitioning ‘levels’. For more details, see the Apache Kudu documentation.

      If you create a Kudu table in Presto, the partitioning design is given by several table properties.

      Hash partitioning

      You can provide the first hash partition group with two table properties:

      The property partition_by_hash_columns defines the column(s) belonging to the partition group, and partition_by_hash_buckets defines the number of partitions to split the hash value range into. All partition columns must be part of the primary key.

      Example:

        CREATE TABLE mytable (
          col1 varchar WITH (primary_key=true),
          col2 varchar WITH (primary_key=true),
          ...
        ) WITH (
          partition_by_hash_columns = ARRAY['col1', 'col2'],
          partition_by_hash_buckets = 4
        )

      This defines a hash partitioning with the columns col1 and col2 distributed over 4 partitions.

      To define two separate hash partition groups, also use the second pair of table properties named partition_by_second_hash_columns and partition_by_second_hash_buckets.

      Example:
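
      A sketch of such a definition, using the property names given above; the table name mytable and the columns are placeholders:

      ```sql
      CREATE TABLE mytable (
        col1 varchar WITH (primary_key = true),
        col2 varchar WITH (primary_key = true),
        message varchar
      ) WITH (
        -- First hash partition group: col1 over 2 buckets.
        partition_by_hash_columns = ARRAY['col1'],
        partition_by_hash_buckets = 2,
        -- Second hash partition group: col2 over 3 buckets.
        partition_by_second_hash_columns = ARRAY['col2'],
        partition_by_second_hash_buckets = 3
      );
      ```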

      This defines a two-level hash partitioning with the first hash partition group over the column col1 distributed over 2 buckets and the second hash partition group over the column col2 distributed over 3 buckets. As a result you have table with 2 x 3 = 6 partitions.

      Range partitioning

      You can provide at most one range partitioning in Apache Kudu. The columns are defined with the table property partition_by_range_columns. The ranges themselves are given either in the table property range_partitions when creating the table, or managed afterwards for existing tables with the procedures kudu.system.add_range_partition and kudu.system.drop_range_partition. Both ways are described in more detail below.

      Example:

      With the range_partitions table property you specify the concrete range partitions to be created. The range partition definition itself must be given in the table property partition_design separately.

      Example:

        CREATE TABLE events (
          serialno varchar WITH (primary_key = true),
          event_time timestamp WITH (primary_key = true),
          message varchar
        ) WITH (
          partition_by_hash_columns = ARRAY['serialno'],
          partition_by_hash_buckets = 4,
          partition_by_range_columns = ARRAY['event_time'],
          range_partitions = '[{"lower": null, "upper": "2017-01-01T00:00:00"},
                               {"lower": "2017-01-01T00:00:00", "upper": "2017-07-01T00:00:00"},
                               {"lower": "2017-07-01T00:00:00", "upper": "2018-01-01T00:00:00"}]'
        );

      This creates a table with a hash partition on column serialno with 4 buckets and range partitioning on column event_time. Additionally three range partitions are created:

      1. for all event_times before the year 2017 (lower bound = null means it is unbounded)
      2. for the first half of the year 2017
      3. for the second half of the year 2017

      This means that any attempt to insert rows with an event_time in the year 2018 or later fails, because no partition is defined for that range. The next section shows how to define a new range partition for an existing table.

      Managing range partitions

      For existing tables, there are procedures to add and drop a range partition.

      • Adding a range partition:

          CALL kudu.system.add_range_partition(<schema>, <table>, <range_partition_as_json_string>)

      • Dropping a range partition:

          CALL kudu.system.drop_range_partition(<schema>, <table>, <range_partition_as_json_string>)

        • <schema>: schema of the table

        • <table>: name of the table

        • <range_partition_as_json_string>: lower and upper bound of the range partition as a JSON string in the form '{"lower": <value>, "upper": <value>}', or if the range partition has multiple columns: '{"lower": [<value_col1>,...], "upper": [<value_col1>,...]}'. The concrete literals for the lower and upper bound values depend on the column types.

          Examples:

          Presto Data Type | JSON string example
          -----------------+----------------------------------------------------------------------------
          BIGINT           | '{"lower": 0, "upper": 1000000}'
          SMALLINT         | '{"lower": 10, "upper": null}'
          VARCHAR          | '{"lower": "A", "upper": "M"}'
          TIMESTAMP        | '{"lower": "2018-02-01T00:00:00.000", "upper": "2018-02-01T12:00:00.000"}'
          BOOLEAN          | '{"lower": false, "upper": true}'
          VARBINARY        | values encoded as base64 strings

          To specify an unbounded bound, use the value null.

      Example:
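
      A call matching the description that follows, sketched from the procedure signature above:

      ```sql
      CALL kudu.system.add_range_partition('myschema', 'events', '{"lower": "2018-01-01", "upper": "2018-07-01"}');
      ```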

      This would add a range partition for a table events in the schema myschema with the lower bound 2018-01-01 (more exactly 2018-01-01T00:00:00.000) and the upper bound 2018-07-01.

      Use the SQL statement SHOW CREATE TABLE to query the existing range partitions (they are shown in the table property range_partitions).
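
      For instance, assuming the table events in the schema myschema from the previous example, the partitions could be inspected and one of them dropped again as sketched here. The bounds passed to drop_range_partition are illustrative and are assumed to have to match an existing partition.

      ```sql
      -- Existing range partitions appear in the range_partitions table property.
      SHOW CREATE TABLE kudu.myschema.events;

      -- Drop the partition covering the first half of 2018 (bounds are illustrative).
      CALL kudu.system.drop_range_partition('myschema', 'events', '{"lower": "2018-01-01", "upper": "2018-07-01"}');
      ```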

      Adding a column to an existing table uses the SQL statement ALTER TABLE ... ADD COLUMN .... You can specify the same column properties as on creating a table.

      Example:
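
      A minimal sketch; the table mytable and the column extra_info are hypothetical names, and the column properties are the ones used in the Create Table examples:

      ```sql
      ALTER TABLE mytable ADD COLUMN extra_info varchar WITH (nullable = true, encoding = 'plain');
      ```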

      Known limitations

      • Only lower-case table and column names in Kudu are supported.
      • Using a secured Kudu cluster has not been tested.