您的位置:首页 > 编程语言 > Java开发

Eclipse重写Wordcount类实现处理中文字符,利用hadoop Eclipse插件远程调试hadoop运行WordCount程序

2015-02-16 10:26 866 查看
在上面一篇博客中,制作了hadoop 的 Eclipse 插件。现在我们使用Eclipse来重写wordcount统计中文关键词keywords.

1、环境准备。

1、1  准备好有hadoop 插件的Eclipse 

1、2  启动好linux虚拟机

1、3   启动好虚拟机后,需要对  hdfs-site.xml  配置文件  添加一个新的属性:dfs.permissions  为 false。 如下:

<configuration>

<property>

        <name>dfs.replication</name>

        <value>1</value>

</property>

<property>

        <name>dfs.permissions</name>

        <value>false</value>

</property>

</configuration>

保存并退出

1、4  start-all.sh 启动 hadoop。 

1、5  禁用ipatbles 防火墙规则。

2、为linux 添加一个root权限的账户。  该账户名称为你登陆的windwos 的用户名。因为是远程调试,所以需要一个与windows一样的账户名称来运行hadoop。且该账户有足够的权限来运行hadoop。 我直接使用root权限。如下是添加账户示例:(PS: 我 windows 的用户名是 Caixen ,所以以下以 Caixen 为例)

[root@caixen-3 ~]# adduser Caixen       //添加一个名为Caixen的用户

[root@caixen-3 ~]#passwd Caixen       //修改密码

账户添加成功后。给Caixen附 root 权限。

修改 /etc/passwd 文件,找到如下行,把用户ID修改为 0 ,如下所示:

Caixen:x:0:500::/home/Caixen:/bin/bash

保存并退出。

测试Caixen 账户: logout   登出当前用户, 然后使用 Caixen 用户名和密码登陆即可。

3、在Eclipse 中新建一个  MapReduce Driver 项目,取名为:KeyWordCount

3、1   在hadoop-1.2.1 原解压文件中找到 WordCount.java 文件,并拷贝到 KeyWordCount 项目中。 

  WordCount.java 文件路径是:hadoop-1.2.1\src\examples\org\apache\hadoop\examples

3、2   新建一个Class: Ejob .java   整个类代码如下:

public class EJob {

    // To declare global field

    private static List<URL> classPath = new ArrayList<URL>();

    // To declare method

    public static File createTempJar(String root) throws IOException {

        if (!new File(root).exists()) {

            return null;

        }

        Manifest manifest = new Manifest();

        manifest.getMainAttributes().putValue("Manifest-Version", "1.0");

        final File jarFile = File.createTempFile("EJob-", ".jar", new File(

                System.getProperty("java.io.tmpdir")));

        Runtime.getRuntime().addShutdownHook(new Thread() {

            public void run() {

                jarFile.delete();

            }

        });

        JarOutputStream out = new JarOutputStream(

                new FileOutputStream(jarFile), manifest);

        createTempJarInner(out, new File(root), "");

        out.flush();

        out.close();

        return jarFile;

    }

    private static void createTempJarInner(JarOutputStream out, File f,

            String base) throws IOException {

        if (f.isDirectory()) {

            File[] fl = f.listFiles();

            if (base.length() > 0) {

                base = base + "/";

            }

            for (int i = 0; i < fl.length; i++) {

                createTempJarInner(out, fl[i], base + fl[i].getName());

            }

        } else {

            out.putNextEntry(new JarEntry(base));

            FileInputStream in = new FileInputStream(f);

            byte[] buffer = new byte[1024];

            int n = in.read(buffer);

            while (n != -1) {

                out.write(buffer, 0, n);

                n = in.read(buffer);

            }

            in.close();

        }

    }

    public static ClassLoader getClassLoader() {

        ClassLoader parent = Thread.currentThread().getContextClassLoader();

        if (parent == null) {

            parent = EJob.class.getClassLoader();

        }

        if (parent == null) {

            parent = ClassLoader.getSystemClassLoader();

        }

        return new URLClassLoader(classPath.toArray(new URL[0]), parent);

    }

    public static void addClasspath(String component) {

        if ((component != null) && (component.length() > 0)) {

            try {

                File f = new File(component);

                if (f.exists()) {

                    URL key = f.getCanonicalFile().toURL();

                    if (!classPath.contains(key)) {

                        classPath.add(key);

                    }

                }

            } catch (IOException e) {

            }

        }

    }

}

