
Data Mining Notes - Association Rules - FPGrowth - MapReduce Implementation

2014-06-05 17:31
The previous post covered the single-machine implementation; this one implements the FPGrowth algorithm with MapReduce. Two MR jobs are used, and the overall flow is as follows:

1. The first MR job scans the whole dataset and counts the frequent 1-itemsets, i.e. the number of occurrences of each item.

2. Read the output of the first MR job, sort the frequent 1-itemsets by frequency, and write the sorted list back to HDFS.

3. The second MR job scans the whole dataset again and, using the sorted frequent 1-itemsets produced in step 2, derives the frequent itemsets.

Map phase of the second MR job: first reorder each transaction according to the sorted frequent 1-itemsets, then walk through the reordered transaction and, for each frequent item in it, emit the item as the key and the reordered transaction as the value to the Reduce phase.

Reduce phase of the second MR job: a reduce node receives the transactions emitted by the mappers for one frequent item, iterates over them to build that item's conditional FP-tree, and then mines the conditional FP-tree to obtain the frequent itemsets that contain the item.
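To make this concrete, here is a small hypothetical example. The single-item counts match the results at the end of the post, but the transaction line itself and its format are assumptions based on the mapper code, which skips a leading whitespace-separated transaction ID. Suppose the descending-frequency order comes out as lettuce(5), diapers(5), wine(4), soy milk(4), orange juice(2) (ties can land either way), and one input line is:

T3 wine,diapers,lettuce

The mapper drops the ID, keeps only the frequent items, reorders them into lettuce,diapers,wine, and emits one key/value pair per item:

lettuce -> lettuce,diapers,wine
diapers -> lettuce,diapers,wine
wine -> lettuce,diapers,wine

The reducer that receives the key wine then takes the prefix lettuce,diapers from this value as one entry of wine's conditional pattern base.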

A simple MapReduce implementation of the FPGrowth algorithm:

public class FPGrowthJob {

private Configuration conf = null;

//generate the frequent 1-itemsets
public String frequency_1_itemset_gen(String input, String minSupport) {
String output = HDFSUtils.HDFS_TEMP_INPUT_URL + IdentityUtils.generateUUID();
String[] inputArgs = new String[]{input, output, minSupport};
Frequency1ItemSetMR.main(inputArgs);
return output;
}

//sort the frequent 1-itemsets
public String frequency_1_itemset_sort(String input) {
Map<String, Integer> map = new HashMap<String, Integer>();
SequenceFile.Reader reader = null;
try {
Path dirPath = new Path(input);
Path[] paths = HDFSUtils.getPathFiles(conf, dirPath);
FileSystem fs = FileSystem.get(conf);
reader = new SequenceFile.Reader(fs, paths[0], conf);
Text key = (Text) ReflectionUtils.newInstance(
reader.getKeyClass(), conf);
IntWritable value = new IntWritable();
while (reader.next(key, value)) {
map.put(key.toString(), value.get());
key = new Text();
value = new IntWritable();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
IOUtils.closeQuietly(reader);
}
List<Map.Entry<String, Integer>> entries =
new ArrayList<Map.Entry<String, Integer>>();
for (Map.Entry<String, Integer> entry : map.entrySet()) {
entries.add(entry);
}
//sort items by occurrence count (descending)
Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() {
public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) {
return ((Integer) o2.getValue()).compareTo((Integer) o1.getValue());
}
});
//use "/" rather than File.separator so the HDFS path is correct on any platform
String output = HDFSUtils.HDFS_TEMP_INPUT_URL + IdentityUtils.generateUUID()
+ "/" + IdentityUtils.generateUUID();
SequenceFile.Writer writer = null;
try {
Path path = new Path(output);
FileSystem fs = FileSystem.get(conf);
writer = SequenceFile.createWriter(fs, conf, path,
Text.class, IntWritable.class);
for (Map.Entry<String, Integer> entry : entries) {
writer.append(new Text(entry.getKey()), new IntWritable(entry.getValue()));
}
} catch (IOException e) {
e.printStackTrace();
} finally {
IOUtils.closeQuietly(writer);
}
return output;
}

//generate the frequent itemsets
public void frequency_itemset_gen(String input, String output, String sort_input) {
System.out.println("frequency_itemset_gen input: " + input);
System.out.println("frequency_itemset_gen sort input: " + sort_input);
String[] inputArgs = new String[]{input, output, sort_input};
FPGrowthMR.main(inputArgs);
}

public void run(String[] args) {
if (null == conf) conf = new Configuration();
try {
String[] inputArgs = new GenericOptionsParser(
conf, args).getRemainingArgs();
if (inputArgs.length != 3) {
System.out.println("error");
System.out.println("1. input path.");
System.out.println("2. output path.");
System.out.println("3. min support.");
System.exit(2);
}
String fre1_output = frequency_1_itemset_gen(inputArgs[0], inputArgs[2]);
String fre1_sort_output = frequency_1_itemset_sort(fre1_output);
frequency_itemset_gen(inputArgs[0], inputArgs[1], fre1_sort_output);
} catch (Exception e) {
e.printStackTrace();
}
}

public static void main(String[] args) {
FPGrowthJob job = new FPGrowthJob();
long startTime = System.currentTimeMillis();
job.run(args);
long endTime = System.currentTimeMillis();
System.out.println("spend time: " + (endTime - startTime));
}
}
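Assuming the classes are packaged into a runnable jar (the jar name, the unqualified class name and the HDFS paths below are placeholders, not taken from the repository), the driver is started like any other Hadoop job and expects exactly three arguments: the input path, the output path and the minimum support (assumed here to be an absolute count):

hadoop jar datamining.jar FPGrowthJob /user/hadoop/fpgrowth/input /user/hadoop/fpgrowth/output 3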

The first MR job is very simple, so its code is not listed here; a rough sketch of it follows for completeness, and after that the second MR job is shown in full.
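The sketch below is what such a first job could look like: essentially a word count over the items of each transaction that keeps only items reaching the minimum support. The class names, the "min.support" property and the line format are assumptions made for illustration, not the author's actual Frequency1ItemSetMR (that lives in the linked repository); note that the real job must write a SequenceFile with Text keys and IntWritable values, since that is what frequency_1_itemset_sort reads.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

//counts item occurrences; the driver (not shown) would use SequenceFileOutputFormat
class Frequency1ItemSetMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final IntWritable ONE = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer tokenizer = new StringTokenizer(value.toString());
        //skip the leading transaction ID, same line format as FPGrowthMapper below
        tokenizer.nextToken();
        while (tokenizer.hasMoreTokens()) {
            for (String item : tokenizer.nextToken().split(",")) {
                context.write(new Text(item), ONE);
            }
        }
    }
}

class Frequency1ItemSetReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private int minSupport = 1;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        //"min.support" is a property name invented for this sketch
        minSupport = context.getConfiguration().getInt("min.support", 1);
    }

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        //keep only items that reach the minimum support: the frequent 1-itemsets
        if (sum >= minSupport) {
            context.write(key, new IntWritable(sum));
        }
    }
}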

public class FPGrowthMR {

private static void configureJob(Job job) {
job.setJarByClass(FPGrowthMR.class);

job.setMapperClass(FPGrowthMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

job.setReducerClass(FPGrowthReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
}

public static void main(String[] args) {
Configuration configuration = new Configuration();
try {
String[] inputArgs = new GenericOptionsParser(
configuration, args).getRemainingArgs();
if (inputArgs.length != 3) {
System.out.println("error");
System.out.println("error, please input two path. input and output");
System.out.println("1. input path.");
System.out.println("2. output path.");
System.out.println("3. sort input path.");
System.exit(2);
}
// configuration.set("mapred.job.queue.name", "q_hudong");
configuration.set("sort.input.path", inputArgs[2]);

Path sortPath = new Path(inputArgs[2]);
DistributedCache.addCacheFile(sortPath.toUri(), configuration);

Job job = new Job(configuration, "FPGrowth Algorithm");

FileInputFormat.setInputPaths(job, new Path(inputArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(inputArgs[1]));

configureJob(job);

System.out.println(job.waitForCompletion(true) ? 0 : 1);
} catch (Exception e) {
e.printStackTrace();
}
}
}

class FPGrowthMapper extends Mapper<LongWritable, Text, Text, Text> {

private List<Map.Entry<String, Integer>> entries = null;

@Override
protected void setup(Context context) throws IOException,
InterruptedException {
super.setup(context);
Configuration conf = context.getConfiguration();
URI[] uris = DistributedCache.getCacheFiles(conf);
Map<String, Integer> map = new HashMap<String, Integer>();
SequenceFile.Reader reader = null;
try {
Path path = new Path(uris[0]);
FileSystem fs = FileSystem.get(conf);
reader = new SequenceFile.Reader(fs, path, conf);
Text key = (Text) ReflectionUtils.newInstance(
reader.getKeyClass(), conf);
IntWritable value = new IntWritable();
while (reader.next(key, value)) {
map.put(key.toString(), value.get());
key = new Text();
value = new IntWritable();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
IOUtils.closeQuietly(reader);
}
//entries holds the frequent 1-itemsets in descending frequency order
entries = new ArrayList<Map.Entry<String, Integer>>();
for (Map.Entry<String, Integer> entry : map.entrySet()) {
entries.add(entry);
}
}

@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
StringTokenizer tokenizer = new StringTokenizer(value.toString());
//skip the first token, assumed to be the transaction ID
tokenizer.nextToken();
List<String> results = new ArrayList<String>();
while (tokenizer.hasMoreTokens()) {
String token = tokenizer.nextToken();
String[] items = token.split(",");
for (Map.Entry<String, Integer> entry : entries) {
String eKey = entry.getKey();
for (String item : items) {
if (eKey.equals(item)) {
results.add(eKey);
break;
}
}
}
}
//join the reordered transaction with commas and emit it once for each frequent item it contains
String[] values = results.toArray(new String[0]);
StringBuilder sb = new StringBuilder();
for (String v : values) {
sb.append(v).append(",");
}
if (sb.length() > 0) sb.deleteCharAt(sb.length() - 1);
for (String v : values) {
context.write(new Text(v), new Text(sb.toString()));
}
}

@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
super.cleanup(context);
}
}

class FPGrowthReducer extends Reducer<Text, Text, Text, IntWritable> {

@Override
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
}

@Override
protected void reduce(Text key, Iterable<Text> values,
Context context) throws IOException, InterruptedException {
String keyItem = key.toString();
System.out.println("key: " + keyItem);
Data data = new Data();
for (Text value : values) {
Instance instance = new Instance();
StringTokenizer tokenizer = new StringTokenizer(value.toString());
String token = tokenizer.nextToken();
String[] items = token.split(",");
List<String> temp = new ArrayList<String>();
for (String item : items) {
if (keyItem.equals(item)) {
break;
}
temp.add(item);
}
instance.setValues(temp.toArray(new String[0]));
data.getInstances().add(instance);
}
//the number of collected transactions is keyItem's support
context.write(new Text(keyItem), new IntWritable(data.getInstances().size()));
//build keyItem's conditional FP-tree and mine the frequent itemsets that include keyItem
FPGrowthBuilder fpBuilder = new FPGrowthBuilder();
fpBuilder.build(data, null);
List<List<ItemSet>> frequencies = fpBuilder.obtainFrequencyItemSet();
for (List<ItemSet> frequency : frequencies) {
for (ItemSet itemSet : frequency) {
StringBuilder sb = new StringBuilder();
for (String i : itemSet.getItems()) {
sb.append(i).append(",");
}
sb.append(keyItem);
context.write(new Text(sb.toString()), new IntWritable(itemSet.getSupport()));
}
}
}

@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
super.cleanup(context);
}

}

Viewing the results on HDFS:

diapers	5
lettuce,diapers	4
wine,diapers	4
soy milk,diapers	3
orange juice,diapers	2
lettuce,wine,diapers	3
lettuce,soy milk,diapers	2
wine,soy milk,diapers	2
orange juice,soy milk,diapers	2
orange juice	2
soy milk,orange juice	2
lettuce	5
soy milk,lettuce	3
wine	4
lettuce,wine	3
soy milk,wine	2
soy milk	4
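Each line is one frequent itemset followed by its support count, with the reducer's key item always in the last position; for example, lettuce,wine,diapers 3 means those three items occur together in 3 transactions.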


Code repository: https://github.com/fighting-one-piece/repository-datamining.git