HBase 0.96 put flow: source code analysis
2016-02-21 19:52
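I happened to glance at the HBase 0.98 code and wanted to review the put flow, but strangely could no longer find processBatchOfPuts() in HTable. After reading for a while I realized the code had changed a lot; in fact the method was already gone in 0.96, so I pulled the 0.96 source to take a look.
An earlier post can be used to compare the differences: /article/1988276.html. Its layout is messy, so please bear with it.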
HTable.java
public void put(final Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException {
  doPut(put);
  if (autoFlush) {
    flushCommits();
  }
}

// Batch
@Override
public void put(final List<Put> puts) throws InterruptedIOException, RetriesExhaustedWithDetailsException {
  for (Put put : puts) {
    doPut(put);
  }
  if (autoFlush) {
    flushCommits();
  }
}
Here writeAsyncBuffer has replaced the old writeBuffer; in practice only the name differs.
private void doPut(Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException {
  if (ap.hasError()){
    backgroundFlushCommits(true);
  }

  validatePut(put);

  currentWriteBufferSize += put.heapSize();
  writeAsyncBuffer.add(put);

  while (currentWriteBufferSize > writeBufferSize) {
    backgroundFlushCommits(false);
  }
}
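A minimal sketch of the buffering logic in doPut(), assuming every flush succeeds and drains the buffer. WriteBufferSketch and all of its members are hypothetical names for illustration; this is not HBase code, and the real backgroundFlushCommits() also handles errors and asynchronous tasks.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for HTable's client-side write buffer; not HBase code.
class WriteBufferSketch {
    private final long writeBufferSize;                    // flush threshold in bytes
    private final List<Long> buffer = new ArrayList<>();   // holds only the size of each pending put
    private long currentWriteBufferSize = 0;
    private int flushCount = 0;

    WriteBufferSketch(long writeBufferSize) {
        this.writeBufferSize = writeBufferSize;
    }

    // Same shape as doPut(): account for the size, buffer the put, flush while over the limit.
    void put(long heapSize) {
        currentWriteBufferSize += heapSize;
        buffer.add(heapSize);
        while (currentWriteBufferSize > writeBufferSize) {
            flush();
        }
    }

    // Stands in for backgroundFlushCommits(); assumes every flush succeeds and drains the buffer.
    void flush() {
        if (!buffer.isEmpty()) {
            buffer.clear();
            flushCount++;
        }
        currentWriteBufferSize = 0;
    }

    int pending() { return buffer.size(); }
    int flushCount() { return flushCount; }

    public static void main(String[] args) {
        WriteBufferSketch table = new WriteBufferSketch(100);
        table.put(40);
        table.put(40);   // 80 bytes buffered, still under the limit
        table.put(40);   // 120 bytes exceeds the limit, so the buffer is flushed
        System.out.println("flushes=" + table.flushCount() + " pending=" + table.pending());
    }
}
```

The point of the sketch is only that a put does not necessarily trigger any network traffic: with autoFlush off, data leaves the client when the accumulated size crosses the threshold or when a flush is requested explicitly.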
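I expected backgroundFlushCommits() to be about the same as the old flushCommits(), but stepping into it, wow, it is hardly recognizable; the differences are considerable. Previously it was a single line: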
connection.processBatchOfPuts(writeBuffer, tableName, pool);
If writeAsyncBuffer is not empty and the operations in progress have not reported an error, writeAsyncBuffer is submitted:
private void backgroundFlushCommits(boolean synchronous) throws InterruptedIOException, RetriesExhaustedWithDetailsException {
  try {
    // If there is an error on the operations in progress, we don't add new operations.
    if (writeAsyncBuffer.size() > 0 && !ap.hasError()) {
      ap.submit(writeAsyncBuffer, true);
    }

    if (synchronous || ap.hasError()) {
      if (ap.hasError() && LOG.isDebugEnabled()) {
        LOG.debug(tableName + ": One or more of the operations have failed -" +
            " waiting for all operation in progress to finish (successfully or not)");
      }
      ap.waitUntilDone();
    }

    if (ap.hasError()) {
      if (!clearBufferOnFail) {
        // if clearBufferOnFailed is not set, we're supposed to keep the failed operation in the
        // write buffer. This is a questionable feature kept here for backward compatibility
        writeAsyncBuffer.addAll(ap.getFailedOperations());
      }
      RetriesExhaustedWithDetailsException e = ap.getErrors();
      ap.clearErrors();
      throw e;
    }
  } finally {
    currentWriteBufferSize = 0;
    for (Row mut : writeAsyncBuffer) {
      if (mut instanceof Mutation) {
        currentWriteBufferSize += ((Mutation) mut).heapSize();
      }
    }
  }
}
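I stared at backgroundFlushCommits() for a long time without getting much out of it, so the only option was to follow ap.submit(writeAsyncBuffer, true);
When I saw this
Map<HRegionLocation, MultiAction<Row>> actionsByServer = new HashMap<HRegionLocation, MultiAction<Row>>();
I suddenly felt hopeful: this should resemble the old code!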
public void submit(List<? extends Row> rows, boolean atLeastOne) throws InterruptedIOException {
  if (rows.isEmpty()) {
    return;
  }

  // This looks like we are keying by region but HRegionLocation has a comparator that compares
  // on the server portion only (hostname + port) so this Map collects regions by server.
  Map<HRegionLocation, MultiAction<Row>> actionsByServer =
      new HashMap<HRegionLocation, MultiAction<Row>>();
  List<Action<Row>> retainedActions = new ArrayList<Action<Row>>(rows.size());

  do {
    // Wait until there is at least one slot for a new task.
    waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1);

    // Remember the previous decisions about regions or region servers we put in the
    // final multi.
    Map<String, Boolean> regionIncluded = new HashMap<String, Boolean>();
    Map<ServerName, Boolean> serverIncluded = new HashMap<ServerName, Boolean>();

    int posInList = -1;
    Iterator<? extends Row> it = rows.iterator();
    while (it.hasNext()) {
      Row r = it.next();
      HRegionLocation loc = findDestLocation(r, 1, posInList);

      if (loc != null && canTakeOperation(loc, regionIncluded, serverIncluded)) {
        // loc is null if there is an error such as meta not available.
        Action<Row> action = new Action<Row>(r, ++posInList);
        retainedActions.add(action);
        addAction(loc, action, actionsByServer);
        it.remove();
      }
    }
  } while (retainedActions.isEmpty() && atLeastOne && !hasError());

  HConnectionManager.ServerErrorTracker errorsByServer = createServerErrorTracker();
  sendMultiAction(retainedActions, actionsByServer, 1, errorsByServer);
}
Locate the row to find its loc (HRegionLocation):
HRegionLocation loc = findDestLocation(r, 1, posInList);
Group the actions by region server, and within each server by region: addAction(loc, action, actionsByServer);
// Group the actions per region server
private void addAction(HRegionLocation loc, Action<Row> action,
    Map<HRegionLocation, MultiAction<Row>> actionsByServer) {
  final byte[] regionName = loc.getRegionInfo().getRegionName();
  MultiAction<Row> multiAction = actionsByServer.get(loc);
  if (multiAction == null) {
    multiAction = new MultiAction<Row>();
    actionsByServer.put(loc, multiAction);
  }

  multiAction.add(regionName, action);
}
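The two-level grouping can be sketched as follows. This assumes, as the comment in submit() describes, that the map key treats two locations on the same server as equal. LocationSketch and GroupByServerSketch are hypothetical names, with plain strings standing in for servers, regions, and rows; this is not the HBase implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical location key: equality considers only the server, not the region.
final class LocationSketch {
    final String server;
    final String region;

    LocationSketch(String server, String region) {
        this.server = server;
        this.region = region;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof LocationSketch && ((LocationSketch) o).server.equals(server);
    }

    @Override
    public int hashCode() {
        return server.hashCode();
    }
}

class GroupByServerSketch {
    // Outer map: one entry per server. Inner map: region name -> rows for that region.
    final Map<LocationSketch, Map<String, List<String>>> actionsByServer = new HashMap<>();

    // Same shape as addAction(): find or create the per-server entry, then add under the region.
    void addAction(LocationSketch loc, String row) {
        Map<String, List<String>> multiAction = actionsByServer.get(loc);
        if (multiAction == null) {
            multiAction = new HashMap<>();
            actionsByServer.put(loc, multiAction);
        }
        multiAction.computeIfAbsent(loc.region, k -> new ArrayList<>()).add(row);
    }

    int serverCount() {
        return actionsByServer.size();
    }

    int regionCount(String server) {
        // The probe key's region is irrelevant because equality is by server only.
        Map<String, List<String>> m = actionsByServer.get(new LocationSketch(server, ""));
        return m == null ? 0 : m.size();
    }

    int rowCount(String server, String region) {
        Map<String, List<String>> m = actionsByServer.get(new LocationSketch(server, ""));
        if (m == null || !m.containsKey(region)) {
            return 0;
        }
        return m.get(region).size();
    }
}
```

With this shape, each server entry later becomes one request, and the request carries the rows of every region hosted on that server.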
Then comes sendMultiAction():
public void sendMultiAction(final List<Action<Row>> initialActions,
    Map<HRegionLocation, MultiAction<Row>> actionsByServer,
    final int numAttempt,
    final HConnectionManager.ServerErrorTracker errorsByServer) {
  // Send the queries and add them to the inProgress list
  // This iteration is by server (the HRegionLocation comparator is by server portion only).
  for (Map.Entry<HRegionLocation, MultiAction<Row>> e : actionsByServer.entrySet()) {
    final HRegionLocation loc = e.getKey();
    final MultiAction<Row> multiAction = e.getValue();
    incTaskCounters(multiAction.getRegions(), loc.getServerName());
    Runnable runnable = Trace.wrap("AsyncProcess.sendMultiAction", new Runnable() {
      @Override
      public void run() {
        MultiResponse res;
        try {
          MultiServerCallable<Row> callable = createCallable(loc, multiAction);
          try {
            res = createCaller(callable).callWithoutRetries(callable);
          } catch (IOException e) {
            LOG.warn("Call to " + loc.getServerName() + " failed numAttempt=" + numAttempt +
                ", resubmitting all since not sure where we are at", e);
            resubmitAll(initialActions, multiAction, loc, numAttempt + 1, e, errorsByServer);
            return;
          }

          receiveMultiAction(initialActions, multiAction, loc, res, numAttempt, errorsByServer);
        } finally {
          decTaskCounters(multiAction.getRegions(), loc.getServerName());
        }
      }
    });

    try {
      this.pool.submit(runnable);
    } catch (RejectedExecutionException ree) {
      // This should never happen. But as the pool is provided by the end user, let's secure
      // this a little.
      decTaskCounters(multiAction.getRegions(), loc.getServerName());
      LOG.warn("The task was rejected by the pool. This is unexpected." +
          " Server is " + loc.getServerName(), ree);
      // We're likely to fail again, but this will increment the attempt counter, so it will
      // finish.
      resubmitAll(initialActions, multiAction, loc, numAttempt + 1, ree, errorsByServer);
    }
  }
}
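There are a few things here I do not fully understand:
1. resubmitAll
2. receiveMultiAction
3. createCaller
Only later, in the callable's call() method, did I see responseProto = getStub().multi(controller, requestProto); and isn't that HRegionServer.multi()?
Let's look at the callable first and get to the rest later. It is run via this.pool.submit(runnable) further down.
Creating the callable: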
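The submit-and-resubmit shape of sendMultiAction() can be sketched roughly like this. It is only a guess at the intent of resubmitAll based on the call sites above: a failed call is sent again with the attempt counter incremented, and a task counter tracks what is still in flight. ResubmitSketch, ServerCall, and maxAttempts are hypothetical; the real AsyncProcess also consults the ServerErrorTracker and re-locates regions, which is not modeled here.

```java
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the submit-and-resubmit pattern; not HBase's AsyncProcess.
class ResubmitSketch {
    // Stand-in for one RPC to a region server; throws IOException to simulate a failed call.
    interface ServerCall {
        String call(int numAttempt) throws IOException;
    }

    private final ExecutorService pool = Executors.newFixedThreadPool(2, r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);   // do not keep the JVM alive if shutdown() is never called
        return t;
    });
    private final int maxAttempts;
    final AtomicInteger tasksInProgress = new AtomicInteger();
    final AtomicInteger attempts = new AtomicInteger();
    final AtomicInteger successes = new AtomicInteger();
    final AtomicInteger failures = new AtomicInteger();

    ResubmitSketch(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    void send(final ServerCall call, final int numAttempt) {
        tasksInProgress.incrementAndGet();          // plays the role of incTaskCounters()
        Runnable runnable = () -> {
            try {
                attempts.incrementAndGet();
                call.call(numAttempt);
                successes.incrementAndGet();
            } catch (IOException e) {
                if (numAttempt < maxAttempts) {
                    send(call, numAttempt + 1);     // resubmit with the attempt counter bumped
                } else {
                    failures.incrementAndGet();     // out of attempts: record the error
                }
            } finally {
                tasksInProgress.decrementAndGet();  // plays the role of decTaskCounters()
            }
        };
        pool.submit(runnable);
    }

    // Busy-waits until no task is in flight; returns early if the thread is interrupted.
    void waitUntilDone() {
        while (tasksInProgress.get() > 0) {
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void shutdown() {
        pool.shutdown();
    }

    public static void main(String[] args) {
        ResubmitSketch sketch = new ResubmitSketch(3);
        // Fails on attempts 1 and 2, succeeds on attempt 3.
        sketch.send(n -> {
            if (n < 3) {
                throw new IOException("simulated failure on attempt " + n);
            }
            return "ok";
        }, 1);
        sketch.waitUntilDone();
        System.out.println("attempts=" + sketch.attempts.get()
            + " successes=" + sketch.successes.get());
        sketch.shutdown();
    }
}
```

Because the resubmission increments the counter before the finally block decrements it, the in-flight count never drops to zero while a retry is still pending, which is what lets a waitUntilDone()-style call block on the whole chain.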
protected MultiServerCallable<Row> createCallable(final HRegionLocation location,
    final MultiAction<Row> multi) {
  return new MultiServerCallable<Row>(hConnection, tableName, location, multi);
}
The call() method:
public MultiResponse call() throws IOException {
  int countOfActions = this.multiAction.size();
  if (countOfActions <= 0) throw new DoNotRetryIOException("No Actions");
  MultiRequest.Builder multiRequestBuilder = MultiRequest.newBuilder();
  List<CellScannable> cells = null;
  // The multi object is a list of Actions by region. Iterate by region.
  for (Map.Entry<byte[], List<Action<R>>> e: this.multiAction.actions.entrySet()) {
    final byte [] regionName = e.getKey();
    final List<Action<R>> actions = e.getValue();
    RegionAction.Builder regionActionBuilder;
    if (this.cellBlock) {
      // Presize. Presume at least a KV per Action. There are likely more.
      if (cells == null) cells = new ArrayList<CellScannable>(countOfActions);
      // Send data in cellblocks. The call to buildNoDataMultiRequest will skip RowMutations.
      // They have already been handled above. Guess at count of cells
      regionActionBuilder = RequestConverter.buildNoDataRegionAction(regionName, actions, cells);
    } else {
      regionActionBuilder = RequestConverter.buildRegionAction(regionName, actions);
    }
    multiRequestBuilder.addRegionAction(regionActionBuilder.build());
  }
  // Controller optionally carries cell data over the proxy/service boundary and also
  // optionally ferries cell response data back out again.
  PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cells);
  controller.setPriority(getTableName());
  ClientProtos.MultiResponse responseProto;
  ClientProtos.MultiRequest requestProto = multiRequestBuilder.build();
  try {
    responseProto = getStub().multi(controller, requestProto);
  } catch (ServiceException e) {
    return createAllFailedResponse(requestProto, ProtobufUtil.getRemoteException(e));
  }
  return ResponseConverter.getResults(requestProto, responseProto, controller.cellScanner());
}
The main thing to look at here is HRegionServer.multi():
public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
    throws ServiceException {
  // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
  // It is also the conduit via which we pass back data.
  PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
  CellScanner cellScanner = controller != null? controller.cellScanner(): null;
  if (controller != null) controller.setCellScanner(null);
  List<CellScannable> cellsToReturn = null;
  MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();

  for (RegionAction regionAction : request.getRegionActionList()) {
    this.requestCount.add(regionAction.getActionCount());
    RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
    HRegion region;
    try {
      region = getRegion(regionAction.getRegion());
    } catch (IOException e) {
      regionActionResultBuilder.setException(ResponseConverter.buildException(e));
      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
      continue;  // For this region it's a failure.
    }

    if (regionAction.hasAtomic() && regionAction.getAtomic()) {
      // How does this call happen? It may need some work to play well w/ the surroundings.
      // Need to return an item per Action along w/ Action index. TODO.
      try {
        mutateRows(region, regionAction.getActionList(), cellScanner);
      } catch (IOException e) {
        // As it's atomic, we may expect it's a global failure.
        regionActionResultBuilder.setException(ResponseConverter.buildException(e));
      }
    } else {
      // doNonAtomicRegionMutation manages the exception internally
      cellsToReturn = doNonAtomicRegionMutation(region, regionAction, cellScanner,
          regionActionResultBuilder, cellsToReturn);
    }
    responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
  }
  // Load the controller with the Cells to return.
  if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
    controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
  }
  return responseBuilder.build();
}
This is the core code; everything else just builds protobuf (PB) messages:
region = getRegion(regionAction.getRegion());
mutateRows(region, regionAction.getActionList(), cellScanner);
cellsToReturn = doNonAtomicRegionMutation(region, regionAction, cellScanner, regionActionResultBuilder, cellsToReturn);
Of these, mutateRows() handles PUT and DELETE:
protected void mutateRows(final HRegion region, final List<ClientProtos.Action> actions,
    final CellScanner cellScanner) throws IOException {
  if (!region.getRegionInfo().isMetaTable()) {
    cacheFlusher.reclaimMemStoreMemory();
  }
  RowMutations rm = null;
  for (ClientProtos.Action action: actions) {
    if (action.hasGet()) {
      throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
          action.getGet());
    }
    MutationType type = action.getMutation().getMutateType();
    if (rm == null) {
      rm = new RowMutations(action.getMutation().getRow().toByteArray());
    }
    switch (type) {
    case PUT:
      rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
      break;
    case DELETE:
      rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
      break;
    default:
      throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
    }
  }
  region.mutateRow(rm);
}
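The validate-then-apply shape of mutateRows() can be sketched as below: every action is checked and collected first, and only a fully valid batch would be handed on, so a single Get or other unsupported type rejects the whole batch. AtomicMutationSketch and its nested types are hypothetical, with strings standing in for the real Put and Delete objects; this is not HBase code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the validation loop in mutateRows(); not HBase code.
class AtomicMutationSketch {
    enum Type { GET, PUT, DELETE, APPEND }

    static final class Action {
        final Type type;
        final String row;

        Action(Type type, String row) {
            this.type = type;
            this.row = row;
        }
    }

    // Collects the mutations for an atomic batch; throws before anything would be applied
    // if the batch contains anything other than PUT or DELETE.
    static List<String> buildRowMutations(List<Action> actions) {
        List<String> rm = new ArrayList<>();
        for (Action a : actions) {
            switch (a.type) {
                case PUT:
                    rm.add("put:" + a.row);
                    break;
                case DELETE:
                    rm.add("delete:" + a.row);
                    break;
                default:
                    throw new IllegalArgumentException(
                        "Atomic put and/or delete only, not " + a.type.name());
            }
        }
        return rm;
    }

    public static void main(String[] args) {
        List<Action> ok = Arrays.asList(
            new Action(Type.PUT, "row1"),
            new Action(Type.DELETE, "row1"));
        System.out.println(buildRowMutations(ok));

        try {
            buildRowMutations(Arrays.asList(
                new Action(Type.PUT, "row1"),
                new Action(Type.GET, "row1")));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Note that in the source the RowMutations object is created from the first action's row, so the atomic path appears to assume that every mutation in the batch targets that same row.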