您的位置:首页 > 运维架构 > Linux

linux 命令行如何模拟map-reduce的实现过程

2013-11-13 12:43 274 查看
提起map-reduce ,典型的示例就是词频统计,例如统计词典中单词出现的次数等

为什么总用这个示例来说明呢,其实map-reduce就是一个shell 命令的串联过程,只不过用集群的多机来处理,

例如有  test.data , map , reduce测试数据和程序文件

则map-reduce可以本地概括为:  cat test.data | ./map | sort -k1 | ./reduce     > res.data

这个用管道串起来的命令行,之所以map输出和reduce间有个按第一列排序,其实这个就是map-reduce间的关系, map的输出作为reduce的输入,但这中间隐含了sort过程(按第一列)

所以总是用词频统计来介绍map-reduce的使用,另外本地写完map-reduce程序时也可以本地测一下,预期正常的话可以提到hadoop上进行任务处理

//////////////////////////  res.data   ////////////////

a|1    2

a|3    2

b|2    1

b|4    1

c|2    1

d|1    1

e|5    2

//////////////////////////////////   test.data     /////////////////

a    1

b    2

a    3

b    4

a    1

c    2

a    3

d    1

e    5

e    5

///////////////////////////////////////          map.cpp             ///////////////////////////

#include <iostream>

#include <vector>

#include <boost/regex.hpp>

#include <boost/algorithm/string.hpp>

#include <boost/algorithm/string/split.hpp>

#include <boost/algorithm/string/classification.hpp>

using namespace std;

using namespace boost;

vector<string> svec;

/*-----------------------------------------------------------

------------------------------------------------------------*/

void process(std::string str) {

        /*do-something*/

        boost::split( svec, str, boost::is_any_of( "\t" ), boost::token_compress_on );

        if(2 == svec.size()){

                trim(svec[0]);

                trim(svec[1]);

                printf("%s|%s\n", (svec[0]).c_str(), (svec[1]).c_str());

        }

}

/*-----------------------------------------------------------

------------------------------------------------------------*/

int main(int argc, char** argv) {

        const int BUF_SIZE = 1024;

        char buf[BUF_SIZE] = {0};

        boost::regex reg(".+");

        while(!feof(stdin)) {

                if(NULL == fgets(buf, BUF_SIZE, stdin)){

                        continue;

                }

                bool yn = boost::regex_match(buf, reg);

                if( yn ){

                        process(buf);

                }

        }

        return 0;
}                          

////////////////////////    reduce.cpp    ///////////////////////

#include <iostream>

#include <map>

#include <boost/algorithm/string.hpp>

#include <boost/algorithm/string/split.hpp>

#include <boost/algorithm/string/classification.hpp>

using namespace std;

using namespace boost;

typedef map<string,int> smap;

typedef map<string,int>::iterator mapiter;

smap keyval;

mapiter mit;

/*-----------------------------------------------------------

------------------------------------------------------------*/

void process() {

    /*do-something*/

    for(mit=keyval.begin(); mit != keyval.end(); ++mit) {

        cout<<mit->first<<"\t"<<mit->second<<endl;

    }

    keyval.clear();

    return;

}

/*-----------------------------------------------------------

------------------------------------------------------------*/

int main(int argc, char** argv) {

    const int BUF_SIZE = 1024;

    char buf[BUF_SIZE];

    string key, last_str;

    const int default_val = 1;

    while(!feof(stdin)) {

        if(NULL == fgets(buf, BUF_SIZE, stdin)){

            continue;

        }

        key = buf;

                trim(key);

        if( key == last_str ) {//exist

            mit = keyval.find( key );

            if( keyval.end() == mit ) {//first log

                keyval.insert(pair<string,int>(key, default_val));

            } else {//update value

                int tmpval = default_val + mit->second;

                keyval.erase(mit);

                keyval.insert(pair<string,int>(key,tmpval));

            }

        } else {

            last_str = key;

            process();

            keyval.insert(pair<string,int>(key, default_val));

        }

    }

    //process last buf

    process();

    return 0;

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: