您的位置:首页 > 其它

MapReduce算法设计-计算单词共现矩阵

2016-01-10 13:00 435 查看

技术原理

请参见博文:

Data-Intensive Text Processing with MapReduce 第三章(2)——PAIRS AND STRIPES

Data-Intensive Text Processing with MapReduce第三章(3)——COMPUTING RELATIVE FREQUENCIES

程序实现

首先我们实现基于共现次数的单词共现矩阵的MapReduce实现。

Pair的方式

自定义Pair类:

package mp.co_occurrence_matrix;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;

/**
*
* @author liupenghe
* 自定义TextPair Writable类型
*
*/
public class TextPair implements WritableComparable<TextPair>{

private Text first;
private Text second;

/**
* 默认的构造函数,这样MapReduce方法才能创建对象,然后通过readFeilds方法从序列化数据流中独处进行赋值
*/
public TextPair() {
set (new Text(), new Text());
}

public TextPair(String first, String second) {
set(new Text(first), new Text(second));
}

public TextPair(Text first, Text second) {
set(first, second);
}

public void set(Text first, Text second) {
// TODO Auto-generated method stub
this.first = first;
this.second = second;
}

public Text getFirst() {
return first;
}

public Text getSecond() {
return second;
}

/**
* 通过成员对象本身的readFeilds方法,从输入流中反序列化每一个成员对象
* @param arg0
* @throws IOException
*/
@Override
public void readFields(DataInput arg0) throws IOException {
// TODO Auto-generated method stub
first.readFields(arg0);
second.readFields(arg0);
}

/**
* 通过成员对象本身的write方法,序列化每一个成员对象到输出流中
* @param arg0
* @throws IOException
*/
@Override
public void write(DataOutput arg0) throws IOException {
// TODO Auto-generated method stub
first.write(arg0);
second.write(arg0);
}

/**
* 实现WritableComparable必须要实现的方法,用语比较排序
* @param TextPair
* @return
*/

@Override
public int compareTo(TextPair tp) {
// TODO Auto-generated method stub
int cmp = first.compareTo(tp.first);
if(cmp != 0) {
return cmp;
}
return second.compareTo(tp.second);
}

/**
* 就像针对java语言构造任何值的对象,需要重写java.lang.Object中的hashCode(), equals()和toString()方法
*/

/**
* MapReduce需要一个Partitioner把map的输出作为输入分成一块块喂给多个reduce
* 默认的是HashPartitioner,它是通过对象的hashCode函数进行分割,所以hashCode的好坏决定了分割是否均匀,它是一个关键的方法
* @return
*/
//当不使用reletive frequency时采用该hashCode求值方式
@Override
public int hashCode() {
return first.hashCode() * 163 + second.hashCode();
}

@Override
public boolean equals(Object o) {
if(o instanceof TextPair) {
TextPair tp = (TextPair) o;
return first.equals(tp.first) && second.equals(tp.second);
}
return false;
}

/**
* 重写toString方法,作为TextOutputFormat输出格式的输出
* @return
*/
@Override
public String toString() {
return first + "," + second;
}

/**
* 当Textpair被用作健时,需要将数据流反序列化为对象,然后再调用compareTo()方法进行比较。
* 为了提升效率,可以直接对数据的序列化表示来进行比较
*/

public static class Comparator extends WritableComparator {
private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();

public Comparator() {
super(TextPair.class);
}

@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
try {
/**
* Text对象的二进制表示是一个长度可变的证书,包含字符串之UTF-8表示的字节数以及UTF-8字节本身。
* 读取该对象的起始长度,由此得知第一个Text对象的字节表示有多长;然后将该长度传给Text对象RawComparator方法
* 最后通过计算第一个字符串和第二个字符串恰当的偏移量,从而实现对象的比较
* decodeVIntSize返回变长整形的长度,readVInt表示文本字节数组的长度,加起来就是某个成员的长度
*/
int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);

//先比较first
int cmp = TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
if(cmp != 0) {
return cmp;
}
//再比较second
return TEXT_COMPARATOR.compare(b1, s1 + firstL1, l1 - firstL1, b2, s2 + firstL2,  l2 - firstL2);
} catch (IOException e) {
throw new IllegalArgumentException();
}
}
}

