
[MIT 6.824 Distributed System] Lab 1: MapReduce (2016)

2016-05-23 09:48
MIT distributed systems course, Lab 1: MapReduce

Below is my own implementation, shared for reference. Questions are welcome, and please point out any mistakes!

common.go
Debug output can be switched on in this file:

// Debugging enabled?
const debugEnabled = true
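With the flag set, the skeleton's debug helper in the same file prints its messages. Reproduced here from memory of the 2016 skeleton, so treat the exact signature as an assumption:

// debug only prints when debugEnabled is true.
func debug(format string, a ...interface{}) (n int, err error) {
    if debugEnabled {
        n, err = fmt.Printf(format, a...)
    }
    return
}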


Overview

(Figure omitted here: the MapReduce execution overview diagram from the original paper; its numbered steps, e.g. step (3) read, are referenced below.)

Part I: Map/Reduce input and output

Part I is mainly about implementing the file I/O; what gets read and written is, of course, key/value pairs.

Notation:

M: the number of map tasks. The input data set is split into M pieces, one for each of the M mappers; this is step (3), read, in the figure above.

R: the number of reduce tasks, so there are R reducers and, in the end, R output files. Each mapper partitions the key/value pairs it reads into R intermediate files, one per reducer.
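Both phases locate their files through the naming helpers in common.go. As far as I recall the 2016 skeleton (treat the exact strings as an assumption), they encode this M×R layout directly in the file names:

// reduceName constructs the name of the intermediate file that map task
// mapTask produces for reduce task reduceTask.
func reduceName(jobName string, mapTask int, reduceTask int) string {
    return "mrtmp." + jobName + "-" + strconv.Itoa(mapTask) + "-" + strconv.Itoa(reduceTask)
}

// mergeName constructs the name of the output file of reduce task reduceTask.
func mergeName(jobName string, reduceTask int) string {
    return "mrtmp." + jobName + "-res-" + strconv.Itoa(reduceTask)
}

So a finished job leaves M×R intermediate files plus R per-reduce output files on disk.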

common_map.go


I keep a slice of JSON encoders, one per intermediate output file. The input file is read in one go, mapF turns its contents into key/value pairs, and each pair is then hashed on its key to decide which of the nReduce output files it is written to.

// doMap does the job of a map worker: it reads one of the input files
// (inFile), calls the user-defined map function (mapF) for that file's
// contents, and partitions the output into nReduce intermediate files.
func doMap(
    jobName string, // the name of the MapReduce job
    mapTaskNumber int, // which map task this is
    inFile string,
    nReduce int, // the number of reduce tasks that will be run ("R" in the paper)
    mapF func(file string, contents string) []KeyValue,
) {
    // The intermediate file for reduce task r is named
    // reduceName(jobName, mapTaskNumber, r); the ihash function (below)
    // decides which file a given key belongs to. JSON is used as the
    // serialization format because it safely round-trips keys and values
    // that contain newlines, quotes, and other awkward characters, and
    // because the reduce output must be JSON anyway. The decoding side
    // lives in common_reduce.go.

    // Read the whole input file at once.
    fi, err := ioutil.ReadFile(inFile)
    if err != nil {
        log.Fatal("doMap - inFile error: ", err)
    }

    // Run the user-defined map function over the file's contents.
    keyValue := mapF(inFile, string(fi))

    // Create one intermediate file, with a JSON encoder, per reduce task.
    var encs []*json.Encoder
    for r := 0; r < nReduce; r++ {
        outfile, err := os.Create(reduceName(jobName, mapTaskNumber, r))
        if err != nil {
            log.Fatal("doMap - outfile cannot open: ", err)
        }
        defer outfile.Close() // all files are closed when doMap returns

        encs = append(encs, json.NewEncoder(outfile))
    }

    // Partition: hash each key to choose the reduce task it belongs to.
    for _, kv := range keyValue {
        r := ihash(kv.Key) % uint32(nReduce)
        if err := encs[r].Encode(&kv); err != nil {
            log.Fatal("doMap - encode error: ", err)
        }
    }
}

// ihash hashes a key to a 32-bit value; doMap uses it (mod nReduce) to
// pick the key's reduce partition.
func ihash(s string) uint32 {
    h := fnv.New32a()
    h.Write([]byte(s))
    return h.Sum32()
}


common_reduce.go


Each reducer reads the M intermediate files addressed to it and accumulates (key, []values) pairs in a hash map, kvMap. It then iterates over kvMap, calls reduceF on each key's values, and writes the resulting key/value pairs to the merge file.

// doReduce does the job of a reduce worker: it reads the intermediate
// key/value pairs (produced by the map phase) for this task, sorts the
// intermediate key/value pairs by key, calls the user-defined reduce function
// (reduceF) for each key, and writes the output to disk.
func doReduce(
    jobName string, // the name of the whole MapReduce job
    reduceTaskNumber int, // which reduce task this is
    nMap int, // the number of map tasks that were run ("M" in the paper)
    reduceF func(key string, values []string) string,
) {
    // The intermediate file produced for this task by map task m is named
    // reduceName(jobName, m, reduceTaskNumber). Its values were encoded as
    // JSON, so they are decoded by calling .Decode() repeatedly until it
    // returns an error (io.EOF). The reduced output is written as
    // JSON-encoded KeyValue objects to mergeName(jobName, reduceTaskNumber),
    // because JSON is what the merger that combines the output from all the
    // reduce tasks expects.

    // Read every intermediate file and group the values by key in kvMap.
    kvMap := make(map[string][]string) // (Key, []Values)
    for m := 0; m < nMap; m++ {
        fi, err := os.Open(reduceName(jobName, m, reduceTaskNumber))
        if err != nil {
            log.Fatal("doReduce - intermediate file cannot open: ", err)
        }
        defer fi.Close()

        // Decode key/value pairs until EOF.
        dec := json.NewDecoder(fi)
        for {
            var kv KeyValue
            if err := dec.Decode(&kv); err == io.EOF {
                break
            } else if err != nil {
                log.Fatal(err)
            }
            kvMap[kv.Key] = append(kvMap[kv.Key], kv.Value)
        }
    }

    // Create the merge file.
    mergeFile, err := os.Create(mergeName(jobName, reduceTaskNumber))
    if err != nil {
        log.Fatal("doReduce - merge file cannot create: ", err)
    }
    defer mergeFile.Close()

    // Reduce each key's values and write the result out as JSON.
    enc := json.NewEncoder(mergeFile)
    for key, values := range kvMap {
        enc.Encode(KeyValue{key, reduceF(key, values)})
    }
}
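One caveat: the comment above doReduce asks for the pairs to be sorted by key, but ranging over a Go map visits keys in unspecified order. The tests pass regardless (as far as I recall, the framework's final merge step sorts the keys itself), but if deterministic, key-sorted per-reduce output is wanted, a small variant of the final loop (my sketch, which additionally needs import "sort") would be:

// Deterministic variant: sort the keys before encoding the results.
keys := make([]string, 0, len(kvMap))
for k := range kvMap {
    keys = append(keys, k)
}
sort.Strings(keys)
for _, k := range keys {
    enc.Encode(KeyValue{k, reduceF(k, kvMap[k])})
}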


Part II: Single-worker word count

This part is straightforward.

wc.go


package main

import (
    "fmt"
    "mapreduce"
    "os"
    "strconv"
    "strings"
    "unicode"
)

// The mapping function is called once for each piece of the input.
// In this framework, the key is the name of the file that is being processed,
// and the value is the file's contents. The return value should be a slice of
// key/value pairs, each represented by a mapreduce.KeyValue.
func mapF(document string, value string) (res []mapreduce.KeyValue) {
    // Delimiter function: a word boundary is anything that is neither a
    // letter nor a digit.
    f := func(c rune) bool {
        return !unicode.IsLetter(c) && !unicode.IsNumber(c)
    }
    // Split the contents into words and emit one (word, "") pair per word.
    words := strings.FieldsFunc(value, f)
    for _, w := range words {
        res = append(res, mapreduce.KeyValue{w, ""})
    }
    return
}

// The reduce function is called once for each key generated by Map, with a
// list of that key's string values (merged across all inputs). The return
// value should be a single output value for that key.
func reduceF(key string, values []string) string {
    // Each occurrence of the word contributed one value, so the count is
    // simply the number of values.
    return strconv.Itoa(len(values))
}
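Only mapF and reduceF are written by the student; the fmt and os imports are used by the main function the course skeleton provides, which dispatches between sequential and distributed runs. Roughly, from memory of the skeleton (the exact details may differ):

func main() {
    if len(os.Args) < 4 {
        fmt.Printf("%s: see usage comments in file\n", os.Args[0])
    } else if os.Args[1] == "master" {
        var mr *mapreduce.Master
        if os.Args[2] == "sequential" {
            mr = mapreduce.Sequential("wcseq", os.Args[3:], 3, mapF, reduceF)
        } else {
            mr = mapreduce.Distributed("wcseq", os.Args[3:], 3, os.Args[2])
        }
        mr.Wait()
    } else {
        mapreduce.RunWorker(os.Args[2], os.Args[3], mapF, reduceF, 100)
    }
}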


Part III: Distributing MapReduce tasks

schedule.go


Take an available worker from registerChannel and hand it a task over RPC. If the call succeeds, put the worker back into registerChannel so it can be reused (registerChannel is unbuffered, so this must be done from a new goroutine, otherwise schedule itself would block on the send); if the call fails, give the same task to another worker and try again.

package mapreduce

import "fmt"

// schedule starts and waits for all tasks in the given phase (Map or Reduce).
func (mr *Master) schedule(phase jobPhase) {
    var ntasks int
    var nios int // number of inputs (for reduce) or outputs (for map)
    switch phase {
    case mapPhase:
        ntasks = len(mr.files)
        nios = mr.nReduce
    case reducePhase:
        ntasks = mr.nReduce
        nios = len(mr.files)
    }

    fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, nios)

    // All ntasks tasks have to be scheduled on workers, and only once all of
    // them have been completed successfully should the function return.
    // Remember that workers may fail, and that any given worker may finish
    // multiple tasks. The two phases differ only in whether args.File is
    // set, so a single loop handles both.
    for i := 0; i < ntasks; i++ {
        // Get an available worker from the register channel.
        worker := <-mr.registerChannel

        // Set up the RPC arguments for this task.
        args := new(DoTaskArgs)
        args.JobName = mr.jobName
        args.Phase = phase
        args.TaskNumber = i
        args.NumOtherPhase = nios
        if phase == mapPhase {
            args.File = mr.files[i]
        }

        // Ask the worker to run the task; call blocks until the RPC returns.
        ok := call(worker, "Worker.DoTask", args, new(struct{}))
        if ok {
            // Success: the worker is reusable, so put it back on the
            // register channel. The channel is unbuffered, so do this in a
            // goroutine to avoid blocking the scheduler.
            go func() {
                mr.registerChannel <- worker
            }()
        } else {
            // RPC failed: retry the i-th task with another worker. The
            // failed worker is simply never handed back.
            fmt.Printf("Schedule: %v task %d failed, retrying.\n", phase, i)
            i = i - 1
        }
    }

    fmt.Printf("Schedule: %v phase done\n", phase)
}
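Note that this scheduler is serial: call blocks until the RPC returns, so tasks execute one at a time, which is enough to pass the test run shown below. A natural improvement is to launch one goroutine per task and wait on a sync.WaitGroup. A sketch of that variant (my own, not the code above; it additionally needs import "sync"):

var wg sync.WaitGroup
for i := 0; i < ntasks; i++ {
    wg.Add(1)
    go func(taskNum int) {
        defer wg.Done()
        for {
            // Each attempt grabs whichever worker registers next.
            worker := <-mr.registerChannel
            args := &DoTaskArgs{
                JobName:       mr.jobName,
                Phase:         phase,
                TaskNumber:    taskNum,
                NumOtherPhase: nios,
            }
            if phase == mapPhase {
                args.File = mr.files[taskNum]
            }
            if call(worker, "Worker.DoTask", args, new(struct{})) {
                // Hand the worker back without blocking this goroutine.
                go func() { mr.registerChannel <- worker }()
                return
            }
            // The RPC failed: drop this worker and retry with another.
        }
    }(i)
}
wg.Wait()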


Part IV: Handling worker failures

The code for this part is already shown above: failure handling amounts to checking the RPC's return value and, when a call fails, reassigning the task to a different worker (the failed worker is never put back into registerChannel).

Test results

smtech@smtech-MacBook-Air ~/p/6/s/main> sh ./test-mr.sh

==> Part I
ok      mapreduce   3.715s

==> Part II
Passed test

==> Part III
ok      mapreduce   3.129s

==> Part IV
ok      mapreduce   13.613s