JStorm source code analysis: the topology submission process
2016-03-17 23:03
The entry point is submitTopologyWithOpts (ServiceHandler)
1 First, check that the topology name is valid

    if (!Common.charValidate(topologyName)) {
        throw new InvalidTopologyException(topologyName + " is not a valid topology name");
    }

The real work is done in charValidate

    /**
     * Validation of topology name chars. Only alpha char, number, '-', '_', '.' are valid.
     *
     * @return
     */
    public static boolean charValidate(String name) {
        return name.matches("[a-zA-Z0-9-_.]+");
    }

In other words, a topology name may contain only five kinds of characters: letters, digits, "-", "_" and ".".
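To make the rule concrete, here is a minimal sketch that applies the same regular expression to a few names. The class name and the sample names are made up for illustration; only the pattern comes from the source above.

```java
class TopologyNameCheck {
    // Same pattern as charValidate: letters, digits, '-', '_' and '.'.
    static boolean charValidate(String name) {
        return name.matches("[a-zA-Z0-9-_.]+");
    }

    public static void main(String[] args) {
        // Hypothetical sample names: one valid, three invalid
        // (space, slash, and the empty string, which fails because of '+').
        String[] names = {"word-count_v1.2", "word count", "wc/prod", ""};
        for (String name : names) {
            System.out.println("\"" + name + "\" -> " + charValidate(name));
        }
    }
}
```

Because the pattern ends in "+", an empty name is rejected as well.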
2 Check whether a topology with the same name already exists

    try {
        checkTopologyActive(data, topologyName, false);
    } catch (AlreadyAliveException e) {
        LOG.info(topologyName + " already exists ");
        throw e;
    } catch (Throwable e) {
        LOG.info("Failed to check whether topology is alive or not", e);
        throw new TException(e);
    }
Next, look at checkTopologyActive

    public void checkTopologyActive(NimbusData nimbus, String topologyName, boolean bActive) throws Exception {
        if (isTopologyActive(nimbus.getStormClusterState(), topologyName) != bActive) {
            if (bActive) {
                throw new NotAliveException(topologyName + " is not alive");
            } else {
                throw new AlreadyAliveException(topologyName + " is already active");
            }
        }
    }

The job of this function is to check whether the topology's state matches the expected one. The current state is obtained from isTopologyActive
    public boolean isTopologyActive(StormClusterState stormClusterState, String topologyName) throws Exception {
        boolean rtn = false;
        if (Cluster.get_topology_id(stormClusterState, topologyName) != null) {
            rtn = true;
        }
        return rtn;
    }

How do we decide whether a topology is alive? Simply by whether a topology id can be found for its name. That lookup happens in get_topology_id
    public static String get_topology_id(StormClusterState zkCluster, String storm_name) throws Exception {
        List<String> active_storms = zkCluster.active_storms();
        String rtn = null;
        if (active_storms != null) {
            for (String topology_id : active_storms) {
                if (topology_id.indexOf(storm_name) < 0) {
                    continue;
                }
                StormBase base = zkCluster.storm_base(topology_id, null);
                if (base != null && storm_name.equals(Common.getTopologyNameById(topology_id))) {
                    rtn = topology_id;
                    break;
                }
            }
        }
        return rtn;
    }

This function first fetches the ids of all active topologies. Because a topology id starts with the topology name, a string indexOf check is used as a first filter. A hit there is not conclusive, so the topology name is then derived from the id and compared for exact equality; only an exact match counts as found.
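So why not fetch the list of all topology names and match against it directly? The following functions make that clear

    @Override
    public List<String> active_storms() throws Exception {
        return cluster_state.get_children(Cluster.STORMS_SUBTREE, false);
    }

    @Override
    public List<String> get_children(String path, boolean watch) throws Exception {
        return zkobj.getChildren(zk, path, watch);
    }

All topology information is obtained by listing a directory in ZooKeeper, and its child nodes are the topology ids, so the ids are cheap to get. A rough check is done on the ids first, and only if it matches is the exact topology name extracted for a precise comparison. Next, see how getTopologyNameById derives the name from the id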
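The two-stage lookup (cheap substring filter on the id, then exact comparison on the derived name) can be sketched as follows. This is a simplified illustration, not the JStorm code: the ZooKeeper calls and the storm_base check are left out, the ids are invented, and idToName assumes well-formed ids of the form name-counter-timestamp.

```java
import java.util.Arrays;
import java.util.List;

class TopologyLookupDemo {
    // Derive the name by cutting at the second-to-last '-'.
    // Assumes a well-formed id; no error handling in this sketch.
    static String idToName(String id) {
        int last = id.lastIndexOf('-');
        int secondLast = id.lastIndexOf('-', last - 1);
        return id.substring(0, secondLast);
    }

    static String findTopologyId(List<String> activeIds, String name) {
        for (String id : activeIds) {
            if (id.indexOf(name) < 0) {
                continue; // cheap pre-filter: the name does not occur in the id at all
            }
            if (name.equals(idToName(id))) {
                return id; // exact match on the derived name
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Hypothetical ids as they might appear as ZooKeeper child nodes.
        List<String> ids = Arrays.asList("word-count-1-100", "word-2-200");
        System.out.println(findTopologyId(ids, "word"));  // word-2-200
        System.out.println(findTopologyId(ids, "count")); // null
    }
}
```

The id "word-count-1-100" passes the pre-filter for both "word" and "count", but the exact comparison rejects it in both cases, which is why the second step is needed.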
    public static String getTopologyNameById(String topologyId) {
        String topologyName = null;
        try {
            topologyName = topologyIdToName(topologyId);
        } catch (InvalidTopologyException e) {
            LOG.error("Invalid topologyId=" + topologyId);
        }
        return topologyName;
    }

    public static String topologyIdToName(String topologyId) throws InvalidTopologyException {
        String ret = null;
        int index = topologyId.lastIndexOf('-');
        if (index != -1 && index > 2) {
            index = topologyId.lastIndexOf('-', index - 1);
            if (index != -1 && index > 0)
                ret = topologyId.substring(0, index);
            else
                throw new InvalidTopologyException(topologyId + " is not a valid topologyId");
        } else
            throw new InvalidTopologyException(topologyId + " is not a valid topologyId");
        return ret;
    }

To get the topology name from the id, split on "-" and drop the last two segments. Why not simply take the first segment? Because the topology name itself may contain "-". This is also a good place to show how a topology id is built
    public static String topologyNameToId(String topologyName, int counter) {
        return topologyName + "-" + counter + "-" + TimeUtils.current_time_secs();
    }

The counter here is an integer that starts at 0 when the cluster starts and is incremented from there.
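A small round trip shows why the last two segments are dropped instead of keeping the first one. This is a sketch under assumptions: the timestamp is passed in as a parameter so the output is deterministic, and the parsing is a condensed version of topologyIdToName that throws IllegalArgumentException instead of InvalidTopologyException.

```java
class TopologyIdDemo {
    // Mirrors topologyNameToId, with the timestamp supplied by the caller
    // (an assumption made here to keep the example reproducible).
    static String nameToId(String name, int counter, int timeSecs) {
        return name + "-" + counter + "-" + timeSecs;
    }

    // Condensed form of topologyIdToName: cut at the second-to-last '-'.
    static String idToName(String topologyId) {
        int last = topologyId.lastIndexOf('-');
        int secondLast = last > 0 ? topologyId.lastIndexOf('-', last - 1) : -1;
        if (secondLast <= 0) {
            throw new IllegalArgumentException(topologyId + " is not a valid topologyId");
        }
        return topologyId.substring(0, secondLast);
    }

    public static void main(String[] args) {
        String id = nameToId("word-count", 3, 1458226980);
        System.out.println(id);               // word-count-3-1458226980
        System.out.println(idToName(id));     // word-count
        System.out.println(id.split("-")[0]); // word (first segment only, which is wrong here)
    }
}
```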
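3 Check whether a topology with the same name is in the pending submission list, which guards against repeated submissions in quick succession

    String topologyId = null;
    synchronized (data) {
        // avoid the same topology being submitted at the same time
        Set<String> pendingTopologys = data.getPendingSubmitTopoloygs().keySet();
        for (String cachTopologyId : pendingTopologys) {
            if (cachTopologyId.contains(topologyName + "-"))
                throw new AlreadyAliveException(topologyName + " were submitted");
        }
        int counter = data.getSubmittedCount().incrementAndGet();
        topologyId = Common.topologyNameToId(topologyName, counter);
        data.getPendingSubmitTopoloygs().put(topologyId, null);
    }

This block checks whether the topology is already in the pending submission list. If it is not, the topology id is added to the list; if it is, an exception is thrown. The pending list is actually a TimeCacheMap whose key is the topology id and whose value is null. Unlike an ordinary map, its entries have a lifetime, set here to 30 minutes: if a submission has not succeeded within 30 minutes, its entry is removed from the pending list and the topology can be submitted again.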
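The idea of a pending list whose entries expire can be sketched like this. It is not the TimeCacheMap implementation: PendingSubmissions, its HashMap of insertion times, and the explicit nowMillis parameter are all assumptions made so the expiry can be shown without waiting 30 minutes.

```java
import java.util.HashMap;
import java.util.Map;

class PendingSubmissions {
    // topology id -> time the entry was added (milliseconds)
    private final Map<String, Long> pending = new HashMap<>();
    private final long ttlMillis;
    private int counter = 0;

    PendingSubmissions(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    synchronized String register(String topologyName, long nowMillis) {
        // Drop entries that have outlived their time-to-live.
        pending.values().removeIf(addedAt -> nowMillis - addedAt >= ttlMillis);
        // Same substring test as the block above.
        for (String id : pending.keySet()) {
            if (id.contains(topologyName + "-")) {
                throw new IllegalStateException(topologyName + " was already submitted");
            }
        }
        String id = topologyName + "-" + (++counter) + "-" + (nowMillis / 1000);
        pending.put(id, nowMillis);
        return id;
    }

    public static void main(String[] args) {
        long thirtyMinutes = 30L * 60 * 1000;
        PendingSubmissions pending = new PendingSubmissions(thirtyMinutes);
        System.out.println(pending.register("wc", 0)); // wc-1-0
        try {
            pending.register("wc", 1000);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        // After the lifetime has passed, the same name is accepted again.
        System.out.println(pending.register("wc", thirtyMinutes));
    }
}
```

Because the test is a substring match on name + "-", a pending id such as "my-wc-1-0" would also block a submission named "wc" in this sketch.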
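4 All of the above was preparation. Now the topology is actually submitted. This is the most complex part, so it is described in several steps
4.1 Process the configuration
4.2 Validate the topology structure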
    Common.validate_basic(normalizedTopology, totalStormConf, topologyId);

    public static void validate_basic(StormTopology topology, Map<Object, Object> totalStormConf, String topologyid)
            throws InvalidTopologyException {
        validate_ids(topology, topologyid);
        for (StormTopology._Fields field : Thrift.SPOUT_FIELDS) {
            Object value = topology.getFieldValue(field);
            if (value != null) {
                Map<String, Object> obj_map = (Map<String, Object>) value;
                for (Object obj : obj_map.values()) {
                    validate_component_inputs(obj);
                }
            }
        }
        Integer workerNum = JStormUtils.parseInt(totalStormConf.get(Config.TOPOLOGY_WORKERS));
        if (workerNum == null || workerNum <= 0) {
            String errMsg = "There are no Config.TOPOLOGY_WORKERS in configuration of " + topologyid;
            throw new InvalidParameterException(errMsg);
        }
        Integer ackerNum = JStormUtils.parseInt(totalStormConf.get(Config.TOPOLOGY_ACKER_EXECUTORS));
        if (ackerNum != null && ackerNum < 0) {
            String errMsg = "Invalide Config.TOPOLOGY_ACKERS in configuration of " + topologyid;
            throw new InvalidParameterException(errMsg);
        }
    }

Let's look at the method's comment first:

    Validate the topology
    1. component id name is valid or not
    2. check some spout's input is empty or not

Now let's look at these two parts in turn
    public static void validate_ids(StormTopology topology, String topologyId) throws InvalidTopologyException {
        String topologyName = topologyIdToName(topologyId);
        if (!charValidate(topologyName)) {
            throw new InvalidTopologyException(topologyName + " is not a valid topology name. " + nameErrorInfo);
        }
        List<String> list = new ArrayList<String>();
        for (StormTopology._Fields field : Thrift.STORM_TOPOLOGY_FIELDS) {
            Object value = topology.getFieldValue(field);
            if (value != null) {
                Map<String, Object> obj_map = (Map<String, Object>) value;
                Set<String> commids = obj_map.keySet();
                for (String id : commids) {
                    if (system_id(id) || !charComponentValidate(id)) {
                        throw new InvalidTopologyException(id + " is not a valid component id. " + compErrorInfo);
                    }
                }
                for (Object obj : obj_map.values()) {
                    validate_component(obj);
                }
                list.addAll(commids);
            }
        }
        List<String> offending = JStormUtils.getRepeat(list);
        if (offending.isEmpty() == false) {
            throw new InvalidTopologyException("Duplicate component ids: " + offending);
        }
    }

This first validates the topology name (already done above, so it could be skipped here). It then collects the component ids of the three component types (spout, bolt and state_spout), rejecting system ids and ids with invalid characters along the way, and throws an exception if any id appears more than once.
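The duplicate check at the end of validate_ids relies on JStormUtils.getRepeat, whose source is not shown here. A minimal sketch of what such a helper could look like, assuming it simply returns every id that occurs more than once (findRepeats and the sample ids are hypothetical):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class DuplicateIds {
    // Returns each id that occurs more than once, in order of first repetition.
    static List<String> findRepeats(List<String> ids) {
        Set<String> seen = new HashSet<>();
        Set<String> repeats = new LinkedHashSet<>();
        for (String id : ids) {
            if (!seen.add(id)) { // add() returns false when the id was already present
                repeats.add(id);
            }
        }
        return new ArrayList<>(repeats);
    }

    public static void main(String[] args) {
        // Ids collected from the spout, bolt and state_spout maps; "split" is used twice.
        List<String> ids = Arrays.asList("sentence", "split", "count", "split");
        System.out.println(findRepeats(ids)); // [split]
    }
}
```

Within one component map the keys are already unique, so a duplicate can only come from the same id being used in two different maps, for example once as a spout and once as a bolt.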
Next, validate_component_inputs checks the inputs of every spout component

    private static void validate_component_inputs(Object obj) throws InvalidTopologyException {
        if (obj instanceof StateSpoutSpec) {
            StateSpoutSpec spec = (StateSpoutSpec) obj;
            if (!spec.get_common().get_inputs().isEmpty()) {
                throw new InvalidTopologyException("May not declare inputs for a spout");
            }
        }
        if (obj instanceof SpoutSpec) {
            SpoutSpec spec = (SpoutSpec) obj;
            if (!spec.get_common().get_inputs().isEmpty()) {
                throw new InvalidTopologyException("May not declare inputs for a spout");
            }
        }
    }

This function ensures that no spout (neither a regular spout nor a state_spout) declares any inputs: a spout is a source, so a non-empty input set is rejected.
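The rule is easy to get backwards, so here is a minimal sketch of it. Component is a hypothetical stand-in for SpoutSpec, StateSpoutSpec and Bolt; it is not a JStorm or Thrift type.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class SpoutInputCheck {
    // Hypothetical stand-in for a topology component.
    static class Component {
        final String id;
        final boolean isSpout;
        final List<String> inputs;

        Component(String id, boolean isSpout, List<String> inputs) {
            this.id = id;
            this.isSpout = isSpout;
            this.inputs = inputs;
        }
    }

    // A spout must have an empty input list; bolts are not restricted here.
    static void validateInputs(Component c) {
        if (c.isSpout && !c.inputs.isEmpty()) {
            throw new IllegalArgumentException("May not declare inputs for a spout: " + c.id);
        }
    }

    public static void main(String[] args) {
        validateInputs(new Component("sentence", true, Collections.<String>emptyList())); // ok
        validateInputs(new Component("split", false, Arrays.asList("sentence")));          // ok
        try {
            validateInputs(new Component("bad-spout", true, Arrays.asList("split")));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```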
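Finally, validate_basic checks the configured number of workers (it must be greater than 0) and the number of acker executors (it must not be negative if set).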
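These two checks can be sketched on a plain configuration map. The key strings below are assumed to be the values behind Config.TOPOLOGY_WORKERS and Config.TOPOLOGY_ACKER_EXECUTORS, and parseInt is a simplified stand-in for JStormUtils.parseInt.

```java
import java.util.HashMap;
import java.util.Map;

class WorkerConfigCheck {
    // Assumed key names; treat them as placeholders for the Config constants.
    static final String WORKERS = "topology.workers";
    static final String ACKERS = "topology.acker.executors";

    // Simplified stand-in for JStormUtils.parseInt: null stays null.
    static Integer parseInt(Object o) {
        if (o == null) {
            return null;
        }
        if (o instanceof Number) {
            return ((Number) o).intValue();
        }
        return Integer.valueOf(o.toString().trim());
    }

    static void validate(Map<String, Object> conf) {
        Integer workers = parseInt(conf.get(WORKERS));
        if (workers == null || workers <= 0) {
            throw new IllegalArgumentException("worker count must be set and greater than 0");
        }
        Integer ackers = parseInt(conf.get(ACKERS));
        if (ackers != null && ackers < 0) {
            throw new IllegalArgumentException("acker count must not be negative");
        }
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put(WORKERS, 4);
        validate(conf); // passes: ackers may be absent
        conf.put(ACKERS, 0);
        validate(conf); // passes: zero ackers is allowed
        conf.put(WORKERS, 0);
        try {
            validate(conf);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Note the asymmetry: the worker count is mandatory and strictly positive, while the acker count is optional and may be zero.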
4.3 Create the topology directory on nimbus's local disk

    private void setupStormCode(Map<Object, Object> conf, String topologyId, String tmpJarLocation,
            Map<Object, Object> stormConf, StormTopology topology) throws IOException {
        // local-dir/nimbus/stormdist/topologyId
        String stormroot = StormConfig.masterStormdistRoot(conf, topologyId);
        FileUtils.forceMkdir(new File(stormroot));
        FileUtils.cleanDirectory(new File(stormroot));

        // copy jar to /local-dir/nimbus/topologyId/stormjar.jar
        setupJar(conf, tmpJarLocation, stormroot);

        // serialize to file /local-dir/nimbus/topologyId/stormcode.ser
        FileUtils.writeByteArrayToFile(new File(StormConfig.stormcode_path(stormroot)), Utils.serialize(topology));

        // serialize to file /local-dir/nimbus/topologyId/stormconf.ser
        FileUtils.writeByteArrayToFile(new File(StormConfig.stormconf_path(stormroot)), Utils.serialize(stormConf));

        // Update downloadCode timeStamp
        StormConfig.write_nimbus_topology_timestamp(data.getConf(), topologyId, System.currentTimeMillis());
    }
4.4 setupZkTaskInfo creates the directory information for all spouts and bolts in ZooKeeper

    public void setupZkTaskInfo(Map<Object, Object> conf, String topologyId, StormClusterState stormClusterState)
            throws Exception {
        Map<Integer, TaskInfo> taskToTaskInfo = mkTaskComponentAssignments(conf, topologyId);

        // mkdir /ZK/taskbeats/topoologyId
        int masterId = NimbusUtils.getTopologyMasterId(taskToTaskInfo);
        TopologyTaskHbInfo topoTaskHbinfo = new TopologyTaskHbInfo(topologyId, masterId);
        data.getTasksHeartbeat().put(topologyId, topoTaskHbinfo);
        stormClusterState.topology_heartbeat(topologyId, topoTaskHbinfo);

        if (taskToTaskInfo == null || taskToTaskInfo.size() == 0) {
            throw new InvalidTopologyException("Failed to generate TaskIDs map");
        }
        // key is taskid, value is taskinfo
        stormClusterState.set_task(topologyId, taskToTaskInfo);
    }
This function first generates a taskId for every task

    public Map<Integer, TaskInfo> mkTaskComponentAssignments(Map<Object, Object> conf, String topologyid)
            throws IOException, InvalidTopologyException {
        // @@@ here exist a little problem,
        // we can directly pass stormConf from Submit method
        Map<Object, Object> stormConf = StormConfig.read_nimbus_topology_conf(conf, topologyid);
        StormTopology stopology = StormConfig.read_nimbus_topology_code(conf, topologyid);
        StormTopology topology = Common.system_topology(stormConf, stopology);
        return Common.mkTaskInfo(stormConf, topology, topologyid);
    }
The main work here is done by mkTaskInfo

    public static Map<Integer, TaskInfo> mkTaskInfo(Map<Object, Object> stormConf, StormTopology sysTopology,
            String topologyid) {
        // use TreeMap to make task as sequence
        // id (starting from 0) -> taskinfo
        Map<Integer, TaskInfo> rtn = new TreeMap<Integer, TaskInfo>();
        Integer count = 0;
        count = mkTaskMaker(stormConf, sysTopology.get_bolts(), rtn, count);
        count = mkTaskMaker(stormConf, sysTopology.get_spouts(), rtn, count);
        count = mkTaskMaker(stormConf, sysTopology.get_state_spouts(), rtn, count);
        return rtn;
    }

This is clear enough: it generates task ids for the three kinds of components in turn (bolts, spouts, state_spouts), using a counter that starts at 0 and keeps increasing. Note that mkTaskMaker increments the counter before using it, so the first id actually assigned is 1. Let's look at mkTaskMaker
    public static Integer mkTaskMaker(Map<Object, Object> stormConf, Map<String, ?> cidSpec,
            Map<Integer, TaskInfo> rtn, Integer cnt) {
        if (cidSpec == null) {
            LOG.warn("Component map is empty");
            return cnt;
        }
        Set<?> entrySet = cidSpec.entrySet();
        for (Iterator<?> it = entrySet.iterator(); it.hasNext();) {
            Entry entry = (Entry) it.next();
            Object obj = entry.getValue();
            ComponentCommon common = null;
            String componentType = "bolt";
            if (obj instanceof Bolt) {
                common = ((Bolt) obj).get_common();
                componentType = "bolt";
            } else if (obj instanceof SpoutSpec) {
                common = ((SpoutSpec) obj).get_common();
                componentType = "spout";
            } else if (obj instanceof StateSpoutSpec) {
                common = ((StateSpoutSpec) obj).get_common();
                componentType = "spout";
            }
            if (common == null) {
                throw new RuntimeException("No ComponentCommon of " + entry.getKey());
            }
            int declared = Thrift.parallelismHint(common);
            Integer parallelism = declared;
            // Map tmp = (Map) Utils_clj.from_json(common.get_json_conf());
            Map newStormConf = new HashMap(stormConf);
            // newStormConf.putAll(tmp);
            Integer maxParallelism = JStormUtils.parseInt(newStormConf.get(Config.TOPOLOGY_MAX_TASK_PARALLELISM));
            if (maxParallelism != null) {
                parallelism = Math.min(maxParallelism, declared);
            }
            for (int i = 0; i < parallelism; i++) {
                cnt++;
                TaskInfo taskInfo = new TaskInfo((String) entry.getKey(), componentType);
                rtn.put(cnt, taskInfo);
            }
        }
        return cnt;
    }

In summary, this is simple too: iterate over all components, determine each component's type, and create as many TaskInfo entries as the component's parallelism (capped by the configured maximum task parallelism, if any).
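The numbering scheme can be reproduced with plain maps. In this sketch, component names and parallelism values are invented, and a String stands in for TaskInfo; only the counting logic follows mkTaskMaker.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

class TaskIdDemo {
    // Assigns consecutive task ids, one per unit of (capped) parallelism.
    static int assign(Map<String, Integer> parallelismByComponent, Integer maxParallelism,
                      Map<Integer, String> taskToComponent, int count) {
        for (Map.Entry<String, Integer> e : parallelismByComponent.entrySet()) {
            int parallelism = e.getValue();
            if (maxParallelism != null) {
                parallelism = Math.min(maxParallelism, parallelism);
            }
            for (int i = 0; i < parallelism; i++) {
                count++; // incremented before use, so the first id handed out is 1
                taskToComponent.put(count, e.getKey());
            }
        }
        return count;
    }

    public static void main(String[] args) {
        Map<String, Integer> bolts = new LinkedHashMap<>();
        bolts.put("split", 2);
        bolts.put("count", 3);
        Map<String, Integer> spouts = new LinkedHashMap<>();
        spouts.put("sentence", 1);

        Map<Integer, String> tasks = new TreeMap<>();
        int count = assign(bolts, null, tasks, 0); // bolts first, as in mkTaskInfo
        count = assign(spouts, null, tasks, count);
        System.out.println(tasks); // {1=split, 2=split, 3=count, 4=count, 5=count, 6=sentence}
    }
}
```

Passing the running count from one call to the next is what keeps ids unique across bolts, spouts and state spouts.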
Finally, the topology's heartbeat information and the TaskInfo map created above are written to ZooKeeper
4.5 makeAssignment creates the assignment for the topology

    private void makeAssignment(String topologyName, String topologyId, TopologyInitialStatus status)
            throws FailedAssignTopologyException {
        TopologyAssignEvent assignEvent = new TopologyAssignEvent();
        assignEvent.setTopologyId(topologyId);
        assignEvent.setScratch(false);
        assignEvent.setTopologyName(topologyName);
        assignEvent.setOldStatus(Thrift.topologyInitialStatusToStormStatus(status));

        TopologyAssign.push(assignEvent);

        boolean isSuccess = assignEvent.waitFinish();
        if (isSuccess == true) {
            LOG.info("Finish submit for " + topologyName);
        } else {
            throw new FailedAssignTopologyException(assignEvent.getErrorMsg());
        }
    }
The assignment event is pushed onto a LinkedBlockingQueue, and doTopologyAssignment is eventually called; the most important call inside that function is mkAssignment

    public Assignment mkAssignment(TopologyAssignEvent event) throws Exception {
        String topologyId = event.getTopologyId();
        LOG.info("Determining assignment for " + topologyId);

        TopologyAssignContext context = prepareTopologyAssign(event);
        Set<ResourceWorkerSlot> assignments = null;
        if (!StormConfig.local_mode(nimbusData.getConf())) {
            IToplogyScheduler scheduler = schedulers.get(DEFAULT_SCHEDULER_NAME);
            assignments = scheduler.assignTasks(context);
        } else {
            assignments = mkLocalAssignment(context);
        }

        Assignment assignment = null;
        if (assignments != null && assignments.size() > 0) {
            // get the hosts the assignment maps to
            Map<String, String> nodeHost =
                    getTopologyNodeHost(context.getCluster(), context.getOldAssignment(), assignments);
            // get the start times of the tasks
            Map<Integer, Integer> startTimes =
                    getTaskStartTimes(context, nimbusData, topologyId, context.getOldAssignment(), assignments);
            // get the path of the code
            String codeDir = StormConfig.masterStormdistRoot(nimbusData.getConf(), topologyId);
            assignment = new Assignment(codeDir, assignments, nodeHost, startTimes);

            // the topology binary changed.
            if (event.isScaleTopology()) {
                assignment.setAssignmentType(Assignment.AssignmentType.ScaleTopology);
            }
            StormClusterState stormClusterState = nimbusData.getStormClusterState();
            // write the assignment information to zk
            stormClusterState.set_assignment(topologyId, assignment);

            // update task heartbeat's start time
            NimbusUtils.updateTaskHbStartTime(nimbusData, assignment, topologyId);

            // @@@ TODO
            // Update metrics information in ZK when rebalance or reassignment
            // Only update metrics monitor status when creating topology
            // if (context.getAssignType() ==
            // TopologyAssignContext.ASSIGN_TYPE_REBALANCE
            // || context.getAssignType() ==
            // TopologyAssignContext.ASSIGN_TYPE_MONITOR)
            // NimbusUtils.updateMetricsInfo(nimbusData, topologyId, assignment);

            NimbusUtils.updateTopologyTaskTimeout(nimbusData, topologyId);

            LOG.info("Successfully make assignment for topology id " + topologyId + ": " + assignment);
        }
        return assignment;
    }
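The hand-off in makeAssignment (push an event onto a queue, then block until a worker thread has processed it) can be sketched with standard concurrency utilities. This is an illustration of the pattern only: AssignEvent, the CountDownLatch, the timeout and the worker body are assumptions, not the TopologyAssign or TopologyAssignEvent implementation.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class AssignQueueDemo {
    // Hypothetical event: carries the topology id and lets the submitter wait for the result.
    static class AssignEvent {
        final String topologyId;
        private final CountDownLatch done = new CountDownLatch(1);
        private volatile String errorMsg;

        AssignEvent(String topologyId) {
            this.topologyId = topologyId;
        }

        void finish(String error) {
            errorMsg = error;
            done.countDown();
        }

        // True only if the worker finished in time and reported no error.
        boolean waitFinish(long timeoutMillis) throws InterruptedException {
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS) && errorMsg == null;
        }
    }

    static final BlockingQueue<AssignEvent> QUEUE = new LinkedBlockingQueue<>();

    // Worker thread: takes events off the queue and "assigns" them.
    static void startWorker() {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    AssignEvent event = QUEUE.take();
                    // Stand-in for the real assignment work.
                    event.finish(event.topologyId.isEmpty() ? "empty topology id" : null);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true);
        worker.start();
    }

    // Submitter side: push the event, then block until the worker reports back.
    static boolean submit(String topologyId) {
        AssignEvent event = new AssignEvent(topologyId);
        try {
            QUEUE.put(event);
            return event.waitFinish(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        startWorker();
        System.out.println(submit("word-count-1-1458226980")); // true
        System.out.println(submit(""));                        // false
    }
}
```

Funnelling all assignments through one queue and one consumer serializes them, so the submitting thread sees a synchronous call while the actual work runs elsewhere.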
That completes topology submission on the nimbus side. Next up is the supervisor.