阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

Ubuntu 14.04 LTS下HBase开发实例学习

218次阅读
没有评论

共计 37272 个字符,预计需要花费 94 分钟才能阅读完成。

1 开发环境

在进行 Hbase 开发前,需要安装 JDK、Hadoop 和 HBase,选择一款合适的开发 IDE,具体安装方法就不介绍了,网上有很多参考资料,这里给出我的开发环境:

操作系统:Ubuntu 14.04 LTS

Java 版本:jdk1.7.0_79

Hadoop 版本:hadoop-2.6.0-cdh5.7.1

HBase 版本:hbase-1.2.0-cdh5.7.1

Ecipse 版本:Eclipse Java EE LunaRelease

Hadoop+HBase 搭建云存储总结 PDF http://www.linuxidc.com/Linux/2013-05/83844.htm

Ubuntu Server 14.04 下 Hbase 数据库安装  http://www.linuxidc.com/Linux/2016-05/131499.htm

HBase 结点之间时间不一致造成 regionserver 启动失败 http://www.linuxidc.com/Linux/2013-06/86655.htm

Hadoop+ZooKeeper+HBase 集群配置 http://www.linuxidc.com/Linux/2013-06/86347.htm

Hadoop 集群安装 &HBase 实验环境搭建 http://www.linuxidc.com/Linux/2013-04/83560.htm

基于 Hadoop 集群的 HBase 集群的配置 http://www.linuxidc.com/Linux/2013-03/80815.htm‘

Hadoop 安装部署笔记之 -HBase 完全分布模式安装 http://www.linuxidc.com/Linux/2012-12/76947.htm

单机版搭建 HBase 环境图文教程详解 http://www.linuxidc.com/Linux/2012-10/72959.htm

使用 Maven 构建项目,在 pom.xml 中添加 hbase 的依赖如下:

 

<repositories>
        <repository>
          <id>cloudera</id>
          <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>  
            <groupId>org.apache.hadoop</groupId>  
            <artifactId>hadoop-common</artifactId>  
            <version>2.6.0-cdh5.7.1</version>  
        </dependency>  
        <dependency>  
            <groupId>org.apache.hadoop</groupId>  
            <artifactId>hadoop-hdfs</artifactId>  
            <version>2.6.0-cdh5.7.1</version>  
        </dependency>
        <dependency>  
            <groupId>org.apache.hbase</groupId>  
            <artifactId>hbase-client</artifactId>  
            <version>1.2.0-cdh5.7.1</version>  
        </dependency>
		<dependency>  
            <groupId>org.apache.hbase</groupId>  
            <artifactId>hbase-server</artifactId>  
            <version>1.2.0-cdh5.7.1</version>  
        </dependency>
    </dependencies>

 

2 初始化配置

 

首先需要设置 HBase 的配置,如 ZooKeeper 的地址、端口号等等。可以通过 org.apache.hadoop.conf.Configuration.set 方法手工设置 HBase 的配置信息,也可以直接将 HBase 的 hbase-site.xml 配置文件引入项目即可。下面给出配置代码:

// 声明静态配置
    private static Configuration conf = null;
    static {conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
    }

3 常见 API 的使用

HBase 的常用操作包括建表、插入表数据、删除表数据、获取一行数据、表扫描、删除列族、删除表等等,下面给出具体代码。

3.1 创建数据库表

// 创建数据库表
    public static void createTable(String tableName, String[] columnFamilys) throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 创建一个数据库管理员
        HBaseAdmin hAdmin = (HBaseAdmin) conn.getAdmin();
        if (hAdmin.tableExists(tableName)) {System.out.println(tableName + "表已存在");
            conn.close();
            System.exit(0);
        } else {
            // 新建一个表描述
            HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
            // 在表描述里添加列族
            for (String columnFamily : columnFamilys) {tableDesc.addFamily(new HColumnDescriptor(columnFamily));
            }
            // 根据配置好的表描述建表
            hAdmin.createTable(tableDesc);
            System.out.println("创建" + tableName + "表成功");
        }
        conn.close();}

3.2 添加一条数据

// 添加一条数据
    public static void addRow(String tableName, String rowKey, String columnFamily, String column, String value) 
            throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 获取表
        HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
        // 通过 rowkey 创建一个 put 对象
        Put put = new Put(Bytes.toBytes(rowKey));
        // 在 put 对象中设置列族、列、值
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
        // 插入数据, 可通过 put(List<Put>) 批量插入
        table.put(put);
        // 关闭资源
        table.close();
        conn.close();}

3.3 获取一条数据

// 通过 rowkey 获取一条数据
    public static void getRow(String tableName, String rowKey) throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 获取表
        HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
        // 通过 rowkey 创建一个 get 对象
        Get get = new Get(Bytes.toBytes(rowKey));
        // 输出结果
        Result result = table.get(get);
        for (Cell cell : result.rawCells()) {
            System.out.println("行键:" + new String(CellUtil.cloneRow(cell)) + "\t" +
                    "列族:" + new String(CellUtil.cloneFamily(cell)) + "\t" + 
                    "列名:" + new String(CellUtil.cloneQualifier(cell)) + "\t" + 
                    "值:" + new String(CellUtil.cloneValue(cell)) + "\t" +
                    "时间戳:" + cell.getTimestamp());
        }
        // 关闭资源
        table.close();
        conn.close();}

3.4 全表扫描

// 全表扫描
    public static void scanTable(String tableName) throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 获取表
        HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
        // 创建一个扫描对象
        Scan scan = new Scan();
        // 扫描全表输出结果
        ResultScanner results = table.getScanner(scan);
        for (Result result : results) {for (Cell cell : result.rawCells()) {
                System.out.println("行键:" + new String(CellUtil.cloneRow(cell)) + "\t" +
                        "列族:" + new String(CellUtil.cloneFamily(cell)) + "\t" + 
                        "列名:" + new String(CellUtil.cloneQualifier(cell)) + "\t" + 
                        "值:" + new String(CellUtil.cloneValue(cell)) + "\t" +
                        "时间戳:" + cell.getTimestamp());
            }
        }
        // 关闭资源
		results.close();
        table.close();
        conn.close();}

