前边我们已经开发完毕了上报服务系统
, 我们可以通过上报服务系统把电商页面中的点击流数据发送到Kafka中, 那么接下来我们就来开发Flink实时分析系统
, 通过流的方式读取Kafka中的消息, 进而分析数据。
业务
CheckPoint
和水印
解决Flink生产上遇到的问题(网络延迟、丢数据)real-process
项目的pom.xml <properties>
<scala.version>2.11</scala.version>
<flink.version>1.6.1</flink.version>
<hadoop.version>2.7.5</hadoop.version>
<hbase.version>2.0.0</hbase.version>
</properties>
<dependencies>
<!--kafka 客户端-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${
scala.version}</artifactId>
<version>0.10.1.0</version>
</dependency>
<!--flink对接kafka:导入flink使用kafka的依赖-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_${
scala.version}</artifactId>
<version>${
flink.version}</version>
</dependency>
<!--批处理-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_${
scala.version}</artifactId>
<version>${
flink.version}</version>
</dependency>
<!--导入scala的依赖-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${
scala.version}</artifactId>
<version>${
flink.version}</version>
</dependency>
<!--模块二 流处理-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${
scala.version}</artifactId>
<version>${
flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${
scala.version}</artifactId>
<version>${
flink.version}</version>
</dependency>
<!--数据落地flink和hbase的集成依赖-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hbase_${
scala.version}</artifactId>
<version>${
flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${
hbase.version}</version>
</dependency>
<!--<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>2.0.0</version>
</dependency>-->
<!--hbase依赖于hadoop-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${
hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${
hadoop.version}</version>
<!--xml.parser冲突 flink hdfs-->
<exclusions>
<exclusion>
<groupId>xml-apis</groupId>
<artifactId>xml-apis</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${
hadoop.version}</version>
<!--数据同步:canal 和 hadoop protobuf-->
<exclusions>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--对象和json 互相转换的-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.44</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.itheima.realprocess.App</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
real-process
模块添加scala支持scala
文件夹,并标记为源代码和测试代码目录包名 | 说明 |
---|---|
com.itheima.realprocess.util | 存放工具类 |
com.itheima.realprocess.bean | 存放实体类 |
com.itheima.realprocess.task | 存放具体的分析任务 每一个业务都是一个任务,对应的分析处理都写在这里 |
导入到
resources`目录#
#kafka的配置
#
# Kafka集群地址
bootstrap.servers="hadoop102:9092,hadoop103:9092,hadoop104:9092"
# ZooKeeper集群地址
zookeeper.connect="hadoop102:2181,hadoop103:2181,hadoop104:2181"
# Kafka Topic名称
input.topic="pyg"
# 消费组ID
group.id="pyg"
# 自动提交拉取到消费端的消息offset到kafka
enable.auto.commit="true"
# 自动提交offset到zookeeper的时间间隔单位(毫秒)
auto.commit.interval.ms="5000"
# 每次消费最新的数据
auto.offset.reset="latest"
log4j.properties
到resources
目录下log4j.rootLogger=warn,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p - %m%n
注意修改`kafka服务器`和`hbase服务器`的机器名称
ConfigFactory.load()介绍
ConfigFactory.load()
可以自动加载配置文件中的application.conf
文件(注意:名字一定不要写错,否则无法加载
),并返回一个Config对象application.conf
文件是一个properties文件,存放key-value键值对的数据。方法名 | 说明 |
---|---|
getString(“key”) | 获取配置文件中指定key的值对应的字符串 |
getInt(“key”) | 获取配置文件中指定key的值对应的整型数字 |
getLong(“key”) | 同上 |
getBoolean(“key”) | 同上 |
##3 编写scala代码读取配置工具类 | |
在com.itheima.realprocess.util 包下创建GlobalConfigUtil 单例对象(object) |
|
步骤 |
ConfigFactory.load
获取配置对象application.conf
配置main
方法测试,工具类是否能够正确读取出配置项。/**
* 加载配置文件工具类
*/
object GlobalConfigUtil {
private val config: Config = ConfigFactory.load()
/*------------------
* Kafka配置
*------------------*/
val bootstrapServers = config.getString("bootstrap.servers")
val zookeeperConnect = config.getString("zookeeper.connect")
val inputTopic = config.getString("input.topic")
val groupId = config.getString("group.id")
val enableAutoCommit = config.getString("enable.auto.commit")
val autoCommitIntervalMs = config.getString("auto.commit.interval.ms")
val autoOffsetReset = config.getString("auto.offset.reset")
// 测试配置文件读取类
def main(args: Array[String]): Unit = {
println(bootstrapServers)
println(zookeeperConnect)
println(inputTopic)
println(groupId)
println(enableAutoCommit)
println(autoCommitIntervalMs)
println(autoOffsetReset)
}
}
步骤
App
单例对象StreamExecutionEnvironment
运行环境EventTime
,使用数据发生的时间来进行数据处理import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
object App {
def main(args: Array[String]): Unit = {
//初始化Flink的流式环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//设置处理的时间为EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//设置并行度
env.setParallelism(1)
//本地测试 加载本地集合 成为一个DataStream 打印输出
val localDataStream = env.fromCollection(
List("hadoop", "hive", "hbase", "flink")
)
localDataStream.print()
//执行任务
env.execute("real-process")
}
}
注意:
1. 一定要导入`import org.apache.flink.api.scala._`隐式转换,否则Flink程序无法执行
2. 到导入`org.apache.flink.streaming.api`下的`TimeCharacteristic`,否则`没有EventTime`
Checkpoint
是Flink实现容错机制最核心的功能,它能够根据配置周期性地基于Stream中各个Operator的状态来生成Snapshot
,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些Snapshot进行恢复,从而修正因为故障带来的程序数据状态中断。
步骤
checkpoint
支持
//
// 保证程序长时间运行的安全性进行checkpoint操作
//
// 5秒启动一次checkpoint
env.enableCheckpointing(5000)
// 设置checkpoint只checkpoint一次
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 设置两次checkpoint的最小时间间隔
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000)
// checkpoint超时的时长
env.getCheckpointConfig.setCheckpointTimeout(60000)
// 允许的最大checkpoint并行度
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
// 当程序关闭的时,触发额外的checkpoint
env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
// 设置checkpoint的地址
env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flink-checkpoint/"))
HDFS
Flink程序
测试步骤
FlinkKafkaConsumer010
整合Kafkazookeeper
kafka
1.配置Kafka连接属性
//
// 整合Kafka
//
val properties = new Properties()
// # Kafka集群地址
properties.setProperty("bootstrap.servers",GlobalConfigUtil.bootstrapServers)
// # ZooKeeper集群地址
properties.setProperty("zookeeper.connect",GlobalConfigUtil.zookeeperConnect)
// # Kafka Topic名称
properties.setProperty("input.topic",GlobalConfigUtil.inputTopic)
// # 消费组ID
properties.setProperty("group.id",GlobalConfigUtil.groupId)
// # 自动提交拉取到消费端的消息offset到kafka
properties.setProperty("enable.auto.commit",GlobalConfigUtil.enableAutoCommit)
// # 自动提交offset到zookeeper的时间间隔单位(毫秒)
properties.setProperty("auto.commit.interval.ms",GlobalConfigUtil.autoCommitIntervalMs)
// # 每次消费最新的数据
properties.setProperty("auto.offset.reset",GlobalConfigUtil.autoOffsetReset)
val consumer = new FlinkKafkaConsumer010[String](
GlobalConfigUtil.inputTopic,
new SimpleStringSchema(),
properties
)
2.添加一个source到当前Flink环境
val kafkaDataStream: DataStream[String] = env.addSource(consumer)
3.打印DataStream中的数据
kafkaDataStream.print()
启动zookeeper
启动kafka
运行Flink程序
运行上报服务系统
启动消息生成器, 测试是否能够从Kafka中消费到数据
如果Flink从Kafka消费成功会打印以下数据,就证明我们的代码是正确的。
{
"count": 1,
"message": "{\"browserType\":\"谷歌浏览器\",\"categoryID\":6,\"channelID\":4,\"city\":\"America\",\"country\":\"china\",\"entryTime\":1544601660000,\"leaveTime\":1544634060000,\"network\":\"联通\",\"produceID\":4,\"province\":\"china\",\"source\":\"百度跳转\",\"userID\":13}",
"timeStamp": 1553188417573
}
步骤
object App {
def main(args: Array[String]): Unit = {
...
//JSON->元组
val tupleDataStream = kafkaDataStream.map {
msgJSON => {
val jsonObject = JSON.parseObject(msgJSON)
val message = jsonObject.getString("message")
val count = jsonObject.getLong("count")
val timeStamp = jsonObject.getLong("timeStamp")
(message, count, timeStamp)
}
}
tupleDataStream.print()
//执行任务
env.execute("real-process")
}
}
步骤
ClickLog
样例类中来封装消息ClickLog
样例类ClickLog
样例类,添加以下字段ClickLog
伴生对象中实现apply
方法JSON.parseObject
方法将JSON字符串构建一个ClickLog
实例对象ClickLog
样例类使用JSONObject.get("key")获取到的数据是一个Any类型,要调用toString方法转换为String类型
参考代码
在bean目录下创建
import com.alibaba.fastjson.JSON
/**
* 使用ClickLog样例类来封装点击流日志
*/
// - 频道ID(channelID)
// - 产品类别ID(categoryID)
// - 产品ID(produceID)
// - 国家(country)
// - 省份(province)
// - 城市(city)
// - 网络方式(network)
// - 来源方式(source)
// - 浏览器类型(browserType)
// - 进入网站时间(entryTime)
// - 离开网站时间(leaveTime)
// - 用户的ID(userID)
case class ClickLog (
var channelID:String,
var categoryID:String,
var produceID:String,
var country:String,
var province:String,
var city:String,
var network:String,
var source:String,
var browserType:String,
var entryTime:String,
var leaveTime:String,
var userID:String)
object ClickLog {
def apply(json:String): ClickLog = {
val jsonObject = JSON.parseObject(json)
val channelID = jsonObject.getString("channelID")
val categoryID = jsonObject.getString("categoryID")
val produceID = jsonObject.getString("produceID")
val country = jsonObject.getString("country")
val province = jsonObject.getString("province")
val city = jsonObject.getString("city")
val network = jsonObject.getString("network")
val source = jsonObject.getString("source")
val browserType = jsonObject.getString("browserType")
val entryTime = jsonObject.getString("entryTime")
val leaveTime = jsonObject.getString("leaveTime")
val userID = jsonObject.getString("userID")
ClickLog(
channelID,
categoryID,
produceID,
country,
province,
city,
network,
source,
browserType,
entryTime,
leaveTime,
userID
)
}
def main(args: Array[String]): Unit = {
val clickLog = ClickLog("{\"browserType\":\"火狐\",\"categoryID\":20,\"channelID\":19,\"city\":\"ZhengZhou\",\"country\":\"china\",\"entryTime\":1544605260000,\"leaveTime\":1544634060000,\"network\":\"电信\",\"produceID\":7,\"province\":\"HeBeijing\",\"source\":\"百度跳转\",\"userID\":6}")
println(clickLog)
}
}
修改一下 (ClickLog(message), count, timeStamp)
//JSON->元组
val tupleDataStream = kafkaDataStream.map {
msgJSON => {
val jsonObject = JSON.parseObject(msgJSON)
val message = jsonObject.getString("message")
val count = jsonObject.getLong("count")
val timeStamp = jsonObject.getLong("timeStamp")
(ClickLog(message), count, timeStamp)
}
}
tupleDataStream.print()
步骤
Message
样例类,将ClickLog、时间戳、数量封装/**
* 封装Kafka中的消息
* @param clickLog 点击流浏览数据Bean对象
* @param count 数量
* @param timestamp 时间戳
*/
case class Message(var clickLog:ClickLog,
var count:Long,
var timestamp:Long)
App.scala
object App {
def main(args: Array[String]): Unit = {
...
// 使用map算子,将kafka中消费到的数据
val tupleDataStream = kafkaDataStream.map {
msgJSON => {
val jsonObject = JSON.parseObject(msgJSON)
val message = jsonObject.getString("message")
val count = jsonObject.getLong("count")
val timeStamp = jsonObject.getLong("timeStamp")
// (ClickLog(message), count, timeStamp)
Message(ClickLog(message),count,timeStamp)
}
}
...
}
}
水印(watermark)就是一个时间戳
,Flink可以给数据流添加水印,可以理解为:Flink收到一条消息后,额外给这个消息添加了一个时间字段,这就是添加水印
。
水印时间 >= 窗口的endTime
,则触发计算60号
消息的EventTime为10:10:00
, 正好符合10:00:00到10:10:00这个时间窗口的endtime。正常情况, 该消息应该被这个窗口计算的。但是当发生网络延迟的时候,该消息可能会晚到几秒钟,那么当它到达flink时,该窗口已经运算完毕。为了解决该问题,我们为该消息设置watermark时间
为10:09:57
,当它到达flink时,会把该watermark时间设置为窗口的当前时间,由于小于endtime,所以该消息到达后并不会立即计算。直到一个携带watermark时间大于或者等于endtime的时候,窗口计算才会被触发。这样就可以有效的解决由于网络延迟造成的数据计算不精确的情况。App.scala
中添加水印支持tupleDataStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Message]{
// 当前时间戳
var currentTimestamp = 0l
//延迟时间
var maxDelayTime = 2000l
// 获取水印时间
override def getCurrentWatermark: Watermark = {
new Watermark(currentTimestamp - maxDelayTime)
}
// 获取EventTime
override def extractTimestamp(element: Message, previousElementTimestamp: Long): Long = {
// 比较当前消息的时间戳和上一个消息的时间戳,取最大值
currentTimestamp = Math.max(element.timeStamp,previousElementTimestamp)
currentTimestamp
}
})
2.启动执行测试,观察输出
Message(1,1557475915909,ClickLog(火狐,9,14,ShiJiaZhuang,china,1544612460000,1544634060000,移动,2,HeNan,必应跳转,11))
文章浏览阅读2w次。最近要测试一款传感器,传感器的接口是485的,想先在PC上熟悉并验证一下通信,然后再连接到控制器设备上进行支持。先说测试环境:操作系统win10 64bitsusb转485线,型号HXSP-2108F,抽屉里面翻出来了,公司官网都找不到了……最开始插上的现象:电脑本身已经安装了几种usb转串口的驱动,直接插上以后可以识别出串口,但是有黄色感叹号标志。感觉_win10版usb转rs485驱动程序
文章浏览阅读1k次。常用的图片样式属性:border-style图片边框样式:实线solid、虚线dashed、点划线doted.border-color图片边框颜色。border-width图片边框粗细、width控制图片宽度、height控制图片高度。特别说明:border-width、border-style、border-color可以一起使用。例如border:1px solid blue;&...
文章浏览阅读422次。双指针双指针也是所谓滑动窗口算法。一般用于求解具有某种特质的子数组、子字符串、链表查询的分隔问题,通过两个指针来标识窗口的边界(子数组、子字符串、链表标记)。顾名思义,有2个指针,头指针和尾指针(窗口边界),结合while使用。二分也是利用双指针的思想。import java.util.HashMap;import java.util.HashSet;import java.util.Map;import java.util.Set;/** * Title:NC41-最长无重复, 数组中
文章浏览阅读8.2k次,点赞4次,收藏4次。很多小伙伴在每次用到package.json中的npm scripts时,总是被里面的各种命令行参数搞得头大。本文将针对webpack,对其命令行参数进行详解,希望读者们能借此搞清楚每个参数的作用,并在使用时能知道其含义。"scripts": { "build": "./node_modules/.bin/webpack --bail --progress --profile --m..._webpack 参数
文章浏览阅读2.8k次。Hive可以视为在Hadoop和HDFS之上为用户封装_apache-hive-0.13.1-bin
文章浏览阅读343次。为了方便操作常用文件,特归纳总结。一、先贴出处理XML文件代码:public class DealXML { /// <summary> /// 将xml对象内容转换为dataset /// </summary> /// <param name="xmlData"&...
文章浏览阅读1.4k次。IFS FAQ Q1 How difficult is it to port a Windows 9x based file system or file system filter driver to Windows NT/2000/XP?Q2 Is there a WDM model for file systems or file system filter drivers?Q3
文章浏览阅读3.5k次。:十七世纪的中国,恐怖气氛席卷了河南,一个被囚千年的黑狐恶魔逃了出来,为了复仇,他到处制造灾难,妄图征服世界。河南的百姓无力抵挡他,只得求助于少林僧人。一千年前,正是少林三位僧人将黑狐囚禁,可他们早已过世。于是主持决定在少林弟子中寻找三位英雄的转世,可英雄的转世竟是三个小孩,他们是否能够战胜黑狐?他们将如何面对挑战? 片子还未制作完成,仅靠样片销售就收回了4000万元的投资——这是
文章浏览阅读1.2w次。虽然已经默默的告诉自己,非万不得已不要使用c++,_string转wchar_t类型
文章浏览阅读444次。题库来源:安全生产模拟考试一点通公众号小程序安全员-B证(广西省-2021版)找解析是由公众号安全生产模拟考试一点通提供,安全员-B证(广西省-2021版)证模拟考试题库是根据安全员-B证(广西省-2021版)最新版教材汇编出安全员-B证(广西省-2021版)仿真模拟考试。2021年安全员-B证(广西省-2021版)找解析及安全员-B证(广西省-2021版)试题及解析1、【单选题】遇()级及以上大风,应停止模板工程的吊装作业。(B)A、4B、5C、6D、72、【单选题..._凡由建设单位直接发包的,由
文章浏览阅读456次。僵尸进程,孤儿进程是如何产生 通过pstree我们可以查询当前进程的树形结构图,无疑init进程是所有进程的直接或者间接地老子;所有的进程都不是全新的创建,而是通过现有的进程来创建的,所以所有的进程的关系不是兄弟便是直接或间接地父子关系;init进程是所有进程的祖先,其他的进程都是由init直接或者间接fork出来的; 进程有哪些状态: D:不可中断的sle...
文章浏览阅读723次。首先确保电脑和安卓设备连接,然后打开cmd=>输入adb devices查看电脑连接的安卓设备然后>>使用详情