How Hadoop Works: Job Execution (Part 5) - Task Scheduling
2014-11-09 17:54
Continuing from the previous post: Hadoop schedules the auxiliary tasks first (the job-cleanup, task-cleanup and job-setup tasks), and that is handled by the JobTracker itself. The computational tasks, in contrast, are handed out by the task scheduler, TaskScheduler, whose default implementation is JobQueueTaskScheduler. The work happens in its assignTasks() method, which we walk through piece by piece below.
public synchronized List<Task> assignTasks(TaskTracker taskTracker)
    throws IOException {
  // Check for JT safe-mode
  if (taskTrackerManager.isInSafeMode()) {
    LOG.info("JobTracker is in safe-mode, not scheduling any tasks.");
    return null;
  }

  TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();
  ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
  final int numTaskTrackers = clusterStatus.getTaskTrackers();
  final int clusterMapCapacity = clusterStatus.getMaxMapTasks();
  final int clusterReduceCapacity = clusterStatus.getMaxReduceTasks();

  Collection<JobInProgress> jobQueue =
    jobQueueJobInProgressListener.getJobQueue();
First the method checks whether the JobTracker is in safe mode. It then fetches the status of this TaskTracker, the cluster status, the number of TaskTrackers in the cluster, and the cluster-wide maximum numbers of Map and Reduce tasks, and finally grabs the job queue whose jobs are to be scheduled.
  //
  // Get map + reduce counts for the current tracker.
  //
  final int trackerMapCapacity = taskTrackerStatus.getMaxMapSlots();
  final int trackerReduceCapacity = taskTrackerStatus.getMaxReduceSlots();
  final int trackerRunningMaps = taskTrackerStatus.countMapTasks();
  final int trackerRunningReduces = taskTrackerStatus.countReduceTasks();

  // Assigned tasks
  List<Task> assignedTasks = new ArrayList<Task>();
The first four lines fetch this TaskTracker's Map and Reduce slot counts and the numbers of Map and Reduce tasks currently running on it; the list created on the last line will collect the tasks assigned to this TaskTracker.
  //
  // Compute (running + pending) map and reduce task numbers across pool
  //
  int remainingReduceLoad = 0;
  int remainingMapLoad = 0;
  synchronized (jobQueue) {
    for (JobInProgress job : jobQueue) {
      if (job.getStatus().getRunState() == JobStatus.RUNNING) {
        remainingMapLoad += (job.desiredMaps() - job.finishedMaps());
        if (job.scheduleReduces()) {
          remainingReduceLoad +=
            (job.desiredReduces() - job.finishedReduces());
        }
      }
    }
  }
This block computes how many Map and Reduce tasks in the job queue still need to run. job.desiredMaps() returns the total number of Map tasks in a job, and job.finishedMaps() the number already finished; job.desiredReduces() and job.finishedReduces() do the same for Reduce tasks.
  // Compute the 'load factor' for maps and reduces
  double mapLoadFactor = 0.0;
  if (clusterMapCapacity > 0) {
    mapLoadFactor = (double)remainingMapLoad / clusterMapCapacity;
  }
  double reduceLoadFactor = 0.0;
  if (clusterReduceCapacity > 0) {
    reduceLoadFactor = (double)remainingReduceLoad / clusterReduceCapacity;
  }
This computes the load factor for Map and Reduce tasks: the ratio of Map tasks still to be run to the cluster's total Map slot capacity. Each TaskTracker will then be filled to at most that fraction of its Map slots; Reduce works the same way.
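To make the arithmetic concrete, here is a minimal runnable sketch with made-up numbers (60 outstanding Map tasks on a cluster with 100 map slots); the variable names mirror the scheduler's locals, but the class and the values are hypothetical:

  // Load-factor computation with assumed example inputs; in the scheduler
  // these values come from the job queue and ClusterStatus.
  public class LoadFactorDemo {
    public static void main(String[] args) {
      int remainingMapLoad = 60;    // pending Map tasks across jobs (assumed)
      int clusterMapCapacity = 100; // total Map slots in the cluster (assumed)

      double mapLoadFactor = 0.0;
      if (clusterMapCapacity > 0) {
        mapLoadFactor = (double) remainingMapLoad / clusterMapCapacity;
      }
      // Prints 0.6: each TaskTracker should fill at most 60% of its Map slots.
      System.out.println("mapLoadFactor = " + mapLoadFactor);
    }
  }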
  //
  // In the below steps, we allocate first map tasks (if appropriate),
  // and then reduce tasks if appropriate. We go through all jobs
  // in order of job arrival; jobs only get serviced if their
  // predecessors are serviced, too.
  //

  //
  // We assign tasks to the current taskTracker if the given machine
  // has a workload that's less than the maximum load of that kind of
  // task.
  // However, if the cluster is close to getting loaded i.e. we don't
  // have enough _padding_ for speculative executions etc., we only
  // schedule the "highest priority" task i.e. the task from the job
  // with the highest priority.
  //

  final int trackerCurrentMapCapacity =
    Math.min((int)Math.ceil(mapLoadFactor * trackerMapCapacity),
             trackerMapCapacity);
  int availableMapSlots = trackerCurrentMapCapacity - trackerRunningMaps;
  boolean exceededMapPadding = false;
  if (availableMapSlots > 0) {
    exceededMapPadding =
      exceededPadding(true, clusterStatus, trackerMapCapacity);
  }
The first statement applies the Map load factor computed above to work out how many Map tasks the current node may run; the second computes availableMapSlots, the number of slots still free for Map tasks. If availableMapSlots is greater than 0, there is room to run more Map tasks. Hadoop never hands out every slot: some are held back as padding for failed and speculative tasks, and exceededPadding() is the method that checks this reserve.
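Continuing the hypothetical numbers: a tracker with 4 map slots, one of them busy, under a load factor of 0.6 gets min(ceil(0.6 * 4), 4) = 3 usable slots, so 2 remain free. A rough sketch, with all inputs being assumed example values:

  public class TrackerCapacityDemo {
    public static void main(String[] args) {
      double mapLoadFactor = 0.6;  // from the previous step (assumed)
      int trackerMapCapacity = 4;  // Map slots on this TaskTracker (assumed)
      int trackerRunningMaps = 1;  // Maps already running there (assumed)

      int trackerCurrentMapCapacity =
          Math.min((int) Math.ceil(mapLoadFactor * trackerMapCapacity),
                   trackerMapCapacity);                       // ceil(2.4) = 3
      int availableMapSlots =
          trackerCurrentMapCapacity - trackerRunningMaps;     // 3 - 1 = 2

      // Only when availableMapSlots > 0 does the scheduler then ask
      // exceededPadding() whether the failed/speculative reserve holds.
      System.out.println("availableMapSlots = " + availableMapSlots);
    }
  }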
  int numLocalMaps = 0;
  int numNonLocalMaps = 0;
  scheduleMaps:
  for (int i=0; i < availableMapSlots; ++i) {
    synchronized (jobQueue) {
      for (JobInProgress job : jobQueue) {
        if (job.getStatus().getRunState() != JobStatus.RUNNING) {
          continue;
        }

        Task t = null;

        // Try to schedule a Map task with locality between node-local
        // and rack-local
        t = job.obtainNewNodeOrRackLocalMapTask(taskTrackerStatus,
                numTaskTrackers,
                taskTrackerManager.getNumberOfUniqueHosts());
        if (t != null) {
          assignedTasks.add(t);
          ++numLocalMaps;

          // Don't assign map tasks to the hilt!
          // Leave some free slots in the cluster for future task-failures,
          // speculative tasks etc. beyond the highest priority job
          if (exceededMapPadding) {
            break scheduleMaps;
          }

          // Try all jobs again for the next Map task
          break;
        }

        // Try to schedule a node-local or rack-local Map task
        t = job.obtainNewNonLocalMapTask(taskTrackerStatus, numTaskTrackers,
                taskTrackerManager.getNumberOfUniqueHosts());
        if (t != null) {
          assignedTasks.add(t);
          ++numNonLocalMaps;

          // We assign at most 1 off-switch or speculative task
          // This is to prevent TaskTrackers from stealing local-tasks
          // from other TaskTrackers.
          break scheduleMaps;
        }
      }
    }
  }
  int assignedMaps = assignedTasks.size();
The block above is the Map task assignment loop proper. obtainNewNodeOrRackLocalMapTask() and obtainNewNonLocalMapTask() assign node-local/rack-local tasks and non-local tasks respectively (I believe the comment above the obtainNewNonLocalMapTask() call, "Try to schedule a node-local or rack-local Map task" at line 195 of the original source, is wrong: that call actually schedules a non-local task). Both ultimately call findNewMapTask() to pick a task; the difference is the locality level they pass in. obtainNewNodeOrRackLocalMapTask() passes maxLevel, meaning node-local and rack-local tasks may be scheduled, while obtainNewNonLocalMapTask() passes NON_LOCAL_CACHE_LEVEL, meaning only off-switch/speculative tasks may be scheduled. The most permissive level, anyCacheLevel, allows node-local, rack-local, off-switch and speculative tasks alike.
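To see how the level argument steers the method, here is a small runnable sketch. The constant values follow Hadoop 1.x's JobInProgress (NON_LOCAL_CACHE_LEVEL is -1, as the javadoc below also notes, and anyCacheLevel is maxLevel + 1), but the demo class itself is only an illustration:

  // Illustrates how maxCacheLevel bounds the locality walk in
  // findNewMapTask() via maxLevelToSchedule = min(maxCacheLevel, maxLevel).
  public class CacheLevelDemo {
    static final int NON_LOCAL_CACHE_LEVEL = -1; // value from Hadoop 1.x

    public static void main(String[] args) {
      int maxLevel = 2;                 // host level + rack level
      int anyCacheLevel = maxLevel + 1; // everything is allowed

      // obtainNewNodeOrRackLocalMapTask() passes maxLevel: the cache walk
      // covers levels 0..1 (node-local, rack-local) and then returns -1.
      System.out.println(Math.min(maxLevel, maxLevel));              // 2

      // obtainNewNonLocalMapTask() passes NON_LOCAL_CACHE_LEVEL: the walk
      // body never runs, leaving only off-switch/speculative scheduling.
      System.out.println(Math.min(NON_LOCAL_CACHE_LEVEL, maxLevel)); // -1

      // anyCacheLevel also walks levels 0..1, but because it exceeds
      // maxLevel the early 'level == maxCacheLevel' return is skipped.
      System.out.println(Math.min(anyCacheLevel, maxLevel));         // 2
    }
  }

The complete findNewMapTask() from Hadoop 1.2.1 follows.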
/**
 * Find new map task
 * @param tts The task tracker that is asking for a task
 * @param clusterSize The number of task trackers in the cluster
 * @param numUniqueHosts The number of hosts that run task trackers
 * @param avgProgress The average progress of this kind of task in this job
 * @param maxCacheLevel The maximum topology level until which to schedule
 *                      maps.
 *                      A value of {@link #anyCacheLevel} implies any
 *                      available task (node-local, rack-local, off-switch and
 *                      speculative tasks).
 *                      A value of {@link #NON_LOCAL_CACHE_LEVEL} implies only
 *                      off-switch/speculative tasks should be scheduled.
 * @return the index in tasks of the selected task (or -1 for no task)
 */
private synchronized int findNewMapTask(final TaskTrackerStatus tts,
                                        final int clusterSize,
                                        final int numUniqueHosts,
                                        final int maxCacheLevel,
                                        final double avgProgress) {
  if (numMapTasks == 0) {
    if(LOG.isDebugEnabled()) {
      LOG.debug("No maps to schedule for " + profile.getJobID());
    }
    return -1;
  }

  String taskTracker = tts.getTrackerName();
  TaskInProgress tip = null;

  //
  // Update the last-known clusterSize
  //
  this.clusterSize = clusterSize;

  if (!shouldRunOnTaskTracker(taskTracker)) {
    return -1;
  }

  // Check to ensure this TaskTracker has enough resources to
  // run tasks from this job
  long outSize = resourceEstimator.getEstimatedMapOutputSize();
  long availSpace = tts.getResourceStatus().getAvailableSpace();
  if(availSpace < outSize) {
    LOG.warn("No room for map task. Node " + tts.getHost() +
             " has " + availSpace +
             " bytes free; but we expect map to take " + outSize);

    return -1; //see if a different TIP might work better.
  }


  // When scheduling a map task:
  //  0) Schedule a failed task without considering locality
  //  1) Schedule non-running tasks
  //  2) Schedule speculative tasks
  //  3) Schedule tasks with no location information

  // First a look up is done on the non-running cache and on a miss, a look
  // up is done on the running cache. The order for lookup within the cache:
  //   1. from local node to root [bottom up]
  //   2. breadth wise for all the parent nodes at max level
  // We fall to linear scan of the list ((3) above) if we have misses in the
  // above caches

  // 0) Schedule the task with the most failures, unless failure was on this
  //    machine
  tip = findTaskFromList(failedMaps, tts, numUniqueHosts, false);
  if (tip != null) {
    // Add to the running list
    scheduleMap(tip);
    LOG.info("Choosing a failed task " + tip.getTIPId());
    return tip.getIdWithinJob();
  }

  Node node = jobtracker.getNode(tts.getHost());

  //
  // 1) Non-running TIP :
  //

  // 1. check from local node to the root [bottom up cache lookup]
  //    i.e if the cache is available and the host has been resolved
  //    (node!=null)
  if (node != null) {
    Node key = node;
    int level = 0;
    // maxCacheLevel might be greater than this.maxLevel if findNewMapTask is
    // called to schedule any task (local, rack-local, off-switch or
    // speculative) tasks or it might be NON_LOCAL_CACHE_LEVEL (i.e. -1) if
    // findNewMapTask is to only schedule off-switch/speculative tasks
    int maxLevelToSchedule = Math.min(maxCacheLevel, maxLevel);
    for (level = 0; level < maxLevelToSchedule; ++level) {
      List <TaskInProgress> cacheForLevel = nonRunningMapCache.get(key);
      if (cacheForLevel != null) {
        tip = findTaskFromList(cacheForLevel, tts,
                               numUniqueHosts, level == 0);
        if (tip != null) {
          // Add to running cache
          scheduleMap(tip);

          // remove the cache if its empty
          if (cacheForLevel.size() == 0) {
            nonRunningMapCache.remove(key);
          }

          return tip.getIdWithinJob();
        }
      }
      key = key.getParent();
    }

    // Check if we need to only schedule a local task (node-local/rack-local)
    if (level == maxCacheLevel) {
      return -1;
    }
  }

  //2. Search breadth-wise across parents at max level for non-running
  //   TIP if
  //     - cache exists and there is a cache miss
  //     - node information for the tracker is missing (tracker's topology
  //       info not obtained yet)

  // collection of node at max level in the cache structure
  Collection<Node> nodesAtMaxLevel = jobtracker.getNodesAtMaxLevel();

  // get the node parent at max level
  Node nodeParentAtMaxLevel =
    (node == null) ? null : JobTracker.getParentNode(node, maxLevel - 1);

  for (Node parent : nodesAtMaxLevel) {

    // skip the parent that has already been scanned
    if (parent == nodeParentAtMaxLevel) {
      continue;
    }

    List<TaskInProgress> cache = nonRunningMapCache.get(parent);
    if (cache != null) {
      tip = findTaskFromList(cache, tts, numUniqueHosts, false);
      if (tip != null) {
        // Add to the running cache
        scheduleMap(tip);

        // remove the cache if empty
        if (cache.size() == 0) {
          nonRunningMapCache.remove(parent);
        }
        LOG.info("Choosing a non-local task " + tip.getTIPId());
        return tip.getIdWithinJob();
      }
    }
  }

  // 3. Search non-local tips for a new task
  tip = findTaskFromList(nonLocalMaps, tts, numUniqueHosts, false);
  if (tip != null) {
    // Add to the running list
    scheduleMap(tip);

    LOG.info("Choosing a non-local task " + tip.getTIPId());
    return tip.getIdWithinJob();
  }

  //
  // 2) Running TIP :
  //

  if (hasSpeculativeMaps) {
    long currentTime = jobtracker.getClock().getTime();

    // 1. Check bottom up for speculative tasks from the running cache
    if (node != null) {
      Node key = node;
      for (int level = 0; level < maxLevel; ++level) {
        Set<TaskInProgress> cacheForLevel = runningMapCache.get(key);
        if (cacheForLevel != null) {
          tip = findSpeculativeTask(cacheForLevel, tts,
                                    avgProgress, currentTime, level == 0);
          if (tip != null) {
            if (cacheForLevel.size() == 0) {
              runningMapCache.remove(key);
            }
            return tip.getIdWithinJob();
          }
        }
        key = key.getParent();
      }
    }

    // 2. Check breadth-wise for speculative tasks

    for (Node parent : nodesAtMaxLevel) {
      // ignore the parent which is already scanned
      if (parent == nodeParentAtMaxLevel) {
        continue;
      }

      Set<TaskInProgress> cache = runningMapCache.get(parent);
      if (cache != null) {
        tip = findSpeculativeTask(cache, tts, avgProgress,
                                  currentTime, false);
        if (tip != null) {
          // remove empty cache entries
          if (cache.size() == 0) {
            runningMapCache.remove(parent);
          }
          LOG.info("Choosing a non-local task " + tip.getTIPId()
                   + " for speculation");
          return tip.getIdWithinJob();
        }
      }
    }

    // 3. Check non-local tips for speculation
    tip = findSpeculativeTask(nonLocalRunningMaps, tts, avgProgress,
                              currentTime, false);
    if (tip != null) {
      LOG.info("Choosing a non-local task " + tip.getTIPId()
               + " for speculation");
      return tip.getIdWithinJob();
    }
  }

  return -1;
}
findNewMapTask
A quick digression into findNewMapTask(): this is the method that does the real task selection. Tasks are chosen in the following priority order:
1) Schedule a failed task from failedMaps
2) Pick a task with data locality from nonRunningMapCache, preferring node-local, then rack-local, then off-switch (see the sketch after this list; how locality is built up is covered further down)
3) Pick a task from nonLocalMaps
4) Pick a running task from runningMapCache and launch a backup (speculative) attempt for it
5) Pick a running task from nonLocalRunningMaps and launch a backup attempt for it
Finally, a return value of -1 means no suitable Map task was found; otherwise the return value is the index of the chosen Map task in the maps[] array of JobInProgress.
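Step 2 above walks the network topology bottom-up: start at the tracker's own node, look it up in nonRunningMapCache, and on a miss move to the parent (rack) node. Here is a self-contained sketch of that walk, with a toy TopoNode and String task ids standing in for Hadoop's Node and TaskInProgress (the real method additionally prunes empty cache entries and falls back to breadth-wise and speculative lookups):

  import java.util.*;

  public class LocalityLookupSketch {
    static class TopoNode {
      final String name;
      final TopoNode parent;
      TopoNode(String name, TopoNode parent) { this.name = name; this.parent = parent; }
    }

    // Level 0 = node-local, level 1 = rack-local, and so on up the tree.
    static String findLocalTask(TopoNode node, int maxLevel,
                                Map<TopoNode, List<String>> nonRunningCache) {
      TopoNode key = node;
      for (int level = 0; level < maxLevel && key != null; ++level) {
        List<String> tasks = nonRunningCache.get(key);
        if (tasks != null && !tasks.isEmpty()) {
          return tasks.remove(0);   // hit at this locality level
        }
        key = key.parent;           // miss: retry at the enclosing rack/switch
      }
      return null;                  // caller falls back to off-switch/speculation
    }

    public static void main(String[] args) {
      TopoNode rack = new TopoNode("/rack1", null);
      TopoNode host = new TopoNode("/rack1/host1", rack);
      Map<TopoNode, List<String>> cache = new IdentityHashMap<>();
      cache.put(rack, new ArrayList<>(Arrays.asList("map_0003"))); // rack-local only
      System.out.println(findLocalTask(host, 2, cache));           // map_0003
    }
  }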
  //
  // Same thing, but for reduce tasks
  // However we _never_ assign more than 1 reduce task per heartbeat
  //
  final int trackerCurrentReduceCapacity =
    Math.min((int)Math.ceil(reduceLoadFactor * trackerReduceCapacity),
             trackerReduceCapacity);
  final int availableReduceSlots =
    Math.min((trackerCurrentReduceCapacity - trackerRunningReduces), 1);
  boolean exceededReducePadding = false;
  if (availableReduceSlots > 0) {
    exceededReducePadding = exceededPadding(false, clusterStatus,
                                            trackerReduceCapacity);
Likewise, this part computes whether enough slots are being kept in reserve for failed and speculative Reduce tasks.
    synchronized (jobQueue) {
      for (JobInProgress job : jobQueue) {
        if (job.getStatus().getRunState() != JobStatus.RUNNING ||
            job.numReduceTasks == 0) {
          continue;
        }

        Task t =
          job.obtainNewReduceTask(taskTrackerStatus, numTaskTrackers,
                                  taskTrackerManager.getNumberOfUniqueHosts());
        if (t != null) {
          assignedTasks.add(t);
          break;
        }

        // Don't assign reduce tasks to the hilt!
        // Leave some free slots in the cluster for future task-failures,
        // speculative tasks etc. beyond the highest priority job
        if (exceededReducePadding) {
          break;
        }
      }
    }
  }
This part assigns Reduce tasks. Note that, unlike the two nested for loops used for Map tasks, Reduce assignment is a single loop, because only one Reduce task is assigned per heartbeat. Reduce tasks are chosen in the following order (a small sketch follows the list):
1) Pick a task from nonRunningReduces
2) Pick a running task from runningReduces and launch a speculative attempt for it
Finally, if findNewReduceTask() returns -1, no suitable Reduce task was found; otherwise the return value is the index of the chosen Reduce task in the reduces[] array of JobInProgress.
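obtainNewReduceTask() delegates to findNewReduceTask(), which is far simpler than its Map counterpart because reduces have no data locality. A toy sketch of the two-step order; isSlow() is a hypothetical stand-in for the progress-based test Hadoop performs in findSpeculativeTask():

  import java.util.*;

  public class ReduceSelectionSketch {
    static String pickReduce(Deque<String> nonRunningReduces,
                             List<String> runningReduces,
                             boolean hasSpeculativeReduces) {
      if (!nonRunningReduces.isEmpty()) {
        return nonRunningReduces.poll();   // 1) a reduce that has never run
      }
      if (hasSpeculativeReduces) {
        for (String tip : runningReduces) {
          if (isSlow(tip)) return tip;     // 2) back up a slow running reduce
        }
      }
      return null;                         // the real method returns -1
    }

    // Toy stand-in: treat ids ending in '!' as slow.
    static boolean isSlow(String tip) { return tip.endsWith("!"); }

    public static void main(String[] args) {
      Deque<String> pending = new ArrayDeque<>();
      List<String> running = Arrays.asList("reduce_0001", "reduce_0002!");
      System.out.println(pickReduce(pending, running, true)); // reduce_0002!
    }
  }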
  if (LOG.isDebugEnabled()) {
    LOG.debug("Task assignments for " + taskTrackerStatus.getTrackerName() +
              " --> " +
              "[" + mapLoadFactor + ", " + trackerMapCapacity + ", " +
              trackerCurrentMapCapacity + ", " + trackerRunningMaps +
              "] -> [" +
              (trackerCurrentMapCapacity - trackerRunningMaps) + ", " +
              assignedMaps + " (" + numLocalMaps + ", " + numNonLocalMaps +
              ")] [" + reduceLoadFactor + ", " + trackerReduceCapacity +
              ", " + trackerCurrentReduceCapacity + "," +
              trackerRunningReduces + "] -> [" +
              (trackerCurrentReduceCapacity - trackerRunningReduces) +
              ", " + (assignedTasks.size()-assignedMaps) + "]");
  }

  return assignedTasks;
Finally, the method returns the list of tasks assigned to this TaskTracker.
A few important JobInProgress data structures involved in task assignment:
- Map<Node, List<TaskInProgress>> nonRunningMapCache: maps a Node to the pending (not yet running) TIPs whose input data lives on it; built directly from the job's InputFormat
- Map<Node, Set<TaskInProgress>> runningMapCache: maps a Node to the TIPs running on it; a TIP is added here once it gets a chance to be scheduled
- final List<TaskInProgress> nonLocalMaps: non-local TIPs (no input data, i.e. an empty InputSplit) that are not yet running
- final SortedSet<TaskInProgress> failedMaps: TIPs sorted by their number of failed Task Attempts
- Set<TaskInProgress> nonLocalRunningMaps: non-local TIPs that are currently running
- Set<TaskInProgress> nonRunningReduces: Reduce TIPs waiting to run
- Set<TaskInProgress> runningReduces: Reduce TIPs that are currently running
On how Map task locality is implemented:
The JobInProgress structure nonRunningMapCache is what embodies locality: it records, per node, the set of pending Map tasks (TaskInProgress) whose input lives on that node. It is built in JobInProgress.createCache():
private Map<Node, List<TaskInProgress>> createCache(
                       TaskSplitMetaInfo[] splits, int maxLevel)
                       throws UnknownHostException {
  Map<Node, List<TaskInProgress>> cache =
    new IdentityHashMap<Node, List<TaskInProgress>>(maxLevel);

  Set<String> uniqueHosts = new TreeSet<String>();
  for (int i = 0; i < splits.length; i++) {
    String[] splitLocations = splits[i].getLocations();
    if (splitLocations == null || splitLocations.length == 0) {
      nonLocalMaps.add(maps[i]);
      continue;
    }

    for(String host: splitLocations) {
      Node node = jobtracker.resolveAndAddToTopology(host);
      uniqueHosts.add(host);
      LOG.info("tip:" + maps[i].getTIPId() + " has split on node:" + node);
      for (int j = 0; j < maxLevel; j++) {
        List<TaskInProgress> hostMaps = cache.get(node);
        if (hostMaps == null) {
          hostMaps = new ArrayList<TaskInProgress>();
          cache.put(node, hostMaps);
          hostMaps.add(maps[i]);
        }
        //check whether the hostMaps already contains an entry for a TIP
        //This will be true for nodes that are racks and multiple nodes in
        //the rack contain the input for a tip. Note that if it already
        //exists in the hostMaps, it must be the last element there since
        //we process one TIP at a time sequentially in the split-size order
        if (hostMaps.get(hostMaps.size() - 1) != maps[i]) {
          hostMaps.add(maps[i]);
        }
        node = node.getParent();
      }
    }
  }

  // Calibrate the localityWaitFactor - Do not override user intent!
  if (localityWaitFactor == DEFAULT_LOCALITY_WAIT_FACTOR) {
    int jobNodes = uniqueHosts.size();
    int clusterNodes = jobtracker.getNumberOfUniqueHosts();
    if (clusterNodes > 0) {
      localityWaitFactor =
        Math.min((float)jobNodes/clusterNodes, localityWaitFactor);
    }
    LOG.info(jobId + " LOCALITY_WAIT_FACTOR=" + localityWaitFactor);
  }

  return cache;
}
For every split, this method registers the corresponding Map task (TaskInProgress) under the node(s) holding the split, and under their ancestors up to maxLevel. Later, when an unstarted Map task is needed, simply looking up the requesting node in this structure yields a task local to it; that is all locality amounts to here. A stripped-down illustration of the registration loop follows.
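This is a self-contained sketch for one split, with a toy TopoNode in place of Hadoop's Node and a String id in place of the TaskInProgress; it shows why the same task later resolves both node-locally and rack-locally:

  import java.util.*;

  public class CreateCacheSketch {
    static class TopoNode {
      final String name;
      final TopoNode parent;
      TopoNode(String name, TopoNode parent) { this.name = name; this.parent = parent; }
    }

    public static void main(String[] args) {
      TopoNode rack = new TopoNode("/rack1", null);
      TopoNode host = new TopoNode("/rack1/host1", rack);
      int maxLevel = 2; // host level + rack level

      Map<TopoNode, List<String>> cache = new IdentityHashMap<>();
      String tip = "map_0007"; // the task whose split lives on host1
      TopoNode node = host;
      for (int level = 0; level < maxLevel && node != null; ++level) {
        // register the task under the host, then under its rack
        cache.computeIfAbsent(node, k -> new ArrayList<>()).add(tip);
        node = node.parent;
      }
      // map_0007 is now reachable at both locality levels:
      System.out.println(cache.get(host) + " / " + cache.get(rack));
    }
  }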
This article is based on Hadoop 1.2.1.
Corrections are welcome.
Reference: 《Hadoop技术内幕 深入理解MapReduce架构设计与实现原理》 (Hadoop Internals: In-depth Understanding of MapReduce Architecture Design and Implementation), Dong Xicheng (董西成)
/article/5912643.html
Please credit the source when reposting: /article/5582220.html