3、3   新建一个Class: GBKFileOutFormat.java     整个类代码如下:

public class GBKFileOutputFormat<K, V> extends FileOutputFormat<K, V> {//TextInputFormat是默认的输出文件格式  

  protected static class LineRecordWriter<K, V>//默认  

    extends RecordWriter<K, V> {  

    private static final String utf8 = "gbk";  //硬编码,将“UTF-8”改为“GBK”  

    private static final byte[] newline;//行结束符?  

    static {  

      try {  

        newline = "\n".getBytes(utf8);  

      } catch (UnsupportedEncodingException uee) {  

        throw new IllegalArgumentException("can't find " + utf8 + " encoding");  

      }  

    }  

  

    protected DataOutputStream out;  

    private final byte[] keyValueSeparator;//key和value的分隔符,默认的好像是Tab  

  

    public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {//构造函数,初始化输出流及分隔符   

      this.out = out;  

      try {  

        this.keyValueSeparator = keyValueSeparator.getBytes(utf8);  

      } catch (UnsupportedEncodingException uee) {  

        throw new IllegalArgumentException("can't find " + utf8 + " encoding");  

      }  

    }  

  

    public LineRecordWriter(DataOutputStream out) {//默认的分隔符  

      this(out, "\t");  

    }  

  

    /** 

     * Write the object to the byte stream, handling Text as a special输出流是byte格式的 

     * case. 

     * @param o the object to print是要输出的对象 

     * @throws IOException if the write throws, we pass it on 

     */  

    private void writeObject(Object o) throws IOException {//应该是一行一行的写 key keyValueSeparator value \n  

      if (o instanceof Text) {//如果o是Text的实例  

        Text to = (Text) o;  

        out.write(to.getBytes(), 0, to.getLength());//写出  

      } else {  

        out.write(o.toString().getBytes(utf8));  

      }  

    }  

  

    public synchronized void write(K key, V value)//给写线程加锁,写是互斥行为  

      throws IOException {  

//下面是为了判断key和value是否为空值  

      boolean nullKey = key == null || key instanceof NullWritable;//这语句太牛了  

      boolean nullValue = value == null || value instanceof NullWritable;  

      if (nullKey && nullValue) {//  

        return;  

      }  

      if (!nullKey) {  

        writeObject(key);  

      }  

      if (!(nullKey || nullValue)) {  

        out.write(keyValueSeparator);  

      }  

      if (!nullValue) {  

        writeObject(value);  

      }  

      out.write(newline);  

    }  

  

    public synchronized   

    void close(TaskAttemptContext context) throws IOException {  

      out.close();  

    }  

  }  

  

  public RecordWriter<K, V>    getRecordWriter(TaskAttemptContext job//获得writer实例  

                         ) throws IOException, InterruptedException {  

    Configuration conf = job.getConfiguration();  

    boolean isCompressed = getCompressOutput(job);//  

    String keyValueSeparator= conf.get("mapred.textoutputformat.separator",  

                                       "\t");  

    CompressionCodec codec = null;//压缩格式 还是?  

    String extension = "";  

    if (isCompressed) {  

      Class<? extends CompressionCodec> codecClass =   <
cd9a
br />
        getOutputCompressorClass(job, GzipCodec.class);  

      codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);  

      extension = codec.getDefaultExtension();  

    }  

    Path file = getDefaultWorkFile(job, extension);//这个是获取缺省的文件路径及名称,在FileOutput中有对其的实现  

    FileSystem fs = file.getFileSystem(conf);  

    if (!isCompressed) {  

      FSDataOutputStream fileOut = fs.create(file, false);  

      return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);  

    } else {  

      FSDataOutputStream fileOut = fs.create(file, false);  

      return new LineRecordWriter<K, V>(new DataOutputStream  

                                        (codec.createOutputStream(fileOut)),  

                                        keyValueSeparator);  

    }  

  }  



3、3    修改WordCount.java 类,整个类代码如下:

public class WordCount {

  public static class TokenizerMapper 

