您的位置:首页 > 运维架构 > Shell

Hadoop基于Shell命令与底层Unix操作系统的交互

2013-03-20 16:50 337 查看
在阅读Hadoop源代码过程中,在org.apache.hadoop.security.UnixUserGroupInformation类中,需要获取到Unix系统的用户名和所属组的信息,就需要通过执行Shell命令得到相应的结果,这里,通过阅读Hadoop项目org.apache.hadoop.util包、org.apache.hadoop.fs.shell包、org.apache.hadoop.fs包中文件来了解,Hadoop基于Shell命令与底层Unix操作系统的交互,以及在MapReduce模型中通过命令行的方式提交管理计算任务的一些详细情况。

首先看一下,与Unix系统命令行执行有关的一些类的继承层次关系:

[java] view
plaincopy

◦org.apache.hadoop.util.Shell

◦org.apache.hadoop.util.Shell.ShellCommandExecutor

◦org.apache.hadoop.fs.DF

◦org.apache.hadoop.fs.DU

◦org.apache.hadoop.fs.FileUtil.CygPathCommand

Shell命令最顶层的抽象类是org.apache.hadoop.util.Shell,它定义了如何在当前文件系统环境中,与底层的Unix系统通过命令行进行必要的交互。

从org.apache.hadoop.util.Shell类定义的属性来看,可以分为两种类型的属性,一种是static final的字符串命令,另一种是与实现命令的执行相关的属性。第一种属性(我把两个static final的获取命令的方式也列出,放到这里的属性的后面)如下所示:

[java] view
plaincopy

/** Unix命令whoami :执行命令得到当前用户名 */

public final static String USER_NAME_COMMAND = "whoami";

/** Unix命令chmod :执行命令设置用户操作权限 */

public static final String SET_PERMISSION_COMMAND = "chmod";

/** Unix命令chown :执行命令设置属主 */

public static final String SET_OWNER_COMMAND = "chown";

/** Unix命令chgrp :执行命令设置组 */

public static final String SET_GROUP_COMMAND = "chgrp";

/** Unix命令bash -c groups :执行命令得到当前用户所属的组列表 */

public static String[] getGROUPS_COMMAND() {

return new String[]{"bash", "-c", "groups"};

}

/** Unix命令ls -ld :执行命令设置组,不支持Windows系统,但可以支持Cygwin */

public static String[] getGET_PERMISSION_COMMAND() {

return new String[] {(WINDOWS ? "ls" : "/bin/ls"), "-ld"};

}

看到这些Unix的命令,应该非常熟悉。

第二种属性,都属于与如何实现定义的上述命令行的执行有关的,如下所示:

[java] view
plaincopy

private long interval; // 刷新间隔

private long lastTime; // 最后执行命令的时间

private Map<String, String> environment; // 命令行执行所需要的操作系统环境

private File dir;

private Process process; // 执行命令行的子进程

private int exitCode; // 执行命令行完成后,退出状态码

dir属性表示当前执行命令所在的工作目录,environment属性表示当前命令执行的环境,它们在Shell类中都提供了一个受保护的设置操作,可以在Shell抽象类的子类中根据需要设置不同工作目录和环境,其中,dir默认为系统“user.dir”变量值,environment使用系统默认的环境。

通过interval与lastTime属性来检查,是否有必要重新执行一次,如果是就执行,否则重置退出状态码exitCode为0,正常退出,可以在Shell类的run方法中看到:

[java] view
plaincopy

protected void run() throws IOException {

if (lastTime + interval > System.currentTimeMillis())

return; // 不需要重新执行命令行,返回

exitCode = 0;

runCommand(); // 调用:需要重新执行命令行

}

通过runCommand方法就可以执行指定的Shell命令,它是Shell类的核心。在分析runCommand方法之前,先了解一下与Shell命令执行相关的环境信息。

当在Windows系统下,打开一个cmd窗口的时候,执行set命令,就能看到当前系统的环境变量的信息,如下所示:

[xhtml] view
plaincopy

ALLUSERSPROFILE=C:/Documents and Settings/All Users

APPDATA=C:/Documents and Settings/Administrator/Application Data

CLASSPATH=.;E:/Program Files/Java/jdk1.6.0_14/lib/tools.jar;E:/Program Files/Java/jdk1.6.0_14/lib/dt.jar;E:/Program Files/Java/jdk1.6.0_14/jre/lib/rt.jar;E:/Program Files/Java/jdk1.6.0_14/jre/lib/charsets.jar

CLIENTNAME=Console

CommonProgramFiles=C:/Program Files/Common Files

COMPUTERNAME=SYJ

ComSpec=C:/WINDOWS/system32/cmd.exe

DEVMGR_SHOW_NONPRESENT_DEVICES=1

FP_NO_HOST_CHECK=NO

HERITRIX_HOME=E:/MyEclipse/workspace/heritrix

HOME=C:/Documents and Settings/Administrator

HOMEDRIVE=C:

HOMEPATH=/Documents and Settings/Administrator

JAVA_HOME=E:/Program Files/Java/jdk1.6.0_14

JSERV=D:/oracle/ora92/Apache/Jserv/conf

LOGONSERVER=//SYJ

NUMBER_OF_PROCESSORS=2

NUTSUFFIX=1

NUT_SUFFIXED_SEARCHING=1

OS=Windows_NT

Path=D:/oracle/ora92/bin;C:/Program Files/Oracle/jre/1.3.1/bin;C:/Program Files/Oracle/jre/1.1.8/bin;E:/Program Files/CollabNet Subversion Client;E:/Program Files/Java/jdk1.6.0_14/bin;C:/WINDOWS/system32;C:/WINDOWS;C:/WINDOWS/System32/Wbem;C:/Program Files/Microsoft SQL Server/80/Tools/Binn/;C:/Program Files/Microsoft SQL Server/90/Tools/binn/;C:/Program Files/MyEclipse 7.0M1/jre/bin;E:/Program Files/TortoiseSVN/bin;E:/PROGRA~1/F-Secure/SSHTRI~1;D:/Program Files/MySQL/MySQL Server 5.1/bin;F:/Program Files/Rational/common;C:/Program Files/StormII/Codec;C:/Program Files/StormII;C:/Program Files/SSH Communications Security/SSH Secure Shell;C:/Program Files/IDM Computer Solutions/UltraEdit/

PATHEXT=.COM;.EXE;.BAT;.CMD;.VBS;.VBE;.JS;.JSE;.WSF;.WSH

PROCESSOR_ARCHITECTURE=x86

PROCESSOR_IDENTIFIER=x86 Family 6 Model 23 Stepping 10, GenuineIntel

PROCESSOR_LEVEL=6

PROCESSOR_REVISION=170a

ProgramFiles=C:/Program Files

PROMPT=$P$G

RATL_RTHOME=F:/Program Files/Rational/Rational Test

SESSIONNAME=Console

SystemDrive=C:

SystemRoot=C:/WINDOWS

TEMP=C:/DOCUME~1/ADMINI~1/LOCALS~1/Temp

TMP=C:/DOCUME~1/ADMINI~1/LOCALS~1/Temp

TMPDIR=C:/DOCUME~1/ADMINI~1/LOCALS~1/Temp

USERDOMAIN=SYJ

USERNAME=Administrator

USERPROFILE=C:/Documents and Settings/Administrator

VBOX_INSTALL_PATH=E:/Program Files/Sun/xVM VirtualBox/

VS90COMNTOOLS=d:/Microsoft Visual Studio 9.0/Common7/Tools/

windir=C:/WINDOWS

WV_GATEWAY_CFG=D:/oracle/ora92/Apache/modplsql/cfg/wdbsvr.app

这些环境变量的信息都是以键值对的形式出现的,当在操作系统上运行相关的应用程序的时候,其实就是在上述环境变量所设置的一个上下文中运行,环境是应用程序运行不可缺少的条件。你在Unix系统中执行env命令,同样也能得到与上面类似的键值对的环境变量信息。

所以,org.apache.hadoop.util.Shell作为Shell命令的抽象,一定要获取到当前所在操作系统(Unix系统) 的环境变量,在这样一个上下文中,才能如同从Unix系统中输入执行Shell命令进行执行一样。

在Java中,实现了一个java.lang.ProcessBuilder类,该类能够创建一个操作系统的进程,通过为该进程设置运行环境变量,从而启动进行执行。默认情况下ProcessBuilder类已经实现了设置当前操作系统环境的功能,可以通过environment()方法查看它的实例所具有的环境信息,这等价于使用System.getenv()获取到的环境变量,都是以键值对的形式存在于ProcessBuilder类实例的上下文中。

下面,分析Shell类实现执行命令的过程,实现方法为runCommand,如下所示:

[java] view
plaincopy

private void runCommand() throws IOException {

ProcessBuilder builder = new ProcessBuilder(getExecString()); // 方法getExecString()在该类中式抽象的,需要在实现类中实现,获取到一个命令名称及其参数,从而基于此 构造一个ProcessBuilder进程实例

boolean completed = false; // 标识执行命令完成情况

if (environment != null) {

builder.environment().putAll(this.environment); // 如果有必要,设置命令行执行环境

}

if (dir != null) {

builder.directory(this.dir); // 如果必要,设置命令行执行所在工作目录

}

process = builder.start(); // 启动ProcessBuilder builder进程,返回一个用来管理命令行执行情况的子进程process

final BufferedReader errReader = new BufferedReader(new InputStreamReader(process.getErrorStream())); // 当builder进程启动后,检查提交的命令行是否合法,如果不合法或者执行出错,将出错信息写入到缓冲流中,可以从其中解析读取出来

BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream())); // 执行命令返回执行结果,通过process管理子线程来获取执行流中的执行结果信息

final StringBuffer errMsg = new StringBuffer(); // 存放执行命令出错信息的String缓冲区

// 定义解析线程,解析命令行执行出错信息所在的流,解析完成后释放流缓冲区

Thread errThread = new Thread() {

@Override

public void run() {

try {

String line = errReader.readLine();

while((line != null) && !isInterrupted()) {

errMsg.append(line);

errMsg.append(System.getProperty("line.separator"));

line = errReader.readLine();

}

} catch(IOException ioe) {

LOG.warn("Error reading the error stream", ioe);

}

}

};

try {

errThread.start(); // 启动线程,处理出错信息

} catch (IllegalStateException ise) { }

try {

parseExecResult(inReader); // 调用:解析执行命令返回的结果信息

String line = inReader.readLine();

while(line != null) {

line = inReader.readLine();

}

exitCode = process.waitFor(); // 等待进程process处理完毕,置exitCode状态码

try {

errThread.join(); // 等待出错信息处理线程执行完成

} catch (InterruptedException ie) {

LOG.warn("Interrupted while reading the error stream", ie);

}

completed = true; // 置命令行执行完成状态

if (exitCode != 0) {

throw new ExitCodeException(exitCode, errMsg.toString());

}

} catch (InterruptedException ie) {

throw new IOException(ie.toString());

} finally {

try {

inReader.close(); // 最后,需要关闭流对象

} catch (IOException ioe) {

LOG.warn("Error while closing the input stream", ioe);

}

if (!completed) {

errThread.interrupt(); // 可能处理错误信息的线程发生异常,未能置completed=true,最后终止要该线程

}

try {

errReader.close(); // 关闭流对象

} catch (IOException ioe) {

LOG.warn("Error while closing the error stream", ioe);

}

process.destroy(); // 终止子进程process

lastTime = System.currentTimeMillis(); // 设置当前时间为该命令行执行的最后时间,为了判断一个命令是否需要被重新执行

}

}

上面已经做了详细的注释,基本上阐明了一个命令行的执行过程。

在类中,还提供了一个static方法execCommand,为执行命令提供入口:

[java] view
plaincopy

public static String execCommand(String ... cmd) throws IOException {

return execCommand(null, cmd);

}

执行该方法,调用了另一个重载的execCommand方法,返回命令执行结果的信息。

注意,在Shell抽象类中并没有实现该怎样获取一个命令名称及其参数的方法,需要在实现类中给出,因此,在Shell类内部定义了一个静态内部类ShellCommandExecutor,该类实现了获取命令名称及其参数的方法。在上面方法execCommand中,调用了一个重载的execCommand方法,该方法中通过实例化一个ShellCommandExecutor类,来提供获取命令名称及其参数,进而构造一个ProcessBuilder实例,创建一个操作系统线程来执行命令行。

? extends Shell

下面看实现Shell抽象类的一些子类的实现。

ShellCommandExecutor类

ShellComandExecutor类的实现如下所示:

[java] view
plaincopy

public static class ShellCommandExecutor extends Shell {

private String[] command; // 命令名称及其参数

private StringBuffer output; // 存放执行命令行返回结果的String缓冲区

public ShellCommandExecutor(String[] execString) {

command = execString.clone();

}

public ShellCommandExecutor(String[] execString, File dir) {

this(execString);

this.setWorkingDirectory(dir);

}

public ShellCommandExecutor(String[] execString, File dir, Map<String, String> env) {

this(execString, dir);

this.setEnvironment(env);

}

/** 继承自Shell基类,执行命令行 */

public void execute() throws IOException {

this.run();

}

protected String[] getExecString() {

return command; // 输入的就是命令名称+参数的格式,直接得到

}

/**

* 解析命令行执行后的输出结果流,存放到String缓冲区中

*/

protected void parseExecResult(BufferedReader lines) throws IOException {

output = new StringBuffer();

char[] buf = new char[512];

int nRead;

while ( (nRead = lines.read(buf, 0, buf.length)) > 0 ) {

output.append(buf, 0, nRead);

}

}

DF类

org.apache.hadoop.fs.DF类实现了Unix系统中Shell命令df,用来获取磁盘使用情况的统计数据。该Shell实现类中定义域df命令操作相关的内容,可以从属性来看:

[java] view
plaincopy

public static final long DF_INTERVAL_DEFAULT = 3 * 1000; // 设置df命令刷新间隔为3s

private String dirPath; // 执行df命令所在工作目录

private String filesystem; // 磁盘设备名

private long capacity; // 磁盘总容量

private long used; // 磁盘使用量

private long available; // 磁盘可用量

private int percentUsed; // 磁盘使用率

private String mount; // 磁盘挂载位置

只需要实现Shell类定义的getExecString与parseExecResult方法即可。比较简单,getExecString方法实现如下:

[java] view
plaincopy

protected String[] getExecString() {

return new String[] {"bash","-c","exec 'df' '-k' '" + dirPath + "' 2>/dev/null"};

}

该方法返回的字符串数组,用来构造一个ProcessBuilder进程实例。

parseExecResult方法实现如下所示:

[java] view
plaincopy

protected void parseExecResult(BufferedReader lines) throws IOException {

lines.readLine(); // 去掉流中的首行

String line = lines.readLine();

if (line == null) {

throw new IOException( "Expecting a line not the end of stream" );

}

StringTokenizer tokens = new StringTokenizer(line, " /t/n/r/f%");

this.filesystem = tokens.nextToken();

if (!tokens.hasMoreTokens()) {

line = lines.readLine();

if (line == null) {

throw new IOException( "Expecting a line not the end of stream" );

}

tokens = new StringTokenizer(line, " /t/n/r/f%");

}

/**

* 下面处理并设置执行df -k命令的结果信息

*/

this.capacity = Long.parseLong(tokens.nextToken()) * 1024;

this.used = Long.parseLong(tokens.nextToken()) * 1024;

this.available = Long.parseLong(tokens.nextToken()) * 1024;

this.percentUsed = Integer.parseInt(tokens.nextToken());

this.mount = tokens.nextToken();

}

DU类

DU类实现了Unix的du命令,显示目录或者文件大小的信息,具体实现可以参考org.apache.hadoop.fs.DU类,这里跳过。

CygPathCommand类

CygPathCommand类是org.apache.hadoop.fs.FileUtil类的一个内部静态类,实现了Windows系统上模拟Unix系统的Cygwin系统的cygpath命令,这里跳过。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: