
Three Ways to Implement WordCount in Spark

1. spark-shell

// Read the text file from HDFS and split each line into words
val lines = sc.textFile("hdfs://spark1:9000/spark.txt")
val words = lines.flatMap(line => line.split(" "))
// Pair each word with 1, then sum the counts per word
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print each word with its total count
wordCounts.foreach(wordcount => println(wordcount._1 + " appeared " + wordcount._2 + " times"))
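
One note on the last line: when this runs on a cluster rather than in a local shell, the println output from foreach ends up in the executors' stdout rather than the driver console, so in practice the result is usually written back to HDFS with wordCounts.saveAsTextFile(...) or pulled to the driver with collect() before printing.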

2. Java implementation

package cn.itcast.hadoop.mr;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/*
 * 1. Analyze the concrete business logic and decide on the input/output data formats.
 * 2. Define a class that extends org.apache.hadoop.mapreduce.Mapper and
 *    overrides map() to implement the business logic and emit the new key/value pairs.
 * 3. Define a class that extends org.apache.hadoop.mapreduce.Reducer and
 *    overrides reduce() to implement the business logic.
 * 4. Wire the custom mapper and reducer together through a Job object.
 */

public class WordCount {

    public static class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Receive one line of input (v1)
            String line = value.toString();
            // Split the line into words
            String[] wordsStrings = line.split(" ");
            // Each occurrence counts as one: emit (word, 1)
            for (String w : wordsStrings) {
                context.write(new Text(w), new LongWritable(1));
            }
        }
    }

    public static class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

        @Override
        protected void reduce(Text key, Iterable<LongWritable> v2s, Context context)
                throws IOException, InterruptedException {
            // Define a counter and sum all the 1s emitted for this word
            long counter = 0;
            for (LongWritable i : v2s) {
                counter += i.get();
            }
            // Emit (word, total count)
            context.write(key, new LongWritable(counter));
        }
    }

    public static void main(String[] args) throws Exception {
        // Build the Job object
        Job job = Job.getInstance(new Configuration());

        // Note: the class that contains the main method
        job.setJarByClass(WordCount.class);

        // Mapper settings
        job.setMapperClass(WCMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // Reducer settings
        job.setReducerClass(WCReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Word counting is associative, so the reducer can double as the combiner
        job.setCombinerClass(WCReducer.class);

        // Submit the job and wait for it to finish
        job.waitForCompletion(true);
    }
}

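Note that the listing above is the classic Hadoop MapReduce form of WordCount, while the spark-submit command below points at a Spark driver class, cn.spark.study.core.CopyOfWordCountCluster, which is not shown here. As a rough sketch only (assuming Spark 2.x and the same HDFS input path as the spark-shell example; the class and package names are simply taken from the submit command), a Spark Java-API version might look like this:

package cn.spark.study.core;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class CopyOfWordCountCluster {
    public static void main(String[] args) {
        // The master URL is supplied by spark-submit, so only the app name is set here
        SparkConf conf = new SparkConf().setAppName("WordCountCluster");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Read the text file from HDFS (same path as the spark-shell example)
        JavaRDD<String> lines = sc.textFile("hdfs://spark1:9000/spark.txt");

        // Split each line into words (in Spark 2.x, flatMap expects an Iterator)
        JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());

        // Pair each word with 1 and sum the counts per word
        JavaPairRDD<String, Integer> pairs = words.mapToPair(word -> new Tuple2<>(word, 1));
        JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey((a, b) -> a + b);

        // Print each word with its total count (on the executors in cluster mode)
        wordCounts.foreach(wc -> System.out.println(wc._1() + " appeared " + wc._2() + " times"));

        sc.close();
    }
}

Packaged into a jar with dependencies, a driver class of this kind is what the submit command below would launch.
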
Finally, the job needs to be submitted to the Spark cluster with spark-submit; the submit script is as follows:
/usr/local/spark/bin/spark-submit \
--class cn.spark.study.core.CopyOfWordCountCluster \
--num-executors 3 \
--driver-memory 100m \
--executor-memory 100m \
--executor-cores 3 \
/usr/local/spark-study/java/spark-study-java-0.0.1-SNAPSHOT-jar-with-dependencies.jar


3. Scala (in Eclipse)

package cn.spark.study.core

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object WordCount {

  def main(args: Array[String]) {
    // Build the Spark configuration and context
    val conf = new SparkConf()
      .setAppName("WordCount")
    val sc = new SparkContext(conf)

    // Read the input file from HDFS as a single partition
    val lines = sc.textFile("hdfs://spark1:9000/spark.txt", 1)
    // Split lines into words, pair each word with 1, and sum the counts per word
    val words = lines.flatMap { line => line.split(" ") }
    val pairs = words.map { word => (word, 1) }
    val wordCount = pairs.reduceByKey(_ + _)
    // Print each word with its total count
    wordCount.foreach(wc => println(wc._1 + " appeared " + wc._2 + " times"))
  }
}
Finally, the packaged jar is submitted to the Spark cluster with spark-submit; the submit script is as follows:
/usr/local/spark/bin/spark-submit \
--class cn.spark.study.core.WordCount \
--num-executors 3 \
--driver-memory 100m \
--executor-memory 100m \
--executor-cores 3 \
/usr/local/spark-study/scala/wordcount.jar


The run prints one line per word, in the form "<word> appeared <count> times".
