
The Naive Bayes Algorithm in Mahout: Training the Model


1. Generating the Model

Training is implemented by the class org.apache.mahout.classifier.naivebayes.training.TrainNaiveBayesJob. Its input is the sequence file produced by vectorizing the corpus with mahout seqdirectory and mahout seq2sparse; its output is a model (the trained classifier). The process runs in three steps:

(1) Sum all tf-idf vectors that share the same label

This is implemented by the TrainNaiveBayesJob-IndexInstancesMapper-Reducer job. The MR job uses the cached file labelindex, which stores the id assigned to each label. Its output is placed in the summedObservations temporary directory. In effect, this step produces the score of each feature under each label; a sketch of the mapper follows.
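Conceptually, the mapper behaves like the sketch below (illustrative only, not the actual Mahout source; it assumes the input key has the form /label/docId, as produced by seqdirectory, and that BayesUtils.readIndexFromCache loads the cached label index):

import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.classifier.naivebayes.BayesUtils;
import org.apache.mahout.math.VectorWritable;

// Sketch of step (1): turn "/label/docId -> tf-idf vector" into "labelId -> vector",
// so that VectorSumReducer (also used as the combiner) can add up all vectors
// that share a label id.
public class IndexInstancesMapperSketch
    extends Mapper<Text, VectorWritable, IntWritable, VectorWritable> {

  private Map<String, Integer> labelIndex;

  @Override
  protected void setup(Context ctx) throws IOException, InterruptedException {
    // Load the label-to-id mapping from the distributed cache
    labelIndex = BayesUtils.readIndexFromCache(ctx.getConfiguration());
  }

  @Override
  protected void map(Text key, VectorWritable value, Context ctx)
      throws IOException, InterruptedException {
    String label = key.toString().split("/")[1]; // key looks like "/label/docId"
    Integer id = labelIndex.get(label);
    if (id != null) {
      ctx.write(new IntWritable(id), value);     // group all vectors of one label
    }
  }
}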

(2) Sum the vectors from the previous step over all labels and all features

This is implemented by the TrainNaiveBayesJob-WeightsMapper-Reducer job, whose output goes to the weights directory under tmp. Adding up all vectors from (1) yields weightsPerFeature, the weight vector over features; adding up all elements of each label's vector from (1) yields weightsPerLabel, the weight vector over labels. In effect, this step computes the score of each label and the score of each feature across all labels; an in-memory sketch of the computation follows.
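In memory, the two weight vectors amount to something like this sketch (names such as summedPerLabel are illustrative stand-ins for the output of step (1), not Mahout identifiers):

import java.util.Map;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.function.Functions;

// Sketch of step (2): derive weightsPerFeature and weightsPerLabel
// from the per-label sums produced in step (1).
final class WeightsSketch {
  static Vector[] sumWeights(Map<Integer, Vector> summedPerLabel,
                             int numLabels, int numFeatures) {
    Vector weightsPerFeature = new RandomAccessSparseVector(numFeatures);
    Vector weightsPerLabel = new DenseVector(numLabels);
    for (Map.Entry<Integer, Vector> e : summedPerLabel.entrySet()) {
      // each feature's score summed over all labels
      weightsPerFeature.assign(e.getValue(), Functions.PLUS);
      // the label's total score: the sum of all elements of its vector
      weightsPerLabel.set(e.getKey(), e.getValue().zSum());
    }
    return new Vector[] {weightsPerFeature, weightsPerLabel};
  }
}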

(3) Build the model

Create a Matrix scoresPerLabelAndFeature, a two-dimensional matrix with as many rows as weightsPerLabel has elements and as many columns as weightsPerFeature has elements, and fill it with the per-label, per-feature scores produced in (1). scoresPerLabelAndFeature is thus the score of every feature under every label; together with the weightsPerFeature and weightsPerLabel produced in (2), it is enough to construct a NaiveBayesModel, which is written to model/naiveBayesModel.bin on Hadoop.

public final class TrainNaiveBayesJob extends AbstractJob {

  private static final String TRAIN_COMPLEMENTARY = "trainComplementary";
  private static final String ALPHA_I = "alphaI";
  private static final String LABEL_INDEX = "labelIndex";
  private static final String EXTRACT_LABELS = "extractLabels";
  private static final String LABELS = "labels";
  public static final String WEIGHTS_PER_FEATURE = "__SPF";
  public static final String WEIGHTS_PER_LABEL = "__SPL";
  public static final String LABEL_THETA_NORMALIZER = "_LTN";
  public static final String SUMMED_OBSERVATIONS = "summedObservations";
  public static final String WEIGHTS = "weights";
  public static final String THETAS = "thetas";

  public static void main(String[] args) throws Exception {
    ToolRunner.run(new Configuration(), new TrainNaiveBayesJob(), args);
  }

  @Override
  public int run(String[] args) throws Exception {
    Path labPath;
    String labPathStr = getOption(LABEL_INDEX);
    if (labPathStr != null) {
      labPath = new Path(labPathStr);
    } else {
      labPath = getTempPath(LABEL_INDEX);
    }

    // Build the label index, i.e. assign an id to each label
    long labelSize = createLabelIndex(labPath);
    float alphaI = Float.parseFloat(getOption(ALPHA_I));
    boolean trainComplementary = hasOption(TRAIN_COMPLEMENTARY);

    HadoopUtil.setSerializations(getConf());
    // Put the file mapping labels to label ids into the distributed cache
    HadoopUtil.cacheFiles(labPath, getConf());

    // Using the label index, convert the key of each input key/value pair
    // to the corresponding label id, and sum the vectors that share an id
    Job indexInstances = prepareJob(getInputPath(),
                                    getTempPath(SUMMED_OBSERVATIONS),
                                    SequenceFileInputFormat.class,
                                    IndexInstancesMapper.class,
                                    IntWritable.class,
                                    VectorWritable.class,
                                    VectorSumReducer.class,
                                    IntWritable.class,
                                    VectorWritable.class,
                                    SequenceFileOutputFormat.class);
    indexInstances.setCombinerClass(VectorSumReducer.class);
    boolean succeeded = indexInstances.waitForCompletion(true);
    if (!succeeded) {
      return -1;
    }

    // Sum the results above and key the total by __SPF; also sum the elements
    // of each label's vector and store them, at the position given by the
    // label id, in a vector keyed by __SPL
    Job weightSummer = prepareJob(getTempPath(SUMMED_OBSERVATIONS),
                                  getTempPath(WEIGHTS),
                                  SequenceFileInputFormat.class,
                                  WeightsMapper.class,
                                  Text.class,
                                  VectorWritable.class,
                                  VectorSumReducer.class,
                                  Text.class,
                                  VectorWritable.class,
                                  SequenceFileOutputFormat.class);
    weightSummer.getConfiguration().set(WeightsMapper.NUM_LABELS, String.valueOf(labelSize));
    weightSummer.setCombinerClass(VectorSumReducer.class);
    succeeded = weightSummer.waitForCompletion(true);
    if (!succeeded) {
      return -1;
    }

    // Put the results generated above into the distributed cache
    HadoopUtil.cacheFiles(getTempPath(WEIGHTS), getConf());
    // Write the alphaI parameter into the configuration
    getConf().setFloat(ThetaMapper.ALPHA_I, alphaI);

    // Build the Naive Bayes model
    NaiveBayesModel naiveBayesModel = BayesUtils.readModelFromDir(getTempPath(), getConf());
    naiveBayesModel.validate();
    // Write the Naive Bayes model to a file
    naiveBayesModel.serialize(getOutputPath(), getConf());
    return 0;
  }

  private long createLabelIndex(Path labPath) throws IOException {
    long labelSize = 0;
    if (hasOption(LABELS)) {
      Iterable<String> labels = Splitter.on(",").split(getOption(LABELS));
      labelSize = BayesUtils.writeLabelIndex(getConf(), labels, labPath);
    } else if (hasOption(EXTRACT_LABELS)) {
      Iterable<Pair<Text,IntWritable>> iterable =
          new SequenceFileDirIterable<Text,IntWritable>(getInputPath(),
                                                        PathType.LIST,
                                                        PathFilters.logsCRCFilter(),
                                                        getConf());
      // Write the label-to-id mapping to a file and return the number of labels
      labelSize = BayesUtils.writeLabelIndex(getConf(), labPath, iterable);
    }
    return labelSize;
  }
}
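From the command line, this whole job is normally launched through the mahout driver. An illustrative invocation (the paths are examples, not taken from this article; -el tells the job to extract labels from the input keys):

mahout trainnb -i /work/tfidf-vectors -el -li /work/labelindex -o /work/model -ow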

1.1. BayesUtils.readModelFromDir

// Read the results produced by the previous MR jobs from the base path
// and initialize the Naive Bayes model
public static NaiveBayesModel readModelFromDir(Path base, Configuration conf) {

  float alphaI = conf.getFloat(ThetaMapper.ALPHA_I, 1.0f);

  // Read the __SPF and __SPL vectors from the weights directory
  Vector scoresPerLabel = null;
  Vector scoresPerFeature = null;
  for (Pair<Text,VectorWritable> record : new SequenceFileDirIterable<Text,VectorWritable>(
      new Path(base, TrainNaiveBayesJob.WEIGHTS),
      PathType.LIST, PathFilters.partFilter(), conf)) {
    String key = record.getFirst().toString();
    VectorWritable value = record.getSecond();
    if (key.equals(TrainNaiveBayesJob.WEIGHTS_PER_FEATURE)) {
      scoresPerFeature = value.get();
    } else if (key.equals(TrainNaiveBayesJob.WEIGHTS_PER_LABEL)) {
      scoresPerLabel = value.get();
    }
  }

  Preconditions.checkNotNull(scoresPerFeature);
  Preconditions.checkNotNull(scoresPerLabel);

  // One row per label, one column per feature
  Matrix scoresPerLabelAndFeature = new SparseMatrix(scoresPerLabel.size(), scoresPerFeature.size());
  for (Pair<IntWritable,VectorWritable> entry : new SequenceFileDirIterable<IntWritable,VectorWritable>(
      new Path(base, TrainNaiveBayesJob.SUMMED_OBSERVATIONS),
      PathType.LIST, PathFilters.partFilter(), conf)) {
    scoresPerLabelAndFeature.assignRow(entry.getFirst().get(), entry.getSecond().get());
  }

  Vector perlabelThetaNormalizer = scoresPerLabel.like();
  return new NaiveBayesModel(scoresPerLabelAndFeature, scoresPerFeature, scoresPerLabel,
                             perlabelThetaNormalizer, alphaI);
}

1.2. naiveBayesModel.serialize

public void serialize(Path output, Configuration conf) throws IOException {
  FileSystem fs = output.getFileSystem(conf);
  // Write the model to the file output/naiveBayesModel.bin
  FSDataOutputStream out = fs.create(new Path(output, "naiveBayesModel.bin"));
  try {
    out.writeFloat(alphaI);
    VectorWritable.writeVector(out, weightsPerFeature);
    VectorWritable.writeVector(out, weightsPerLabel);
    VectorWritable.writeVector(out, perlabelThetaNormalizer);
    for (int row = 0; row < weightsPerLabelAndFeature.numRows(); row++) {
      VectorWritable.writeVector(out, weightsPerLabelAndFeature.viewRow(row));
    }
  } finally {
    Closeables.close(out, false);
  }
}
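Reading the model back at test time (what NaiveBayesModel.materialize does, used in section 2 below) is essentially the mirror image. A minimal sketch, assuming the field order written by serialize above:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.mahout.math.Matrix;
import org.apache.mahout.math.SparseRowMatrix;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
import com.google.common.io.Closeables;

// Sketch: deserialize naiveBayesModel.bin in the same order it was written.
static NaiveBayesModel readModel(Path output, Configuration conf) throws IOException {
  FileSystem fs = output.getFileSystem(conf);
  FSDataInputStream in = fs.open(new Path(output, "naiveBayesModel.bin"));
  try {
    float alphaI = in.readFloat();
    Vector weightsPerFeature = VectorWritable.readVector(in);
    Vector weightsPerLabel = VectorWritable.readVector(in);
    Vector perLabelThetaNormalizer = VectorWritable.readVector(in);
    // One row of per-feature scores for each label, read back in order
    Matrix weightsPerLabelAndFeature =
        new SparseRowMatrix(weightsPerLabel.size(), weightsPerFeature.size());
    for (int row = 0; row < weightsPerLabel.size(); row++) {
      weightsPerLabelAndFeature.assignRow(row, VectorWritable.readVector(in));
    }
    return new NaiveBayesModel(weightsPerLabelAndFeature, weightsPerFeature,
                               weightsPerLabel, perLabelThetaNormalizer, alphaI);
  } finally {
    Closeables.close(in, true);
  }
}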

2. Testing the Model

Classification with the trained model is handled by org.apache.mahout.classifier.naivebayes.test.TestNaiveBayesDriver. This stage runs a single MR job, TestNaiveBayesDriver-BayesTestMapper-Reducer. Its output key is the expected label, and its value holds the score under every label. The model generated above is first read back to build a NaiveBayesModel object, after which the score of the vector to be classified can be computed under each label. Because of the feature-independence assumption of Naive Bayes, this amounts to summing the scores of the vector's features under that label (the per-feature scores are logarithms, so the product of probabilities becomes a sum); a code sketch of this scoring appears after the formulas below. Depending on whether the chosen classifier is ComplementaryNaiveBayesClassifier or StandardNaiveBayesClassifier, the score of a feature under a label is computed with a different formula:

(1) StandardNaiveBayesClassifier: for Bayes

Weight = Log [ ( W-N-Tf-Idf + alpha_i ) / ( Sigma_k + N ) ]

where

featureLabelWeight is W-N-Tf-Idf
labelWeight is Sigma_k
numFeatures is N

(2) ComplementaryNaiveBayesClassifier: for CBayes

Weight = Log [ ( Sigma_j - W-N-Tf-Idf + alpha_i ) / ( Sigma_jSigma_k - Sigma_k + N ) ]

where

featureWeight is Sigma_j
featureLabelWeight is W-N-Tf-Idf
labelWeight is Sigma_k
totalWeight is Sigma_jSigma_k, the sum of the entries of weightsPerLabel
numFeatures is N
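Transcribing the two formulas directly into code, together with the per-document summation described above, gives roughly the following (a sketch for illustration only, not the exact Mahout source; the ScorePerLabelFeature interface is a hypothetical stand-in for the model lookup):

import org.apache.mahout.math.Vector;

// Sketch: per-feature scores under a label, transcribed from the formulas above.
final class NaiveBayesScoringSketch {

  // Standard NB: Log[(W-N-Tf-Idf + alpha_i) / (Sigma_k + N)]
  static double standardWeight(double featureLabelWeight, double labelWeight,
                               double alphaI, double numFeatures) {
    return Math.log((featureLabelWeight + alphaI) / (labelWeight + numFeatures));
  }

  // Complementary NB: Log[(Sigma_j - W-N-Tf-Idf + alpha_i) / (Sigma_jSigma_k - Sigma_k + N)]
  static double complementaryWeight(double featureWeight, double featureLabelWeight,
                                    double totalWeight, double labelWeight,
                                    double alphaI, double numFeatures) {
    return Math.log((featureWeight - featureLabelWeight + alphaI)
        / (totalWeight - labelWeight + numFeatures));
  }

  // Because the per-feature scores are logs, the score of a document vector
  // under one label is the sum over its non-zero features, each feature's
  // score weighted by its tf-idf value.
  static double scoreForLabel(Vector instance, int label, ScorePerLabelFeature score) {
    double result = 0.0;
    for (Vector.Element e : instance.nonZeroes()) {
      result += e.get() * score.get(label, e.index());
    }
    return result;
  }

  // Hypothetical helper standing in for getScoreForLabelFeature
  interface ScorePerLabelFeature {
    double get(int label, int feature);
  }
}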

public class TestNaiveBayesDriver extends AbstractJob {

  private static final Logger log = LoggerFactory.getLogger(TestNaiveBayesDriver.class);
  public static final String COMPLEMENTARY = "class"; // b for bayes, c for complementary
  private static final Pattern SLASH = Pattern.compile("/");

  public static void main(String[] args) throws Exception {
    ToolRunner.run(new Configuration(), new TestNaiveBayesDriver(), args);
  }

  @Override
  public int run(String[] args) throws Exception {
    boolean complementary = hasOption("testComplementary");
    boolean sequential = hasOption("runSequential");
    // sequential == true: classify sequentially in-process;
    // otherwise classify in parallel with MapReduce
    if (sequential) {
      FileSystem fs = FileSystem.get(getConf());
      NaiveBayesModel model = NaiveBayesModel.materialize(new Path(getOption("model")), getConf());
      // Choose the ComplementaryNaiveBayesClassifier
      // or the StandardNaiveBayesClassifier
      AbstractNaiveBayesClassifier classifier;
      if (complementary) {
        classifier = new ComplementaryNaiveBayesClassifier(model);
      } else {
        classifier = new StandardNaiveBayesClassifier(model);
      }
      SequenceFile.Writer writer =
          new SequenceFile.Writer(fs, getConf(), getOutputPath(), Text.class, VectorWritable.class);
      Reader reader = new Reader(fs, getInputPath(), getConf());
      Text key = new Text();
      VectorWritable vw = new VectorWritable();
      while (reader.next(key, vw)) {
        // The key is the true label; the value is the vector of scores under each label
        writer.append(new Text(SLASH.split(key.toString())[1]),
                      new VectorWritable(classifier.classifyFull(vw.get())));
      }
      writer.close();
      reader.close();
    } else {
      boolean succeeded = runMapReduce(parsedArgs);
      if (!succeeded) {
        return -1;
      }
    }

    // Load the label-to-id mapping
    Map<Integer, String> labelMap = BayesUtils.readLabelIndex(getConf(), new Path(getOption("labelIndex")));
    // Loop over the results and create the confusion matrix
    SequenceFileDirIterable<Text, VectorWritable> dirIterable =
        new SequenceFileDirIterable<Text, VectorWritable>(getOutputPath(),
                                                          PathType.LIST,
                                                          PathFilters.partFilter(),
                                                          getConf());
    ResultAnalyzer analyzer = new ResultAnalyzer(labelMap.values(), "DEFAULT");
    analyzeResults(labelMap, dirIterable, analyzer);
    log.info("{} Results: {}", complementary ? "Complementary" : "Standard NB", analyzer);
    return 0;
  }

  // Score in parallel with MapReduce
  private boolean runMapReduce(Map<String, List<String>> parsedArgs)
      throws IOException, InterruptedException, ClassNotFoundException {
    Path model = new Path(getOption("model"));
    HadoopUtil.cacheFiles(model, getConf());
    // The output key is the expected value; the output value holds the scores for all the labels
    Job testJob = prepareJob(getInputPath(), getOutputPath(), SequenceFileInputFormat.class,
                             BayesTestMapper.class, Text.class, VectorWritable.class,
                             SequenceFileOutputFormat.class);
    boolean complementary = hasOption("testComplementary"); // or complementary = parsedArgs.containsKey("--testComplementary");
    testJob.getConfiguration().set(COMPLEMENTARY, String.valueOf(complementary));
    return testJob.waitForCompletion(true);
  }

  private static void analyzeResults(Map<Integer, String> labelMap,
                                     SequenceFileDirIterable<Text, VectorWritable> dirIterable,
                                     ResultAnalyzer analyzer) {
    for (Pair<Text, VectorWritable> pair : dirIterable) {
      // Pick the label with the highest score
      int bestIdx = Integer.MIN_VALUE;
      double bestScore = Long.MIN_VALUE;
      for (Vector.Element element : pair.getSecond().get().all()) {
        if (element.get() > bestScore) {
          bestScore = element.get();
          bestIdx = element.index();
        }
      }
      if (bestIdx != Integer.MIN_VALUE) {
        ClassifierResult classifierResult = new ClassifierResult(labelMap.get(bestIdx), bestScore);
        analyzer.addInstance(pair.getFirst().toString(), classifierResult);
      }
    }
  }
}
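As with training, the test driver is normally launched through the mahout driver. An illustrative invocation (the paths are examples; add -c to use the complementary classifier):

mahout testnb -i /work/test-vectors -m /work/model -l /work/labelindex -o /work/results -ow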