您的位置:首页 > 大数据 > Hadoop

RDD创建方式(集合,本地,HDFS)

2016-05-13 17:41 549 查看
1.RDD创建的几个方式

2.RDD创建实战

3.RDD内幕

第一个RDD:代表了Spark应用程序输入数据的来源

通过Transformation来对RDD进行各种算子的转换,实现算法

RDD创建方式

1,使用程序中的集合创建RDD;

2,使用本地文件系统创建RDD;

3,使用HDFS创建RDD

4,基于DB创建RDD(Oracle,SQL)

5,基于NoSQL,例如HBase

6,基于S3创建RDD

7,基于数据流创建RDD(Socket)

1,通过集合创建RDD的实际意义:测试

2,使用本地文件系统创建RDD的作用,测试大量数据的文件

3,使用HDFS来创建RDD 生产环境最常用的RDD创建方式(Hadoop+Spark=目前大数据领域最有前途的组合)

4. 4,5,6三种方式要做getPreferedLocations

可以在智能设备。例如手机,平板,电视上使用Spark,

也可以在PC和Sever使用Spark,只要有jvm就可以运行

Local模式,默认情况下如果失败了就是失败了

若为URES_REGEX(threads,maxFailures)失败了可以重试



默认4*8个线程,可修改



每个Core可以承载2-4个Partition32*2-4–》64-128之间,区间是因为若并行数据量特别大,内存受限,计算特别复杂,CPU为短板,

数据的位置被BlockManager管理的(磁盘,Tachyon)

通过集合创建RDD

package com.dt.spark.cores

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object RDDBasedOnCollections  {
def main(args :Array[String])
{
val conf=new SparkConf()
conf.setAppName("RDDBasedOnCollections")
conf.setMaster("local")
val sc =new SparkContext(conf)
//创建一个scala集合
val numbers=1 to 100
val rdd=sc.parallelize(numbers)
val sum=rdd.reduce(_+_)
print("1+2+...+100:   "+sum)
}
}


运行结果



使用本地文件系统创建RDD

package com.dt.spark.cores

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object RDDBasedOnLocalFile  {
def main(args :Array[String])
{
val conf=new SparkConf()
conf.setAppName("RDDBasedOnCollections")
conf.setMaster("local")
val sc =new SparkContext(conf)
val rdd=sc.textFile("D://spark-1.6.1-bin-hadoop2.6//README.md", 1)
//计算所有行的总和
val linesLength =rdd.map { line => line.length }
//在lingsLength RDD基础上进行相加
val sum=linesLength.reduce(_+_)
print("the total length of the file of ReadMe:"+sum)
}
}


运行结果



4,基于DB创建RDD

创建方式:

1>首先创建数据库连接函数

def createDbconnection() =
{
Class.forName("com.mysql.jdbc.Driver").newInstance()
DriverManager.getConnection("jdbc:mysql://localhost/test/?user=root")
}
def extractValues(r:ResultSet)=
{
(r.getInt(1),r.getString(2))
}

val data = new JdbcRDD(sc,createDbconnection,"select * from userlist", lowerBound =1,upperBound =3,numPartition =2,mapRow=extractValues)
println(data.collect.tolist)


试用场景:

从数据库读数据,适用于结构化数据的处理

5,基于NoSQL,例如HBase

创建方式:

val conf = new SparkConf(true).set("spark.cassandra.connection.host","hostname")
val sc = new SparkConetext(conf)
val data = sc.cassandraTable("userlist","kv")


试用场景:

从Nosql数据库中读取数据,适用于那些互联网公司

6,基于S3创建RDD

创建方式:

你首先把你的S3访问凭据设置为AWS_ACCESS_KEY_ID和AWS_SECRET_ACCESS_KEY环境变量

sc.textFile("s3n://***:***@filepath")


试用场景:

从S3中读取数据,适用中小型公司,没有足够的存储设备

7,基于数据流创建RDD

创建方式:

// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent from a starvation scenario.
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))

// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)


试用场景:

适用于实时数据处理
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: