您的位置:首页 > 其它

基于pyspark的对KDD-99数据集的聚类分析实验

2016-05-06 10:49 190 查看
官话套话不想讲,介绍也不想打,都知道pyspark和KDD-99是啥吧?

不知道的话...点这里1

或者这里2

转载记得注明出处

/article/10057918.html

pyspark本身是用Scala语言编写的,而Scala语言呢又是Java的变形状态,虽说spark也支持Python,但是还是没有Scala支持的好,对于pyspark的书也很少.

所以恰好前几天研究了一些,现在跟大家分享交流一下吧.

首先我是用替换后的kdd-99-10-precent文件,怎么替换文件,看这里 替换文件

然后结果大概有个70多M的样子,

我们先打开spark.

使用terminal进入spark的主目录

然后输入IPYTHON=1 ./bin/pyspark

对,没错,我用了IPython, IPython是啥?点这里IPython

然后将你替换后的文件放进spark主目录的新建文件夹1

回到terminal中导入模块

from pyspark import SparkContext, SparkConf
from pyspark.mllib.clustering import KMeans
from pyspark.mllib.feature import StandardScaler
from numpy import array
from math import sqrt


然后定义四个函数

def parse_interaction(line):
line_split = line.split(",")
clean_line_split = line_split[0:-1]
return (line_split[-1], array([float(x) for x in clean_line_split]))

def distance(a, b):
return sqrt(a.zip(b).map(lambda x: (x[0]-x[1])).map(lambda x: x*x).reduce(lambda a,b: a+b))

def dist_to_centroid(datum, clusters):
cluster = clusters.predict(datum)
centroid = clusters.centers[cluster]
return sqrt(sum([x**2 for x in (centroid - datum)]))

def clustering_score(data, k):
clusters = KMeans.train(data, k, maxIterations=10, runs=5, initializationMode="random")
result = (k, clusters, data.map(lambda datum: dist_to_centroid(datum, clusters)).mean())
print "Clustering score for k=%(k)d is %(score)f" % {"k": k, "score": result[2]}
return result


第一个函数parse_interaction(line),是将文件最后那个labels的小尾巴切下来的

第二个函数distance(a, b),一看也知道,是计算两个点之间的空间距离的

第三个函数dist_to_centroid(datum, clusters)是用来计算每个点到聚类中心的距离的

第四个函数clustering_score(data, k)就是所有函数中的中心函数了,它训练kmeans,然后调用第三个函数计算,最后返回一个result包含了这次聚类训练用的k值和中心点的坐标和所有点到中心点距离的平均值.

本文所有的工作都是围绕这个函数进行的.

函数定义完之后,开始前期的数据准备工作

定义一下

max_k = 30
data_file = "1/result"


解释一下,max_k是对k值选取测试时候将要使用测试k值的最大值,如果你计算资源丰富的话也可以选100,150之类的,这里暂定为30

然后data_file是替换文件的位置.刚刚说的在spark主目录下新建一个文件夹1,然后将替换后的文件放进去就行

raw_data = sc.textFile(data_file)


载入数据

labels = raw_data.map(lambda line: line.strip().split(",")[-1])


这一步就是将数据集中标签文本切出来

方便以后的比对工作

parsed_data = raw_data.map(parse_interaction)
parsed_data_values = parsed_data.values().cache()


这一步就是将数据进行切割,将不能计算的labels项切掉

然后将数据读入内寸中

然后数据中的数据矩阵看上去很稀疏,现在我们将其转化为密集矩阵

standardizer = StandardScaler(True, True)
standardizer_model = standardizer.fit(parsed_data_values)
standardized_data_values = standardizer_model.transform(parsed_data_values)


使用Spark的规整化函数将数据密集话

然后得到规整化后的数据standardized_data_values

然后就是将数据进行训练了

scores = map(lambda k: clustering_score(standardized_data_values, k), range(10, max_k+1, 10))


这一步里面的range函数中有三个参数10, max_k+1,10

还是那句话,如果你计算资源丰富的话,可以将最后那个10改为5或者直接删掉

那个10的意义是在10到max_k+1的范围内的步伐数

就是一步跳个10步的样子

第一次k取10,第二次就直接跳到20了.

然后输出一下

min_k = min(scores, key=lambda x: x[2])[0]
print "Best k value is %(best_k)d" % {"best_k": min_k}


best_k就是最好的k取值

还记得那个函数的返回值不

现在我们将其中训练好的model提取出来

best_model = min(scores, key=lambda x: x[2])[1]


这里提取出的最好模块,也就是最佳k值训练出来的model

最后就是聚类运算kmeans

cluster_assignments_sample = standardized_data_values.map(lambda datum: str(best_model.predict(datum))+","+",".join(map(str,datum)))


然后保存计算结果

cluster_assignments_sample.saveAsTextFile("standardized")
labels.saveAsTextFile("labels")


standardized目录下生成的文件



labels目录下生成的文件也和上面的名字一样,只是在的目录不同罢了

standardized目录下的文件中,每一个文件都和labels目录中的文件一一对应,每个文件中的每一行也是一一对应

下面我们将standardized中文件按照顺序改为



好吧,我知道名字很烂,但是简单...

哈哈

然后labels目录下的也是改名字



改名之后将所有除了_SUCCESSS的文件,拖入一个单独文件中



忽视那个gg开头的文件,那是运行后的输出文件

你也看到那个ll.py了...取名比较随意...

那就是合并文件用的

可以打开文件看一下

standardized目录下的文件截取一行

16,-0.0677916519216,-1.23798593805,1.59687725081,-3.03993409184,-0.00306168273249,-0.0262872999946,-0.00667341089394,-0.0477201370966,-0.00257146549756,-0.0441358669765,-0.00978217473084,-0.417191282147,-0.00567867840782,-0.0105519415435,-0.00467566599809,-0.00564000635973,-0.011232070172,-0.00991896489162,-0.0276317877068,0.0,0.0,-0.0372626246721,-0.404816973483,-1.14040006348,-0.464089282412,-0.463520002343,4.06939148225,4.05898474876,-1.91027154631,0.596281475651,-0.203632862202,0.347966485214,-1.66602170748,-1.71327236727,0.449337984497,-1.25061954309,-0.15862913227,-0.464417155895,-0.463201923033,4.08467150823,4.09571132604


是这样的,第一个数据16就是将这样一行数据进行预测和归类后的簇的名字

它属于第16簇

然后这就难办了

labels是保存另一个文件之中,而聚类的结果保存在另一个文件中

怎么合并

反正到这里我是用土方法,有可以在spark中就将数据合并并输出的同学请联系我

super_big_hero@sina.com

介绍一下我的土办法吧

这个github的传送门

土办法

然后源代码用的时候还是要改一改,把文件名改一下就好

然后就等着它把所有数据每一行每一行的对应合并到一起

要提取结果的还有一步...

把最后的结果文件,我的是gg系列拷到另一个文件中

然后执行下面这个代码

github

得到数据结果

最后就可以在terminal中看到输出结果了

最后我的输出是这样的



从最后那个count.py代码中大量的注释也可以看出,本人对于这个数据处理的过程还是左右摇摆的

到底怎么分离和记录不同的类型

最后还是用了按簇保存,每个簇保存在一个文件中

最后用了一下pandas的value_counts()函数输出结果

然后欢迎知道怎么直接在spark中合并数据的同学发邮件给我

super_big_hero@sina.com

原谅我不会取个好名字...

对代码有修改意见的可以直接上github fork一下,修改一下,然后push给我
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: