Rewriting the WordCount class in Eclipse to handle Chinese text: using the Hadoop Eclipse plugin to remotely debug and run WordCount on Hadoop
2015-02-16 10:26
In the previous post we built the Hadoop Eclipse plugin. Now we use Eclipse to rewrite WordCount so that it can count Chinese search keywords.
1. Environment preparation.
1.1 Have an Eclipse installation with the Hadoop plugin ready.
1.2 Start the Linux virtual machine.
1.3 Once the VM is up, add a new property to the hdfs-site.xml configuration file: set dfs.permissions to false, as follows:
<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
  <property>
    <name>dfs.permissions</name>
    <value>false</value>
  </property>
</configuration>
Save and exit.
1.4 Run start-all.sh to start Hadoop.
1.5 Disable the iptables firewall rules (a command sketch follows below).
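On a CentOS/RHEL-style system (which the root@caixen-3 prompt used below suggests), a minimal sketch of disabling iptables might look like this; adjust to your own distribution:
[root@caixen-3 ~]# service iptables stop      # stop the firewall now
[root@caixen-3 ~]# chkconfig iptables off     # keep it off after reboot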
2. Add an account with root privileges on Linux. The account name must be the same as the Windows user you log in with: because we are debugging remotely, the job is submitted under your Windows user name, and that account needs enough permissions on Linux to run Hadoop. I simply give it root privileges. Here is an example of adding the account (note: my Windows user name is Caixen, so Caixen is used below):
[root@caixen-3 ~]# adduser Caixen    # add a user named Caixen
[root@caixen-3 ~]# passwd Caixen     # set its password
After the account is created, grant Caixen root privileges.
Edit the /etc/passwd file, find the following line, and change the user ID to 0, as shown:
Caixen:x:0:500::/home/Caixen:/bin/bash
Save and exit.
Test the Caixen account: run logout to log out of the current user, then log back in with the Caixen user name and password.
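As an alternative to editing /etc/passwd by hand, the same UID change can usually be made with usermod; this is only a sketch of that alternative, not what the steps above used:
[root@caixen-3 ~]# usermod -o -u 0 Caixen    # -o permits the duplicate UID 0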
3. In Eclipse, create a new MapReduce project named KeyWordCount.
3.1 Find WordCount.java in the extracted hadoop-1.2.1 distribution and copy it into the KeyWordCount project.
The path to WordCount.java is: hadoop-1.2.1\src\examples\org\apache\hadoop\examples
3.2 Create a new class, EJob.java, which packages the project's compiled classes into a temporary jar so the job can be submitted to the remote cluster. The full class is as follows:
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;

public class EJob {
    // Extra classpath entries collected via addClasspath(), used by getClassLoader().
    private static List<URL> classPath = new ArrayList<URL>();

    // Packs the compiled classes under the given root directory (e.g. the Eclipse
    // "bin" folder) into a temporary jar that can be shipped with the job.
    public static File createTempJar(String root) throws IOException {
        if (!new File(root).exists()) {
            return null;
        }
        Manifest manifest = new Manifest();
        manifest.getMainAttributes().putValue("Manifest-Version", "1.0");
        final File jarFile = File.createTempFile("EJob-", ".jar", new File(
                System.getProperty("java.io.tmpdir")));
        // Delete the temporary jar when the JVM exits.
        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                jarFile.delete();
            }
        });
        JarOutputStream out = new JarOutputStream(
                new FileOutputStream(jarFile), manifest);
        createTempJarInner(out, new File(root), "");
        out.flush();
        out.close();
        return jarFile;
    }

    // Recursively adds files under f to the jar, preserving the relative path in "base".
    private static void createTempJarInner(JarOutputStream out, File f,
            String base) throws IOException {
        if (f.isDirectory()) {
            File[] fl = f.listFiles();
            if (base.length() > 0) {
                base = base + "/";
            }
            for (int i = 0; i < fl.length; i++) {
                createTempJarInner(out, fl[i], base + fl[i].getName());
            }
        } else {
            out.putNextEntry(new JarEntry(base));
            FileInputStream in = new FileInputStream(f);
            byte[] buffer = new byte[1024];
            int n = in.read(buffer);
            while (n != -1) {
                out.write(buffer, 0, n);
                n = in.read(buffer);
            }
            in.close();
        }
    }

    // Returns a class loader that also sees the entries added via addClasspath().
    public static ClassLoader getClassLoader() {
        ClassLoader parent = Thread.currentThread().getContextClassLoader();
        if (parent == null) {
            parent = EJob.class.getClassLoader();
        }
        if (parent == null) {
            parent = ClassLoader.getSystemClassLoader();
        }
        return new URLClassLoader(classPath.toArray(new URL[0]), parent);
    }

    // Adds a local file or directory (e.g. the Hadoop conf directory) to the classpath list.
    public static void addClasspath(String component) {
        if ((component != null) && (component.length() > 0)) {
            try {
                File f = new File(component);
                if (f.exists()) {
                    URL key = f.getCanonicalFile().toURL();
                    if (!classPath.contains(key)) {
                        classPath.add(key);
                    }
                }
            } catch (IOException e) {
                // ignore entries that cannot be resolved
            }
        }
    }
}
3.3 Create a new class, GBKFileOutputFormat.java, which writes the job output in GBK so that Chinese results are not garbled. The full class is as follows:
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

// A copy of TextOutputFormat (the default output format) with the output encoding changed from UTF-8 to GBK.
public class GBKFileOutputFormat<K, V> extends FileOutputFormat<K, V> {

    protected static class LineRecordWriter<K, V>
            extends RecordWriter<K, V> {
        private static final String utf8 = "gbk"; // hard-coded: "UTF-8" replaced with "GBK"
        private static final byte[] newline;      // line terminator
        static {
            try {
                newline = "\n".getBytes(utf8);
            } catch (UnsupportedEncodingException uee) {
                throw new IllegalArgumentException("can't find " + utf8 + " encoding");
            }
        }

        protected DataOutputStream out;
        private final byte[] keyValueSeparator; // separator between key and value, tab by default

        // Initializes the output stream and the key/value separator.
        public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
            this.out = out;
            try {
                this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
            } catch (UnsupportedEncodingException uee) {
                throw new IllegalArgumentException("can't find " + utf8 + " encoding");
            }
        }

        // Uses the default tab separator.
        public LineRecordWriter(DataOutputStream out) {
            this(out, "\t");
        }

        /**
         * Write the object to the byte stream, handling Text as a special case.
         * @param o the object to print
         * @throws IOException if the write throws, we pass it on
         */
        private void writeObject(Object o) throws IOException {
            if (o instanceof Text) {
                Text to = (Text) o;
                out.write(to.getBytes(), 0, to.getLength());
            } else {
                out.write(o.toString().getBytes(utf8));
            }
        }

        // Writes one "key <separator> value \n" line; synchronized so writes do not interleave.
        public synchronized void write(K key, V value)
                throws IOException {
            // Skip keys/values that are null or NullWritable.
            boolean nullKey = key == null || key instanceof NullWritable;
            boolean nullValue = value == null || value instanceof NullWritable;
            if (nullKey && nullValue) {
                return;
            }
            if (!nullKey) {
                writeObject(key);
            }
            if (!(nullKey || nullValue)) {
                out.write(keyValueSeparator);
            }
            if (!nullValue) {
                writeObject(value);
            }
            out.write(newline);
        }

        public synchronized void close(TaskAttemptContext context) throws IOException {
            out.close();
        }
    }

    // Creates the record writer, honouring the configured separator and output compression.
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job
            ) throws IOException, InterruptedException {
        Configuration conf = job.getConfiguration();
        boolean isCompressed = getCompressOutput(job);
        String keyValueSeparator = conf.get("mapred.textoutputformat.separator",
                "\t");
        CompressionCodec codec = null;
        String extension = "";
        if (isCompressed) {
            Class<? extends CompressionCodec> codecClass =
                    getOutputCompressorClass(job, GzipCodec.class);
            codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
            extension = codec.getDefaultExtension();
        }
        // Default work file path and name, as provided by FileOutputFormat.
        Path file = getDefaultWorkFile(job, extension);
        FileSystem fs = file.getFileSystem(conf);
        if (!isCompressed) {
            FSDataOutputStream fileOut = fs.create(file, false);
            return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
        } else {
            FSDataOutputStream fileOut = fs.create(file, false);
            return new LineRecordWriter<K, V>(new DataOutputStream(
                    codec.createOutputStream(fileOut)),
                    keyValueSeparator);
        }
    }
}
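Because this format writes GBK bytes, reading the result file from a UTF-8 terminal will show mojibake. Once the job has produced /output/part-r-00000 (section 5.2 below), a quick end-to-end encoding check, assuming iconv is available, is:
[root@caixen-3 ~]# hadoop dfs -cat /output/part-r-00000 | iconv -f GBK -t UTF-8 | head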
3.4 Modify WordCount.java; the full class is as follows:
import java.io.File;
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context
                ) throws IOException, InterruptedException {
            // Alternative: Chinese word segmentation with the IK Analyzer (IKSegmenter/Lexeme);
            // it needs the IKAnalyzer jar on the classpath, so it is left commented out here.
            /*byte[] bt = value.getBytes();
            InputStream ip = new ByteArrayInputStream(bt);
            Reader read = new InputStreamReader(ip);
            IKSegmenter iks = new IKSegmenter(read, true);
            Lexeme t;
            while ((t = iks.next()) != null)
            {
                word.set(t.getLexemeText());
                context.write(word, one);
            }*/
            // Default behaviour: split on whitespace and emit (token, 1).
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values,
                Context context
                ) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("mapred.job.tracker", "hdfs://192.168.31.103:9001");
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "word count");
        // Package the project's bin directory into a temporary jar so the remote
        // TaskTrackers can load our classes, then attach that jar to the job.
        File jarFile = EJob.createTempJar("bin");
        EJob.addClasspath("/usr/hadoop/hadoop-1.2.1/conf");
        ClassLoader classLoader = EJob.getClassLoader();
        Thread.currentThread().setContextClassLoader(classLoader);
        ((JobConf) job.getConfiguration()).setJar(jarFile.toString());
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        // Use the GBK output format so Chinese results are readable.
        job.setOutputFormatClass(GBKFileOutputFormat.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
4. Prepare the file to be processed and upload it to the Hadoop cluster. Option 1: upload it to a Linux directory with an FTP tool and then copy it into HDFS from the command line (see the sketch below). Option 2: upload it to HDFS directly with the Eclipse Hadoop plugin. Option 1 is recommended; with option 2 the Chinese text may look fine in the local file yet turn into mojibake once it is in HDFS.
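A minimal sketch of option 1 from the Linux shell (the /input path matches the one used in the next step; adjust it to your own layout):
[root@caixen-3 ~]# hadoop dfs -mkdir /input
[root@caixen-3 ~]# hadoop dfs -put keywords.txt /input/
[root@caixen-3 ~]# hadoop dfs -cat /input/keywords.txt | head    # quick check that the Chinese text survived the upload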
My file is named keywords.txt; a small excerpt of its contents follows. (I uploaded the file to HDFS under /input/.)
搜索词
标签纸(TEPRA18白色强粘)
标签打印解决方案
重庆 标签纸(TEPRA18白色强粘)
车载电子标签(OBU)
重庆专业印不干胶
RFID标签打印机
名片打印机
微信二维码
完美二维码
二维码扫描
重庆金属二维码
ups集成商
苏 logo
苹果5s防伪码查找
重庆 标签纸(TEPRA18白色强粘)
商标生产批发
重庆线材加工厂
用二维码做营销
pvc不干胶
rfid标签
印制不干胶
重庆不干胶干胶广告加工厂
重庆不干胶打印纸
GoDEX
二维码扫描天天快递
重庆 二维码
二维码
快照二维码
泳衣品牌
怎样做二维码
发票查询真伪
重庆标签
.........
........
.......
5. Run the WordCount program.
5.1 Right-click WordCount.java: Run As ---> Run Configurations ---> Main class: deenken.WordCount
---> Arguments ---> Program arguments: hdfs://192.168.31.103:9000/input/keywords.txt hdfs://192.168.31.103:9000/output
---> VM arguments: -Xms512m -Xmx1024m -XX:MaxPermSize=256m
---> Apply ---> Run.
(An equivalent run from the Linux shell is sketched below for reference.)
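If you would rather export the project as a jar and run it from the Linux shell instead of from Eclipse, a roughly equivalent invocation might look like this (the jar name KeyWordCount.jar is hypothetical; deenken.WordCount is the main class from the run configuration above):
[Caixen@caixen-3 ~]$ hadoop jar KeyWordCount.jar deenken.WordCount hdfs://192.168.31.103:9000/input/keywords.txt hdfs://192.168.31.103:9000/output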
After the program starts, the console shows the job's progress and counters:
15/02/16 15:26:39 INFO input.FileInputFormat: Total input paths to process : 1
15/02/16 15:26:39 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/02/16 15:26:39 WARN snappy.LoadSnappy: Snappy native library not loaded
15/02/16 15:26:40 INFO mapred.JobClient: Running job: job_201502161506_0002
15/02/16 15:26:41 INFO mapred.JobClient: map 0% reduce 0%
15/02/16 15:26:52 INFO mapred.JobClient: map 100% reduce 0%
15/02/16 15:27:01 INFO mapred.JobClient: map 100% reduce 33%
15/02/16 15:27:03 INFO mapred.JobClient: map 100% reduce 100%
15/02/16 15:27:06 INFO mapred.JobClient: Job complete: job_201502161506_0002
15/02/16 15:27:06 INFO mapred.JobClient: Counters: 29
15/02/16 15:27:06 INFO mapred.JobClient: Job Counters
15/02/16 15:27:06 INFO mapred.JobClient: Launched reduce tasks=1
15/02/16 15:27:06 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=12433
15/02/16 15:27:06 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
15/02/16 15:27:06 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
15/02/16 15:27:06 INFO mapred.JobClient: Launched map tasks=1
15/02/16 15:27:06 INFO mapred.JobClient: Data-local map tasks=1
15/02/16 15:27:06 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=11394
15/02/16 15:27:06 INFO mapred.JobClient: File Output Format Counters
15/02/16 15:27:06 INFO mapred.JobClient: Bytes Written=64234
15/02/16 15:27:06 INFO mapred.JobClient: FileSystemCounters
15/02/16 15:27:06 INFO mapred.JobClient: FILE_BYTES_READ=74969
15/02/16 15:27:06 INFO mapred.JobClient: HDFS_BYTES_READ=91407
15/02/16 15:27:06 INFO mapred.JobClient: FILE_BYTES_WRITTEN=265814
15/02/16 15:27:06 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=64234
15/02/16 15:27:06 INFO mapred.JobClient: File Input Format Counters
15/02/16 15:27:06 INFO mapred.JobClient: Bytes Read=91297
15/02/16 15:27:06 INFO mapred.JobClient: Map-Reduce Framework
15/02/16 15:27:06 INFO mapred.JobClient: Map output materialized bytes=74969
15/02/16 15:27:06 INFO mapred.JobClient: Map input records=4363
15/02/16 15:27:06 INFO mapred.JobClient: Reduce shuffle bytes=74969
15/02/16 15:27:06 INFO mapred.JobClient: Spilled Records=5388
15/02/16 15:27:06 INFO mapred.JobClient: Map output bytes=110539
15/02/16 15:27:06 INFO mapred.JobClient: Total committed heap usage (bytes)=176492544
15/02/16 15:27:06 INFO mapred.JobClient: CPU time spent (ms)=2130
15/02/16 15:27:06 INFO mapred.JobClient: Combine input records=4818
15/02/16 15:27:06 INFO mapred.JobClient: SPLIT_RAW_BYTES=110
15/02/16 15:27:06 INFO mapred.JobClient: Reduce input records=2694
15/02/16 15:27:06 INFO mapred.JobClient: Reduce input groups=2694
15/02/16 15:27:06 INFO mapred.JobClient: Combine output records=2694
15/02/16 15:27:06 INFO mapred.JobClient: Physical memory (bytes) snapshot=184217600
15/02/16 15:27:06 INFO mapred.JobClient: Reduce output records=2694
15/02/16 15:27:06 INFO mapred.JobClient: Virtual memory (bytes) snapshot=696303616
15/02/16 15:27:06 INFO mapred.JobClient: Map output records=4818
The job completed successfully.
5.2 After the job reports success, check the results in the output directory we configured.
[root@caixen-3 ~]# hadoop dfs -ls /output/
Found 3 items
-rw-r--r-- 3 Caixen supergroup 0 2015-02-16 15:19 /output/_SUCCESS
drwxr-xr-x - Caixen supergroup 0 2015-02-16 15:18 /output/_logs
-rw-r--r-- 3 Caixen supergroup 76863 2015-02-16 15:19 /output/part-r-00000
View the results:
[root@caixen-3 hadoop_data]# hadoop dfs -cat /output/part-r-00000    (partial results shown)
.......
20*50条码纸 1
200E和244比较 1
200点 1
2030兄弟标签机回收重庆 1
214 1
214报价 1
2410 1
244 5
244条码打印机 1
2微码 2
2维码怎么扫 1
2维码扫描 1
300dpi 3
300点打印头 1
3010电缆标示 1
308标致 2
315查询防伪码 2
3190G扫描枪 3
342ePro 1
344m 1
360手机 1
3M不干胶 1
3d打印公司 1
3d打印公司招聘 2
400 1
600点打印机 1
61372613 2
64位usb转com 2
6925794300072条码价格搜索 1
6926161802014 1
70*25条码不干胶标签打印纸哪里有卖 1
80*80热敏条码纸 1
.............
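To pull the result file back to the local filesystem for further processing, a simple sketch (the local file name keywords_count.txt is just an example) is:
[root@caixen-3 hadoop_data]# hadoop dfs -get /output/part-r-00000 ./keywords_count.txt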
Done, and happily so!
Note: some of the code above was adapted from references found on the web.