//注册RawComparator, 这样MapReduce使用TextPair时就会直接调用Comparator
static {
WritableComparator.define(TextPair.class, new Comparator());
}

}


Mapper的实现

/**
* 使用pair的方式,使用自定义了TextPiar Writable对象
*
*/
public static class Co_OccurrenceMatrixMapperWithPair extends Mapper<LongWritable, Text, TextPair, DoubleWritable> {

@Override
public void map(LongWritable inputKey, Text inputValue, Context context)
throws IOException, InterruptedException {

String doc = inputValue.toString();
//这里只是简单的根据正则分词,如果希望准确分词,请使用相关分词包
String reg = "[\\p{P}\\s]";
String[] allTerms = doc.split(reg);
for(int i = 0; i < allTerms.length; i++) {
if((!"".equals(allTerms[i])) && allTerms[i] != null) {
//考虑in-mapper combining
Map<String, Integer> pairMap = new HashMap<String, Integer>();

//取出该单词对应的一定窗口大小内的共现词
String[] termNeighbors = neighborsOfTerm(allTerms[i], i, allTerms, 3);
for(String nbTerm : termNeighbors) {
if((!"".equals(nbTerm)) && nbTerm != null) {
String textPairStr = allTerms[i] + "," + nbTerm;
//in-mapper combining
if(!pairMap.containsKey(textPairStr)) {
pairMap.put(textPairStr, 1);
} else {
pairMap.put(textPairStr, pairMap.get(textPairStr) + 1);
}

}
}
for(Entry<String, Integer> entry: pairMap.entrySet()) {
String[] pairStrs = entry.getKey().split(",");
TextPair textPair = new TextPair(pairStrs[0], pairStrs[1]);
context.write(textPair, new DoubleWritable(entry.getValue()));
}
}

}

}

/**
* 计算某个词在某窗口大小内的共现词
* @param term
* @param allterms
* @return
*/
public String[] neighborsOfTerm(String term, int pos, String[] allterms, int windowSize) {
String[] neighbors = new String[windowSize];
int count = allterms.length;
int j = 0;
int leftOffSet = 0;
int rightOffSet = 0;
if(pos < windowSize / 2) {
leftOffSet = pos;
rightOffSet = windowSize - leftOffSet;
} else if (pos >= count - 1 - windowSize / 2) {
rightOffSet = count - 1 - pos;
leftOffSet = windowSize - rightOffSet;
} else {
leftOffSet = windowSize / 2;
rightOffSet = windowSize - leftOffSet;
}
for(int i = pos - leftOffSet; i <= pos + rightOffSet && i >=0 && i < count; i++) {
if(term != allterms[i] ) {
neighbors[j] = allterms[i];
j ++;
}
}

return neighbors;
}
}


Reducer的实现

public static class Co_OccurrenceMatrixReducerWithPair extends Reducer<TextPair, DoubleWritable, TextPair, DoubleWritable> {
@Override
public void reduce(TextPair inputKey, Iterable<DoubleWritable> inputValues, Context context)
throws IOException, InterruptedException {
int sum = 0;
for(DoubleWritable inC : inputValues) {
sum += inC.get();
}
context.write(inputKey, new DoubleWritable(sum));
}
}


Stripe方式实现

自定义Stripe类

package mp.co_occurrence_matrix;

import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

/**
*
* @author liupenghe
* 自定义TextStripe类型,其实是继承自Hadoop提供的MapWritable
*
*/