3.5 删除一条数据

// 删除一条数据
    public static void delRow(String tableName, String rowKey) throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 获取表
        HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
        // 删除数据
        Delete delete = new Delete(Bytes.toBytes(rowKey));
        table.delete(delete);
        // 关闭资源
        table.close();
        conn.close();}

3.6 删除多条数据

// 删除多条数据
    public static void delRows(String tableName, String[] rows) throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 获取表
        HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
        // 删除多条数据
        List<Delete> list = new ArrayList<Delete>();
        for (String row : rows) {Delete delete = new Delete(Bytes.toBytes(row));
            list.add(delete);
        }
        table.delete(list);
        // 关闭资源
        table.close();
        conn.close();}

3.7 删除列族

// 删除列族
    public static void delColumnFamily(String tableName, String columnFamily) throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 创建一个数据库管理员
        HBaseAdmin hAdmin = (HBaseAdmin) conn.getAdmin();
        // 删除一个表的指定列族
        hAdmin.deleteColumn(tableName, columnFamily);
        // 关闭资源
        conn.close();}

3.8 删除数据库表

// 删除数据库表
    public static void deleteTable(String tableName) throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 创建一个数据库管理员
        HBaseAdmin hAdmin = (HBaseAdmin) conn.getAdmin();
        if (hAdmin.tableExists(tableName)) {
            // 失效表
            hAdmin.disableTable(tableName);
            // 删除表
            hAdmin.deleteTable(tableName);
            System.out.println("删除" + tableName + "表成功");
            conn.close();} else {System.out.println("需要删除的" + tableName + "表不存在");
            conn.close();
            System.exit(0);
        }
    }

3.9 追加插入

// 追加插入 (将原有 value 的后面追加新的 value,如原有 value= a 追加 value=bc 则最后的 value=abc)
    public static void appendData(String tableName, String rowKey, String columnFamily, String column, String value) 
            throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 获取表
        HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
        // 通过 rowkey 创建一个 append 对象
        Append append = new Append(Bytes.toBytes(rowKey));
        // 在 append 对象中设置列族、列、值
        append.add(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
        // 追加数据
        table.append(append);
        // 关闭资源
        table.close();
        conn.close();}

3.10 符合条件后添加数据

// 符合条件后添加数据 (只能针对某一个 rowkey 进行原子操作)
    public static boolean checkAndPut(String tableName, String rowKey, String columnFamilyCheck, String columnCheck, String valueCheck, String columnFamily, String column, String value) throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 获取表
        HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
        // 设置需要添加的数据
        Put put = new Put(Bytes.toBytes(rowKey));
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
        // 当判断条件为真时添加数据
        boolean result = table.checkAndPut(Bytes.toBytes(rowKey), Bytes.toBytes(columnFamilyCheck), 
                Bytes.toBytes(columnCheck), Bytes.toBytes(valueCheck), put);
        // 关闭资源
        table.close();
        conn.close();
        
        return result;
    }

3.11 符合条件后删除数据

// 符合条件后刪除数据 (只能针对某一个 rowkey 进行原子操作)
    public static boolean checkAndDelete(String tableName, String rowKey, String columnFamilyCheck, String columnCheck, 
            String valueCheck, String columnFamily, String column) throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 获取表
        HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
        // 设置需要刪除的 delete 对象
        Delete delete = new Delete(Bytes.toBytes(rowKey));
        delete.addColumn(Bytes.toBytes(columnFamilyCheck), Bytes.toBytes(columnCheck));
        // 当判断条件为真时添加数据
        boolean result = table.checkAndDelete(Bytes.toBytes(rowKey), Bytes.toBytes(columnFamilyCheck), Bytes.toBytes(columnCheck), 
                Bytes.toBytes(valueCheck), delete);
        // 关闭资源
        table.close();
        conn.close();

        return result;
    }

3.12 计数器

// 计数器 (amount 为正数则计数器加,为负数则计数器减,为 0 则获取当前计数器的值)
    public static long incrementColumnValue(String tableName, String rowKey, String columnFamily, String column, long amount) 
            throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 获取表
        HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
        // 计数器
        long result = table.incrementColumnValue(Bytes.toBytes(rowKey), Bytes.toBytes(columnFamily), Bytes.toBytes(column), amount);
        // 关闭资源
        table.close();
        conn.close();
        
        return result;
    }

4 内置过滤器的使用

HBase 为筛选数据提供了一组过滤器,通过这个过滤器可以在 HBase 中数据的多个维度(行、列、数据版本)上进行对数据的筛选操作,也就是说过滤器最终能够筛选的数据能够细化到具体的一个存储单元格上(由行键、列名、时间戳定位)。通常来说,通过行键、值来筛选数据的应用场景较多。需要说明的是,过滤器会极大地影响查询效率。所以,在数据量较大的数据表中,应尽量避免使用过滤器。

下面介绍一些常用的 HBase 内置过滤器的用法:

1、RowFilter:筛选出匹配的所有的行。使用 BinaryComparator 可以筛选出具有某个行键的行,或者通过改变比较运算符(下面的例子中是 CompareFilter.CompareOp.EQUAL)来筛选出符合某一条件的多条数据,如下示例就是筛选出行键为 row1 的一行数据。

// 筛选出匹配的所有的行
Filter rf = new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("row1")));

2、PrefixFilter:筛选出具有特定前缀的行键的数据。这个过滤器所实现的功能其实也可以由 RowFilter 结合 RegexComparator 来实现,不过这里提供了一种简便的使用方法,如下示例就是筛选出行键以 row 为前缀的所有的行。

// 筛选匹配行键的前缀成功的行
Filter pf = new PrefixFilter(Bytes.toBytes("row"));

3、KeyOnlyFilter:这个过滤器唯一的功能就是只返回每行的行键,值全部为空,这对于只关注于行键的应用场景来说非常合适,这样忽略掉其值就可以减少传递到客户端的数据量,能起到一定的优化作用。

 

// 返回所有的行键,但值全是空
Filter kof = new KeyOnlyFilter();

 

4、RandomRowFilter:按照一定的几率(<= 0 会过滤掉所有的行,>= 1 会包含所有的行)来返回随机的结果集,对于同样的数据集,多次使用同一个 RandomRowFilter 会返回不同的结果集,对于需要随机抽取一部分数据的应用场景,可以使用此过滤器。

// 随机选出一部分的行
Filter rrf = new RandomRowFilter((float) 0.8);

5、InclusiveStopFilter:扫描的时候,我们可以设置一个开始行键和一个终止行键,默认情况下,这个行键的返回是前闭后开区间,即包含起始行,但不包含终止行。如果我们想要同时包含起始行和终止行,那么可以使用此过滤器。

// 包含了扫描的上限在结果之内
Filter isf = new InclusiveStopFilter(Bytes.toBytes("row1"));

6、FirstKeyOnlyFilter:如果想要返回的结果集中只包含第一列的数据,那么这个过滤器能够满足要求。它在找到每行的第一列之后会停止扫描,从而使扫描的性能也得到了一定的提升。

// 筛选出每行的第一个单元格
Filter fkof = new FirstKeyOnlyFilter();

7、ColumnPrefixFilter:它按照列名的前缀来筛选单元格,如果我们想要对返回的列的前缀加以限制的话,可以使用这个过滤器。

// 筛选出前缀匹配的列
Filter cpf = new ColumnPrefixFilter(Bytes.toBytes("qual1"));

8、ValueFilter:按照具体的值来筛选单元格的过滤器,这会把一行中值不能满足的单元格过滤掉,如下面的构造器,对于每一行的一个列,如果其对应的值不包含 ROW2_QUAL1,那么这个列就不会返回给客户端。

// 筛选某个(值的条件满足的)特定的单元格
Filter vf = new ValueFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator("ROW2_QUAL1"));

9、ColumnCountGetFilter:这个过滤器在遇到一行的列数超过我们所设置的限制值的时候,结束扫描操作。

// 如果突然发现一行中的列数超过设定的最大值时,整个扫描操作会停止
Filter ccf = new ColumnCountGetFilter(2);

10、SingleColumnValueFilter:用一列的值决定这一行的数据是否被过滤,可对它的对象调用 setFilterIfMissing 方法,默认的参数是 false。其作用是,对于咱们要使用作为条件的列,如果参数为 true,这样的行将会被过滤掉,如果参数为 false,这样的行会包含在结果集中。

// 将满足条件的列所在的行过滤掉
SingleColumnValueFilter scvf = new SingleColumnValueFilter(•          Bytes.toBytes("colfam1"),   
•          Bytes.toBytes("qual2"),   
•          CompareFilter.CompareOp.NOT_EQUAL,   
•          new SubstringComparator("BOGUS"));  
scvf.setFilterIfMissing(true);

11、SingleColumnValueExcludeFilter:这个过滤器与第 10 种过滤器唯一的区别就是,作为筛选条件的列,其行不会包含在返回的结果中。

12、SkipFilter:这是一种附加过滤器,其与 ValueFilter 结合使用,如果发现一行中的某一列不符合条件,那么整行就会被过滤掉。

// 发现某一行中的一列需要过滤时,整个行就会被过滤掉
Filter skf = new SkipFilter(vf);

13、WhileMatchFilter:使用这个过滤器,当遇到不符合设定条件的数据的时候,整个扫描结束。

// 当遇到不符合过滤器 rf 设置的条件时,整个扫描结束
Filter wmf = new WhileMatchFilter(rf);

14. FilterList:可以用于综合使用多个过滤器。其有两种关系:Operator.MUST_PASS_ONE 表示关系 AND,Operator.MUST_PASS_ALL 表示关系 OR,并且 FilterList 可以嵌套使用,使得我们能够表达更多的需求。

// 综合使用多个过滤器,AND 和 OR 两种关系
List<Filter> filters = new ArrayList<Filter>();  
filters.add(rf);  
filters.add(vf);  
FilterList fl = new FilterList(FilterList.Operator.MUST_PASS_ALL,filters);

下面给出一个使用 RowFilter 过滤器的完整示例:

public class HBaseFilter {
    
    private static final String TABLE_NAME = "table1";

    public static void main(String[] args) throws IOException {
        // 设置配置
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 获取表
        HTable table = (HTable) conn.getTable(TableName.valueOf(TABLE_NAME));
        // 创建一个扫描对象
        Scan scan = new Scan();
        // 创建一个 RowFilter 过滤器
        Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("abc")));
        // 将过滤器加入扫描对象
        scan.setFilter(filter);
        // 输出结果
        ResultScanner results = table.getScanner(scan);
        for (Result result : results) {for (Cell cell : result.rawCells()) {
                System.out.println("行键:" + new String(CellUtil.cloneRow(cell)) + "\t" +
                        "列族:" + new String(CellUtil.cloneFamily(cell)) + "\t" + 
                        "列名:" + new String(CellUtil.cloneQualifier(cell)) + "\t" + 
                        "值:" + new String(CellUtil.cloneValue(cell)) + "\t" +
                        "时间戳:" + cell.getTimestamp());
            }
        }
        // 关闭资源
        results.close();
        table.close();
        conn.close();}

}

5 HBase 与 MapReduce

我们知道,在伪分布式模式和完全分布式模式下的 HBase 是架构在 HDFS 之上的,因此完全可以将 MapReduce 编程框架和 HBase 结合起来使用。也就是说,将 HBase 作为底层存储结构,MapReduce 调用 HBase 进行特殊的处理,这样能够充分结合 HBase 分布式大型数据库和 MapReduce 并行计算的优点。

