
Hands-on Advanced Analytics with Spark: Network Traffic Anomaly Detection (Part 1)

2016-07-22 23:58

Project structure

The project structure is shown in the figure below.

[Figure: project structure diagram (image not included here)]

CountClass.scala

package internet

import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

/**
* Created by 汪本成 on 2016/7/22.
*/
object CountClass {

/**
* Euclidean distance between two vectors:
*   d(x, y) = sqrt( sum_i (x_i - y_i)^2 )
* x.toArray.zip(y.toArray) pairs up the "corresponding elements of the two vectors",
* map(p => p._1 - p._2) gives their "differences",
* map(d => d*d).sum gives the "sum of squares",
* math.sqrt() takes the "square root"
*
* @param x first vector
* @param y second vector
* @return Euclidean distance between x and y
*/
def distance(x: Vector, y: Vector) = {
math.sqrt(x.toArray.zip(y.toArray).map(p => p._1 - p._2).map(d => d*d).sum)
}

/**
* Applies the Euclidean distance to a trained model:
* KMeansModel.predict internally calls the KMeans object's findClosest method
*
* @param datum data point as a vector
* @param model trained KMeansModel
* @return distance from datum to the centroid of its assigned cluster
*/
def distToCentroid(datum: Vector, model: KMeansModel) = {
//find the cluster whose centroid is closest to the point
val cluster = model.predict(datum)
//look up that cluster's centroid
val centroid = model.clusterCenters(cluster)
distance(centroid, datum)
}

/**
* Mean distance to the nearest centroid for a model trained with k clusters
*
* @param data input data as an RDD of vectors
* @param k    number of clusters
* @return mean distance from each point to its cluster centroid
*/
def clusteringScore(data: RDD[Vector], k: Int) = {
val kmeans = new KMeans()
kmeans.setK(k)
val model = kmeans.run(data)
data.map(datum => distToCentroid(datum, model)).mean()
}

/**
* Evaluate a range of values for k.
* Scala's (x to y by z) syntax builds a numeric collection whose elements form an
* arithmetic sequence over the closed interval; it is a convenient way to generate
* a series of k values and run some task for each of them.
* @param data input data as an RDD of vectors
*/
def check(data: RDD[Vector]) = {
(5 to 40 by 5).map(k => (k, CountClass.clusteringScore(data, k))).foreach(println)
}
}

CheckValue1.scala

package internet

import org.apache.spark.{SparkContext, SparkConf}

/**
* Created by 汪本成 on 2016/7/22.
*/
object CheckValue1 {

def main(args: Array[String]) {

//create the Spark entry-point objects
val conf = new SparkConf().setAppName("CheckValue1").setMaster("local")
val sc = new SparkContext(conf)
val HDFS_DATA_PATH = "hdfs://node1:9000/user/spark/sparkLearning/cluster/kddcup.data"
val rawData = sc.textFile(HDFS_DATA_PATH)

/**
* Experiment 1
* Count the samples per class label and sort the counts in descending order
*/
val sort_result = rawData.map(_.split(",").last).countByValue().toSeq.sortBy(_._2).reverse
sort_result.foreach(println)
//    Program output:
//          (smurf.,2807886)
//          (neptune.,1072017)
//          (normal.,972781)
//          (satan.,15892)
//          (ipsweep.,12481)
//          (portsweep.,10413)
//          (nmap.,2316)
//          (back.,2203)
//          (warezclient.,1020)
//          (teardrop.,979)
//          (pod.,264)
//          (guess_passwd.,53)
//          (buffer_overflow.,30)
//          (land.,21)
//          (warezmaster.,20)
//          (imap.,12)
//          (rootkit.,10)
//          (loadmodule.,9)
//          (ftp_write.,8)
//          (multihop.,7)
//          (phf.,4)
//          (perl.,3)
//          (spy.,2)

}

}

CheckValue2.scala

package internet

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.linalg.Vectors

/**
* Created by 汪本成 on 2016/7/22.
*/
object CheckValue2 {

def main(args: Array[String]) {

//create the Spark entry-point objects
val conf = new SparkConf().setAppName("CheckValue2").setMaster("local")
val sc = new SparkContext(conf)
val HDFS_DATA_PATH = "hdfs://node1:9000/user/spark/sparkLearning/cluster/kddcup.data"
val rawData = sc.textFile(HDFS_DATA_PATH)

val LabelsAndData = rawData.map{   //parse each line: RDD[String] => (label, Vector)
line =>
//toBuffer creates a mutable list (Buffer)
val buffer = line.split(",").toBuffer
//drop the three non-numeric (categorical) columns
buffer.remove(1, 3)
val label = buffer.remove(buffer.length-1)
val vector = Vectors.dense(buffer.map(_.toDouble).toArray)
(label, vector)
}
val data = LabelsAndData.values.cache()  //keep only the vectors and cache them

//build the KMeansModel (default parameters, k = 2)
val kmeans = new KMeans()
val model = kmeans.run(data)
model.clusterCenters.foreach(println)

//    Program output:
//          Centroid 1:
//          [48.34019491959669,1834.6215497618625,826.2031900016945,5.7161172049003456E-6,
//          6.487793027561892E-4,7.961734678254053E-6,0.012437658596734055,
//          3.205108575604837E-5,0.14352904910348827,0.00808830584493399,
//          6.818511237273984E-5,3.6746467745787934E-5,0.012934960793560386,
//          0.0011887482315762398,7.430952366370449E-5,0.0010211435092468404,
//          0.0,4.082940860643104E-7,8.351655530445469E-4,334.9735084506668,
//          295.26714620807076,0.17797031701994342,0.1780369894027253,
//          0.05766489875327374,0.05772990937912739,0.7898841322630883,
//          0.021179610609908736,0.02826081009629284,232.98107822302248,
//          189.21428335201279,0.7537133898006421,0.030710978823798966,
//          0.6050519309248854,0.006464107887636004,0.1780911843182601,
//          0.17788589813474293,0.05792761150001131,0.05765922142400886]
//          Centroid 2:
//          [10999.0,0.0,1.309937401E9,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,
//          0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,1.0,0.0,0.0,1.0,1.0,1.0,
//          0.0,0.0,255.0,1.0,0.0,0.65,1.0,0.0,0.0,0.0,1.0,1.0]
}
}

CheckValue3.scala

package internet

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.{SparkContext, SparkConf}

/**
* Created by 汪本成 on 2016/7/22.
*/
object CheckValue3 {

def main(args: Array[String]) {
//create the Spark entry-point objects
val conf = new SparkConf().setAppName("CheckValue3").setMaster("local")
val sc = new SparkContext(conf)
val HDFS_DATA_PATH = "hdfs://node1:9000/user/spark/sparkLearning/cluster/kddcup.data"
val rawData = sc.textFile(HDFS_DATA_PATH)

val LabelsAndData = rawData.map{   //parse each line: RDD[String] => (label, Vector)
line =>
//toBuffer creates a mutable list (Buffer)
val buffer = line.split(",").toBuffer
//drop the three non-numeric (categorical) columns
buffer.remove(1, 3)
val label = buffer.remove(buffer.length-1)
val vector = Vectors.dense(buffer.map(_.toDouble).toArray)
(label, vector)
}
val data = LabelsAndData.values.cache()  //keep only the vectors and cache them

//build the KMeansModel (default parameters, k = 2)
val kmeans = new KMeans()
val model = kmeans.run(data)

/** CheckValue1 showed that this dataset contains 23 classes, so the clustering from
* CheckValue2 (only two clusters) cannot be accurate. Below I use the given class labels
* to see directly which sample types each cluster contains, counting the labels within
* each cluster and printing the result in a readable form.
*/
//count the labels per cluster
val clusterLabelCount = LabelsAndData.map {
case (label, datum) =>
val cluster = model.predict(datum)
(cluster, label)
}.countByValue()
//count by (cluster, label) pairs and print the result
println("Counts are as follows")
clusterLabelCount.toSeq.sorted.foreach {
case ((cluster, label), count) =>
//use the f string interpolator to format the output
println(f"$cluster%1s$label%18s$count%8s")
}

//    Counts are as follows
//    0             back.    2203
//    0  buffer_overflow.      30
//    0        ftp_write.       8
//    0     guess_passwd.      53
//    0             imap.      12
//    0          ipsweep.   12481
//    0             land.      21
//    0       loadmodule.       9
//    0         multihop.       7
//    0          neptune. 1072017
//    0             nmap.    2316
//    0           normal.  972781
//    0             perl.       3
//    0              phf.       4
//    0              pod.     264
//    0        portsweep.   10412
//    0          rootkit.      10
//    0            satan.   15892
//    0            smurf. 2807886
//    0              spy.       2
//    0         teardrop.     979
//    0      warezclient.    1020
//    0      warezmaster.      20
//    1        portsweep.       1
}

}

CheckValue4.scala

package internet

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.linalg.Vectors

/**
* Created by 汪本成 on 2016/7/22.
*/
object CheckValue4 {

def main(args: Array[String]) {

//create the Spark entry-point objects
val conf = new SparkConf().setAppName("CheckValue4").setMaster("local")
val sc = new SparkContext(conf)
val HDFS_DATA_PATH = "hdfs://node1:9000/user/spark/sparkLearning/cluster/kddcup.data"
val rawData = sc.textFile(HDFS_DATA_PATH)

val LabelsAndData = rawData.map{   //parse each line: RDD[String] => (label, Vector)
line =>
//toBuffer creates a mutable list (Buffer)
val buffer = line.split(",").toBuffer
//drop the three non-numeric (categorical) columns
buffer.remove(1, 3)
val label = buffer.remove(buffer.length-1)
val vector = Vectors.dense(buffer.map(_.toDouble).toArray)
(label, vector)
}
val data = LabelsAndData.values.cache()  //keep only the vectors and cache them

CountClass.check(data)     //evaluate values of k: k = 5, 10, 15, 20, 25, 30, 35, 40
//    Program output:
//    (5,1938.8583418059188)
//    (10,1629.469780026074)
//    (15,1380.2560462290849)
//    (20,1309.6468484397622)
//    (25,1041.0183009597501)
//    (30,1007.0769941770079)
//    (35,562.989358358847)
//    (40,421.86047502003527)
}

}

CheckValue5.scala

package internet

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.{SparkContext, SparkConf}

/**
* Created by 汪本成 on 2016/7/22.
*/
object CheckValue5 {
def main(args: Array[String]) {

//create the Spark entry-point objects
val conf = new SparkConf().setAppName("CheckValue5").setMaster("local")
val sc = new SparkContext(conf)
val HDFS_DATA_PATH = "hdfs://node1:9000/user/spark/sparkLearning/cluster/kddcup.data"
val rawData = sc.textFile(HDFS_DATA_PATH)

val LabelsAndData = rawData.map{   //parse each line: RDD[String] => (label, Vector)
line =>
//toBuffer creates a mutable list (Buffer)
val buffer = line.split(",").toBuffer
//drop the three non-numeric (categorical) columns
buffer.remove(1, 3)
val label = buffer.remove(buffer.length-1)
val vector = Vectors.dense(buffer.map(_.toDouble).toArray)
(label, vector)
}
val data = LabelsAndData.values.cache()  //keep only the vectors and cache them

//build the KMeansModel
val kmeans = new KMeans()
//set the number of runs per k value and a tighter convergence threshold
//(these must be set before run() for them to take effect)
kmeans.setRuns(10)
kmeans.setEpsilon(1.0e-6)
val model = kmeans.run(data)
(30 to 100 by 10).par.map(k => (k, CountClass.clusteringScore(data, k))).toList.foreach(println)

//    Program output:
//          (30,584.3824466136748)
//          (40,473.13630965059355)
//          (50,397.1680468789708)
//          (60,224.7032729131013)
//          (70,209.75091102083454)
//          (80,189.24155085526263)
//          (90,192.57698780271707)
//          (100,119.81903683729702)

/**
* Summary: as k increases, the score keeps falling. We want the critical value of k beyond
* which further increases no longer lower the score significantly; that point is the elbow
* of the k-versus-score curve. The curve usually keeps descending past the elbow but
* eventually levels off. In this run the score is still dropping sharply at k = 100, so the
* elbow should lie beyond 100. (One crude way to automate this check is sketched after
* this listing.)
*/

}
}
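
One crude way to act on the elbow reasoning above in code is sketched below. This is an illustration, not part of the original project: the object name ElbowSketch, the helper pickElbow, and the 10% cut-off are arbitrary choices, and because k-means starts from random centroids the scores are noisy (note that the k = 90 score above is higher than the k = 80 score), so any automatic rule is only a rough guide.

package internet

object ElbowSketch {
  /**
  * Given (k, clusteringScore) pairs, return the first k after which increasing k
  * improves the mean distance-to-centroid score by less than minRelativeGain.
  * Returns None if the score keeps improving sharply over the whole range tried.
  */
  def pickElbow(scores: Seq[(Int, Double)], minRelativeGain: Double = 0.10): Option[Int] = {
    val sorted = scores.sortBy(_._1)
    sorted.zip(sorted.tail).collectFirst {
      //relative improvement from this k to the next one falls below the cut-off
      case ((k, s0), (_, s1)) if (s0 - s1) / s0 < minRelativeGain => k
    }
  }
}

It could be fed directly with the pairs computed in CheckValue5, for example ElbowSketch.pickElbow((30 to 100 by 10).map(k => (k, CountClass.clusteringScore(data, k)))).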

README.txt

The code in the internet package implements an experiment in network traffic anomaly detection.
The dataset is the KDD Cup 1999 dataset, collected from a production-like system; the organizers have already preprocessed the raw network traffic packets.

1. CheckValue1 counts the samples in the dataset by class label and sorts the counts in descending order

2. CheckValue2 builds a KMeansModel from the samples and then prints the centroid of each cluster

3. CheckValue3 counts how many times each label appears in each cluster, counting by (cluster, label) pairs, and prints the result in a readable format

4. CheckValue4 evaluates different values of k on the dataset

5. CheckValue5 evaluates a further range of k values to roughly locate the interval containing the best k; it runs the clustering multiple times for each k value,
using the setRuns() method to set the number of runs for a given k

6. The CountClass singleton object wraps the Euclidean distance formula, applies that distance to a model (distance from a point to its nearest centroid), and provides methods for computing a model's mean centroid distance for a given k and for evaluating a range of k values

Note: the main purpose of this small project is to detect anomalies in network traffic
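
Since the actual anomaly detection step is deferred to the next post, here is only a minimal sketch of how the pieces above could be combined for it. The object name AnomalySketch and the choice of threshold (the 100th-largest distance in the training data) are illustrative assumptions, not the project's final design:

package internet

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

object AnomalySketch {
  //data: the same RDD[Vector] built in CheckValue2-5; k: whatever value the evaluation settles on
  def buildDetector(data: RDD[Vector], k: Int): Vector => Boolean = {
    val kmeans = new KMeans()
    kmeans.setK(k)
    val model = kmeans.run(data)
    //distance from every training point to its closest centroid
    val distances = data.map(datum => CountClass.distToCentroid(datum, model))
    //arbitrary cut-off: the 100th-largest distance seen in the training data
    val threshold = distances.top(100).last
    //flag a connection as anomalous if it lies farther from its nearest centroid than the cut-off
    (datum: Vector) => CountClass.distToCentroid(datum, model) > threshold
  }
}

Something like val isAnomaly = AnomalySketch.buildDetector(data, 100) followed by data.filter(isAnomaly) would then pull out the suspicious connections.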

The small programs above are offered for your reference; I'll keep writing up other topics when I have time. Thanks for reading. If you have questions, leave a comment and I'll help where I can; where I can't, please teach me, since I'm still a beginner. The next post will continue this project, and I'll publish the anomaly detection part as soon as possible.