您的当前位置:首页正文

HBase开发实践

2023-04-21 来源:易榕旅网


前言 .................................................................................................................................................. 3 第一部分 数据结构 ......................................................................................................................... 3

表定义....................................................................................................................................... 3 数据结构定义 ........................................................................................................................... 3 主键........................................................................................................................................... 3 初始化....................................................................................................................................... 4 第二部分 表的初始化 ..................................................................................................................... 4

初始化策略 ............................................................................................................................... 4

保留最新版本 ................................................................................................................... 4 压缩策略 ........................................................................................................................... 4 自动过期 ........................................................................................................................... 4 预分区 ............................................................................................................................... 4 写入内存 ........................................................................................................................... 4

第三部分 代码开发 ......................................................................................................................... 5

CRUD操作 .............................................................................................................................. 5

新增操作 ........................................................................................................................... 5 更新操作 ........................................................................................................................... 5 查询操作 ........................................................................................................................... 5 删除操作 ........................................................................................................................... 5 表实例池 ................................................................................................................................... 6 获取总数 ................................................................................................................................... 6 列表分页 ................................................................................................................................... 7 列表排序 ................................................................................................................................... 8 第四部分 Hbase查询 ...................................................................................................................... 8

比较运算符 ............................................................................................................................... 8 比较器介绍 ............................................................................................................................... 9

BinaryComparator ............................................................................................................. 9 NullComparator ................................................................................................................ 9 RegexStringComparator .................................................................................................... 9 常用过滤器介绍 ..................................................................................................................... 10 过滤器应用实例 ..................................................................................................................... 10 过滤器集合的使用 ................................................................................................................. 10 复杂查询过滤 ......................................................................................................................... 10 第五部分 HBase性能优化 ........................................................................................................... 11

查询缓存 ................................................................................................................................. 11 多线程配置 ............................................................................................................................. 11 预分区..................................................................................................................................... 12 附录:代码..................................................................................................................................... 12

InitHBaseTable ....................................................................................................................... 12 InitHBaseTable.properties ...................................................................................................... 15 table=PK_YCONTROL/BASEINFO/ /false/false/false ............................................ 15 HBaseRowKeyUtil HBase主键生成工具 .......................................................................... 15 PKXControlDaoHBaseImpl .................................................................................................... 17

