Flink最全面教程(自己总结的)_flink教程-程序员宅基地

技术标签: flink  学习  kafka  大数据  

DataSet/Stream API

1.1 Environment

1.1.1 getExecutionEnvironment

创建一个执行环境,表示当前执行程序的上下文。 如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,也就是说,getExecutionEnvironment 会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。

// 初始化环境
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val env = StreamExecutionEnvironment.getExecutionEnvironment

如果没有设置并行度,会以 flink-conf.yaml 中的配置为准,默认是 1

1.2 Source

1.2.1 基于本地集合的source

在一个本地内存中,生成一个集合作为Flink处理的source。
离线处理代码如下:

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object ListSource {
    
  def main(args: Array[String]): Unit = {
    
    val env = ExecutionEnvironment.getExecutionEnvironment
    val listDataSet: DataSet[String] = env.fromCollection(List("hadoop spark","hive hbase"))
    listDataSet.print()
  }
}

实时处理代码如下:

import org.apache.flink.api.scala.{
    ExecutionEnvironment, _}
import org.apache.flink.streaming.api.scala.{
    DataStream, StreamExecutionEnvironment}

object ListSourceStream {
    
  def main(args: Array[String]): Unit = {
    
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val listDataStream: DataStream[String] = env.fromCollection(List("hadoop spark","hive hbase"))
    listDataStream.print()
    env.execute("ListSourceStream is runned")
  }
}

1.2.2 基于本地文件的source

导入本地文本数据作为数据源。
离线处理代码如下:

import org.apache.flink.api.scala.{
    DataSet, ExecutionEnvironment}

object FileSource {
    
  def main(args: Array[String]): Unit = {
    
    val env = ExecutionEnvironment.getExecutionEnvironment
    val fileDataSet = env.readTextFile("C:\\Users\\thinkpad\\Desktop\\words.txt")
fileDataSet.print()
  }
}

实时处理代码如下:

import org.apache.flink.streaming.api.scala.{
    DataStream, StreamExecutionEnvironment}

object FileSourceStream {
    
  def main(args: Array[String]): Unit = {
    
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val fileDataStream: DataStream[String] = env.readTextFile("C:\\Users\\thinkpad\\Desktop\\words.txt")
    fileDataStream.print()
    env.execute("FileSourceStream is runned")
  }
}

1.2.3 基于HDFS的source

读取hdfs文件,作为数据源。
离线处理代码如下:

import org.apache.flink.api.scala.{
    DataSet, ExecutionEnvironment}

object hdfsSource {
    
  def main(args: Array[String]): Unit = {
    
    val env = ExecutionEnvironment.getExecutionEnvironment
    val hdfsDataSet: DataSet[String] = env.readTextFile("hdfs://linux01:9000/a.txt")
    hdfsDataSet.print()
  }
}

实时处理代码如下:

import org.apache.flink.streaming.api.scala.{
    DataStream, StreamExecutionEnvironment}
object hdfsSourceStream {
    
  def main(args: Array[String]): Unit = {
    
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val hdfsDataStream: DataStream[String] = env.readTextFile("hdfs://linux01:9000/a.txt")
    hdfsDataStream.print()
    env.execute("hdfsSourceStream is runned")
  }
}

1.2.4 基于 kafka 消息队列的source

处理代码如下:

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{
    DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.api.scala._

object kafkaSourceStream {
    
  def main(args: Array[String]): Unit = {
    
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val props = new Properties()
    props.setProperty("bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092")
    props.setProperty("group.id", "consumer-group")
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.setProperty("auto.offset.reset", "latest")
//SimpleStringSchema反序列化工具
    val kafkaDataStream: DataStream[String] = 
env.addSource(new FlinkKafkaConsumer010[String]("test",new SimpleStringSchema(),props))
    kafkaDataStream.print()
    env.execute(“kafkaSourceStream is runned”)
  }
}

1.2.5 自定义 Source作为数据源

除了以上的source数据来源,我们还可以自定义source,只是继承SourceFunction即可。
自定义source代码如下:

import org.apache.flink.streaming.api.functions.source.SourceFunction

class MySource extends SourceFunction[String] {
    
  //定义标志位用来标记是否正常运行
  var running = true

  override def cancel(): Unit = {
    
    running = false
  }

  override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
    
    val data: Range.Inclusive = 1.to(10)
    while (running) {
    
      data.foreach(t => {
    
        sourceContext.collect(t.toString)
      })
    }
  }
}

调用自定义source代码如下:

