Spark: finding the location of an IP address with binary search, using try{}catch{}, and saving the results to a MySQL database
2017-07-09 12:39
1. Create a Maven project
Adjust the location of the local Maven repository; for details see: http://blog.csdn.net/tototuzuoquan/article/details/74571374
2. Write the POM file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.toto.spark</groupId>
    <artifactId>bigdata</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.10.6</scala.version>
        <spark.version>1.6.2</spark.version>
        <hadoop.version>2.6.4</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-make:transitive</arg>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
3. Prepare the input files
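The IP rules file (ip.txt) looks like this. Each line is pipe-separated: fields 0 and 1 (0-based) are the first and last address of an IP range, fields 2 and 3 are the same two addresses as numbers, and field 6 is the province. The remaining fields hold the continent, country, city, district (empty here), ISP, area code, English country name, country code, longitude and latitude. The binary search used below only works if the lines are sorted by the start number in field 2.

1.0.1.0|1.0.3.255|16777472|16778239|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302
1.0.8.0|1.0.15.255|16779264|16781311|亚洲|中国|广东|广州||电信|440100|China|CN|113.280637|23.125178
1.0.32.0|1.0.63.255|16785408|16793599|亚洲|中国|广东|广州||电信|440100|China|CN|113.280637|23.125178
1.1.0.0|1.1.0.255|16842752|16843007|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302
1.1.2.0|1.1.7.255|16843264|16844799|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302
1.1.8.0|1.1.63.255|16844800|16859135|亚洲|中国|广东|广州||电信|440100|China|CN|113.280637|23.125178
1.2.0.0|1.2.1.255|16908288|16908799|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302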
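The code in this post indexes the split fields directly (`fields(2)`, `fields(3)`, `fields(6)`), so a single malformed line aborts the whole job. A minimal sketch of a more defensive parser, assuming the column layout described above; `IpRule` and `IpRuleParser` are hypothetical names introduced only for illustration:

```scala
// Hypothetical types for illustration; not part of the original post.
// Names the ip.txt columns the program uses (0-based field indices in comments).
final case class IpRule(
  startIp: String,  // field 0, e.g. 1.0.1.0
  endIp: String,    // field 1, e.g. 1.0.3.255
  startNum: Long,   // field 2, numeric form of startIp
  endNum: Long,     // field 3, numeric form of endIp
  province: String  // field 6
)

object IpRuleParser {
  // Returns None for short or non-numeric lines instead of throwing.
  def parse(line: String): Option[IpRule] = {
    // limit -1 keeps empty fields such as the empty district column
    val f = line.split("\\|", -1)
    if (f.length < 7) None
    else
      try Some(IpRule(f(0), f(1), f(2).trim.toLong, f(3).trim.toLong, f(6)))
      catch { case _: NumberFormatException => None }
  }
}
```

In the Spark job, `sc.textFile(...).flatMap(line => IpRuleParser.parse(line))` would then drop bad lines rather than fail.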
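The access log (access.log) looks like this. Fields are pipe-separated; field 1 (0-based) is the client IP, which is the only field the program uses: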
20090121000132095572000|125.213.100.123|show.51.com|/shoplist.php?phpfile=shoplist2.php&style=1&sex=137|Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; Mozilla/4.0(Compatible Mozilla/4.0(Compatible-EmbeddedWB 14.59 http://bsalsa.com/ EmbeddedWB- 14.59 from: http://bsalsa.com/ )|http://show.51.com/main.php|
20090121000132124542000|117.101.215.133|www.jiayuan.com|/19245971|Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; TencentTraveler 4.0)|http://photo.jiayuan.com/index.php?uidhash=d1c3b69e9b8355a5204474c749fb76ef|__tkist=0; myloc=50%7C5008; myage=2009; PROFILE=14469674%3A%E8%8B%A6%E6%B6%A9%E5%92%96%E5%95%A1%3Am%3Aphotos2.love21cn.com%2F45%2F1b%2F388111afac8195cc5d91ea286cdd%3A1%3A%3Ahttp%3A%2F%2Fimages.love21cn.com%2Fw4%2Fglobal%2Fi%2Fhykj_m.jpg; last_login_time=1232454068; SESSION_HASH=8176b100a84c9a095315f916d7fcbcf10021e3af; RAW_HASH=008a1bc48ff9ebafa3d5b4815edd04e9e7978050; COMMON_HASH=45388111afac8195cc5d91ea286cdd1b; pop_1232093956=1232468896968; pop_time=1232466715734; pop_1232245908=1232469069390; pop_1219903726=1232477601937; LOVESESSID=98b54794575bf547ea4b55e07efa2e9e; main_search:14469674=%7C%7C%7C00; registeruid=14469674; REG_URL_COOKIE=http%3A%2F%2Fphoto.jiayuan.com%2Fshowphoto.php%3Fuid_hash%3D0319bc5e33ba35755c30a9d88aaf46dc%26total%3D6%26p%3D5; click_count=0%2C3363619
20090121000132406516000|117.101.222.68|gg.xiaonei.com|/view.jsp?p=389|Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; CIBA)|http://home.xiaonei.com/Home.do?id=229670724|_r01_=1; __utma=204579609.31669176.1231940225.1232462740.1232467011.145; __utmz=204579609.1231940225.1.1.utmccn=(direct)
20090121000132581311000|115.120.36.118|tj.tt98.com|/tj.htm|Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; TheWorld)|http://www.tt98.com/|
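Taking `fields(1)` directly throws an `ArrayIndexOutOfBoundsException` on a line that contains no `|`. A sketch of extracting the client IP as an `Option` instead, so that in Spark it could be used with `flatMap` to skip bad lines; `AccessLogParser` is a hypothetical name, and the IPv4 check is deliberately simple (four dot-separated numbers from 0 to 255):

```scala
// Hypothetical helper for illustration; not part of the original post.
object AccessLogParser {
  // Four dot-separated groups of 1 to 3 digits; range checked separately below.
  private val Ipv4 = """(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})""".r

  // Field 1 (0-based) of a pipe-separated access.log line is the client IP.
  def clientIp(line: String): Option[String] = {
    val fields = line.split("\\|", -1)
    if (fields.length < 2) None
    else fields(1).trim match {
      // Regex patterns must match the whole string, so "1.2.3.4x" is rejected
      case ip @ Ipv4(a, b, c, d) if Seq(a, b, c, d).forall(_.toInt <= 255) => Some(ip)
      case _ => None
    }
  }
}
```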
4. Look up the location of a single IP address
package cn.toto.spark

import java.io.{BufferedReader, FileInputStream, InputStreamReader}

import scala.collection.mutable.ArrayBuffer

/**
  * Created by toto on 2017/7/8.
  * Looks up the location of an IP address.
  */
object IPLocationDemo {

  // Converts a dotted IPv4 address to a number by shifting in 8 bits per octet.
  def ip2Long(ip: String): Long = {
    val fragments = ip.split("[.]")
    var ipNum = 0L
    for (i <- 0 until fragments.length) {
      ipNum = fragments(i).toLong | ipNum << 8L
    }
    ipNum
  }

  // Reads every line of the rules file into memory; the reader is closed even if reading fails.
  def readData(path: String): ArrayBuffer[String] = {
    val br = new BufferedReader(new InputStreamReader(new FileInputStream(path)))
    val lines = new ArrayBuffer[String]()
    try {
      var s = br.readLine()
      while (s != null) {
        lines += s
        s = br.readLine()
      }
    } finally {
      br.close()
    }
    lines
  }

  // Binary search over lines sorted by start number (field 2).
  // Returns the index of the range containing ip, or -1 if no range matches.
  def binarySearch(lines: ArrayBuffer[String], ip: Long): Int = {
    var low = 0
    var high = lines.length - 1
    while (low <= high) {
      val middle = (low + high) / 2
      val fields = lines(middle).split("\\|")
      val start = fields(2).toLong
      val end = fields(3).toLong
      if (ip >= start && ip <= end)
        return middle
      if (ip < start)
        high = middle - 1
      else
        low = middle + 1
    }
    -1
  }

  /**
    * Output after running:
    * 2016917821
    * 120.55.0.0|120.55.255.255|2016870400|2016935935|亚洲|中国|浙江|杭州||阿里巴巴|330100|China|CN|120.153576|30.287459
    *
    * 2016917821 must fall between 2016870400 and 2016935935.
    * @param args
    */
  def main(args: Array[String]): Unit = {
    val ip = "120.55.185.61"
    val ipNum = ip2Long(ip)
    println(ipNum)
    val lines = readData("E:\\learnTempFolder\\ip.txt")
    val index = binarySearch(lines, ipNum)
    // binarySearch returns -1 when no range covers the IP
    if (index >= 0) println(lines(index)) else println("No location found for " + ip)
  }
}
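How `ip2Long` works: each loop iteration shifts the running value left by 8 bits and ORs in the next octet. Because every octet is below 256 and the shifted value has its low 8 bits cleared, the OR acts as an addition, so an address a.b.c.d becomes a base-256 number. Worked through for the IP used in `main`, together with the range check against the matching line:

```latex
\begin{aligned}
\mathrm{ip2Long}(a.b.c.d) &= ((a \cdot 2^{8} + b) \cdot 2^{8} + c) \cdot 2^{8} + d = a \cdot 2^{24} + b \cdot 2^{16} + c \cdot 2^{8} + d \\
\mathrm{ip2Long}(120.55.185.61) &= 120 \cdot 16777216 + 55 \cdot 65536 + 185 \cdot 256 + 61 \\
&= 2013265920 + 3604480 + 47360 + 61 = 2016917821 \\
\mathrm{ip2Long}(120.55.0.0) &= 2016870400, \qquad \mathrm{ip2Long}(120.55.255.255) = 2016870400 + 65535 = 2016935935 \\
2016870400 &\le 2016917821 \le 2016935935
\end{aligned}
```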
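Output: running `main` prints the numeric value of the IP followed by the matching line of ip.txt, as listed in the comment above `main`. The 120.55.0.0 to 120.55.255.255 range comes from the author's full ip.txt; it is not part of the excerpt shown earlier.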
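The hand-written `binarySearch` splits a line of text again on every probe. A sketch of an alternative lookup (not the author's method), assuming the start and end numbers have already been parsed into two parallel `Array[Long]` values sorted by start, with non-overlapping ranges. It delegates to the JDK's `java.util.Arrays.binarySearch`, whose negative result encodes the insertion point as `-(insertionPoint) - 1`; `SortedRangeLookup` is a hypothetical name:

```scala
import java.util.Arrays

// Hypothetical helper for illustration; not part of the original post.
object SortedRangeLookup {
  // starts/ends: parallel arrays, sorted ascending by start, ranges non-overlapping.
  // Returns the index of the range containing ip, or -1 (same contract as binarySearch above).
  def find(starts: Array[Long], ends: Array[Long], ip: Long): Int = {
    val pos = Arrays.binarySearch(starts, ip)
    // Exact hit on a start, or the range just before the insertion point
    val candidate = if (pos >= 0) pos else -pos - 2
    // The end check rejects IPs that fall into a gap between two ranges
    if (candidate >= 0 && ip <= ends(candidate)) candidate else -1
  }
}
```

The candidate is the range with the largest start that is not greater than the IP; the extra `ends` check returns -1 for addresses in a gap, for example 1.0.4.0 to 1.0.7.255 in the ip.txt excerpt.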
5. Look up the locations of the IPs in the access log and store the results in a MySQL database
The code:

package cn.toto.spark

import java.sql.{Connection, Date, DriverManager, PreparedStatement}

import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by toto on 2017/7/8.
  */
object IPLocation {

  // Writes one partition of (province, count) pairs to MySQL: one connection and
  // one PreparedStatement per partition, both closed in finally.
  val data2MySQL = (iterator: Iterator[(String, Int)]) => {
    var conn: Connection = null
    var ps: PreparedStatement = null
    val sql = "INSERT INTO location_info (location, counts, accesse_date) VALUES (?, ?, ?)"
    try {
      conn = DriverManager.getConnection("jdbc:mysql://192.168.106.100:3306/bigdata", "root", "123456")
      // Prepare once and reuse it for every row (a statement per row would leak all but the last)
      ps = conn.prepareStatement(sql)
      iterator.foreach(line => {
        ps.setString(1, line._1)
        ps.setInt(2, line._2)
        ps.setDate(3, new Date(System.currentTimeMillis()))
        ps.executeUpdate()
      })
    } catch {
      case e: Exception => println("Mysql Exception: " + e.getMessage)
    } finally {
      if (ps != null) ps.close()
      if (conn != null) conn.close()
    }
  }

  def ip2Long(ip: String): Long = {
    val fragments = ip.split("[.]")
    var ipNum = 0L
    for (i <- 0 until fragments.length) {
      ipNum = fragments(i).toLong | ipNum << 8L
    }
    ipNum
  }

  // Binary search over (startNum, endNum, province) tuples sorted by startNum; -1 if no match
  def binarySearch(lines: Array[(Long, Long, String)], ip: Long): Int = {
    var low = 0
    var high = lines.length - 1
    while (low <= high) {
      val middle = (low + high) / 2
      if (ip >= lines(middle)._1 && ip <= lines(middle)._2)
        return middle
      if (ip < lines(middle)._1)
        high = middle - 1
      else
        low = middle + 1
    }
    -1
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("IpLocation")
    val sc = new SparkContext(conf)

    val ipRulesRdd = sc.textFile("E://workspace//ip.txt").map(line => {
      val fields = line.split("\\|")
      val start_num = fields(2).toLong
      val end_num = fields(3).toLong
      val province = fields(6)
      (start_num, end_num, province)
    })
    // All IP mapping rules, collected to the driver in file order
    val ipRulesArray = ipRulesRdd.collect()
    // Broadcast the rules so each executor receives one read-only copy
    val ipRulesBroadcast = sc.broadcast(ipRulesArray)

    // Load the data to process: field 1 of each access.log line is the client IP
    val ipsRDD = sc.textFile("E://workspace//access.log").map(line => {
      val fields = line.split("\\|")
      fields(1)
    })

    val result = ipsRDD.flatMap(ip => {
      val ipNum = ip2Long(ip)
      val index = binarySearch(ipRulesBroadcast.value, ipNum)
      // (start number, end number, province); IPs not covered by any rule are dropped
      if (index >= 0) Some(ipRulesBroadcast.value(index)) else None
    }).map(t => (t._3, 1)).reduceByKey(_ + _)

    // Write the results to MySQL, one connection per partition
    result.foreachPartition(data2MySQL(_))

    //println(result.collect().toBuffer)
    sc.stop()
  }
}
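To check the lookup and aggregation logic without Spark or MySQL, the same pipeline can be sketched on plain Scala collections. `LocalProvinceCount` is a hypothetical helper: `groupBy` plus `size` stands in for `map(t => (t._3, 1)).reduceByKey(_ + _)`, IPs for which the binary search returns -1 are skipped, and the rules are assumed to be sorted by start number:

```scala
// Hypothetical helper for illustration; not part of the original post.
object LocalProvinceCount {
  // Index of the (start, end, province) rule containing ip, or -1.
  def binarySearch(rules: Array[(Long, Long, String)], ip: Long): Int = {
    var low = 0
    var high = rules.length - 1
    while (low <= high) {
      val middle = (low + high) >>> 1 // unsigned shift avoids Int overflow on huge arrays
      val (start, end, _) = rules(middle)
      if (ip >= start && ip <= end) return middle
      if (ip < start) high = middle - 1 else low = middle + 1
    }
    -1
  }

  // Same shift-and-OR conversion as ip2Long in the post, written as a fold
  def ip2Long(ip: String): Long =
    ip.split("[.]").foldLeft(0L)((acc, part) => part.toLong | acc << 8L)

  // Collection version of the map / reduceByKey step; unmatched IPs are skipped
  def countByProvince(rules: Array[(Long, Long, String)], ips: Seq[String]): Map[String, Int] =
    ips.flatMap { ip =>
      val i = binarySearch(rules, ip2Long(ip))
      if (i >= 0) Some(rules(i)._3) else None
    }.groupBy(identity).map { case (province, hits) => (province, hits.size) }
}
```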
Database SQL:
CREATE DATABASE bigdata CHARACTER SET utf8;

USE bigdata;

CREATE TABLE location_info (
  id INT(10) AUTO_INCREMENT PRIMARY KEY,
  location VARCHAR(100),
  counts INT(10),
  accesse_date DATE
) ENGINE=INNODB DEFAULT CHARSET=utf8;
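`data2MySQL` sends one `executeUpdate` round trip per row. A sketch of the same insert into `location_info` using standard JDBC batching (`addBatch`/`executeBatch`) and a small loan-pattern helper in place of the `var`/`null`/`finally` bookkeeping. `BatchedMySqlWriter`, `withResource` and the batch size of 500 are illustrative assumptions, and the URL and credentials are left to the caller. The helper is hand-written because `scala.util.Using` only exists from Scala 2.13, while this project uses 2.10:

```scala
import java.sql.{Date, DriverManager}

// Hypothetical names for illustration; not part of the original post.
object BatchedMySqlWriter {
  // Loan pattern: runs body, then always closes the resource, even if body throws.
  def withResource[R <: AutoCloseable, A](resource: R)(body: R => A): A =
    try body(resource) finally resource.close()

  // Inserts (location, count) pairs with a single PreparedStatement, sending
  // them to the server in batches of batchSize rows.
  def write(rows: Iterator[(String, Int)], url: String, user: String, password: String,
            batchSize: Int = 500): Unit =
    withResource(DriverManager.getConnection(url, user, password)) { conn =>
      withResource(conn.prepareStatement(
        "INSERT INTO location_info (location, counts, accesse_date) VALUES (?, ?, ?)")) { ps =>
        val today = new Date(System.currentTimeMillis())
        rows.grouped(batchSize).foreach { batch =>
          batch.foreach { case (location, count) =>
            ps.setString(1, location)
            ps.setInt(2, count)
            ps.setDate(3, today)
            ps.addBatch()
          }
          ps.executeBatch()
        }
      }
    }
}
```

In the Spark job it would be called as `result.foreachPartition(rows => BatchedMySqlWriter.write(rows, url, user, password))`. Unlike `data2MySQL`, it lets exceptions propagate, so a failed write fails the Spark task instead of being printed and ignored. With MySQL Connector/J, adding `rewriteBatchedStatements=true` to the JDBC URL lets the driver send each batch as a multi-row INSERT.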
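Run the program. Afterwards the location_info table holds one row per province with its access count and the date of the run.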