前边我们已经开发完毕了上报服务系统
, 我们可以通过上报服务系统把电商页面中的点击流数据发送到Kafka中, 那么接下来我们就来开发Flink实时分析系统
, 通过流的方式读取Kafka中的消息, 进而分析数据。
业务
CheckPoint
和水印
解决Flink生产上遇到的问题(网络延迟、丢数据)real-process
项目的pom.xml <properties>
<scala.version>2.11</scala.version>
<flink.version>1.6.1</flink.version>
<hadoop.version>2.7.5</hadoop.version>
<hbase.version>2.0.0</hbase.version>
</properties>
<dependencies>
<!--kafka 客户端-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${
scala.version}</artifactId>
<version>0.10.1.0</version>
</dependency>
<!--flink对接kafka:导入flink使用kafka的依赖-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_${
scala.version}</artifactId>
<version>${
flink.version}</version>
</dependency>
<!--批处理-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_${
scala.version}</artifactId>
<version>${
flink.version}</version>
</dependency>
<!--导入scala的依赖-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${
scala.version}</artifactId>
<version>${
flink.version}</version>
</dependency>
<!--模块二 流处理-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${
scala.version}</artifactId>
<version>${
flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${
scala.version}</artifactId>
<version>${
flink.version}</version>
</dependency>
<!--数据落地flink和hbase的集成依赖-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hbase_${
scala.version}</artifactId>
<version>${
flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${
hbase.version}</version>
</dependency>
<!--<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>2.0.0</version>
</dependency>-->
<!--hbase依赖于hadoop-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${
hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${
hadoop.version}</version>
<!--xml.parser冲突 flink hdfs-->
<exclusions>
<exclusion>
<groupId>xml-apis</groupId>
<artifactId>xml-apis</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${
hadoop.version}</version>
<!--数据同步:canal 和 hadoop protobuf-->
<exclusions>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--对象和json 互相转换的-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.44</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.itheima.realprocess.App</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
real-process
模块添加scala支持scala
文件夹,并标记为源代码和测试代码目录包名 | 说明 |
---|---|
com.itheima.realprocess.util | 存放工具类 |
com.itheima.realprocess.bean | 存放实体类 |
com.itheima.realprocess.task | 存放具体的分析任务 每一个业务都是一个任务,对应的分析处理都写在这里 |
导入到
resources`目录#
#kafka的配置
#
# Kafka集群地址
bootstrap.servers="hadoop102:9092,hadoop103:9092,hadoop104:9092"
# ZooKeeper集群地址
zookeeper.connect="hadoop102:2181,hadoop103:2181,hadoop104:2181"
# Kafka Topic名称
input.topic="pyg"
# 消费组ID
group.id="pyg"
# 自动提交拉取到消费端的消息offset到kafka
enable.auto.commit="true"
# 自动提交offset到zookeeper的时间间隔单位(毫秒)
auto.commit.interval.ms="5000"
# 每次消费最新的数据
auto.offset.reset="latest"
log4j.properties
到resources
目录下log4j.rootLogger=warn,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p - %m%n
注意修改`kafka服务器`和`hbase服务器`的机器名称
ConfigFactory.load()介绍
ConfigFactory.load()
可以自动加载配置文件中的application.conf
文件(注意:名字一定不要写错,否则无法加载
),并返回一个Config对象application.conf
文件是一个properties文件,存放key-value键值对的数据。方法名 | 说明 |
---|---|
getString(“key”) | 获取配置文件中指定key的值对应的字符串 |
getInt(“key”) | 获取配置文件中指定key的值对应的整型数字 |
getLong(“key”) | 同上 |
getBoolean(“key”) | 同上 |
##3 编写scala代码读取配置工具类 | |
在com.itheima.realprocess.util 包下创建GlobalConfigUtil 单例对象(object) |
|
步骤 |
ConfigFactory.load
获取配置对象application.conf
配置main
方法测试,工具类是否能够正确读取出配置项。/**
* 加载配置文件工具类
*/
object GlobalConfigUtil {
private val config: Config = ConfigFactory.load()
/*------------------
* Kafka配置
*------------------*/
val bootstrapServers = config.getString("bootstrap.servers")
val zookeeperConnect = config.getString("zookeeper.connect")
val inputTopic = config.getString("input.topic")
val groupId = config.getString("group.id")
val enableAutoCommit = config.getString("enable.auto.commit")
val autoCommitIntervalMs = config.getString("auto.commit.interval.ms")
val autoOffsetReset = config.getString("auto.offset.reset")
// 测试配置文件读取类
def main(args: Array[String]): Unit = {
println(bootstrapServers)
println(zookeeperConnect)
println(inputTopic)
println(groupId)
println(enableAutoCommit)
println(autoCommitIntervalMs)
println(autoOffsetReset)
}
}
步骤
App
单例对象StreamExecutionEnvironment
运行环境EventTime
,使用数据发生的时间来进行数据处理import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
object App {
def main(args: Array[String]): Unit = {
//初始化Flink的流式环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//设置处理的时间为EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//设置并行度
env.setParallelism(1)
//本地测试 加载本地集合 成为一个DataStream 打印输出
val localDataStream = env.fromCollection(
List("hadoop", "hive", "hbase", "flink")
)
localDataStream.print()
//执行任务
env.execute("real-process")
}
}
注意:
1. 一定要导入`import org.apache.flink.api.scala._`隐式转换,否则Flink程序无法执行
2. 到导入`org.apache.flink.streaming.api`下的`TimeCharacteristic`,否则`没有EventTime`
Checkpoint
是Flink实现容错机制最核心的功能,它能够根据配置周期性地基于Stream中各个Operator的状态来生成Snapshot
,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些Snapshot进行恢复,从而修正因为故障带来的程序数据状态中断。
步骤
checkpoint
支持
//
// 保证程序长时间运行的安全性进行checkpoint操作
//
// 5秒启动一次checkpoint
env.enableCheckpointing(5000)
// 设置checkpoint只checkpoint一次
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 设置两次checkpoint的最小时间间隔
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000)
// checkpoint超时的时长
env.getCheckpointConfig.setCheckpointTimeout(60000)
// 允许的最大checkpoint并行度
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
// 当程序关闭的时,触发额外的checkpoint
env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
// 设置checkpoint的地址
env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flink-checkpoint/"))
HDFS
Flink程序
测试步骤
FlinkKafkaConsumer010
整合Kafkazookeeper
kafka
1.配置Kafka连接属性
//
// 整合Kafka
//
val properties = new Properties()
// # Kafka集群地址
properties.setProperty("bootstrap.servers",GlobalConfigUtil.bootstrapServers)
// # ZooKeeper集群地址
properties.setProperty("zookeeper.connect",GlobalConfigUtil.zookeeperConnect)
// # Kafka Topic名称
properties.setProperty("input.topic",GlobalConfigUtil.inputTopic)
// # 消费组ID
properties.setProperty("group.id",GlobalConfigUtil.groupId)
// # 自动提交拉取到消费端的消息offset到kafka
properties.setProperty("enable.auto.commit",GlobalConfigUtil.enableAutoCommit)
// # 自动提交offset到zookeeper的时间间隔单位(毫秒)
properties.setProperty("auto.commit.interval.ms",GlobalConfigUtil.autoCommitIntervalMs)
// # 每次消费最新的数据
properties.setProperty("auto.offset.reset",GlobalConfigUtil.autoOffsetReset)
val consumer = new FlinkKafkaConsumer010[String](
GlobalConfigUtil.inputTopic,
new SimpleStringSchema(),
properties
)
2.添加一个source到当前Flink环境
val kafkaDataStream: DataStream[String] = env.addSource(consumer)
3.打印DataStream中的数据
kafkaDataStream.print()
启动zookeeper
启动kafka
运行Flink程序
运行上报服务系统
启动消息生成器, 测试是否能够从Kafka中消费到数据
如果Flink从Kafka消费成功会打印以下数据,就证明我们的代码是正确的。
{
"count": 1,
"message": "{\"browserType\":\"谷歌浏览器\",\"categoryID\":6,\"channelID\":4,\"city\":\"America\",\"country\":\"china\",\"entryTime\":1544601660000,\"leaveTime\":1544634060000,\"network\":\"联通\",\"produceID\":4,\"province\":\"china\",\"source\":\"百度跳转\",\"userID\":13}",
"timeStamp": 1553188417573
}
步骤
object App {
def main(args: Array[String]): Unit = {
...
//JSON->元组
val tupleDataStream = kafkaDataStream.map {
msgJSON => {
val jsonObject = JSON.parseObject(msgJSON)
val message = jsonObject.getString("message")
val count = jsonObject.getLong("count")
val timeStamp = jsonObject.getLong("timeStamp")
(message, count, timeStamp)
}
}
tupleDataStream.print()
//执行任务
env.execute("real-process")
}
}
步骤
ClickLog
样例类中来封装消息ClickLog
样例类ClickLog
样例类,添加以下字段ClickLog
伴生对象中实现apply
方法JSON.parseObject
方法将JSON字符串构建一个ClickLog
实例对象ClickLog
样例类使用JSONObject.get("key")获取到的数据是一个Any类型,要调用toString方法转换为String类型
参考代码
在bean目录下创建
import com.alibaba.fastjson.JSON
/**
* 使用ClickLog样例类来封装点击流日志
*/
// - 频道ID(channelID)
// - 产品类别ID(categoryID)
// - 产品ID(produceID)
// - 国家(country)
// - 省份(province)
// - 城市(city)
// - 网络方式(network)
// - 来源方式(source)
// - 浏览器类型(browserType)
// - 进入网站时间(entryTime)
// - 离开网站时间(leaveTime)
// - 用户的ID(userID)
case class ClickLog (
var channelID:String,
var categoryID:String,
var produceID:String,
var country:String,
var province:String,
var city:String,
var network:String,
var source:String,
var browserType:String,
var entryTime:String,
var leaveTime:String,
var userID:String)
object ClickLog {
def apply(json:String): ClickLog = {
val jsonObject = JSON.parseObject(json)
val channelID = jsonObject.getString("channelID")
val categoryID = jsonObject.getString("categoryID")
val produceID = jsonObject.getString("produceID")
val country = jsonObject.getString("country")
val province = jsonObject.getString("province")
val city = jsonObject.getString("city")
val network = jsonObject.getString("network")
val source = jsonObject.getString("source")
val browserType = jsonObject.getString("browserType")
val entryTime = jsonObject.getString("entryTime")
val leaveTime = jsonObject.getString("leaveTime")
val userID = jsonObject.getString("userID")
ClickLog(
channelID,
categoryID,
produceID,
country,
province,
city,
network,
source,
browserType,
entryTime,
leaveTime,
userID
)
}
def main(args: Array[String]): Unit = {
val clickLog = ClickLog("{\"browserType\":\"火狐\",\"categoryID\":20,\"channelID\":19,\"city\":\"ZhengZhou\",\"country\":\"china\",\"entryTime\":1544605260000,\"leaveTime\":1544634060000,\"network\":\"电信\",\"produceID\":7,\"province\":\"HeBeijing\",\"source\":\"百度跳转\",\"userID\":6}")
println(clickLog)
}
}
修改一下 (ClickLog(message), count, timeStamp)
//JSON->元组
val tupleDataStream = kafkaDataStream.map {
msgJSON => {
val jsonObject = JSON.parseObject(msgJSON)
val message = jsonObject.getString("message")
val count = jsonObject.getLong("count")
val timeStamp = jsonObject.getLong("timeStamp")
(ClickLog(message), count, timeStamp)
}
}
tupleDataStream.print()
步骤
Message
样例类,将ClickLog、时间戳、数量封装/**
* 封装Kafka中的消息
* @param clickLog 点击流浏览数据Bean对象
* @param count 数量
* @param timestamp 时间戳
*/
case class Message(var clickLog:ClickLog,
var count:Long,
var timestamp:Long)
App.scala
object App {
def main(args: Array[String]): Unit = {
...
// 使用map算子,将kafka中消费到的数据
val tupleDataStream = kafkaDataStream.map {
msgJSON => {
val jsonObject = JSON.parseObject(msgJSON)
val message = jsonObject.getString("message")
val count = jsonObject.getLong("count")
val timeStamp = jsonObject.getLong("timeStamp")
// (ClickLog(message), count, timeStamp)
Message(ClickLog(message),count,timeStamp)
}
}
...
}
}
水印(watermark)就是一个时间戳
,Flink可以给数据流添加水印,可以理解为:Flink收到一条消息后,额外给这个消息添加了一个时间字段,这就是添加水印
。
水印时间 >= 窗口的endTime
,则触发计算60号
消息的EventTime为10:10:00
, 正好符合10:00:00到10:10:00这个时间窗口的endtime。正常情况, 该消息应该被这个窗口计算的。但是当发生网络延迟的时候,该消息可能会晚到几秒钟,那么当它到达flink时,该窗口已经运算完毕。为了解决该问题,我们为该消息设置watermark时间
为10:09:57
,当它到达flink时,会把该watermark时间设置为窗口的当前时间,由于小于endtime,所以该消息到达后并不会立即计算。直到一个携带watermark时间大于或者等于endtime的时候,窗口计算才会被触发。这样就可以有效的解决由于网络延迟造成的数据计算不精确的情况。App.scala
中添加水印支持tupleDataStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Message]{
// 当前时间戳
var currentTimestamp = 0l
//延迟时间
var maxDelayTime = 2000l
// 获取水印时间
override def getCurrentWatermark: Watermark = {
new Watermark(currentTimestamp - maxDelayTime)
}
// 获取EventTime
override def extractTimestamp(element: Message, previousElementTimestamp: Long): Long = {
// 比较当前消息的时间戳和上一个消息的时间戳,取最大值
currentTimestamp = Math.max(element.timeStamp,previousElementTimestamp)
currentTimestamp
}
})
2.启动执行测试,观察输出
Message(1,1557475915909,ClickLog(火狐,9,14,ShiJiaZhuang,china,1544612460000,1544634060000,移动,2,HeNan,必应跳转,11))
因为集群硬盘紧俏,绝对对原来的表加上COMPRESSION=>LZO属性。但是创建表,长时间没有反馈。决定drop掉这张表,但是始终drop失败。重启集群,hbase 60010界面显示有region transaction。为创建失败的表region,在PENDING_OPEN和CLOSED之间跳。describe 表失败, enable表失败,disable表失败,从60010界面查看表失败。
是否允许子View超出父View的范围,Boolean型true 、false ,默认true不允许;android:clipChildren="true":如下android:clipChildren="false":如下代码:<?xml version="1.0" encoding="utf-8"?><RelativeLayout xmlns:android="http://schemas.android.com/apk/res/android" a
点击蓝色“有关SQL”关注我哟加个“星标”,天天与10000人一起快乐成长数据库测试,似乎是被人遗忘的数据库职业,但依然是不错的选择。底下是我在某站找的招聘启事,就连蚂蚁金服都在积极寻找...
功能说明:分割文件。Split:按指定的行数截断文件格式:split[-n][name]参数说明:-n:指定截断的每一文件的长度,不指定缺省为1000行file:要截断的文件name:截断后产生的文件的文件名的开头字母,不指定,缺省为x,即截断后产生的文件的文件名为xaa,xab....直到xzz例一:split-55myfileff将文件myfile依次截断到名为ffaa,f...
我们在学习Linux内核开发的时候,拿到的Linux内核源码都是开发板厂家移植修改过的。我们按照厂家的使用手册,编译并生成最终的镜像文件,然后烧写到开发板,就可以直接运行。有很多用户会好奇开发板厂家提供的Linux内核都做了哪些修改,如果换做是我自己移植Linux内核,需要怎样的移植步骤呢?目前市面上还没有针对i.MX6ULL开发板的Linux移植教程,虽然网络上有很多Linux内核移植的文档,但...
Xcode11可谓是界面上变化最大的一个版本之一了吧,但是新界面同时也会带来对以前自己熟悉的界面操作的问题,最近我就遇到一个问题,自己辛辛苦苦完成了一个xib,之后想要进行拖线添加属性操作,结果就让我懵逼了,搞了半天,发现,之前拖线的那个分页快捷图标怎么都找不到了,如下图:于是乎,各种百度Google,就是怎么都莫法。之后我今天注意到,在这个界面多了个以前没有的一个图标,如下图红...
文章内容参考来源:https://codeabc.cn/yangw/post/play-musichttps://blog.csdn.net/lindorx/article/details/78724518本次主要遇到两个问题首先是文件名需要使用TEXT()函数中或者在文件路径的双引号""前加大写的L其次是mcisendsting函数所播放的mp3格式音乐需要删除封面亲测以下代码在vs...
获取对应按钮的值如下li遍历的后台传送的数据时,对应li的值是变动的<ul class="nav nav-pills" > <li th:each="partlist,partStat:${list}" th:id="${partlist.partId}" th:onclick="|getPartId(${partlist.partId} )|"> <a th:href="@{/law/criminallaw/{partId}(partId=${
昨天做了一个spring+springmv+hibernate整合的例子。一切都好,最后一步一直报GenericJDBCException: Could not open connection。经过一晚上和一上午的试验,最后发现,例子里的mysql驱动版本是6.0.3。然而,我本地的mysql是5.6.35。这个小坑,可花了不少时间。
在计算机中,乘法运算是一种很重要的的运算,有的机器直接由硬件乘法器完成乘法运算,有的机器内没有乘法器,但可以按机器做乘法运算的方法,用软件编程实现。因此,学习乘法运算有助于乘法器的设计,也有助于乘法编程。计算机的乘法运算是根据乘法笔算的方式改进得到。乘法笔算方式特点:1.被乘数A左移多次。2.4个积位的相加运算。但是不难发现笔算乘法的方式不适合计算机的运算方式;1.4个积位难以一次相加。2.乘积位数增加一倍,造成器材浪费,时间增加。
前序扩展 Girdlayer 是受 openlayer 提供的 ol.source.TileDebug 启发,因为 leaflet 是没有这样的 api,所以就想着做一个类似的插件最终实现效果L.Girdlayer L.GridLayer 是一个一般类,其用于 HTML 元素的格网切片。它是所有切片层(Tile Layer)的基类,且替换了之前版本的 TileLayer.Ca...