Developing Hadoop Programs in Eclipse and Submitting Them to a Cluster
2013-09-20 12:22
This article assumes a working Hadoop cluster is already set up. On that basis, it walks through developing a Hadoop program in Eclipse, submitting it to the cluster, and watching the job's progress in the Hadoop web UI. The Eclipse build used here is Spring Tool Suite (STS), the IDE published by the Spring team, which can be downloaded from the Spring website.
I. Configuring Eclipse to connect to HDFS
1. Copy the Hadoop Eclipse plugin hadoop-eclipse-plugin-1.0.3.jar into Eclipse's plugin directory, sts-3.2.0.RELEASE\plugins\.
2. Restart Eclipse; a new perspective, Map/Reduce, appears.
3. Open the Map/Reduce Locations view.
4. Create a new Hadoop location.
5. Configure the connection properties for the new location (see the note below).
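The original screenshots for these steps are not available. Judging from the cluster configuration files shown later in this article, the location properties would presumably be: Map/Reduce Master host namenode, port 9001 (matching mapred.job.tracker), and DFS Master host namenode, port 9000 (matching fs.default.name).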
6. Once configured, the new location appears in the Project Explorer view, from which you can browse the HDFS files of the remote Hadoop cluster. Configuration is now complete.
II. Submitting jobs from Eclipse to the Hadoop cluster
1. Edit the hosts file of the local Windows machine:
(1) Open Notepad as an administrator.
(2) In Notepad, open C:\Windows\System32\drivers\etc\hosts.
(3) Add the Hadoop cluster's host entries at the top of the file, in the following form:
192.168.100.1 namenode
192.168.100.2 datanode1
192.168.100.3 datanode2
192.168.100.4 datanode3
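Before going any further, it is worth confirming that these names resolve from the development machine, since job submission will otherwise fail with an UnknownHostException. A minimal sketch (HostCheck is a hypothetical throwaway class, and the hostnames are the example entries above):

package com;

import java.net.InetAddress;

// Quick check that the cluster hostnames added to the hosts file
// actually resolve on this machine; adjust the names to your own entries.
public class HostCheck {
    public static void main(String[] args) throws Exception {
        String[] hosts = { "namenode", "datanode1", "datanode2", "datanode3" };
        for (String host : hosts) {
            System.out.println(host + " -> " + InetAddress.getByName(host).getHostAddress());
        }
    }
}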
2. Create an ordinary Java project in Eclipse and add three directories: src, conf, and lib (the original figure is missing; the layout is sketched below).
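The screenshot of the project layout is unavailable; judging from the steps that follow, it showed a structure along these lines:

project/
├── src/   (Java sources, including the com package created in step 5)
├── conf/  (core-site.xml, hdfs-site.xml, mapred-site.xml)
└── lib/   (jars copied from hadoop-1.0.3\lib)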
3. Copy all jars from the Hadoop distribution's lib directory (hadoop-1.0.3\lib) into the project's lib directory and add them to the project classpath.
4. Copy the three files core-site.xml, hdfs-site.xml, and mapred-site.xml from the Hadoop distribution's conf directory (hadoop-1.0.3\conf) into the project's conf directory.
(1) core-site.xml:
<?xml version="1.0"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <!-- Put site-specific property overrides in this file. --> <configuration> <property> <name>fs.default.name</name> <value>hdfs://namenode:9000/</value> </property> <property> <name>hadoop.tmp.dir</name> <value>/usr/hadoop/tmp/</value> </property> </configuration>
(2) hdfs-site.xml:
<?xml version="1.0"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <!-- Put site-specific property overrides in this file. --> <configuration> <property> <name>dfs.replication</name> <value>3</value> </property> <property> <name>dfs.permissions</name> <value>false</value> </property> <property> <name>dfs.support.append</name> <value>true</value> </property> </configuration>
(3) mapred-site.xml:
<?xml version="1.0"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <!-- Put site-specific property overrides in this file. --> <configuration> <property> <name>mapred.job.tracker</name> <value>namenode:9001</value> </property> <property> <name>mapred.map.tasks</name> <value>16</value> </property> <property> <name>mapred.reduce.tasks</name> <value>5</value> <!-- 有多少台datanode就设置为多少 --> </property> <property> <name>mapred.tasktracker.map.tasks.maximum</name> <value>20</value> </property> <property> <name>mapred.tasktracker.reduce.tasks.maximum</name> <value>20</value> </property> </configuration>
5. Create a com package under src and place the following two classes in it.
(1) EJob.java — a helper that packs the project's compiled classes into a temporary jar at run time, so a job can be submitted to the remote cluster straight from Eclipse without hand-building a jar first:
package com;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Array;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;

public class EJob {

    private static ArrayList<URL> classPath = new ArrayList<URL>();

    /** Unpack a jar file into a directory. */
    public static void unJar(File jarFile, File toDir) throws IOException {
        JarFile jar = new JarFile(jarFile);
        try {
            Enumeration<JarEntry> entries = jar.entries();
            while (entries.hasMoreElements()) {
                JarEntry entry = entries.nextElement();
                if (!entry.isDirectory()) {
                    InputStream in = jar.getInputStream(entry);
                    try {
                        File file = new File(toDir, entry.getName());
                        if (!file.getParentFile().mkdirs()) {
                            if (!file.getParentFile().isDirectory()) {
                                throw new IOException("Mkdirs failed to create "
                                        + file.getParentFile().toString());
                            }
                        }
                        OutputStream out = new FileOutputStream(file);
                        try {
                            byte[] buffer = new byte[8192];
                            int i;
                            while ((i = in.read(buffer)) != -1) {
                                out.write(buffer, 0, i);
                            }
                        } finally {
                            out.close();
                        }
                    } finally {
                        in.close();
                    }
                }
            }
        } finally {
            jar.close();
        }
    }

    /**
     * Run a Hadoop job jar. If the main class is not in the jar's manifest,
     * then it must be provided on the command line.
     */
    public static void runJar(String[] args) throws Throwable {
        String usage = "jarFile [mainClass] args...";
        if (args.length < 1) {
            System.err.println(usage);
            System.exit(-1);
        }
        int firstArg = 0;
        String fileName = args[firstArg++];
        File file = new File(fileName);
        String mainClassName = null;
        JarFile jarFile;
        try {
            jarFile = new JarFile(fileName);
        } catch (IOException io) {
            throw new IOException("Error opening job jar: " + fileName).initCause(io);
        }
        Manifest manifest = jarFile.getManifest();
        if (manifest != null) {
            mainClassName = manifest.getMainAttributes().getValue("Main-Class");
        }
        jarFile.close();
        if (mainClassName == null) {
            if (args.length < 2) {
                System.err.println(usage);
                System.exit(-1);
            }
            mainClassName = args[firstArg++];
        }
        mainClassName = mainClassName.replaceAll("/", ".");

        File tmpDir = new File(System.getProperty("java.io.tmpdir"));
        tmpDir.mkdirs();
        if (!tmpDir.isDirectory()) {
            System.err.println("Mkdirs failed to create " + tmpDir);
            System.exit(-1);
        }
        final File workDir = File.createTempFile("hadoop-unjar", "", tmpDir);
        workDir.delete();
        workDir.mkdirs();
        if (!workDir.isDirectory()) {
            System.err.println("Mkdirs failed to create " + workDir);
            System.exit(-1);
        }
        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                try {
                    fullyDelete(workDir);
                } catch (IOException e) {
                }
            }
        });

        unJar(file, workDir);

        classPath.add(new File(workDir + "/").toURL());
        classPath.add(file.toURL());
        classPath.add(new File(workDir, "classes/").toURL());
        File[] libs = new File(workDir, "lib").listFiles();
        if (libs != null) {
            for (int i = 0; i < libs.length; i++) {
                classPath.add(libs[i].toURL());
            }
        }
        ClassLoader loader = new URLClassLoader(classPath.toArray(new URL[0]));
        Thread.currentThread().setContextClassLoader(loader);
        Class<?> mainClass = Class.forName(mainClassName, true, loader);
        Method main = mainClass.getMethod("main",
                new Class[] { Array.newInstance(String.class, 0).getClass() });
        String[] newArgs = Arrays.asList(args).subList(firstArg, args.length)
                .toArray(new String[0]);
        try {
            main.invoke(null, new Object[] { newArgs });
        } catch (InvocationTargetException e) {
            throw e.getTargetException();
        }
    }

    /**
     * Delete a directory and all its contents. If we return false, the
     * directory may be partially-deleted.
     */
    public static boolean fullyDelete(File dir) throws IOException {
        File contents[] = dir.listFiles();
        if (contents != null) {
            for (int i = 0; i < contents.length; i++) {
                if (contents[i].isFile()) {
                    if (!contents[i].delete()) {
                        return false;
                    }
                } else {
                    // Try deleting the directory; this might be a symlink.
                    boolean b = contents[i].delete();
                    if (b) {
                        // This was indeed a symlink or an empty directory.
                        continue;
                    }
                    // Not an empty directory or symlink; recurse into it.
                    if (!fullyDelete(contents[i])) {
                        return false;
                    }
                }
            }
        }
        return dir.delete();
    }

    /** Add a directory or file to classpath. */
    public static void addClasspath(String component) {
        if ((component != null) && (component.length() > 0)) {
            try {
                File f = new File(component);
                if (f.exists()) {
                    URL key = f.getCanonicalFile().toURL();
                    if (!classPath.contains(key)) {
                        classPath.add(key);
                    }
                }
            } catch (IOException e) {
            }
        }
    }

    /** Add default classpath listed in bin/hadoop bash. */
    public static void addDefaultClasspath(String hadoopHome) {
        // Classpath initially contains conf dir.
        addClasspath(hadoopHome + "/conf");
        // For developers, add Hadoop classes to classpath.
        addClasspath(hadoopHome + "/build/classes");
        if (new File(hadoopHome + "/build/webapps").exists()) {
            addClasspath(hadoopHome + "/build");
        }
        addClasspath(hadoopHome + "/build/test/classes");
        addClasspath(hadoopHome + "/build/tools");
        // For releases, add core hadoop jar & webapps to classpath.
        if (new File(hadoopHome + "/webapps").exists()) {
            addClasspath(hadoopHome);
        }
        addJarsInDir(hadoopHome);
        addJarsInDir(hadoopHome + "/build");
        // Add libs to classpath.
        addJarsInDir(hadoopHome + "/lib");
        addJarsInDir(hadoopHome + "/lib/jsp-2.1");
        addJarsInDir(hadoopHome + "/build/ivy/lib/Hadoop/common");
    }

    /** Add all jars in directory to classpath; sub-directories are excluded. */
    public static void addJarsInDir(String dirPath) {
        File dir = new File(dirPath);
        if (!dir.exists()) {
            return;
        }
        File[] files = dir.listFiles();
        if (files == null) {
            return;
        }
        for (int i = 0; i < files.length; i++) {
            if (files[i].isDirectory()) {
                continue;
            } else {
                addClasspath(files[i].getAbsolutePath());
            }
        }
    }

    /** Create a temp jar file in "java.io.tmpdir" from the classes under root. */
    public static File createTempJar(String root) throws IOException {
        if (!new File(root).exists()) {
            return null;
        }
        Manifest manifest = new Manifest();
        manifest.getMainAttributes().putValue("Manifest-Version", "1.0");
        final File jarFile = File.createTempFile("EJob-", ".jar",
                new File(System.getProperty("java.io.tmpdir")));
        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                jarFile.delete();
            }
        });
        JarOutputStream out = new JarOutputStream(new FileOutputStream(jarFile), manifest);
        createTempJarInner(out, new File(root), "");
        out.flush();
        out.close();
        return jarFile;
    }

    private static void createTempJarInner(JarOutputStream out, File f, String base)
            throws IOException {
        if (f.isDirectory()) {
            File[] fl = f.listFiles();
            if (base.length() > 0) {
                base = base + "/";
            }
            for (int i = 0; i < fl.length; i++) {
                createTempJarInner(out, fl[i], base + fl[i].getName());
            }
        } else {
            out.putNextEntry(new JarEntry(base));
            FileInputStream in = new FileInputStream(f);
            byte[] buffer = new byte[1024];
            int n = in.read(buffer);
            while (n != -1) {
                out.write(buffer, 0, n);
                n = in.read(buffer);
            }
            in.close();
        }
    }

    /** Return a classloader based on user-specified classpath and parent classloader. */
    public static ClassLoader getClassLoader() {
        ClassLoader parent = Thread.currentThread().getContextClassLoader();
        if (parent == null) {
            parent = EJob.class.getClassLoader();
        }
        if (parent == null) {
            parent = ClassLoader.getSystemClassLoader();
        }
        return new URLClassLoader(classPath.toArray(new URL[0]), parent);
    }
}
(2) WordCount.java — the classic word-count job, which uses EJob to build the temporary jar and attach it to the job configuration before submission:
package com;

import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class WordCount {

    public static class Map extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                output.collect(word, one);
            }
        }
    }

    public static class Reduce extends MapReduceBase implements
            Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        // Pack the compiled classes into a temporary jar so the job can be
        // submitted to the remote cluster straight from Eclipse.
        File jarFile = EJob.createTempJar(WordCount.class.getClassLoader()
                .getResource("").getPath());
        ClassLoader classLoader = EJob.getClassLoader();
        Thread.currentThread().setContextClassLoader(classLoader);

        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("WordCount");
        conf.setJar(jarFile.toString());

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(Map.class);
        conf.setCombinerClass(Reduce.class);
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path("hdfs://192.168.100.1:9000/trade"));
        FileOutputFormat.setOutputPath(conf, new Path("hdfs://192.168.100.1:9000/temp"));

        JobClient.runJob(conf);
    }
}
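One practical note: FileOutputFormat refuses to run a job whose output directory already exists, so a second run of WordCount fails until hdfs://192.168.100.1:9000/temp is removed. A minimal sketch that deletes it first (CleanOutput is a hypothetical helper, not part of the original article; the path matches the one used in main above):

package com;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Remove the previous run's output directory before resubmitting the job.
public class CleanOutput {
    public static void main(String[] args) throws Exception {
        Path out = new Path("hdfs://192.168.100.1:9000/temp");
        FileSystem fs = FileSystem.get(out.toUri(), new Configuration());
        if (fs.exists(out)) {
            fs.delete(out, true); // recursive delete
        }
    }
}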
6. Run WordCount.java as a Java application; the submitted job appears on the JobTracker page at http://namenode:50030.
Note: if an error is reported in FileUtil.java (with Hadoop 1.x on Windows this is typically the "Failed to set permissions of path" check in FileUtil.checkReturnValue), search the web for the error message to find a patched FileUtil class.
Default Hadoop HTTP ports and addresses
Hadoop module | Daemon            | Default port | Configuration parameter
HDFS          | NameNode          | 50070        | dfs.http.address
HDFS          | SecondaryNameNode | 50090        | dfs.secondary.http.address
HDFS          | DataNode          | 50075        | dfs.datanode.http.address
MapReduce     | JobTracker        | 50030        | mapred.job.tracker.http.address
MapReduce     | TaskTracker       | 50060        | mapred.task.tracker.http.address