
Reading Local Files with Spark

2017-08-26 16:41
Java version: read a comma-separated text file into an RDD, convert it to a DataFrame, register a temporary table, and query it with Spark SQL.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import spark.mapreduce.com.SparkUitl_Java2;

import java.util.List;

/**
 * Created by student on 2017/8/23.
 */
public class SparkSqlByText_Java {
    public static void main(String[] args) {
        final String textInput = "C:\\Users\\student\\modules\\datas\\person.txt";
        final String tableName = "person";
        System.setProperty("hadoop.home.dir", "C:\\Users\\student\\modules\\hadoop-2.6.0-cdh5.8.5");
        SparkConf conf = new SparkConf().setAppName("SparkSqlByText_Java").setMaster(SparkUitl_Java2.master);
        JavaSparkContext jsc = new JavaSparkContext(conf);
        SQLContext sqlCon = new SQLContext(jsc);

        // read the plain-text file
        JavaRDD<String> lines = jsc.textFile(textInput);
        // map each line to a Person object
        JavaRDD<Person> persons = lines.map(new Function<String, Person>() {
            @Override
            public Person call(String v1) throws Exception {
                String[] strs = v1.split(",");
                Person person = new Person();
                person.setId(Integer.parseInt(strs[0]));
                person.setName(strs[1]);
                person.setAge(Integer.parseInt(strs[2]));
                person.setSex(strs[3]);
                person.setAddr(strs[4]);
                return person;
            }
        });

        // create a DataFrame from the Person RDD
        DataFrame df = sqlCon.createDataFrame(persons, Person.class);
        // register the DataFrame as a temporary table
        df.registerTempTable(tableName);

        // query the table
        String sql = "select * from " + tableName;
        DataFrame dfSql = sqlCon.sql(sql);
        JavaRDD<Row> rowRDD = dfSql.javaRDD();

        // map each Row back to a Person; columns of a DataFrame built from a JavaBean
        // are ordered alphabetically by field name (addr, age, id, name, sex)
        JavaRDD<Person> personResult = rowRDD.map(new Function<Row, Person>() {
            @Override
            public Person call(Row v1) throws Exception {
                Person person = new Person();
                person.setId(v1.getInt(2));
                person.setName(v1.getString(3));
                person.setAge(v1.getInt(1));
                person.setSex(v1.getString(4));
                person.setAddr(v1.getString(0));
                return person;
            }
        });

        // print the result
        List<Person> list = personResult.collect();
        for (Person val : list) {
            System.out.println(val);
        }
    }
}
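The examples reference a small helper class, SparkUitl_Java2, whose source is not included in the post. A minimal sketch of what it might contain (the master URL and the concrete values below are assumptions, not from the original):

package spark.mapreduce.com;

// Hypothetical helper holding the shared constants referenced by the examples;
// the values here are placeholders and should be adapted to your environment.
public class SparkUitl_Java2 {
    public static final String master = "local[*]";
    public static final String textInput = "C:\\Users\\student\\modules\\datas\\person.txt";
    public static final String tableName = "person";
}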
The text file to be loaded by Spark contains:

10001,zhang1,21,male,shanghai1
10002,zhang2,22,male,shanghai2
10003,zhang3,23,male,shanghai3
10004,zhang4,24,male,shanghai4

The Scala version is comparatively more concise:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import spark.mapreduce.com.SparkUitl_Java2

object SparkSqlByText_Scala {
  def main(args: Array[String]) {
    System.setProperty("hadoop.home.dir", "C:\\Users\\student\\modules\\hadoop-2.6.0-cdh5.8.5")
    val conf = new SparkConf().setAppName("SparkSqlByText_Scala").setMaster(SparkUitl_Java2.master)
    val sc = new SparkContext(conf)
    val sqlCon = new SQLContext(sc)

    // read the text file
    val lines = sc.textFile(SparkUitl_Java2.textInput)
    // map each line to a Person
    val persons = lines.map(line => {
      val items = line.split(",")
      val person = new Person
      person.setId(items(0).toInt)
      person.setName(items(1))
      person.setAge(items(2).toInt)
      person.setSex(items(3))
      person.setAddr(items(4))
      person
    })
    // create a DataFrame from the Person RDD
    val df = sqlCon.createDataFrame(persons, classOf[Person])
    // register the temporary table
    df.registerTempTable(SparkUitl_Java2.tableName)

    // query the table
    val sql: String = "select * from " + SparkUitl_Java2.tableName
    val dfSql = sqlCon.sql(sql)
    // map each Row back to a Person (columns are ordered alphabetically by field name)
    val personResult = dfSql.map(row => {
      val person = new Person
      person.setId(row.getInt(2))
      person.setName(row.getString(3))
      person.setAge(row.getInt(1))
      person.setSex(row.getString(4))
      person.setAddr(row.getString(0))
      person
    })
    // print the result
    personResult.collect().foreach(p => println(p))
  }
}
The JSON-format input file looks like this:

{"id":10001,"name":"zhang1","age":21,"sex":"male","addr":"shanghai1"}
{"id":10002,"name":"zhang2","age":22,"sex":"male","addr":"shanghai2"}
{"id":10003,"name":"zhang3","age":23,"sex":"male","addr":"shanghai3"}
{"id":10004,"name":"zhang4","age":24,"sex":"male","addr":"shanghai4"}

Java version:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;
import java.util.List;

public class SparkSqlByJson_Java {
    public static void main(String[] args) {
        System.setProperty("hadoop.home.dir", "C:\\Users\\student\\modules\\hadoop-2.6.0-cdh5.8.5");
        // create the SQLContext
        SparkConf conf = new SparkConf().setAppName(SparkSqlByJson_Java.class.getName()).setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlCon = new SQLContext(sc);

        // create the DataFrame
        String path = "C:\\Users\\student\\modules\\datas\\person.json";
        // sample 1: let Spark infer the schema
        DataFrame df = sqlCon.read().json(path);

        // sample 2: supply an explicit schema
        List<StructField> fields = new ArrayList<StructField>();
        fields.add(DataTypes.createStructField("id", DataTypes.IntegerType, true));
        fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
        fields.add(DataTypes.createStructField("sex", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("addr", DataTypes.StringType, true));
        StructType st = DataTypes.createStructType(fields);
        // jsonFile is deprecated in later Spark 1.x releases; sqlCon.read().schema(st).json(path) is the newer equivalent
        df = sqlCon.jsonFile(path, st);

        // equivalent to: select * from person
        df.show();
        System.out.println("===============================================");
        // // select name from xxx
        // df.select("name").show();
        // // select name, age+10 from xxx
        // df.select(df.col("name"), df.col("age").plus(10)).show();
        // // select * from xxx where age <= 50
        // df.filter(df.col("age").leq(50)).show();
        // // select count(*) from xxx group by sex
        // df.groupBy(df.col("sex")).count().show();
    }
}

Scala version:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object SparkSqlByJson_Scala {
  def main(args: Array[String]) {
    System.setProperty("hadoop.home.dir", "C:\\Users\\student\\modules\\hadoop-2.6.0-cdh5.8.5")
    val conf = new SparkConf().setAppName("SparkSqlByJson_Scala").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlCon = new SQLContext(sc)
    val path: String = "C:\\Users\\student\\modules\\datas\\person.json"

    // sample 1: let Spark infer the schema
    // val df = sqlCon.read.json(path)

    // sample 2: supply an explicit schema
    val st = StructType(Array(
      StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("age", IntegerType, true),
      StructField("sex", StringType, true),
      StructField("addr", StringType, true)))
    val df = sqlCon.jsonFile(path, st)

    // ------------- select / show -------------
    // select *
    df.show()
    // select name from xxx
    df.select("name").show()
    // select name, age+10 from xxx
    df.select(df.col("name"), df.col("age").plus(10)).show()
    // select * from xxx where age <= 50
    df.filter(df.col("age").leq(50)).show()
    // select count(*) from xxx group by sex
    df.groupBy(df.col("sex")).count().show()
  }
}

The Person class must implement Serializable and override toString.
Starting the Spark cluster requires Hadoop's NameNode and DataNode to be running first.
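The Person bean itself is not shown in the post. A minimal sketch, inferred from the setters used above and the Serializable/toString requirement (place it in the same package as the example classes):

import java.io.Serializable;

// JavaBean used by all of the examples; field names must match the
// column names Spark derives for the DataFrame (addr, age, id, name, sex).
public class Person implements Serializable {
    private int id;
    private String name;
    private int age;
    private String sex;
    private String addr;

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }
    public String getSex() { return sex; }
    public void setSex(String sex) { this.sex = sex; }
    public String getAddr() { return addr; }
    public void setAddr(String addr) { this.addr = addr; }

    @Override
    public String toString() {
        return "Person{id=" + id + ", name=" + name + ", age=" + age
                + ", sex=" + sex + ", addr=" + addr + "}";
    }
}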
To build and export the project with Maven, use the following pom.xml configuration:

<!-- Maven build configuration -->
<build>
    <sourceDirectory>src/main/java</sourceDirectory>
    <testSourceDirectory>src/test</testSourceDirectory>
    <plugins>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <mainClass></mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.codehaus.mojo</groupId>
            <artifactId>exec-maven-plugin</artifactId>
            <version>1.2.1</version>
            <executions>
                <execution>
                    <goals>
                        <goal>exec</goal>
                    </goals>
                </execution>
            </executions>
            <configuration>
                <executable>java</executable>
                <includeProjectDependencies>true</includeProjectDependencies>
                <includePluginDependencies>false</includePluginDependencies>
                <classpathScope>compile</classpathScope>
                <!-- <mainClass>cn.spark.study.App</mainClass> -->
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>1.7</source>
                <target>1.7</target>
            </configuration>
        </plugin>
    </plugins>
</build>
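Once mvn package has produced the jar-with-dependencies artifact, the job can be submitted to the cluster. A sketch of a typical invocation (the main class, master URL, and jar name below are placeholders, not from the original post):

mvn clean package
spark-submit \
  --class SparkSqlByText_Java \
  --master spark://your-master:7077 \
  target/your-project-jar-with-dependencies.jar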

                                            