       extends Mapper<Object, Text, Text, IntWritable>{

    

    private final static IntWritable one = new IntWritable(1);

    private Text word = new Text();

      

    public void map(Object key, Text value, Context context

                    ) throws IOException, InterruptedException {

    /*byte[] bt = value.getBytes();

        InputStream ip = new ByteArrayInputStream(bt);

        Reader read = new InputStreamReader(ip);

        IKSegmenter iks = new IKSegmenter(read,true);

        Lexeme t;

        while ((t = iks.next()) != null)

        {

            word.set(t.getLexemeText());

            context.write(word, one);

        }*/

      StringTokenizer itr = new StringTokenizer(value.toString());

      while (itr.hasMoreTokens()) {

        word.set(itr.nextToken());

        context.write(word, one);

      }

    }

  }

  

  public static class IntSumReducer 

       extends Reducer<Text,IntWritable,Text,IntWritable> {

    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, 

                       Context context

                       ) throws IOException, InterruptedException {

      int sum = 0;

      for (IntWritable val : values) {

        sum += val.get();

      }

      result.set(sum);

      context.write(key, result);

    }

  }

  public static void main(String[] args) throws Exception {

    Configuration conf = new Configuration();

    conf.set("mapred.job.tracker", "hdfs://192.168.31.103:9001");

    

    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

    if (otherArgs.length != 2) {

      System.err.println("Usage: wordcount <in> <out>");

      System.exit(2);

    }

    Job job = new Job(conf, "word count");

    

    File jarFile = EJob.createTempJar("bin");

    EJob.addClasspath("/usr/hadoop/hadoop-1.2.1/conf");

    ClassLoader classLoader = EJob.getClassLoader();

    Thread.currentThread().setContextClassLoader(classLoader);

    ((JobConf) job.getConfiguration()).setJar(jarFile.toString());

    

    

    job.setJarByClass(WordCount.class);

    job.setMapperClass(TokenizerMapper.class);

    job.setCombinerClass(IntSumReducer.class);

    job.setReducerClass(IntSumReducer.class);

    job.setOutputKeyClass(Text.class);

    

    job.setOutputFormatClass(GBKFileOutputFormat.class);

    

    job.setOutputValueClass(IntWritable.class);

    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);

  }

}

4、 准备 需要处理的文件。  并上传到hadoop集群中。1  这里可以通过FTP工具上传到linux目录中,并通过命令上传到 HDFS 中。同时 2 也可以直接使用Eclipse 的hadoop 插件选择文件直接上传到 HDFS 中。推荐 1。在使用 2 中,可能会出现本地文件中文ok,但是上传到 HDFS 中就会出现乱码的情况。

我的文件名称为:keywords.txt  内容如下(小部分):【我上传文件到 HDFS 中:  /input/   下】

搜索词

标签纸(TEPRA18白色强粘)

标签打印解决方案

重庆 标签纸(TEPRA18白色强粘)

车载电子标签(OBU)

重庆专业印不干胶

RFID标签打印机

名片打印机

微信二维码

完美二维码

二维码扫描

重庆金属二维码

ups集成商

苏 logo

苹果5s防伪码查找

重庆 标签纸(TEPRA18白色强粘)

商标生产批发

重庆线材加工厂

用二维码做营销

pvc不干胶

rfid标签

印制不干胶

重庆不干胶干胶广告加工厂

重庆不干胶打印纸

GoDEX

二维码扫描天天快递

重庆 二维码

二维码

快照二维码

泳衣品牌

怎样做二维码

发票查询真伪

重庆标签

.........

........

.......

5、 运行WordCount程序。  

5、1    在WordCount.java 中,右键: run  as  --->    run  configurations --> MainClass: deenken.WordCount  

     ---> Arguments --->    Program arguments :  hdfs://192.168.31.103:9000/input/keywords.txt
    hdfs://192.168.31.103:9000/output

   ----->  VM Arguments : -Xms512m -Xmx1024m -XX:MaxPermSize=256m

  ------> Applay ----> Run。

程序运行后,会在 console 显示运行信息:

15/02/16 15:26:39 INFO input.FileInputFormat: Total input paths to process : 1

15/02/16 15:26:39 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

15/02/16 15:26:39 WARN snappy.LoadSnappy: Snappy native library not loaded

15/02/16 15:26:40 INFO mapred.JobClient: Running job: job_201502161506_0002
15/02/16 15:26:41 INFO mapred.JobClient:  map 0% reduce 0%

15/02/16 15:26:52 INFO mapred.JobClient:  map 100% reduce 0%

15/02/16 15:27:01 INFO mapred.JobClient:  map 100% reduce 33%

15/02/16 15:27:03 INFO mapred.JobClient:  map 100% reduce 100%


