您的位置:首页 > 编程语言 > Java开发

Flink使用java实现读取csv文件简单实例

2020-01-13 18:39 351 查看

Flink使用java实现读取csv文件简单实例

首先我们来看官方文档中给出的几种方法:

第一种:
DataSet<Tuple3<Integer, String, Double>> csvInput = env.readCsvFile(“hdfs:///the/CSV/file”)
.types(Integer.class, String.class, Double.class);

第二种:
DataSet<Tuple2<String, Double>> csvInput = env.readCsvFile(“hdfs:///the/CSV/file”)
.includeFields(“10010”)
.types(String.class, Double.class);
第三种:
DataSet> csvInput = env.readCsvFile(“hdfs:///the/CSV/file”)
.pojoType(Person.class, “name”, “age”, “zipcode”);

其中第一种和第二种比较类似,无非就是后面要使用types定义接受的数据类型,对于includeFields() 方法,里面有两种传参方式,一种是includeFields(“10010”),另一种是includeFields(true,false。。。)。表示我们对于这个文件需要第几列,就写上1或者true即可,但是一定要注意一定要对应上即可

这里我们演示第三种,并且扩展一些参数的使用:
第三种是用一个POJO类型来接受数据,所以我们一定先要定义一个POJO的类,这里有很坑爹的地方就是,我们在定义属性之后,一定要加上这三个东西,
1.getter,setter方法
2.construct构造器
3.还有就是一个空的构造器,不带任何参数,这个一定要加上,否则java不识别这是一个POJO
4.把tostring方法也加上
其实我们文件中的内容如下:

name,age,add
fanglei,20,nishi
liujun,29,lalala,
woshi,18,tashi

但是我们假设不需要中间那一列,所以我们定义的POJO只有两个属性!

public class newPerson {
private String name;
private String address;

public newPerson(){}
public newPerson(String name, String address) {
this.name = name;
this.address = address;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public String getAddress() {
return address;
}

public void setAddress(String address) {
this.address = address;
}

@Override
public String toString() {
return "newPerson{" +
"name='" + name + '\'' +
", address='" + address + '\'' +
'}';
}

直接上例子

public class JavaDataSetDataSourceApp {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
csvfile(env);
}

public static void csvfile(ExecutionEnvironment env) throws Exception {
String filePath = "C:\\Users\\77051\\Desktop\\hello.csv";
DataSet<newPerson> csvInput = env.readCsvFile(filePath)
.fieldDelimiter(",")
.ignoreFirstLine()
.includeFields(true,false,true)
.pojoType(newPerson.class,"name","address");
csvInput.print();
}
}

fieldDelimiter(",")表示分隔符
ignoreFirstLine()表示是否忽略第一行
includeFields(true,false,true)表示不需要第二列
pojoType(newPerson.class,“name”,“address”);表示数据类型是newPerson,使用后面这两个属性

结果输出

newPerson{name='woshi', address='tashi'}
newPerson{name='fanglei', address='nishi'}
newPerson{name='liujun', address='lalala'}

但是有一个问题就是,当我的POJO中如果定义多了一个属性,用这种方法读取的话,结果也会把所有的age读出来,但是全部赋了0 结果如下:
Person{name=‘fanglei’, age=0, address=‘nishi’}
Person{name=‘liujun’, age=0, address=‘lalala’}
Person{name=‘woshi’, age=0, address=‘tashi’}

所以解决方法其实很简单,重写tostring方法即可,不要输出age就行了!

  • 点赞
  • 收藏
  • 分享
  • 文章举报
我是方小磊 发布了18 篇原创文章 · 获赞 4 · 访问量 196 私信 关注
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: