您的位置:首页 > 其它

Hive 执行流程图 源码入口

2011-07-29 15:00 603 查看


图片地址 :http://hi.csdn.net/attachment/201107/29/0_1311922740tXqK.gif

CliDriver可以说是hive的入口,对应上图中的UI部分。大家看它的结构就可以明白了,main()函数!对!你猜的没错就是从main()开始。

下图是类结构,总共有五个关键的函数。



这个类可以说是用户和hive交互的平台,你可以把它认为是hive客户端。总共有4个key函数:

下图是这个CliDriver类在整个Hive执行过程中的作用的地位。



如图,hive执行流程_按正常步骤走:

1.—CliDriver.classz中main()开始,初始化Hive环境变量,获取客户端提供的string或者file。

2 —将其代码送入processLine(cmd),这步主要是读入cmd:‘;’之前的所有字符串都读入(不做任何检查),之后的会忽略。读完后,传入processCmd()处理

3 —调用processCmd(cmd),分情况处理

//– 读入cmd,并分情况处理,总共分为以下五种情况,根据命令的开头字符串来确定用什么方法处理。

// 1.set.. 设置operator参数,hive环境参数

// 2.quit or exit — 退出Hive环境

// 3.! 开头

// 4.dfs 开头 交给FsShell处理

// 5.hivesql 正常hivesql执行语句,我们最关心的是这里。语句交给了、、Hive真正的核心引擎 Driver。返回ret = Driver.run(cmd);

4.—不同情况不同处理方法。我们关心的第五种情况:正常的HiveSQL如何处理?其实是进入driver.class里面run(),

//读入hivesql ,词法分析,语法分析,直到执行结束

//1.ParseDriver 返回 词法树 CommonTree

//2.BaseSemanticAnalyzer sem.analyze(tree, ctx);//语义解释,生成执行计划

5.—。。。etc

今天的主题是hive的入口,我们只聊前三步。

现在我们细化主要函数,看hive实际是怎么处理的。(如果你只想了解hive工作流程或原理,不想拘泥于细节,可以跳过下面的细节,如果你想修改源码,做优化,可以继续往下看)

下面是hive入口 涉及的一些关键类和关键函数。

——————————-类CliDriver —

由于这个类,可以说贯彻Hive的整个流程架构,所以我聊的比较细。

————————————————main()
public static void main(String[] args) throws IOException {

OptionsProcessor oproc = new OptionsProcessor();
if(! oproc.process_stage1(args)) {
System.exit(1);
}

// NOTE: It is critical to do this here so that log4j is reinitialized before
// any of the other core hive classes are loaded
SessionState.initHiveLog4j();
//建立客户端sesssion
CliSessionState ss = new CliSessionState (new HiveConf(SessionState.class));
ss.in = System.in;//标准输入
try {
ss.out = new PrintStream(System.out, true, "UTF-8");//??
ss.err = new PrintStream(System.err, true, "UTF-8");//??
} catch (UnsupportedEncodingException e) {
System.exit(3);
}

SessionState.start(ss);// -- start session  通过复制当前CliSessionState新建立SessionState

if(! oproc.process_stage2(ss)) {
System.exit(2);
}

// set all properties specified via command line
HiveConf conf = ss.getConf();//设置所有配置属性
for(Map.Entry item: ss.cmdProperties.entrySet()) {
conf.set((String) item.getKey(), (String) item.getValue());
}

sp = new SetProcessor();//?? what is proccessor
qp = new Driver();  // 正常hiveSql的处理引擎
dfs = new FsShell(ss.getConf());//dfs接口,用于 dfs命令处理

if(ss.execString != null) {// 输入的是命令行,按命令执行
System.exit(processLine(ss.execString));
}

try {
if(ss.fileName != null) {// 输入的是文件名,读文件执行
System.exit(processReader(new BufferedReader(new FileReader(ss.fileName))));
}
} catch (FileNotFoundException e) {//没有找到该文件
System.err.println("Could not open input file for reading. ("+e.getMessage()+")");
System.exit(3);
}

Character mask = null;
String trigger = null;

ConsoleReader reader = new ConsoleReader();//hive Console控制台命令读取器
reader.setBellEnabled(false);
//reader.setDebug(new PrintWriter(new FileWriter("writer.debug", true)));

List completors = new LinkedList();
completors.add(new SimpleCompletor(new String[] { "set", "from",
"create", "load",
"describe", "quit", "exit" }));
reader.addCompletor(new ArgumentCompletor(completors));

String line;
PrintWriter out = new PrintWriter(System.out);
final String HISTORYFILE = ".hivehistory";//建立历史文件,记录所有的命令行
String historyFile = System.getProperty("user.home") + File.separator  + HISTORYFILE;
reader.setHistory(new History(new File(historyFile)));
int ret = 0;
Log LOG = LogFactory.getLog("CliDriver");//建立日志
LogHelper console = new LogHelper(LOG);
String prefix = "";
String curPrompt = prompt;// -- is "hive"
//不断地获取hiveSql,读取;之前的所有内容,传个processLine处理
while ((line = reader.readLine(curPrompt+"> ")) != null) {//--循环开始读命令
long start = System.currentTimeMillis();// 命令计时开始
if(line.trim().endsWith(";")) {//如果碰见';'表示结束,该
line = prefix + " " + line;
ret = processLine(line);// ----重点: 把命令行传入给解析,执行
prefix = "";// 把前缀重置为空
curPrompt = prompt;// "hive"
} else {
prefix = prefix + line;
curPrompt = prompt2;// 应该是 "  "
continue;
}
long end = System.currentTimeMillis();
if (end > start) {//统计开始到结束的时间,如:命令开始执行所用的时间,
//console reader需要可以添加很多屏幕操作

double timeTaken = (double)(end-start)/1000.0;
console.printInfo("Time taken: " + timeTaken + " seconds", null);     //对应在Hive Session上。
}
}

System.exit(ret);


—————————— processLine(Cmd)

// 读入cmd:‘;’之前的所有字符串都读入(不做任何检查),之后的都会忽略。读完后,传入processCmd处理.
public static int processLine(String line) {
int ret = 0;
for(String oneCmd: line.split(";")) {
oneCmd = oneCmd.trim();
if(oneCmd.equals(""))
continue;

ret = processCmd(oneCmd);//--执行命令
if(ret != 0) {
// ignore anything after the first failed command
return ret;
}
}
return 0;
}


—————————— processCmd()

//– 读入cmd,并分情况处理,总共分为以下五种情况,根据命令的开头字符串来确定用什么方法处理。

// 1.set.. 设置operator参数,hive环境参数

// 2.quit or exit — 退出Hive环境

// 3.! 开头

// 4.dfs 开头 交给FsShell处理

// 5.hivesql 正常hivesql执行语句,我们最关心的是这里。语句交给了、、Hive真正的核心引
public static int processCmd(String cmd) {
String[] tokens = cmd.split("\\s+");
String cmd_1 = cmd.substring(tokens[0].length());
int ret = 0;

if(tokens[0].equals("set")) { //1
ret = sp.run(cmd_1);// 调用这句就可以更改hadoop配置
} else if (cmd.equals("quit") || cmd.equals("exit")) {//2
//退出Hive环境
System.exit(0);
} else if (cmd.startsWith("!")) {//3 :! 开头的命令
SessionState ss = SessionState.get();
String shell_cmd = cmd.substring(1);
if (shell_cmd.endsWith(";")) {
shell_cmd = shell_cmd.substring(0, shell_cmd.length()-1);
}//--除掉';'??
//shell_cmd = "/bin/bash -c \'" + shell_cmd + "\'";

try {
Process executor = Runtime.getRuntime().exec(shell_cmd);//!!??这句得好好 跟踪
StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(), null, ss.out);
StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(), null, ss.err);

outPrinter.start();
errPrinter.start();

int exitVal = executor.waitFor();//?? look executor
if (exitVal != 0) {
ss.err.write((new String("Command failed with exit code = " + exitVal)).getBytes());
}
}
catch (Exception e) {
e.printStackTrace();
}
} else if (cmd.startsWith("dfs")) {//4  "dfs"  开头解析方法  -- cmd.
// Hadoop DFS 操作接口 处理!
SessionState ss = SessionState.get();
if(dfs == null)
dfs = new FsShell(ss.getConf());
String hadoopCmd = cmd.replaceFirst("dfs\\s+", "");
hadoopCmd = hadoopCmd.trim();
if (hadoopCmd.endsWith(";")) {
hadoopCmd = hadoopCmd.substring(0, hadoopCmd.length()-1);
}
String[] args = hadoopCmd.split("\\s+");//
try {
PrintStream oldOut = System.out;
System.setOut(ss.out);
int val = dfs.run(args);//??
System.setOut(oldOut);
if (val != 0) {
ss.err.write((new String("Command failed with exit code = " + val)).getBytes());
}
} catch (Exception e) {
ss.err.println("Exception raised from DFSShell.run " + e.getLocalizedMessage());
}
} else {//5 hivesql 正常运行,重点在这里
ret = qp.run(cmd);//正常执行hive命令,如:select  .. ;  addfile ..;
Vector res = new Vector();
while (qp.getResults(res)) {//获得执行结果result
for (String r:res) {
SessionState ss  = SessionState.get();
PrintStream out = ss.out;
out.println(r);
}
res.clear();
}

int cret = qp.close();
if (ret == 0) {
ret = cret;
}
}
return ret;
}


———————–类CliSessionState

CliSessionState 除了做了个初始化,基本都是上继承了SessionState的实现方法,可能是作者为了低耦合。

public class CliSessionState extends SessionState

所以我们直接看SessionState。

SessionState可以说你是你自己当前的Hive环境,建立、初始化你Hive的session,一方面它来自conf的初始化设置,一方面来自你手动set。可以通过命令行形式,也可以通过file,这都取决于你的选择。

它会连接Hive元数据数据库,得到现有的元数据信息。

此类主要关键功能:

1 主要是生成session,并赋予一个唯一id(设置规则:用户名_年月日分秒 即 user_id + “_”+ yyyymmddHHmm)的session,

生成session有两种方式:1.直接新建session ,2. 通过拷贝方式复制一个session。

第一种我们最常用,但第二种很有用,我们可以确保我们两个环境是完全一致的,而且避免琐碎设置工作。

2 给每个cmd给予一个queryID,可以通过queryID得到命令行,也可以反过来得到id

3 每个sessionState 都会有一个logHelper,用于日志记录

其中 clude : hiveconf , 连接db元数据数据库

下图是SessionState的类结构:

关键函数:

String makeSessionId() ——— //生成sessionID : user_id+”_” + yyyyMMDDhhmm

setCmd(String cmdString)—— //给命令cmd设置query Id

protected final static HiveConf.ConfVars [] metaVars —-//获取元数据系统,路径等

public String getCmd() —— // –通过queryID获取命令代码cmd

——————————–类 CommandProcessor

CommandProcessor类的很简单,是个接口类。

public interface CommandProcessor {

public int run(String command);

}

你会奇怪为什么先聊这个接口类,因为有三个类实现了这个接口(如下图),其中setProcessor, MetadataProcessor是我们Hive入口的关键类。

———–类MetadataProcessor

Hive部分元信息提取与处理

// run()中 得到表的元信息,如果出错返回1,如:找不到表名等情况
public int run(String command) {
SessionState ss = SessionState.get();
String table_name = command.trim();
if(table_name.equals("")) {
return 0;
}

try {
MetaStoreClient msc = new MetaStoreClient(ss.getConf());

if(!msc.tableExists(table_name)) {//表不存在
ss.err.println("table does not exist: " + table_name);
return 1;
} else {
List fields = msc.get_fields(table_name);//获得表信息

for(FieldSchema f: fields) {
ss.out.println(f.getName() + ": " + f.getType());
}
}
} catch (MetaException err) {
ss.err.println("Got meta exception: " + err.getMessage());
return 1;
} catch (Exception err) {
ss.err.println("Got exception: " + err.getMessage());
return 1;
}
return 0;
}


——————- 类SetProcessor

主要是设置Hive环境,总共分为两大类:

1. set session为安全模式,

如:set silent = true;

2.set 该session的conf配置,即调用hadoop时的配置参数,以及改变执行时的具体实现。 如:set hive.exec.compress.output=’false’;

// 我们可以调用这里run(String command)更改hadoop配置 ,hive执行参数等,
public int run(String command) {
SessionState ss = SessionState.get();//建一个SessionState对象
String nwcmd = command.trim();//去空格
if(nwcmd.equals("")) {
dumpOptions(ss.getConf().getChangedProperties());
return 0;
}
if(nwcmd.equals("-v")) {
dumpOptions(ss.getConf().getAllProperties());
return 0;
}
String[] part = new String [2];
int eqIndex = nwcmd.indexOf('=');
if(eqIndex == -1) {
// no equality sign - print the property out
dumpOption(ss.getConf().getAllProperties(), nwcmd);
return (0);
} else if (eqIndex == nwcmd.length()-1) {
part[0] = nwcmd.substring(0, nwcmd.length()-1);
part[1] = "";
} else {
part[0] = nwcmd.substring(0, eqIndex);// 中间=号隔开的 set cmd
part[1] = nwcmd.substring(eqIndex+1);
}

try {//
if (part[0].equals("silent")) {// 设置silent模式
boolean val = getBoolean(part[1]);//
ss.setIsSilent(val);//
} else {
ss.getConf().set(part[0], part[1]);// 设置key - value(如:.gmmt = ture)修改该session的conf里配置
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: