RDD创建方式(集合,本地,HDFS)
2016-05-13 17:41
549 查看
1.RDD创建的几个方式
2.RDD创建实战
3.RDD内幕
第一个RDD:代表了Spark应用程序输入数据的来源
通过Transformation来对RDD进行各种算子的转换,实现算法
RDD创建方式:
1,使用程序中的集合创建RDD;
2,使用本地文件系统创建RDD;
3,使用HDFS创建RDD
4,基于DB创建RDD(Oracle,SQL)
5,基于NoSQL,例如HBase
6,基于S3创建RDD
7,基于数据流创建RDD(Socket)
1,通过集合创建RDD的实际意义:测试!
2,使用本地文件系统创建RDD的作用,测试大量数据的文件
3,使用HDFS来创建RDD 生产环境最常用的RDD创建方式(Hadoop+Spark=目前大数据领域最有前途的组合)
4. 4,5,6三种方式要做getPreferedLocations
可以在智能设备。例如手机,平板,电视上使用Spark,
也可以在PC和Sever使用Spark,只要有jvm就可以运行
Local模式,默认情况下如果失败了就是失败了
若为URES_REGEX(threads,maxFailures)失败了可以重试
默认4*8个线程,可修改
每个Core可以承载2-4个Partition32*2-4–》64-128之间,区间是因为若并行数据量特别大,内存受限,计算特别复杂,CPU为短板,
数据的位置被BlockManager管理的(磁盘,Tachyon)
通过集合创建RDD
运行结果
使用本地文件系统创建RDD
运行结果
4,基于DB创建RDD
创建方式:
1>首先创建数据库连接函数
试用场景:
从数据库读数据,适用于结构化数据的处理
5,基于NoSQL,例如HBase
创建方式:
试用场景:
从Nosql数据库中读取数据,适用于那些互联网公司
6,基于S3创建RDD
创建方式:
你首先把你的S3访问凭据设置为AWS_ACCESS_KEY_ID和AWS_SECRET_ACCESS_KEY环境变量
试用场景:
从S3中读取数据,适用中小型公司,没有足够的存储设备
7,基于数据流创建RDD
创建方式:
试用场景:
适用于实时数据处理
2.RDD创建实战
3.RDD内幕
第一个RDD:代表了Spark应用程序输入数据的来源
通过Transformation来对RDD进行各种算子的转换,实现算法
RDD创建方式:
1,使用程序中的集合创建RDD;
2,使用本地文件系统创建RDD;
3,使用HDFS创建RDD
4,基于DB创建RDD(Oracle,SQL)
5,基于NoSQL,例如HBase
6,基于S3创建RDD
7,基于数据流创建RDD(Socket)
1,通过集合创建RDD的实际意义:测试!
2,使用本地文件系统创建RDD的作用,测试大量数据的文件
3,使用HDFS来创建RDD 生产环境最常用的RDD创建方式(Hadoop+Spark=目前大数据领域最有前途的组合)
4. 4,5,6三种方式要做getPreferedLocations
可以在智能设备。例如手机,平板,电视上使用Spark,
也可以在PC和Sever使用Spark,只要有jvm就可以运行
Local模式,默认情况下如果失败了就是失败了
若为URES_REGEX(threads,maxFailures)失败了可以重试
默认4*8个线程,可修改
每个Core可以承载2-4个Partition32*2-4–》64-128之间,区间是因为若并行数据量特别大,内存受限,计算特别复杂,CPU为短板,
数据的位置被BlockManager管理的(磁盘,Tachyon)
通过集合创建RDD
package com.dt.spark.cores import org.apache.spark.SparkConf import org.apache.spark.SparkContext object RDDBasedOnCollections { def main(args :Array[String]) { val conf=new SparkConf() conf.setAppName("RDDBasedOnCollections") conf.setMaster("local") val sc =new SparkContext(conf) //创建一个scala集合 val numbers=1 to 100 val rdd=sc.parallelize(numbers) val sum=rdd.reduce(_+_) print("1+2+...+100: "+sum) } }
运行结果
使用本地文件系统创建RDD
package com.dt.spark.cores import org.apache.spark.SparkConf import org.apache.spark.SparkContext object RDDBasedOnLocalFile { def main(args :Array[String]) { val conf=new SparkConf() conf.setAppName("RDDBasedOnCollections") conf.setMaster("local") val sc =new SparkContext(conf) val rdd=sc.textFile("D://spark-1.6.1-bin-hadoop2.6//README.md", 1) //计算所有行的总和 val linesLength =rdd.map { line => line.length } //在lingsLength RDD基础上进行相加 val sum=linesLength.reduce(_+_) print("the total length of the file of ReadMe:"+sum) } }
运行结果
4,基于DB创建RDD
创建方式:
1>首先创建数据库连接函数
def createDbconnection() = { Class.forName("com.mysql.jdbc.Driver").newInstance() DriverManager.getConnection("jdbc:mysql://localhost/test/?user=root") } def extractValues(r:ResultSet)= { (r.getInt(1),r.getString(2)) } val data = new JdbcRDD(sc,createDbconnection,"select * from userlist", lowerBound =1,upperBound =3,numPartition =2,mapRow=extractValues) println(data.collect.tolist)
试用场景:
从数据库读数据,适用于结构化数据的处理
5,基于NoSQL,例如HBase
创建方式:
val conf = new SparkConf(true).set("spark.cassandra.connection.host","hostname") val sc = new SparkConetext(conf) val data = sc.cassandraTable("userlist","kv")
试用场景:
从Nosql数据库中读取数据,适用于那些互联网公司
6,基于S3创建RDD
创建方式:
你首先把你的S3访问凭据设置为AWS_ACCESS_KEY_ID和AWS_SECRET_ACCESS_KEY环境变量
sc.textFile("s3n://***:***@filepath")
试用场景:
从S3中读取数据,适用中小型公司,没有足够的存储设备
7,基于数据流创建RDD
创建方式:
// Create a local StreamingContext with two working thread and batch interval of 1 second. // The master requires 2 cores to prevent from a starvation scenario. val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") val ssc = new StreamingContext(conf, Seconds(1)) // Create a DStream that will connect to hostname:port, like localhost:9999 val lines = ssc.socketTextStream("localhost", 9999)
试用场景:
适用于实时数据处理
相关文章推荐
- hadoop工作流引擎azkaban
- 利用HDFs上传文件的错误
- HDFS读写程序小测试
- 为什么HDFS的副本数通常选择3?
- Hadoop中map数的计算
- shell脚本新增或修改hadoop的xml配置文件
- Alluxio1.0.1最新版(Tachyon为其前身)介绍,+HDFS分布式环境搭建
- HADOOP监控
- Hadoop集群日常运维
- 使用hdfs-slurper 将数据导入hdfs
- hdfs获取文件列表,利用正则表达式选择文件并作下一步处理
- hadoop2.6在window上搭建测试环境
- HDFS文件上传,命令行模式执行WordCount自带实例
- hdfs的实现机制和文件系统概念
- hadoop namenode safe mode
- 漫画解读HDFS机制
- HDFS 读写流程
- HDFS块检查命令Fsck机理的相关分析
- HDFS跨集群数据合并方案之ViewFileSystem
- Hadoop 2.x伪分布式环境搭建详细步骤