在尝试从HDFS读取或向HDFS写入数据之前,请确保已满足PXF Hadoop先决条件

PXF HDFS连接器 配置文件支持读写HDFS中SequenceFile格式的二进制数据。 当您将记录插入可写外部表中时,您插入的数据块将写入指定目录中的一个或多个文件。

注意: 使用可写配置创建的外部表仅INSERT操作。 如果要查询插入的数据,则必须单独创建一个引用相关HDFS目录的可读外部表。

使用以下语法创建一个引用HDFS目录的Greenplum数据库外部表:

命令中使用的特定关键字和值见下表中描述。

SequenceFile 格式数据可以选择采用record或block压缩。 PXF hdfs:SequenceFile 配置支持以下压缩编解码器:

  • org.apache.hadoop.io.compress.DefaultCodec
  • org.apache.hadoop.io.compress.BZip2Codec

使用 hdfs:SequenceFile 配置文件写入SequenceFile格式数据时,则必须提供Java类的名称,以用于序列化/反序列化二进制数据。 这个类必须为数据模式中引用的每种数据类型提供读写方法。

您可以通过 CREATE EXTERNAL TABLE LOCATION 子句的自定义选项指定压缩编解码器和Java 序列化类。 hdfs:SequenceFile 配置文件支持以下自定义选项:

选项值描述
COMPRESSION_CODEC压缩编码器类名称。 如果此选项未提供,Greenplum 数据库不执行数据压缩。 支持的压缩编解码器包括:
org.apache.hadoop.io.compress.DefaultCodec
org.apache.hadoop.io.compress.BZip2Codec
org.apache.hadoop.io.compress.GzipCodec
COMPRESSION_TYPE采用的压缩类型; 支持的值有 RECORD (默认) 或 BLOCK.
DATA-SCHEMA编写器序列化/反序列化类的名称。 此类所在的jar文件必须位于PXF类路径中。 hdfs:SequenceFile 配置文件需要此选项,并且没有默认值。
THREAD-SAFE确定表查询是否可以在多线程模式下运行的布尔值。 默认为 TRUE。 将此选项设置为 FALSE 以处理单个线程中所有非线程安全操作 (例如,压缩)的请求。

当您要读写HDFS中 SequenceFile 格式的数据,请使用HDFS 连接器 hdfs:SequenceFile 配置文件。 这种类型的文件由二进制键/值对组成。 SequenceFile 格式是MapReduce作业之间常见的数据传输格式。

