基于pyspark的对KDD-99数据集的聚类分析实验
2016-05-06 10:49
190 查看
官话套话不想讲,介绍也不想打,都知道pyspark和KDD-99是啥吧?
不知道的话...点这里1
或者这里2
转载记得注明出处
/article/10057918.html
pyspark本身是用Scala语言编写的,而Scala语言呢又是Java的变形状态,虽说spark也支持Python,但是还是没有Scala支持的好,对于pyspark的书也很少.
所以恰好前几天研究了一些,现在跟大家分享交流一下吧.
首先我是用替换后的kdd-99-10-precent文件,怎么替换文件,看这里 替换文件
然后结果大概有个70多M的样子,
我们先打开spark.
使用terminal进入spark的主目录
然后输入IPYTHON=1 ./bin/pyspark
对,没错,我用了IPython, IPython是啥?点这里IPython
然后将你替换后的文件放进spark主目录的新建文件夹1中
回到terminal中导入模块
然后定义四个函数
第一个函数parse_interaction(line),是将文件最后那个labels的小尾巴切下来的
第二个函数distance(a, b),一看也知道,是计算两个点之间的空间距离的
第三个函数dist_to_centroid(datum, clusters)是用来计算每个点到聚类中心的距离的
第四个函数clustering_score(data, k)就是所有函数中的中心函数了,它训练kmeans,然后调用第三个函数计算,最后返回一个result包含了这次聚类训练用的k值和中心点的坐标和所有点到中心点距离的平均值.
本文所有的工作都是围绕这个函数进行的.
函数定义完之后,开始前期的数据准备工作
定义一下
解释一下,max_k是对k值选取测试时候将要使用测试k值的最大值,如果你计算资源丰富的话也可以选100,150之类的,这里暂定为30
然后data_file是替换文件的位置.刚刚说的在spark主目录下新建一个文件夹1,然后将替换后的文件放进去就行
载入数据
这一步就是将数据集中标签文本切出来
方便以后的比对工作
这一步就是将数据进行切割,将不能计算的labels项切掉
然后将数据读入内寸中
然后数据中的数据矩阵看上去很稀疏,现在我们将其转化为密集矩阵
使用Spark的规整化函数将数据密集话
然后得到规整化后的数据standardized_data_values
然后就是将数据进行训练了
这一步里面的range函数中有三个参数10, max_k+1,10
还是那句话,如果你计算资源丰富的话,可以将最后那个10改为5或者直接删掉
那个10的意义是在10到max_k+1的范围内的步伐数
就是一步跳个10步的样子
第一次k取10,第二次就直接跳到20了.
然后输出一下
best_k就是最好的k取值
还记得那个函数的返回值不
现在我们将其中训练好的model提取出来
这里提取出的最好模块,也就是最佳k值训练出来的model
最后就是聚类运算kmeans
然后保存计算结果
standardized目录下生成的文件
labels目录下生成的文件也和上面的名字一样,只是在的目录不同罢了
standardized目录下的文件中,每一个文件都和labels目录中的文件一一对应,每个文件中的每一行也是一一对应
下面我们将standardized中文件按照顺序改为
好吧,我知道名字很烂,但是简单...
哈哈
然后labels目录下的也是改名字
改名之后将所有除了_SUCCESSS的文件,拖入一个单独文件中
忽视那个gg开头的文件,那是运行后的输出文件
你也看到那个ll.py了...取名比较随意...
那就是合并文件用的
可以打开文件看一下
standardized目录下的文件截取一行
是这样的,第一个数据16就是将这样一行数据进行预测和归类后的簇的名字
它属于第16簇
然后这就难办了
labels是保存另一个文件之中,而聚类的结果保存在另一个文件中
怎么合并
反正到这里我是用土方法,有可以在spark中就将数据合并并输出的同学请联系我
super_big_hero@sina.com
介绍一下我的土办法吧
这个github的传送门
土办法
然后源代码用的时候还是要改一改,把文件名改一下就好
然后就等着它把所有数据每一行每一行的对应合并到一起
要提取结果的还有一步...
把最后的结果文件,我的是gg系列拷到另一个文件中
然后执行下面这个代码
github
得到数据结果
最后就可以在terminal中看到输出结果了
最后我的输出是这样的
从最后那个count.py代码中大量的注释也可以看出,本人对于这个数据处理的过程还是左右摇摆的
到底怎么分离和记录不同的类型
最后还是用了按簇保存,每个簇保存在一个文件中
最后用了一下pandas的value_counts()函数输出结果
然后欢迎知道怎么直接在spark中合并数据的同学发邮件给我
super_big_hero@sina.com
原谅我不会取个好名字...
对代码有修改意见的可以直接上github fork一下,修改一下,然后push给我
不知道的话...点这里1
或者这里2
转载记得注明出处
/article/10057918.html
pyspark本身是用Scala语言编写的,而Scala语言呢又是Java的变形状态,虽说spark也支持Python,但是还是没有Scala支持的好,对于pyspark的书也很少.
所以恰好前几天研究了一些,现在跟大家分享交流一下吧.
首先我是用替换后的kdd-99-10-precent文件,怎么替换文件,看这里 替换文件
然后结果大概有个70多M的样子,
我们先打开spark.
使用terminal进入spark的主目录
然后输入IPYTHON=1 ./bin/pyspark
对,没错,我用了IPython, IPython是啥?点这里IPython
然后将你替换后的文件放进spark主目录的新建文件夹1中
回到terminal中导入模块
from pyspark import SparkContext, SparkConf from pyspark.mllib.clustering import KMeans from pyspark.mllib.feature import StandardScaler from numpy import array from math import sqrt
然后定义四个函数
def parse_interaction(line): line_split = line.split(",") clean_line_split = line_split[0:-1] return (line_split[-1], array([float(x) for x in clean_line_split])) def distance(a, b): return sqrt(a.zip(b).map(lambda x: (x[0]-x[1])).map(lambda x: x*x).reduce(lambda a,b: a+b)) def dist_to_centroid(datum, clusters): cluster = clusters.predict(datum) centroid = clusters.centers[cluster] return sqrt(sum([x**2 for x in (centroid - datum)])) def clustering_score(data, k): clusters = KMeans.train(data, k, maxIterations=10, runs=5, initializationMode="random") result = (k, clusters, data.map(lambda datum: dist_to_centroid(datum, clusters)).mean()) print "Clustering score for k=%(k)d is %(score)f" % {"k": k, "score": result[2]} return result
第一个函数parse_interaction(line),是将文件最后那个labels的小尾巴切下来的
第二个函数distance(a, b),一看也知道,是计算两个点之间的空间距离的
第三个函数dist_to_centroid(datum, clusters)是用来计算每个点到聚类中心的距离的
第四个函数clustering_score(data, k)就是所有函数中的中心函数了,它训练kmeans,然后调用第三个函数计算,最后返回一个result包含了这次聚类训练用的k值和中心点的坐标和所有点到中心点距离的平均值.
本文所有的工作都是围绕这个函数进行的.
函数定义完之后,开始前期的数据准备工作
定义一下
max_k = 30 data_file = "1/result"
解释一下,max_k是对k值选取测试时候将要使用测试k值的最大值,如果你计算资源丰富的话也可以选100,150之类的,这里暂定为30
然后data_file是替换文件的位置.刚刚说的在spark主目录下新建一个文件夹1,然后将替换后的文件放进去就行
raw_data = sc.textFile(data_file)
载入数据
labels = raw_data.map(lambda line: line.strip().split(",")[-1])
这一步就是将数据集中标签文本切出来
方便以后的比对工作
parsed_data = raw_data.map(parse_interaction) parsed_data_values = parsed_data.values().cache()
这一步就是将数据进行切割,将不能计算的labels项切掉
然后将数据读入内寸中
然后数据中的数据矩阵看上去很稀疏,现在我们将其转化为密集矩阵
standardizer = StandardScaler(True, True) standardizer_model = standardizer.fit(parsed_data_values) standardized_data_values = standardizer_model.transform(parsed_data_values)
使用Spark的规整化函数将数据密集话
然后得到规整化后的数据standardized_data_values
然后就是将数据进行训练了
scores = map(lambda k: clustering_score(standardized_data_values, k), range(10, max_k+1, 10))
这一步里面的range函数中有三个参数10, max_k+1,10
还是那句话,如果你计算资源丰富的话,可以将最后那个10改为5或者直接删掉
那个10的意义是在10到max_k+1的范围内的步伐数
就是一步跳个10步的样子
第一次k取10,第二次就直接跳到20了.
然后输出一下
min_k = min(scores, key=lambda x: x[2])[0] print "Best k value is %(best_k)d" % {"best_k": min_k}
best_k就是最好的k取值
还记得那个函数的返回值不
现在我们将其中训练好的model提取出来
best_model = min(scores, key=lambda x: x[2])[1]
这里提取出的最好模块,也就是最佳k值训练出来的model
最后就是聚类运算kmeans
cluster_assignments_sample = standardized_data_values.map(lambda datum: str(best_model.predict(datum))+","+",".join(map(str,datum)))
然后保存计算结果
cluster_assignments_sample.saveAsTextFile("standardized") labels.saveAsTextFile("labels")
standardized目录下生成的文件
labels目录下生成的文件也和上面的名字一样,只是在的目录不同罢了
standardized目录下的文件中,每一个文件都和labels目录中的文件一一对应,每个文件中的每一行也是一一对应
下面我们将standardized中文件按照顺序改为
好吧,我知道名字很烂,但是简单...
哈哈
然后labels目录下的也是改名字
改名之后将所有除了_SUCCESSS的文件,拖入一个单独文件中
忽视那个gg开头的文件,那是运行后的输出文件
你也看到那个ll.py了...取名比较随意...
那就是合并文件用的
可以打开文件看一下
standardized目录下的文件截取一行
16,-0.0677916519216,-1.23798593805,1.59687725081,-3.03993409184,-0.00306168273249,-0.0262872999946,-0.00667341089394,-0.0477201370966,-0.00257146549756,-0.0441358669765,-0.00978217473084,-0.417191282147,-0.00567867840782,-0.0105519415435,-0.00467566599809,-0.00564000635973,-0.011232070172,-0.00991896489162,-0.0276317877068,0.0,0.0,-0.0372626246721,-0.404816973483,-1.14040006348,-0.464089282412,-0.463520002343,4.06939148225,4.05898474876,-1.91027154631,0.596281475651,-0.203632862202,0.347966485214,-1.66602170748,-1.71327236727,0.449337984497,-1.25061954309,-0.15862913227,-0.464417155895,-0.463201923033,4.08467150823,4.09571132604
是这样的,第一个数据16就是将这样一行数据进行预测和归类后的簇的名字
它属于第16簇
然后这就难办了
labels是保存另一个文件之中,而聚类的结果保存在另一个文件中
怎么合并
反正到这里我是用土方法,有可以在spark中就将数据合并并输出的同学请联系我
super_big_hero@sina.com
介绍一下我的土办法吧
这个github的传送门
土办法
然后源代码用的时候还是要改一改,把文件名改一下就好
然后就等着它把所有数据每一行每一行的对应合并到一起
要提取结果的还有一步...
把最后的结果文件,我的是gg系列拷到另一个文件中
然后执行下面这个代码
github
得到数据结果
最后就可以在terminal中看到输出结果了
最后我的输出是这样的
从最后那个count.py代码中大量的注释也可以看出,本人对于这个数据处理的过程还是左右摇摆的
到底怎么分离和记录不同的类型
最后还是用了按簇保存,每个簇保存在一个文件中
最后用了一下pandas的value_counts()函数输出结果
然后欢迎知道怎么直接在spark中合并数据的同学发邮件给我
super_big_hero@sina.com
原谅我不会取个好名字...
对代码有修改意见的可以直接上github fork一下,修改一下,然后push给我
相关文章推荐
- Ubuntu下获取内核源码
- [51NOD1087]1 10 100 1000(规律,二分)
- RecyclerView学习(一)----初步认知
- 我对View层的视图模板解析引擎的认识(Java&PHP)
- vb6不支持多重接口继承
- 开机启动服务
- 查看端口的使用情况
- Linux-36-linux基础重要命令13(L005-18)
- 浅谈c++中的复制构造函数
- 1.1 大型网站软件系统的特点[读书敲录]
- Linux进程管理之“四大名捕”
- CSS 实现:父元素包含子元素,子元素垂直居中布局
- 加密解密
- 重启apk
- IP数据包个人理解
- 基于支持向量机(SVM)的人脸识别
- mybatis映射文件中<if>使用注意事项
- Android Activity的4大加载模式(launchmode)
- 26. Remove Duplicates from Sorted Array
- 第一章 大型网站架构演化[手工敲录]