hbase分页应用场景及分页思路与代码实现

更新时间:2016-11-22 10:45:42 点击次数:1889次
可以带着下面问题来阅读
1.hbasef分页什么情况下比较有用?
2.hbase分页的思路是什么?
3.hbase分页代码与sql分页代码有什么区别和联系?

一、hbase分页应用场景:


一、应用场景
hbase到底需不需要分页,hbase的数据量肯定不少,肯定是需要分页的。很多人认为数量量如此大,怎么会展示。这个从客户角度来讲,我们做的系统,不可能会给机器看的。这里面我们可以对其进行统计分析,这样利于我们决策。
比如我们:平台中有一个场景是要做用户历史订单数据的查询,并且需要支持分页。这里只是举了一个场景,后面大家可以根据自己的经验。下面给大家讲讲分页的思路。





二、hbase分页思路:

hbase通过scan来扫描表,通过startKey,stopKey来确定范围,hbase官方提供了一个PageFilter来支持一次scan可以返回多少条数据即每页的行数。假如一页是10条,这样是页还好,但是第二页呢,如果不改变PageFilter的pageSize,那返回的还是页的数据,如果改变pageSize为20,则返回了页10多余的数据,在客户端要过滤掉,性能不好。那怎么办呢,方法就是在查询下一页时,指定下一页的startKey,这样PageFilter每次就不会返回多余的记录,stopKey可以不用变,那现在问题是,怎么得到下一页的startKey(即下一页行的rowkey)呢?,有两种方法来取每一页的startKey

1.  上一页的后一行记录的rowkey作为下一页的startKey。
2.   在每次scan时多取一条记录,即把下一页条行页取出来,把该行的rowkey做为下一页的startKey。

这两种方法,都要注意,hbase scan时是包含startKey的,如果是采用种,则要在记录多取一条,排除条。第二种页是多取一条,但是排除后一条,用来做下一页的startKey。还有需要注意的是在计算是否有下一页时,可以根据返回的条数来判断。

startKey怎么取没有问题了。但是怎么存储呢,有同学可能会想到存到session,但是如果你的服务是rest api型的,就没有session的概念了。那还有两种选择:
1. 是存到客户端,让客户端每次请求时把startKey再传回来,这样需要依赖客户端,如果客户端是远程,或者是开放平台的情况下,可能不合适。
2. 存在服务端,存在服务端需要注意并发访问的情况。比如scan同一个表,一个访问第2页,一个访问第3页,服务端就需要对每一个table的scan 存每一页的startKey,需要为同一个查询条件包含pageSize,因为pageSize不一样,startKey也会不一样,
在服务crash情况下,从起后都从页开始。

我自己是采用第二种方案,存在服务端.


