Storm Multi-Language Support: How ShellBolt Works and How to Improve It
2014-12-02 10:08
Please credit the source when reposting: http://blog.csdn.net/jmppok/article/details/17752221
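Storm is a real-time distributed stream-processing framework that more and more people are adopting. As is well known, Storm runs on the Java platform, and that creates a practical problem: in real projects we rarely start from scratch but build on existing code. If that existing code is written in C/C++, Python, or another non-Java language, how do we port it to run on Storm?
To solve this problem, Storm provides ShellBolt, which lets programs written in other languages run on the Storm platform.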
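1. How ShellBolt Works
In Storm, the basic unit of execution is the topology. A topology is made up of spouts and bolts: spouts bring data in, while bolts are where the data is actually processed.
So all we need is a way to wrap our program as a bolt in a topology, and that is exactly what ShellBolt does. ShellBolt is essentially a wrapper: it lets developers package their own program (any program at all) as a ShellBolt and add it to a topology. Pretty neat, isn't it?
Let's look at how ShellBolt works.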
![](https://oscdn.geek-share.com/Uploads/Images/Content/202008/10/f7b1bcb1b908ed85253b48b698e81fb9)
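1) ShellBolt is, at its core, just a bolt;
2) ShellBolt receives a command and uses it to create a ShellProcess. It then starts two threads: one sends messages to the ShellProcess and the other reads messages from it;
3) Going one level deeper, ShellProcess uses a ProcessBuilder to create a Process and interacts with that Process through its InputStream, OutputStream, and ErrorStream.
At this point the picture becomes clear:
ShellBolt simply launches a new process from a command and talks to it through that process's stdin, stdout, and stderr.
The approach looks simple, but it embodies the principle that "the simplest solution is often the best."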
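To make the stdin/stdout exchange concrete, here is a minimal, self-contained sketch of the framing that ShellProcess uses in `writeString()` and `readString()`: each message is written as its text followed by a line containing only `end`, and the reader collects lines until it sees `end`. The class name `MultilangFraming` is hypothetical, JSON encoding is left out (the payload is treated as an opaque string), and in-memory byte streams stand in for the child process's stdin and stdout.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical helper mirroring ShellProcess.writeString()/readString()
final class MultilangFraming {
    private MultilangFraming() {}

    // Write one message: payload bytes, then "\nend\n", then flush
    static void writeFrame(OutputStream out, String payload) throws IOException {
        out.write(payload.getBytes(StandardCharsets.UTF_8));
        out.write("\nend\n".getBytes(StandardCharsets.UTF_8));
        out.flush();
    }

    // Read lines until a line equal to "end"; join the preceding lines with '\n'
    static String readFrame(BufferedReader in) throws IOException {
        StringBuilder sb = new StringBuilder();
        while (true) {
            String line = in.readLine();
            if (line == null) {
                throw new IOException("Pipe to subprocess seems to be broken! Read so far: " + sb);
            }
            if (line.equals("end")) {
                return sb.toString();
            }
            if (sb.length() != 0) {
                sb.append('\n');
            }
            sb.append(line);
        }
    }

    // Frame several payloads into an in-memory buffer, then read them back in order
    static String[] roundTrip(String... payloads) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        for (String p : payloads) {
            writeFrame(buf, p);
        }
        BufferedReader in = new BufferedReader(new InputStreamReader(
                new ByteArrayInputStream(buf.toByteArray()), StandardCharsets.UTF_8));
        String[] result = new String[payloads.length];
        for (int i = 0; i < payloads.length; i++) {
            result[i] = readFrame(in);
        }
        return result;
    }
}
```

The delimiter is unambiguous as long as each payload is well-formed JSON: a bare word like `end` is not a JSON token, and JSON string literals cannot contain raw newlines, so a line consisting only of `end` can never occur inside a message.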
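2. Potential Problems with ShellBolt
As described above, ShellBolt launches a new process from a command and talks to it through that process's stdin, stdout, and stderr. This design can lead to the following four problems:
1) Hidden risk
All interaction happens by reading and writing the process's stdin and stdout. That places a hard requirement on the programmer: the program must never read from stdin or write to stdout on its own. In other words, there must be no printf, scanf, cout, cin, System.out, System.in, or anything similar anywhere in the program.
For small programs or brand-new code, this can be enforced by convention. For large programs, or when refactoring existing code, it becomes unreliable: only God knows whether someone has quietly read from stdin or written to stdout somewhere. When that happens, the stray data corrupts the message stream and the bolt fails, and the cause is very hard to track down.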
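The following sketch shows why a single stray print statement is fatal. It assumes the same "payload followed by an `end` line" framing that ShellProcess uses, and simulates the child's stdout with a string in which a debug line was printed just before a real message. The reader cannot tell the debug line apart from protocol data, so the line is glued onto the next message and the result is no longer valid JSON (in ShellProcess, `readMessage()` would then throw `IOException("unable to parse: ...")`). The class and method names are hypothetical.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;

// Hypothetical demo: what the parent sees when the child writes stray output to stdout
final class StrayOutputDemo {
    private StrayOutputDemo() {}

    // Same delimiter rule as ShellProcess.readString(): collect lines until "end" (or end of stream)
    static String firstFrame(String childStdout) throws IOException {
        BufferedReader in = new BufferedReader(new StringReader(childStdout));
        StringBuilder sb = new StringBuilder();
        String line;
        while ((line = in.readLine()) != null && !line.equals("end")) {
            if (sb.length() != 0) {
                sb.append('\n');
            }
            sb.append(line);
        }
        return sb.toString();
    }

    // Rough check: a JSON object payload must start with '{' and end with '}'
    static boolean looksLikeJsonObject(String s) {
        String t = s.trim();
        return t.startsWith("{") && t.endsWith("}");
    }
}
```

The same applies in the other direction: if the child reads stdin on its own (scanf, cin, System.in), it consumes bytes that belong to ShellBolt's messages.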
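2) Low efficiency
Again the issue is the interaction: because communication goes through the process's stdin and stdout, all data must be text, which Storm achieves with JSON encoding. Encoding, decoding, and pushing everything through stdin/stdout make this way of interacting comparatively slow.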
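A rough illustration of the text overhead, under the assumption that a bolt has to pass raw binary data (an image fragment, say) through the multilang protocol: JSON has no binary type, so such data must first be turned into text. The sketch uses Base64 (`java.util.Base64`, Java 8+) as one common choice; Storm's protocol itself does not prescribe it. Base64 turns every 3 bytes into 4 characters, so the payload grows by about a third before JSON encoding, the pipe write, and decoding on the other side are even counted. The class name is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical illustration of how much larger binary data gets once it must travel as text
final class TextEncodingOverhead {
    private TextEncodingOverhead() {}

    // Returns {raw bytes, Base64 characters, bytes of the Base64 text as a JSON string literal}
    static int[] sizes(byte[] raw) {
        String b64 = Base64.getEncoder().encodeToString(raw);
        // The JSON spec allows all Base64 characters unescaped, so only the quotes are added
        String json = "\"" + b64 + "\"";
        return new int[] { raw.length, b64.length(), json.getBytes(StandardCharsets.UTF_8).length };
    }
}
```

For a 300-byte payload this yields 300, 400 and 402: the text form is about 34% larger before any parsing cost is paid.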
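3) Zombie (leftover) processes
ShellBolt starts new processes from commands, but there is currently no reliable way to guarantee that it kills every process it starts. In my own use, I often found processes still running in the background after a topology had been stopped.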
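One likely source of these leftovers is that `Process.destroy()`, which is all `ShellProcess.destroy()` calls, only targets the direct child. If the command is a wrapper (for example a shell script that starts the real program), the grandchildren can survive. On Java 9 and later, `Process.descendants()` and `ProcessHandle` make it possible to take down the whole tree. The sketch below assumes Java 9+; the helper name and the grace-period parameter are hypothetical choices, not part of Storm. It has to run while the child is still alive: once the direct child exits, its children are re-parented and no longer appear among its descendants.

```java
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

// Hypothetical helper (Java 9+): terminate a child process together with everything it spawned
final class ProcessTreeKiller {
    private ProcessTreeKiller() {}

    // Returns how many processes (the child plus its descendants) were asked to terminate
    static int destroyTree(Process process, long graceMillis) throws InterruptedException {
        // Snapshot the descendants first, while the child is still their ancestor
        List<ProcessHandle> descendants = process.descendants().collect(Collectors.toList());
        descendants.forEach(ProcessHandle::destroy);
        process.destroy();

        // Escalate to a forcible kill for anything still alive after the grace period
        if (!process.waitFor(graceMillis, TimeUnit.MILLISECONDS)) {
            process.destroyForcibly();
        }
        for (ProcessHandle h : descendants) {
            if (h.isAlive()) {
                h.destroyForcibly();
            }
        }
        return descendants.size() + 1;
    }
}
```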
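4) Resource usage
ShellBolt starts a new process for each task, so a topology with many tasks launches many processes and consumes considerable resources. This is not strictly a drawback, though: each process has its own address space, so when your program needs a lot of resources, running it in a separate process is a good choice.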
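3. Room for Improvement
Based on the problems analyzed above, ShellBolt could be improved in the following ways:
1) Stop communicating over stdin/stdout and use another inter-process mechanism instead, such as pipes, shared memory, or sockets. Because Java lacks convenient built-in support for inter-process pipes and shared memory, sockets are the practical choice. This not only avoids the risks of using stdin/stdout but also frees the protocol from JSON, so a more efficient encoding can be used.
The rough approach:
Rewrite ShellProcess so that it opens a ServerSocket listening on a port, passes that port to the child process on its command line, and has the child process connect back to ShellProcess through that port.
This looks fairly complicated, so I have not spent time implementing it. Moreover, every client (each language adapter) would have to connect to the socket and parse the message protocol.
This shows that even a seemingly small change can come at a large cost!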
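The socket variant described above could look roughly like this on the Java side. It is a sketch, not code from Storm: the class name, the `--port` argument convention, and the length-prefixed binary framing (a 4-byte big-endian length from `DataOutputStream.writeInt`, followed by the payload bytes) are all assumptions. `new ServerSocket(0, 1, loopback)` lets the OS pick a free port on the loopback interface, and that port is appended to the child's command line so the child knows where to connect back.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical socket-based replacement for ShellProcess's stdin/stdout channel
final class SocketChannelSketch implements AutoCloseable {
    private final ServerSocket server;
    private Socket conn;
    private DataInputStream in;
    private DataOutputStream out;

    SocketChannelSketch() throws IOException {
        // Port 0: let the OS choose a free port; bind to loopback only
        server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
    }

    int port() {
        return server.getLocalPort();
    }

    // Child command with the port appended, e.g. [python, mybolt.py, --port, 54321]
    List<String> childCommand(String... command) {
        List<String> cmd = new ArrayList<>(Arrays.asList(command));
        cmd.add("--port");
        cmd.add(Integer.toString(port()));
        return cmd;
    }

    // Block until the child connects back
    void accept() throws IOException {
        conn = server.accept();
        in = new DataInputStream(conn.getInputStream());
        out = new DataOutputStream(conn.getOutputStream());
    }

    // Length-prefixed frame: arbitrary bytes, no text encoding, no "end" delimiter
    void writeMessage(byte[] payload) throws IOException {
        out.writeInt(payload.length);
        out.write(payload);
        out.flush();
    }

    byte[] readMessage() throws IOException {
        byte[] payload = new byte[in.readInt()];
        in.readFully(payload);
        return payload;
    }

    @Override
    public void close() throws IOException {
        if (conn != null) {
            conn.close();
        }
        server.close();
    }
}
```

In a full design, `childCommand(...)` would be handed to `ProcessBuilder` and `accept()` called right after `start()`. Every adapter (Python, Ruby, C++) would then need matching code to parse `--port`, connect, and speak the same framing, which is exactly the cost pointed out above.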
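2) Use JNI to communicate with C/C++. This is the most efficient option and is also fairly simple to implement, but its applicability is limited: it only works for C/C++. It happens to fit my own needs, though.
The implementation is similar to 1): rewrite ShellProcess and implement a JNIInvoker inside it, which calls into the C++ code through JNI to write and read messages.
Compared with 1), this approach is more efficient and easier to implement, but it only works for C++.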
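For the JNI route, the Java half of the JNIInvoker mentioned above might be declared as follows. Only the name `JNIInvoker` comes from the text; the native library name `multilangbridge`, the method names, and the byte-array message format are hypothetical. Each `native` method must be backed by a C/C++ function named by the JNI convention `Java_<class>_<method>` (for this class in the default package, `Java_JNIInvoker_writeMessage`), compiled into a shared library that `System.loadLibrary` can find on `java.library.path`. Until that library is loaded, calling a native method throws `UnsatisfiedLinkError`.

```java
// Hypothetical Java side of a JNI bridge replacing ShellProcess's pipes.
// Matching C/C++ declarations (as generated by `javac -h`) would be:
//   JNIEXPORT void JNICALL Java_JNIInvoker_writeMessage(JNIEnv *, jclass, jbyteArray);
//   JNIEXPORT jbyteArray JNICALL Java_JNIInvoker_readMessage(JNIEnv *, jclass);
final class JNIInvoker {
    private JNIInvoker() {}

    // Hypothetical library: libmultilangbridge.so on Linux, multilangbridge.dll on Windows
    static final String LIBRARY = "multilangbridge";

    private static volatile boolean loaded = false;

    // Load the native library once, e.g. from the bolt's prepare()
    static synchronized void load() {
        if (!loaded) {
            System.loadLibrary(LIBRARY);
            loaded = true;
        }
    }

    static boolean isLoaded() {
        return loaded;
    }

    // Hand one serialized message to the C++ code (in-process: no pipe, no JSON required)
    static native void writeMessage(byte[] message);

    // Fetch the next message produced by the C++ code
    static native byte[] readMessage();
}
```

Because the C++ code then runs inside the worker JVM, a crash in native code takes the whole worker down with it; this approach gives up the process isolation discussed under problem 4).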
3) Record the processes that ShellBolt launches and kill them when the program exits.
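A minimal sketch of improvement 3), assuming Java 9+ for `ProcessHandle`: every process a bolt launches is recorded in a registry, and `killAll()` is meant to be called from `cleanup()` and is also registered as a JVM shutdown hook in case the worker exits without calling `cleanup()`. The class and method names are hypothetical; Storm does not ship such a registry. A shutdown hook does not run if the worker JVM is killed with SIGKILL (`kill -9`), so this narrows the leak rather than eliminating it.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry of processes launched by shell bolts in this JVM (Java 9+)
final class LaunchedProcessRegistry {
    private static final Set<Process> PROCESSES = ConcurrentHashMap.newKeySet();

    static {
        // Last-chance cleanup if the JVM shuts down normally without cleanup() being called
        Runtime.getRuntime().addShutdownHook(new Thread(LaunchedProcessRegistry::killAll));
    }

    private LaunchedProcessRegistry() {}

    // Call right after ProcessBuilder.start()
    static Process track(Process p) {
        PROCESSES.add(p);
        return p;
    }

    static int trackedCount() {
        return PROCESSES.size();
    }

    // Forcibly kill every tracked process and its descendants; returns how many were tracked
    static int killAll() {
        int n = 0;
        for (Process p : PROCESSES) {
            p.descendants().forEach(ProcessHandle::destroyForcibly);
            p.destroyForcibly();
            PROCESSES.remove(p);
            n++;
        }
        return n;
    }
}
```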
4. Finally, the ShellBolt and ShellProcess Source Code
ShellBolt.java
```java
package backtype.storm.task;

import backtype.storm.generated.ShellComponent;
import backtype.storm.tuple.MessageId;
import backtype.storm.tuple.Tuple;
import backtype.storm.utils.Utils;
import backtype.storm.utils.ShellProcess;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import static java.util.concurrent.TimeUnit.SECONDS;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.log4j.Logger;
import org.json.simple.JSONObject;

/**
 * A bolt that shells out to another process to process tuples. ShellBolt
 * communicates with that process over stdio using a special protocol. An ~100
 * line library is required to implement that protocol, and adapter libraries
 * currently exist for Ruby and Python.
 *
 * <p>To run a ShellBolt on a cluster, the scripts that are shelled out to must be
 * in the resources directory within the jar submitted to the master.
 * During development/testing on a local machine, that resources directory just
 * needs to be on the classpath.</p>
 *
 * <p>When creating topologies using the Java API, subclass this bolt and implement
 * the IRichBolt interface to create components for the topology that use other languages. For example:
 * </p>
 *
 * <pre>
 * public class MyBolt extends ShellBolt implements IRichBolt {
 *     public MyBolt() {
 *         super("python", "mybolt.py");
 *     }
 *
 *     public void declareOutputFields(OutputFieldsDeclarer declarer) {
 *         declarer.declare(new Fields("field1", "field2"));
 *     }
 * }
 * </pre>
 */
public class ShellBolt implements IBolt {
    public static Logger LOG = Logger.getLogger(ShellBolt.class);
    Process _subprocess;
    OutputCollector _collector;
    // Input tuples waiting for an ack/fail from the subprocess, keyed by generated id
    Map<String, Tuple> _inputs = new ConcurrentHashMap<String, Tuple>();

    private String[] _command;
    private ShellProcess _process;
    private volatile boolean _running = true;
    private volatile Throwable _exception;
    // Messages queued for the writer thread to send to the subprocess
    private LinkedBlockingQueue _pendingWrites = new LinkedBlockingQueue();
    private Random _rand;

    private Thread _readerThread;
    private Thread _writerThread;

    public ShellBolt(ShellComponent component) {
        this(component.get_execution_command(), component.get_script());
    }

    public ShellBolt(String... command) {
        _command = command;
    }

    public void prepare(Map stormConf, TopologyContext context,
                        final OutputCollector collector) {
        _rand = new Random();
        _process = new ShellProcess(_command);
        _collector = collector;

        try {
            //subprocesses must send their pid first thing
            Number subpid = _process.launch(stormConf, context);
            LOG.info("Launched subprocess with pid " + subpid);
        } catch (IOException e) {
            throw new RuntimeException("Error when launching multilang subprocess\n" + _process.getErrorsString(), e);
        }

        // reader: reads messages from the subprocess's stdout and dispatches on "command"
        _readerThread = new Thread(new Runnable() {
            public void run() {
                while (_running) {
                    try {
                        JSONObject action = _process.readMessage();
                        if (action == null) {
                            // ignore sync: skip it instead of dereferencing null below
                            continue;
                        }
                        String command = (String) action.get("command");
                        if(command.equals("ack")) {
                            handleAck(action);
                        } else if (command.equals("fail")) {
                            handleFail(action);
                        } else if (command.equals("error")) {
                            handleError(action);
                        } else if (command.equals("log")) {
                            String msg = (String) action.get("msg");
                            LOG.info("Shell msg: " + msg);
                        } else if (command.equals("emit")) {
                            handleEmit(action);
                        }
                    } catch (InterruptedException e) {
                    } catch (Throwable t) {
                        die(t);
                    }
                }
            }
        });

        _readerThread.start();

        // writer: drains _pendingWrites to the subprocess's stdin,
        // polling with a 1-second timeout so that _running is re-checked
        _writerThread = new Thread(new Runnable() {
            public void run() {
                while (_running) {
                    try {
                        Object write = _pendingWrites.poll(1, SECONDS);
                        if (write != null) {
                            _process.writeMessage(write);
                        }
                    } catch (InterruptedException e) {
                    } catch (Throwable t) {
                        die(t);
                    }
                }
            }
        });

        _writerThread.start();
    }

    public void execute(Tuple input) {
        // Fail fast if the reader or writer thread has recorded an error
        if (_exception != null) {
            throw new RuntimeException(_exception);
        }

        //just need an id
        String genId = Long.toString(_rand.nextLong());
        _inputs.put(genId, input);
        try {
            JSONObject obj = new JSONObject();
            obj.put("id", genId);
            obj.put("comp", input.getSourceComponent());
            obj.put("stream", input.getSourceStreamId());
            obj.put("task", input.getSourceTask());
            obj.put("tuple", input.getValues());
            _pendingWrites.put(obj);
        } catch(InterruptedException e) {
            throw new RuntimeException("Error during multilang processing", e);
        }
    }

    public void cleanup() {
        _running = false;
        // Process.destroy() only targets the direct child; processes it spawned are not killed
        _process.destroy();
        _inputs.clear();
    }

    private void handleAck(Map action) {
        String id = (String) action.get("id");
        Tuple acked = _inputs.remove(id);
        if(acked==null) {
            throw new RuntimeException("Acked a non-existent or already acked/failed id: " + id);
        }
        _collector.ack(acked);
    }

    private void handleFail(Map action) {
        String id = (String) action.get("id");
        Tuple failed = _inputs.remove(id);
        if(failed==null) {
            throw new RuntimeException("Failed a non-existent or already acked/failed id: " + id);
        }
        _collector.fail(failed);
    }

    private void handleError(Map action) {
        String msg = (String) action.get("msg");
        _collector.reportError(new Exception("Shell Process Exception: " + msg));
    }

    private void handleEmit(Map action) throws InterruptedException {
        String stream = (String) action.get("stream");
        if(stream==null) stream = Utils.DEFAULT_STREAM_ID;
        Long task = (Long) action.get("task");
        List<Object> tuple = (List) action.get("tuple");
        List<Tuple> anchors = new ArrayList<Tuple>();
        Object anchorObj = action.get("anchors");
        if(anchorObj!=null) {
            if(anchorObj instanceof String) {
                anchorObj = Arrays.asList(anchorObj);
            }
            for(Object o: (List) anchorObj) {
                Tuple t = _inputs.get((String) o);
                if (t == null) {
                    throw new RuntimeException("Anchored onto " + o + " after ack/fail");
                }
                anchors.add(t);
            }
        }
        if(task==null) {
            List<Integer> outtasks = _collector.emit(stream, anchors, tuple);
            // Unless the subprocess set need_task_ids to false, reply with the receiving task ids
            Object need_task_ids = action.get("need_task_ids");
            if (need_task_ids == null || ((Boolean) need_task_ids).booleanValue()) {
                _pendingWrites.put(outtasks);
            }
        } else {
            _collector.emitDirect((int)task.longValue(), stream, anchors, tuple);
        }
    }

    // Record the error; execute() rethrows it on the next tuple
    private void die(Throwable exception) {
        _exception = exception;
    }
}
```
ShellProcess.java
```java
package backtype.storm.utils;

import backtype.storm.task.TopologyContext;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;

public class ShellProcess {
    private DataOutputStream processIn;      // the child's stdin
    private BufferedReader processOut;       // the child's stdout
    private InputStream processErrorStream;  // the child's stderr
    private Process _subprocess;
    private String[] command;

    public ShellProcess(String[] command) {
        this.command = command;
    }

    public Number launch(Map conf, TopologyContext context) throws IOException {
        ProcessBuilder builder = new ProcessBuilder(command);
        builder.directory(new File(context.getCodeDir()));
        _subprocess = builder.start();

        processIn = new DataOutputStream(_subprocess.getOutputStream());
        // Decode as UTF-8 to match the encoding used in writeString()
        processOut = new BufferedReader(new InputStreamReader(_subprocess.getInputStream(), "UTF-8"));
        processErrorStream = _subprocess.getErrorStream();

        // Handshake: send pidDir/conf/context, then expect a message containing "pid" back
        JSONObject setupInfo = new JSONObject();
        setupInfo.put("pidDir", context.getPIDDir());
        setupInfo.put("conf", conf);
        setupInfo.put("context", context);
        writeMessage(setupInfo);

        return (Number)readMessage().get("pid");
    }

    public void destroy() {
        _subprocess.destroy();
    }

    public void writeMessage(Object msg) throws IOException {
        writeString(JSONValue.toJSONString(msg));
    }

    // Each message is the JSON text followed by a line containing only "end"
    private void writeString(String str) throws IOException {
        byte[] strBytes = str.getBytes("UTF-8");
        processIn.write(strBytes, 0, strBytes.length);
        processIn.writeBytes("\nend\n");
        processIn.flush();
    }

    public JSONObject readMessage() throws IOException {
        String string = readString();
        JSONObject msg = (JSONObject)JSONValue.parse(string);
        if (msg != null) {
            return msg;
        } else {
            throw new IOException("unable to parse: " + string);
        }
    }

    public String getErrorsString() {
        if(processErrorStream!=null) {
            try {
                return IOUtils.toString(processErrorStream);
            } catch(IOException e) {
                return "(Unable to capture error stream)";
            }
        } else {
            return "";
        }
    }

    // Reads lines until one equals "end" and returns the preceding lines joined by '\n'
    private String readString() throws IOException {
        StringBuilder line = new StringBuilder();
        while (true) {
            String subline = processOut.readLine();
            if(subline==null) {
                // EOF: the subprocess closed its stdout, usually because it exited
                StringBuilder errorMessage = new StringBuilder();
                errorMessage.append("Pipe to subprocess seems to be broken!");
                if (line.length() == 0) {
                    errorMessage.append(" No output read.\n");
                }
                else {
                    errorMessage.append(" Currently read output: " + line.toString() + "\n");
                }
                errorMessage.append("Shell Process Exception:\n");
                errorMessage.append(getErrorsString() + "\n");
                throw new RuntimeException(errorMessage.toString());
            }
            if(subline.equals("end")) {
                break;
            }
            if(line.length()!=0) {
                line.append("\n");
            }
            line.append(subline);
        }
        return line.toString();
    }
}
```