public class TextStripe extends MapWritable {
public TextStripe(){
super();
}

/**
* 重写格式化输出函数,否则以TextOutputFormat输出,文件只会显示该对象的信息
* 如:TextStripe@75123
*/
@Override
public String toString(){
String res = "";
for(Entry<Writable, Writable> entry : this.entrySet()) {
Text key = (Text) entry.getKey();
DoubleWritable value = (DoubleWritable) entry.getValue();
res += key.toString()+ ":" + value.get() + ";";
}
return res;
}

/**
* 添加另一个putAll方法,实现两个TextStripe的相加
* 自带的不知为何不能叠加,没办法,只能自己动手实现了
* @param ts
*/
public void putAll(TextStripe ts) {
for(Entry<Writable, Writable> entry : ts.entrySet()) {
Text tsKey = (Text)entry.getKey();
DoubleWritable tsValue = (DoubleWritable)entry.getValue();
//如果已包含该健,累加值
if(this.containsKey(tsKey)) {
double newValue = ((DoubleWritable)this.get(tsKey)).get() + tsValue.get();
this.put(tsKey, new DoubleWritable(newValue));
} else { //如果不包含该健,则加上
this.put(tsKey, tsValue);
}
}
}


Mapper的实现

/**
* 使用stripe方式,使用自定义的TextStripe Writable对象
*
*/
public static class Co_OccurrenceMatrixMapperWithStripe extends Mapper<LongWritable, Text, Text, TextStripe> {

@Override
public void map(LongWritable inputKey, Text inputValue, Context context)
throws IOException, InterruptedException {
String doc = inputValue.toString();
//这里只是简单的根据正则分词,如果希望准确分词,请使用相关分词包
String reg = "[\\p{P}\\s]";
String[] allTerms = doc.split(reg);
for (int i = 0; i < allTerms.length; i++) {
if((!"".equals(allTerms[i])) && allTerms[i] != null) {
Text outputKey = new Text(allTerms[i]);
//定义一TextStripe存储与该单词共现的词以及频率
TextStripe termTS = new TextStripe();
//取出该单词对应的一定窗口大小内的共现词
String[] termNeighbors = neighborsOfTerm(allTerms[i], i, allTerms, 3);
for(String nbTerm : termNeighbors) {
if((!"".equals(nbTerm)) && nbTerm != null) {
Text co_term = new Text(nbTerm);
//这里其实是做了in-mapper combining
if(!termTS.containsKey(co_term)) {
termTS.put(co_term, new DoubleWritable(1));
} else {
DoubleWritable lastValue = (DoubleWritable) termTS.get(co_term);
double newValue = lastValue.get() + 1.0;
termTS.put(co_term, new DoubleWritable(newValue));
}
}
}
context.write(outputKey, termTS);
}
}
}

/**
* 计算某个词在某窗口大小内的共现词
* @param term
* @param allterms
* @return
*/
public String[] neighborsOfTerm(String term, int pos, String[] allterms, int windowSize) {
String[] neighbors = new String[windowSize];
int count = allterms.length;
int j = 0;
int leftOffSet = 0;
int rightOffSet = 0;
if(pos < windowSize / 2) {
leftOffSet = pos;
rightOffSet = windowSize - leftOffSet;
} else if (pos >= count - 1 - windowSize / 2) {
rightOffSet = count - 1 - pos;
leftOffSet = windowSize - rightOffSet;
} else {
leftOffSet = windowSize / 2;
rightOffSet = windowSize - leftOffSet;
}
for(int i = pos - leftOffSet; i <= pos + rightOffSet && i >=0 && i < count; i++) {
if(term != allterms[i] ) {
neighbors[j] = allterms[i];
j ++;
}
}

return neighbors;
}
}


Reducer的实现

public static class Co_OccurrenceMatrixReducerWithStripe extends Reducer<Text, TextStripe, Text, TextStripe> {

@Override
public void reduce(Text inputKey, Iterable<TextStripe> inputValues, Context context)
throws IOException, InterruptedException {
//创建一表示总和的TexStripe
TextStripe sumStripe = new TextStripe();
for(TextStripe ts : inputValues) {
//将对应的列表加入总列表里
sumStripe.putAll(ts);
}
context.write(inputKey, sumStripe);
}
}


结果对比

Pair方式

Total time spent by all map tasks (ms)=179807
Total time spent by all reduce tasks (ms)=575376
Map output bytes=739261121
Reduce shuffle bytes=18651738
GC time elapsed (ms)=2287
CPU time spent (ms)=153540


Stripe方式

Total time spent by all map tasks (ms)=226361
Total time spent by all reduce tasks (ms)=784184
Map output bytes=607005002
Reduce shuffle bytes=11818684
GC time elapsed (ms)=2809
CPU time spent (ms)=202450


做出对比图





其次我们实现基于共现相对频率的单词共现矩阵的MapReduce实现。

Pair方式

自定义Pair类

这里基本与上述相同,但需要改变一下其中的compareTo方法。

//为了实现reletive frequenc版的共现矩阵,比较方法也要重写,以确保特殊的单词对(word, *)会首先发送到reducer端
@Override
public int compareTo(TextPair tp) {

//先按第一个单词比较
int cmp = this.getFirst().compareTo(tp.getFirst());
if(cmp != 0) {
return cmp;
}

//再比较第二个单词
//如果单词为“*”则说明需排在最前面
if(this.getSecond().toString().equals("*")) {
return -1;
} else if (tp.getSecond().toString().equals("*")) {
return 1;
} else {
return this.getSecond().compareTo(tp.getSecond());
}
}


还需要有一个特殊的Partitioner

//使用pair方式计算相对频率时,也可以不改变TextPair中的hashCode方法,可以重写一个自定义的Partitioner
public static class Co_OccurrenceMatrixWithRFPairPartitioner extends Partitioner<TextPair, DoubleWritable> {
@Override
public int getPartition(TextPair key, DoubleWritable value, int numPartitions) {
return key.getSecond().hashCode() % numPartitions;
}

}


Mapper实现

/**
* 使用pair方式计算相对频率,map端与reducer端均做相应改变
*/
public static class Co_OccurrenceMatrixWithRFPairMapper extends Mapper<LongWritable, Text, TextPair, DoubleWritable> {

@Override
public void map(LongWritable inputKey, Text inputValue, Context context)
throws IOException, InterruptedException {

String doc = inputValue.toString();
//这里只是简单的根据正则分词,如果希望准确分词,请使用相关分词包
String reg = "[\\p{P}\\s]";
String[] allTerms = doc.split(reg);
for(int i = 0; i < allTerms.length; i++) {
if((!"".equals(allTerms[i])) && allTerms[i] != null) {
//取出该单词对应的一定窗口大小内的共现词
String[] termNeighbors = neighborsOfTerm(allTerms[i], i, allTerms, 3);
//考虑in-mapper combining
Map<String, Integer> pairMap = new HashMap<String, Integer>();

//这里作出求频率所需的相应改变,需要把当前的邻接的词的个数发送出去
TextPair totalNeighbors = new TextPair(allTerms[i], "*");   //*号作为特殊记号,排序时被排在最前面,reducer端最先获取到此对
//(word, *)对对应的值是与word共现的所有词的个数
double totalNum = 0.0;

for(String nbTerm : termNeighbors) {
if((!"".equals(nbTerm)) && nbTerm != null) {
totalNum += 1.0;
String textPairStr = allTerms[i] + "," + nbTerm;
//in-mapper combining
if(!pairMap.containsKey(textPairStr)) {
pairMap.put(textPairStr, 1);
} else {
pairMap.put(textPairStr, pairMap.get(textPairStr) + 1);
}

}
}
context.write(totalNeighbors, new DoubleWritable(totalNum));
for(Entry<String, Integer> entry: pairMap.entrySet()) {
String[] pairStrs = entry.getKey().split(",");
TextPair textPair = new TextPair(pairStrs[0], pairStrs[1]);
context.write(textPair, new DoubleWritable(entry.getValue()));
}
}
}

}

/**
* 计算某个词在某窗口大小内的共现词
* @param term
* @param allterms
* @return
*/
public String[] neighborsOfTerm(String term, int pos, String[] allterms, int windowSize) {
String[] neighbors = new String[windowSize];
int count = allterms.length;
int j = 0;
int leftOffSet = 0;
int rightOffSet = 0;
if(pos < windowSize / 2) {
leftOffSet = pos;
rightOffSet = windowSize - leftOffSet;
} else if (pos >= count - 1 - windowSize / 2) {
rightOffSet = count - 1 - pos;
leftOffSet = windowSize - rightOffSet;
} else {
leftOffSet = windowSize / 2;
rightOffSet = windowSize - leftOffSet;
}
for(int i = pos - leftOffSet; i <= pos + rightOffSet && i >=0 && i < count; i++) {
if(term != allterms[i] ) {
neighbors[j] = allterms[i];
j ++;
}
}

return neighbors;
}

}


Reducer实现

public static class Co_OccurrenceMatrixWithRFPairReducer extends Reducer<TextPair, DoubleWritable, TextPair, DoubleWritable> {
/**
* reducer端接受textpair
* 顺序为:(word, *) : {3, 5}
*        (word, word1): {2, 4}
*        (word, word2): {1, 1}
*/

//声明一全局变量,记录与该次共现的所有的词个数
private double totalNum = 0.0;
//声明一变量,表示相对频率
private double rfValue = 0.0;
//声明一变量,记录当前处理到的词
private String currentWord = "#####";

//依次求各个pair的频率
@Override
public void reduce(TextPair inputKey, Iterable<DoubleWritable> inputValues, Context context)
throws IOException, InterruptedException {
String key = inputKey.toString();
String word = key.split(",")[0];
String sign = key.split(",")[1];
//首先将所有共现词的个数求出来
if (sign.equals("*")) {
if (word.equals(currentWord)) {
totalNum += getTotalCount(inputValues);
} else {
currentWord = word;
totalNum = 0.0;
totalNum = getTotalCount(inputValues);
}

}
//求各个共现词的频率
else {
double tempSum = getTotalCount(inputValues);
//求频率
rfValue = tempSum / totalNum;
//保留四位小数
BigDecimal bd = new BigDecimal(rfValue);
rfValue = bd.setScale(4, BigDecimal.ROUND_HALF_UP).doubleValue();
context.write(inputKey, new DoubleWritable(rfValue));
}
}

private int getTotalCount(Iterable<DoubleWritable> values) {
int count = 0;
for(DoubleWritable value: values) {
count += value.get();
}
return count;
}
}


Stripe方式实现

Stripe实现基于相对频率的单词共现矩阵改动较小,只需要改一下Reducer端代码即可。

Reudcer实现

public static class Co_OccurrenceMatrixWithRFStripeReducer extends Reducer<Text, TextStripe, Text, TextStripe> {
/**
*  在reducer端计算相对频率
*/
public void reduce(Text inputKey, Iterable<TextStripe> inputValues, Context context)
throws IOException, InterruptedException {
//创建一表示总和的TexStripe
TextStripe sumStripe = new TextStripe();
for(TextStripe ts : inputValues) {
//将对应的列表加入总列表里
sumStripe.putAll(ts);
}
//计算频率
//统计所有的共现单词数
double totalNumOfWords = 0.0;
for(Entry<Writable, Writable> entry: sumStripe.entrySet()) {
DoubleWritable count = (DoubleWritable) entry.getValue();
totalNumOfWords += count.get();
}
//计算每个单词在所有共现单词中的频率
for(Entry<Writable, Writable> entry: sumStripe.entrySet()) {
Text word = (Text) entry.getKey();
DoubleWritable count = (DoubleWritable) entry.getValue();
double rfValue =  count.get() / totalNumOfWords;
//保留四位小数
BigDecimal bd = new BigDecimal(rfValue);
rfValue = bd.setScale(4, BigDecimal.ROUND_HALF_UP).doubleValue();
//重新放进数据结构中
sumStripe.put(word, new DoubleWritable(rfValue));
}
context.write(inputKey, sumStripe);
}
}


结果对比

Pair方式

Total time spent by all map tasks (ms)=1698777
Total time spent by all reduce tasks (ms)=913924
Map output bytes=968788536
Reduce shuffle bytes=1070082526
GC time elapsed (ms)=9936
CPU time spent (ms)=1220820


Stripe方式

Total time spent by all map tasks (ms)=212706
Total time spent by all reduce tasks (ms)=268947
Map output bytes=1281341916
Reduce shuffle bytes=1309411451
GC time elapsed (ms)=3162
CPU time spent (ms)=305890


做出对比图



内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: