9.HBase and MapReduce

    本章讨论下要在HBase中使用MapReduce需要指定的配置步骤。另外,讨论下其他HBase和MapReduce工作之间的相互作用和问题。 Finally, it discusses Cascading, an alternative API for MapReduce。

    默认情况下,MapReduce job部署在MR集群上,不会与$HBASE_CONF_DIR下的HBase配置或HBase类产生关联。

    要使MapReduce工作获得它们需要的,你要将hbase-site.xml放入$HADOOP_HOME/conf目录,并将HBase jar放入$HADOOP_HOME/lib目录,每个集群都要如此配置。或者你可以编辑$HADOOP_HOME/conf/hadoop-env.sh,并把它加入到HADOOP_CLASSPATH 变量中,但并不推荐你这种方法,因为这会影响你根据Hbase referenc的Hadoop安装。这也需要你重启hadoop集群以使Hadoop能够使用Hbase data。

    建议的方法是使HBase自己增加他的依赖jars并使用HADOOP_CLASSPATH or -libjars

    从0.90.x起,HBase可自动将依赖jars加入到job配置中,只需保证相关jars存在于本地的CLASSPATH。下面的事例运行了绑定的HBase RowCounter MR工作,其中表名叫usertable。如果你没有设置命令中(该命令以$为前缀,并被花括号包围)预期的环境变量,可以使用真实系统路径替代。确保为你的系统使用正确的HBase jar。单引号会使shell执行子命令setting the output of hbase classpath (the command to dump HBase CLASSPATH) to HADOOP_CLASSPATH.This example assumes you use a BASH-compatible shell.

    当这个命令运行时,内部地, HBase jar会去找它需要的依赖并将它们加入到MR工作配置中。See the source at TableMapReduceUtil#addDependencyJars(org.apache.hadoop.mapreduce.Job) for how this is done.

    命令 hbase mapredcp 能帮你转储MR需要的CLASSPATH条目,同样jars TableMapReduceUtil#addDependencyJars会加入。你可以将它们一起放入HBase配置目录 HADOOP_CLASSPATH 中。那些不打包依赖或调用TableMapReduceUtil#addDependencyJars的工作,下面的命令结构式必须的:

    0.96.1至0.98.4的Hbase MapReduce使用者请注意 某些使用Hbase的MR工作可能无法启动。The symptom is an exception similar to the following:

    Exception in thread “main” java.lang.IllegalAccessError: class com.google.protobuf.ZeroCopyLiteralByteString cannot access its superclass com.google.protobuf.LiteralByteString at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:792) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at java.net.URLClassLoader.defineClass(URLClassLoader.java:449) at java.net.URLClassLoader.access$100(URLClassLoader.java:71) at java.net.URLClassLoader$1.run(URLClassLoader.java:361) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at org.apache.hadoop.hbase.protobuf.ProtobufUtil.toScan(ProtobufUtil.java:818) at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.convertScanToString(TableMapReduceUtil.java:433) at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:186) at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:147) at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:270) at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:100) …

    这是由HBASE-9867版本引入的优化引起的,即引进了类加载依赖。

    This affects both jobs using the -libjars option and “fat jar,” those which package their runtime dependencies in a nested lib folder.

    要满足新类加载器的要求,hadoop的类路径必须包含hbase-protocol.jar。See 46.HBase, MapReduce, and the CLASSPATH for current recommendations for resolving classpath errors.

    这个可以在系统范围内被解决通过将hbase-protocol.jar的索引添加到Hadoop的lib目录,通过符号链接或者将jar直接拷贝到目录中。

    也可以每次job启动时通过将它包含进HADOOP_CLASSPATH环境变量中解决。当启动job时要打包依赖,下面所有三个job启动命令都可以:

    1. $ HADOOP_CLASSPATH=/path/to/hbase-protocol.jar:/path/to/hbase/conf hadoop jar MyJob.jar MyJobMainClass
    2. $ HADOOP_CLASSPATH=$(hbase mapredcp):/path/to/hbase/conf hadoop jar MyJob.jar MyJobMainClass
    3. $ HADOOP_CLASSPATH=$(hbase classpath) hadoop jar MyJob.jar MyJobMainClass

    For jars that do not package their dependencies, the following command structure is necessary:

    1. $ HADOOP_CLASSPATH=$(hbase mapredcp):/etc/hbase/conf hadoop jar MyApp.jar MyJobMainClass -libjars $(hbase mapredcp | tr ':' ',') ...

    47.MapReduce Scan Caching MR扫描缓存

    TableMapReduceUtil恢复了设置对传入扫描对象的缓存选项(返回客户端结果之前可以缓存的行数量),这个功能曾因为0.95版中的bug而被放弃,这个bug已经在0.98.5和0.96.3中解决了选择扫描缓存的优先级顺序如下:

    1. 扫描对象的缓存设置。
    2. 配置选项 hbase.client.scanner.caching中指定的缓存设置,可以在hbase-site.xml中手动设置也可以通过辅助方法TableMapReduceUtil.setScannerCaching()设置。
    3. HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING的默认值,100.

    上面提到的优先级列表允许您设置一个合理的默认值,并为特定的操作重写它。

    48. Bundled HBase MapReduce Jobs

    The HBase JAR also serves as a Driver for some bundled MapReduce jobs. To learn about the bundled MapReduce jobs, run the following command.

    1. $ ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/hbase-server-VERSION.jar
    2. An example program must be given as the first argument.
    3. Valid program names are:
    4. copytable: Export a table from local cluster to peer cluster
    5. completebulkload: Complete a bulk data load.
    6. export: Write table data to HDFS.
    7. import: Import data written by Export.
    8. importtsv: Import data in TSV format.
    9. rowcounter: Count rows in HBase table

    Each of the valid program names are bundled MapReduce jobs. To run one of the jobs, model your command after the following example.

    1. $ ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/hbase-server-VERSION.jar rowcounter myTable

    HBase可以被用作mapreduce的数据源,TableInputFormat 和数据接收器,TableOutputFormat或MultiTableOutputFormat。写Mr作业,读或写HBase,最好是子类TableMapper and/or tableReducer。See the do-nothing pass-through classes IdentityTableMapper and IdentityTableReducer for basic usage. For a more involved example, see RowCounter or review the org.apache.hadoop.hbase.mapreduce.TestTableMapReduce unit test.

    如果运行MR job时使用HBase作为数据源或数据接收器,需要在配置中指定源和结果表及列名。

    当你从HBase中读时, TableInputFormat从HBase中请求regions表并制成map,或者是map-per-region或是mapreduce.job.maps map,无论哪个是更小的。如果job只有两个map,提高mapreduce.job.maps使其大于regions数量。如果你在每个regionServer节点上都运行了TaskTracker/NodeManager,Maps将运行在相邻的TaskTracker/NodeManager上。当向HBase写入时,可以避免Reduce步骤并从map中写回HBase。当任务不需要分类和整理,这种map发出数据的方法会管用。插入时, Hbase ‘sorts’中没有double-sorting(以及对你的MR集群shuffling data),除非你需要这个。如果不需要Reduce,map可能会发出处理的记录数来报告任务结束,或将Reduce数设为0,并使用TableOutputFormat。如果在你的案例中,有必要启动Reduce,通常你应该使用多个reducers,这样负载可以分布到整个HBase集群中。

    HRegionPartitioner,HBase新的分割,可启动与regions同样数量的reducers.如果你的表格很大而上传不会大大改变现有region的数量,HRegionPartitioner很合适。 否则使用默认的分割方法。

    50. 批量导入时直接写入HFiles

    如果你在导入一张新的表格,可以忽略HBase API直接将内容写入文件系统,格式化成HBase数据文件(HFiles.)导入将运行得更快,也许是一个数量级的更快。

    51. RowCounter Example

    包括RowCounter的MR job使用TableInputFormat并记录指定表中行的数量。 To run it, use the following command:

    这将引用HBase MapReduce Driver类。在提供的选择中选择rowcounter。会将rowcounter使用建议打印到标准输出中。指定表名、计数的列以及输出目录。

    在MapReduce job中使用TableInputFormat从HBase表中请求数据时,分离器将为表的每个region生成一个map任务。因此, 如果表中有100个regions这有100个map任务,不管扫描操作中选择了多少列族。

    52.2. 自定义分离器

    For those interested in implementing custom splitters, see the method getSplits in TableInputFormatBase. That is where the logic for map-task assignment resides.

    53. HBase MapReduce Examples

    53.1. HBase MapReduce Read Example

    下面是以只读方式使用HBase作为MR数据源的例子。明确地,这里只有Mapper实例而没有Reducer,不会从Mapper中发出任何数据。

    1. Configuration config = HBaseConfiguration.create();
    2. Job job = new Job(config, "ExampleRead");
    3. job.setJarByClass(MyReadJob.class); // class that contains mapper
    4. Scan scan = new Scan();
    5. scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
    6. scan.setCacheBlocks(false); // don't set to true for MR jobs
    7. // set other scan attrs
    8. ...
    9. TableMapReduceUtil.initTableMapperJob(
    10. tableName, // input HBase table name
    11. scan, // Scan instance to control CF and attribute selection
    12. MyMapper.class, // mapper
    13. null, // mapper output key
    14. null, // mapper output value
    15. job);
    16. job.setOutputFormatClass(NullOutputFormat.class); // because we aren't emitting anything from mapper
    17. boolean b = job.waitForCompletion(true);
    18. if (!b) {
    19. throw new IOException("error with job!");
    20. }

    …mapper实例将extend TableMapper…

    1. public static class MyMapper extends TableMapper<Text, Text> {
    2. public void map(ImmutableBytesWritable row, Result value, Context context) throws InterruptedException, IOException {
    3. //process data for the row from the Result instance.
    4. }
    5. }

    下面是个使用HBase即作为MR的数据源又作为数据接收端。这个例子只是将数据从一张表拷贝到另一张表。

    1. Configuration config = HBaseConfiguration.create();
    2. Job job = new Job(config, "ExampleReadWrite");
    3. job.setJarByClass(MyReadWriteJob.class); //包含 mapper的类
    4. Scan scan = new Scan();
    5. scan.setCashing(500); //Scan中默认为1,对MR job来说不太好
    6. scan.setCasheBlocks(false); //对MR JOB不要设置为true
    7. //设置其他scan属性
    8. TableMapReduceUtil.intTableMapperJob(
    9. sourceTable, //input table
    10. scan, //Scan instance to control CF and attribute selection
    11. MyMapper.class, //mapper class
    12. null, //mapper output key
    13. null, //mapper output value
    14. job);
    15. TableMapReduceUtil.initTableReducerJob(
    16. null, //reducer class
    17. job);
    18. job.setNumReduceTasks(0);
    19. boolean b = job.waitForCompletion(true);
    20. if(!b){
    21. throw new IOException("error with job!");
    22. }

    这里要解释下TableMapReduceUtil的工作内容,特别是对于reducer.TableOutputFormat,被用作outputFormat类,许多参数被在config上设置(如,TableOutputFormat.OUTPUT_TABLE ),同样设置reducer输出值为ImmutableBytesWritable ,reducer值为Writable.这些可以被programmer设置在job和conf上,但是TableMapReduceUtil试着使事情简单。

    下面的mapper例子,将创建一个put操作,并匹配输入结果并发送它。注意这是CopyTable utility所完成的。

    1. public static class MyMapper extends TableMapper<ImmutableBytesWritable, Put> {
    2. public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
    3. // this example is just copying the data from the source table...
    4. context.write(row, resultToPut(row,value));
    5. }
    6. private static Put resultToPut(ImmutableBytesWritable key, Result result) throws IOException {
    7. Put put = new Put(key.get());
    8. for (KeyValue kv : result.raw()) {
    9. put.add(kv);
    10. }
    11. return put;
    12. }
    13. }

    这仅仅是个例子,开发者可以选择不使用TableOutputFormat而自己连接目标表格。

    53.3. HBase MapReduce Read/Write Example With Multi-Table Output

    TODO: example for MultiTableOutputFormat.

    53.4. HBase MapReduce Summary to HBase Example

    下面的例子是使用HBase作为MR的数据源和数据接收端来完成一个总结步骤,这个例子将统计表中某个值的不同实例的数目。并将那些计数放入其他表中。​

    1. Configuration config = HBaseConfiguration.create();
    2. Job job = new Job(config, "ExampleReadWrite");
    3. job.setJarByClass(MyReadWriteJob.class); //包含 mapper的类
    4. Scan scan = new Scan();
    5. scan.setCashing(500); //Scan中默认为1,对MR job来说不太好
    6. scan.setCasheBlocks(false); //对MR JOB不要设置为true
    7. //设置其他scan属性
    8. TableMapReduceUtil.initTableMapperJob(
    9. sourceTable, // input table
    10. scan, // Scan instance to control CF and attribute selection
    11. MyMapper.class, // mapper class
    12. Text.class, // mapper output key
    13. IntWritable.class, // mapper output value
    14. job);
    15. TableMapReduceUtil.initTableReducerJob(
    16. targetTable, // output table
    17. MyTableReducer.class, // reducer class
    18. job);
    19. job.setNumReduceTasks(1); // at least one, adjust as required
    20. boolean b = job.waitForCompletion(true);
    21. if(!b){
    22. throw new IOException("error with job");
    23. }

    在这个例子中将字符串值映射为一列被选为总结值。This value is used as the key to emit from the mapper,and an IntWritable represents an instance counter.

    在Reducer程序中, “ones”被累计(就像其他MR例子中那样),然后发出一个Put。

    1. public static class MyTableReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
    2. public static final byte[] CF = "cf".getBytes();
    3. public static final byte[] COUNT = "count".getBytes();
    4. public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    5. int i = 0;
    6. i += val.get();
    7. }
    8. Put put = new Put(Bytes.toBytes(key.toString()));
    9. put.add(CF, COUNT, Bytes.toBytes(i));
    10. context.write(null, put);
    11. }

    这个和上面总结的例子很像,不同的是这里使用Hbase作为MR数据源,而作为HDFS作为接收端。不同之处在任务设定和reducer中,mapper还是一样。

    1. Configuration config = HBaseConfiguration.create();
    2. Job job = new Job(config,"ExampleSummaryToFile");
    3. job.setJarByClass(MySummaryFileJob.class); // class that contains mapper and reducer
    4. Scan scan = new Scan();
    5. scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
    6. scan.setCacheBlocks(false); // don't set to true for MR jobs
    7. // set other scan attrs
    8. TableMapReduceUtil.initTableMapperJob(
    9. sourceTable, // input table
    10. scan, // Scan instance to control CF and attribute selection
    11. MyMapper.class, // mapper class
    12. Text.class, // mapper output key
    13. IntWritable.class, // mapper output value
    14. job);
    15. job.setReducerClass(MyReducer.class); // reducer class
    16. job.setNumReduceTasks(1); // at least one, adjust as required
    17. FileOutputFormat.setOutputPath(job, new Path("/tmp/mr/mySummaryFile")); // adjust directories as required
    18. boolean b = job.waitForCompletion(true);
    19. if (!b) {
    20. throw new IOException("error with job!");
    21. }

    如上所述, 这个例子的Mapper和之前的一样。对于Reducer,使用”generic“代替继承TableMapper并发出puts

    1. public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    2. public void reduce(Text key, Iteralbe<IntWritable> values, Context context) throw IOException, InterruptedException{
    3. int i = 0;
    4. for (IntWritable val: values){
    5. i += val.get();
    6. }
    7. context.write(key, new IntWritable(i));
    8. }
    9. }

    53.6 HBase MapReduce Summary to HBase Without Reducer

    没有reducer也可以进行总结,如果使用Hbase作为reducer。

    一个HBase目标表有必要为job总结而存在。Table方法incrementColumnValue将使用自增值。从性能角度,每个map任务保持值的Map与它的值一起增加很有意义,当mapper的cleanup方法时,每个key更新一次。然而, 你的mileage可能会有所不同,这取决于要处理的行数和唯一键。

    最后,总结结果在Hbase中。

    53.7 HBase MapReduce Summary to RDBMS

    有时,产生总结放入RDBMS中更合适。对这些例子来说,自定义reducer产生总结直接到RDBMS中是可能的。setup方法可以连接到RDBMS上,cleanup方法可以断开这个连接。

    理解job的reducer数量将影响总结的实现,你必须将这个设计到你的reducer中。特别地,不管是以singleton来运行还是multiple来运行reducer。正确或错误,根据用例。越多的reducer被布置到job中,越多的RDBMS同步连接会被建立,this will scale, but only to a point.(?)

    1. public static class MyRdbmsReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    2. private Connection c = null;
    3. public void setup(Context context) {
    4. //创建 DB连接。。。
    5. }
    6. public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    7. //do summarisation
    8. // 本例中Text是键, 但这只是个例子
    9. }
    10. public void cleanup(Context context){
    11. //关闭db连接
    12. }
    13. }

    In the end, the summary results are written to your RDBMS table/s.

    54. 在一个MR 任务中连接其他 Hbase 表

    尽管当前框架内允许一个HBase表作为MR任务的输入,其他HBase表可作为查找表访问,在一个MR job中在Mapper的setup方法中创建一个表的实例。

    1. public class MyMapper extends TableMapper<Text,LongWritable> {
    2. public Talbe myOtherTable;
    3. public void setup(Context context) {
    4. //这里创建连接到集群上并保存下来或使用已存在表的连接
    5. myOtherTable = connection.getTable("myOtherTable");
    6. }
    7. public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException,InterruptedException {
    8. //处理结果
    9. //使用 ‘myOtherTable’查询

    通常建议使用HBase作为数据源的MR jobs关闭预测执行。这个可以通过配置在每个job基础上实现, 或者集群上实现。特别是对那些长时间运行的工作,预测执行将创建重复的map任务,这将你的数据写两遍到HBase中,这可能不是你想要的。

    See spec.ex for more information.

    56.Cascading 级联

    级联是MR的可替换API,事实上还是使用MapReduce,但允许你以一种简单的方法写MapReduce代码。