HBase 实现了 TableInputFormatBase 类,该类提供了对表数据的大部分操作,其子类 TableInputFormat 则提供了完整的实现,用于处理表数据并生成键值对。TableInputFormat 类将数据表按照 Region 分割成 split,即有多少个 Regions 就有多个 splits,然后将 Region 按行键分成 <key,value> 对,key 值对应与行键,value 值为该行所包含的数据。

HBase 实现了 MapReduce 计算框架对应的 TableMapper 类和 TableReducer 类。其中,TableMapper 类并没有具体的功能,只是将输入的 <key,value> 对的类型分别限定为 Result 和 ImmutableBytesWritable。IdentityTableMapper 类和 IdentityTableReducer 类则是上述两个类的具体实现,其和 Mapper 类和 Reducer 类一样,只是简单地将 <key,value> 对输出到下一个阶段。

HBase 实现的 TableOutputFormat 将输出的 <key,value> 对写到指定的 HBase 表中,该类不会对 WAL(Write-Ahead Log)进行操作,即如果服务器发生故障将面临丢失数据的风险。可以使用 MultipleTableOutputFormat 类解决这个问题,该类可以对是否写入 WAL 进行设置。

为了能使 Hadoop 集群上运行 HBase 程序,还需要把相关的类文件引入 Hadoop 集群上,不然会出现 ClassNotFoundException 错误。其具体方法是可在 hadoop 的环境配置文件 hadoop-env.sh 中引入 HBASE_HOME 和 HBase 的相关 jar 包,或者直接将 HBase 的 jar 包打包到应用程序文件中。

下面这个例子是将 MapReduce 和 HBase 结合起来的 WordCount 程序,它首先从指定文件中搜集数据,进行统计计算,最后将结果存储到 HBase 中:

 

package com.hbase.demo;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class HBaseWordCount {
    
    public static class hBaseMapper extends Mapper<LongWritable, Text, Text, IntWritable> {private final static IntWritable ONE = new IntWritable(1);
        private Text word = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String[] words = value.toString().split(" ");
            for (String w : words) {word.set(w);
                context.write(word, ONE);
            }
        }
    }
    
    public static class hBaseReducer extends TableReducer<Text, IntWritable, NullWritable> {

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {sum += value.get();
            }
            
            // Put 实例化,每个词存一行
            Put put = new Put(key.getBytes());
            // 列族为 content, 列名为 count, 列值为单词的数目
            put.addColumn("content".getBytes(), "count".getBytes(), String.valueOf(sum).getBytes());
            
            context.write(NullWritable.get(), put);
        }
        
    }
    
    // 创建 HBase 数据表
    public static void createHBaseTable(String tableName) throws IOException {
        // 配置 HBse
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 创建一个数据库管理员
        HBaseAdmin hAdmin = (HBaseAdmin) conn.getAdmin();
        // 判断表是否存在
        if (hAdmin.tableExists(tableName)) {System.out.println("该数据表已存在,正在重新创建");
            hAdmin.disableTable(tableName);
            hAdmin.deleteTable(tableName);
        }
        // 创建表描述
        HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
        // 在表描述里添加列族
        tableDesc.addFamily(new HColumnDescriptor("content"));
        // 创建表
        hAdmin.createTable(tableDesc);
        System.out.println("创建" + tableName + "表成功");
    }
    
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {if (args.length != 3) {System.out.println("args error");
            System.exit(0);
        }
        
        String input = args[0];
        String jobName = args[1];
        String tableName = args[2];
        
        // 创建数据表
        HBaseWordCount.createHBaseTable(tableName);
        
        // 配置 MapReduce(或者将 hadoop 和 hbase 的相关配置文件引入项目)
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "localhost:9000");
       conf.set("mapred.job.tracker", "localhost:9001");
        conf.set("hbase.zookeeper.quorum", "localhost");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set(TableOutputFormat.OUTPUT_TABLE, tableName);
        
        // 配置任务
        Job job = Job.getInstance(conf, jobName);
        job.setJarByClass(HBaseWordCount.class);
        job.setMapperClass(hBaseMapper.class);
        job.setReducerClass(hBaseReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TableOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(input));
        
        // 执行 MR 任务
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }

}

更多详情见请继续阅读下一页的精彩内容 :http://www.linuxidc.com/Linux/2016-08/134181p2.htm

6 HBase 的 Bulkload

HBase 可以让我们随机的、实时的访问大数据,但是怎样有效的将数据导入到 HBase 呢?HBase 有多种导入数据的方法,最直接的方法就是在 MapReduce 作业中使用 TableOutputFormat 作为输出,或者使用标准的客户端 API,但是这些都不是非常有效的方法。

如果 HDFS 中有海量数据要导入 HBase,可以先将这些数据生成 HFile 文件,然后批量导入 HBase 的数据表中,这样可以极大地提升数据导入 HBase 的效率。这就是 HBase 的 Bulkload,即利用 MapReduce 作业输出 HBase 内部数据格式的表数据,然后将生成的 StoreFiles 直接导入到集群中。与使用 HBase API 相比,使用 Bulkload 导入数据占用更少的 CPU 和网络资源。两个表之间的数据迁移也可以使用这种方法。下面给出具体示例:

 

package com.hbase.demo;

import java.io.IOException;