在此示例中,您将创建一个名为 PxfExample_CustomWritable 的Java类,该类将对先前示例中使用的示例模式中的字段进行序列化/反序列化。然后,您将使用此类访问通过hdfs:SequenceFile配置文件创建的可写外部表,该表使用默认的PXF服务器。

  1. 准备创建示例Java类:

    1. $ mkdir -p pxfex/com/example/pxf/hdfs/writable/dataschema
    2. $ cd pxfex/com/example/pxf/hdfs/writable/dataschema
    3. $ vi PxfExample_CustomWritable.java
  2. 将以下文本复制并黏贴到 PxfExample_CustomWritable.java 文件中:

    1. package com.example.pxf.hdfs.writable.dataschema;
    2. import org.apache.hadoop.io.*;
    3. import java.io.DataInput;
    4. import java.io.DataOutput;
    5. import java.io.IOException;
    6. import java.lang.reflect.Field;
    7. /**
    8. * PxfExample_CustomWritable class - used to serialize and deserialize data with
    9. * text, int, and float data types
    10. */
    11. public class PxfExample_CustomWritable implements Writable {
    12. public String st1, st2;
    13. public int int1;
    14. public float ft;
    15. public PxfExample_CustomWritable() {
    16. st1 = new String("");
    17. st2 = new String("");
    18. int1 = 0;
    19. ft = 0.f;
    20. }
    21. public PxfExample_CustomWritable(int i1, int i2, int i3) {
    22. st1 = new String("short_string___" + i1);
    23. st2 = new String("short_string___" + i1);
    24. int1 = i2;
    25. }
    26. String GetSt1() {
    27. return st1;
    28. }
    29. String GetSt2() {
    30. return st2;
    31. }
    32. int GetInt1() {
    33. return int1;
    34. }
    35. float GetFt() {
    36. return ft;
    37. }
    38. @Override
    39. public void write(DataOutput out) throws IOException {
    40. Text txt = new Text();
    41. txt.set(st1);
    42. txt.write(out);
    43. txt.set(st2);
    44. txt.write(out);
    45. IntWritable intw = new IntWritable();
    46. intw.set(int1);
    47. intw.write(out);
    48. FloatWritable fw = new FloatWritable();
    49. fw.set(ft);
    50. fw.write(out);
    51. }
    52. @Override
    53. public void readFields(DataInput in) throws IOException {
    54. Text txt = new Text();
    55. txt.readFields(in);
    56. st1 = txt.toString();
    57. txt.readFields(in);
    58. st2 = txt.toString();
    59. IntWritable intw = new IntWritable();
    60. intw.readFields(in);
    61. int1 = intw.get();
    62. FloatWritable fw = new FloatWritable();
    63. fw.readFields(in);
    64. ft = fw.get();
    65. }
    66. public void printFieldTypes() {
    67. for (int i = 0; i < fields.length; i++) {
    68. System.out.println(fields[i].getType().getName());
    69. }
    70. }
    71. }
  3. 编译并为 PxfExample_CustomWritable 创建Java类JAR文件。 为您的Hadoop发行版提供一个包含 hadoop-common.jar 文件的类路径。 例如, 如果您安装了Hortonworks Data Platform Hadoop客户端:

    1. $ javac -classpath /usr/hdp/current/hadoop-client/hadoop-common.jar PxfExample_CustomWritable.java
    2. $ cd ../../../../../../
    3. $ jar cf pxfex-customwritable.jar com
    4. $ cp pxfex-customwritable.jar /tmp/

    (您的Hadoop库类路径可能不同)

  4. pxfex-customwritable.jar 文件复制到Greenplum 数据库master节点。 例如:

    1. $ scp pxfex-customwritable.jar gpadmin@gpmaster:/home/gpadmin
  5. 登录到您的Greenplum数据库master节点:

  6. pxfex-customwritable.jar JAR文件复制到用户运行时类库目录中,并记下位置。 例如, 如果 PXF_CONF=/usr/local/greenplum-pxf

    1. gpadmin@gpmaster$ cp /home/gpadmin/pxfex-customwritable.jar /usr/local/greenplum-pxf/lib/pxfex-customwritable.jar
  7. 将PXF配置同步到Greenplum数据库集群。 例如:

    1. gpadmin@gpmaster$ $GPHOME/pxf/bin/pxf cluster sync
  8. 重启PXF中所述,在每个Greenplum数据库segment主机上重启PXF。

  9. 使用PXF hdfs:SequenceFile 配置文件创建Greenplum数据库可写外部表。 在 DATA-SCHEMA 自定义选项 中标识您在上面创建的序列化/反序列化Java类。 创建可写外部表时,使用 BZip2 压缩并指定 BLOCK 压缩模式。

    1. postgres=# CREATE WRITABLE EXTERNAL TABLE pxf_tbl_seqfile (location text, month text, number_of_orders integer, total_sales real)
    2. LOCATION ('pxf://data/pxf_examples/pxf_seqfile?PROFILE=hdfs:SequenceFile&DATA-SCHEMA=com.example.pxf.hdfs.writable.dataschema.PxfExample_CustomWritable&COMPRESSION_TYPE=BLOCK&COMPRESSION_CODEC=org.apache.hadoop.io.compress.BZip2Codec')
    3. FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');
  10. 通过直接插入 pxf_tbl_seqfile 表将一些数据写入 pxf_seqfile HDFS目录中。 例如:

    1. postgres=# INSERT INTO pxf_tbl_seqfile VALUES ( 'Frankfurt', 'Mar', 777, 3956.98 );
    2. postgres=# INSERT INTO pxf_tbl_seqfile VALUES ( 'Cleveland', 'Oct', 3812, 96645.37 );
  11. 回想一下,Greenplum 数据库不支持直接查询一个可写外部表。要读取pxf_seqfile中的数据,请创建一个引用此HDFS目录的可读外部表:

    当您通过 hdfs:SequenceFile 配置文件读取HDFS数据是必须指定 DATA-SCHEMA 自定义选项 。 您无需提供压缩相关的选项。

  12. 查询可读外部表 read_pxf_tbl_seqfile

    1. gpadmin=# SELECT * FROM read_pxf_tbl_seqfile ORDER BY total_sales;
    1. location | month | number_of_orders | total_sales
    2. -----------+-------+------------------+-------------
    3. Frankfurt | Mar | 777 | 3956.98
    4. Cleveland | Oct | 3812 | 96645.4
    5. (2 rows)

当Greenplum数据库外部表引用SequenceFile或其他以键-值对存储行的数据格式时, 您可以通过使用 recordkey 关键字作为字段名称,在Greenplum查询中访问记录键的值。

recordkey 字段类型必须与记录键类型相对应,就像其他字段必须与HDFS数据匹配一样。.

您可以将 recordkey 定义为以下任何Hadoop类型:

  • BooleanWritable
  • ByteWritable
  • DoubleWritable
  • FloatWritable
  • IntWritable
  • LongWritable
  • Text

如果没有为行定义记录键, Greenplum数据库将返回处理该行的segment的id。

示例: 使用记录键

创建一个可读外部表,以访问您在 示例: 将二进制数据写入HDFS 创建的可写外部表 pxf_tbl_seqfile 的记录键。 本例中将 recordkey 定义为 int8 类型。

  1. postgres=# CREATE EXTERNAL TABLE read_pxf_tbl_seqfile_recordkey(recordkey int8, location text, month text, number_of_orders integer, total_sales real)
  2. LOCATION ('pxf://data/pxf_examples/pxf_seqfile?PROFILE=hdfs:SequenceFile&DATA-SCHEMA=com.example.pxf.hdfs.writable.dataschema.PxfExample_CustomWritable')
  3. FORMAT 'CUSTOM' (FORMATTER='pxfwritable_import');
  4. gpadmin=# SELECT * FROM read_pxf_tbl_seqfile_recordkey;
  1. recordkey | location | month | number_of_orders | total_sales
  2. -----------+-------------+-------+------------------+-------------
  3. 2 | Frankfurt | Mar | 777 | 3956.98
  4. (2 rows)

当您将行插入到可写外部表时,您没有定义记录键, 因此 标识为处理该行的segment。