
[Yunxing Data --- Apache Flink in Practice Series (Premium Edition)]: Apache Flink Batch API Explained with Hands-on Programming 025 -- Practical DataSet API Explained 025

2017-11-18 16:28

1. Flink DataSet Custom API Explained (Java Version) - 002

flatMap

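Operates on each element individually and transforms it 1:n: a single input element can produce zero, one, or many output elements.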
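To make the 1:n contract concrete without a Flink cluster, here is a minimal plain-Java sketch. The interfaces SketchCollector and SketchFlatMapFunction and the helper runFlatMap are hypothetical stand-ins that only mimic the shape of Flink's Collector and FlatMapFunction; they are not the Flink API. The sample function emits nothing for a blank line, one element for a single word, and several elements for a multi-word line.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Flink's Collector: receives each emitted element.
interface SketchCollector<T> {
    void collect(T record);
}

// Hypothetical stand-in for Flink's FlatMapFunction: one input, any number of outputs.
interface SketchFlatMapFunction<IN, OUT> {
    void flatMap(IN value, SketchCollector<OUT> out);
}

class FlatMapSketch {
    // Applies the function to every element and gathers everything it emits, in order.
    static <IN, OUT> List<OUT> runFlatMap(List<IN> input, SketchFlatMapFunction<IN, OUT> fn) {
        List<OUT> result = new ArrayList<>();
        for (IN element : input) {
            fn.flatMap(element, result::add);
        }
        return result;
    }

    // Splits a line into upper-case words; a blank line emits nothing (1:0).
    static void splitWords(String line, SketchCollector<String> out) {
        for (String token : line.trim().split("\\s+")) {
            if (!token.isEmpty()) {
                out.collect(token.toUpperCase());
            }
        }
    }

    public static void main(String[] args) {
        List<String> lines = java.util.Arrays.asList("flink vs spark", "   ", "shuffle");
        List<String> words = runFlatMap(lines, FlatMapSketch::splitWords);
        System.out.println(words); // [FLINK, VS, SPARK, SHUFFLE]
    }
}
```

In real Flink code the same splitWords body would live inside a FlatMapFunction<String, String>, calling collector.collect(...) once per token.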


Program:

package code.book.batch.dataset.advance.api;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.List;

public class FlatMapFunction001java {
    public static void main(String[] args) throws Exception {
        // 1. Set up the execution environment and prepare the input data
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> text = env.fromElements("flink vs spark", "buffer vs  shuffle");

        // 2. Element-wise: convert each element to upper case and append the suffix "--##bigdata##"
        //    (this function emits exactly one output per input element)
        DataSet<String> text2 = text.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String s, Collector<String> collector) throws Exception {
                collector.collect(s.toUpperCase() + "--##bigdata##");
            }
        });
        text2.print();

        // 3. Element-wise: convert each element to upper case and split it on whitespace,
        //    emitting one String[] of tokens per input element
        DataSet<String[]> text3 = text.flatMap(new FlatMapFunction<String, String[]>() {
            @Override
            public void flatMap(String s, Collector<String[]> collector) throws Exception {
                collector.collect(s.toUpperCase().split("\\s+"));
            }
        });
        final List<String[]> collect = text3.collect();
        // Print the result using a lambda expression
        collect.forEach(arr -> {
            for (String token : arr) {
                System.out.println(token);
            }
        });
        // Print the result without a lambda expression (prints the same tokens again)
        for (String[] arr : collect) {
            for (String token : arr) {
                System.out.println(token);
            }
        }
    }
}


Output:

text2.print();
FLINK VS SPARK--##bigdata##
BUFFER VS  SHUFFLE--##bigdata##

collect.forEach(arr -> {
for (String token : arr) {System.out.println(token);}});
FLINK
VS
SPARK
BUFFER
VS
SHUFFLE
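Note that step 3 emits exactly one String[] per input line, so text3 holds two elements and the tokens only appear one per line because the client loops over the collected arrays. Emitting each token through the collector instead (a FlatMapFunction<String, String> whose body is for (String t : s.toUpperCase().split("\\s+")) collector.collect(t);) would give a DataSet<String> of six words. The sketch below shows the same distinction in plain Java with java.util.stream, used here only as an analogy (it is not Flink): map yields one array per line, while flatMap flattens the tokens. The class and method names are illustrative.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class MapVsFlatMapSketch {
    static final List<String> LINES = Arrays.asList("flink vs spark", "buffer vs  shuffle");

    // Like step 3 above: one String[] per input line (1:1).
    static List<String[]> perLineArrays() {
        return LINES.stream()
                .map(s -> s.toUpperCase().split("\\s+"))
                .collect(Collectors.toList());
    }

    // Like emitting each token through the collector: one String per word (1:n).
    static List<String> flattenedTokens() {
        return LINES.stream()
                .flatMap(s -> Arrays.stream(s.toUpperCase().split("\\s+")))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(perLineArrays().size()); // 2
        System.out.println(flattenedTokens());      // [FLINK, VS, SPARK, BUFFER, VS, SHUFFLE]
    }
}
```

The \\s+ pattern is what collapses the double space in "buffer vs  shuffle", so no empty token is produced.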


filter

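Operates on each element individually and filters it: the elements that satisfy the filter condition (the FilterFunction returns true) form a new DataSet.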


Program:

package code.book.batch.dataset.advance.api;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class FilterFunction001java {
    public static void main(String[] args) throws Exception {
        // 1. Set up the execution environment and prepare the input data
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Integer> text = env.fromElements(2, 4, 7, 8, 9, 6);

        // 2. Filter the elements of the DataSet, keeping only even numbers
        DataSet<Integer> text2 = text.filter(new FilterFunction<Integer>() {
            @Override
            public boolean filter(Integer e) throws Exception {
                return e % 2 == 0;
            }
        });
        text2.print();

        // 3. Filter the elements of the DataSet, keeping only numbers greater than 5
        DataSet<Integer> text3 = text.filter(new FilterFunction<Integer>() {
            @Override
            public boolean filter(Integer e) throws Exception {
                return e > 5;
            }
        });
        text3.print();
    }
}


Output:

text2.print()
2
4
8
6

text3.print()
7
8
9
6
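Each filter call above keeps an element only when its FilterFunction returns true, and the two calls run independently on the same input. Chaining them, as in text.filter(...).filter(...), keeps only the elements that satisfy both conditions. The plain-Java sketch below uses java.util.stream and java.util.function.Predicate as a stand-in for Flink (an analogy, not the Flink API; the names are illustrative) to show that two chained filters select the same elements as a single filter on the combined predicate.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class FilterSketch {
    static final List<Integer> INPUT = Arrays.asList(2, 4, 7, 8, 9, 6);
    static final Predicate<Integer> IS_EVEN = e -> e % 2 == 0;
    static final Predicate<Integer> GREATER_THAN_FIVE = e -> e > 5;

    // Two filters applied one after another.
    static List<Integer> chained() {
        return INPUT.stream().filter(IS_EVEN).filter(GREATER_THAN_FIVE).collect(Collectors.toList());
    }

    // One filter with both conditions combined via Predicate.and.
    static List<Integer> combined() {
        return INPUT.stream().filter(IS_EVEN.and(GREATER_THAN_FIVE)).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(chained());  // [8, 6]
        System.out.println(combined()); // [8, 6]
    }
}
```

Whether to chain or combine is mostly a readability choice; the combined form evaluates both conditions in one pass per element.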