
Sequential Task Process based on Spring Event Framework

2013-11-07 23:46
This is a follow-up to my last post, where I explained the basic concept and usage of the Observer Pattern in enterprise applications. Here, let's dive into a more detailed practice.

Suppose you have a series of sequential tasks (a simple directed graph with no cycles) to be processed, as below: one root task has multiple sub-tasks, and a sub-task starts to be handled only after its predecessor tasks have been processed successfully.



First, we need to define the task model inside the sequential process framework, to distinguish it from the task model in the application. The basic data structure contains the task object oid, the task relationships (pre-tasks and sub-tasks), the task location (root task or not), and the task type (for example, email task, fax task, etc.). So we've got the structure described in the Java code below:

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class LightweightTask implements Serializable {

    private long taskOid;
    private String taskType;
    private boolean isRootTask;
    private List<LightweightTask> subTasks = new ArrayList<LightweightTask>();
    private List<LightweightTask> preTasks = new ArrayList<LightweightTask>();

    public LightweightTask(long taskOid, String taskType, boolean isRootTask) {
        this.taskOid = taskOid;
        this.isRootTask = isRootTask;
        this.taskType = taskType;
    }
    // getters and setters cut for brevity
}

Different tasks need different task processors. I've defined a common interface as an identity for task processors; all we need to do is implement it to process the corresponding type of task. I've also got a dispatcher class which accepts a String parameter (standing for the task type) and returns a task processor. This could be managed in Spring by map injection; I won't go into detail here.
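To make the idea concrete, here is a minimal sketch of the processor contract and the dispatcher. The names follow the listener code shown later in the post, but the exact signatures, the `EmailTaskProcessor` class, and the map contents are assumptions for illustration:

```java
import java.util.HashMap;
import java.util.Map;

// Common interface implemented by every task processor.
interface LbpfProcessor {
    // Process the task identified by taskOid.
    void process(long taskOid);
    // Whether the processor finishes synchronously (affects the status update).
    boolean isProcessorSynchronized();
}

// One hypothetical processor per task type.
class EmailTaskProcessor implements LbpfProcessor {
    public void process(long taskOid) { /* send the email here */ }
    public boolean isProcessorSynchronized() { return true; }
}

// Dispatcher: maps a task type string to a processor class name.
class LbpfProcessorDispatcherManager {
    static final LbpfProcessorDispatcherManager instance =
            new LbpfProcessorDispatcherManager();

    // taskType -> processor class name; in Spring this map could be
    // populated via <map> injection.
    private final Map<String, String> processorClasses =
            new HashMap<String, String>();

    void setProcessorClasses(Map<String, String> classes) {
        processorClasses.putAll(classes);
    }

    String getTaskProcessor(String taskType) {
        String clazz = processorClasses.get(taskType);
        if (clazz == null) {
            throw new IllegalArgumentException(
                    "No processor registered for type " + taskType);
        }
        return clazz;
    }
}
```

The dispatcher returns a class name rather than an instance, matching the reflective `Class.forName(...)` call in the listener code below.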

OK, let's continue the main story. We've got the tasks and the processors, so what should we do? Start from the root task and process it; if it succeeds, repeat the process for each of its sub-tasks. We also need to handle exceptional cases, e.g. concurrency issues, where we may need to reprocess the task.

This approach really works, but can we improve it? Reviewing the solution above, we can see some drawbacks: the task processing and the workflow control are tightly coupled, and all tasks are processed synchronously. Maybe we can find another way. Yes, the Observer Pattern!

So, imagine that our task workflow is somehow like a pipeline: every task inside it is a single stage, and a stage always starts from an input stream and ends with an output stream. Hence we can picture three kinds of events in our case: InputStreamEvent, OutputStreamEvent and ExceptionStreamEvent. The whole flow is like below:



The workflow starts from the root stage and acts in this way:

1. Publish an input event for the current stage (task).

2. The input event listener processes the event.

3. If step 2 throws an exception, publish an exception event.

4. If the root cause of the exception is a concurrency issue, re-publish the input event; otherwise exit the process.

5. If step 2 completes successfully with no exception, publish an output event.

6. The output event listener converts the output event into multiple input events (one per sub-task) and publishes them.
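Step 4 hinges on telling a concurrency failure apart from other failures. A minimal sketch that walks the exception cause chain is below; the exact exception types are assumptions, so substitute whatever your persistence layer actually throws on a concurrent update:

```java
// Helper used by the exception event listener to decide whether the failed
// input event should be re-published (step 4) or the process should exit.
class ConcurrencyCauseInspector {

    // Walk the cause chain looking for a concurrency-related root cause.
    // The matched types here are illustrative assumptions.
    static boolean causedByConcurrency(Throwable t) {
        while (t != null) {
            if (t instanceof java.util.ConcurrentModificationException
                    || t.getClass().getSimpleName().contains("OptimisticLock")) {
                return true;
            }
            t = t.getCause();
        }
        return false;
    }
}
```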

So, here we have it. Now, for the implementation, I'd like to do it with the help of the Spring Event Framework. The InputStreamEvent will look like:

import org.springframework.context.ApplicationEvent;

/**
 * @ClassName: InputStreamEvent
 * @Description: Java event used to monitor the process stream within the task
 *               pipeline network. Attention: no inheritance should be used
 *               within the event POJO(s), as it would lead to an event being
 *               processed multiple times
 * @author ZHUGA3
 *
 * @date Oct 23, 2013
 */
public class InputStreamEvent extends ApplicationEvent {

    private static final long serialVersionUID = -6855044300379366418L;

    private LightweightTask currentStage;

    /**
     * @param currentStage the task carried by this event
     */
    public InputStreamEvent(LightweightTask currentStage) {
        // Attention: the Spring Event framework takes the event type and the
        // event source object type as a unique key for event listeners; here
        // we only care about the event type, hence simply use Object as the
        // event source type
        super(new Object());
        this.currentStage = currentStage;
    }

    public LightweightTask getCurrentStage() {
        return currentStage;
    }

    public void setCurrentStage(LightweightTask currentStage) {
        this.currentStage = currentStage;
    }
}

And the input listener will be like:

public class InputStreamEventListener extends
        AbstractEventListener<InputStreamEvent> {

    @Override
    public void onApplicationEvent(InputStreamEvent event) {
        final LightweightTask lTask = event.getCurrentStage();

        if (!InterfaceTaskStatus.SUCCESS.toString().equals(
                getITaskStatus(lTask.getTaskOid()))) {
            try {
                // We need the inner exception, if any, to distinguish whether
                // the process failure comes from a concurrency issue
                new LBPFWriteTransaction() {
                    @Override
                    protected void process() {
                        String processorClass = LbpfProcessorDispatcherManager.instance
                                .getTaskProcessor(lTask.getTaskType());
                        try {
                            LbpfProcessor processor = (LbpfProcessor) Class
                                    .forName(processorClass).newInstance();
                            processor.process(lTask.getTaskOid());
                            updateITaskstatus(
                                    lTask.getTaskOid(),
                                    processor.isProcessorSynchronized()
                                            ? InterfaceTaskStatus.SUCCESS.toString()
                                            : InterfaceTaskStatus.PENDING.toString());
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    }
                }.execute();
            } catch (Exception e) {
                EventManager.instance.publishEvent(new ExceptionStreamEvent(
                        lTask, e));
                return;
            }
        }

        if (InterfaceTaskStatus.SUCCESS.toString().equals(
                getITaskStatus(lTask.getTaskOid()))) {
            EventManager.instance.publishEvent(new OutputStreamEvent(lTask));
        }
    }
}
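The output event listener (step 6) is mostly graph traversal. Below is a plain-Java sketch of the conversion, with a guard so that a join node (a sub-task with several pre-tasks) is published only once all of its predecessors have succeeded. The `Task` stand-in and its `success` flag are simplifications of the real model:

```java
import java.util.ArrayList;
import java.util.List;

// Minimal stand-in for the task model; names and the success flag are
// hypothetical simplifications for illustration.
class Task {
    final long oid;
    boolean success;
    final List<Task> preTasks = new ArrayList<Task>();
    final List<Task> subTasks = new ArrayList<Task>();
    Task(long oid) { this.oid = oid; }
}

class OutputToInputConverter {

    // For each sub-task of the finished stage, emit it as a next input
    // stage only when every one of its pre-tasks has already succeeded.
    static List<Task> nextStages(Task finished) {
        List<Task> next = new ArrayList<Task>();
        for (Task sub : finished.subTasks) {
            boolean ready = true;
            for (Task pre : sub.preTasks) {
                if (!pre.success) {
                    ready = false;
                    break;
                }
            }
            if (ready) {
                next.add(sub);
            }
        }
        return next;
    }
}
```

In the real listener, each returned task would be wrapped in a new InputStreamEvent and published.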

Other models and listeners will look like these two. From the code above, you can see the Spring event listener is designed with the help of generics, so do not use inheritance when designing event models, unless you have a special requirement that one event should be processed by multiple different listeners.

At last, don't forget to register the listener(s) in Spring, so that they are initialized when the context starts up.
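Registration is plain bean definition; once a listener bean exists in the context, Spring picks it up automatically. A sketch in the XML style common at the time (the package and bean ids are hypothetical):

```xml
<bean id="inputStreamEventListener"
      class="com.example.lbpf.InputStreamEventListener"/>
<bean id="outputStreamEventListener"
      class="com.example.lbpf.OutputStreamEventListener"/>
<bean id="exceptionStreamEventListener"
      class="com.example.lbpf.ExceptionStreamEventListener"/>
```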

So, in this way, we've finished the implementation in a totally different way and got rid of the coupling. What about asynchronous processing? Any chance for tasks that have no dependency on each other, like task2 and task3?

Of course. Spring has built-in support for both synchronous and asynchronous listeners; all we need to do is configure an executor (like a thread pool) for the event multicaster. Pretty cool, right?
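Concretely, Spring looks for a bean named `applicationEventMulticaster`; giving `SimpleApplicationEventMulticaster` a task executor makes event delivery asynchronous. A sketch (the pool size is an arbitrary example):

```xml
<bean id="applicationEventMulticaster"
      class="org.springframework.context.event.SimpleApplicationEventMulticaster">
    <property name="taskExecutor">
        <bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
            <property name="corePoolSize" value="4"/>
        </bean>
    </property>
</bean>
```

Note that with this configuration all events are delivered asynchronously, so independent tasks like task2 and task3 can be processed in parallel.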