import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseUtils {
        private static Configuration config = null;
        private static HTablePool tp = null;
        static {
                // 加载集群配置
                config = HBaseConfiguration.create();
                config.set("hbase.zookeeper.quorum", "xx.xx.xx");
                config.set("hbase.zookeeper.property.clientPort", "2181");
                // 创建表池(可伟略提高查询性能,具体说明请百度或官方API)
                tp = new HTablePool(config, 10);
        }

        /*
         * 获取hbase的表
         */
        public static HTableInterface getTable(String tableName) {

                if (StringUtils.isEmpty(tableName))
                        return null;

                return tp.getTable(getBytes(tableName));
        }

        /* 转换byte数组 */
        public static byte[] getBytes(String str) {
                if (str == null)
                        str = "";

                return Bytes.toBytes(str);
        }

        /**
         * 查询数据
         * @param tableKey 表标识
         * @param queryKey 查询标识
         * @param startRow 开始行
         * @param paramsMap 参数集合
         * @return 结果集
         */
        public static TBData getDataMap(String tableName, String startRow,
                        String stopRow, Integer currentPage, Integer pageSize)
                        throws IOException {
                List<Map<String, String>> mapList = null;
                mapList = new LinkedList<Map<String, String>>();

                ResultScanner scanner = null;
                // 为分页创建的封装类对象,下面有给出具体属性
                TBData tbData = null;
                try {
                        // 获取大返回结果数量
                        if (pageSize == null || pageSize == 0L)
                                pageSize = 100;

                        if (currentPage == null || currentPage == 0)
                                currentPage = 1;

                        // 计算起始页和结束页
                        Integer firstPage = (currentPage - 1) * pageSize;

                        Integer endPage = firstPage + pageSize;

                        // 从表池中取出HBASE表对象
                        HTableInterface table = getTable(tableName);
                        // 获取筛选对象
                        Scan scan = getScan(startRow, stopRow);
                        // 给筛选对象放入过滤器(true标识分页,具体方法在下面)
                        scan.setFilter(packageFilters(true));
                        // 缓存1000条数据
                        scan.setCaching(1000);
                        scan.setCacheBlocks(false);
                        scanner = table.getScanner(scan);
                        int i = 0;
                        List<byte[]> rowList = new LinkedList<byte[]>();
                        // 遍历扫描器对象, 并将需要查询出来的数据row key取出
                        for (Result result : scanner) {
                                String row = toStr(result.getRow());
                                if (i >= firstPage && i < endPage) {
                                        rowList.add(getBytes(row));
                                }
                                i++;
                        }

                        // 获取取出的row key的GET对象
                        List<Get> getList = getList(rowList);
                        Result[] results = table.get(getList);
                        // 遍历结果
                        for (Result result : results) {
                                Map<byte[], byte[]> fmap = packFamilyMap(result);
                                Map<String, String> rmap = packRowMap(fmap);
                                mapList.add(rmap);
                        }

                        // 封装分页对象
                        tbData = new TBData();
                        tbData.setCurrentPage(currentPage);
                        tbData.setPageSize(pageSize);
                        tbData.setTotalCount(i);
                        tbData.setTotalPage(getTotalPage(pageSize, i));
                        tbData.setResultList(mapList);
                } catch (IOException e) {
                        e.printStackTrace();
                } finally {
                        closeScanner(scanner);
                }

                return tbData;
        }

        private static int getTotalPage(int pageSize, int totalCount) {
                int n = totalCount / pageSize;
                if (totalCount % pageSize == 0) {
                        return n;
                } else {
                        return ((int) n) + 1;
                }
        }

        // 获取扫描器对象
        private static Scan getScan(String startRow, String stopRow) {
                Scan scan = new Scan();
                scan.setStartRow(getBytes(startRow));
                scan.setStopRow(getBytes(stopRow));

                return scan;
        }

        /**
         * 封装查询条件
         */
        private static FilterList packageFilters(boolean isPage) {
                FilterList filterList = null;
                // MUST_PASS_ALL(条件 AND) MUST_PASS_ONE(条件OR)
                filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
                Filter filter1 = null;
                Filter filter2 = null;
                filter1 = newFilter(getBytes("family1"), getBytes("column1"),
                                CompareOp.EQUAL, getBytes("condition1"));
                filter2 = newFilter(getBytes("family2"), getBytes("column1"),
                                CompareOp.LESS, getBytes("condition2"));
                filterList.addFilter(filter1);
                filterList.addFilter(filter2);
                if (isPage) {
                        filterList.addFilter(new FirstKeyOnlyFilter());
                }
                return filterList;
        }

        private static Filter newFilter(byte[] f, byte[] c, CompareOp op, byte[] v) {
                return new SingleColumnValueFilter(f, c, op, v);
        }

        private static void closeScanner(ResultScanner scanner) {
                if (scanner != null)
                        scanner.close();
        }

        /**
         * 封装每行数据
         */
        private static Map<String, String> packRowMap(Map<byte[], byte[]> dataMap) {
                Map<String, String> map = new LinkedHashMap<String, String>();

                for (byte[] key : dataMap.keySet()) {

                        byte[] value = dataMap.get(key);

                        map.put(toStr(key), toStr(value));

                }
                return map;
        }

        /* 根据ROW KEY集合获取GET对象集合 */
        private static List<Get> getList(List<byte[]> rowList) {
                List<Get> list = new LinkedList<Get>();
                for (byte[] row : rowList) {
                        Get get = new Get(row);

                        get.addColumn(getBytes("family1"), getBytes("column1"));
                        get.addColumn(getBytes("family1"), getBytes("column2"));
                        get.addColumn(getBytes("family2"), getBytes("column1"));
                        list.add(get);
                }
                return list;
        }

        /**
         * 封装配置的所有字段列族
         */
        private static Map<byte[], byte[]> packFamilyMap(Result result) {
                Map<byte[], byte[]> dataMap = null;
                dataMap = new LinkedHashMap<byte[], byte[]>();
                dataMap.putAll(result.getFamilyMap(getBytes("family1")));
                dataMap.putAll(result.getFamilyMap(getBytes("family2")));
                return dataMap;
        }

        private static String toStr(byte[] bt) {
                return Bytes.toString(bt);
        }

        public static void main(String[] args) throws IOException {
                // 拿出row key的起始行和结束行
                // #<0<9<:
                String startRow = "aaaa#";
                String stopRow = "aaaa:";
                int currentPage = 1;
                int pageSize = 20;
                // 执行hbase查询
                getDataMap("table", startRow, stopRow, currentPage, pageSize);

        }
}

class TBData {
        private Integer currentPage;
        private Integer pageSize;
        private Integer totalCount;
        private Integer totalPage;
        private List<Map<String, String>> resultList;

        public Integer getCurrentPage() {
                return currentPage;
        }

        public void setCurrentPage(Integer currentPage) {
                this.currentPage = currentPage;
        }

        public Integer getPageSize() {
                return pageSize;
        }

        public void setPageSize(Integer pageSize) {
                this.pageSize = pageSize;
        }

        public Integer getTotalCount() {
                return totalCount;
        }

        public void setTotalCount(Integer totalCount) {
                this.totalCount = totalCount;
        }

        public Integer getTotalPage() {
                return totalPage;
        }

        public void setTotalPage(Integer totalPage) {
                this.totalPage = totalPage;
        }

        public List<Map<String, String>> getResultList() {
                return resultList;
        }

        public void setResultList(List<Map<String, String>> resultList) {
                this.resultList = resultList;
        }
}

本站文章版权归原作者及原出处所有 。内容为作者个人观点, 并不代表本站赞同其观点和对其真实性负责,本站只提供参考并不构成任何投资及应用建议。本站是一个个人学习交流的平台,网站上部分文章为转载,并不用于任何商业目的,我们已经尽可能的对作者和来源进行了通告,但是能力有限或疏忽,造成漏登,请及时联系我们,我们将根据著作权人的要求,立即更正或者删除有关内容。本站拥有对此声明的最终解释权。

回到顶部
嘿,我来帮您!