共计 3984 个字符,预计需要花费 10 分钟才能阅读完成。
What Apache Flink
Apache Flink 是一个 == 分布式大数据处理引擎 ==,可对 == 有限数据流和无限数据流 == 进行 == 有状态计算 ==。可部署在 == 各种集群环境 ==,对各种大小的数据规模进行快速计算。
分布式大数据处理引擎
-
是一个分布式的、高可用的用于大数据处理的计算引擎
有限流和无限流
- 有限流:有始有终的数据流。即传统意义上的批数据,进行批处理
-
无限流:有始无终的数据流。即现实生活中的流数据,进行流处理
有状态计算
-
良好的状态机制,进行较好的容错处理和任务恢复。同时实现 Exactly-Once 语义。
各种集群环境
-
可部署 standalone、Flink on yarn、Flink on Mesos、Flink on k8s 等等
Flink Application
Streams
数据在真实世界中是不停产生不停发出的,所以数据处理也应该还原真实,做到真正的流处理。而批处理则是流处理的特殊情况
- 即上面说的有限流和无限流,贴官网图说明。
State
在流计算场景中,其实所有流计算本质上都是增量计算(Incremental Processing)。
例如,计算前几个小时或者一直以来的某个指标(PV、UV 等),计算完一条数据之后需要保存其计算结果即状态,以便和下一条计算结果合并。
另外,保留计算状态,进行 CheckPoint 可以很好地实现流计算的容错和任务恢复,也可以实现 Exactly Once 处理语义
Time
三类时间:
- Event Time:事件真实产生的时间
- Processing Time:事件被 Flink 程序处理的时间
- Ingestion Time:事件进入到 Flink 程序的时间
API
API 分三层,越接近 SQL 层,越抽象,灵活性越低,但更简单易用。
- SQL/Table 层:直接使用 SQL 进行数据处理
- DataStream/DataSet API:最核心的 API,对流数据进行处理,可在其上实现自定义的 WaterMark、Windows、State 等操作
- ProcessFunction:也叫 RunTime 层,最底层的 API,带状态的事件驱动。
Flink Architecture
Data Pipeline Applications
即 real-time Stream ETL:流式 ETL 拆分。
通常,ETL 都是通过定时任务调度 SQL 文件或者 MR 任务来执行的。在实时 ETL 场景中,将批量 ETL 逻辑写到流处理中,分散计算压力和提高计算结果的实时性。
多用于实时数仓、实时搜索引擎等
Data Analytics Applications
即数据分析,包括流式数据分析和批量数据分析。例如实时报表、实时大屏。
Event-driven Applications
即事件驱动应用,在一个有状态的计算过程中,通常情况下都是将状态保存在第三方系统(如 Hbase Redis 等)中。
而在 Flink 中,状态是保存在内部程序中,减少了状态存取的不必要的 I / O 开销,更大吞吐量和更低延时。
第一个 Flink 程序
开发环境要求
主要是 Java 环境和 Maven 环境。Java 要求 JDK1.8,Maven 要求 3.0 以上,开发工具推荐使用 ItelliJ IDEA,社区说法:Eclipse 在 Java 和 Scala 混合编程下有问题,故不推荐。
代码示例:
package source.streamDataSource; | |
import org.apache.flink.api.common.functions.FlatMapFunction; | |
import org.apache.flink.api.java.tuple.Tuple2; | |
import org.apache.flink.streaming.api.TimeCharacteristic; | |
import org.apache.flink.streaming.api.datastream.DataStream; | |
import org.apache.flink.streaming.api.datastream.DataStreamSource; | |
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; | |
import org.apache.flink.streaming.api.windowing.time.Time; | |
import org.apache.flink.util.Collector; | |
public class SocketWindowWordCount {public static void main(String[] args) throws Exception{if(args.length!=2){System.err.println("Usage:\nSocketWindowWordCount hostname port"); | |
} | |
// 获取程序参数 | |
String hostname = args[0]; | |
int port = Integer.parseInt(args[1]); | |
// 入口类,用于设置环境和参数等 | |
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); | |
// 设置 Time 类型 | |
see.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); | |
// 从指定 IP 端口 读取流数据,返回一个 DataStreamSource | |
DataStreamSource<String> text = see.socketTextStream(hostname, port, "\n", 5); | |
// 在 DataStreamSource 上做操作即 transformation | |
DataStream<Tuple2<String, Integer>> windowCount = text | |
// flatMap , FlatMap 接口的实现:将获取到的数据分割,并每个元素组合成 (word, count) 形式 | |
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { | |
public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {for (String word : value.split("\\s")) {collector.collect(Tuple2.of(word, 1)); | |
} | |
} | |
}) | |
// 按位置指定 key,进行聚合操作 | |
.keyBy(0) | |
// 指定窗口大小 | |
.timeWindow(Time.seconds(5)) | |
// 在每个 key 上做 sum | |
// reduce 和 sum 的实现 | |
// .reduce(new ReduceFunction<Tuple2<String, Integer>>() { | |
// @Override | |
// public Tuple2<String, Integer> reduce(Tuple2<String, Integer> stringIntegerTuple2, Tuple2<String, Integer> t1) throws Exception {// return Tuple2.of(stringIntegerTuple2.f0, stringIntegerTuple2.f1+t1.f1); | |
// } | |
// }); | |
.sum(1); | |
// 一个线程执行 | |
windowCount.print().setParallelism(1); | |
see.execute("Socket Window WordCount"); | |
// 其他 transformation 操作示例 | |
// windowCount | |
// .map(new MapFunction<Tuple2<String,Integer>, String>() { | |
// @Override | |
// public String map(Tuple2<String, Integer> stringIntegerTuple2) throws Exception { | |
// return stringIntegerTuple2.f0; | |
// } | |
// }) | |
// .print(); | |
// | |
// text.filter(new FilterFunction<String>() { | |
// @Override | |
// public boolean filter(String s) throws Exception {// return s.contains("h"); | |
// } | |
// }) | |
// .print(); | |
// | |
// SplitStream<String> split = text.split(new OutputSelector<String>() { | |
// @Override | |
// public Iterable<String> select(String value) {// ArrayList<String> strings = new ArrayList<>(); | |
// if (value.contains("h")) | |
// strings.add("Hadoop"); | |
// else | |
// strings.add("noHadoop"); | |
// return strings; | |
// | |
// } | |
// }); | |
// | |
// split.select("hadoop").print(); | |
// split.select("noHadoop").map(new MapFunction<String, String>() { | |
// @Override | |
// public String map(String s) throws Exception { | |
// | |
// return s.toUpperCase(); | |
// } | |
// }).print();} | |
} |
:
