
Submitting a Spark job: how to execute a shell script from Java

2016-06-30 16:24

References

Example of running a shell script from Java

中华石衫

Java 1.6 API documentation

Scenario

1. How do we invoke and run a Spark job from a J2EE backend?

A Spark job is usually submitted through a shell script that wraps the ./spark-submit command and its execution parameters, for example:

/home/hadoop/spark-1.6.0-bin-hadoop2.6/bin/spark-submit \
--class cool.pengych.sparker.session.UserSessionAnalysis \
--num-executors 1 \
--driver-memory 100m \
--executor-memory 100m \
--executor-cores 1 \
--files /usr/local/hive/conf/hive-site.xml \
--driver-class-path /usr/local/hive/lib/mysql-connector-java-5.1.17.jar \
/usr/local/spark-study/spark-project-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
${1}


Here ${1} receives the first argument passed when the script is invoked.

Assume the script is located at:

/home/pengyucheng/resource/spark_page.sh


2. Submitting a Spark job with SparkLauncher (more concise than method 1)

This part is adapted from:

Submitting a job via SparkSubmit

Code

1. Executing a shell script from Java

package cool.pengych.java.reflect;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

/**
 * Execute a shell script from Java
 * @author pengyucheng
 */
public class CommandHelper
{
    // default timeout, in milliseconds
    public static int DEFAULT_TIMEOUT;
    public static final int DEFAULT_INTERVAL = 1000;
    public static long START;

    public static CommandResult exec(String command) throws IOException, InterruptedException
    {
        Process process = Runtime.getRuntime().exec(command);
        CommandResult commandResult = wait(process);
        if (process != null)
        {
            process.destroy();
        }
        return commandResult;
    }

    private static boolean isOverTime()
    {
        return System.currentTimeMillis() - START >= DEFAULT_TIMEOUT;
    }

    private static CommandResult wait(Process process) throws InterruptedException, IOException
    {
        BufferedReader errorStreamReader = null;
        BufferedReader inputStreamReader = null;
        try
        {
            errorStreamReader = new BufferedReader(new InputStreamReader(process.getErrorStream()));
            inputStreamReader = new BufferedReader(new InputStreamReader(process.getInputStream()));
            // timeout control
            START = System.currentTimeMillis();
            boolean isFinished = false;
            for (;;)
            {
                if (isOverTime())
                {
                    CommandResult result = new CommandResult();
                    result.setExitValue(CommandResult.EXIT_VALUE_TIMEOUT);
                    result.setOutput("Command process timeout");
                    return result;
                }
                if (isFinished)
                {
                    CommandResult result = new CommandResult();
                    result.setExitValue(process.waitFor());
                    // parse error info
                    if (errorStreamReader.ready())
                    {
                        StringBuilder buffer = new StringBuilder();
                        String line;
                        while ((line = errorStreamReader.readLine()) != null)
                        {
                            buffer.append(line);
                        }
                        result.setError(buffer.toString());
                    }
                    // parse info
                    if (inputStreamReader.ready())
                    {
                        StringBuilder buffer = new StringBuilder();
                        String line;
                        while ((line = inputStreamReader.readLine()) != null)
                        {
                            buffer.append(line);
                        }
                        result.setOutput(buffer.toString());
                    }
                    return result;
                }
                try
                {
                    isFinished = true;
                    process.exitValue();
                }
                catch (IllegalThreadStateException e)
                {
                    // process hasn't finished yet
                    isFinished = false;
                    Thread.sleep(DEFAULT_INTERVAL);
                }
            }
        }
        finally
        {
            if (errorStreamReader != null)
            {
                try
                {
                    errorStreamReader.close();
                }
                catch (IOException e)
                {
                }
            }
            if (inputStreamReader != null)
            {
                try
                {
                    inputStreamReader.close();
                }
                catch (IOException e)
                {
                    e.printStackTrace();
                }
            }
        }
    }
}


package cool.pengych.java.reflect;

/**
 * Result of executing a command
 * @author pengyucheng
 */
public class CommandResult
{
    public static final int EXIT_VALUE_TIMEOUT = -1;

    private String output;

    void setOutput(String output)
    {
        this.output = output;
    }

    String getOutput()
    {
        return output;
    }

    int exitValue;

    void setExitValue(int value)
    {
        exitValue = value;
    }

    int getExitValue()
    {
        return exitValue;
    }

    private String error;

    /**
     * @return the error
     */
    public String getError()
    {
        return error;
    }

    /**
     * @param error the error to set
     */
    public void setError(String error)
    {
        this.error = error;
    }
}


package cool.pengych.java.reflect;

import java.io.IOException;

/**
 * Invoke and execute a shell script from Java code
 * @author pengyucheng
 */
public class InvokeCommand
{
    public static void main(String[] args)
    {
        String shpath = "/home/pengyucheng/resource/spark_page.sh";
        String param = " 1";
        String command1 = "/bin/sh " + shpath + param;
        try
        {
            int timeout = Integer.parseInt("100"); // timeout in milliseconds
            CommandHelper.DEFAULT_TIMEOUT = timeout;
            CommandResult result = CommandHelper.exec(command1);
            if (result != null)
            {
                System.out.println("Output:" + result.getOutput());
                System.out.println("Error:" + result.getError());
            }
        }
        catch (IOException ex)
        {
            System.out.println("IOException:" + ex.getLocalizedMessage());
        }
        catch (InterruptedException ex)
        {
            System.out.println("InterruptedException:" + ex.getLocalizedMessage());
        }
    }
}
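
One detail worth knowing about the code above: Runtime.getRuntime().exec(String) simply splits the command string on whitespace, which is why command1 is built by concatenating "/bin/sh ", the script path, and the parameter. As a minimal alternative sketch (not from the original article; the class name InvokeCommandPB is made up here), ProcessBuilder lets each argument be passed as a separate string and can merge stderr into stdout:

package cool.pengych.java.reflect;

import java.io.BufferedReader;
import java.io.InputStreamReader;

/**
 * Minimal ProcessBuilder-based sketch (illustrative only, not part of the original article).
 */
public class InvokeCommandPB
{
    public static void main(String[] args) throws Exception
    {
        // each argument is a separate string, so no manual concatenation is needed
        ProcessBuilder pb = new ProcessBuilder("/bin/sh",
                "/home/pengyucheng/resource/spark_page.sh", "1");
        pb.redirectErrorStream(true); // merge stderr into stdout

        Process process = pb.start();
        BufferedReader reader = new BufferedReader(
                new InputStreamReader(process.getInputStream()));
        StringBuilder output = new StringBuilder();
        String line;
        while ((line = reader.readLine()) != null)
        {
            output.append(line).append('\n');
        }
        reader.close();

        int exitValue = process.waitFor(); // blocks until the script (and spark-submit) exits
        System.out.println("exit value: " + exitValue);
        System.out.println(output);
    }
}

This avoids problems with paths that contain spaces, at the cost of losing the polling-based timeout that CommandHelper implements.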


Execution result

Output:null
Error:/home/pengyucheng/resource/spark_page.sh: 1: /home/pengyucheng/resource/spark_page.sh: /home/hadoop/spark-1.6.0-bin-hadoop2.6/bin: Permission denied


2. Running a Spark job with SparkLauncher

Sometimes we need to start our Spark application from another Scala/Java application, and for that we can use SparkLauncher. Here is an example in which we build a Spark application and then launch it from another Scala application.

Let's look at the Spark application code:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object SparkApp extends App {
  val conf = new SparkConf().setMaster("local[*]").setAppName("spark-app")
  val sc = new SparkContext(conf)
  val rdd = sc.parallelize(Array(2, 3, 2, 1))
  rdd.saveAsTextFile("result")
  sc.stop()
}


This is a simple Spark application. Build a jar of it using sbt assembly; then we write a Scala application that starts this Spark application, as follows:

import org.apache.spark.launcher.SparkLauncher
object Launcher extends App {
  val spark = new SparkLauncher()
    .setSparkHome("/home/knoldus/spark-1.4.0-bin-hadoop2.6")
    .setAppResource("/home/knoldus/spark_launcher-assembly-1.0.jar")
    .setMainClass("SparkApp")
    .setMaster("local[*]")
    .launch()
  spark.waitFor()
}


In the above code we create a SparkLauncher object and set values on it:

setSparkHome("/home/knoldus/spark-1.4.0-bin-hadoop2.6") sets the Spark home directory, which is used internally to call spark-submit.

.setAppResource("/home/knoldus/spark_launcher-assembly-1.0.jar") specifies the jar of our Spark application.

.setMainClass("SparkApp") sets the entry point of the Spark program, i.e. the driver program.

.setMaster("local[*]") sets the master address; here we run it on the local machine.

.launch() simply starts our Spark application.

This is only the minimal setup; you can also set many other options, such as passing application arguments, adding jars, and setting configuration properties, as sketched below.
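
For example, a rough Java sketch of these extra options (the class name LauncherWithConfig, the argument value "1", the extra jar path, and the driver-memory setting are illustrative only, reusing values that appear elsewhere in this article) might look like this:

import org.apache.spark.launcher.SparkLauncher;

public class LauncherWithConfig
{
    public static void main(String[] args) throws Exception
    {
        Process spark = new SparkLauncher()
                .setSparkHome("/home/knoldus/spark-1.4.0-bin-hadoop2.6")
                .setAppResource("/home/knoldus/spark_launcher-assembly-1.0.jar")
                .setMainClass("SparkApp")
                .setMaster("local[*]")
                .addAppArgs("1") // arguments forwarded to SparkApp's main method
                .addJar("/usr/local/hive/lib/mysql-connector-java-5.1.17.jar") // extra jar submitted with the application
                .setConf(SparkLauncher.DRIVER_MEMORY, "100m") // same effect as --driver-memory 100m
                .launch();
        spark.waitFor(); // block until spark-submit exits
    }
}

Here addAppArgs passes arguments through to the application's main method, addJar submits an extra jar along with the application, and setConf(SparkLauncher.DRIVER_MEMORY, ...) corresponds to the --driver-memory flag of spark-submit.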

For the source code you can check out the following git repos:

Spark_laucher is our Spark application.

launcher_app is our Scala application, which starts the Spark application.

Change the paths to match your environment and build a jar of Spark_laucher; then run launcher_app and look for the resulting RDD in the result directory, since the Spark application simply saves it as a text file.

Summary

1. The execution result shows a permission error: I ran the Java program as the pengyucheng user, which does not have access to the relevant directories. How can we switch to another user when executing the script?

This problem has not been solved yet... setting it aside for now. The output was:

Output:null

Error:/home/pengyucheng/resource/spark_page.sh: 1: /home/pengyucheng/resource/spark_page.sh: /home/hadoop/spark-1.6.0-bin-hadoop2.6/bin: Permission denied

2. How do we tell whether the Spark job ran normally or hit an exception?

If exitValue is 0, the process responsible for running the Spark job has finished. However, finishing does not by itself guarantee that the job ran correctly; an exception may still have occurred during execution. Therefore, when exitValue == 0 and the error stream produced no output (error == null), we can conclude that the Spark job completed normally.

/**
* Returns the exit value for the subprocess.
*
* @return the exit value of the subprocess represented by this
*         {@code Process} object.  By convention, the value
*         {@code 0} indicates normal termination.
* @throws IllegalThreadStateException if the subprocess represented
*         by this {@code Process} object has not yet terminated
*/
public abstract int exitValue();
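
Putting the two checks together, a small fragment (illustrative only; it assumes it lives in the same package as CommandHelper and CommandResult above, inside a method that declares throws IOException and InterruptedException, and the timeout value is arbitrary) might look like this:

CommandHelper.DEFAULT_TIMEOUT = 30 * 60 * 1000; // e.g. 30 minutes, an arbitrary value
CommandResult result = CommandHelper.exec("/bin/sh /home/pengyucheng/resource/spark_page.sh 1");

if (result.getExitValue() == CommandResult.EXIT_VALUE_TIMEOUT)
{
    System.out.println("Spark job timed out");
}
else if (result.getExitValue() == 0 && result.getError() == null)
{
    System.out.println("Spark job finished normally: " + result.getOutput());
}
else
{
    System.out.println("Spark job failed, exit value " + result.getExitValue()
            + ", error: " + result.getError());
}

The timeout check has to come first, because in the timeout case error is also null.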


The correctness of the above conclusion remains to be verified.