import org.apache.Hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class HBaseBulk {
    
    public static class bulkMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 将输入数据用 tab 键分词
            String[] values = value.toString().split("\t");
            if (values.length == 2) {
                // 设置行键、列族、列名和值
                byte[] rowKey = Bytes.toBytes(values[0]);
                byte[] family = Bytes.toBytes("content");
                byte[] column = Bytes.toBytes("number");
                byte[] colValue = Bytes.toBytes(values[1]);
                // 将行键序列化作为 mapper 输出的 key
                ImmutableBytesWritable rowKeyWritable = new ImmutableBytesWritable(rowKey);
                // 将 put 对象作为 mapper 输出的 value
                Put put = new Put(rowKey);
                put.addColumn(family, column, colValue);
                context.write(rowKeyWritable, put);
            }
        }
    }
  
    @SuppressWarnings("deprecation")
    public static void main(String[] args) throws Exception {if (args.length != 3) {System.out.println("args error");
            System.exit(0);
        }
        
        String input = args[0];
        String output = args[1];
        String jobName = args[2];
        String tableName = args[3];
        
        // 配置 MapReduce(或者将 hadoop 的相关配置文件引入项目)
        Configuration hadoopConf = new Configuration();
        hadoopConf.set("fs.defaultFS", "localhost:9000");
        hadoopConf.set("mapred.job.tracker", "localhost:9001");
        Job job = Job.getInstance(hadoopConf, jobName);
        job.setJarByClass(HBaseBulk.class);
        job.setMapperClass(bulkMapper.class);
        job.setReducerClass(PutSortReducer.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(HFileOutputFormat2.class);
        FileInputFormat.addInputPath(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));
        
        // 配置 HBase(或者将 hbase 的相关配置文件引入项目)
        Configuration hbaseConf = HBaseConfiguration.create();
        hbaseConf.set("hbase.zookeeper.quorum", "localhost");
        hbaseConf.set("hbase.zookeeper.property.clientPort", "2181");
        
        // 生成 HFile
        Connection conn = ConnectionFactory.createConnection(hbaseConf);
        HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
        HFileOutputFormat2.configureIncrementalLoad(job, table);
        
        // 执行任务
        job.waitForCompletion(true);
        
        // 将 HFile 文件导入 HBase
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(hbaseConf);
        loader.doBulkLoad(new Path(output), table);
    }
}

上述代码首先将 HDFS 中的数据文件通过 MapReduce 任务生成 HFile 文件,然后将 HFile 文件导入 HBase 数据表(该数据表已存在)。HDFS 中的数据文件和导入 HBase 后的数据表分别如下图所示:

 

Ubuntu 14.04 LTS 下 HBase 开发实例学习

 

Ubuntu 14.04 LTS 下 HBase 开发实例学习

HBase 的详细介绍 :请点这里
HBase 的下载地址 :请点这里

本文永久更新链接地址 :http://www.linuxidc.com/Linux/2016-08/134181.htm

1 开发环境

在进行 Hbase 开发前,需要安装 JDK、Hadoop 和 HBase,选择一款合适的开发 IDE,具体安装方法就不介绍了,网上有很多参考资料,这里给出我的开发环境:

操作系统:Ubuntu 14.04 LTS

Java 版本:jdk1.7.0_79

Hadoop 版本:hadoop-2.6.0-cdh5.7.1

HBase 版本:hbase-1.2.0-cdh5.7.1

Ecipse 版本:Eclipse Java EE LunaRelease

Hadoop+HBase 搭建云存储总结 PDF http://www.linuxidc.com/Linux/2013-05/83844.htm

Ubuntu Server 14.04 下 Hbase 数据库安装  http://www.linuxidc.com/Linux/2016-05/131499.htm

HBase 结点之间时间不一致造成 regionserver 启动失败 http://www.linuxidc.com/Linux/2013-06/86655.htm

Hadoop+ZooKeeper+HBase 集群配置 http://www.linuxidc.com/Linux/2013-06/86347.htm

Hadoop 集群安装 &HBase 实验环境搭建 http://www.linuxidc.com/Linux/2013-04/83560.htm

基于 Hadoop 集群的 HBase 集群的配置 http://www.linuxidc.com/Linux/2013-03/80815.htm‘

Hadoop 安装部署笔记之 -HBase 完全分布模式安装 http://www.linuxidc.com/Linux/2012-12/76947.htm

单机版搭建 HBase 环境图文教程详解 http://www.linuxidc.com/Linux/2012-10/72959.htm

使用 Maven 构建项目,在 pom.xml 中添加 hbase 的依赖如下:

 

<repositories>
        <repository>
          <id>cloudera</id>
          <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>  
            <groupId>org.apache.hadoop</groupId>  
            <artifactId>hadoop-common</artifactId>  
            <version>2.6.0-cdh5.7.1</version>  
        </dependency>  
        <dependency>  
            <groupId>org.apache.hadoop</groupId>  
            <artifactId>hadoop-hdfs</artifactId>  
            <version>2.6.0-cdh5.7.1</version>  
        </dependency>
        <dependency>  
            <groupId>org.apache.hbase</groupId>  
            <artifactId>hbase-client</artifactId>  
            <version>1.2.0-cdh5.7.1</version>  
        </dependency>
		<dependency>  
            <groupId>org.apache.hbase</groupId>  
            <artifactId>hbase-server</artifactId>  
            <version>1.2.0-cdh5.7.1</version>  
        </dependency>
    </dependencies>

 

2 初始化配置

 

首先需要设置 HBase 的配置,如 ZooKeeper 的地址、端口号等等。可以通过 org.apache.hadoop.conf.Configuration.set 方法手工设置 HBase 的配置信息,也可以直接将 HBase 的 hbase-site.xml 配置文件引入项目即可。下面给出配置代码:

// 声明静态配置
    private static Configuration conf = null;
    static {conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
    }

3 常见 API 的使用

HBase 的常用操作包括建表、插入表数据、删除表数据、获取一行数据、表扫描、删除列族、删除表等等,下面给出具体代码。

3.1 创建数据库表

// 创建数据库表
    public static void createTable(String tableName, String[] columnFamilys) throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 创建一个数据库管理员
        HBaseAdmin hAdmin = (HBaseAdmin) conn.getAdmin();
        if (hAdmin.tableExists(tableName)) {System.out.println(tableName + "表已存在");
            conn.close();
            System.exit(0);
        } else {
            // 新建一个表描述
            HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
            // 在表描述里添加列族
            for (String columnFamily : columnFamilys) {tableDesc.addFamily(new HColumnDescriptor(columnFamily));
            }
            // 根据配置好的表描述建表
            hAdmin.createTable(tableDesc);
            System.out.println("创建" + tableName + "表成功");
        }
        conn.close();}

3.2 添加一条数据

