HBase scan: the client-side and server-side flow
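Scanners fall into two kinds, InternalScanner and KeyValueScanner, which differ as follows:
1. InternalScanner: a scanner that contains other scanners. Its main interface is next(), which fetches the next KeyValue from the scanners it contains. You can think of it as the one that employs KeyValueScanners.
2. KeyValueScanner: a scanner that reads KeyValues from memory or from files.
Roughly:
RegionScanner and StoreScanner are InternalScanners,
while MemstoreScanner, StoreFileScanner, and StoreScanner are KeyValueScanners.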
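To make the two roles concrete, here is a minimal sketch. The interfaces and classes below (SimpleKeyValueScanner, SimpleInternalScanner, ListKeyValueScanner, CompositeScanner) are hypothetical, heavily simplified stand-ins that use strings instead of KeyValues; they are not the HBase interfaces.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical leaf role: reads values from one sorted source.
interface SimpleKeyValueScanner {
    String peek();   // next value without consuming it, or null when exhausted
    String next();   // consume and return the next value, or null when exhausted
}

// Hypothetical composite role: pulls values out of the scanners it contains.
interface SimpleInternalScanner {
    // Appends the next value to out; returns true if more values may follow.
    boolean next(List<String> out);
}

// Leaf scanner over an in-memory sorted list, standing in for a memstore or a file.
class ListKeyValueScanner implements SimpleKeyValueScanner {
    private final List<String> data;
    private int pos = 0;
    ListKeyValueScanner(List<String> sorted) { this.data = sorted; }
    public String peek() { return pos < data.size() ? data.get(pos) : null; }
    public String next() { return pos < data.size() ? data.get(pos++) : null; }
}

// Composite scanner: always emits the smallest pending value of its children.
class CompositeScanner implements SimpleInternalScanner {
    private final List<SimpleKeyValueScanner> children;
    CompositeScanner(List<SimpleKeyValueScanner> children) { this.children = children; }

    public boolean next(List<String> out) {
        SimpleKeyValueScanner best = null;
        for (SimpleKeyValueScanner s : children) {
            if (s.peek() != null && (best == null || s.peek().compareTo(best.peek()) < 0)) {
                best = s;
            }
        }
        if (best == null) {
            return false; // every child is exhausted
        }
        out.add(best.next());
        for (SimpleKeyValueScanner s : children) {
            if (s.peek() != null) {
                return true;
            }
        }
        return false;
    }
}

class ScannerRolesDemo {
    static List<String> scanAll() {
        List<SimpleKeyValueScanner> leaves = new ArrayList<>();
        leaves.add(new ListKeyValueScanner(Arrays.asList("a", "d")));
        leaves.add(new ListKeyValueScanner(Arrays.asList("b", "c")));
        SimpleInternalScanner top = new CompositeScanner(leaves);
        List<String> out = new ArrayList<>();
        while (top.next(out)) {
            // keep pulling until the composite reports no more values
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(scanAll()); // prints [a, b, c, d]
    }
}
```

A class such as StoreScanner appears in both lists above because it plays both roles: it pulls from the scanners below it and is itself consumed by the scanner above it.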
    service ClientService {
      rpc Get (GetRequest) returns (GetResponse);
      rpc Mutate (MutateRequest) returns (MutateResponse);
      rpc Scan (ScanRequest) returns (ScanResponse);
      rpc BulkLoadHFile (BulkLoadHFileRequest) returns (BulkLoadHFileResponse);
      rpc ExecService (CoprocessorServiceRequest) returns (CoprocessorServiceResponse);
      rpc ExecRegionServerService (CoprocessorServiceRequest) returns (CoprocessorServiceResponse);
      rpc Multi (MultiRequest) returns (MultiResponse);
    }
For the concrete implementation of the PB (protobuf) RPC, see the source code.
Starting from the client-side scan API code:
table.getScanner(scan)
This enters HTable's getScanner(final Scan scan) method:
    if (scan.getBatch() > 0 && scan.isSmall()) {
      throw new IllegalArgumentException("Small scan should not be used with batching");
    }
    if (scan.getCaching() <= 0) {
      scan.setCaching(getScannerCaching());
    }
    if (scan.getMaxResultSize() <= 0) {
      scan.setMaxResultSize(scannerMaxResultSize);
    }
    if (scan.isReversed()) {
      if (scan.isSmall()) {
        return new ClientSmallReversedScanner(getConfiguration(), scan, getName(),
            this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
            pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
      } else {
        return new ReversedClientScanner(getConfiguration(), scan, getName(),
            this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
            pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
      }
    }
    if (scan.isSmall()) {
      return new ClientSmallScanner(getConfiguration(), scan, getName(),
          this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
          pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
    } else {
      return new ClientScanner(getConfiguration(), scan, getName(),
          this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
          pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
    }
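The branching in getScanner() boils down to a small decision table over the small and reversed flags. The sketch below restates only that decision as a pure function. ScannerChoice and choose() are hypothetical names used for illustration, and the function returns the scanner class name as a string instead of constructing anything.

```java
// Hypothetical helper: names the scanner class that the branches above would pick.
class ScannerChoice {
    static String choose(boolean small, boolean reversed, int batch) {
        // Same guard as in getScanner(): small scans cannot be combined with batching.
        if (batch > 0 && small) {
            throw new IllegalArgumentException("Small scan should not be used with batching");
        }
        if (reversed) {
            return small ? "ClientSmallReversedScanner" : "ReversedClientScanner";
        }
        return small ? "ClientSmallScanner" : "ClientScanner";
    }

    public static void main(String[] args) {
        System.out.println(choose(false, false, 0)); // prints ClientScanner
        System.out.println(choose(true, true, 0));   // prints ClientSmallReversedScanner
    }
}
```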
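Different branches run depending on these conditions. Next comes the ClientScanner constructor,
which first assigns its fields and then calls initializeScannerInConstruction().
initializeScannerInConstruction() turns out to be a call to nextScanner(this.caching, false), which initializes the scanner.
Then comes nextScanner():
it closes the previous scanner, determines the start key for the next scanner, and checks whether the end of the table has been reached; then: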
    callable = getScannerCallable(localStartKey, nbRows);
    // Open a scanner on the region server starting at the
    // beginning of the region
    call(callable, caller, scannerTimeout);
    this.currentRegion = callable.getHRegionInfo();
    if (this.scanMetrics != null) {
      this.scanMetrics.countOfRegions.incrementAndGet();
    }
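This creates a ScannerCallableWithReplicas.
The ScannerCallableWithReplicas is then passed to call(), which opens a scanner at the beginning of the region on the RegionServer (RS) and scans the data.
Inside the call method: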
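The nextScanner() steps described above (pick the start key for the next scanner, stop at the end of the table) can be sketched as a walk over regions. Everything in this sketch is an assumption made for illustration: RegionWalk, Region, and locate() are hypothetical, keys are plain strings, an empty end key marks the last region, and an empty stop row means "scan to the end of the table".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class RegionWalk {
    // Hypothetical region covering [startKey, endKey); an empty endKey marks the last region.
    static final class Region {
        final String startKey;
        final String endKey;
        Region(String startKey, String endKey) { this.startKey = startKey; this.endKey = endKey; }
        boolean contains(String row) {
            return row.compareTo(startKey) >= 0 && (endKey.isEmpty() || row.compareTo(endKey) < 0);
        }
    }

    // Hypothetical lookup standing in for locating the region that holds a row.
    static Region locate(List<Region> table, String row) {
        for (Region r : table) {
            if (r.contains(row)) {
                return r;
            }
        }
        throw new IllegalStateException("no region for row " + row);
    }

    // Lists the regions a scan over [startRow, stopRow) would open, in order.
    static List<String> regionsVisited(List<Region> table, String startRow, String stopRow) {
        List<String> visited = new ArrayList<>();
        Region current = null;
        while (true) {
            String localStartKey;
            if (current == null) {
                localStartKey = startRow;            // first scanner starts at the scan's start row
            } else {
                String endKey = current.endKey;
                boolean endOfTable = endKey.isEmpty();
                boolean pastStopRow = !stopRow.isEmpty() && stopRow.compareTo(endKey) <= 0;
                if (endOfTable || pastStopRow) {
                    break;                           // nothing left to scan
                }
                localStartKey = endKey;              // next scanner starts where this region ends
            }
            current = locate(table, localStartKey);
            visited.add(current.startKey + ".." + current.endKey);
        }
        return visited;
    }

    public static void main(String[] args) {
        List<Region> table = Arrays.asList(
            new Region("", "g"), new Region("g", "p"), new Region("p", ""));
        System.out.println(regionsVisited(table, "h", "q")); // prints [g..p, p..]
    }
}
```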
    try {
      callable.prepare(false);
      return callable.call(callTimeout);
    }
The request is issued by ScannerCallable.call():
    this.scannerId = openScanner();
    ScanResponse response = getStub().scan(controller, request);
    long id = response.getScannerId();
If the scannerId already exists, the result set returned by the server is fetched:
    response = getStub().scan(controller, request);
    nextCallSeq++;
    long timestamp = System.currentTimeMillis();
    setHeartbeatMessage(response.hasHeartbeatMessage() && response.getHeartbeatMessage());
    // Results are returned via controller
    CellScanner cellScanner = controller.cellScanner();
    rrs = ResponseConverter.getResults(cellScanner, response);
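The open-then-next pattern above can be illustrated with a toy model: one call opens a scanner and returns an id, and every later call presents that id plus an incrementing call sequence number. FakeServer and scanAll() below are hypothetical and run entirely in memory; they only model the idea of a scannerId and of nextCallSeq, not the actual RPC or its message types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

class ScanRpcSketch {
    // Hypothetical in-memory "region server": scannerId -> remaining rows and expected call sequence.
    static final class FakeServer {
        private final Map<Long, Iterator<String>> scanners = new HashMap<>();
        private final Map<Long, Long> expectedSeq = new HashMap<>();
        private long nextId = 1;

        long openScanner(List<String> rows) {
            long id = nextId++;
            scanners.put(id, rows.iterator());
            expectedSeq.put(id, 0L);
            return id;
        }

        List<String> next(long scannerId, long callSeq, int nbRows) {
            Iterator<String> it = scanners.get(scannerId);
            if (it == null) {
                throw new IllegalStateException("unknown scanner " + scannerId);
            }
            if (callSeq != expectedSeq.get(scannerId)) {
                throw new IllegalStateException("out of order next call");
            }
            expectedSeq.put(scannerId, callSeq + 1);
            List<String> batch = new ArrayList<>();
            while (batch.size() < nbRows && it.hasNext()) {
                batch.add(it.next());
            }
            return batch;
        }
    }

    // Client side: open once, then fetch batches of at most `caching` rows until one comes back empty.
    static List<List<String>> scanAll(FakeServer server, List<String> rows, int caching) {
        long scannerId = server.openScanner(rows);
        long nextCallSeq = 0;
        List<List<String>> batches = new ArrayList<>();
        while (true) {
            List<String> batch = server.next(scannerId, nextCallSeq, caching);
            nextCallSeq++;
            if (batch.isEmpty()) {
                break;
            }
            batches.add(batch);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("r1", "r2", "r3", "r4", "r5");
        System.out.println(scanAll(new FakeServer(), rows, 2)); // prints [[r1, r2], [r3, r4], [r5]]
    }
}
```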
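The column families are checked and added as needed,
and then the initialized RegionScanner is returned: instantiateRegionScanner(scan, additionalScanners, nonceGroup, nonce);
It is implemented in RegionScannerImpl, which initializes the scanners:
initializeScanners(Scan scan, List<KeyValueScanner> additionalScanners)
The store for each column family is looked up: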
      Store store = stores.get(entry.getKey());
      KeyValueScanner scanner;
      try {
        scanner = store.getScanner(scan, entry.getValue(), this.readPt);
      } catch (FileNotFoundException e) {
        throw handleFileNotFound(e);
      }
      instantiatedScanners.add(scanner);
      if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
          || this.filter.isFamilyEssential(entry.getKey())) {
        scanners.add(scanner);
      } else {
        joinedScanners.add(scanner);
      }
    }
    initializeKVHeap(scanners, joinedScanners, region);
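The condition in the excerpt above splits the per-family scanners into two groups: scanners and joinedScanners. A minimal sketch of just that split follows. FamilyPartition and partition() are hypothetical names, families are plain strings, and a null predicate stands in for "no filter set".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

class FamilyPartition {
    // Returns two lists: index 0 holds the "scanners" group, index 1 the "joinedScanners" group.
    static List<List<String>> partition(List<String> families,
                                        Predicate<String> isFamilyEssential,
                                        boolean loadOnDemand) {
        List<String> scanners = new ArrayList<>();
        List<String> joined = new ArrayList<>();
        for (String family : families) {
            // Same shape as the condition above: no filter, on-demand loading off,
            // or the filter marks the family as essential.
            if (isFamilyEssential == null || !loadOnDemand || isFamilyEssential.test(family)) {
                scanners.add(family);
            } else {
                joined.add(family);
            }
        }
        return Arrays.asList(scanners, joined);
    }

    public static void main(String[] args) {
        List<String> families = Arrays.asList("meta", "blob");
        System.out.println(partition(families, "meta"::equals, true));  // prints [[meta], [blob]]
        System.out.println(partition(families, "meta"::equals, false)); // prints [[meta, blob], []]
    }
}
```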
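A scanner is opened across the memstore, the snapshot, and all store files, assuming no compaction is in progress;
then a ScanQueryMatcher is constructed for the scan,
and the following runs based on that ScanQueryMatcher: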
seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery && lazySeekEnabledGlobally, parallelSeekEnabled);
This seeks every scanner to the start row and covers Bloom filter checks, delete markers, and so on;
finally, all the scanners are put into a heap.
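Putting the scanners into a heap lets the reader always pull from whichever scanner currently holds the smallest key. The sketch below shows that idea with java.util.PriorityQueue. HeapMergeSketch and Leaf are hypothetical, keys are plain strings, and the real heap's comparator and tie-breaking rules are not modelled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

class HeapMergeSketch {
    // Hypothetical leaf scanner over one sorted list of keys.
    static final class Leaf {
        private final Iterator<String> it;
        private String current;
        Leaf(List<String> sorted) {
            this.it = sorted.iterator();
            advance();
        }
        String peek() { return current; }
        void advance() { current = it.hasNext() ? it.next() : null; }
    }

    static List<String> merge(List<List<String>> sources) {
        // Order the scanners by the key each one currently points at.
        Comparator<Leaf> byCurrentKey = Comparator.comparing(Leaf::peek);
        PriorityQueue<Leaf> heap = new PriorityQueue<>(byCurrentKey);
        for (List<String> source : sources) {
            Leaf leaf = new Leaf(source);
            if (leaf.peek() != null) {
                heap.add(leaf); // exhausted scanners never enter the heap
            }
        }
        List<String> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            Leaf top = heap.poll();   // scanner with the smallest current key
            out.add(top.peek());
            top.advance();
            if (top.peek() != null) {
                heap.add(top);        // re-insert so it is ordered by its new key
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<String>> sources = Arrays.asList(
            Arrays.asList("a", "d"), Arrays.asList("b", "c"), new ArrayList<String>());
        System.out.println(merge(sources)); // prints [a, b, c, d]
    }
}
```

Removing a scanner and re-inserting it after each advance keeps the sketch simple: the key a queued scanner is ordered by never changes while it sits in the queue.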
next() in StoreScanner
fetches the next row of data from the store:
    do {
      // Update and check the time limit based on the configured value of cellsPerTimeoutCheck
      if ((kvsScanned % cellsPerHeartbeatCheck == 0)) {
        scannerContext.updateTimeProgress();
        if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
          return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
        }
      }
      if (prevCell != cell) ++kvsScanned; // Do object compare - we set prevKV from the same heap.
      checkScanOrder(prevCell, cell, comparator);
      prevCell = cell;
      ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
      // The possible MatchCode values are:
      //   INCLUDE, SKIP, NEXT, DONE, SEEK_NEXT_ROW, SEEK_NEXT_COL, DONE_SCAN,
      //   SEEK_NEXT_USING_HINT, INCLUDE_AND_SEEK_NEXT_COL, INCLUDE_AND_SEEK_NEXT_ROW
      // (the handling of each code is omitted in this excerpt)
    } while ((cell = this.heap.peek()) != null);
    if (count > 0) {
      return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
    }
    // No more keys
    close();
    return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
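The loop above is a peek, match, act cycle: look at the cell on top of the heap, ask the matcher for a code, and act on that code. The sketch below models the cycle with only three codes. MatchLoopSketch, its reduced MatchCode enum, the "row/column" string cells, and the matching rule are all hypothetical simplifications; how the real loop handles each code is not shown in the excerpt and is not reproduced here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Set;

class MatchLoopSketch {
    // Reduced, hypothetical set of codes; the real enum has more values.
    enum MatchCode { INCLUDE, SKIP, DONE }

    // Hypothetical rule: cells look like "row/column"; only wanted columns of the current row match.
    static MatchCode match(String cell, String currentRow, Set<String> wantedColumns) {
        String[] parts = cell.split("/", 2);
        if (!parts[0].equals(currentRow)) {
            return MatchCode.DONE; // the cell belongs to the next row
        }
        return wantedColumns.contains(parts[1]) ? MatchCode.INCLUDE : MatchCode.SKIP;
    }

    // Collects the matching cells of the first row in the queue; returns true if cells remain.
    static boolean nextRow(Deque<String> heap, Set<String> wantedColumns, List<String> out) {
        String cell = heap.peek();
        if (cell == null) {
            return false;
        }
        String row = cell.split("/", 2)[0];
        do {
            MatchCode code = match(cell, row, wantedColumns);
            switch (code) {
                case INCLUDE:
                    out.add(cell);
                    heap.poll();
                    break;
                case SKIP:
                    heap.poll();
                    break;
                case DONE:
                    return true; // leave the cell in place for the next call
            }
        } while ((cell = heap.peek()) != null);
        return false; // no more cells
    }

    public static void main(String[] args) {
        Deque<String> heap = new ArrayDeque<>(Arrays.asList("r1/a", "r1/b", "r2/a"));
        List<String> out = new ArrayList<>();
        boolean more = nextRow(heap, Collections.singleton("a"), out);
        System.out.println(out + " more=" + more); // prints [r1/a] more=true
    }
}
```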
Finally, the matching ResultScanner is returned, with the result set inside it.
Reposted from: http://rkwzo.baihongyu.com/