
MapReduce: computing the top-N transaction amounts per order for an e-commerce site

2019-05-30 18:02

Requirement: given the following data (this example covers sort control, partition control, and grouping control). Each line contains an order id, user id, product name, unit price, and quantity.

order001,u001,小米6,1999.9,2
order001,u001,雀巢咖啡,99.0,2
order001,u001,安慕希,250.0,2
order001,u001,经典红双喜,200.0,4
order001,u001,防水电脑包,400.0,2
order002,u002,小米手环,199.0,3
order002,u002,榴莲,15.0,10
order002,u002,苹果,4.5,20
order002,u002,肥皂,10.0,40
order003,u001,小米6,1999.9,2
order003,u001,雀巢咖啡,99.0,2
order003,u001,安慕希,250.0,2
order003,u001,经典红双喜,200.0,4
order003,u001,防水电脑包,400.0,2

Problem analysis

For each order, find the n line items with the largest transaction amounts.

In essence, this is a grouped top-N problem.

Implementation approach:

map: read each line, split it into fields, and pack them into a bean. The bean implements comparison by transaction amount so the records can later be sorted by amount; the order id is emitted as the key, with the bean as the value.

Expected data after computing each line item's total amount (unit price × quantity, e.g. 1999.9 × 2 = 3999.8):

order001,u001,小米6,1999.9,3999.8
order001,u001,雀巢咖啡,99.0,198.0
order001,u001,安慕希,250.0,500.0
order001,u001,经典红双喜,200.0,800.0
order001,u001,防水电脑包,400.0,800.0
order002,u002,小米手环,199.0,597.0
order002,u002,榴莲,15.0,150.0
order002,u002,苹果,4.5,90.0
order002,u002,肥皂,10.0,400.0
order003,u001,小米6,1999.9,3999.8
order003,u001,雀巢咖啡,99.0,198.0
order003,u001,安慕希,250.0,500.0
order003,u001,经典红双喜,200.0,800.0
order003,u001,防水电脑包,400.0,800.0

reduce: group the records by order id, sort each group by amount, and output the first N records of each group from the reduce method. (The implementation below simply uses the order id as the map output key, so the default grouping by key suffices; a custom GroupingComparator would only be needed if the bean itself were the map output key.)
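For reference, here is a minimal sketch of what such a GroupingComparator could look like in the bean-as-key (secondary sort) variant. The class name OrderIdGroupingComparator is introduced here purely for illustration and is not part of the code below; it would be registered with job.setGroupingComparatorClass(OrderIdGroupingComparator.class).

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Groups reduce input purely by order id, so all records of one order
// arrive in a single reduce() call even though the full bean is the key.
public class OrderIdGroupingComparator extends WritableComparator {

    public OrderIdGroupingComparator() {
        // true: have the framework create OrderBean instances for comparison
        super(OrderBean.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean o1 = (OrderBean) a;
        OrderBean o2 = (OrderBean) b;
        return o1.getOrderId().compareTo(o2.getOrderId());
    }
}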

Expected output, keeping the top 2 records of each order:

order001,u001,小米6,1999.9,3999.8
order001,u001,经典红双喜,200.0,800.0
order003,u001,小米6,1999.9,3999.8
order003,u001,经典红双喜,200.0,800.0
order002,u002,小米手环,199.0,597.0
order002,u002,肥皂,10.0,400.0

 

Code implementation

Copyright @ 须臾之余 https://my.oschina.net/u/3995125

The data is packed into a bean that is shuffled across the network, so it has to implement Hadoop serialization; here the bean implements WritableComparable so it can also be compared by transaction amount.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class OrderBean implements WritableComparable<OrderBean> {

    private String orderId;
    private String userId;
    private String pdtName;
    private float price;
    private int number;
    private float amountFee;

    public void set(String orderId, String userId, String pdtName, float price, int number) {
        this.orderId = orderId;
        this.userId = userId;
        this.pdtName = pdtName;
        this.price = price;
        this.number = number;
        this.amountFee = price * number;
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public String getPdtName() {
        return pdtName;
    }

    public void setPdtName(String pdtName) {
        this.pdtName = pdtName;
    }

    public float getPrice() {
        return price;
    }

    public void setPrice(float price) {
        this.price = price;
    }

    public int getNumber() {
        return number;
    }

    public void setNumber(int number) {
        this.number = number;
    }

    public float getAmountFee() {
        return amountFee;
    }

    public void setAmountFee(float amountFee) {
        this.amountFee = amountFee;
    }

    @Override
    public String toString() {
        return this.orderId + "," + this.userId + "," +
                this.pdtName + "," + this.price + "," +
                this.amountFee;
    }

    // Compare two beans: descending by amount, ties broken by product name
    public int compareTo(OrderBean o) {
        return Float.compare(o.getAmountFee(), this.getAmountFee()) == 0
                ? this.pdtName.compareTo(o.pdtName)
                : Float.compare(o.getAmountFee(), this.getAmountFee());
    }

    // amountFee is not written: it can always be recomputed from price * number
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(this.orderId);
        dataOutput.writeUTF(this.userId);
        dataOutput.writeUTF(this.pdtName);
        dataOutput.writeFloat(this.price);
        dataOutput.writeInt(this.number);
    }

    public void readFields(DataInput dataInput) throws IOException {
        this.orderId = dataInput.readUTF();
        this.userId = dataInput.readUTF();
        this.pdtName = dataInput.readUTF();
        this.price = dataInput.readFloat();
        this.number = dataInput.readInt();
        this.amountFee = this.price * this.number;
    }
}
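As a quick sanity check (a sketch, not part of the original post; OrderBeanRoundTrip is a hypothetical helper class), the serialization logic can be exercised in memory with Hadoop's DataOutputBuffer/DataInputBuffer to confirm that amountFee is recomputed in readFields() even though write() never emits it:

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;

public class OrderBeanRoundTrip {
    public static void main(String[] args) throws Exception {
        OrderBean in = new OrderBean();
        in.set("order001", "u001", "小米6", 1999.9f, 2);

        // serialize the bean into an in-memory buffer
        DataOutputBuffer out = new DataOutputBuffer();
        in.write(out);

        // deserialize into a fresh bean; amountFee is recomputed in readFields()
        DataInputBuffer input = new DataInputBuffer();
        input.reset(out.getData(), out.getLength());
        OrderBean copy = new OrderBean();
        copy.readFields(input);

        // expected: order001,u001,小米6,1999.9,3999.8
        System.out.println(copy);
    }
}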

The mapper, reducer, and driver:

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class OrderTopn {

    public static class OrderTopnMapper extends Mapper<LongWritable, Text, Text, OrderBean> {

        OrderBean orderBean = new OrderBean();
        Text k = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // fields: order id, user id, product name, unit price, quantity
            String[] fields = value.toString().split(",");
            orderBean.set(fields[0], fields[1],
                    fields[2], Float.parseFloat(fields[3]),
                    Integer.parseInt(fields[4]));
            k.set(fields[0]);
            // the bean is serialized on write, so the same object can safely be reused for the next record
            context.write(k, orderBean);
        }
    }

    public static class OrderTopnReducer extends Reducer<Text, OrderBean, OrderBean, NullWritable> {

        @Override
        protected void reduce(Text key, Iterable<OrderBean> values, Context context) throws IOException, InterruptedException {

            int topn = context.getConfiguration().getInt("order.top.n", 3);
            ArrayList<OrderBean> beanList = new ArrayList<OrderBean>();

            // the values iterator provided by the reduce task reuses a single object and
            // only resets its fields on each iteration, so copy every value into a new bean
            for (OrderBean orderBean : values) {
                OrderBean newBean = new OrderBean();
                newBean.set(orderBean.getOrderId(), orderBean.getUserId(), orderBean.getPdtName(),
                        orderBean.getPrice(), orderBean.getNumber());
                beanList.add(newBean);
            }

            // sort the beans in this group (by amount descending, then product name)
            Collections.sort(beanList);

            // emit the first topn records, guarding against groups smaller than topn
            for (int i = 0; i < topn && i < beanList.size(); i++) {
                context.write(beanList.get(i), NullWritable.get());
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInt("order.top.n", 2);
        Job job = Job.getInstance(conf);

        // locate the jar that contains this class
        job.setJarByClass(OrderTopn.class);

        // mapper and reducer implementation classes for this job
        job.setMapperClass(OrderTopnMapper.class);
        job.setReducerClass(OrderTopnReducer.class);

        // key/value types produced by the mapper
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(OrderBean.class);

        // key/value types produced by the reducer
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);

        // number of reduce tasks to launch
        job.setNumReduceTasks(2);

        FileInputFormat.setInputPaths(job, new Path("F:\\mrdata\\ordertopn\\input"));
        FileOutputFormat.setOutputPath(job, new Path("F:\\mrdata\\ordertopn\\out"));

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : -1);
    }
}

Output

part-r-00000

order001,u001,小米6,1999.9,3999.8
order001,u001,经典红双喜,200.0,800.0
order003,u001,小米6,1999.9,3999.8
order003,u001,经典红双喜,200.0,800.0

part-r-00001

order002,u002,小米手环,199.0,597.0
order002,u002,肥皂,10.0,400.0
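The two output files come from setNumReduceTasks(2) together with the default HashPartitioner, which distributes keys by the hash of the Text order id; that is presumably why order002 lands in a different file than order001 and order003. To make the partition control mentioned in the requirement explicit, a minimal sketch of a custom partitioner could look like the following (OrderIdPartitioner is a name introduced here for illustration; it would be registered with job.setPartitionerClass(OrderIdPartitioner.class)):

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Routes all records with the same order id to the same reduce task,
// mirroring what the default HashPartitioner does for a Text key.
public class OrderIdPartitioner extends Partitioner<Text, OrderBean> {

    @Override
    public int getPartition(Text key, OrderBean value, int numPartitions) {
        // mask off the sign bit so the result is always non-negative
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}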

Tags: MapReduce, Hadoop