// 添加一条数据
    public static void addRow(String tableName, String rowKey, String columnFamily, String column, String value) 
            throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 获取表
        HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
        // 通过 rowkey 创建一个 put 对象
        Put put = new Put(Bytes.toBytes(rowKey));
        // 在 put 对象中设置列族、列、值
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
        // 插入数据, 可通过 put(List<Put>) 批量插入
        table.put(put);
        // 关闭资源
        table.close();
        conn.close();}

3.3 获取一条数据

// 通过 rowkey 获取一条数据
    public static void getRow(String tableName, String rowKey) throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 获取表
        HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
        // 通过 rowkey 创建一个 get 对象
        Get get = new Get(Bytes.toBytes(rowKey));
        // 输出结果
        Result result = table.get(get);
        for (Cell cell : result.rawCells()) {
            System.out.println("行键:" + new String(CellUtil.cloneRow(cell)) + "\t" +
                    "列族:" + new String(CellUtil.cloneFamily(cell)) + "\t" + 
                    "列名:" + new String(CellUtil.cloneQualifier(cell)) + "\t" + 
                    "值:" + new String(CellUtil.cloneValue(cell)) + "\t" +
                    "时间戳:" + cell.getTimestamp());
        }
        // 关闭资源
        table.close();
        conn.close();}

3.4 全表扫描

// 全表扫描
    public static void scanTable(String tableName) throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 获取表
        HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
        // 创建一个扫描对象
        Scan scan = new Scan();
        // 扫描全表输出结果
        ResultScanner results = table.getScanner(scan);
        for (Result result : results) {for (Cell cell : result.rawCells()) {
                System.out.println("行键:" + new String(CellUtil.cloneRow(cell)) + "\t" +
                        "列族:" + new String(CellUtil.cloneFamily(cell)) + "\t" + 
                        "列名:" + new String(CellUtil.cloneQualifier(cell)) + "\t" + 
                        "值:" + new String(CellUtil.cloneValue(cell)) + "\t" +
                        "时间戳:" + cell.getTimestamp());
            }
        }
        // 关闭资源
		results.close();
        table.close();
        conn.close();}

3.5 删除一条数据

// 删除一条数据
    public static void delRow(String tableName, String rowKey) throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 获取表
        HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
        // 删除数据
        Delete delete = new Delete(Bytes.toBytes(rowKey));
        table.delete(delete);
        // 关闭资源
        table.close();
        conn.close();}

3.6 删除多条数据

// 删除多条数据
    public static void delRows(String tableName, String[] rows) throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 获取表
        HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
        // 删除多条数据
        List<Delete> list = new ArrayList<Delete>();
        for (String row : rows) {Delete delete = new Delete(Bytes.toBytes(row));
            list.add(delete);
        }
        table.delete(list);
        // 关闭资源
        table.close();
        conn.close();}

3.7 删除列族

// 删除列族
    public static void delColumnFamily(String tableName, String columnFamily) throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 创建一个数据库管理员
        HBaseAdmin hAdmin = (HBaseAdmin) conn.getAdmin();
        // 删除一个表的指定列族
        hAdmin.deleteColumn(tableName, columnFamily);
        // 关闭资源
        conn.close();}

3.8 删除数据库表

// 删除数据库表
    public static void deleteTable(String tableName) throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 创建一个数据库管理员
        HBaseAdmin hAdmin = (HBaseAdmin) conn.getAdmin();
        if (hAdmin.tableExists(tableName)) {
            // 失效表
            hAdmin.disableTable(tableName);
            // 删除表
            hAdmin.deleteTable(tableName);
            System.out.println("删除" + tableName + "表成功");
            conn.close();} else {System.out.println("需要删除的" + tableName + "表不存在");
            conn.close();
            System.exit(0);
        }
    }

3.9 追加插入

// 追加插入 (将原有 value 的后面追加新的 value,如原有 value= a 追加 value=bc 则最后的 value=abc)
    public static void appendData(String tableName, String rowKey, String columnFamily, String column, String value) 
            throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 获取表
        HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
        // 通过 rowkey 创建一个 append 对象
        Append append = new Append(Bytes.toBytes(rowKey));
        // 在 append 对象中设置列族、列、值
        append.add(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
        // 追加数据
        table.append(append);
        // 关闭资源
        table.close();
        conn.close();}

3.10 符合条件后添加数据

// 符合条件后添加数据 (只能针对某一个 rowkey 进行原子操作)
    public static boolean checkAndPut(String tableName, String rowKey, String columnFamilyCheck, String columnCheck, String valueCheck, String columnFamily, String column, String value) throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 获取表
        HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
        // 设置需要添加的数据
        Put put = new Put(Bytes.toBytes(rowKey));
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
        // 当判断条件为真时添加数据
        boolean result = table.checkAndPut(Bytes.toBytes(rowKey), Bytes.toBytes(columnFamilyCheck), 
                Bytes.toBytes(columnCheck), Bytes.toBytes(valueCheck), put);
        // 关闭资源
        table.close();
        conn.close();
        
        return result;
    }

3.11 符合条件后删除数据

// 符合条件后刪除数据 (只能针对某一个 rowkey 进行原子操作)
    public static boolean checkAndDelete(String tableName, String rowKey, String columnFamilyCheck, String columnCheck, 
            String valueCheck, String columnFamily, String column) throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 获取表
        HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
        // 设置需要刪除的 delete 对象
        Delete delete = new Delete(Bytes.toBytes(rowKey));
        delete.addColumn(Bytes.toBytes(columnFamilyCheck), Bytes.toBytes(columnCheck));
        // 当判断条件为真时添加数据
        boolean result = table.checkAndDelete(Bytes.toBytes(rowKey), Bytes.toBytes(columnFamilyCheck), Bytes.toBytes(columnCheck), 
                Bytes.toBytes(valueCheck), delete);
        // 关闭资源
        table.close();
        conn.close();

        return result;
    }

3.12 计数器

// 计数器 (amount 为正数则计数器加,为负数则计数器减,为 0 则获取当前计数器的值)
    public static long incrementColumnValue(String tableName, String rowKey, String columnFamily, String column, long amount) 
            throws IOException {
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 获取表
        HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));
        // 计数器
        long result = table.incrementColumnValue(Bytes.toBytes(rowKey), Bytes.toBytes(columnFamily), Bytes.toBytes(column), amount);
        // 关闭资源
        table.close();
        conn.close();
        
        return result;
    }

4 内置过滤器的使用

HBase 为筛选数据提供了一组过滤器,通过这个过滤器可以在 HBase 中数据的多个维度(行、列、数据版本)上进行对数据的筛选操作,也就是说过滤器最终能够筛选的数据能够细化到具体的一个存储单元格上(由行键、列名、时间戳定位)。通常来说,通过行键、值来筛选数据的应用场景较多。需要说明的是,过滤器会极大地影响查询效率。所以,在数据量较大的数据表中,应尽量避免使用过滤器。

下面介绍一些常用的 HBase 内置过滤器的用法:

1、RowFilter:筛选出匹配的所有的行。使用 BinaryComparator 可以筛选出具有某个行键的行,或者通过改变比较运算符(下面的例子中是 CompareFilter.CompareOp.EQUAL)来筛选出符合某一条件的多条数据,如下示例就是筛选出行键为 row1 的一行数据。

// 筛选出匹配的所有的行
Filter rf = new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("row1")));

2、PrefixFilter:筛选出具有特定前缀的行键的数据。这个过滤器所实现的功能其实也可以由 RowFilter 结合 RegexComparator 来实现,不过这里提供了一种简便的使用方法,如下示例就是筛选出行键以 row 为前缀的所有的行。

// 筛选匹配行键的前缀成功的行
Filter pf = new PrefixFilter(Bytes.toBytes("row"));

3、KeyOnlyFilter:这个过滤器唯一的功能就是只返回每行的行键,值全部为空,这对于只关注于行键的应用场景来说非常合适,这样忽略掉其值就可以减少传递到客户端的数据量,能起到一定的优化作用。

 

// 返回所有的行键,但值全是空
Filter kof = new KeyOnlyFilter();

 

4、RandomRowFilter:按照一定的几率(<= 0 会过滤掉所有的行,>= 1 会包含所有的行)来返回随机的结果集,对于同样的数据集,多次使用同一个 RandomRowFilter 会返回不同的结果集,对于需要随机抽取一部分数据的应用场景,可以使用此过滤器。

// 随机选出一部分的行
Filter rrf = new RandomRowFilter((float) 0.8);

5、InclusiveStopFilter:扫描的时候,我们可以设置一个开始行键和一个终止行键,默认情况下,这个行键的返回是前闭后开区间,即包含起始行,但不包含终止行。如果我们想要同时包含起始行和终止行,那么可以使用此过滤器。

// 包含了扫描的上限在结果之内
Filter isf = new InclusiveStopFilter(Bytes.toBytes("row1"));

6、FirstKeyOnlyFilter:如果想要返回的结果集中只包含第一列的数据,那么这个过滤器能够满足要求。它在找到每行的第一列之后会停止扫描,从而使扫描的性能也得到了一定的提升。

// 筛选出每行的第一个单元格
Filter fkof = new FirstKeyOnlyFilter();

7、ColumnPrefixFilter:它按照列名的前缀来筛选单元格,如果我们想要对返回的列的前缀加以限制的话,可以使用这个过滤器。

// 筛选出前缀匹配的列
Filter cpf = new ColumnPrefixFilter(Bytes.toBytes("qual1"));

8、ValueFilter:按照具体的值来筛选单元格的过滤器,这会把一行中值不能满足的单元格过滤掉,如下面的构造器,对于每一行的一个列,如果其对应的值不包含 ROW2_QUAL1,那么这个列就不会返回给客户端。

// 筛选某个(值的条件满足的)特定的单元格
Filter vf = new ValueFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator("ROW2_QUAL1"));

9、ColumnCountGetFilter:这个过滤器在遇到一行的列数超过我们所设置的限制值的时候,结束扫描操作。

// 如果突然发现一行中的列数超过设定的最大值时,整个扫描操作会停止
Filter ccf = new ColumnCountGetFilter(2);

10、SingleColumnValueFilter:用一列的值决定这一行的数据是否被过滤,可对它的对象调用 setFilterIfMissing 方法,默认的参数是 false。其作用是,对于咱们要使用作为条件的列,如果参数为 true,这样的行将会被过滤掉,如果参数为 false,这样的行会包含在结果集中。

// 将满足条件的列所在的行过滤掉
SingleColumnValueFilter scvf = new SingleColumnValueFilter(•          Bytes.toBytes("colfam1"),   
•          Bytes.toBytes("qual2"),   
•          CompareFilter.CompareOp.NOT_EQUAL,   
•          new SubstringComparator("BOGUS"));  
scvf.setFilterIfMissing(true);

11、SingleColumnValueExcludeFilter:这个过滤器与第 10 种过滤器唯一的区别就是,作为筛选条件的列,其行不会包含在返回的结果中。

12、SkipFilter:这是一种附加过滤器,其与 ValueFilter 结合使用,如果发现一行中的某一列不符合条件,那么整行就会被过滤掉。

// 发现某一行中的一列需要过滤时,整个行就会被过滤掉
Filter skf = new SkipFilter(vf);

13、WhileMatchFilter:使用这个过滤器,当遇到不符合设定条件的数据的时候,整个扫描结束。

// 当遇到不符合过滤器 rf 设置的条件时,整个扫描结束
Filter wmf = new WhileMatchFilter(rf);

14. FilterList:可以用于综合使用多个过滤器。其有两种关系:Operator.MUST_PASS_ONE 表示关系 AND,Operator.MUST_PASS_ALL 表示关系 OR,并且 FilterList 可以嵌套使用,使得我们能够表达更多的需求。

// 综合使用多个过滤器,AND 和 OR 两种关系
List<Filter> filters = new ArrayList<Filter>();  
filters.add(rf);  
filters.add(vf);  
FilterList fl = new FilterList(FilterList.Operator.MUST_PASS_ALL,filters);

