HBase scan: client-side and server-side flow
Published: 2019-06-23


I: Background

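Scanners fall into two kinds, InternalScanner and KeyValueScanner. They differ as follows:

1. InternalScanner: can be thought of as a scanner that contains other scanners. Its main interface is next(), which fetches the next KeyValue from the scanners it contains. Its role can be thought of as "employing" KeyValueScanners.

2. KeyValueScanner: a scanner that fetches KeyValues from memory or from files.

Roughly:

RegionScanner and StoreScanner are InternalScanners,

while MemstoreScanner, StoreFileScanner, and StoreScanner are KeyValueScanners (StoreScanner appears in both groups).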
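
The relationship between the two kinds can be sketched with a toy model. This is not HBase code: ToyKeyValueScanner, ToyListScanner, and ToyMergingScanner are hypothetical names, and keys are plain strings. It only shows the idea of an outer scanner whose next() pulls the smallest key from the scanners it contains.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical, simplified stand-in for a KeyValueScanner (not the real HBase interface).
interface ToyKeyValueScanner {
    String peek();   // next key without consuming it, or null when exhausted
    String next();   // consume and return the next key, or null when exhausted
}

// Leaf scanner over one sorted source (think: a memstore or a single store file).
class ToyListScanner implements ToyKeyValueScanner {
    private final Iterator<String> it;
    private String current;

    ToyListScanner(List<String> sortedKeys) {
        this.it = sortedKeys.iterator();
        this.current = it.hasNext() ? it.next() : null;
    }

    public String peek() { return current; }

    public String next() {
        String result = current;
        current = it.hasNext() ? it.next() : null;
        return result;
    }
}

// "Internal" scanner: owns other scanners and returns their keys in global sorted order.
class ToyMergingScanner {
    private final PriorityQueue<ToyKeyValueScanner> heap =
        new PriorityQueue<>((a, b) -> a.peek().compareTo(b.peek()));

    ToyMergingScanner(List<? extends ToyKeyValueScanner> scanners) {
        for (ToyKeyValueScanner s : scanners) {
            if (s.peek() != null) heap.add(s);   // skip sources that are already empty
        }
    }

    // Returns the smallest remaining key across all contained scanners, or null.
    String next() {
        ToyKeyValueScanner top = heap.poll();
        if (top == null) return null;
        String key = top.next();
        if (top.peek() != null) heap.add(top);   // re-insert under its new head key
        return key;
    }

    // Convenience helper: read the merged scanner to the end.
    static List<String> drain(ToyMergingScanner scanner) {
        List<String> out = new ArrayList<>();
        for (String k = scanner.next(); k != null; k = scanner.next()) out.add(k);
        return out;
    }

    public static void main(String[] args) {
        ToyMergingScanner merged = new ToyMergingScanner(Arrays.asList(
            new ToyListScanner(Arrays.asList("row1", "row4")),
            new ToyListScanner(Arrays.asList("row2", "row3"))));
        System.out.println(drain(merged));
    }
}
```

The leaf scanners only know their own source; ordering across sources is entirely the job of the outer scanner.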

II: The client-side RPC service definition in HBase:

service ClientService {
    rpc Get (GetRequest) returns (GetResponse);
    rpc Mutate (MutateRequest) returns (MutateResponse);
    rpc Scan (ScanRequest) returns (ScanResponse);
    rpc BulkLoadHFile (BulkLoadHFileRequest) returns (BulkLoadHFileResponse);
    rpc ExecService (CoprocessorServiceRequest) returns (CoprocessorServiceResponse);
    rpc ExecRegionServerService (CoprocessorServiceRequest) returns (CoprocessorServiceResponse);
    rpc Multi (MultiRequest) returns (MultiResponse);
}

For how the protobuf (PB) RPC is actually implemented, see the source code.

III: Tracing the code directly to understand the flow. Scan client-side source trace

Start from the client-side scan API call:

table.getScanner(scan)

This enters HTable's getScanner(final Scan scan) method:

    if (scan.getBatch() > 0 && scan.isSmall()) {
        throw new IllegalArgumentException("Small scan should not be used with batching");
    }
    if (scan.getCaching() <= 0) {
        scan.setCaching(getScannerCaching());
    }
    if (scan.getMaxResultSize() <= 0) {
        scan.setMaxResultSize(scannerMaxResultSize);
    }
    if (scan.isReversed()) {
        if (scan.isSmall()) {
            return new ClientSmallReversedScanner(getConfiguration(), scan, getName(),
                this.connection, this.rpcCallerFactory, this.rpcControllerFactory, pool,
                connConfiguration.getReplicaCallTimeoutMicroSecondScan());
        } else {
            return new ReversedClientScanner(getConfiguration(), scan, getName(),
                this.connection, this.rpcCallerFactory, this.rpcControllerFactory, pool,
                connConfiguration.getReplicaCallTimeoutMicroSecondScan());
        }
    }
    if (scan.isSmall()) {
        return new ClientSmallScanner(getConfiguration(), scan, getName(),
            this.connection, this.rpcCallerFactory, this.rpcControllerFactory, pool,
            connConfiguration.getReplicaCallTimeoutMicroSecondScan());
    } else {
        return new ClientScanner(getConfiguration(), scan, getName(),
            this.connection, this.rpcCallerFactory, this.rpcControllerFactory, pool,
            connConfiguration.getReplicaCallTimeoutMicroSecondScan());
    }
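
The branch structure of that method boils down to a two-flag decision. The sketch below uses a hypothetical helper, ScannerChoice.choose, that returns the scanner class name as a string instead of constructing a real scanner. It only mirrors the conditions in the excerpt above and is not part of HBase.

```java
// Hypothetical helper mirroring the branches of the getScanner excerpt.
// It reports which scanner class would be picked; it does not talk to HBase.
class ScannerChoice {
    static String choose(boolean reversed, boolean small, int batch) {
        // Same guard as the excerpt: small scans cannot be combined with batching.
        if (batch > 0 && small) {
            throw new IllegalArgumentException("Small scan should not be used with batching");
        }
        if (reversed) {
            return small ? "ClientSmallReversedScanner" : "ReversedClientScanner";
        }
        return small ? "ClientSmallScanner" : "ClientScanner";
    }

    public static void main(String[] args) {
        System.out.println(choose(false, false, 0));
        System.out.println(choose(true, true, 0));
    }
}
```

An ordinary forward scan with no special flags therefore ends up in ClientScanner, which is the path followed in the rest of this trace.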

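Depending on these conditions a different scanner is created. Next, enter the ClientScanner constructor.

It first assigns its fields and then calls initializeScannerInConstruction();

initializeScannerInConstruction() turns out to be a call to nextScanner(this.caching, false), which initializes the scanner.

Next, enter nextScanner:

it closes the previous scanner, works out the start key for the next scanner, checks whether the end of the table has been reached, and then: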

    callable = getScannerCallable(localStartKey, nbRows);
    // Open a scanner on the region server starting at the
    // beginning of the region
    call(callable, caller, scannerTimeout);
    this.currentRegion = callable.getHRegionInfo();
    if (this.scanMetrics != null) {
        this.scanMetrics.countOfRegions.incrementAndGet();
    }

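This creates a ScannerCallableWithReplicas.

The ScannerCallableWithReplicas is then passed to call(), which opens a scanner at the beginning of the region on the RegionServer (RS) and scans the data.

Enter the call method: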

    try {
        callable.prepare(false);
        return callable.call(callTimeout);
    }
    // (the catch blocks are not shown in this excerpt)

The request itself is issued by ScannerCallable.call():

    this.scannerId = openScanner();
    ScanResponse response = getStub().scan(controller, request);
    long id = response.getScannerId();

    // If the scannerId already exists, fetch the result set returned by the server:
    response = getStub().scan(controller, request);
    nextCallSeq++;
    long timestamp = System.currentTimeMillis();
    setHeartbeatMessage(response.hasHeartbeatMessage() && response.getHeartbeatMessage());
    // Results are returned via controller
    CellScanner cellScanner = controller.cellScanner();
    rrs = ResponseConverter.getResults(cellScanner, response);
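
The open-then-fetch pattern in these excerpts can be illustrated with a toy simulation. Everything here is an assumption made for illustration: ToyScanServer and ToyScanClient are hypothetical classes, the "server" is an in-memory list, and an empty batch is used as the end-of-data signal, which is a simplification of what the real protocol does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory "region server" (hypothetical, not HBase code).
class ToyScanServer {
    private final List<String> rows;
    private final Map<Long, Integer> positions = new HashMap<>(); // scannerId -> next row index
    private long nextId = 1;

    ToyScanServer(List<String> rows) { this.rows = rows; }

    // First call: register a scanner and hand its id back to the client.
    long openScanner() {
        long id = nextId++;
        positions.put(id, 0);
        return id;
    }

    // Later calls: return up to 'caching' rows and advance the server-side cursor.
    List<String> next(long scannerId, int caching) {
        Integer pos = positions.get(scannerId);
        if (pos == null) throw new IllegalStateException("unknown scanner " + scannerId);
        int end = Math.min(rows.size(), pos + caching);
        List<String> batch = new ArrayList<>(rows.subList(pos, end));
        positions.put(scannerId, end);
        return batch;
    }

    void close(long scannerId) { positions.remove(scannerId); }
}

class ToyScanClient {
    // Drives open -> next ... -> close, collects rows into 'sink',
    // and returns how many next() round trips were needed.
    static int roundTrips(ToyScanServer server, int caching, List<String> sink) {
        long scannerId = server.openScanner();
        int calls = 0;
        while (true) {
            List<String> batch = server.next(scannerId, caching);
            calls++;
            if (batch.isEmpty()) break;   // empty batch: this toy server has no more rows
            sink.addAll(batch);
        }
        server.close(scannerId);
        return calls;
    }

    public static void main(String[] args) {
        List<String> sink = new ArrayList<>();
        ToyScanServer server = new ToyScanServer(Arrays.asList("r1", "r2", "r3"));
        System.out.println(roundTrips(server, 2, sink) + " calls, rows = " + sink);
    }
}
```

A larger caching value means fewer round trips per scan at the cost of more rows held in memory per call, which is the trade-off behind the setCaching default applied in getScanner.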

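IV: Scan server-side source trace

1. Start from HRegion's getScanner(Scan scan, List<KeyValueScanner> additionalScanners, long nonceGroup, long nonce).

It first determines, adds, and checks the column families,

then returns the initialized RegionScanner: instantiateRegionScanner(scan, additionalScanners, nonceGroup, nonce);

The implementation is in RegionScannerImpl, which initializes the scanners: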

initializeScanners(Scan scan, List<KeyValueScanner> additionalScanners)

For each column family, get the corresponding store:

        // (the enclosing loop over the scan's column families is not shown in this excerpt)
        Store store = stores.get(entry.getKey());
        KeyValueScanner scanner;
        try {
            scanner = store.getScanner(scan, entry.getValue(), this.readPt);
        } catch (FileNotFoundException e) {
            throw handleFileNotFound(e);
        }
        instantiatedScanners.add(scanner);
        if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
            || this.filter.isFamilyEssential(entry.getKey())) {
            scanners.add(scanner);
        } else {
            joinedScanners.add(scanner);
        }
    }
    initializeKVHeap(scanners, joinedScanners, region);
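
The condition that splits scanners into the two lists can be isolated in a small sketch. FamilyPartition is a hypothetical class; families are plain strings, and the Predicate stands in for filter.isFamilyEssential. The comment about when the joined list is read reflects my understanding of on-demand column family loading and should be checked against the source.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model of the scanners / joinedScanners split (not HBase code).
class FamilyPartition {
    final List<String> scanners = new ArrayList<>();        // families that are always scanned
    final List<String> joinedScanners = new ArrayList<>();  // assumed: consulted only once a row passes the filter

    // 'essential' plays the role of filter.isFamilyEssential; null means "no filter set".
    static FamilyPartition split(List<String> families, Predicate<String> essential,
                                 boolean loadOnDemand) {
        FamilyPartition p = new FamilyPartition();
        for (String family : families) {
            // Same three-way condition as in the excerpt above.
            if (essential == null || !loadOnDemand || essential.test(family)) {
                p.scanners.add(family);
            } else {
                p.joinedScanners.add(family);
            }
        }
        return p;
    }

    public static void main(String[] args) {
        FamilyPartition p = split(Arrays.asList("cf1", "cf2"), f -> f.equals("cf1"), true);
        System.out.println("scanners=" + p.scanners + " joined=" + p.joinedScanners);
    }
}
```

Note that a family only lands in the joined list when all three conditions hold at once: a filter is set, on-demand loading is enabled, and the filter does not consider the family essential.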

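A scanner is opened across the memstore, the snapshot, and all store files, for the case where no compaction is in progress.

A ScanQueryMatcher is then constructed for the scan,

and, based on that ScanQueryMatcher, the following call is made: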

seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery && lazySeekEnabledGlobally, parallelSeekEnabled);

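This covers seeking every scanner to the starting row, checking Bloom filters, handling delete markers, and so on.

Finally, all the scanners are placed into a heap.

2. Next, the server-side implementation behind the client scanner's next() call.

In StoreScanner's next:

it fetches the next row's data from the store: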

    do {
        // Update and check the time limit based on the configured value of cellsPerTimeoutCheck
        if ((kvsScanned % cellsPerHeartbeatCheck == 0)) {
            scannerContext.updateTimeProgress();
            if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
                return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
            }
        }
        if (prevCell != cell)
            ++kvsScanned; // Do object compare - we set prevKV from the same heap.
        checkScanOrder(prevCell, cell, comparator);
        prevCell = cell;
        ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
        // The possible MatchCode values are:
        //   INCLUDE, SKIP, NEXT, DONE, SEEK_NEXT_ROW, SEEK_NEXT_COL, DONE_SCAN,
        //   SEEK_NEXT_USING_HINT, INCLUDE_AND_SEEK_NEXT_COL, INCLUDE_AND_SEEK_NEXT_ROW
        // (the handling of each code is not shown in this excerpt)
    } while ((cell = this.heap.peek()) != null);
    if (count > 0) {
        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
    }
    // No more keys
    close();
    return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
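
How a match code steers that loop can be shown with a heavily reduced sketch. ToyMatchLoop is hypothetical: it uses only three of the codes (INCLUDE, SKIP, DONE), encodes each cell as a "row:column" string, and assumes the cell list is sorted and starts at the requested row. The real ScanQueryMatcher also deals with versions, time ranges, delete markers, and seek hints, none of which are modeled here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, reduced model of a match-code driven scan loop (not HBase code).
class ToyMatchLoop {
    enum MatchCode { INCLUDE, SKIP, DONE }

    // Keeps cells of the requested row that are in the wanted column;
    // reports DONE as soon as a cell from a different row shows up.
    static MatchCode match(String cell, String row, String column) {
        String[] parts = cell.split(":", 2);   // cell is encoded as "row:column"
        if (!parts[0].equals(row)) return MatchCode.DONE;
        return parts[1].equals(column) ? MatchCode.INCLUDE : MatchCode.SKIP;
    }

    // Walks the sorted cells and collects the matching ones for a single row.
    static List<String> nextRow(List<String> sortedCells, String row, String column) {
        List<String> out = new ArrayList<>();
        for (String cell : sortedCells) {
            MatchCode code = match(cell, row, column);
            if (code == MatchCode.DONE) break;              // row finished: stop scanning
            if (code == MatchCode.INCLUDE) out.add(cell);   // SKIP falls through to the next cell
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> cells = Arrays.asList("r1:a", "r1:b", "r2:a");
        System.out.println(nextRow(cells, "r1", "b"));
    }
}
```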

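Finally, the matching ResultScanner is returned, carrying the result set.

V: This is only a simple trace of the flow and leaves out most of the implementation details. Read the source code yourself for a deeper understanding.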

Reposted from: http://rkwzo.baihongyu.com/
