5.1. Accumulo Connector
Installing the Iterator Dependency
The Accumulo connector uses custom Accumulo iterators in order to push various information in a SQL predicate clause to Accumulo for server-side filtering, known as predicate pushdown. In order for the server-side iterators to work, you need to add the jar file to Accumulo’s lib/ext directory on each TabletServer node.
Note that this uses Java 8. If your Accumulo cluster is using Java 7, you’ll receive an Unsupported major.minor version 52.0 error in your TabletServer logs when you attempt to create an indexed table. You’ll instead need to use the presto-accumulo-iterators jar file that is located at .
Connector Configuration
Create etc/catalog/accumulo.properties to mount the accumulo connector as the accumulo catalog, replacing the accumulo.xxx properties as required:
- connector.name=accumulo
- accumulo.instance=xxx
- accumulo.zookeepers=xxx
- accumulo.username=username
- accumulo.password=password
Configuration Variables
Unsupported Features
The following features are not supported:
- Adding columns via ALTER TABLE: While you cannot add columns via SQL, you can do so using a tool. See the section on Adding Columns below for more details.
- DELETE: Deletion of rows is not yet implemented for the connector.
Simply begin using SQL to create a new table in Accumulo to begin working with data. By default, the first column of the table definition is set to the Accumulo row ID. This should be the primary key of your table. Keep in mind that any INSERT statement containing an existing row ID is effectively an UPDATE as far as Accumulo is concerned, because any previous data in the cell will be overwritten. The row ID can be any valid Presto datatype. If the first column is not your primary key, you can set the row ID column using the row_id table property within the WITH clause of your table definition.
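The overwrite behavior described above can be pictured with a toy model. This is only a sketch under the assumption that a table behaves like a map from (row ID, column) to a single value; the insert function and the dictionary are hypothetical illustrations, not connector or Accumulo APIs.

```python
# Toy model: one stored value per (row ID, column) cell.
# Hypothetical helper for illustration; not part of Presto or Accumulo.
def insert(table, row_id, columns):
    """Write each column as a cell; a cell with the same key is replaced."""
    for column, value in columns.items():
        table[(row_id, column)] = value


table = {}
insert(table, "row1", {"name": "Grace Hopper", "age": 109})
insert(table, "row1", {"age": 110})  # same row ID: acts like an UPDATE of the age cell

print(table[("row1", "age")])   # 110
print(table[("row1", "name")])  # Grace Hopper
```

The second insert does not add a row; it replaces the existing age cell and leaves the name cell untouched.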
Simply issue a CREATE TABLE statement to create a new Presto/Accumulo table:
- CREATE TABLE myschema.scientists (
- recordkey VARCHAR,
- name VARCHAR,
- age BIGINT,
- birthday DATE
- );
- DESCRIBE myschema.scientists;
- Column | Type | Extra | Comment
- -----------+---------+-------+---------------------------------------------------
- recordkey | varchar | | Accumulo row ID
- name | varchar | | Accumulo column name:name. Indexed: false
- age | bigint | | Accumulo column age:age. Indexed: false
- birthday | date | | Accumulo column birthday:birthday. Indexed: false
This command will create a new Accumulo table with the recordkey column as the Accumulo row ID. The name, age, and birthday columns are mapped to auto-generated column family and qualifier values (which, in practice, are both identical to the Presto column name).
When creating a table using SQL, you can optionally specify a column_mapping table property. The value of this property is a comma-delimited list of triples, presto column : accumulo column family : accumulo column qualifier, with one triple for every non-row ID column. This sets the mapping of the Presto column name to the corresponding Accumulo column family and column qualifier.
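To make the triple format concrete, here is a minimal sketch that splits a column_mapping string into its parts. The parse_column_mapping function is a hypothetical helper written for this illustration; it is not the connector's own parsing code.

```python
from typing import Dict, Tuple


def parse_column_mapping(mapping: str) -> Dict[str, Tuple[str, str]]:
    """Split 'column:family:qualifier,...' into {column: (family, qualifier)}."""
    result = {}
    for triple in mapping.split(","):
        parts = triple.strip().split(":")
        if len(parts) != 3:
            raise ValueError(f"expected column:family:qualifier, got {triple!r}")
        column, family, qualifier = parts
        result[column] = (family, qualifier)
    return result


mapping = parse_column_mapping(
    "name:metadata:name,age:metadata:age,birthday:metadata:date"
)
print(mapping["birthday"])  # ('metadata', 'date')
```

Each non-row ID column gets exactly one entry, so the Presto column birthday maps to family metadata and qualifier date.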
If you don’t specify the column_mapping table property, then the connector will auto-generate column names (respecting any configured locality groups). Auto-generation of column names is only available for internal tables, so if your table is external you must specify the column_mapping property.
For a full list of table properties, see Table Properties.
For example:
- CREATE TABLE myschema.scientists (
- recordkey VARCHAR,
- name VARCHAR,
- age BIGINT,
- birthday DATE
- )
- WITH (
- column_mapping = 'name:metadata:name,age:metadata:age,birthday:metadata:date'
- );
- DESCRIBE myschema.scientists;
- Column | Type | Extra | Comment
- -----------+---------+-------+-----------------------------------------------
- recordkey | varchar | | Accumulo row ID
- name | varchar | | Accumulo column metadata:name. Indexed: false
- age | bigint | | Accumulo column metadata:age. Indexed: false
- birthday | date | | Accumulo column metadata:date. Indexed: false
You can then issue INSERT statements to put data into Accumulo.
Note
While issuing INSERT statements is convenient, this method of loading data into Accumulo is low-throughput. You’ll want to use the Accumulo APIs to write Mutations directly to the tables. See the section on Loading Data for more details.
- INSERT INTO myschema.scientists VALUES
- ('row1', 'Grace Hopper', 109, DATE '1906-12-09' ),
- ('row2', 'Alan Turing', 103, DATE '1912-06-23' );
- SELECT * FROM myschema.scientists;
- recordkey | name | age | birthday
- -----------+--------------+-----+------------
- row1 | Grace Hopper | 109 | 1906-12-09
- row2 | Alan Turing | 103 | 1912-06-23
- (2 rows)
- $ accumulo shell -u root -p secret
- root@default> table myschema.scientists
- root@default myschema.scientists> insert row3 metadata name "Tim Berners-Lee"
- root@default myschema.scientists> insert row3 metadata age 60
- root@default myschema.scientists> insert row3 metadata date 5321
- SELECT * FROM myschema.scientists;
- recordkey | name | age | birthday
- -----------+-----------------+-----+------------
- row1 | Grace Hopper | 109 | 1906-12-09
- row2 | Alan Turing | 103 | 1912-06-23
- row3 | Tim Berners-Lee | 60 | 1984-07-27
- (3 rows)
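The value 5321 written through the shell is displayed as 1984-07-27, which is consistent with reading DATE values as a count of days from 1970-01-01. The sketch below assumes that interpretation; days_to_date and date_to_days are hypothetical helper names used only for this illustration.

```python
from datetime import date, timedelta

EPOCH = date(1970, 1, 1)


def days_to_date(days: int) -> date:
    """Convert a day offset from 1970-01-01 into a calendar date."""
    return EPOCH + timedelta(days=days)


def date_to_days(d: date) -> int:
    """Convert a calendar date into a day offset from 1970-01-01."""
    return (d - EPOCH).days


print(days_to_date(5321))                # 1984-07-27
print(date_to_days(date(1906, 12, 9)))   # -23034
print(date_to_days(date(1912, 6, 23)))   # -21011
```

Dates before 1970 come out as negative offsets, which is why the birthdays in this section appear as -23034 and -21011 in the string-serialized scans.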
You can also drop tables using DROP TABLE. This command drops both metadata and the tables. See the section on External Tables below for more details on internal and external tables.
Indexing Columns
Internally, the connector creates an Accumulo Range and packs it in a split. This split gets passed to a Presto Worker to read the data from the Range via a BatchScanner. When issuing a query that results in a full table scan, each Presto Worker gets a single Range that maps to a single tablet of the table. When issuing a query with a predicate (e.g., a WHERE x = 10 clause), Presto passes the values within the predicate (10) to the connector so it can use this information to scan less data. When the Accumulo row ID is used as part of the predicate clause, this narrows down the Range lookup to quickly retrieve a subset of data from Accumulo.
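The benefit of a row ID predicate can be sketched with a sorted list standing in for the table's sorted keys. This is a conceptual illustration only; scan_range is a hypothetical function and does not use the Accumulo Range or BatchScanner classes.

```python
from bisect import bisect_left, bisect_right

# Sorted row IDs stand in for a table's keys (Accumulo keeps keys sorted).
row_ids = ["row1", "row2", "row3", "row4", "row5"]


def scan_range(sorted_ids, start, end):
    """Return the row IDs in the inclusive range [start, end] via binary search."""
    lo = bisect_left(sorted_ids, start)
    hi = bisect_right(sorted_ids, end)
    return sorted_ids[lo:hi]


print(scan_range(row_ids, "row2", "row3"))  # ['row2', 'row3']
```

Because the keys are sorted, a bounded range can jump straight to its start position instead of visiting every row, which is the effect a row ID predicate has on the lookup.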
But what about the other columns? If you’re frequently querying on non-row ID columns, you should consider using the indexing feature built into the Accumulo connector. This feature can drastically reduce query runtime when selecting a handful of values from the table, and the heavy lifting is done for you when loading data via Presto INSERT statements (though keep in mind that writing data to Accumulo via INSERT does not have high throughput).
To enable indexing, add the index_columns table property and specify a comma-delimited list of Presto column names you wish to index. We use the string serializer here to keep the example readable; you should be using the default serializer.
- CREATE TABLE myschema.scientists (
- recordkey VARCHAR,
- name VARCHAR,
- age BIGINT,
- birthday DATE
- )
- WITH (
- serializer = 'string',
- index_columns='name,age,birthday'
- );
After creating the table, we see there are an additional two Accumulo tables to store the index and metrics.
- root@default> tables
- accumulo.metadata
- accumulo.root
- myschema.scientists
- myschema.scientists_idx
- myschema.scientists_idx_metrics
- trace
After inserting data, we can look at the index table and see there are indexed values for the name, age, and birthday columns. The connector queries this index table.
- INSERT INTO myschema.scientists VALUES
- ('row1', 'Grace Hopper', 109, DATE '1906-12-09'),
- ('row2', 'Alan Turing', 103, DATE '1912-06-23');
- root@default> scan -t myschema.scientists_idx
- -21011 metadata_date:row2 []
- -23034 metadata_date:row1 []
- 103 metadata_age:row2 []
- 109 metadata_age:row1 []
- Alan Turing metadata_name:row2 []
- Grace Hopper metadata_name:row1 []
When issuing a query with a WHERE clause against indexed columns, the connector searches the index table for all row IDs that contain the value within the predicate. These row IDs are bundled into a Presto split as single-value Range objects (the number of row IDs per split is controlled by the value of accumulo.index_rows_per_split) and passed to a Presto worker to be configured in the BatchScanner which scans the data table.
- SELECT * FROM myschema.scientists WHERE age = 109;
- recordkey | name | age | birthday
- -----------+--------------+-----+------------
- row1 | Grace Hopper | 109 | 1906-12-09
- (1 row)
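The index lookup followed by the bundling of row IDs into splits can be sketched as follows. This is a simplified model under stated assumptions: the index dictionary and plan_splits are hypothetical stand-ins, and rows_per_split plays the role the text assigns to accumulo.index_rows_per_split.

```python
from typing import Dict, List, Tuple

# Toy index: (indexed value, column) -> row IDs, loosely mirroring the _idx scan above.
index: Dict[Tuple[str, str], List[str]] = {
    ("109", "age"): ["row1"],
    ("103", "age"): ["row2"],
    ("Grace Hopper", "name"): ["row1"],
    ("Alan Turing", "name"): ["row2"],
    ("Analyst", "role"): ["row3", "row4", "row5"],  # made-up entry to show batching
}


def plan_splits(column: str, value: str, rows_per_split: int) -> List[List[str]]:
    """Look up matching row IDs, then group them into splits of at most rows_per_split."""
    row_ids = index.get((value, column), [])
    return [
        row_ids[i:i + rows_per_split]
        for i in range(0, len(row_ids), rows_per_split)
    ]


print(plan_splits("age", "109", rows_per_split=2))       # [['row1']]
print(plan_splits("role", "Analyst", rows_per_split=2))  # [['row3', 'row4'], ['row5']]
```

Only the row IDs found in the index are handed to workers, so the data table is read for a handful of single-row ranges rather than scanned in full.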
Loading Data
The Accumulo connector supports loading data via INSERT statements. However, this method tends to be low-throughput and should not be relied on when throughput is a concern. Instead, users of the connector should use the PrestoBatchWriter tool that is provided as part of the presto-accumulo-tools subproject in the presto-accumulo repository.
The PrestoBatchWriter is a wrapper class for the typical BatchWriter that leverages the Presto/Accumulo metadata to write Mutations to the main data table. In particular, it handles indexing the given mutations on any indexed columns. Usage of the tool is provided in the README in the repository.
External Tables
By default, the tables created using SQL statements via Presto are internal tables; that is, both the Presto table metadata and the Accumulo tables are managed by Presto. When you create an internal table, the Accumulo table is created as well. You will receive an error if the Accumulo table already exists. When an internal table is dropped via Presto, the Accumulo table (and any index tables) are dropped as well.
To change this behavior, set the external property to true when issuing the CREATE statement. This will make the table an external table, and a DROP TABLE command will only delete the metadata associated with the table. If the Accumulo tables do not already exist, they will be created by the connector.
Creating an external table will set any configured locality groups as well as the iterators on the index and metrics tables (if the table is indexed). In short, the only difference between an external table and an internal table is that the connector deletes the Accumulo tables only for an internal table when a DROP TABLE command is issued.
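The difference in DROP TABLE behavior can be modeled in a few lines. This is a sketch only: the sets stand in for the stored metadata and the Accumulo tables, and drop_table is a hypothetical function, not the connector's implementation.

```python
def drop_table(metadata: set, accumulo_tables: set, name: str, external: bool) -> None:
    """Remove the Presto metadata; remove the Accumulo tables only for internal tables."""
    metadata.discard(name)
    if not external:
        for suffix in ("", "_idx", "_idx_metrics"):
            accumulo_tables.discard(name + suffix)


metadata = {"external_table"}
accumulo_tables = {
    "external_table",
    "external_table_idx",
    "external_table_idx_metrics",
}

drop_table(metadata, accumulo_tables, "external_table", external=True)
print(sorted(metadata))         # []
print(sorted(accumulo_tables))  # ['external_table', 'external_table_idx', 'external_table_idx_metrics']
```

With external=False the same call would also remove the data, index, and metrics tables.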
External tables can be a bit more difficult to work with, as the data is stored in an expected format. If the data is not stored correctly, then you’re gonna have a bad time. Users must provide a column_mapping property when creating the table. This creates the mapping of Presto column name to the column family/qualifier for the cell of the table. The value of the cell is stored in the Value of the Accumulo key/value pair. By default, this value is expected to be serialized using Accumulo’s lexicoder API. If you are storing values as strings, you can specify a different serializer using the serializer property of the table. See the section on Table Properties for more information.
Next, we create the Presto external table.
- CREATE TABLE external_table (
- a VARCHAR,
- b BIGINT,
- c DATE
- )
- WITH (
- column_mapping = 'a:md:a,b:md:b,c:md:c',
- external = true,
- index_columns = 'b,c',
- locality_groups = 'foo:b,c'
- );
- INSERT INTO external_table VALUES
- ('1', 1, DATE '2015-03-06'),
- ('2', 2, DATE '2015-03-07');
- SELECT * FROM external_table;
- a | b | c
- ---+---+------------
- 1 | 1 | 2015-03-06
- 2 | 2 | 2015-03-07
- (2 rows)
- DROP TABLE external_table;
After dropping the table, the table will still exist in Accumulo because it is external.
- root@default> tables
- accumulo.metadata
- accumulo.root
- external_table
- external_table_idx
- external_table_idx_metrics
- trace
If we wanted to add a new column to the table, we can create the table again and specify a new column. Any existing rows in the table will have a value of NULL. This command will re-configure the Accumulo tables, setting the locality groups and iterator configuration.
- CREATE TABLE external_table (
- a VARCHAR,
- b BIGINT,
- c DATE,
- d INTEGER
- )
- WITH (
- column_mapping = 'a:md:a,b:md:b,c:md:c,d:md:d',
- external = true,
- index_columns = 'b,c,d',
- locality_groups = 'foo:b,c,d'
- );
- SELECT * FROM external_table;
Table Properties
Table property usage example:
- CREATE TABLE myschema.scientists (
- recordkey VARCHAR,
- name VARCHAR,
- age BIGINT,
- birthday DATE
- )
- WITH (
- index_columns = 'name,age'
- );
You can change the default value of a session property by using SET SESSION. Note that session properties are prefixed with the catalog name:
- SET SESSION accumulo.column_filter_optimizations_enabled = false;
Adding Columns
Adding a new column to an existing table cannot be done today via ALTER TABLE [table] ADD COLUMN [name] [type] because of the additional metadata required for the columns to work: the column family, the column qualifier, and whether the column is indexed.
Instead, you can use one of the utilities in the presto-accumulo-tools sub-project of the presto-accumulo repository. Documentation and usage can be found in the README.
Serializers
The Presto connector for Accumulo has a pluggable serializer framework for handling I/O between Presto and Accumulo. This enables end users to programmatically serialize and deserialize their special data formats within Accumulo, while abstracting away the complexity of the connector itself.
There are two types of serializers currently available: a string serializer that treats values as Java String and a lexicoder serializer that leverages Accumulo’s Lexicoder API to store values. The default serializer is the lexicoder serializer, as this serializer does not require expensive conversion operations back and forth between String objects and the Presto types; the cell’s value is encoded as a byte array.
Additionally, the lexicoder serializer does proper lexicographical ordering of numerical types like BIGINT or TIMESTAMP. This is essential for the connector to properly leverage the secondary index when querying for data.
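Why ordering matters can be seen by comparing how numbers sort as strings versus as order-preserving bytes. The sketch below is a simplified fixed-width encoding (flip the sign bit, then write eight big-endian bytes); it is an assumption-laden illustration and not Accumulo's Lexicoder implementation, whose output in this section's scan listings carries an extra leading byte. The name encode_signed is hypothetical.

```python
import struct


def encode_signed(value: int) -> bytes:
    """Flip the sign bit and write 8 big-endian bytes so byte order matches numeric order."""
    return struct.pack(">Q", (value & 0xFFFFFFFFFFFFFFFF) ^ (1 << 63))


values = [109, 103, -23034, 9, 1000]

by_string = sorted(values, key=lambda v: str(v).encode())
by_bytes = sorted(values, key=encode_signed)

print(by_string)  # [-23034, 1000, 103, 109, 9]
print(by_bytes)   # [-23034, 9, 103, 109, 1000]
```

Sorted as strings, 1000 lands before 103 and 9 lands last, so a range scan over string-encoded numbers would return the wrong rows. The byte encoding keeps numeric order, which is what a sorted index needs for range predicates.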
You can change the serializer by specifying the serializer table property, using either default (which is lexicoder), string, or lexicoder for the built-in types. Alternatively, you can provide your own implementation by extending AccumuloRowSerializer, adding it to the Presto CLASSPATH, and specifying the fully-qualified Java class name in the connector configuration.
- CREATE TABLE myschema.scientists (
- recordkey VARCHAR,
- name VARCHAR,
- age BIGINT,
- birthday DATE
- )
- WITH (
- column_mapping = 'name:metadata:name,age:metadata:age,birthday:metadata:date',
- serializer = 'default'
- );
- INSERT INTO myschema.scientists VALUES
- ('row1', 'Grace Hopper', 109, DATE '1906-12-09' ),
- ('row2', 'Alan Turing', 103, DATE '1912-06-23' );
- root@default> scan -t myschema.scientists
- row1 metadata:age [] \x08\x80\x00\x00\x00\x00\x00\x00m
- row1 metadata:date [] \x08\x7F\xFF\xFF\xFF\xFF\xFF\xA6\x06
- row1 metadata:name [] Grace Hopper
- row2 metadata:age [] \x08\x80\x00\x00\x00\x00\x00\x00g
- row2 metadata:date [] \x08\x7F\xFF\xFF\xFF\xFF\xFF\xAD\xED
- row2 metadata:name [] Alan Turing
- CREATE TABLE myschema.stringy_scientists (
- recordkey VARCHAR,
- name VARCHAR,
- age BIGINT,
- birthday DATE
- )
- WITH (
- column_mapping = 'name:metadata:name,age:metadata:age,birthday:metadata:date',
- serializer = 'string'
- );
- INSERT INTO myschema.stringy_scientists VALUES
- ('row1', 'Grace Hopper', 109, DATE '1906-12-09' ),
- ('row2', 'Alan Turing', 103, DATE '1912-06-23' );
- root@default> scan -t myschema.stringy_scientists
- row1 metadata:age [] 109
- row1 metadata:date [] -23034
- row1 metadata:name [] Grace Hopper
- row2 metadata:age [] 103
- row2 metadata:date [] -21011
- row2 metadata:name [] Alan Turing
- CREATE TABLE myschema.custom_scientists (
- recordkey VARCHAR,
- name VARCHAR,
- age BIGINT,
- birthday DATE
- )
- WITH (
- column_mapping = 'name:metadata:name,age:metadata:age,birthday:metadata:date',
- serializer = 'my.serializer.package.MySerializer'
- );
Metadata Management
Metadata for the Presto/Accumulo tables is stored in ZooKeeper. You can (and should) issue SQL statements in Presto to create and drop tables. This is the easiest method of creating the metadata required to make the connector work. It is best to not mess with the metadata, but here are the details of how it is stored. Information is power.
A root node in ZooKeeper holds all the mappings, and the format is as follows:
- /metadata-root/schema/table
Where metadata-root is the value of zookeeper.metadata.root in the config file (default is /presto-accumulo), schema is the Presto schema (which is identical to the Accumulo namespace name), and table is the Presto table name (again, identical to the Accumulo table name). The data of the table ZooKeeper node is a serialized AccumuloTable Java object (which resides in the connector code). This object contains the schema (namespace) name, table name, column definitions, the serializer to use for the table, and any additional table properties.
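As a small illustration of the path format above, the following sketch assembles the znode path for a table. The table_znode function is a hypothetical helper for this example; it only builds a string and does not talk to ZooKeeper.

```python
import posixpath


def table_znode(schema: str, table: str, metadata_root: str = "/presto-accumulo") -> str:
    """Build /metadata-root/schema/table, defaulting to the documented root."""
    return posixpath.join(metadata_root, schema, table)


print(table_znode("foo", "bar"))                         # /presto-accumulo/foo/bar
print(table_znode("myschema", "scientists", "/custom"))  # /custom/myschema/scientists
```

If zookeeper.metadata.root is set to something other than the default, pass that value as metadata_root.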
If you have a need to programmatically manipulate the ZooKeeper metadata for Accumulo, take a look at com.facebook.presto.accumulo.metadata.ZooKeeperMetadataManager for some Java code to simplify the process.
Converting Table from Internal to External
For example:
- We’re starting with an internal table foo.bar that was created with the below DDL. If you have not previously defined a table property for column_mapping (like this example), be sure to describe the table before deleting the metadata. We’ll need the column mappings when creating the external table.
- CREATE TABLE foo.bar (a VARCHAR, b BIGINT, c DATE)
- WITH (
- index_columns = 'b,c'
- );
- DESCRIBE foo.bar;
- Column | Type | Extra | Comment
- --------+---------+-------+-------------------------------------
- a | varchar | | Accumulo row ID
- b | bigint | | Accumulo column b:b. Indexed: true
- c | date | | Accumulo column c:c. Indexed: true
- Using the ZooKeeper CLI, delete the corresponding znode. Note this uses the default ZooKeeper metadata root of /presto-accumulo
- Re-create the table using the same DDL as before, but adding the external = true property. Note that if you had not previously defined the column_mapping, you’ll need to add the property to the new DDL (external tables require this property to be set). The column mappings are in the output of the DESCRIBE statement.
- CREATE TABLE foo.bar (
- a VARCHAR,
- b BIGINT,
- c DATE
- )
- WITH (
- column_mapping = 'a:a:a,b:b:b,c:c:c',
- index_columns = 'b,c',
- external = true