下面给出一个使用 RowFilter 过滤器的完整示例:

public class HBaseFilter {
    
    private static final String TABLE_NAME = "table1";

    public static void main(String[] args) throws IOException {
        // 设置配置
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 获取表
        HTable table = (HTable) conn.getTable(TableName.valueOf(TABLE_NAME));
        // 创建一个扫描对象
        Scan scan = new Scan();
        // 创建一个 RowFilter 过滤器
        Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("abc")));
        // 将过滤器加入扫描对象
        scan.setFilter(filter);
        // 输出结果
        ResultScanner results = table.getScanner(scan);
        for (Result result : results) {for (Cell cell : result.rawCells()) {
                System.out.println("行键:" + new String(CellUtil.cloneRow(cell)) + "\t" +
                        "列族:" + new String(CellUtil.cloneFamily(cell)) + "\t" + 
                        "列名:" + new String(CellUtil.cloneQualifier(cell)) + "\t" + 
                        "值:" + new String(CellUtil.cloneValue(cell)) + "\t" +
                        "时间戳:" + cell.getTimestamp());
            }
        }
        // 关闭资源
        results.close();
        table.close();
        conn.close();}

}

5 HBase 与 MapReduce

我们知道,在伪分布式模式和完全分布式模式下的 HBase 是架构在 HDFS 之上的,因此完全可以将 MapReduce 编程框架和 HBase 结合起来使用。也就是说,将 HBase 作为底层存储结构,MapReduce 调用 HBase 进行特殊的处理,这样能够充分结合 HBase 分布式大型数据库和 MapReduce 并行计算的优点。

HBase 实现了 TableInputFormatBase 类,该类提供了对表数据的大部分操作,其子类 TableInputFormat 则提供了完整的实现,用于处理表数据并生成键值对。TableInputFormat 类将数据表按照 Region 分割成 split,即有多少个 Regions 就有多个 splits,然后将 Region 按行键分成 <key,value> 对,key 值对应与行键,value 值为该行所包含的数据。

HBase 实现了 MapReduce 计算框架对应的 TableMapper 类和 TableReducer 类。其中,TableMapper 类并没有具体的功能,只是将输入的 <key,value> 对的类型分别限定为 Result 和 ImmutableBytesWritable。IdentityTableMapper 类和 IdentityTableReducer 类则是上述两个类的具体实现,其和 Mapper 类和 Reducer 类一样,只是简单地将 <key,value> 对输出到下一个阶段。

HBase 实现的 TableOutputFormat 将输出的 <key,value> 对写到指定的 HBase 表中,该类不会对 WAL(Write-Ahead Log)进行操作,即如果服务器发生故障将面临丢失数据的风险。可以使用 MultipleTableOutputFormat 类解决这个问题,该类可以对是否写入 WAL 进行设置。

为了能使 Hadoop 集群上运行 HBase 程序,还需要把相关的类文件引入 Hadoop 集群上,不然会出现 ClassNotFoundException 错误。其具体方法是可在 hadoop 的环境配置文件 hadoop-env.sh 中引入 HBASE_HOME 和 HBase 的相关 jar 包,或者直接将 HBase 的 jar 包打包到应用程序文件中。

下面这个例子是将 MapReduce 和 HBase 结合起来的 WordCount 程序,它首先从指定文件中搜集数据,进行统计计算,最后将结果存储到 HBase 中:

 

package com.hbase.demo;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class HBaseWordCount {
    
    public static class hBaseMapper extends Mapper<LongWritable, Text, Text, IntWritable> {private final static IntWritable ONE = new IntWritable(1);
        private Text word = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String[] words = value.toString().split(" ");
            for (String w : words) {word.set(w);
                context.write(word, ONE);
            }
        }
    }
    
    public static class hBaseReducer extends TableReducer<Text, IntWritable, NullWritable> {

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {sum += value.get();
            }
            
            // Put 实例化,每个词存一行
            Put put = new Put(key.getBytes());
            // 列族为 content, 列名为 count, 列值为单词的数目
            put.addColumn("content".getBytes(), "count".getBytes(), String.valueOf(sum).getBytes());
            
            context.write(NullWritable.get(), put);
        }
        
    }
    
    // 创建 HBase 数据表
    public static void createHBaseTable(String tableName) throws IOException {
        // 配置 HBse
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 创建一个数据库管理员
        HBaseAdmin hAdmin = (HBaseAdmin) conn.getAdmin();
        // 判断表是否存在
        if (hAdmin.tableExists(tableName)) {System.out.println("该数据表已存在,正在重新创建");
            hAdmin.disableTable(tableName);
            hAdmin.deleteTable(tableName);
        }
        // 创建表描述
        HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
        // 在表描述里添加列族
        tableDesc.addFamily(new HColumnDescriptor("content"));
        // 创建表
        hAdmin.createTable(tableDesc);
        System.out.println("创建" + tableName + "表成功");
    }
    
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {if (args.length != 3) {System.out.println("args error");
            System.exit(0);
        }
        
        String input = args[0];
        String jobName = args[1];
        String tableName = args[2];
        
        // 创建数据表
        HBaseWordCount.createHBaseTable(tableName);
        
        // 配置 MapReduce(或者将 hadoop 和 hbase 的相关配置文件引入项目)
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "localhost:9000");
       conf.set("mapred.job.tracker", "localhost:9001");
        conf.set("hbase.zookeeper.quorum", "localhost");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set(TableOutputFormat.OUTPUT_TABLE, tableName);
        
        // 配置任务
        Job job = Job.getInstance(conf, jobName);
        job.setJarByClass(HBaseWordCount.class);
        job.setMapperClass(hBaseMapper.class);
        job.setReducerClass(hBaseReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TableOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(input));
        
        // 执行 MR 任务
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }

}

更多详情见请继续阅读下一页的精彩内容 :http://www.linuxidc.com/Linux/2016-08/134181p2.htm

正文完
星哥玩云-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-22发表,共计37272字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中