
JStorm source code analysis: the topology submission process

2016-03-17 23:03
submitTopologyWithOpts(ServiceHandler)

1 First, check that the topology name is valid

if (!Common.charValidate(topologyName)) {
throw new InvalidTopologyException(topologyName + " is not a valid topology name");
}

So the real work is done in charValidate

/**
* Validation of topology name chars. Only alpha char, number, '-', '_', '.' are valid.
*
* @return
*/
public static boolean charValidate(String name) {
return name.matches("[a-zA-Z0-9-_.]+");
}
In other words, a topology name may contain only five kinds of characters: letters, digits, "-", "_" and ".".
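To make the rule concrete, here is a minimal sketch that applies the same regular expression to a few sample names. The class name TopologyNameCheck and the sample names are made up for illustration; only the pattern comes from the code quoted above.

```java
class TopologyNameCheck {
    // Same pattern as the charValidate method quoted above.
    static boolean charValidate(String name) {
        return name.matches("[a-zA-Z0-9-_.]+");
    }

    public static void main(String[] args) {
        // Sample names are hypothetical.
        String[] names = {"word-count_v1.0", "word count", "count#1", ""};
        for (String name : names) {
            System.out.println("'" + name + "' valid: " + charValidate(name));
        }
    }
}
```

Note that the trailing "+" requires at least one character, so an empty name is rejected as well.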

2 Check whether a topology with the same name already exists

try {
checkTopologyActive(data, topologyName, false);
} catch (AlreadyAliveException e) {
LOG.info(topologyName + " already exists ");
throw e;
} catch (Throwable e) {
LOG.info("Failed to check whether topology is alive or not", e);
throw new TException(e);
}


Now let's look at checkTopologyActive

public void checkTopologyActive(NimbusData nimbus, String topologyName, boolean bActive) throws Exception {
if (isTopologyActive(nimbus.getStormClusterState(), topologyName) != bActive) {
if (bActive) {
throw new NotAliveException(topologyName + " is not alive");
} else {
throw new AlreadyAliveException(topologyName + " is already active");
}
}
}
This function checks whether the topology's actual state matches the expected one. The current state is obtained from isTopologyActive

public boolean isTopologyActive(StormClusterState stormClusterState, String topologyName) throws Exception {
boolean rtn = false;
if (Cluster.get_topology_id(stormClusterState, topologyName) != null) {
rtn = true;
}
return rtn;
}
How is a topology judged to be alive? Simple: by whether a topology id can be found for its name. That lookup happens in get_topology_id

public static String get_topology_id(StormClusterState zkCluster, String storm_name) throws Exception {
List<String> active_storms = zkCluster.active_storms();
String rtn = null;
if (active_storms != null) {
for (String topology_id : active_storms) {

if (topology_id.indexOf(storm_name) < 0) {
continue;
}
StormBase base = zkCluster.storm_base(topology_id, null);
if (base != null && storm_name.equals(Common.getTopologyNameById(topology_id))) {
rtn = topology_id;
break;
}

}
}
return rtn;
}
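This function first fetches the ids of all active topologies. Because a topology id starts with the topology name, a string indexOf check serves as a cheap first filter. A hit there is not conclusive, so the topology name is then derived from the id and compared for an exact match. Only then does the topology count as found.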
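The two-stage match can be sketched without zookeeper, using an in-memory list of ids. This is a simplified illustration, not the JStorm implementation: TopologyLookup, nameOf and findTopologyId are hypothetical names, the storm_base lookup is left out, and the sample ids are invented.

```java
import java.util.List;

class TopologyLookup {
    // Strip the trailing "-counter-timestamp" to recover the name; null if the id is malformed.
    static String nameOf(String topologyId) {
        int last = topologyId.lastIndexOf('-');
        if (last <= 0) {
            return null;
        }
        int secondLast = topologyId.lastIndexOf('-', last - 1);
        return secondLast > 0 ? topologyId.substring(0, secondLast) : null;
    }

    // Stage 1: cheap substring filter. Stage 2: exact comparison of the derived name.
    static String findTopologyId(List<String> activeIds, String name) {
        for (String id : activeIds) {
            if (id.indexOf(name) < 0) {
                continue;
            }
            if (name.equals(nameOf(id))) {
                return id;
            }
        }
        return null;
    }
}
```

With the ids "wordcount-1-1458226980" and "count-2-1458226990", a lookup for "count" passes the substring filter on both ids, but only the second survives the exact comparison. This is why the filter alone is not enough.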

So why not fetch the list of all topology names and match against it directly? The following functions make the reason clear

@Override
public List<String> active_storms() throws Exception {
return cluster_state.get_children(Cluster.STORMS_SUBTREE, false);
}

@Override
public List<String> get_children(String path, boolean watch) throws Exception {
return zkobj.getChildren(zk, path, watch);
}
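All topology information is obtained by listing a directory on zookeeper, and the child nodes are named by topology id, so fetching the ids is the simple option. A rough filter is applied first, and on a hit the actual topology name is extracted for an exact match. Next, let's look at how the name is derived from the id in getTopologyNameById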

public static String getTopologyNameById(String topologyId) {
String topologyName = null;
try {
topologyName = topologyIdToName(topologyId);
} catch (InvalidTopologyException e) {
LOG.error("Invalid topologyId=" + topologyId);
}
return topologyName;
}

public static String topologyIdToName(String topologyId) throws InvalidTopologyException {
String ret = null;
int index = topologyId.lastIndexOf('-');
if (index != -1 && index > 2) {
index = topologyId.lastIndexOf('-', index - 1);
if (index != -1 && index > 0)
ret = topologyId.substring(0, index);
else
throw new InvalidTopologyException(topologyId + " is not a valid topologyId");
} else
throw new InvalidTopologyException(topologyId + " is not a valid topologyId");
return ret;
}
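Deriving the topology name from the id means splitting on "-" and dropping the last two segments. Why not simply take the first segment? Because the topology name itself may contain "-". This is also a good place to show how a topology id is composed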

public static String topologyNameToId(String topologyName, int counter) {
return topologyName + "-" + counter + "-" + TimeUtils.current_time_secs();
}
The counter here is an integer that starts at 0 when the cluster starts and is incremented with each submission.
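A small round trip shows why the last two segments are dropped instead of taking the first one. The sketch below mirrors the logic of the two functions quoted above, with two stated differences: the timestamp is passed in as a parameter instead of read from TimeUtils, and an IllegalArgumentException stands in for InvalidTopologyException. The class name TopologyIds and the sample values are hypothetical.

```java
class TopologyIds {
    // name + "-" + counter + "-" + submission time in seconds
    static String nameToId(String name, int counter, int nowSecs) {
        return name + "-" + counter + "-" + nowSecs;
    }

    // Drop the last two "-"-separated segments to get the name back.
    static String idToName(String id) {
        int index = id.lastIndexOf('-');
        if (index != -1 && index > 2) {
            index = id.lastIndexOf('-', index - 1);
            if (index != -1 && index > 0) {
                return id.substring(0, index);
            }
        }
        throw new IllegalArgumentException(id + " is not a valid topologyId");
    }

    public static void main(String[] args) {
        String id = nameToId("user-click-stats", 7, 1458226980);
        System.out.println(id + " -> " + idToName(id));
        // Taking only the first segment would lose part of the name.
        System.out.println("first segment only: " + id.split("-")[0]);
    }
}
```

For a name such as "user-click-stats", taking the first segment would yield "user", while dropping the last two segments recovers the full name.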

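3 Check whether a topology with the same name is in the pending-submission list, which guards against repeated submissions in quick succession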

String topologyId = null;
synchronized (data) {
// avoid to the same topologys wered submmitted at the same time
Set<String> pendingTopologys =
data.getPendingSubmitTopoloygs().keySet();
for (String cachTopologyId : pendingTopologys) {
if (cachTopologyId.contains(topologyName + "-"))
throw new AlreadyAliveException(
topologyName + "  were submitted");
}
int counter = data.getSubmittedCount().incrementAndGet();
topologyId = Common.topologyNameToId(topologyName, counter);
data.getPendingSubmitTopoloygs().put(topologyId, null);
}
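This block checks whether the topology is already in the pending-submission list. If it is not, the topology id is added to the list; if it is, an exception is thrown. The list is actually a TimeCacheMap whose key is the topology id and whose value is null. Compared with an ordinary map, its entries have a limited lifetime, set to 30 minutes here: if a submission has not succeeded within 30 minutes, its entry is removed from the list and the topology can be submitted again.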
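The behaviour described above can be sketched with a plain HashMap that stores an expiry time per id. This is only a stand-in for TimeCacheMap, whose internals are not shown in this article; PendingSubmissions, register and the use of IllegalStateException are assumptions made for the sketch, and time is passed in explicitly so the expiry is easy to follow.

```java
import java.util.HashMap;
import java.util.Map;

class PendingSubmissions {
    // topology id -> time (ms) at which the pending entry expires
    private final Map<String, Long> expiryById = new HashMap<>();
    private final long ttlMillis;
    private int submittedCount = 0;

    PendingSubmissions(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Returns the new topology id, or throws if the same name is still pending.
    synchronized String register(String topologyName, long nowMillis) {
        // Forget entries whose lifetime has run out.
        expiryById.values().removeIf(expiry -> expiry <= nowMillis);
        for (String id : expiryById.keySet()) {
            // Same substring check as in the quoted code.
            if (id.contains(topologyName + "-")) {
                throw new IllegalStateException(topologyName + " was already submitted");
            }
        }
        int counter = ++submittedCount;
        String id = topologyName + "-" + counter + "-" + (nowMillis / 1000);
        expiryById.put(id, nowMillis + ttlMillis);
        return id;
    }
}
```

One consequence of the substring check is visible in this sketch: while "wordcount-1-0" is pending, a submission named "count" is rejected too, because "wordcount-1-0" contains "count-".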

4 After all this preparation, the topology is now actually submitted. This is the most complex part, so it is covered in several steps

4.1 Process the configuration

4.2 Validate the topology structure

Common.validate_basic(normalizedTopology, totalStormConf, topologyId);
public static void validate_basic(StormTopology topology, Map<Object, Object> totalStormConf, String topologyid) throws InvalidTopologyException {
validate_ids(topology, topologyid);

for (StormTopology._Fields field : Thrift.SPOUT_FIELDS) {
Object value = topology.getFieldValue(field);
if (value != null) {
Map<String, Object> obj_map = (Map<String, Object>) value;
for (Object obj : obj_map.values()) {
validate_component_inputs(obj);
}
}

}

Integer workerNum = JStormUtils.parseInt(totalStormConf.get(Config.TOPOLOGY_WORKERS));
if (workerNum == null || workerNum <= 0) {
String errMsg = "There are no Config.TOPOLOGY_WORKERS in configuration of " + topologyid;
throw new InvalidParameterException(errMsg);
}

Integer ackerNum = JStormUtils.parseInt(totalStormConf.get(Config.TOPOLOGY_ACKER_EXECUTORS));
if (ackerNum != null && ackerNum < 0) {
String errMsg = "Invalide Config.TOPOLOGY_ACKERS in configuration of " + topologyid;
throw new InvalidParameterException(errMsg);
}

}
Let's first look at the comment:

Validate the topology 1. component id name is valid or not 2. check some spout's input is empty or not


Let's look at these two parts in turn

public static void validate_ids(StormTopology topology, String topologyId) throws InvalidTopologyException {
String topologyName = topologyIdToName(topologyId);
if (!charValidate(topologyName)) {
throw new InvalidTopologyException(topologyName + " is not a valid topology name. " + nameErrorInfo);
}

List<String> list = new ArrayList<String>();

for (StormTopology._Fields field : Thrift.STORM_TOPOLOGY_FIELDS) {
Object value = topology.getFieldValue(field);
if (value != null) {
Map<String, Object> obj_map = (Map<String, Object>) value;

Set<String> commids = obj_map.keySet();

for (String id : commids) {
if (system_id(id) || !charComponentValidate(id)) {
throw new InvalidTopologyException(id + " is not a valid component id. " + compErrorInfo);
}
}

for (Object obj : obj_map.values()) {
validate_component(obj);
}

list.addAll(commids);
}
}

List<String> offending = JStormUtils.getRepeat(list);
if (offending.isEmpty() == false) {
throw new InvalidTopologyException("Duplicate component ids: " + offending);
}

}
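This first validates the topology name (already done above, so it could be skipped here). It then validates every component id and collects the ids of the three kinds of components (spout, bolt and state_spout); if any id is repeated, an exception is thrown.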
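Keys within a single map are unique, so a duplicate can only arise when the same id is used in two different maps, for example for a spout and a bolt. The following sketch shows one way to detect that; findRepeated is a hypothetical stand-in for JStormUtils.getRepeat, whose implementation is not shown here.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class DuplicateIds {
    // Returns every id that occurs more than once, each reported a single time.
    static List<String> findRepeated(List<String> ids) {
        Set<String> seen = new HashSet<>();
        Set<String> reported = new HashSet<>();
        List<String> repeated = new ArrayList<>();
        for (String id : ids) {
            // add() returns false when the id was already present
            if (!seen.add(id) && reported.add(id)) {
                repeated.add(id);
            }
        }
        return repeated;
    }
}
```

Calling findRepeated on the combined list ["reader", "split", "count", "split"] returns ["split"], and validate_ids would reject the topology in that case.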

Next, the inputs of every spout in the topology are checked by validate_component_inputs

private static void validate_component_inputs(Object obj) throws InvalidTopologyException {
if (obj instanceof StateSpoutSpec) {
StateSpoutSpec spec = (StateSpoutSpec) obj;
if (!spec.get_common().get_inputs().isEmpty()) {
throw new InvalidTopologyException("May not declare inputs for a spout");
}

}

if (obj instanceof SpoutSpec) {
SpoutSpec spec = (SpoutSpec) obj;
if (!spec.get_common().get_inputs().isEmpty()) {
throw new InvalidTopologyException("May not declare inputs for a spout");
}
}
}
This function ensures that no spout (ordinary spout or state_spout) declares any inputs.

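Finally, the configured number of workers is checked (it must be greater than 0), as is the number of acker executors (it must not be negative).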
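The two count checks can be sketched as follows. The configuration keys "topology.workers" and "topology.acker.executors" are assumed to be the values behind Config.TOPOLOGY_WORKERS and Config.TOPOLOGY_ACKER_EXECUTORS, and parseInt is a simplified stand-in for JStormUtils.parseInt. ConfCheck and validateCounts are hypothetical names.

```java
import java.util.Map;

class ConfCheck {
    // Simplified stand-in: accepts numbers and numeric strings, returns null for a missing value.
    static Integer parseInt(Object o) {
        if (o == null) {
            return null;
        }
        if (o instanceof Number) {
            return ((Number) o).intValue();
        }
        return Integer.valueOf(o.toString().trim());
    }

    static void validateCounts(Map<String, Object> conf) {
        // The worker count is mandatory and must be positive.
        Integer workers = parseInt(conf.get("topology.workers"));
        if (workers == null || workers <= 0) {
            throw new IllegalArgumentException("topology.workers must be a positive integer");
        }
        // The acker count is optional, but must not be negative when present.
        Integer ackers = parseInt(conf.get("topology.acker.executors"));
        if (ackers != null && ackers < 0) {
            throw new IllegalArgumentException("topology.acker.executors must not be negative");
        }
    }
}
```

The asymmetry is deliberate in the quoted code: a missing worker count is an error, while a missing acker count is accepted.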

4.3 Create the topology directory locally on nimbus

private void setupStormCode(Map<Object, Object> conf, String topologyId, String tmpJarLocation, Map<Object, Object> stormConf, StormTopology topology)
throws IOException {
// local-dir/nimbus/stormdist/topologyId
String stormroot = StormConfig.masterStormdistRoot(conf, topologyId);

FileUtils.forceMkdir(new File(stormroot));
FileUtils.cleanDirectory(new File(stormroot));

// copy jar to /local-dir/nimbus/topologyId/stormjar.jar
setupJar(conf, tmpJarLocation, stormroot);

// serialize to file /local-dir/nimbus/topologyId/stormcode.ser
FileUtils.writeByteArrayToFile(new File(StormConfig.stormcode_path(stormroot)), Utils.serialize(topology));

// serialize to file /local-dir/nimbus/topologyId/stormconf.ser
FileUtils.writeByteArrayToFile(new File(StormConfig.stormconf_path(stormroot)), Utils.serialize(stormConf));

// Update downloadCode timeStamp
StormConfig.write_nimbus_topology_timestamp(data.getConf(), topologyId, System.currentTimeMillis());
}


4.4 Create the directory information for all spouts and bolts on zookeeper: setupZkTaskInfo

public void setupZkTaskInfo(Map<Object, Object> conf, String topologyId, StormClusterState stormClusterState) throws Exception {
Map<Integer, TaskInfo> taskToTaskInfo = mkTaskComponentAssignments(conf, topologyId);

// mkdir /ZK/taskbeats/topoologyId
int masterId = NimbusUtils.getTopologyMasterId(taskToTaskInfo);
TopologyTaskHbInfo topoTaskHbinfo = new TopologyTaskHbInfo(topologyId, masterId);
data.getTasksHeartbeat().put(topologyId, topoTaskHbinfo);
stormClusterState.topology_heartbeat(topologyId, topoTaskHbinfo);

if (taskToTaskInfo == null || taskToTaskInfo.size() == 0) {
throw new InvalidTopologyException("Failed to generate TaskIDs map");
}
// key is taskid, value is taskinfo
stormClusterState.set_task(topologyId, taskToTaskInfo);
}


This function first generates a taskId for every task

public Map<Integer, TaskInfo> mkTaskComponentAssignments(Map<Object, Object> conf, String topologyid) throws IOException, InvalidTopologyException {

// @@@ here exist a little problem,
// we can directly pass stormConf from Submit method
Map<Object, Object> stormConf = StormConfig.read_nimbus_topology_conf(conf, topologyid);
StormTopology stopology = StormConfig.read_nimbus_topology_code(conf, topologyid);
StormTopology topology = Common.system_topology(stormConf, stopology);

return Common.mkTaskInfo(stormConf, topology, topologyid);
}


The main work here is done by mkTaskInfo

public static Map<Integer, TaskInfo> mkTaskInfo(Map<Object, Object> stormConf, StormTopology sysTopology, String topologyid) {

// use TreeMap to make task as sequence
// task id -> taskinfo
Map<Integer, TaskInfo> rtn = new TreeMap<Integer, TaskInfo>();

Integer count = 0;
count = mkTaskMaker(stormConf, sysTopology.get_bolts(), rtn, count);
count = mkTaskMaker(stormConf, sysTopology.get_spouts(), rtn, count);
count = mkTaskMaker(stormConf, sysTopology.get_state_spouts(), rtn, count);

return rtn;
}
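This is straightforward: task ids are generated for the three kinds of components in turn (bolts, spouts, state_spouts), using a single counter that starts at 0 and keeps increasing. Because the counter is incremented before each task is stored, the first task id is 1. Let's look at mkTaskMaker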

public static Integer mkTaskMaker(Map<Object, Object> stormConf, Map<String, ?> cidSpec, Map<Integer, TaskInfo> rtn, Integer cnt) {
if (cidSpec == null) {
LOG.warn("Component map is empty");
return cnt;
}

Set<?> entrySet = cidSpec.entrySet();
for (Iterator<?> it = entrySet.iterator(); it.hasNext();) {
Entry entry = (Entry) it.next();
Object obj = entry.getValue();

ComponentCommon common = null;
String componentType = "bolt";
if (obj instanceof Bolt) {
common = ((Bolt) obj).get_common();
componentType = "bolt";
} else if (obj instanceof SpoutSpec) {
common = ((SpoutSpec) obj).get_common();
componentType = "spout";
} else if (obj instanceof StateSpoutSpec) {
common = ((StateSpoutSpec) obj).get_common();
componentType = "spout";
}

if (common == null) {
throw new RuntimeException("No ComponentCommon of " + entry.getKey());
}

int declared = Thrift.parallelismHint(common);
Integer parallelism = declared;
// Map tmp = (Map) Utils_clj.from_json(common.get_json_conf());

Map newStormConf = new HashMap(stormConf);
// newStormConf.putAll(tmp);
Integer maxParallelism = JStormUtils.parseInt(newStormConf.get(Config.TOPOLOGY_MAX_TASK_PARALLELISM));
if (maxParallelism != null) {
parallelism = Math.min(maxParallelism, declared);
}

for (int i = 0; i < parallelism; i++) {
cnt++;
TaskInfo taskInfo = new TaskInfo((String) entry.getKey(), componentType);
rtn.put(cnt, taskInfo);
}
}
return cnt;
}
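In summary, it iterates over all components, determines the type of each one, and creates as many TaskInfo entries as the component's parallelism calls for (capped by Config.TOPOLOGY_MAX_TASK_PARALLELISM when that is set).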

Finally, the topology's heartbeat information and the TaskInfo entries created above are written to zookeeper.
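The id assignment performed by mkTaskInfo and mkTaskMaker can be sketched with plain maps. This is a simplified illustration under stated assumptions: components are reduced to a name and a declared parallelism, TaskInfo is reduced to the component name, and TaskIdSketch, assign and the sample components are all hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

class TaskIdSketch {
    // Assigns consecutive task ids to each component and returns the updated counter.
    static int assign(Map<String, Integer> parallelismByComponent, Integer maxParallelism,
                      Map<Integer, String> taskToComponent, int cnt) {
        for (Map.Entry<String, Integer> e : parallelismByComponent.entrySet()) {
            int parallelism = e.getValue();
            if (maxParallelism != null) {
                // Cap the declared parallelism, as mkTaskMaker does.
                parallelism = Math.min(maxParallelism, parallelism);
            }
            for (int i = 0; i < parallelism; i++) {
                cnt++; // incremented before use, so the first id is 1
                taskToComponent.put(cnt, e.getKey());
            }
        }
        return cnt;
    }

    static Map<Integer, String> example() {
        Map<String, Integer> bolts = new LinkedHashMap<>();
        bolts.put("split", 2);
        bolts.put("count", 3);
        Map<String, Integer> spouts = new LinkedHashMap<>();
        spouts.put("reader", 4);

        Map<Integer, String> tasks = new TreeMap<>();
        int cnt = 0;
        cnt = assign(bolts, 3, tasks, cnt);  // bolts first, as in mkTaskInfo
        cnt = assign(spouts, 3, tasks, cnt); // then spouts, continuing the same counter
        return tasks;
    }

    public static void main(String[] args) {
        System.out.println(example());
    }
}
```

With a maximum parallelism of 3, the spout "reader" gets three tasks instead of the declared four, and the ids run from 1 to 8 across all components.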

4.5 makeAssignment: create the assignment for the topology

private void makeAssignment(String topologyName, String topologyId, TopologyInitialStatus status) throws FailedAssignTopologyException {
TopologyAssignEvent assignEvent = new TopologyAssignEvent();
assignEvent.setTopologyId(topologyId);
assignEvent.setScratch(false);
assignEvent.setTopologyName(topologyName);
assignEvent.setOldStatus(Thrift.topologyInitialStatusToStormStatus(status));

TopologyAssign.push(assignEvent);

boolean isSuccess = assignEvent.waitFinish();
if (isSuccess == true) {
LOG.info("Finish submit for " + topologyName);
} else {
throw new FailedAssignTopologyException(assignEvent.getErrorMsg());
}
}


The assignment event is pushed onto a LinkedBlockingQueue and is eventually handled by doTopologyAssignment, whose most important call is mkAssignment

public Assignment mkAssignment(TopologyAssignEvent event) throws Exception {
String topologyId = event.getTopologyId();

LOG.info("Determining assignment for " + topologyId);

TopologyAssignContext context = prepareTopologyAssign(event);

Set<ResourceWorkerSlot> assignments = null;

if (!StormConfig.local_mode(nimbusData.getConf())) {

IToplogyScheduler scheduler = schedulers.get(DEFAULT_SCHEDULER_NAME);

assignments = scheduler.assignTasks(context);

} else {
assignments = mkLocalAssignment(context);
}

Assignment assignment = null;
if (assignments != null && assignments.size() > 0) {
// get the host of each node used by the assignment
Map<String, String> nodeHost = getTopologyNodeHost(context.getCluster(), context.getOldAssignment(), assignments);
// get the start time of each task
Map<Integer, Integer> startTimes = getTaskStartTimes(context, nimbusData, topologyId, context.getOldAssignment(), assignments);
// get the path of the topology code
String codeDir = StormConfig.masterStormdistRoot(nimbusData.getConf(), topologyId);

assignment = new Assignment(codeDir, assignments, nodeHost, startTimes);

//  the topology binary changed.
if (event.isScaleTopology()){
assignment.setAssignmentType(Assignment.AssignmentType.ScaleTopology);
}
StormClusterState stormClusterState = nimbusData.getStormClusterState();
// write the assignment information to zk
stormClusterState.set_assignment(topologyId, assignment);

// update task heartbeat's start time
NimbusUtils.updateTaskHbStartTime(nimbusData, assignment, topologyId);

// @@@ TODO

// Update metrics information in ZK when rebalance or reassignment
// Only update metrics monitor status when creating topology
// if (context.getAssignType() ==
// TopologyAssignContext.ASSIGN_TYPE_REBALANCE
// || context.getAssignType() ==
// TopologyAssignContext.ASSIGN_TYPE_MONITOR)
// NimbusUtils.updateMetricsInfo(nimbusData, topologyId, assignment);

NimbusUtils.updateTopologyTaskTimeout(nimbusData, topologyId);

LOG.info("Successfully make assignment for topology id " + topologyId + ": " + assignment);
}
return assignment;
}
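The hand-off in makeAssignment, where the caller pushes an event onto a queue and then blocks in waitFinish until another thread has processed it, can be sketched as follows. This is an assumption-laden illustration: the article does not show how TopologyAssignEvent implements waitFinish, so a CountDownLatch is used here, and AssignEventSketch, AssignEvent, startWorker and submit are hypothetical names.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class AssignEventSketch {
    static class AssignEvent {
        final String topologyId;
        private final CountDownLatch done = new CountDownLatch(1);
        private volatile String errorMsg;

        AssignEvent(String topologyId) { this.topologyId = topologyId; }

        void finish() { done.countDown(); }

        void fail(String msg) { errorMsg = msg; done.countDown(); }

        // True only if the worker finished in time and reported no error.
        boolean waitFinish(long timeoutSecs) throws InterruptedException {
            return done.await(timeoutSecs, TimeUnit.SECONDS) && errorMsg == null;
        }
    }

    static final BlockingQueue<AssignEvent> QUEUE = new LinkedBlockingQueue<>();

    // Consumer thread: takes events off the queue and marks them finished or failed.
    static Thread startWorker() {
        Thread t = new Thread(() -> {
            try {
                while (true) {
                    AssignEvent event = QUEUE.take();
                    if (event.topologyId.isEmpty()) {
                        event.fail("empty topology id");
                    } else {
                        event.finish(); // the real assignment work would happen here
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        t.setDaemon(true);
        t.start();
        return t;
    }

    // Producer side: enqueue the event, then block until the worker has handled it.
    static boolean submit(String topologyId) {
        AssignEvent event = new AssignEvent(topologyId);
        try {
            QUEUE.put(event);
            return event.waitFinish(5);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The design keeps all assignment work on a single consumer thread, while the submitting thread still receives a synchronous success or failure result.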


That completes topology submission on the nimbus side. The next thing to look at is the supervisor.