toc: true
title: Flink 从 0 到 1 学习 —— Flink Data transformation(转换)
date: 2018-11-04
tags:
在第一篇介绍 Flink 的文章 《《从0到1学习Flink》—— Apache Flink 介绍》 中就说过 Flink 程序的结构
Flink 应用程序结构就是如上图所示:
1、Source: 数据源,Flink 在流处理和批处理上的 source 大概有 4 类:基于本地集合的 source、基于文件的 source、基于网络套接字的 source、自定义的 source。自定义的 source 常见的有 Apache kafka、Amazon Kinesis Streams、RabbitMQ、Twitter Streaming API、Apache NiFi 等,当然你也可以定义自己的 source。
2、Transformation:数据转换的各种操作,有 Map / FlatMap / Filter / KeyBy / Reduce / Fold / Aggregations / Window / WindowAll / Union / Window join / Split / Select / Project 等,操作很多,可以将数据转换计算成你想要的数据。
3、Sink:接收器,Flink 将转换计算后的数据发送的地点 ,你可能需要存储下来,Flink 常见的 Sink 大概有如下几类:写入文件、打印出来、写入 socket 、自定义的 sink 。自定义的 sink 常见的有 Apache kafka、RabbitMQ、MySQL、ElasticSearch、Apache Cassandra、Hadoop FileSystem 等,同理你也可以定义自己的 Sink。
在上四篇文章介绍了 Source 和 Sink:
1、《从0到1学习Flink》—— Data Source 介绍
2、《从0到1学习Flink》—— 如何自定义 Data Source ?
3、《从0到1学习Flink》—— Data Sink 介绍
4、《从0到1学习Flink》—— 如何自定义 Data Sink ?
那么这篇文章我们就来看下 Flink Data Transformation 吧,数据转换操作还是蛮多的,需要好好讲讲!
这是最简单的转换之一,其中输入是一个数据流,输出的也是一个数据流:
还是拿上一篇文章的案例来将数据进行 map 转换操作:
SingleOutputStreamOperator<Student> map = student.map(new MapFunction<Student, Student>() {
@Override
public Student map(Student value) throws Exception {
Student s1 = new Student();
s1.id = value.id;
s1.name = value.name;
s1.password = value.password;
s1.age = value.age 5;
return s1;
}
});
map.print();
将每个人的年龄都增加 5 岁,其他不变。
FlatMap 采用一条记录并输出零个,一个或多个记录。
SingleOutputStreamOperator<Student> flatMap = student.flatMap(new FlatMapFunction<Student, Student>() {
@Override
public void flatMap(Student value, Collector<Student> out) throws Exception {
if (value.id % 2 == 0) {
out.collect(value);
}
}
});
flatMap.print();
这里将 id 为偶数的聚集出来。
Filter 函数根据条件判断出结果。
SingleOutputStreamOperator<Student> filter = student.filter(new FilterFunction<Student>() {
@Override
public boolean filter(Student value) throws Exception {
if (value.id > 95) {
return true;
}
return false;
}
});
filter.print();
这里将 id 大于 95 的过滤出来,然后打印出来。
KeyBy 在逻辑上是基于 key 对流进行分区。在内部,它使用 hash 函数对流进行分区。它返回 KeyedDataStream 数据流。
KeyedStream<Student, Integer> keyBy = student.keyBy(new KeySelector<Student, Integer>() {
@Override
public Integer getKey(Student value) throws Exception {
return value.age;
}
});
keyBy.print();
上面对 student 的 age 做 KeyBy 操作分区
Reduce 返回单个的结果值,并且 reduce 操作每处理一个元素总是创建一个新值。常用的方法有 average, sum, min, max, count,使用 reduce 方法都可实现。
SingleOutputStreamOperator<Student> reduce = student.keyBy(new KeySelector<Student, Integer>() {
@Override
public Integer getKey(Student value) throws Exception {
return value.age;
}
}).reduce(new ReduceFunction<Student>() {
@Override
public Student reduce(Student value1, Student value2) throws Exception {
Student student1 = new Student();
student1.name = value1.name value2.name;
student1.id = (value1.id value2.id) / 2;
student1.password = value1.password value2.password;
student1.age = (value1.age value2.age) / 2;
return student1;
}
});
reduce.print();
上面先将数据流进行 keyby 操作,因为执行 reduce 操作只能是 KeyedStream,然后将 student 对象的 age 做了一个求平均值的操作。
Fold 通过将最后一个文件夹流与当前记录组合来推出 KeyedStream。 它会发回数据流。
KeyedStream.fold("1", new FoldFunction<Integer, String>() {
@Override
public String fold(String accumulator, Integer value) throws Exception {
return accumulator "=" value;
}
})
DataStream API 支持各种聚合,例如 min,max,sum 等。 这些函数可以应用于 KeyedStream 以获得 Aggregations 聚合。
KeyedStream.sum(0)
KeyedStream.sum("key")
KeyedStream.min(0)
KeyedStream.min("key")
KeyedStream.max(0)
KeyedStream.max("key")
KeyedStream.minBy(0)
KeyedStream.minBy("key")
KeyedStream.maxBy(0)
KeyedStream.maxBy("key")
max 和 maxBy 之间的区别在于 max 返回流中的最大值,但 maxBy 返回具有最大值的键, min 和 minBy 同理。
Window 函数允许按时间或其他条件对现有 KeyedStream 进行分组。 以下是以 10 秒的时间窗口聚合:
inputStream.keyBy(0).window(Time.seconds(10));
Flink 定义数据片段以便(可能)处理无限数据流。 这些切片称为窗口。 此切片有助于通过应用转换处理数据块。 要对流进行窗口化,我们需要分配一个可以进行分发的键和一个描述要对窗口化流执行哪些转换的函数
要将流切片到窗口,我们可以使用 Flink 自带的窗口分配器。 我们有选项,如 tumbling windows, sliding windows, global 和 session windows。 Flink 还允许您通过扩展 WindowAssginer 类来编写自定义窗口分配器。 这里先预留下篇文章来讲解这些不同的 windows 是如何工作的。
windowAll 函数允许对常规数据流进行分组。 通常,这是非并行数据转换,因为它在非分区数据流上运行。
与常规数据流功能类似,我们也有窗口数据流功能。 唯一的区别是它们处理窗口数据流。 所以窗口缩小就像 Reduce 函数一样,Window fold 就像 Fold 函数一样,并且还有聚合。
inputStream.keyBy(0).windowAll(Time.seconds(10));
Union 函数将两个或多个数据流结合在一起。 这样就可以并行地组合数据流。 如果我们将一个流与自身组合,那么它会输出每个记录两次。
inputStream.union(inputStream1, inputStream2, ...);
我们可以通过一些 key 将同一个 window 的两个数据流 join 起来。
inputStream.join(inputStream1)
.where(0).equalTo(1)
.window(Time.seconds(5))
.apply (new JoinFunction () {
...});
以上示例是在 5 秒的窗口中连接两个流,其中第一个流的第一个属性的连接条件等于另一个流的第二个属性。
此功能根据条件将流拆分为两个或多个流。 当您获得混合流并且您可能希望单独处理每个数据流时,可以使用此方法。
SplitStream<Integer> split = inputStream.split(new OutputSelector<Integer>() {
@Override
public Iterable<String> select(Integer value) {
List<String> output = new ArrayList<String>();
if (value % 2 == 0) {
output.add("even");
}
else {
output.add("odd");
}
return output;
}
});
此功能允许您从拆分流中选择特定流。
SplitStream<Integer> split;
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even","odd");
Project 函数允许您从事件流中选择属性子集,并仅将所选元素发送到下一个处理流。
DataStream<Tuple4<Integer, Double, String, String>> in = // [...]
DataStream<Tuple2<String, String>> out = in.project(3,2);
上述函数从给定记录中选择属性号 2 和 3。 以下是示例输入和输出记录:
(1,10.0,A,B)=> (B,A)
(2,20.0,C,D)=> (D,C)
本文主要介绍了 Flink Data 的常用转换方式:Map、FlatMap、Filter、KeyBy、Reduce、Fold、Aggregations、Window、WindowAll、Union、Window Join、Split、Select、Project 等。并用了点简单的 demo 介绍了如何使用,具体在项目中该如何将数据流转换成我们想要的格式,还需要根据实际情况对待。
转载请务必注明原创地址为:http://www.54tianzhisheng.cn/2018/11/04/Flink-Data-transformation/
微信公众号:zhisheng
另外我自己整理了些 Flink 的学习资料,目前已经全部放到微信公众号(zhisheng)了,你可以回复关键字:Flink 即可无条件获取到。另外也可以加我微信 你可以加我的微信:yuanblog_tzs,探讨技术!
更多私密资料请加入知识星球!
https://github.com/zhisheng17/flink-learning/
以后这个项目的所有代码都将放在这个仓库里,包含了自己学习 flink 的一些 demo 和博客
1、Flink 从0到1学习 —— Apache Flink 介绍
2、Flink 从0到1学习 —— Mac 上搭建 Flink 1.6.0 环境并构建运行简单程序入门
3、Flink 从0到1学习 —— Flink 配置文件详解
4、Flink 从0到1学习 —— Data Source 介绍
5、Flink 从0到1学习 —— 如何自定义 Data Source ?
6、Flink 从0到1学习 —— Data Sink 介绍
7、Flink 从0到1学习 —— 如何自定义 Data Sink ?
8、Flink 从0到1学习 —— Flink Data transformation(转换)
9、Flink 从0到1学习 —— 介绍 Flink 中的 Stream Windows
10、Flink 从0到1学习 —— Flink 中的几种 Time 详解
11、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 ElasticSearch
12、Flink 从0到1学习 —— Flink 项目如何运行?
13、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 Kafka
14、Flink 从0到1学习 —— Flink JobManager 高可用性配置
15、Flink 从0到1学习 —— Flink parallelism 和 Slot 介绍
16、Flink 从0到1学习 —— Flink 读取 Kafka 数据批量写入到 MySQL
17、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 RabbitMQ
18、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 HBase
19、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 HDFS
20、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 Redis
21、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 Cassandra
22、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 Flume
23、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 InfluxDB
24、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 RocketMQ
25、Flink 从0到1学习 —— 你上传的 jar 包藏到哪里去了
26、Flink 从0到1学习 —— 你的 Flink job 日志跑到哪里去了
28、Flink 从0到1学习 —— Flink 中如何管理配置?
29、Flink 从0到1学习—— Flink 不可以连续 Split(分流)?
30、Flink 从0到1学习—— 分享四本 Flink 国外的书和二十多篇 Paper 论文
32、为什么说流处理即未来?
33、OPPO 数据中台之基石:基于 Flink SQL 构建实时数据仓库
36、Apache Flink 结合 Kafka 构建端到端的 Exactly-Once 处理
38、如何基于Flink TensorFlow打造实时智能异常检测平台?只看这一篇就够了
40、Flink 全网最全资源(视频、博客、PPT、入门、实战、源码解析、问答等持续更新)
42、Flink 从0到1学习 —— 如何使用 Side Output 来分流?
4、Flink 源码解析 —— standalone session 模式启动流程
5、Flink 源码解析 —— Standalone Session Cluster 启动流程深度分析之 Job Manager 启动
6、Flink 源码解析 —— Standalone Session Cluster 启动流程深度分析之 Task Manager 启动
7、Flink 源码解析 —— 分析 Batch WordCount 程序的执行过程
8、Flink 源码解析 —— 分析 Streaming WordCount 程序的执行过程
9、Flink 源码解析 —— 如何获取 JobGraph?
10、Flink 源码解析 —— 如何获取 StreamGraph?
11、Flink 源码解析 —— Flink JobManager 有什么作用?
12、Flink 源码解析 —— Flink TaskManager 有什么作用?
13、Flink 源码解析 —— JobManager 处理 SubmitJob 的过程
14、Flink 源码解析 —— TaskManager 处理 SubmitJob 的过程
15、Flink 源码解析 —— 深度解析 Flink Checkpoint 机制
16、Flink 源码解析 —— 深度解析 Flink 序列化机制
17、Flink 源码解析 —— 深度解析 Flink 是如何管理好内存的?
18、Flink Metrics 源码解析 —— Flink-metrics-core
19、Flink Metrics 源码解析 —— Flink-metrics-datadog
20、Flink Metrics 源码解析 —— Flink-metrics-dropwizard
21、Flink Metrics 源码解析 —— Flink-metrics-graphite
22、Flink Metrics 源码解析 —— Flink-metrics-influxdb
23、Flink Metrics 源码解析 —— Flink-metrics-jmx
24、Flink Metrics 源码解析 —— Flink-metrics-slf4j
25、Flink Metrics 源码解析 —— Flink-metrics-statsd
26、Flink Metrics 源码解析 —— Flink-metrics-prometheus
文章目录1. pom.xml添加mybatis等依赖1.1 添加依赖1.2 添加mybatis-generator所需插件2. 创建数据库与设计表3. 通过mybatis-generator快捷创建文件3.1 配置mybatis-generator.xml文件3.2 通过Maven运行mybatis-generator4. 配置application.properties文件和APP启动类4.1 ...
这里编写速度指的是把代码敲入编辑器的速度,而不包括程序构思过程。我现在感觉自己敲代码很慢,10 个字母里面会出现 2 个字母打错。尤其是一些特殊符号,比如 ,我使用的是搜狗,搜狗和英文切换是 shift 键,但有时候极容易弄错所处状态,很少去观察是中文状态还是英文状态,因为切换的太频繁了,比如经常会把 打成 》。中文环境下打字,很快,几乎不会出现按错字母的现象,而英文状态下就经常会出现。
Mybatis-Spring(整合)1方式一:【SqlSessionTemplate】1.1创建项目利用maven搭建普通普通项目1.2创建数据库/* Navicat Premium Data Transfer Source Server : test Source Server Type : MySQL Source Server Version : 50527 Source Host : 127.0.0.1:3306 Source Sch
使用过Oracle的语句块的都熟悉,在不用创建存储过程或函数就可以执行PLSQL数据库脚本,这样做通常可以用来做一些复杂业务数据初始化的功能。那么在PostgreSQL中也有相应的功能,称为PL/pgSQL,具体语法如下:[ <<label>> ][ DECLARE declarations ]BEGIN statementsEND [ label ];语句块由两部分组成:声明部分与主体部分,声明部分是可选的,主体部分是必须的,在主体部分最后的END使用
由于某种原因我将OpenStack的一个计算节点移除了,但移除前并没有删除在其上运行的实例,后来想通过dash删除这些实例,于是N天过去了,我的dash还显示如下内容:很碍眼是不是?于是我打算手动从数据库中删除它们!1.数据库中与删除实例相关的表 数据库中与删除实例相关的表如下:fixed_ips记录给实例分配的fixed ip,floating_ips显然与实例分配的flo...
vue sortable.js element ui 数据更新,页面未更新Vue 强制刷新——$forceUpdate()项目场景:在一个需求中,我需要实现一个拖拽的功能,其中我使用了 sortable.js 去实现,但我发现我拖拽之后的请求后台的数据并没有渲染在页面上。简而言之,举个例子,原先的数组是 [1,2,3],拖拽之后,变成了 [3,1,2],但在视图上并没有显现,数据未渲染问题描述:估计是数据没有渲染上去原因分析:总结:设置key这种,就像路由不重新加载的解决方法一样,也是在ro
一、依赖倒置原则DIP是过程式编程和OO编程的分水岭:“大多数开发人员对设计正交系统的必要性都很熟悉。只不过他们可能会使用其他一些词来描述这个过程,例如模块化、基于组件和分层,等等。系统应该由一组相互协作的模块构成,每个模块实现的功能应独立于其他模块。有时这些模块组件被组织到不同的层次上,每一层都做了一级抽象。这种分层的实现是设计正交系统的有力途径。因为每一层只使用它下面一层提供的抽象,所以可以在...
在PyCharm的代码编辑区域,有一条竖线(如下图所示),这个竖线是提醒代码宽度最好不要超过此线,该线默认宽度是120,如果想宽一点,是可以设置的调整的位置是:File—— settings —— Code Style —— Hard wrap at xxx colums 数字改变应用后,上图的竖线也相应变化了...
对于第一次想要做APP的客户来说,如何去做APP是一件头疼的事情,我想做APP,但是又不知道APP开发是个怎样的事情,下面就让创息软件的小编为您细细道来:在不就的将来,APP的生态链到底是Web App(所谓的套壳App)主掌大权还是Native App(所谓的原生态App)引领大军?而作为一个App的设计者,我们到底是应该努力把客户端的体验做到最好,还是应该在网页应用层面上做更多的设计?我想,这...
R-CNN传统的目标检测方法一般分为四个阶段:(1)图像预处理,(2)目标区域选择,(3)特征提取,(4)分类器分类。其中,目标区域选择通常采用的方法是 利用不同尺寸大小的滑动窗口对图片进行遍历,这导致了一个问题,就是时空复杂度很高,计算量大。此外,在特征提取阶段,需要人为选取特征,鲁棒性差。针对这些不足,2014年Girshick R等提出了首个用于图像目标检测的深度学习模型 R-CNN...
下载并打开工程项目:文档的上传。运行:复制随便一篇文档,粘贴进去。通过粘贴后,文档以及图片被粘贴进来了,看看html代码:图片全部使用img标签统一。传输进度条的效果也不错。文档图片被放置在哪了:存放地址:D:\apache-tomcat-6.0.29\webapps\WordPaster2UEditor1x\upload\2019\04\16...
企业实践总结:在企业内部推行团队管理首先要解决观念上的问题,将传统组织架构、岗位体系与虚拟团队并列起来作为企业组织人力资源的方式和路径,从观念层面打破组织架构带来的固化团队思想,这是团队管理的基础。虚拟团队往往承担短期重要的任务,因此提高团队建设要求的响应速度,快速组建应需团队投入工作是团队管理的重要目标,也是首要步骤。项目组激励最本质的问题是可否公平衡量,这与组织内不同职能价值贡献对比是一样的难...