public void addXControlFK(String PK) throws Exception{ .......................................... 17

1

public void deleteXControlFK(String PK) throws Exception{ ....................................... 18 public String getXControlFK(String PK) throws Exception{ ......................................... 18 public List getXControlPKs(String FK) throws Exception{ ............................ 18 public String getXControlPK(String FK) throws Exception{ ......................................... 19 HBaseConvetorUtil 实体转换工具类 ................................................................................... 19 HBaseDaoImpl HBase基础类 ............................................................................................ 21

public void savePut(String tableName,Put put) throws IOException { .......................... 22 public void savePut(String tableName, List puts) throws IOException { ............. 22 public void savePutDelete(String tableName, PutDelete putdelete) throws IOException { ....................................................................................................................................... 22 public void savePutDelete(String tableName, List pds) throws IOException { ....................................................................................................................................... 23 public void delete(String tableName, Delete del) throws IOException { ....................... 23 public void delete(String tableName, String id) throws IOException { .......................... 23 public void delete(String tableName, List dels) throws IOException { ........... 24 public T get(Class cla, String tableName, String id) throws Exception { ........ 24 public Result get(String tableName, String id) throws IOException { ........................... 24 public List getList(Class cla, String tableName,Filter valueFilter) throws Exception { ..................................................................................................................... 25 public ResultScanner get(String tableName, Filter valueFilter) throws IOException { . 25 public List getList(Class cla, String tableName,Filter valueFilter,Integer currteIndex) throws Exception { ..................................................................................... 26 public Long getTotal(String tableName){ ...................................................................... 27 public Long getTotal(String tableName, Filter valueFilter){ .......................................... 27 public long getCount(String tableName,String rowId,String family,String qualifier,long amount) throws IOException{ ........................................................................................ 30 HTablePoolUtil 表实例池 ..................................................................................................... 31 PageLastRowCache 分页缓存工具类 ................................................................................... 31 Utils ......................................................................................................................................... 33 PageInfo 分页 ........................................................................................................................ 34 FilterHelper 过滤帮助类 ....................................................................................................... 36

public static Filter getLESS_EQUAL_GREATERFilter(String startField, String endField, String fieldValue, boolean isFilterNull){ ........................................................ 36 public static Filter getLESS_OR_EQUALFilter(String field, String fieldValue, boolean isFilterNull){ ................................................................................................................... 37 public static Filter getGREATER_OR_EQUALFilter(String field, String fieldValue, boolean isFilterNull){ ..................................................................................................... 37 public static Filter getNOT_LESS_EQUAL_GREATERFilter(String startField, String endField, String fieldValue){ .......................................................................................... 38 public static Filter getRegexStringFilterContainsNull(String Colume,String value){ .... 38 public static Filter getRegexStringFilter(String Colume,String value){ ......................... 39 public static Filter getNotRegexStringFilter(String Colume,String value){ ................... 39 public static Filter getEqualFilter(String Colume,String value){ .................................... 40 public static Filter getNotEqualFilter(String Colume,String value){ .............................. 40

2

public static Filter getNullFilter(String Colume){ .......................................................... 40 public static Filter getNotNullFilter(String Colume){ .................................................... 41

前言

HBase是一个分布式的、面向列的开源数据库.HBase不同于一般的关系数据库,它是一个适合于非结构化数据存储的数据库。另一个不同的是HBase基于列的而不是基于行的模式。HBase – Hadoop Database,是一个高可靠性、高性能、面向列、可伸缩的分布式存储系统,利用HBase技术可在廉价PC Server上搭建起大规模结构化存储集群。

在调控层的开发过程中,调控文件可以录入多个子规则,因此系统拆分出多个调控子规则,用于运价匹配查询时用。由于预先设计的不完善,调控文件和调控子规则都保存到hbase中,因此才会涉及到hbase分页和hbase排序问题,虽然解决了这个问题,但是觉得调控文件确实没必要放在hbase中,因为它不参与运价的匹配查询。

提示:关系型数据操作尽量不要用hbase作为数据库。

第一部分 数据结构

表定义

表名:目前表名定义”T_”+实体名大写

列族:列族定义不超过三个。目前只定义了一个列族”BASEINFO”

数据结构定义

属性类型:推荐使用string。在传统的数据库中因为数据的大小不同,所以会用int、short、long、String、double来保存,定义数据格式的时候需要给定一个大小,但是在Hbase中,并不需要定义数据存储空间大小。 属性名:大写

主键

主键:表名前缀+yyyyMMdd+4位序列号

备注:序号列从自增表中获取,每天定义重置为0.

3

初始化

表名和列族必须先初始化,该表才能使用。示例:create 'T_XCONTROL','BASEINFO'

第二部分 表的初始化

代码参考:InitHBaseTable.properties InitHBaseTable

初始化策略 保留最新版本

只需要保存最新版本的数据HColumnDescriptor.setMaxVersions(1)

压缩策略

是否使用压缩策略

自动过期

数据自动过期,通过HColumnDescriptor.setTimeToLive(inttimeToLive)设置表中数据的存储生命期

预分区

介绍:hbase region split策略 是否使用预分区策略

写入内存

是否需要写入内存

4

第三部分 代码开发

CRUD操作

简单的CRUD操作,参考HBase权威指南(中文版).pdf,下面的是对HBase基本操作进行面向对象封装后的CRUD操作。所有以HBase作为存储数据库的DAO层,都继承

HBaseDaoImpl类,下列是使用示例。

新增操作

public String add(XControl control) throws Exception {

String id = HBaseRowKeyUtil.getRowKey(controlTableName); control.setId(id);

control.setStatus(Status.ADD.getValue());

PutDelete pd=HBaseConvetorUtil.convetor(control,id); super.savePutDelete(controlTableName, pd); return id;

更新操作

public String update(XControl control) throws Exception { String id = control.getId();

PutDelete pd=HBaseConvetorUtil.convetor(control,id); super.savePutDelete(controlTableName, pd); return id; }

查询操作

public XControl getXControl(String id) throws Exception { return super.get(XControl.class,controlTableName, id); }

删除操作

public void delete(String id) throws IOException { delete(controlTableName, id); }

5

表实例池

创建HTable实例是一项非常耗时的操作,通常耗时数秒才能完成。在资源高度紧张的环境下,每秒都有几千个请求,为每个请求单独创建HTable实例根本行不通。用户应当在一开始创建实例,然后再客户端生命周期内不断复用他们。

但是,在多线程环境中重用HTable实例会出现其他问题。

客户端可以通过HTablePool类来解决这个问题。它只有一个目的,那就是为HBase集群提供客户端连接池。

HTablePool类的使用参考HTablePoolUtil 表实例池

获取实例:HTablePoolUtil.getHTablePoolUtil().getHTable(tableName); 关闭实例:HTablePoolUtil.getHTablePoolUtil().putHTable(tableName);

获取总数

获取总数利用了HBase协处理器功能

1.配置

在$HBASE_HOME/conf/hbase-site.xml添加一个配置项。我用的0.94版本自带的实现为AggregateImplementation,具体如下

hbase.coprocessor.region.classes

org.apache.hadoop.hbase.coprocessor.AggregateImplementation

若之前未配置此项,则配置完后,需要重启hbase方能生效。 2.客户端使用代码示例

//获得符合条件结果总数

public Long getTotal(String tableName, Filter valueFilter){ Scan scan=new Scan(); if(null!=valueFilter){

scan.setFilter(valueFilter); }

AggregationClient aggregationClient = new AggregationClient(conf); long rowCount = 0; try {

scan.addColumn(Bytes.toBytes(\"BASEINFO\"), null);//必

6

须有此句,或者用addFamily(),否则出错,异常包含 ci **** rowCount

aggregationClient.rowCount(Bytes.toBytes(tableName), scan);

} catch (Throwable e) { e.printStackTrace(); }

return rowCount; }

= null,

列表分页

HBase的分页实现相对复杂一些。核心思想是结合分页过滤器

PageFilter(pageSize)和查询设置开始行scan.setStartRow(lastRow),lastRow为上一次查询rowkey,需要注意的是该rowkey是一个数组,对应多字段的存储位置;

不同用户登录会产生不同lastRow,因此我们把lastRow存储到session中,参考PageLastRowCache 。

为了解耦,我们又把对lastRow操作封装到HBaseDaoImpl ,以便开发写代码的时候不需要关心lastRow的操作。

public PageInfo searchXControl(QueryControlRuleQO qo,Integer pageSize,Integer currteIndex) throws Exception { //条件过滤器 FilterList filterList = new QueryControlRuleFilterList(qo).getFilterList(); //获得符合条件结果总数

Long total = getTotal(controlTableName, filterList); //过滤器集合

FilterList fl=new FilterList(); //分页过滤器

Filter filter = new PageFilter(pageSize); fl.addFilter(filterList); fl.addFilter(filter); //封装结果集 List list = getList(XControl.class, controlTableName, fl, currteIndex);

log.info(\"--------------------- total : \" + list.size()); //返回结果集

PageInfo page = new PageInfo(total, list); return page;

7

}

列表排序

hbase都是按照字典序进行排序的,也就是降序,在页面的表现就是最早的数据(rowkey最小的)排在前面。

目前的解决方案是:给主键增加一个外键关联表,外键的生成规则是

400000000000-主键号,比如主键是X201401110001,对应外键则是X198598889999,为了实现升序排序功能,保存实体的时候用X198598889999作为主键,页面查询的时候再从关联表中根据X198598889999获取X201401110001。 备注:需要对新增、删除、查询进行关联操作。 示例:

public String add(XControl control) throws Exception { pkControlDao.addXControlFK(id); }

public void delete(String id) throws Exception { pkControlDao.deleteXControlFK(id); }

public PageInfo searchXControl(QueryControlRuleQO qo,Integer pageSize,Integer currteIndex) throws Exception { //根据外键查询出匹配主键

if(StringUtils.isNotBlank(qo.getId())){

qo.setPKs(pkControlDao.getXControlPKs(qo.getId())); }

代码参考:HBaseRowKeyUtil PKXControlDaoHBaseImpl

第四部分 Hbase查询

比较运算符

注意:所有比较运算符匹配的时候都是拿数据库的数据值跟设定值比较,而不是拿设定值去跟数据库的数据值比较 LESS LESS_OR_EQUAL EQUAL NOT_EQUAL GREATER_OR_EQUAL GREATER NO_OP

8

匹配小于设定值的值 匹配小于等于设定值的值 匹配等于设定值的值 匹配与设定值不相等的值 匹配大于或等于设定值的值 匹配大于设定值的值 排除一切值

比较器介绍

BinaryComparator BinaryPrefixComparator NullComparator BitComparator RegexStringComparator SubstringComparator 使用Bytes.compareTo()比较当前值与阀值 与上面的相似,使用Bytes.compareTo()进行匹配,但是是从左端开始前缀匹配 不做匹配,只判断当前值是不是null 通过BitwiseOp类提供的按位与(AND)、或(OR)、异或(XOR)操作执行位级比较 根据一个正则表达式,在实例化这个比较器的时候去匹配表中的数据 把阀值和表中数据当String实例,同时通过contains()操作匹配字符串

上表示HBase对基于CompareFilter的过滤器提供的比较器,在调控项目中我们用到是BinaryComparator、NullComparator、RegexStringComparator,下面详细说明下BinaryComparator、NullComparator使用情况

BinaryComparator

对于所有的比较运算符都可以使用,因此在等于、不等于、范围匹配的时候用它就可以了。

NullComparator

在判断为空或者不为空的时候会用到比较器。

在使用NullComparator需要注意的是,HBase对空的定义。举例说明:

row1的endarea是没有值的,但是在HBase中,它并表示空。 row2的endarea不存在该列,在HBase中,它表示空。

RegexStringComparator

跟SubstringComparator比较器作用差不多,常用来做字符串匹配,与等于、不等于比较运算符结合使用,不能与范围(LESS、GREATER……)比较运算使用。

9

常用过滤器介绍

HBase提供的过滤器有很多,详细参考HBase权威指南(中文版).pdf,在这次项目中用到的过滤器,我们主要用了SingleColumnValueFilter以及分页的时候用到PageFilter。

过滤器应用实例

注意:这里提供的过滤器实例都是针对单列值进行过滤的。

范围过滤:小于、小于或等于、大于、大于或等于、大于等于或小于、小于或大于 值过滤:等于、不等于

字符串过滤:匹配、不匹配 空过滤:空、非空

代码参考FilterHelper 过滤帮助类

过滤器集合的使用

在传统数据库查询中,经常会用到where A like ? and B=?,或者是where A like ? or B=?。

HBase实现这个功能,需要用到FilterList,示例: where A like ? and B=?可以这样写

FilterList andlist = new FilterList(Operator.MUST_PASS_ALL); andlist.addFilter(FilterHelper.getRegexStringFilter(field_A, field_A_Value));

andlist.addFilter(FilterHelper.getEqualFilter(field_B, field_A_Value));

where A like ? or B=?可以这样写

FilterList andlist = new FilterList(Operator.MUST_PASS_ONE); andlist.addFilter(FilterHelper.getRegexStringFilter(field_A, field_A_Value));

andlist.addFilter(FilterHelper.getEqualFilter(field_B, field_A_Value));

复杂查询过滤

在上面查询比较简单,但实际业务中经常遇到更复杂的查询。例如:where (A like ? and B=?)or(where A like ? Or B=?),与上面示例相比,其实是多了一层嵌套。

在HBase中我们也可以嵌套FilterList来实现这种复杂的查询:

FilterList andlist = new FilterList(Operator.MUST_PASS_ALL); andlist.addFilter(FilterHelper.getRegexStringFilter(field_A,

10

field_A_Value));

andlist.addFilter(FilterHelper.getEqualFilter(field_B, field_A_Value));

FilterList orlist = new FilterList(Operator.MUST_PASS_ONE); orlist.addFilter(FilterHelper.getRegexStringFilter(field_A, field_A_Value));

orlist.addFilter(FilterHelper.getEqualFilter(field_B, field_A_Value));

FilterList list = new FilterList(Operator.MUST_PASS_ONE); list.addFilter(andlist);

list.addFilter(orlist);

在调控项目中我们用到很多这种FilterList嵌套,根据业务逻辑不同,嵌套的层级不同。

第五部分 HBase性能优化

查询缓存

Scan的caching属性默认值是1,意味着扫描器每次从region服务器抓取1条记录进行匹配。我们可以设置caching为比1大得多的值。例如,设置为500,则一次可以抓取500条,需要注意的是该值设得越大服务器的内存开销会越多。 HTableInterface hTable=getHTable(tableName); Scan scan=new Scan(); /*设置缓存*/

scan.setCaching(StaticConfig.getIControl_hbase_cache()); ResultScanner scanner= hTable.getScanner(scan);

多线程配置

Hbase.regionser.handler.count

RegionServer中RPC监听器实例的数量。对于master来说,这个属性是master受理的处理线程(handler)数量。默认值是10。

根据调控层的业务场景,1条运价的匹配查询就会产生4条hbase并发查询。如果有20条,就可能有80条并发,这个并发量是相当的。除了将该参数适当调大可以增加并发处理能力外,还跟集群的数量和服务器的配置有直接的关系,预计集群数量越多,服务器CPU核数越高,并发处理能力越强。

11

预分区

HRegion是Hbase中分布式存储和负载均衡的最小单元。最小单元就表示不同的Hregion可以分布在不同的HRegion server上。但一个Hregion是不会拆分到多个server上的。

Hbase.hregion.max.filesize

HstoreFile的最大值。Region中任何一个列族的存储文件如果超过了这个上限,就会被拆分成两个region。默认:268435456(256x1024x1024),即256M。

我们的调控文件比较小,要达到分区最大上限256M需要较多的调控文件。为了提高并发量,我们需要在没有达到分区上限的情况下,产生多个hregion来保存和处理数据,这里就用hbase的预分区功能。 示例:

Configuration conf = HBaseConfiguration.create() HBaseAdmin admin = new HBaseAdmin(conf);

HTableDescriptor desc = new HTableDescriptor( Bytes.toBytes(tablename));

HColumnDescriptor coldef = new HColumnDescriptor( Bytes.toBytes(colfamily));

admin.createTable(desc, Bytes.toBytes(1L), Bytes.toBytes(10L), 10); //以第一位字符不同划分区

desc.setValue(HTableDescriptor.SPLIT_POLICY, KeyPrefixRegionSplitPolicy.class.getName()); desc.setValue(\"prefix_split_key_policy.prefix_length\

附录:代码

InitHBaseTable

public class InitHBaseTable {

private static Logger log Logger.getLogger(InitHBaseTable.class);

private static Configuration conf HBaseConfiguration.create();

private static PropertiesConfiguration config = null; static{

Configuration conf = HBaseConfiguration.create(); try {

config = new PropertiesConfiguration(

12

=

=

\"InitHBaseTable.properties\"); } catch (ConfigurationException e) {

log.error(\"配置加载错误:\" + e.getMessage(), e); } }

public void init() { try {

List list = config.getList(\"table\"); for(Object table : list){

HBaseHelper helper = HBaseHelper.getHelper(conf); HBaseAdmin admin = new HBaseAdmin(conf);

String[] tableParams = ((String)table).split(\"/\"); helper.dropTable(tableParams[0]);

HTableDescriptor desc = getHTableDescriptor(tableParams);

HColumnDescriptor coldef = getHColumnDescriptor(tableParams);

desc.addFamily(coldef); //是否需要region拆分

if(tableParams[3].equals(\"true\")){ admin.createTable(desc, Bytes.toBytes(1L), Bytes.toBytes(10L), 10); }else{

admin.createTable(desc); }

boolean avail = admin.isTableAvailable(Bytes .toBytes(tableParams[0])); if (avail) {

log.info(\"--------HBase table '\" + tableParams[0] + \"' create success!----------\"); }else{

log.info(\"--------HBase table '\" + tableParams[0] + \"' create fail!----------\"); } }

} catch (Exception e) { e.printStackTrace(); }

13

}

/**

* @Title: getHTableDescriptor * @Description: TODO * @param * @return * @throws */

private HTableDescriptor getHTableDescriptor(String[] tableParams) {

HTableDescriptor desc = new HTableDescriptor( Bytes.toBytes(tableParams[0])); //是否需要region拆分

if(tableParams[3].equals(\"true\")){

desc.setValue(HTableDescriptor.SPLIT_POLICY,

KeyPrefixRegionSplitPolicy.class.getName()); desc.setValue(\"prefix_split_key_policy.prefix_length\", \"1\"); }

return desc; }

/**

* @Title: getHColumnDescriptor * @Description: TODO * @param * @return * @throws */

private HColumnDescriptor getHColumnDescriptor(String[] tableParams) {

HColumnDescriptor coldef = new HColumnDescriptor( Bytes.toBytes(tableParams[1])); //默认保存最新版本

coldef.setMaxVersions(1); //是否压缩

if(tableParams[4].equals(\"true\")){

coldef.setCompressionType(Algorithm.LZO); }

//是否自动作废

if(StringUtils.isNotBlank(tableParams[2])){

coldef.setTimeToLive(Integer.valueOf(tableParams[2]));

14

}

}

}

//鉴于数据小、查询量大的情况,将调控子规则数据放在缓存中 if(tableParams[5].equals(\"true\")){ coldef.setInMemory(true); }

return coldef;

InitHBaseTable.properties

#init hbase table, for example:

#TEST/BASEINFO/60,test is tablename,BASEINFO is colfam,60 is life time unit second

#tablename/colfamily/autodelete/split/compression/InMemory table=T_INCREMENT_TABLE/COUNT/ /false/false/false

table=T_OPERATELOG_TABLE/BASEINFO/ /false/false/false table=T_SUB_XCONTROL_RULE/BASEINFO/ /true/false/false table=T_SUB_YCONTROL_RULE/BASEINFO/ /true/false/false table=T_XCONTROL/BASEINFO/ /false/false/false table=T_YCONTROL/BASEINFO/ /false/false/false

table=T_XCONTROL_RULE/BASEINFO/ /false/false/false table=T_YCONTROL_RULE/BASEINFO/ /false/false/false

table=T_FARE_CONTROL_STORE_TABLE/BASEINFO/ /false/false/false table=FARE_STORE_TABLE/BASEINFO/3600/false/false/false table=PK_XCONTROL/BASEINFO/ /false/false/false table=PK_YCONTROL/BASEINFO/ /false/false/false

HBaseRowKeyUtil HBase主键生成工具

public class HBaseRowKeyUtil {

private static String INCREMENT_TABLE=\"T_INCREMENT_TABLE\";

/**

* 主键生成规则:400000000000-yyyymmdd-count * @return

* @throws IOException */

public static String getRowKey(String tableName) throws IOException{

HTableInterface

hTableInterface=HTablePoolUtil.getHTablePoolUtil().getHTable(INCREMENT_TABLE);

15

String fcount = null; String prefix = null; if(null != tableName){

synchronized (tableName) { long

count=hTableInterface.incrementColumnValue(Bytes.toBytes(\"1\"),

Bytes.toBytes(HBaseTable.IncrementFamily.COUNT.getValue()) , Bytes.toBytes(tableName), 0L);

hTableInterface.incrementColumnValue(Bytes.toBytes(\"1\"),

Bytes.toBytes(HBaseTable.IncrementFamily.COUNT.getValue()) , Bytes.toBytes(tableName), 1L); fcount = String.format(\"%04d\", count); prefix = getPrefix(tableName); } }

HTablePoolUtil.getHTablePoolUtil().putHTable(INCREMENT_TABLE); Long systemDate = 400000000000l - Long.valueOf(Utils.getSystemDate(\"\")+fcount); return prefix+systemDate; }

private static String getPrefix(String tableName){

if(tableName.equals(HBaseTable.HBaseTableName.XCONTROL_TABLE .getValue())) return \"X\"; else

if(tableName.equals(HBaseTable.HBaseTableName.YCONTROL_TABLE .getValue())) return \"Y\"; else

if(tableName.equals(HBaseTable.HBaseTableName.XCONTROL_CHANNEL_TABLE

.getValue())) return \"XC\"; else

if(tableName.equals(HBaseTable.HBaseTableName.SUB_XCONTROL_RULE_TABLE

.getValue())){

return Utils.getRandom()+\"SX\"; }else

if(tableName.equals(HBaseTable.HBaseTableName.SUB_YCONTROL_RULE_TABLE

.getValue())){

16

return Utils.getRandom()+\"SY\"; }

else return tableName;

}

PKXControlDaoHBaseImpl

/**

* @ClassName: PKXControlDaoHBaseImpl

* @Description: 对调控文件建立主键(递减)和外键(递增)的关联,用于排序和列表显示用

* 对于实体表:存储的时候用的rowkey是主键 * 对于排序表:存储的时候用的rowkey是外键 * 列表显示的时候用外键代替主键

* 列表查询的时候用主键代替外键去实体表查询 * @author lixiangjing

* @date 2014-8-1 下午3:55:15 * */

@Component

public class PKXControlDaoHBaseImpl extends HBaseDaoImpl implements PKXControlDao {

private static Logger log = Logger.getLogger(PKXControlDaoHBaseImpl.class); private final static String PKTableName = HBaseTableName.PK_XCONTROL_TABLE.getValue();

private final static String family = \"BASEINFO\";

@Override

public void addXControlFK(String PK) throws

Exception{

String FK = Utils.getFK(PK);

Put put = new Put(Bytes.toBytes(FK)); put.add(Bytes.toBytes(family), Bytes.toBytes(\"PK\"), Bytes.toBytes(PK));

savePut(PKTableName, put); }

@Override

17

public void deleteXControlFK(String PK) throws

Exception{

String FK = Utils.getFK(PK);

Delete del = new Delete(Bytes.toBytes(FK)); delete(PKTableName, del); }

@Override

public String getXControlFK(String PK) throws

Exception{

return Utils.getFK(PK); } /**

* 模糊查询 */

@Override

public List getXControlPKs(String FK) throws

Exception{

Filter rowFilter = FilterHelper.getRowRegexStringFilter(FK); ResultScanner resultScanner = get(PKTableName, rowFilter); List list = new ArrayList(); for(Result result : resultScanner){ String pk = Bytes.toString(result.getValue(Bytes.toBytes(family), Bytes.toBytes(\"PK\"))); list.add(pk); }

return list; }

@Override

18

public String getXControlPK(String FK) throws

Exception{

return Utils.getPK(FK); } }

HBaseConvetorUtil 实体转换工具类

public class HBaseConvetorUtil { /**

* @Title: convetor

* @Description: 传入hbase返回结果值,返回实例集合 * @param * @return * @throws */

public static List convetor(Class cla,ResultScanner resultScanner) throws Exception{

List list = new ArrayList(); for (Result result : resultScanner) {

Field [] fileds=cla.getDeclaredFields(); T t = cla.newInstance(); for(Field field:fileds){

field.setAccessible(true);

String fileName=field.getName();

if(result.containsColumn(Bytes.toBytes(\"BASEINFO\"), Bytes.toBytes(fileName.toUpperCase()))){

if(result.getValue(Bytes.toBytes(\"BASEINFO\"), Bytes.toBytes(fileName.toUpperCase())).length==0){ continue; }

String

value=Bytes.toString(result.getValue(Bytes.toBytes(\"BASEINFO\"), Bytes.toBytes(fileName.toUpperCase()))); field.set(t, value); } }

list.add(t); }

return list;

19

} /**

* @Title: convetor

* @Description: 传入hbase返回结果值,返回实例 * @param * @return * @throws */ public static T convetor(Class cla,Result result) throws Exception{

Field [] fileds=cla.getDeclaredFields(); T t = cla.newInstance(); for(Field field:fileds){

field.setAccessible(true);

String fileName=field.getName();

if(result.containsColumn(Bytes.toBytes(\"BASEINFO\"), Bytes.toBytes(fileName.toUpperCase()))){

if(result.getValue(Bytes.toBytes(\"BASEINFO\"), Bytes.toBytes(fileName.toUpperCase())).length==0){ continue; }

String

value=Bytes.toString(result.getValue(Bytes.toBytes(\"BASEINFO\"), Bytes.toBytes(fileName.toUpperCase()))); field.set(t, value); } }

return t; } /**

* @Title: convetor

* @Description: 传入保存实例和主键ID,返回PutDelete * @param * @return * @throws */

public static PutDelete convetor(T t,String id) throws Exception {

Put put=new Put(Bytes.toBytes(id));

Delete delete=new Delete(Bytes.toBytes(id));

Field [] fileds=t.getClass().getDeclaredFields(); for(Field field:fileds){

20

field.setAccessible(true);

String fieldName=field.getName(); Object value = field.get(t); if(null==value){

delete.deleteColumn(Bytes.toBytes(\"BASEINFO\"), Bytes.toBytes(fieldName.toUpperCase())); continue; }

put.add(Bytes.toBytes(\"BASEINFO\"), Bytes.toBytes(fieldName.toUpperCase()), Bytes.toBytes((String)value)); }

PutDelete putdelete = new PutDelete(); putdelete.setPut(put);

putdelete.setDelete(delete); return putdelete; }

}

HBaseDaoImpl HBase基础类

private static Logger log = Logger.getLogger(HBaseDaoImpl.class);

public Configuration conf=HBaseConfiguration.create();

private PageLastRowCache lastRowCache = null; /**

* 取得某个表 *

* @param tableName * @return

* @throws Exception */

private HTableInterface getHTable(String tableName) {

return

HTablePoolUtil.getHTablePoolUtil().getHTable(tableName); }

@Override

21

public void savePut(String tableName,Put put) throws IOException {

HTableInterface hTable=getHTable(tableName); hTable.put(put);

HTablePoolUtil.getHTablePoolUtil().putHTable(tableName); }

@Override

public void savePut(String tableName, List puts) throws IOException {

HTableInterface hTable=getHTable(tableName); hTable.put(puts);

HTablePoolUtil.getHTablePoolUtil().putHTable(tableName); } /**

* @Title: savePutDelete * @Description: TODO * @param * @return * @throws */

public void savePutDelete(String tableName, PutDelete putdelete) throws IOException {

HTableInterface hTable=getHTable(tableName); hTable.setAutoFlush(false);

hTable.put(putdelete.getPut());

hTable.delete(putdelete.getDelete()); hTable.flushCommits();

HTablePoolUtil.getHTablePoolUtil().putHTable(tableName); }

22

public void savePutDelete(String tableName,

List pds) throws IOException {

HTableInterface hTable=getHTable(tableName); hTable.setAutoFlush(false);

for(int i=0; ihTable.delete(pds.get(i).getDelete()); }

hTable.flushCommits();

HTablePoolUtil.getHTablePoolUtil().putHTable(tableName); }

@Override

public void delete(String tableName, Delete del) throws IOException {

HTableInterface hTable=getHTable(tableName); hTable.delete(del);

HTablePoolUtil.getHTablePoolUtil().putHTable(tableName); }

@Override

public void delete(String tableName, String id) throws IOException {

Delete del = new Delete(Bytes.toBytes(id)); HTableInterface hTable=getHTable(tableName); hTable.delete(del);

HTablePoolUtil.getHTablePoolUtil().putHTable(tableName); }

23

public void delete(String tableName, List dels) throws IOException {

HTableInterface hTable=getHTable(tableName); hTable.delete(dels);

HTablePoolUtil.getHTablePoolUtil().putHTable(tableName); }

@Override

public T get(Class cla, String tableName, String id) throws Exception {

}

//封装结果集

Result result = get(tableName, id);

T t = HBaseConvetorUtil.convetor(cla, result); return t;

public Result get(String tableName, String id) throws IOException {

HTableInterface hTable=getHTable(tableName); Get get = new Get(Bytes.toBytes(id)); Result result = hTable.get(get);

HTablePoolUtil.getHTablePoolUtil().putHTable(tableName); return result; }

@Override

24

public List getList(Class cla, String tableName,Filter valueFilter) throws Exception {

//封装结果集 List list;

ResultScanner resultScanner = null; try {

resultScanner = get(tableName, valueFilter);

list = HBaseConvetorUtil.convetor(cla, resultScanner); } finally{

if(null!=resultScanner) resultScanner.close(); }

return list; }

public ResultScanner get(String tableName, Filter valueFilter) throws IOException {

HTableInterface hTable=getHTable(tableName); Scan scan=new Scan(); if(valueFilter!=null)

scan.setFilter(valueFilter); /*设置缓存*/

scan.setCaching(StaticConfig.getIControl_hbase_cache()); ResultScanner scanner= hTable.getScanner(scan);

HTablePoolUtil.getHTablePoolUtil().putHTable(tableName); return scanner; }

@Override

25

public List getList(Class cla, String tableName,Filter valueFilter,Integer currteIndex) throws Exception {

//封装结果集 List list;

ResultScanner resultScanner = null; try {

resultScanner = get(tableName, valueFilter, currteIndex);

list = HBaseConvetorUtil.convetor(cla, resultScanner); } finally{

if(null!=resultScanner) resultScanner.close(); }

return list; }

/**

* @Title: get

* @Description: 获取当前页结果 * @param currteIndex当前页 * @return * @throws */

private ResultScanner get(String tableName, Filter valueFilter,Integer currteIndex) throws IOException { //从缓存中获取上一次lastRow

String lastRow = getLastRow(tableName, currteIndex, valueFilter);

log.info(\"--------------------- lastRow : \" + lastRow);

ResultScanner scanner= get(tableName, valueFilter, lastRow);

//保存该次查询lastRow到缓存

putLastRow(tableName,currteIndex,valueFilter);

return scanner; }

26

private ResultScanner get(String tableName, Filter valueFilter,String lastRow) throws IOException { HTableInterface hTable=getHTable(tableName); Scan scan=new Scan();

scan.setFilter(valueFilter); /*设置缓存*/

scan.setCaching(StaticConfig.getIControl_hbase_cache()); if(StringUtils.isNotBlank(lastRow)){

scan.setStartRow(getStartRow(lastRow)); }

ResultScanner scanner= hTable.getScanner(scan);

HTablePoolUtil.getHTablePoolUtil().putHTable(tableName); return scanner; }

//获得表总数

public Long getTotal(String tableName){

return getTotal(tableName,null); }

//获得符合条件结果总数

public valueFilter){

Long getTotal(String tableName, Filter

Scan scan=new Scan(); if(null!=valueFilter){

scan.setFilter(valueFilter); }

AggregationClient aggregationClient = new AggregationClient(conf); long rowCount = 0; try {

scan.addColumn(Bytes.toBytes(\"BASEINFO\"), null);//必须有此句,或者用addFamily(),否则出错,异常包含 ci **** rowCount = aggregationClient.rowCount(Bytes.toBytes(tableName), null,

27

scan);

} catch (Throwable e) { e.printStackTrace(); }

return rowCount; }

private byte[] getStartRow(String lastRow){ String[] split = lastRow.split(\"/\"); byte[] temRow = new byte[split.length]; for(int i=0; itemRow[i] = Byte.valueOf(split[i]); }

byte[] POSTFIX = Bytes.toBytes(0);

byte[] startRow = Bytes.add(temRow, POSTFIX); return startRow; }

//存放lastRow private void putLastRow(String tableName, Integer currteIndex, Filter fl) throws IOException {

String lastRow = getLastRow(tableName, currteIndex, fl); ResultScanner resultScanner = get(tableName, fl,lastRow); byte[] currentRow = null;

for (Result result : resultScanner) { currentRow = result.getRow(); }

getPageLastRowCache().putLastRow(tableName,currteIndex,resetLastRow(currentRow)); }

//存放lastRow private void putLastRow(String tableName, Integer currteIndex, byte[] currentRow) {

if(null!=currentRow && currentRow.length>0){

String currentRowStr = resetLastRow(currentRow);

getPageLastRowCache().putLastRow(tableName,currteIndex,currentRowStr); } }

28

//获取lastRow

private String getLastRow(String tableName, Integer currteIndex, Filter fl) throws IOException { if(currteIndex.equals(1))return null;

else if(getPageLastRowCache().isContainsKey(tableName, currteIndex))return

getPageLastRowCache().getLastRow(tableName,currteIndex); else{

Integer startIndex = getStartIndex(tableName, currteIndex);

return getLastestRow(tableName, fl, startIndex, currteIndex); } }

//获取有缓存的最近一页作为开始页

private Integer getStartIndex(String tableName, Integer currteIndex){

for(int i=currteIndex; i>0; i--){

if(getPageLastRowCache().isContainsKey(tableName, i)){

return i; } }

return 1; } /**

* @Title: getEndRow * @Description:

* 1、有缓存的最近一页作为开始页 * 2、一页一页的获取lastRow,并缓存

* 3、最后获取的lastRow为当前查询需要的lastRow * @param * @return * @throws */

private String getLastestRow(String tableName, Filter fl, Integer startIndex, Integer currteIndex) throws IOException{ String endRow = null;

for(int i=startIndex; iResultScanner resultScanner = get(tableName, fl, lastRow);

29

byte[] currentRow = null;

for (Result result : resultScanner) { currentRow = result.getRow();

endRow = resetLastRow(currentRow);; }

putLastRow(tableName,i,currentRow); }

return endRow; }

private String resetLastRow(byte[] lastRow){

if(null==lastRow)return \"\";

String temLastRow = \"\";

for(byte row : lastRow){

temLastRow = temLastRow + \"/\" + row; }

return temLastRow.substring(1, temLastRow.length()); }

private PageLastRowCache getPageLastRowCache(){ if(null==lastRowCache){

lastRowCache = new PageLastRowCache(); }

return lastRowCache; }

public long getCount(String tableName,String

rowId,String family,String qualifier,long amount) throws IOException{

long

count=getHTable(tableName).incrementColumnValue(Bytes.toBytes(rowId), Bytes.toBytes(family), Bytes.toBytes(qualifier), amount,true);

HTablePoolUtil.getHTablePoolUtil().putHTable(tableName); return count;

30

}

}

HTablePoolUtil 表实例池

public class HTablePoolUtil {

private static HTablePoolUtil pooUtil=new HTablePoolUtil();

private static HTablePool pools;

private HTablePoolUtil(){

pools = new HTablePool(HBaseConfiguration.create(), StaticConfig.getIControl_tablePool()); }

public static HTablePoolUtil getHTablePoolUtil(){ if(pooUtil==null){

pooUtil=new HTablePoolUtil(); }

return pooUtil; }

public HTableInterface getHTable(String tableName){ return pools.getTable(tableName); } /**

* 释放资源

* @param tableName * @throws IOException */

public void putHTable(String tableName) throws IOException{ pools.closeTablePool(tableName); } }

PageLastRowCache 分页缓存工具类

/** *

* @ClassName: PageLastRowCache

31

* @Description: 缓存每次查询的lastRow,作为下次查询的startRow * @author lixiangjing

* @date 2014-5-26 下午3:18:28 * */

public class PageLastRowCache {

private Map> manager = null;

@SuppressWarnings(\"unchecked\") public PageLastRowCache(){ manager = (Map>) Utils.getSessionAttribute(\"LastRowCache\"); if(null==manager){

manager = createLastRowCache(); } }

private Map> createLastRowCache() {

Map> manager = new ConcurrentHashMap>();

for(HBaseTableName table : HBaseTableName.values()){ String tableName = table.getValue(); if(!manager.containsKey(tableName)){ Map cache = new ConcurrentHashMap();

manager.put(tableName, cache); } }

return manager; }

public boolean isContainsKey(String tableName, Integer currteIndex){ return manager.get(tableName).containsKey(currteIndex-1); }

public String getLastRow(String tableName, Integer currteIndex) {

return manager.get(tableName).get(currteIndex-1); }

public void putLastRow(String tableName, Integer currteIndex,

32

}

String currentRowStr) {

manager.get(tableName).put(currteIndex, currentRowStr); refreshSession(); }

public void refreshSession(){

Utils.setSessionAttribute(\"LastRowCache\", manager); }

Utils

public class Utils {

public static Object getSessionAttribute(String name) { return WebUtils.getSessionAttribute(getRequest(), name);

}

public static void setSessionAttribute(String name, Object value) {

WebUtils.setSessionAttribute(getRequest(), name, value); } /** *

* @Title: formatPrice * @Description: 8位整数 * @param * @return * @throws */

public static String formatPrice(String price){ if(StringUtils.isBlank(price))return null;

DecimalFormat nf = new DecimalFormat(\"00000000\"); return nf.format(Double.valueOf(price)); } /** *

* @Title: formatPoint

* @Description: 格式化点数,前面三位不够补零,后面2位不够也补零 * @param * @return * @throws */

public static String formatPoint(String point){ if(StringUtils.isBlank(point))return null;

33

DecimalFormat nf = new DecimalFormat(\"000.00\"); return nf.format(Double.valueOf(point)); } }

PageInfo 分页

@SuppressWarnings(\"serial\")

public class PageInfo implements Serializable{ private Integer page; private Long total;

private List rows; private static Integer pageSize=10; private static Integer currteIndex=1;

public PageInfo(List rows){ this.rows=rows; }

public PageInfo(Long total, List rows){ this.total=total; this.rows=rows; }

public PageInfo(Long total,List rows,Integer page){

this.page=page; this.total=total; this.rows=rows; }

public PageInfo(Integer page,Long total, List rows,Integer pageSize,Integer currteIndex){ this.page=page; this.total=total; this.rows=rows; }

public Integer getPage() { return page; }

public void setPage(Integer page) { this.page = page;

34

}

public Long getTotal() { return total; }

public void setTotal(Long total) { this.total = total; }

public List getRows() { return rows; }

public void setRows(List rows) { this.rows = rows; }

public static Integer getPageSize() {

if(StringUtils.isNotBlank(Utils.getRequest().getParameter(\"rows\"))){

return

Integer.valueOf(Utils.getRequest().getParameter(\"rows\")); }

return pageSize; }

public static void setPageSize(Integer pageSize) { PageInfo.pageSize = pageSize; }

public static Integer getCurrteIndex() {

if(StringUtils.isNotBlank(Utils.getRequest().getParameter(\"page\"))){

return

Integer.valueOf(Utils.getRequest().getParameter(\"page\"))>0?Integer.valueOf(Utils.getRequest().getParameter(\"page\")):currteIndex; }

return currteIndex; }

35

}

public static void setCurrteIndex(Integer currteIndex) { PageInfo.currteIndex = currteIndex; }

FilterHelper 过滤帮助类

public class FilterHelper {

private static String family = \"BASEINFO\"; /** *

* @Title: getLESS_EQUAL_GREATERFilter * @Description: 获取范围比较过滤器 * @param * @return * @throws */

public static Filter startField,

getLESS_EQUAL_GREATERFilter(String

String endField, String fieldValue, boolean isFilterNull){

FilterList list = new FilterList(Operator.MUST_PASS_ALL); list.addFilter(getLESS_OR_EQUALFilter(startField, fieldValue, isFilterNull));

list.addFilter(getGREATER_OR_EQUALFilter(endField, fieldValue, isFilterNull)); return list; } /**

* @Title: getLESS_OR_EQUALFilter * @Description: 获取小于等比较器 * @param * @return * @throws */

36

public static Filter getLESS_OR_EQUALFilter(String

field, String fieldValue, boolean isFilterNull){

BinaryComparator comparator = BinaryComparator(Bytes.toBytes(fieldValue)); SingleColumnValueFilter filter = SingleColumnValueFilter(

Bytes.toBytes(family), Bytes.toBytes(field),

CompareFilter.CompareOp.LESS_OR_EQUAL, comparator);

filter.setFilterIfMissing(isFilterNull); return filter; } /**

* @Title: getGREATER_OR_EQUALFilter * @Description: 获取大于等比较器 * @param * @return * @throws */

public static

getGREATER_OR_EQUALFilter(String field,

fieldValue, boolean isFilterNull){

BinaryComparator comparator = BinaryComparator(Bytes.toBytes(fieldValue)); SingleColumnValueFilter filter = SingleColumnValueFilter(

Bytes.toBytes(family), Bytes.toBytes(field),

CompareFilter.CompareOp.GREATER_OR_EQUAL, comparator);

filter.setFilterIfMissing(isFilterNull); return filter; } /**

37

new new

Filter String

new new

*

* @Title: getNOT_LESS_EQUAL_GREATERFilter * @Description: 获取不在范围内比较过滤器 * @param * @return * @throws */

public static Filter

getNOT_LESS_EQUAL_GREATERFilter(String startField, String endField, String fieldValue){

FilterList list = new FilterList(Operator.MUST_PASS_ONE); BinaryComparator comparator = new BinaryComparator(Bytes.toBytes(fieldValue)); SingleColumnValueFilter startfilter = new SingleColumnValueFilter(

Bytes.toBytes(family),

Bytes.toBytes(startField),

CompareFilter.CompareOp.GREATER, comparator); SingleColumnValueFilter endfilter = new SingleColumnValueFilter(

Bytes.toBytes(family), Bytes.toBytes(endField),

CompareFilter.CompareOp.LESS, comparator); list.addFilter(startfilter); list.addFilter(endfilter); return list; }

public static Filter

Colume,String

getRegexStringFilterContainsNull(String value){

RegexStringComparator comparator RegexStringComparator(value); SingleColumnValueFilter filter SingleColumnValueFilter(

Bytes.toBytes(family),

38

= =

new new

Bytes.toBytes(Colume),

CompareFilter.CompareOp.EQUAL, comparator); filter.setFilterIfMissing(false); return filter; }

public static Filter getRegexStringFilter(String

Colume,String value){

RegexStringComparator comparator RegexStringComparator(value); SingleColumnValueFilter filter SingleColumnValueFilter(

Bytes.toBytes(family), Bytes.toBytes(Colume),

CompareFilter.CompareOp.EQUAL, comparator); filter.setFilterIfMissing(true); return filter; }

= =

new new

public static Filter getNotRegexStringFilter(String

Colume,String value){

RegexStringComparator comparator RegexStringComparator(value); SingleColumnValueFilter filter SingleColumnValueFilter(

Bytes.toBytes(family), Bytes.toBytes(Colume),

CompareFilter.CompareOp.NOT_EQUAL, comparator); filter.setFilterIfMissing(true); return filter; }

= =

new new

39

public static Filter getEqualFilter(String Colume,String

value){

BinaryComparator comparator BinaryComparator(Bytes.toBytes(value)); SingleColumnValueFilter filter SingleColumnValueFilter(

Bytes.toBytes(family), Bytes.toBytes(Colume),

CompareFilter.CompareOp.EQUAL, comparator); filter.setFilterIfMissing(true); return filter; }

= =

new new

public static Filter getNotEqualFilter(String

Colume,String value){

BinaryComparator comparator BinaryComparator(Bytes.toBytes(value)); SingleColumnValueFilter filter SingleColumnValueFilter(

Bytes.toBytes(family), Bytes.toBytes(Colume),

CompareFilter.CompareOp.NOT_EQUAL, comparator); return filter; }

= =

new new

public static Filter getNullFilter(String Colume){

/*这里不能使用NullComparator,否则在获取总数的时候会出问题 NullComparator comparator = new NullComparator(); 代替方案: BinaryComparator comparator = new BinaryComparator(Bytes.toBytes(\"\"));

filter.setFilterIfMissing(false); */

BinaryComparator comparator = new BinaryComparator(Bytes.toBytes(\"\")); SingleColumnValueFilter filter = new

40

SingleColumnValueFilter(

Bytes.toBytes(family), Bytes.toBytes(Colume),

CompareFilter.CompareOp.EQUAL, comparator); filter.setFilterIfMissing(false); return filter; }

public static Filter getNotNullFilter(String Colume){

BinaryComparator comparator = BinaryComparator(Bytes.toBytes(\"\")); SingleColumnValueFilter filter =

SingleColumnValueFilter(

Bytes.toBytes(family), Bytes.toBytes(Colume),

CompareFilter.CompareOp.NOT_EQUAL, comparator); filter.setFilterIfMissing(true); return filter; } }

41

new new

因篇幅问题不能全部显示,请点此查看更多更全内容

Copyright © 2019- 版权所有