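7.6. Hive Connector
The Hive connector allows querying data stored in a Hive data warehouse. Hive is a combination of three components: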
- Data files in varying formats that are typically stored in the Hadoop Distributed File System (HDFS) or in Amazon S3.
- Metadata about how the data files are mapped to schemas and tables. This metadata is stored in a database such as MySQL and is accessed via the Hive metastore service.
- A query language called HiveQL. This query language is executed on a distributed computing framework such as MapReduce or Tez.
Presto only uses the first two components: the data and the metadata. It does not use HiveQL or any part of Hive’s execution environment.
Supported File Types
The following file types are supported by the Hive connector:
- ORC
- Parquet
- Avro
- RCFile
- SequenceFile
- JSON
- Text
Configuration
The Hive connector supports Apache Hadoop 2.x and derivative distributions including Cloudera CDH 5 and Hortonworks Data Platform (HDP).
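Create etc/catalog/hive.properties with the following contents to mount the hive-hadoop2 connector as the hive catalog, replacing example.net:9083 with the correct host and port for your Hive metastore Thrift service:
connector.name=hive-hadoop2
hive.metastore.uri=thrift://example.net:9083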
You can have as many catalogs as you need, so if you have additional Hive clusters, simply add another properties file to etc/catalog with a different name (making sure it ends in .properties). For example, if you name the property file sales.properties, Presto will create a catalog named sales using the configured connector.
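Presto itself is written in Java. The following Python sketch only models the naming rule above: the file name minus .properties becomes the catalog name. It assumes catalog files use simple key=value lines. Real Java properties files also allow ':' separators, escapes and line continuations, which this toy parser ignores. The helper names are hypothetical.

```python
from pathlib import Path
from typing import Dict


def catalog_name(filename: str) -> str:
    """Catalog name = file name without the .properties suffix (sales.properties -> sales)."""
    if not filename.endswith(".properties"):
        raise ValueError(f"not a catalog file: {filename}")
    return filename[: -len(".properties")]


def parse_properties(text: str) -> Dict[str, str]:
    """Parse simple key=value lines, skipping blank lines and # comments (simplified)."""
    props: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")  # split on the first '=' only
        if sep:
            props[key.strip()] = value.strip()
    return props


def discover_catalogs(catalog_dir: Path) -> Dict[str, str]:
    """Map each catalog name found in catalog_dir to its connector.name value."""
    return {
        catalog_name(path.name): parse_properties(path.read_text()).get("connector.name", "")
        for path in sorted(catalog_dir.glob("*.properties"))
    }
```

Consider a directory holding hive.properties and sales.properties, both configured with the hive-hadoop2 connector. Running discover_catalogs(Path("etc/catalog")) on it would map both hive and sales to hive-hadoop2.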
HDFS Configuration
For basic setups, Presto configures the HDFS client automatically and does not require any configuration files. In some cases, such as when using federated HDFS or NameNode high availability, it is necessary to specify additional HDFS client options in order to access your HDFS cluster. To do so, add the hive.config.resources property to reference your HDFS config files:
hive.config.resources=/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml
Only specify additional configuration files if necessary for your setup. We also recommend reducing the configuration files to have the minimum set of required properties, as additional properties may cause problems.
The configuration files must exist on all Presto nodes. If you are referencing existing Hadoop config files, make sure to copy them to any Presto nodes that are not running Hadoop.
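The files must be present on every node, so a small pre-flight check can help. This Python sketch uses a hypothetical helper that is not part of Presto. It splits a hive.config.resources value on commas, as in the example above, and reports the entries that are missing locally. Run it on each node. The exists parameter lets you swap in a different check.

```python
import os
from typing import Callable, List


def missing_config_resources(value: str,
                             exists: Callable[[str], bool] = os.path.isfile) -> List[str]:
    """Return the entries of a hive.config.resources value that are not files on this node."""
    paths = [part.strip() for part in value.split(",") if part.strip()]
    return [path for path in paths if not exists(path)]
```

For example, missing_config_resources("/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml") returns an empty list only when both files exist on the node where it runs.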
HDFS Username
When not using Kerberos with HDFS, Presto will access HDFS using the OS user of the Presto process. For example, if Presto is running as nobody, it will access HDFS as nobody. You can override this username by setting the HADOOP_USER_NAME system property in the Presto JVM Config, replacing hdfs_user with the appropriate username:
-DHADOOP_USER_NAME=hdfs_user
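The override rule can be modeled in a few lines. This Python sketch is a hypothetical helper covering only the non-Kerberos case described above. It reads jvm.config text and returns the user Presto would present to HDFS. If the flag appears more than once, the sketch assumes the last occurrence wins.

```python
import getpass
from typing import Optional

FLAG = "-DHADOOP_USER_NAME="


def effective_hdfs_user(jvm_config: str, os_user: Optional[str] = None) -> str:
    """Non-Kerberos case: -DHADOOP_USER_NAME in jvm.config overrides the OS user."""
    override = None
    for line in jvm_config.splitlines():
        line = line.strip()
        if line.startswith(FLAG):
            override = line[len(FLAG):]  # assumption: the last occurrence wins
    if override:
        return override
    # Fall back to the OS user of the process (or the one passed in explicitly).
    return os_user if os_user is not None else getpass.getuser()
```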
Accessing Hadoop clusters protected with Kerberos authentication
Kerberos authentication is supported for both HDFS and the Hive metastore. However, Kerberos authentication by ticket cache is not yet supported.
The properties that apply to Hive connector security are listed in the Hive Configuration Properties table. Please see the section for a more detailed discussion of the security options in the Hive connector.
Hive Configuration Properties
Amazon S3 Configuration
The Hive connector can read and write tables that are stored in S3. This is accomplished by having a table or database location that uses an S3 prefix rather than an HDFS prefix.
Presto uses its own S3 filesystem for the URI prefixes s3://, s3n:// and s3a://.
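The prefix rule above can be checked mechanically. This Python sketch uses a hypothetical helper that tests whether a table or schema location uses one of the three prefixes and would therefore be served by Presto's S3 file system.

```python
from urllib.parse import urlparse

# URI schemes handled by Presto's own S3 file system, per the list above.
PRESTO_S3_SCHEMES = {"s3", "s3n", "s3a"}


def uses_presto_s3(location: str) -> bool:
    """True if the location's URI scheme is one of s3, s3n or s3a."""
    return urlparse(location).scheme.lower() in PRESTO_S3_SCHEMES
```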
S3 Credentials
If you are running Presto on Amazon EC2 using EMR or another facility, it is highly recommended that you set hive.s3.use-instance-credentials to true and use IAM Roles for EC2 to govern access to S3. If this is the case, your EC2 instances will need to be assigned an IAM Role which grants appropriate access to the data stored in the S3 bucket(s) you wish to use. It’s also possible to configure an IAM role with hive.s3.iam-role that will be assumed for accessing any S3 bucket. This is much cleaner than setting AWS access and secret keys in the hive.s3.aws-access-key and hive.s3.aws-secret-key settings, and also allows EC2 to automatically rotate credentials on a regular basis without any additional work on your part.
Custom S3 Credentials Provider
You can configure a custom S3 credentials provider by setting the Hadoop configuration property presto.s3.credentials-provider to the fully qualified class name of a custom AWS credentials provider implementation. This class must implement the AWS credentials provider interface and provide a two-argument constructor that takes a java.net.URI and a Hadoop org.apache.hadoop.conf.Configuration as arguments. A custom credentials provider can be used to provide temporary credentials from STS (using STSSessionCredentialsProvider), IAM role-based credentials (using STSAssumeRoleSessionCredentialsProvider), or credentials for a specific use case (e.g., bucket/user specific credentials). This Hadoop configuration property must be set in the Hadoop configuration files referenced by the hive.config.resources Hive connector property.
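The property goes into a Hadoop-style XML file, such as a core-site.xml, that is then listed in hive.config.resources. The Python sketch below renders such a file. The class name com.example.S3CredentialsProvider is hypothetical; substitute your own implementation.

```python
import xml.etree.ElementTree as ET
from typing import Dict


def hadoop_config_xml(props: Dict[str, str]) -> str:
    """Render a Hadoop-style <configuration> document with one <property> per entry."""
    root = ET.Element("configuration")
    for name, value in props.items():
        prop = ET.SubElement(root, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = value
    return ET.tostring(root, encoding="unicode")


# Hypothetical provider class; replace with your own fully qualified class name.
print(hadoop_config_xml({"presto.s3.credentials-provider": "com.example.S3CredentialsProvider"}))
```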
Tuning Properties
The following tuning properties affect the behavior of the client used by the Presto S3 filesystem when communicating with S3. Most of these parameters affect settings on the ClientConfiguration object associated with the AmazonS3Client.
S3 Data Encryption
Presto supports reading and writing encrypted data in S3 using both server-side encryption with S3 managed keys and client-side encryption using either the AWS KMS or a software plugin to manage AES encryption keys.
With S3 server-side encryption (called SSE-S3 in the Amazon documentation), the S3 infrastructure takes care of all encryption and decryption work (with the exception of SSL to the client, assuming you have hive.s3.ssl.enabled set to true). S3 also manages all the encryption keys for you. To enable this, set hive.s3.sse.enabled to true.
With S3 client-side encryption, S3 stores encrypted data and the encryption keys are managed outside of the S3 infrastructure. Data is encrypted and decrypted by Presto instead of in the S3 infrastructure. In this case, encryption keys can be managed either by using the AWS KMS or your own key management system. To use the AWS KMS for key management, set hive.s3.kms-key-id to the UUID of a KMS key. Your AWS credentials or EC2 IAM role will need to be granted permission to use the given key as well.
To use a custom encryption key management system, set hive.s3.encryption-materials-provider to the fully qualified name of a class that implements the encryption materials provider interface from the AWS Java SDK. This class has to be accessible to the Hive connector through the classpath and must be able to communicate with your custom key management system. If this class also implements the org.apache.hadoop.conf.Configurable interface from the Hadoop Java API, then the Hadoop configuration is passed in after the object instance is created and before it is asked to provision or retrieve any encryption keys.
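The three approaches map to three different catalog properties. This Python sketch returns the property for each approach. The mode labels "sse-s3", "kms" and "custom" are hypothetical names used only here. It checks that a KMS key id parses as a UUID, as the text requires. It does not decide whether combining approaches is valid, and it does not contact AWS.

```python
import uuid
from typing import Dict


def s3_encryption_properties(mode: str, value: str = "") -> Dict[str, str]:
    """Catalog properties for one encryption approach (hypothetical mode labels)."""
    if mode == "sse-s3":
        # Server-side encryption with S3 managed keys.
        return {"hive.s3.sse.enabled": "true"}
    if mode == "kms":
        uuid.UUID(value)  # raises ValueError if the key id does not parse as a UUID
        return {"hive.s3.kms-key-id": value}
    if mode == "custom":
        if not value:
            raise ValueError("a fully qualified class name is required")
        return {"hive.s3.encryption-materials-provider": value}
    raise ValueError(f"unknown mode: {mode!r}")
```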
S3SelectPushdown
S3SelectPushdown enables pushing down projection (SELECT) and predicate (WHERE) processing to S3 Select. With S3SelectPushdown, Presto only retrieves the required data from S3 instead of entire S3 objects, reducing both latency and network usage.
Is S3 Select a good fit for my workload?
Use the following guidelines to determine if S3 Select is a good fit for your workload:
- Your query filters out more than half of the original data set.
- Your query filter predicates use columns that have a data type supported by Presto and S3 Select. The TIMESTAMP, REAL, and DOUBLE data types are not supported by S3 Select Pushdown. We recommend using the decimal data type for numerical data. For more information about supported data types for S3 Select, see the data types section of the Amazon S3 Select documentation.
- Your network connection between Amazon S3 and the Amazon EMR cluster has good transfer speed and available bandwidth. Amazon S3 Select does not compress HTTP responses, so the response size may increase for compressed input files.
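The first two guidelines can be expressed as a rough check. In this Python sketch, the row counts are estimates you supply, for example from table statistics. The network-bandwidth guideline is not modeled, and the function name is hypothetical.

```python
from typing import Iterable

# Types the guidelines above list as unsupported by S3 Select Pushdown.
UNSUPPORTED_PREDICATE_TYPES = {"TIMESTAMP", "REAL", "DOUBLE"}


def s3_select_looks_worthwhile(total_rows: int, matching_rows: int,
                               predicate_types: Iterable[str]) -> bool:
    """True if the filter removes more than half the rows and all predicate types are supported."""
    if total_rows <= 0:
        return False
    filtered_out = 1 - matching_rows / total_rows
    types_ok = all(t.strip().upper() not in UNSUPPORTED_PREDICATE_TYPES for t in predicate_types)
    return filtered_out > 0.5 and types_ok
```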
Considerations and Limitations
- Only objects stored in CSV format are supported. Objects can be uncompressed or optionally compressed with gzip or bzip2.
- The “AllowQuotedRecordDelimiters” property is not supported. If this property is specified, the query fails.
- Amazon S3 server-side encryption with customer-provided encryption keys (SSE-C) and client-side encryption are not supported.
- S3 Select Pushdown is not a substitute for using columnar or compressed file formats such as ORC and Parquet.
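The limitations above can be turned into a per-object eligibility check. In this Python sketch, the encryption labels "sse-c" and "client-side" are hypothetical names for the two unsupported encryption modes. Because specifying AllowQuotedRecordDelimiters makes the query fail, the sketch treats that case as ineligible.

```python
from typing import Optional

SUPPORTED_COMPRESSION = {None, "gzip", "bzip2"}
# Hypothetical labels for encryption modes that rule out S3 Select Pushdown.
UNSUPPORTED_ENCRYPTION = {"sse-c", "client-side"}


def s3_select_eligible(file_format: str,
                       compression: Optional[str] = None,
                       encryption: Optional[str] = None,
                       allow_quoted_record_delimiters: bool = False) -> bool:
    """Check an object against the listed considerations and limitations."""
    return (
        file_format.upper() == "CSV"
        and compression in SUPPORTED_COMPRESSION
        and encryption not in UNSUPPORTED_ENCRYPTION
        and not allow_quoted_record_delimiters
    )
```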
Enabling S3 Select Pushdown
You can enable S3 Select Pushdown using the s3_select_pushdown_enabled Hive session property or using the hive.s3select-pushdown.enabled configuration property. The session property overrides the config property, allowing you to enable or disable it on a per-query basis.
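The precedence rule is small enough to state as code. Assuming a catalog named hive, a session would set the property with SET SESSION hive.s3_select_pushdown_enabled = true. This Python sketch only models which value takes effect, and the function name is hypothetical.

```python
from typing import Optional


def s3_select_pushdown_enabled(config_value: bool, session_value: Optional[bool] = None) -> bool:
    """The session property, when set, overrides hive.s3select-pushdown.enabled."""
    return config_value if session_value is None else session_value
```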
Understanding and Tuning the Maximum Connections
Presto can use its native S3 file system or EMRFS. When using the native FS, the maximum number of connections is configured via the hive.s3.max-connections configuration property. When using EMRFS, the maximum number of connections is configured via the fs.s3.maxConnections Hadoop configuration property.
S3 Select Pushdown bypasses the file systems when accessing Amazon S3 for predicate operations. In this case, the value of hive.s3select-pushdown.max-connections determines the maximum number of client connections allowed for those operations from worker nodes.
If your workload experiences the error "Timeout waiting for connection from pool", increase the value of both hive.s3select-pushdown.max-connections and the maximum connections configuration for the file system you are using.
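The advice to raise both limits together can be sketched as follows. The doubling factor and the sample values are illustrative only, not recommended defaults. With EMRFS, the two properties live in different places: the catalog properties and the Hadoop configuration. The function name is hypothetical.

```python
from typing import Dict

# Maximum-connections property for each file system, per the text above.
FS_MAX_CONNECTIONS = {
    "native": "hive.s3.max-connections",  # Presto native S3 file system
    "emrfs": "fs.s3.maxConnections",      # EMRFS (Hadoop configuration property)
}
SELECT_MAX_CONNECTIONS = "hive.s3select-pushdown.max-connections"


def raised_connection_limits(current: Dict[str, str], filesystem: str,
                             factor: int = 2) -> Dict[str, str]:
    """Scale the S3 Select limit and the file system limit by the same factor."""
    keys = [SELECT_MAX_CONNECTIONS, FS_MAX_CONNECTIONS[filesystem]]
    return {key: str(int(current[key]) * factor) for key in keys}
```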
Alluxio Configuration
Presto can read and write tables stored in the Alluxio Data Orchestration System, leveraging Alluxio’s distributed block-level read/write caching functionality. The tables must be created in the Hive metastore with the alluxio:// location prefix (see the Alluxio documentation for details and examples). Presto queries will then transparently retrieve and cache files or objects from a variety of disparate storage systems including HDFS and S3.
Alluxio Client-Side Configuration
To configure Alluxio client-side properties on Presto, append the Alluxio configuration directory (${ALLUXIO_HOME}/conf) to the Presto JVM classpath, so that the Alluxio properties file alluxio-site.properties can be loaded as a resource. Update the Presto file etc/jvm.config to include the following:
-Xbootclasspath/a:<path-to-alluxio-conf>
The advantage of this approach is that all the Alluxio properties are set in the single alluxio-site.properties file. For details, see Customize Alluxio User Properties.
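When jvm.config is managed by a script, the flag should be appended only once. This Python sketch works on the file's text and leaves reading and writing the file to the caller. The directory /opt/alluxio/conf used in the usage note is a hypothetical path.

```python
def with_alluxio_conf(jvm_config: str, alluxio_conf_dir: str) -> str:
    """Return jvm.config text with -Xbootclasspath/a:<dir> appended once (idempotent)."""
    flag = f"-Xbootclasspath/a:{alluxio_conf_dir}"
    lines = jvm_config.splitlines()
    if flag not in (line.strip() for line in lines):
        lines.append(flag)
    return "\n".join(lines) + "\n"
```

Calling with_alluxio_conf(text, "/opt/alluxio/conf") twice on the same text yields the same result as calling it once.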
Alternatively, add Alluxio configuration properties to the Hadoop configuration files (core-site.xml, hdfs-site.xml) and configure the Hive connector to use the Hadoop configuration files via the hive.config.resources connector property.
Deploy Alluxio with Presto
To achieve the best performance running Presto on Alluxio, it is recommended to co-locate Presto workers with Alluxio workers. This allows reads and writes to bypass the network. See the Alluxio documentation for more details.
An alternative way for Presto to interact with Alluxio is via the Alluxio Catalog Service. The primary benefits of using the Alluxio Catalog Service are simpler deployment of Alluxio with Presto, and enabling schema-aware optimizations such as transparent caching and transformations. Currently, the catalog service supports read-only workloads.
The Alluxio Catalog Service is a metastore that can cache the information from different underlying metastores. It currently supports the Hive metastore as an underlying metastore. In order for the Alluxio Catalog to manage the metadata of other existing metastores, the other metastores must be “attached” to the Alluxio catalog. To attach an existing Hive metastore to the Alluxio Catalog, use the alluxio table attachdb command. The appropriate Hive metastore location and Hive database name need to be provided.
./bin/alluxio table attachdb hive thrift://HOSTNAME:9083 hive_db_name
Once a metastore is attached, the Alluxio Catalog can manage and serve the information to Presto. To configure the Hive connector for Alluxio Catalog Service, configure the connector to use the Alluxio metastore type, and provide the location to the Alluxio cluster. For example, your etc/catalog/catalog_alluxio.properties will include the following (replace the Alluxio address with the appropriate location):
connector.name=hive-hadoop2
hive.metastore=alluxio
hive.metastore.alluxio.master.address=HOSTNAME:PORT
Now, Presto queries can take advantage of the Alluxio Catalog Service, such as transparent caching and transparent transformations, without any modifications to existing Hive metastore deployments.
Table Statistics
The Hive connector automatically collects basic statistics (numFiles, numRows, rawDataSize, totalSize) on INSERT and CREATE TABLE AS operations.
The Hive connector can also collect column level statistics:
Automatic column level statistics collection on write is controlled by the collect_column_statistics_on_write catalog session property.
Collecting table and column statistics
The Hive connector supports collection of table and partition statistics via the ANALYZE statement. When analyzing a partitioned table, the partitions to analyze can be specified via the optional partitions property, which is an array containing the values of the partition keys in the order they are declared in the table schema:
ANALYZE hive.sales WITH (
partitions = ARRAY[
ARRAY['partition1_value1', 'partition1_value2'],
ARRAY['partition2_value1', 'partition2_value2']]);
This query will collect statistics for 2 partitions with keys:
partition1_value1, partition1_value2
partition2_value1, partition2_value2
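When the list of partitions comes from a script, the statement can be generated. This Python sketch uses hypothetical helper names. It builds the ANALYZE statement shown above from partition key tuples given in table-schema order, for example a table partitioned by ds and country. Values are emitted as SQL string literals with embedded single quotes doubled. The table name is inserted verbatim and is not validated.

```python
from typing import Sequence


def sql_string(value: str) -> str:
    """Quote a value as a SQL string literal, doubling embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"


def analyze_statement(table: str, partitions: Sequence[Sequence[str]]) -> str:
    """Build ANALYZE <table> WITH (partitions = ARRAY[...]) from partition key values."""
    if not partitions or len({len(p) for p in partitions}) != 1:
        raise ValueError("need at least one partition, all with the same number of keys")
    rows = ",\n    ".join(
        "ARRAY[" + ", ".join(sql_string(v) for v in p) + "]" for p in partitions
    )
    return f"ANALYZE {table} WITH (\n  partitions = ARRAY[\n    {rows}])"
```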
Schema Evolution
Hive allows the partitions in a table to have a different schema than the table. This occurs when the column types of a table are changed after partitions already exist (that use the original column types). The Hive connector supports this by allowing the same conversions as Hive:
- varchar to and from tinyint, smallint, integer and bigint
- real to double
- Widening conversions for integers, such as tinyint to smallint
Any conversion failure will result in null, which is the same behavior as Hive. For example, converting the string 'foo' to a number, or converting the string '1234' to a tinyint (which has a maximum value of 127), results in null.
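The null-on-failure rule for varchar to integer conversions can be modeled as below. This Python sketch uses a hypothetical helper. It assumes the accepted text is an optional sign followed by ASCII digits with no surrounding whitespace, which approximates but may not exactly match Hive's parser.

```python
import re
from typing import Optional

# Value ranges of the integer types.
INTEGER_RANGES = {
    "tinyint": (-(2**7), 2**7 - 1),
    "smallint": (-(2**15), 2**15 - 1),
    "integer": (-(2**31), 2**31 - 1),
    "bigint": (-(2**63), 2**63 - 1),
}
_INTEGER_TEXT = re.compile(r"[+-]?[0-9]+")


def varchar_to_integer(value: str, target: str) -> Optional[int]:
    """Coerce a varchar value to an integer type; any failure yields None (null)."""
    low, high = INTEGER_RANGES[target]
    if not _INTEGER_TEXT.fullmatch(value):
        return None  # not a number, e.g. 'foo'
    number = int(value)
    return number if low <= number <= high else None  # out of range, e.g. '1234' as tinyint
```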
Avro Schema Evolution
To specify that an Avro schema should be used for interpreting a table’s data, use the avro_schema_url table property. The schema can be placed remotely in HDFS (e.g. avro_schema_url = 'hdfs://user/avro/schema/avro_data.avsc'), S3 (e.g. avro_schema_url = 's3n://schema_bucket/schema/avro_data.avsc'), a web server (e.g. avro_schema_url = 'http://example.org/schema/avro_data.avsc'), or the local file system. The URL where the schema is located must be accessible from the Hive metastore and the Presto coordinator and worker nodes.
The table created in Presto using avro_schema_url behaves the same way as a Hive table with avro.schema.url or avro.schema.literal set.
Example:
CREATE TABLE hive.avro.avro_data (
id bigint
)
WITH (
format = 'AVRO',
avro_schema_url = '/usr/local/avro_data.avsc'
)
The columns listed in the DDL (id in the above example) will be ignored if avro_schema_url is specified. The table schema will match the schema in the Avro schema file. Before any read operation, the Avro schema is accessed, so the query result reflects any changes in the schema. Thus Presto takes advantage of Avro’s backward compatibility abilities.
If the schema of the table changes in the Avro schema file, the new schema can still be used to read old data. Newly added/renamed fields must have a default value in the Avro schema file.
The schema evolution behavior is as follows:
- Column added in new schema: Data created with an older schema will produce a default value when the table is using the new schema.
- Column removed in new schema: Data created with an older schema will no longer output the data from the column that was removed.
- Changing type of column in the new schema: If the type coercion is supported by Avro or the Hive connector, then the conversion happens. An error is thrown for incompatible types.
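The first two rules can be modeled with a schema in Avro's JSON form, where a field may carry a "default". This Python sketch uses a hypothetical function name, and the sample field names in the test are made up. It copies fields present in the old data, fills newly added fields from their default, and drops fields that the current schema no longer declares. Type coercion and Avro aliases are not modeled.

```python
from typing import Any, Dict

Record = Dict[str, Any]


def resolve_record(record: Record, reader_schema: Dict[str, Any]) -> Record:
    """Read a record written with an older schema using the current (reader) schema."""
    result: Record = {}
    for field in reader_schema["fields"]:
        name = field["name"]
        if name in record:
            result[name] = record[name]          # present in the old data
        elif "default" in field:
            result[name] = field["default"]      # added later: use its default
        else:
            raise ValueError(f"field {name!r} is not in the data and has no default")
    # Fields in `record` that the reader schema no longer declares are dropped.
    return result
```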
Limitations
The following operations are not supported when avro_schema_url is set:
- CREATE TABLE AS is not supported.
- Using partitioning (partitioned_by) or bucketing (bucketed_by) columns is not supported in CREATE TABLE.
- ALTER TABLE commands modifying columns are not supported.
Procedures
system.create_empty_partition(schema_name, table_name, partition_columns, partition_values)
Create an empty partition in the specified table.
system.sync_partition_metadata(schema_name, table_name, mode, case_sensitive)
Check and update the partitions list in the metastore. There are three modes available:
- ADD: add any partitions that exist on the file system but not in the metastore.
- DROP: drop any partitions that exist in the metastore but not on the file system.
- FULL: perform both ADD and DROP.
The case_sensitive argument is optional. The default value is true for compatibility with Hive’s MSCK REPAIR TABLE behavior, which expects the partition column names in file system paths to use lowercase (e.g. col_x=SomeValue). Partitions on the file system not conforming to this convention are ignored, unless the argument is set to false.
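The three modes reduce to set differences between the partitions found on the file system and those known to the metastore. This Python sketch models that logic only; it is not Presto's implementation, and the function names are hypothetical. It assumes lowercase column names and key=value directory paths, and it ignores escaping of special characters in partition values. With case_sensitive set to true, a path whose keys are not written exactly like the column names is ignored.

```python
from typing import Iterable, List, Optional, Sequence, Set, Tuple

Partition = Tuple[str, ...]


def parse_partition_path(path: str, columns: Sequence[str],
                         case_sensitive: bool = True) -> Optional[Partition]:
    """Turn 'ds=2016-08-09/country=US' into ('2016-08-09', 'US'), or None if non-conforming."""
    parts = path.strip("/").split("/")
    if len(parts) != len(columns):
        return None
    values = []
    for part, column in zip(parts, columns):
        key, sep, value = part.partition("=")
        same = key == column if case_sensitive else key.lower() == column.lower()
        if not sep or not same:
            return None  # ignored, like non-conforming directories
        values.append(value)
    return tuple(values)


def sync_plan(fs_paths: Iterable[str], metastore: Iterable[Partition], columns: Sequence[str],
              mode: str = "FULL", case_sensitive: bool = True) -> Tuple[List[Partition], List[Partition]]:
    """Return (partitions to add, partitions to drop) for mode ADD, DROP or FULL."""
    if mode not in ("ADD", "DROP", "FULL"):
        raise ValueError(f"unknown mode: {mode}")
    on_fs: Set[Partition] = set()
    for path in fs_paths:
        parsed = parse_partition_path(path, columns, case_sensitive)
        if parsed is not None:
            on_fs.add(parsed)
    known = set(metastore)
    to_add = sorted(on_fs - known) if mode in ("ADD", "FULL") else []
    to_drop = sorted(known - on_fs) if mode in ("DROP", "FULL") else []
    return to_add, to_drop
```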
Examples
The Hive connector supports querying and manipulating Hive tables and schemas (databases). While some uncommon operations will need to be performed using Hive directly, most operations can be performed using Presto.
Create a new Hive schema named web that will store tables in an S3 bucket named my-bucket:
CREATE SCHEMA hive.web
WITH (location = 's3://my-bucket/')
Create a new Hive table named page_views in the web schema that is stored using the ORC file format, partitioned by date and country, and bucketed by user into 50 buckets (note that Hive requires the partition columns to be the last columns in the table):
CREATE TABLE hive.web.page_views (
view_time timestamp,
user_id bigint,
page_url varchar,
ds date,
country varchar
)
WITH (
format = 'ORC',
partitioned_by = ARRAY['ds', 'country'],
bucketed_by = ARRAY['user_id'],
bucket_count = 50
)
Drop a partition from the page_views table:
DELETE FROM hive.web.page_views
WHERE ds = DATE '2016-08-09'
AND country = 'US'
Add an empty partition to the page_views
table:
CALL system.create_empty_partition(
schema_name => 'web',
table_name => 'page_views',
partition_columns => ARRAY['ds', 'country'],
partition_values => ARRAY['2016-08-09', 'US']);
Query the page_views table:
SELECT * FROM hive.web.page_views
List the partitions of the page_views table:
SELECT * FROM hive.web."page_views$partitions"
Create an external Hive table named request_logs that points at existing data in S3:
CREATE TABLE hive.web.request_logs (
request_time timestamp,
url varchar,
ip varchar,
user_agent varchar
)
WITH (
format = 'TEXTFILE',
external_location = 's3://my-bucket/data/logs/'
)
Drop the external table request_logs. This only drops the metadata for the table. The referenced data directory is not deleted:
DROP TABLE hive.web.request_logs
DELETE is only supported if the WHERE clause matches entire partitions.