Reading Local Files with Spark
2017-08-26 16:41
The Java version reads a comma-separated text file, maps each line onto a Person bean, and queries the result through SQLContext:

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import spark.mapreduce.com.SparkUitl_Java2;

import java.util.List;

/**
 * Created by student on 2017/8/23.
 */
public class SparkSqlByText_Java {
    public static void main(String[] args) {
        // Local copies of the input path and table name
        // (the code below uses the constants from SparkUitl_Java2)
        final String textInput = "C:\\Users\\student\\modules\\datas\\person.txt";
        final String tableName = "person";
        System.setProperty("hadoop.home.dir", "C:\\Users\\student\\modules\\hadoop-2.6.0-cdh5.8.5");

        SparkConf conf = new SparkConf()
                .setAppName("SparkSqlByText_Java")
                .setMaster(SparkUitl_Java2.master);
        JavaSparkContext jsc = new JavaSparkContext(conf);
        SQLContext sqlCon = new SQLContext(jsc);

        // Read the text file; each RDD element is one line
        JavaRDD<String> lines = jsc.textFile(SparkUitl_Java2.textInput);

        // Parse each comma-separated line into a Person bean
        JavaRDD<Person> persons = lines.map(new Function<String, Person>() {
            @Override
            public Person call(String v1) throws Exception {
                String[] strs = v1.split(",");
                Person person = new Person();
                person.setId(Integer.parseInt(strs[0]));
                person.setName(strs[1]);
                person.setAge(Integer.parseInt(strs[2]));
                person.setSex(strs[3]);
                person.setAddr(strs[4]);
                return person;
            }
        });

        // Create a DataFrame from the Person RDD (schema inferred by reflection)
        DataFrame df = sqlCon.createDataFrame(persons, Person.class);
        // Register a temporary table so the data can be queried with SQL
        df.registerTempTable(SparkUitl_Java2.tableName);

        // Query the table
        String sql = "select * from " + SparkUitl_Java2.tableName;
        DataFrame dfSql = sqlCon.sql(sql);
        JavaRDD<Row> rowRDD = dfSql.javaRDD();

        // Map each Row back into a Person. Note that Spark orders the
        // reflected bean columns alphabetically: addr, age, id, name, sex.
        JavaRDD<Person> personResult = rowRDD.map(new Function<Row, Person>() {
            @Override
            public Person call(Row v1) throws Exception {
                Person person = new Person();
                person.setId(v1.getInt(2));
                person.setName(v1.getString(3));
                person.setAge(v1.getInt(1));
                person.setSex(v1.getString(4));
                person.setAddr(v1.getString(0));
                return person;
            }
        });

        // Print the results
        List<Person> list = personResult.collect();
        for (Person val : list) {
            System.out.println(val);
        }
    }
}
```

The text file Spark loads (person.txt) contains:

```
10001,zhang1,21,male,shanghai1
10002,zhang2,22,male,shanghai2
10003,zhang3,23,male,shanghai3
10004,zhang4,24,male,shanghai4
```

By comparison, the Scala version is noticeably more concise:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object SparkSqlByText_Scala {
  def main(args: Array[String]) {
    System.setProperty("hadoop.home.dir", "C:\\Users\\student\\modules\\hadoop-2.6.0-cdh5.8.5")
    val conf = new SparkConf().setAppName("SparkSqlByText_Scala").setMaster(SparkUitl_Java2.master)
    val sc = new SparkContext(conf)
    val sqlCon = new SQLContext(sc)

    // Read the text file
    val lines = sc.textFile(SparkUitl_Java2.textInput)
    val persons = lines.map(line => {
      val items = line.split(",")
      // Create the bean inside the closure so it serializes cleanly
      val person = new Person
      person.setId(items(0).toInt)
      person.setName(items(1))
      person.setAge(items(2).toInt)
      person.setSex(items(3))
      person.setAddr(items(4))
      person
    })

    // Create a DataFrame and register a temporary table
    val df = sqlCon.createDataFrame(persons, classOf[Person])
    df.registerTempTable(SparkUitl_Java2.tableName)

    // Query the table
    val sql: String = "select * from " + SparkUitl_Java2.tableName
    val dfSql = sqlCon.sql(sql)

    // Map each Row back into a Person (reflected columns are ordered a-z)
    val personResult = dfSql.map(row => {
      val person = new Person
      person.setId(row.getInt(2))
      person.setName(row.getString(3))
      person.setAge(row.getInt(1))
      person.setSex(row.getString(4))
      person.setAddr(row.getString(0))
      person
    })

    // Print the results
    personResult.collect().foreach(p => println(p))
  }
}
```
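Neither listing shows the two helper classes it depends on. Below is a minimal sketch of what the Person bean plausibly looks like: its fields follow the getters and setters used above, and, as noted later in this post, it must implement Serializable and override toString().

```java
import java.io.Serializable;

// Sketch of the Person bean assumed by the examples. Spark's bean reflection
// needs a no-arg constructor plus getters/setters; Serializable lets instances
// be shipped between executors, and toString() makes the println output readable.
public class Person implements Serializable {
    private int id;
    private String name;
    private int age;
    private String sex;
    private String addr;

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }
    public String getSex() { return sex; }
    public void setSex(String sex) { this.sex = sex; }
    public String getAddr() { return addr; }
    public void setAddr(String addr) { this.addr = addr; }

    @Override
    public String toString() {
        return "Person{id=" + id + ", name=" + name + ", age=" + age
                + ", sex=" + sex + ", addr=" + addr + "}";
    }
}
```

The constants holder imported as spark.mapreduce.com.SparkUitl_Java2 is also not shown; a hypothetical version is sketched here, with the path and table name mirroring the literals in the Java listing and the master URL a placeholder assumption:

```java
package spark.mapreduce.com;

// Hypothetical constants holder; the values mirror the literals in the post,
// but the master URL is a placeholder and may differ in the original project.
public class SparkUitl_Java2 {
    public static final String master = "local[*]";
    public static final String textInput = "C:\\Users\\student\\modules\\datas\\person.txt";
    public static final String tableName = "person";
}
```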
The JSON variant works the same way. The JSON file (person.json) contains:

```json
{"id":10001,"name":"zhang1","age":21,"sex":"male","addr":"shanghai1"}
{"id":10002,"name":"zhang2","age":22,"sex":"male","addr":"shanghai2"}
{"id":10003,"name":"zhang3","age":23,"sex":"male","addr":"shanghai3"}
{"id":10004,"name":"zhang4","age":24,"sex":"male","addr":"shanghai4"}
```

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class SparkSqlByJson_Java {
    public static void main(String[] args) {
        System.setProperty("hadoop.home.dir", "C:\\Users\\student\\modules\\hadoop-2.6.0-cdh5.8.5");

        // Create the SQLContext
        SparkConf conf = new SparkConf()
                .setAppName(SparkSqlByJson_Java.class.getName())
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlCon = new SQLContext(sc);

        String path = "C:\\Users\\student\\modules\\datas\\person.json";

        // Sample 1: let Spark infer the schema from the JSON file
        DataFrame df = sqlCon.read().json(path);

        // Sample 2: supply the schema explicitly
        List<StructField> fields = new ArrayList<StructField>();
        fields.add(DataTypes.createStructField("id", DataTypes.IntegerType, true));
        fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
        fields.add(DataTypes.createStructField("sex", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("addr", DataTypes.StringType, true));
        StructType st = DataTypes.createStructType(fields);
        // jsonFile is deprecated in later 1.x releases;
        // sqlCon.read().schema(st).json(path) is the newer equivalent
        df = sqlCon.jsonFile(path, st);

        // Equivalent to: select * from person
        df.show();
        System.out.println("===============================================");

        // select name from xxx
        // df.select("name").show();
        // select name, age+10 from xxx
        // df.select(df.col("name"), df.col("age").plus(10)).show();
        // select * from xxx where age <= 50
        // df.filter(df.col("age").leq(50)).show();
        // select sex, count(*) from xxx group by sex
        // df.groupBy(df.col("sex")).count().show();
    }
}
```

The Scala version:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object SparkSqlByJson_Scala {
  def main(args: Array[String]) {
    System.setProperty("hadoop.home.dir", "C:\\Users\\student\\modules\\hadoop-2.6.0-cdh5.8.5")
    val conf = new SparkConf().setAppName("SparkSqlByJson_Scala").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlCon = new SQLContext(sc)

    val path: String = "C:\\Users\\student\\modules\\datas\\person.json"

    // Sample 1: infer the schema
    // val df = sqlCon.read.json(path)

    // Sample 2: supply the schema explicitly
    val st = StructType(Array(
      StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("age", IntegerType, true),
      StructField("sex", StringType, true),
      StructField("addr", StringType, true)))
    val df = sqlCon.jsonFile(path, st)

    // ------------- select / show -------------
    // select *
    df.show()
    // select name from xxx
    df.select("name").show()
    // select name, age+10 from xxx
    df.select(df.col("name"), df.col("age").plus(10)).show()
    // select * from xxx where age <= 50
    df.filter(df.col("age").leq(50)).show()
    // select sex, count(*) from xxx group by sex
    df.groupBy(df.col("sex")).count().show()
  }
}
```

Two things to keep in mind:

- The Person class must implement Serializable and override toString() (see the sketch above).
- The Spark cluster must be started on top of a running Hadoop cluster, i.e. with the NameNode and DataNodes up.

To export the project, build it with Maven. The relevant pom.xml configuration:

```xml
<!-- Maven build configuration -->
<build>
  <sourceDirectory>src/main/java</sourceDirectory>
  <testSourceDirectory>src/test</testSourceDirectory>
  <plugins>
    <plugin>
      <artifactId>maven-assembly-plugin</artifactId>
      <configuration>
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
        <archive>
          <manifest>
            <mainClass></mainClass>
          </manifest>
        </archive>
      </configuration>
      <executions>
        <execution>
          <id>make-assembly</id>
          <phase>package</phase>
          <goals>
            <goal>single</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
    <plugin>
      <groupId>org.codehaus.mojo</groupId>
      <artifactId>exec-maven-plugin</artifactId>
      <version>1.2.1</version>
      <executions>
        <execution>
          <goals>
            <goal>exec</goal>
          </goals>
        </execution>
      </executions>
      <configuration>
        <executable>java</executable>
        <includeProjectDependencies>true</includeProjectDependencies>
        <includePluginDependencies>false</includePluginDependencies>
        <classpathScope>compile</classpathScope>
        <!-- <mainClass>cn.spark.study.App</mainClass> -->
      </configuration>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <configuration>
        <source>1.7</source>
        <target>1.7</target>
      </configuration>
    </plugin>
  </plugins>
</build>
```
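With this configuration, mvn package produces a jar-with-dependencies that can be submitted to the cluster. A minimal sketch of the build-and-submit step, assuming hypothetical jar, class, and master names (the post does not specify them):

```
mvn clean package

spark-submit \
  --class SparkSqlByText_Java \
  --master spark://your-master:7077 \
  target/your-project-1.0-jar-with-dependencies.jar
```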