import org.apache.flink.streaming.api.scala.{
    DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._

object DefineSource {
    
  def main(args: Array[String]): Unit = {
    
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val defineSource: DataStream[String] = env.addSource(new MySource())
    defineSource.print()
    env.execute("DefineSource is runned")
  }
}

1.3 Sink

sink 也就是Flink运行完后,最终要将数据输出到哪儿。

1.3.1基于本地内存集合的sink

将数据最终输出到内存中的集合中。
示例代码如下:

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object listSink {
    
  def main(args: Array[String]): Unit = {
    
    val env = ExecutionEnvironment.getExecutionEnvironment
    val listDataSet: DataSet[String] = env.fromCollection(List("hadoop","spark","hive"))
    val list: Seq[String] = listDataSet.collect()
    list.foreach(println(_))
  }
}

1.3.2基于本地文件的sink
将结果输出到本地文件系统中。
示例代码如下:

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object fileSink {
    
  def main(args: Array[String]): Unit = {
    
    val env = ExecutionEnvironment.getExecutionEnvironment
    val fileDataSet: DataSet[String] = env.fromCollection(List("hadoop","spark"))
    fileDataSet.writeAsText("C:\\Users\\thinkpad\\Desktop\\print.txt")
    env.execute("fileSink is runned")
  }
}

1.3.3基于HDFS文件系统的sink

将结果输出到hdfs文件系统中。
示例代码如下:

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object hdfsSink {
    
  def main(args: Array[String]): Unit = {
    
    val env = ExecutionEnvironment.getExecutionEnvironment
    val hdfsDataSet: DataSet[String] = env.fromCollection(List("hadoop","spark"))
    hdfsDataSet.writeAsText("hdfs://linux01:9000/hdfsSink")
    env.execute("hdfsSink is runned")
  }
}

1.3.4基于Kafka消息队列的sink

将结果输出到kafka文件系统中,用flink作为kafka的生产者。
示例代码如下:

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{
    DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010

object kafkaSink {
    
  def main(args: Array[String]): Unit = {
    
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val props = new Properties()
    props.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
    props.setProperty("group.id", "consumer-group")
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.setProperty("auto.offset.reset", "latest")
    val listDataStream: DataStream[String] = env.fromCollection(List("hadoop","spark"))
    listDataStream.addSink(new FlinkKafkaProducer010[String]("linux01:9092,linux02:9092,linux03:9092","test",new SimpleStringSchema()))
    env.execute("kafkaSink is runned")
  }
}

1.3.5基于JDBC自定义sink

将计算结果存储到关系数据库中,如mysql等。
导入依赖:

 <dependency>
     <groupId>mysql</groupId>
     <artifactId>mysql-connector-java</artifactId>
     <version>5.1.47</version>
</dependency>

实现MyJdbcSink类,继承RichSinkFunction,用来是实现保存到mysql中调用的命令。

import java.sql
import java.sql.DriverManager
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{
    RichSinkFunction, SinkFunction}
//为什么继承的是富函数
class MyJdbcSink extends RichSinkFunction[String] {
    
  //定义连接参数成员属性
  var conn: Connection = _
  var prepare: PreparedStatement = _

  //打开连接
  override def open(parameters: Configuration): Unit = {
    
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata",
      "root", "root")
    prepare= conn.prepareStatement("INSERT INTO infoTest VALUES (?, ?)")
  }

  //执行sql语句
  override def invoke(value: String, context: SinkFunction.Context[_]): Unit = {
    
    prepare.setString(1,value)
    prepare.setString(2,value)
    prepare.execute()
  }

  //关闭资源
  override def close(): Unit = {
    
    prepare.close()
    conn.close()
  }
}

将结果写入mysql,调用自定义mysql类,代码如下:

import org.apache.flink.streaming.api.scala.{
    DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._

object mysqlSInk {
    
  def main(args: Array[String]): Unit = {
    
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val listDataSream: DataStream[String] = env.fromCollection(List("hadoop","spark"))
    listDataSream.addSink(new MyJdbcSink())
    env.execute("mysqlSInk is runned")
  }
}

1.3.5基于Redis非关系型数据库的sink

将计算结果存储到redis非关系数据库中。
导入flink-redis依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.1.0</version>
</dependency>

定义一个redis的mapper类,继承RedisMapper类,用于定义保存到 redis时调用的命令,代码如下:

import org.apache.flink.streaming.connectors.redis.common.mapper.{
    RedisCommand, RedisCommandDescription, RedisMapper}

class MyRedisMapper extends RedisMapper[String]{
    
//定义保存到redis中的命令
  override def getCommandDescription: RedisCommandDescription = {
    
    new RedisCommandDescription(RedisCommand.HSET,"redis")
  }

  override def getKeyFromData(t: String): String = {
    
    t.hashCode.toString
  }

  override def getValueFromData(t: String): String = {
    
    t
  }
}

将结果输入到redis代码如下:

import org.apache.flink.streaming.api.scala.{
    DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig

object RedisSink {
    
  def main(args: Array[String]): Unit = {
    
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val listDataStream: DataStream[String] = env.fromCollection(List("hadoop","spark"))
    val conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).setDatabase(0).build()
    listDataStream.addSink(new RedisSink[String](conf,new MyRedisMapper))
    env.execute("RedisSink is runned")
  }
}

1.4 Transform

在flink中有类似于spark的一类转换算子,就是transform,在Flink的编程体系中,我们获取到数据源之后,需要经过一系列的处理即transformation操作,再将最终结果输出到目的Sink使数据落地。

常用的transform转换算子如下:

Transformation 说明
map 将DataSet中的每一个元素转换为另外一个元素
flatMap 将DataSet中的每一个元素转换为0…n个元素
mapPartition 将一个分区中的元素转换为另一个元素
filter 过滤出来一些符合条件的元素
reduce 可以对一个dataset或者一个group来进行聚合计算,最终聚合成一个元素
reduceGroup 将一个dataset或者一个group聚合成一个或多个元素
aggregate 按照内置的方式来进行聚合。例如:SUM/MIN/MAX…
distinct 去重
join 将两个DataSet按照一定条件连接到一起,形成新的DataSet
union 将两个DataSet取并集,并自动进行去重
KeyBy 逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同 key 的元素,在内部以 hash 的形式实现的
Split 根据某些特征把一个 DataStream 拆分成两个或者多个
Select 从一个 SplitStream 中获取一个或者多个 DataStream
Connect 连接两个保持他们类型的数据流,两个数据流被 Connect 之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。
CoMap,CoFlatMap 跟map and flatMap类似,只不过作用在ConnectedStreams上
rebalance 让每个分区的数据均匀分布,避免数据倾斜
partitionByHash 按照指定的key进行hash分区
sortPartition 指定字段对分区中的数据进行排序

1.4.1 map

将DataSet中的每一个元素转换为另外一种形式的元素
示例代码如下:

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object Transform {
    
  def main(args: Array[String]): Unit = {
    
    val env = ExecutionEnvironment.getExecutionEnvironment
    val listDataSet: DataSet[Int] = env.fromCollection(List(1,2,3))
    val result: DataSet[Int] = listDataSet.map(_*2)
    result.print()
  }
}

1.4.2 flatMap

flatMap也是一种类似于遍历循环,是将每一个元素按照特定的标识切分,变成多个元素。
如将集合中每个元素按照空格切分。

示例代码如下:

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object Transform {
    
  def main(args: Array[String]): Unit = {
    
    val env = ExecutionEnvironment.getExecutionEnvironment
    val listDataSet: DataSet[String] = env.fromCollection(List("hadoop spark","hive mysql","hbase kafka"))
    val result: DataSet[String] = listDataSet.flatMap(_.split(" "))
    result.print()
  }
}

1.4.3 mapPartition

mapPartition:中的函数是在每个分区运行一次

map :每个元素运行一次

mapPartition是按照分区进行处理数据,传入是一个迭代,是将分区中的元素进行转换,map 和 mapPartition 的效果是一样的,但如果在map的函数中,需要访问一些外部存储。例如:
访问mysql数据库,需要打开连接,此时map效率较低。而使用 mapPartition 可以有效减少连接数,提高效率。

示例代码如下:

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object Transform {
    
  def main(args: Array[String]): Unit = {
    
    val env = ExecutionEnvironment.getExecutionEnvironment
    val listDataSet: DataSet[String] = env.fromCollection(List("hadoop spark","hive mysql","hbase kafka"))
    val result: DataSet[String] = listDataSet.mapPartition(iter => {
    
      iter.flatMap(_.split(" "))
    })
    result.print()
  }
}

1.4.4 Filter

filter是遍历循环dataset中每一个元素,filter中满足表达式的过滤出来,不满足表达式的过滤掉。

示例代码如下:

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object Transform {
    
  def main(args: Array[String]): Unit = {
    
    val env = ExecutionEnvironment.getExecutionEnvironment
    val listDataSet: DataSet[String] = env.fromCollection(List("hadoop spark","hive","hbase kafka"))
    val result: DataSet[String] = listDataSet.filter(_.length>=5)
    result.print()
  }
}

1.4.5 reduce

reduce是对一个 dataset 或者一个 group 来进行聚合计算,按照表达逻辑最终聚合成一个元素。

示例代码如下:

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object Transform {
    
  def main(args: Array[String]): Unit = {
    
    val env = ExecutionEnvironment.getExecutionEnvironment
    val listDataSet: DataSet[Int] = env.fromCollection(List(1,2,3,4))
    val result = listDataSet.reduce(_+_)
    result.print()
  }
}

Window操作

2.1 Window概述

streaming 流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而 window 是一种切割无限数据为有限块进行处理的手段。
Window 是无限数据流处理的核心,Window 将一个无限的 stream 拆分成有

2.2 Window类型

Window 可以分成两类:CountWindow:按照指定的数据条数生成一个 Window,与时间无关;TimeWindow:按照时间生成 Window。

2.2.1 CountWindow

CountWindow 根据窗口中相同 key 元素的数量来触发执行,执行时只计算元素数量达到窗口大小的 key 对应的结果。

注意:CountWindow 的 window_size 指的是相同 Key 的元素的个数,不是输入的所有元素的总数。

(1) 滚动窗口

默认的 CountWindow 是一个滚动窗口,只需要指定窗口大小即可,当相同key元素数量达到窗口大小时,就会触发窗口的执行。

object Windows {
    
    def main(args: Array[String]): Unit = {
    
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        val file: DataStream[String] = env.socketTextStream("node01",9999)
        val countStream: DataStream[(String, Int)] = file.flatMap(_.split(" "))
            .map((_, 1))
            .keyBy(0)
            .countWindow(2)
            .sum(1)
        countStream.print()
        env.execute("Windows is runned")
    }
}
(2) 滑动窗口

滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是 window_size,一个是 sliding_size。

下面代码中的 sliding_size 设置为了 2,也就是说,每收到两个相同 key 的数据就计算一次,每一次计算的 window 范围是 5 个元素。

object Windows {
    
    def main(args: Array[String]): Unit = {
    
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        val file: DataStream[String] = env.socketTextStream("node01",9999)
        val countStream: DataStream[(String, Int)] = file.flatMap(_.split(" "))
            .map((_, 1))
            .keyBy(0)
            .countWindow(5,2)
            .sum(1)
        countStream.print()
        env.execute("Windows is runned")
    }
}

2.2.2 TimeWindow

对于 TimeWindow,可以根据窗口实现原理的不同分成三类:

  • 滚动窗口(Tumbling Window)

将数据依据固定的窗口长度对数据进行切片。
特点:时间对齐,窗口长度固定,没有重叠。所有的数据只能落在一个窗口里面
滚动窗口分配器将每个元素分配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会出现重叠。例如:如果你指定了一个 5 分钟大小的滚动窗口

适用场景: 适合做 BI 统计等(做每个时间段的聚合计算)。

  • 滑动窗口(Sliding Window)

滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动间隔组成。一次数据统计的时间长度 每次统计移动多长的时间
特点:时间对齐,窗口长度固定,可以有重叠。一个数据可以被统计多次,滑动间隔、窗口长度是某个数值的整数倍
滑动窗口分配器将元素分配到固定长度的窗口中,与滚动窗口类似,窗口的大小由窗口大小参数来配置,另一个窗口滑动参数控制滑动窗口开始的频率。因此,滑动窗口如果滑动参数小于窗口大小的话,窗口是可以重叠的,在这种情况下元素会被分配到多个窗口中。
例如,你有 10 分钟的窗口和 5 分钟的滑动,那么每个窗口中 5 分钟的窗口里包含着上个 10 分钟产生的数据

  • 会话窗口(Session Window)

电商网站: 登录一个系统之后,多长时间没有操作,session就失效。

手机银行: 登录一个系统之后,多长时间没有操作,session就失效要求重新登录。

由一系列事件组合一个指定时间长度的 timeout 间隙组成,类似于web应用的session,也就是一段时间没有接收到新数据就会生成新的窗口。

特点: 时间无对齐。多长时间之内没有收到数据,这个不是人为能规定的。

session 窗口分配器通过 session 活动来对元素进行分组,session 窗口跟滚动窗口和滑动窗口相比,不会有重叠和固定的开始时间和结束时间的情况,相反,当它在一个固定的时间周期内不再收到元素,即非活动间隔产生,那这个窗口就会关闭。一个 session 窗口通过一个 session 间隔来配置,这个 session 间隔定义了非活跃周期的长度,当这个非活跃周期产生,那么当前的 session 将关闭并且后续的元素将被分配到新的 session 窗口中去

2.2 Window Function

window function 定义了要对窗口中收集的数据做的计算操作,主要可以分为两类:

2.2.1 增量聚合函数(incremental aggregation functions)

每条数据到来就进行计算,保持一个简单的状态。典型的增量聚合函数有ReduceFunction, AggregateFunction。

2.2.2 全窗口函数(full window functions)

先把窗口所有数据收集起来,等到计算的时候会遍历所有数据。ProcessWindowFunction 就是一个全窗口函数。

2.3 Window 其他操作

2.3.1 trigger()

触发器 定义 window 什么时候关闭,触发计算并输出结果

2.3.2 evitor()

移除器 定义移除某些数据的逻辑

2.3.3 allowedLateness()

允许处理迟到的数据

2.3.4 sideOutputLateData()

将迟到的数据放入侧输出流

2.3.5 getSideOutput()

获取侧输出流

Table&SQL

3.1 概述

Table API是流处理和批处理通用的关系型 API,Table API 可以基于流输入或者批输入来运行而不需要进行任何修改。Table API 是 SQL 语言的超集并专门为 Apache Flink 设计的,Table API 是 Scala 和 Java 语言集成式的 API。与常规 SQL 语言中将查询指定为字符串不同,Table API 查询是以 Java 或 Scala 中的语言嵌入样式来定义的,具有 IDE 支持如:自动完成和语法检测;允许以非常直观的方式组合关系运算符的查询,例如 select,filter 和 join。Flink SQL 的支持是基于实现了SQL标准的 Apache Calcite。无论输入是批输入(DataSet)还是流输入(DataStream),任一接口中指定的查询都具有相同的语义并指定相同的结果。

3.2 Table API

3.2.1 依赖

<!-- flink-table&sql -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table</artifactId>
    <version>1.9.1</version>
    <type>pom</type>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.11</artifactId>
    <version>1.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.11</artifactId>
    <version>1.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
    <version>1.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
    <version>1.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>1.9.1</version>
</dependency>

3.2.2 TableEnvironment

TableEnvironment 是 Table API 和SQL集成的核心概念,它负责:

  • 在内部目录中注册表
  • 注册外部目录
  • 执行SQL查询
  • 注册用户定义的函数
  • DataStream 或 DataSet 转换为 Table
  • 持有 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用
    Table总是与特定的TableEnvironment 绑定。不能在同一查询中组合不同 TableEnvironments 的表(例如,union 或 join)。

创建 TableEnvironment:

// 基于流的tableEnv
val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment for streaming queries
val sTableEnv = StreamTableEnvironment.create(sEnv)
// 基于批的bTableEnv
val bEnv: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val bTableEnv: BatchTableEnvironment = BatchTableEnvironment.create(bEnv)

3.2.3 数据加载

数据加载通常有两种:一者基于流/批,一者基于TableSource,但是后者在Flink1.11中已经被废弃,所以不建议使用。

基于批
case class Student(id:Int,name:String,age:Int,gender:String,course:String,score:Int)
object FlinkBatchTableOps {
    
    def main(args: Array[String]): Unit = {
    
        //构建batch的executionEnvironment
        val env = ExecutionEnvironment.getExecutionEnvironment
        val bTEnv = BatchTableEnvironment.create(env)
        val dataSets: DataSet[Student] = env.readCsvFile[Student]("E:\\data\\student.csv",
            //是否忽略文件的第一行数据(主要考虑表头数据)
            ignoreFirstLine = true,
            //字段之间的分隔符
            fieldDelimiter = "|")
        //table 就相当于sparksql中的dataset
        val table: Table = bTEnv.fromDataSet(dataSets)
        //条件查询
        val result: Table = table.select("name,age").where("age=25")
        //打印输出
        bTEnv.toDataSet[Row](result).print()
    }
}
基于流
case class Goods(id: Int,brand:String,category:String)
object FlinkStreamTableOps {
    
    def main(args: Array[String]): Unit = {
    
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val sTEnv = StreamTableEnvironment.create(env)
        val dataStream: DataStream[Goods] = env.fromElements(
            "001|mi|mobile",
            "002|mi|mobile",
            "003|mi|mobile",
            "004|mi|mobile",
            "005|huawei|mobile",
            "006|huawei|mobile",
            "007|huawei|mobile",
            "008|Oppo|mobile",
            "009|Oppo|mobile",
            "010|uniqlo|clothing",
            "011|uniqlo|clothing",
            "012|uniqlo|clothing",
            "013|uniqlo|clothing",
            "014|uniqlo|clothing",
            "015|selected|clothing",
            "016|selected|clothing",
            "017|selected|clothing",
            "018|Armani|clothing",
            "019|lining|sports",
            "020|nike|sports",
            "021|adidas|sports",
            "022|nike|sports",
            "023|anta|sports",
            "024|lining|sports"
        ).map(line => {
    
            val fields = line.split("\\|")
            Goods(fields(0), fields(1), fields(2))
        })
        //load data from external system
        var table = sTEnv.fromDataStream(dataStream)
        // stream table api
        table.printSchema()
        // 高阶api的操作
        table = table.select("category").distinct()
        /*
            将一个table转化为一个DataStream的时候,有两种选择:
            1. toAppendStream  :在没有聚合操作的时候使用
            2. toRetractStream(缩放的含义) :在进行聚合操作之后使用
         */
        sTEnv.toRetractStream[Row](table).print()
        env.execute("FlinkStreamTableOps")
    }
}

3.2.4 sqlQuery

sql仍然是最主要的分析工具,使用dsl当然也能完成业务分析,但是灵活性,简易性上都不及sql。FlinkTable通过sqlQuery来完成sql的查询操作。

object FlinkSQLOps {
    
    def main(args: Array[String]): Unit = {
    
        val env = ExecutionEnvironment.getExecutionEnvironment
        val sTEnv = BatchTableEnvironment.create(env)
        val dataStream: DataSet[Goods] = env.fromElements(
            "001|mi|mobile",
            "002|mi|mobile",
            "003|mi|mobile",
            "004|mi|mobile",
            "005|huawei|mobile",
            "006|huawei|mobile",
            "007|huawei|mobile",
            "008|Oppo|mobile",
            "009|Oppo|mobile",
            "010|uniqlo|clothing",
            "011|uniqlo|clothing",
            "012|uniqlo|clothing",
            "013|uniqlo|clothing",
            "014|uniqlo|clothing",
            "015|selected|clothing",
            "016|selected|clothing",
            "017|selected|clothing",
            "018|Armani|clothing",
            "019|lining|sports",
            "020|nike|sports",
            "021|adidas|sports",
            "022|nike|sports",
            "023|anta|sports",
            "024|lining|sports"
        ).map(line => {
    
            val fields = line.split("\\|")
            Goods(fields(0), fields(1), fields(2))
        })
        //load data from external system
        sTEnv.registerTable("goods", dataStream)
        //sql操作
        var sql =
            """
              |select
              |   id,
              |   brand,
              |   category
              |from goods
              |""".stripMargin
        sql =
            """
              |select
              |   category,
              |   count(1) counts
              |from goods
              |group by category
              |order by counts desc
              |""".stripMargin
        table = sTEnv.sqlQuery(sql)
        sTEnv.toDataSet[Row](table).print()
    }
}

3.2.5 基于滚动窗口的Table操作

基于EventTIme滑动窗口操作
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.Table
import org.apache.flink.types.Row
//基于滚动窗口Table操作
object FlinkTrumblingWindowTableOps {
    
    def main(args: Array[String]): Unit = {
    
        //1、获取流式执行环境
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        env.setParallelism(1)
        // 2、获取table执行环境
        val tblEnv = StreamTableEnvironment.create(env)
        //3、获取数据源
        //输入数据:
        val ds = env.socketTextStream("node01", 9999)
                .map(line => {
    
                    val fields = line.split("\t")
                    UserLogin(fields(0), fields(1), fields(2), fields(3).toInt,
 fields(4))
                })
                .assignTimestampsAndWatermarks(
                    new BoundedOutOfOrdernessTimestampExtractor[UserLogin](Time.seconds(2)) {
    
                        override def extractTimestamp(userLogin: UserLogin): Long = {
    
                            userLogin.dataUnix * 1000
                        }
                    }
                )
        //4、将DataStream转换成table
        //引入隐式
        //某天每隔2秒的输入记录条数:
        import org.apache.flink.table.api.scala._
        val table: Table = tblEnv.fromDataStream[UserLogin](ds , 'platform, 'server, 'status, 'ts.rowtime)
//        tblEnv.toAppendStream[Row](table).print()
        tblEnv.sqlQuery(
            s"""
               |select
               |platform,
               |count(1) counts
               |from ${
      table}
               |where status = 'LOGIN'
               |group by platform, tumble(ts,interval '2' second)
               |""".stripMargin)
            .toAppendStream[Row]
            .print("每隔2秒不同平台登录用户->")
        env.execute()
    }
}
/** 用户登录
  *
  * @param platform 所在平台 id(e.g. H5/IOS/ADR/IOS_YY)
  * @param server   所在游戏服 id
  * @param uid      用户唯一 id
  * @param dataUnix 事件时间/s 时间戳
  * @param status   登录动作(LOGIN/LOGOUT)
  */
case class UserLogin(platform: String, server: String, uid: String,  dataUnix: Int, status: String)
基于窗口的processTime
object FlinkTrumblingWindowTableOps2 {
    
    def main(args: Array[String]): Unit = {
    
        //1、获取流式执行环境
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        // 2、获取table执行环境
        val tblEnv = StreamTableEnvironment.create(env)
        //3、获取数据源
        //输入数据:
        val ds = env.socketTextStream("node01", 9999)
                .map(line => {
    
                    val fields = line.split("\t")
                    UserLogin(fields(0), fields(1), fields(2), fields(3).toInt, fields(4))
                })
        //4、将DataStream转换成table
        //引入隐式
        //某天每隔2秒的输入记录条数:
        import org.apache.flink.table.api.scala._
        val table: Table = tblEnv.fromDataStream[UserLogin](ds , 'platform, 'server, 'status, 'ts.proctime)
        // tblEnv.toAppendStream[Row](table).print()
        tblEnv.sqlQuery(
            s"""
               |select
               |  platform,
               |  count(1) counts
               |from ${
      table}
               |where status = 'LOGIN'
               |group by platform, tumble(ts,interval '2' second)
               |""".stripMargin)
            .toAppendStream[Row]
            .print("prcotime-每隔2秒不同平台登录用户->")
        env.execute()
    }
}

3.3 Flink Table UDF

3.3.1 说明

自定义标量函数(User Defined Scalar Function)。一行输入一行输出。

3.3.2 数据

某个用户在某个时刻浏览了某个商品,以及商品的价值

{
    "userID": 2, "eventTime": "2020-10-01 10:02:00", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{
    "userID": 1, "eventTime": "2020-10-01 10:02:02", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{
    "userID": 2, "eventTime": "2020-10-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{
    "userID": 1, "eventTime": "2020-10-01 10:02:10", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{
    "userID": 2, "eventTime": "2020-10-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{
    "userID": 2, "eventTime": "2020-10-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{
    "userID": 1, "eventTime": "2020-10-01 10:02:12", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{
    "userID": 2, "eventTime": "2020-10-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20.99
{
    "userID": 2, "eventTime": "2020-10-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{
    "userID": 1, "eventTime": "2020-10-01 10:02:15", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{
    "userID": 1, "eventTime": "2020-10-01 10:02:16", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}

3.3.3 需求

  • UDF时间转换
  • UDF需要继承ScalarFunction抽象类,主要实现eval方法。
  • 自定义UDF,实现将eventTime转化为时间戳

3.3.4 实现

object FlinkTableUDFOps {
    
    def main(args: Array[String]): Unit = {
    
        val env = ExecutionEnvironment.getExecutionEnvironment
        val bTEnv = BatchTableEnvironment.create(env)
        val ds = env.fromElements(
            "{\"userID\": 2, \"eventTime\": \"2020-10-01 10:02:00\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
            "{\"userID\": 1, \"eventTime\": \"2020-10-01 10:02:02\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
            "{\"userID\": 2, \"eventTime\": \"2020-10-01 10:02:06\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
            "{\"userID\": 1, \"eventTime\": \"2020-10-01 10:02:10\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
            "{\"userID\": 2, \"eventTime\": \"2020-10-01 10:02:06\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
            "{\"userID\": 2, \"eventTime\": \"2020-10-01 10:02:06\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
            "{\"userID\": 1, \"eventTime\": \"2020-10-01 10:02:12\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
            "{\"userID\": 2, \"eventTime\": \"2020-10-01 10:02:06\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
            "{\"userID\": 2, \"eventTime\": \"2020-10-01 10:02:06\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
            "{\"userID\": 1, \"eventTime\": \"2020-10-01 10:02:15\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
            "{\"userID\": 1, \"eventTime\": \"2020-10-01 10:02:16\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}"
        ).map(line => {
    
            val jsonObj = new JSONObject(line)
            val userID = jsonObj.getInt("userID")
            val eventTime = jsonObj.getString("eventTime")
            val eventType = jsonObj.getString("eventType")
            val productID = jsonObj.getString("productID")
            val productPrice = jsonObj.getDouble("productPrice")
            UserBrowseLog(userID, eventTime, eventType, productID, productPrice)
        })
        //自定义udf
        bTEnv.registerFunction("to_time", new TimeScalarFunction())
        bTEnv.registerFunction("myLen", new LenScalarFunction())
        val table = bTEnv.fromDataSet(ds)
        val sql =
            s"""
              |select
              |  userID,
              |  eventTime,
              |  myLen(eventTime) my_len_et,
              |  to_time(eventTime) timestamps
              |from ${
      table}
              |""".stripMargin
        val ret = bTEnv.sqlQuery(sql)

        bTEnv.toDataSet[Row](ret).print
    }
}
case class UserBrowseLog(
    userID: Int,
    eventTime: String,
    eventType: String,
    productID: String,
    productPrice: Double
)

/*
    自定义类去扩展ScalarFunction 复写其中的方法:eval
    at least one method named 'eval' which is public, not
 */
class TimeScalarFunction extends ScalarFunction {
    
    //2020-10-01 10:02:16
    private val df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    def eval(eventTime: String): Long = {
    
        df.parse(eventTime).getTime
    }
}

class LenScalarFunction extends ScalarFunction {
    
    //2020-10-01 10:02:16
    def eval(str: String): Int = {
    
        str.length
    }
}
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/MoLeft/article/details/124613097

智能推荐

0719学习总结(文件流的输入输出)_请综合输入输出流、文件流和字符串流、特别是文件流中指针相关函数进行文件随机位-程序员宅基地

文章浏览阅读482次。1.输入输出流常用的有:iostream 包含了对输入输出流进行操作所需的基本信息:fstream 用于用户管理的文件的I/O操作。strstream 用于字符串流I/O。stdiostream 用于混合使用C和C + +的I/O机制时,例如想将C程序转变为C++程序。iomanip 在使用格式化I/O时应包含此头文件。2.在iostream头文件中定_请综合输入输出流、文件流和字符串流、特别是文件流中指针相关函数进行文件随机位

鸿蒙osbeta2.0上手上手,鸿蒙OS 2.0上手视频曝光,Mate40系列首批无缘,界面与EMUI一致...-程序员宅基地

文章浏览阅读47次。原标题:鸿蒙OS 2.0上手视频曝光,Mate40系列首批无缘,界面与EMUI一致华为在今天正式召开鸿蒙OS 2.0开发者沟通会,此次沟通会将会讨论鸿蒙OS 2.0的各种相关开发问题。对于普通消费者来说,开发者的事情并不是大家关心的。作为消费者最关心的还是鸿蒙OS 2.0啥时候能够适配自己的手机。 在华为召开开发者大会的同时,华为也在今天正式放出了鸿蒙OS 2.0手机开发者Beta版。目前已经有博...

Jenkins首次安装推荐插件出错 No such plugin: cloudbees-folder 超详细解决方案-程序员宅基地

文章浏览阅读6.8k次,点赞2次,收藏12次。我的环境:腾讯云 CentOS7 轻量应用服务器docker run -u root -itd -p 8080:8080 -p 50000:50000 -v /var/jenkins_home:/var/jenkins_home -v /var/run/docker.sock:/var/run/docker.sock --name jenkins-master jenkinsci/blueocean当我首次通过镜像启动一个 Jenkins 容器后,访问服务器 公网ip:8080 访问 Jenkin._no such plugin: cloudbees-folder

JSP入门-基本语法_用于为其他动作提供附加信息的动作是-程序员宅基地

文章浏览阅读392次。JSP入门-基本语法_用于为其他动作提供附加信息的动作是

DNS服务器搭建_server registered via geo dns in ap-east-1-程序员宅基地

文章浏览阅读5.9k次,点赞6次,收藏59次。本篇是关于DNS服务器的搭建配置教程!_server registered via geo dns in ap-east-1

7.2(stm32以太网)_w25qxx_write_nocheck-程序员宅基地

文章浏览阅读3.2k次。《7.2》1.SPI的写入整个扇区W25QXX_Write_NoCheck()--->W25QXX_Write_Page()--->SPI1_ReadWriteByte(); void W25QXX_Write_NoCheck(pBuffer , addr,num) { u8 pageremain; pageremain = addr - addr%256;_w25qxx_write_nocheck

随便推点

ROS命令_ros查找功能包-程序员宅基地

文章浏览阅读2.7k次。文章目录1.功能包、功能包集相关2.工作空间1.功能包、功能包集相关(1) rospack find turtlesim:查找turtlesim包的路径/opt/ros/noetic/share/turtlesim(2) rosls turtlesim:可获取功能包下面的文件列表cmake images msg package.xml srv(3) rosstack find [stack_name]:查找已经在系统中安装过的某个功能包集(4)roscd turtlesim:进入某个文_ros查找功能包

hadoop_connecting to 192.168.128.130:22... could not conn-程序员宅基地

文章浏览阅读412次。连接xshell错误信息如下Connecting to 192.168.128.130:22...Could not connect to '192.168.128.130' (port 22): Connection failed.最有可能的原因是在配置 vi /etc/sysconfig/network-scripts/ifcfg-eth0 的时候使用的IPADDR =192.168.128.130 , 这个IP地址不在虚拟机VMware8本身的net 连接的ip范围内,解决步骤如下_connecting to 192.168.128.130:22... could not connect to '192.168.128.130' (

CentOS7 安装 PHP7 完全详细教程_centos7 命令安装php7-程序员宅基地

文章浏览阅读2w次,点赞3次,收藏16次。CentOS7的默认PHP版本是PHP5,但是如果我们要安装PHP7,不需要将现有的PHP5删除,只要将PHP升级到PHP7即可。使用 yum provides php 命令可以获取CentOS7的PHP包安装情况。显示的是在现有的安装源中能够安装的最新版本为:php-5.4.16-46.el7.x86_64在安装PHP7之前,建议先升级更新一下CentOS7的安装包:yum -y upd..._centos7 命令安装php7

偏差平方和说明什么_什么是平方误差和均方误差-程序员宅基地

文章浏览阅读7.1k次。展开全部均方误差是指参数估计636f70793231313335323631343130323136353331333431373161值与参数真值之差平方的期望值,记为MSE。MSE是衡量“平均误差”的一种较为方便的方法,MSE可以评价数据的变化程度,MSE的值越小,说明预测模型描述实验数据具有更好的精确度。误差平方和又称残差平方和、组内平方和等,根据n个观察值拟合适当的模型后,余下未能拟合部份..._参参数偏差平方和

nginx mysql php源码编译_snowy +nginx-程序员宅基地

文章浏览阅读87次。lamp架构=Linux/unix/windows(操作系统)+apache/nginx……+mysql/pgsql +php/python/golang。开发能力要自己努力。起码需要一些语言基础。不懂代码何以精通?不堪官方源码何以做到熟悉?nginxnginx有官方网站:www.nginx.comtar zxf nginx-1.18.0.tar.gz ##解压。解压之后里面有configure,表示他是开源的。./configure --help ##查看一些参数 _snowy +nginx

Rancher备份&&还原_rancher还原db-程序员宅基地

文章浏览阅读834次。Rancher备份&&还原一、Rancher 备份1、备份# !/bin/bash# /opt/backup/backup-rancher.shDATE=$(date +%Y%m%d%H%M)echo "备份rancher数据"echo "1.拷贝容器内数据"IMAGE_ID=`docker ps |grep rancher:v2.5.8|awk '{print $1}'`docker cp $IMAGE_ID:/var/lib/rancher /opt/ba_rancher还原db

推荐文章

热门文章

相关标签