Submitting a Spark Job: How to Execute a Shell Script from Java
2016-06-30 16:24
References
- "java运行shell脚本方法示例" (中华石衫)
- Java 1.6 API documentation
Scenario
一. How do you invoke and run a Spark job from a J2EE backend? A Spark job is usually launched through a shell script that wraps the ./spark-submit command and its execution parameters, for example:
```shell
/home/hadoop/spark-1.6.0-bin-hadoop2.6/bin/spark-submit \
--class cool.pengych.sparker.session.UserSessionAnalysis \
--num-executors 1 \
--driver-memory 100m \
--executor-memory 100m \
--executor-cores 1 \
--files /usr/local/hive/conf/hive-site.xml \
--driver-class-path /usr/local/hive/lib/mysql-connector-java-5.1.17.jar \
/usr/local/spark-study/spark-project-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
${1}
```
Here ${1} receives the first argument passed when the script is invoked.
Assume the script is located at:
/home/pengyucheng/resource/spark_page.sh
二. Submitting a Spark job via SparkLauncher (simpler than method one)
This part is adapted from:
"Submitting tasks via SparkSubmit" (通过SparkSubmit提交任务)
Code
一. Executing a shell script from Java

```java
package cool.pengych.java.reflect;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

/**
 * Execute a shell script from Java.
 * @author pengyucheng
 */
public class CommandHelper {
    // default timeout, in milliseconds
    public static int DEFAULT_TIMEOUT;
    public static final int DEFAULT_INTERVAL = 1000;
    public static long START;

    public static CommandResult exec(String command) throws IOException, InterruptedException {
        Process process = Runtime.getRuntime().exec(command);
        CommandResult commandResult = wait(process);
        if (process != null) {
            process.destroy();
        }
        return commandResult;
    }

    private static boolean isOverTime() {
        return System.currentTimeMillis() - START >= DEFAULT_TIMEOUT;
    }

    private static CommandResult wait(Process process) throws InterruptedException, IOException {
        BufferedReader errorStreamReader = null;
        BufferedReader inputStreamReader = null;
        try {
            errorStreamReader = new BufferedReader(new InputStreamReader(process.getErrorStream()));
            inputStreamReader = new BufferedReader(new InputStreamReader(process.getInputStream()));
            // timeout control
            START = System.currentTimeMillis();
            boolean isFinished = false;
            for (;;) {
                if (isOverTime()) {
                    CommandResult result = new CommandResult();
                    result.setExitValue(CommandResult.EXIT_VALUE_TIMEOUT);
                    result.setOutput("Command process timeout");
                    return result;
                }
                if (isFinished) {
                    CommandResult result = new CommandResult();
                    result.setExitValue(process.waitFor());
                    // collect the error stream
                    if (errorStreamReader.ready()) {
                        StringBuilder buffer = new StringBuilder();
                        String line;
                        while ((line = errorStreamReader.readLine()) != null) {
                            buffer.append(line);
                        }
                        result.setError(buffer.toString());
                    }
                    // collect the output stream
                    if (inputStreamReader.ready()) {
                        StringBuilder buffer = new StringBuilder();
                        String line;
                        while ((line = inputStreamReader.readLine()) != null) {
                            buffer.append(line);
                        }
                        result.setOutput(buffer.toString());
                    }
                    return result;
                }
                try {
                    isFinished = true;
                    process.exitValue(); // throws if the process is still running
                } catch (IllegalThreadStateException e) {
                    // process hasn't finished yet
                    isFinished = false;
                    Thread.sleep(DEFAULT_INTERVAL);
                }
            }
        } finally {
            if (errorStreamReader != null) {
                try { errorStreamReader.close(); } catch (IOException e) { }
            }
            if (inputStreamReader != null) {
                try { inputStreamReader.close(); } catch (IOException e) { e.printStackTrace(); }
            }
        }
    }
}
```
```java
package cool.pengych.java.reflect;

/**
 * Execution result of a command.
 * @author pengyucheng
 */
public class CommandResult {
    public static final int EXIT_VALUE_TIMEOUT = -1;

    private String output;
    void setOutput(String output) { this.output = output; }
    String getOutput() { return output; }

    private int exitValue;
    void setExitValue(int value) { exitValue = value; }
    int getExitValue() { return exitValue; }

    private String error;
    public String getError() { return error; }
    public void setError(String error) { this.error = error; }
}
```
```java
package cool.pengych.java.reflect;

import java.io.IOException;

/**
 * Invoke a shell script from Java code.
 * @author pengyucheng
 */
public class InvokeCommand {
    public static void main(String[] args) {
        String shpath = "/home/pengyucheng/resource/spark_page.sh";
        String param = " 1";
        String command1 = "/bin/sh " + shpath + param;
        try {
            int timeout = Integer.parseInt("100"); // milliseconds
            CommandHelper.DEFAULT_TIMEOUT = timeout;
            CommandResult result = CommandHelper.exec(command1);
            if (result != null) {
                System.out.println("Output:" + result.getOutput());
                System.out.println("Error:" + result.getError());
            }
        } catch (IOException ex) {
            System.out.println("IOException:" + ex.getLocalizedMessage());
        } catch (InterruptedException ex) {
            System.out.println("InterruptedException:" + ex.getLocalizedMessage());
        }
    }
}
```
Execution result
```
Output:null
Error:/home/pengyucheng/resource/spark_page.sh: 1: /home/pengyucheng/resource/spark_page.sh: /home/hadoop/spark-1.6.0-bin-hadoop2.6/bin: Permission denied
```
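One caveat with the approach above: Runtime.getRuntime().exec(String) splits the command string on whitespace, which breaks if the script path or an argument contains spaces. A more robust alternative is java.lang.ProcessBuilder, which takes the argument list explicitly and can merge stderr into stdout. The following is a minimal sketch (the class and method names are mine, not from the original post), using the Process.waitFor(long, TimeUnit) overload that requires Java 8+; on the Java 1.6 API referenced above you would keep the polling loop from CommandHelper instead:

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.concurrent.TimeUnit;

public class ProcessBuilderDemo {
    // Runs a command, returns its combined stdout/stderr,
    // or null if the process does not finish within the timeout.
    static String run(long timeoutMillis, String... command) throws Exception {
        ProcessBuilder pb = new ProcessBuilder(command);
        pb.redirectErrorStream(true); // merge stderr into stdout
        Process process = pb.start();
        StringBuilder output = new StringBuilder();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(process.getInputStream()))) {
            // Note: this read blocks until the process closes its output,
            // which for typical short-lived scripts happens at exit.
            String line;
            while ((line = reader.readLine()) != null) {
                output.append(line).append('\n');
            }
        }
        if (!process.waitFor(timeoutMillis, TimeUnit.MILLISECONDS)) {
            process.destroy();
            return null; // timed out
        }
        return output.toString();
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical invocation; substitute your own script path and argument,
        // e.g. run(10_000, "/bin/sh", "/home/pengyucheng/resource/spark_page.sh", "1")
        String out = run(10_000, "/bin/sh", "-c", "echo hello");
        System.out.println(out);
    }
}
```

Because each argument is a separate array element, no quoting or escaping of the script path is needed.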
二. Running a Spark job via SparkLauncher
Sometimes we need to start a Spark application from another Scala or Java application, and SparkLauncher lets us do that. Below is an example in which we build a Spark application and launch it from a separate Scala application.

Let's look at the Spark application code first.
```scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object SparkApp extends App {
  val conf = new SparkConf().setMaster("local[*]").setAppName("spark-app")
  val sc = new SparkContext(conf)
  val rdd = sc.parallelize(Array(2, 3, 2, 1))
  rdd.saveAsTextFile("result")
  sc.stop()
}
```
This is our simple Spark application. Build a jar of it with sbt assembly, then write a Scala application that starts this Spark application as follows:
```scala
import org.apache.spark.launcher.SparkLauncher

object Launcher extends App {
  val spark = new SparkLauncher()
    .setSparkHome("/home/knoldus/spark-1.4.0-bin-hadoop2.6")
    .setAppResource("/home/knoldus/spark_launcher-assembly-1.0.jar")
    .setMainClass("SparkApp")
    .setMaster("local[*]")
    .launch()
  spark.waitFor()
}
```
In the code above we create a SparkLauncher and set its values:
- setSparkHome("/home/knoldus/spark-1.4.0-bin-hadoop2.6") sets the Spark home directory, which is used internally to call spark-submit.
- setAppResource("/home/knoldus/spark_launcher-assembly-1.0.jar") specifies the jar of our Spark application.
- setMainClass("SparkApp") sets the entry point of the Spark program, i.e. the driver program.
- setMaster("local[*]") sets the master address; here we run on the local machine.
- launch() simply starts our Spark application.

These are the minimal requirements; you can also set many other options, such as passing arguments, adding jars, and setting configuration properties.
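As a sketch of those extra settings (written in Java, since SparkLauncher is a Java API; the paths, the application argument, and the extra jar below are placeholders I invented, and the snippet will not run without a Spark installation):

```java
import org.apache.spark.launcher.SparkLauncher;

public class LauncherWithOptions {
    public static void main(String[] args) throws Exception {
        Process spark = new SparkLauncher()
            .setSparkHome("/home/knoldus/spark-1.4.0-bin-hadoop2.6")
            .setAppResource("/home/knoldus/spark_launcher-assembly-1.0.jar")
            .setMainClass("SparkApp")
            .setMaster("local[*]")
            .addAppArgs("2016-06-30")                 // arguments passed to SparkApp's main
            .addJar("/path/to/extra-dependency.jar")  // extra jar on the classpath
            .setConf(SparkLauncher.DRIVER_MEMORY, "100m") // same effect as --driver-memory
            .launch();
        int exitCode = spark.waitFor();
        System.out.println("spark-submit exited with " + exitCode);
    }
}
```

launch() returns the spark-submit child process, so the exit code checked here is that of spark-submit itself, not of any individual task inside the job.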
For the source code you can check out the following git repo:
- Spark_laucher is our Spark application
- launcher_app is the Scala application that starts the Spark application

Adjust the paths for your machine, build a jar of Spark_laucher, and run launcher_app; you will then find the result RDD saved as a text file in the result directory, since the Spark application simply saves it as text.
Summary
一. The execution result shows a permission problem: I ran the Java program as the pengyucheng user, who has no access to the directories involved. How to switch users at execution time is a question I have not solved yet, so I am setting it aside for now.

```
Output:null
Error:/home/pengyucheng/resource/spark_page.sh: 1: /home/pengyucheng/resource/spark_page.sh: /home/hadoop/spark-1.6.0-bin-hadoop2.6/bin: Permission denied
```
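One way to catch part of this problem before launching the process is to check the script file from Java first. A minimal sketch (the class and method names are mine; the path is the article's example):

```java
import java.io.File;

public class ScriptPreflight {
    // Returns a human-readable diagnosis, or null if the script looks runnable.
    static String diagnose(String path) {
        File script = new File(path);
        if (!script.exists()) {
            return path + ": no such file";
        }
        if (!script.canRead()) {
            return path + ": not readable by current user";
        }
        if (!script.canExecute()) {
            return path + ": not executable (try chmod +x)";
        }
        return null; // looks fine from here
    }

    public static void main(String[] args) {
        System.out.println(diagnose("/home/pengyucheng/resource/spark_page.sh"));
    }
}
```

Note that this only checks the script itself; the "Permission denied" above is raised inside the script, on the spark-1.6.0-bin-hadoop2.6/bin directory, so every directory the script touches must also be accessible to the user running the JVM.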
二. How do you tell whether the Spark job ran normally or hit an exception?

If exitValue is 0, the process responsible for running the Spark job has finished; but finishing does not guarantee the job ran correctly, since an exception may have occurred during execution. Therefore, when exitValue == 0 and nothing was written to the error stream (error == null), we can conclude that the Spark job completed normally.
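The rule just described can be written as a small helper (a sketch; exitValue and error correspond to the fields of CommandResult above). Be aware that spark-submit often writes progress logs to stderr even on success, so "empty error stream" is a heuristic that can produce false negatives:

```java
public class SparkJobCheck {
    // A job is considered successful only when the launcher process
    // exited normally AND nothing was captured from its error stream.
    static boolean succeeded(int exitValue, String error) {
        return exitValue == 0 && (error == null || error.isEmpty());
    }

    public static void main(String[] args) {
        System.out.println(succeeded(0, null));                  // clean exit, no stderr
        System.out.println(succeeded(0, "ERROR stage failed"));  // stderr output present
        System.out.println(succeeded(1, null));                  // non-zero exit code
    }
}
```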
```java
/**
 * Returns the exit value for the subprocess.
 *
 * @return the exit value of the subprocess represented by this
 *         {@code Process} object. By convention, the value
 *         {@code 0} indicates normal termination.
 * @throws IllegalThreadStateException if the subprocess represented
 *         by this {@code Process} object has not yet terminated
 */
public abstract int exitValue();
```
The correctness of this conclusion still needs further verification.