15/02/16 15:27:06 INFO mapred.JobClient: Job complete: job_201502161506_0002

15/02/16 15:27:06 INFO mapred.JobClient: Counters: 29

15/02/16 15:27:06 INFO mapred.JobClient:   Job Counters 

15/02/16 15:27:06 INFO mapred.JobClient:     Launched reduce tasks=1

15/02/16 15:27:06 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=12433

15/02/16 15:27:06 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0

15/02/16 15:27:06 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0

15/02/16 15:27:06 INFO mapred.JobClient:     Launched map tasks=1

15/02/16 15:27:06 INFO mapred.JobClient:     Data-local map tasks=1

15/02/16 15:27:06 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=11394

15/02/16 15:27:06 INFO mapred.JobClient:   File Output Format Counters 

15/02/16 15:27:06 INFO mapred.JobClient:     Bytes Written=64234

15/02/16 15:27:06 INFO mapred.JobClient:   FileSystemCounters

15/02/16 15:27:06 INFO mapred.JobClient:     FILE_BYTES_READ=74969

15/02/16 15:27:06 INFO mapred.JobClient:     HDFS_BYTES_READ=91407

15/02/16 15:27:06 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=265814

15/02/16 15:27:06 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=64234

15/02/16 15:27:06 INFO mapred.JobClient:   File Input Format Counters 

15/02/16 15:27:06 INFO mapred.JobClient:     Bytes Read=91297

15/02/16 15:27:06 INFO mapred.JobClient:   Map-Reduce Framework

15/02/16 15:27:06 INFO mapred.JobClient:     Map output materialized bytes=74969

15/02/16 15:27:06 INFO mapred.JobClient:     Map input records=4363

15/02/16 15:27:06 INFO mapred.JobClient:     Reduce shuffle bytes=74969

15/02/16 15:27:06 INFO mapred.JobClient:     Spilled Records=5388

15/02/16 15:27:06 INFO mapred.JobClient:     Map output bytes=110539

15/02/16 15:27:06 INFO mapred.JobClient:     Total committed heap usage (bytes)=176492544

15/02/16 15:27:06 INFO mapred.JobClient:     CPU time spent (ms)=2130

15/02/16 15:27:06 INFO mapred.JobClient:     Combine input records=4818

15/02/16 15:27:06 INFO mapred.JobClient:     SPLIT_RAW_BYTES=110

15/02/16 15:27:06 INFO mapred.JobClient:     Reduce input records=2694

15/02/16 15:27:06 INFO mapred.JobClient:     Reduce input groups=2694

15/02/16 15:27:06 INFO mapred.JobClient:     Combine output records=2694

15/02/16 15:27:06 INFO mapred.JobClient:     Physical memory (bytes) snapshot=184217600

15/02/16 15:27:06 INFO mapred.JobClient:     Reduce output records=2694

15/02/16 15:27:06 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=696303616

15/02/16 15:27:06 INFO mapred.JobClient:     Map output records=4818

显示运行成功。

5、2  运行提示成功后,查看运行结果。结果在我们配置的output文件夹中。

[root@caixen-3 ~]# hadoop dfs -ls /output/

Found 3 items

-rw-r--r--   3 Caixen supergroup          0 2015-02-16 15:19 /output/_SUCCESS

drwxr-xr-x   - Caixen supergroup          0 2015-02-16 15:18 /output/_logs

-rw-r--r--   3 Caixen supergroup      76863 2015-02-16 15:19 /output/part-r-00000

查看结果

[root@caixen-3 hadoop_data]# hadoop dfs -cat /output/part-r-00000    【部分结果】

.......

20*50条码纸     1

200E和244比较   1

200点   1

2030兄弟标签机回收重庆  1

214     1

214报价 1

2410    1

244     5

244条码打印机   1

2微码   2

2维码怎么扫     1

2维码扫描       1

300dpi  3

300点打印头     1

3010电缆标示    1

308标致 2

315查询防伪码   2

3190G扫描枪     3

342ePro 1

344m    1

360手机 1

3M不干胶        1

3d打印公司      1

3d打印公司招聘  2

400     1

600点打印机     1

61372613        2

64位usb转com    2

6925794300072条码价格搜索       1

6926161802014   1

70*25条码不干胶标签打印纸哪里有卖       1

80*80热敏条码纸 1

.............

愉快的完成!!!

注: 以上一些代码是网络参考得到。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: