技术标签: flink spark 数据中台 实时数仓 Flink 大数据
说明
此处的处理没有写成项目中使用的比较复杂的可配置化的形式,所以在invoke中做了过滤,也就是只针对单表的操作;
依赖
<scala.main.version>2.11</scala.main.version>
<flink.version>1.9.1</flink.version>
<hbase.version>2.1.0</hbase.version>
<!--flink-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.main.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.main.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!--flink kafka-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.main.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!--flink table & sql-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_${scala.main.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.main.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!--导入flink连接redis的文件-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-redis_${scala.main.version}</artifactId>
<version>${flink.redis.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_${scala.main.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!--rocksdb 与flink 进行整合依赖-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep-scala_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.2.1</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>${hbase.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
main
object MyHbaseSinkTest {
def main(args: Array[String]): Unit = {
//环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
/**
* 获取基础参数
*/
val bootstrapserversnew = Contant.BOOTSTRAP_SERVERS_NEW
import org.apache.flink.api.scala._
/**
* 定义kafka-source得到DataStream
*/
val topics = "vs_merging_middle_topic"
//将kafka中数据反序列化,
val valueDeserializer: DeserializationSchema[String] = new SimpleStringSchema()
val properties = new Properties()
properties.put("bootstrap.servers", bootstrapserversnew)
properties.put("group.id", "flink_hbase_sink_consumer2")
properties.put("auto.offset.reset", Contant.AUTO_OFFSET_RESET_VALUE)
println(Contant.BOOTSTRAP_SERVERS_NEW)
val kafkaSinkDStream = env.addSource(new FlinkKafkaConsumer[String](topics, valueDeserializer, properties))
kafkaSinkDStream.flatMap(data => {
val jsonData = JSON.parseObject(data)
val table = jsonData.getString("table")
val rowkey = jsonData.getJSONObject("data").get("id").toString
val jsonResult = jsonData.getJSONObject("data")
import scala.collection.JavaConversions._
jsonResult.keySet().map(key => {
HbaseValueBean("variable_system:" + table, rowkey, "info", key, jsonResult.get(key).toString)
})
})
.addSink(MyHbaseSinkSingle)
env.execute(this.getClass.getSimpleName)
}
}
bean
case class HbaseValueBean(
tableName : String ,
rowkey : String ,
family : String ,
column : String ,
value : String
)
HBase-Sink
object MyHbaseSinkSingle extends RichSinkFunction[HbaseValueBean] {
private var putsResult: List[Put] = collection.immutable.List[Put]()
private var hbaseConn: Connection = null
private var maxSize = 100
/**
* 开始,打开连接
*
* @param parameters
*/
override def open(parameters: Configuration): Unit = {
hbaseConn = HBaseUtils.getHbaseConnNotPoolNew
maxSize = parameters.getInteger("maxSize" , 100)
// val lastInvokeTime = System.currentTimeMillis
}
/**
* 数据处理
*
* @param value
* @param context
*/
override def invoke(value: HbaseValueBean, context: SinkFunction.Context[_]): Unit = {
if (value.tableName == "variable_system:risk_task") {
import scala.collection.JavaConversions._
/**
* rowkey与put
*/
val put = new Put(value.rowkey.getBytes())
PutUtils.setDataString(put , value.family , value.column , value.value)
putsResult = putsResult :+ put
println(s"puts-size:${putsResult.size}")
/**
* 判断输出
*/
if (putsResult.size() >= maxSize) {
val table = hbaseConn.getTable(TableName.valueOf(value.tableName))
table.put(putsResult)
println("进行sink")
println(s"table:${value.tableName} , 已经达到:${maxSize} , 已存储;")
println(s"puts:${putsResult}")
/**
* 因为Java与Scala集合转换,所以这里是没有scala的清除方法的
*/
putsResult = collection.immutable.List[Put]()
table.close()
}
}
}
/**
* 满足条件输出数据并且关闭连接
*/
override def close(): Unit = {
hbaseConn.close()
}
}
Hbase-Connect
def getHbaseConnNotPoolNew: Connection = {
var conf: Configuration = HBaseConfiguration.create
conf.set("hbase.zookeeper.quorum", "host:port")
conf.set("hbase.zookeeper.property.clientPort", "port")
conf.set("hbase.master", "16000")
conf.set("hbase.rootdir", "dir")
val conn = ConnectionFactory.createConnection(conf)
conn
}
HBase-Utiles
object PutUtils {
def setDataString(put: Put,cf:String,col:String,data:String): Unit ={
put.addColumn(Bytes.toBytes(cf) , Bytes.toBytes(col) , Bytes.toBytes(data))
}
def setDataStringGetPut(put: Put,cf:String,col:String,data:String): Put ={
put.addColumn(Bytes.toBytes(cf) , Bytes.toBytes(col) , Bytes.toBytes(if(data!= null && data != "") data else "null"))
}
}
#!/bin/bash
flink run -m yarn-cluster \
-c com.xxxx.flink.MyHbaseSinkTest \
-p 8 \
/home/cdh/xxxx/2020/11/FlinkToKuduDemo/realtime_source_dw.jar
文章浏览阅读2.4w次。错误信息:Caused by: java.io.IOException: Unable to establish loopback connectionat sun.nio.ch.PipeImpl$Initializer.run(PipeImpl.java:101)at sun.nio.ch.PipeImpl$Initializer.run(PipeImpl.java:68)at java.security.AccessController.解决方案_caused by: java.io.ioexception: unable to establish loopback connection
文章浏览阅读1.1k次。世界上自有门锁以来,门锁的变革经历了:机械锁—光孔锁—磁片锁—电子锁—智能感应卡锁,门锁不断地更新换代。进入九十年代后,智能卡锁以其无余伦比的优势创建了门锁新概念。计算机、电子机械和智能卡的诞生,门锁形成了完美的梦幻组合。电子门锁集计算机、电子机械、磁电技术为一体,以智能来控制门锁的开启,即给管理者带来安全、迅捷、自动化的管理模式,也给使用者提供了极大方便。科技以人为本,智能电子门锁的产生,源于人..._门锁发卡系统用什么语言开发出来的
文章浏览阅读4.4k次。1 问题藐视idea没有自动报错.例如随便写一段错误代码,其关联类没有报错;新加一个接口方法其实现类没有实现,没有报错;其实就是没有自动检查语法合法性、2 产生原因2.1 开启了省电模式开启了省电模式,关掉即可。2.2 没有开启自动编译项目 2.3 inspection功能故障单击右下角小人头,检查高亮等级是否在none(没有)上,移到Inspections.以上三个问题检查一遍,基本可以解决问题。..._idea的自动检查不触发
文章浏览阅读1.3k次。1.修改字段的长度ALTER TABLE MODIFY COLUMN 字段名 数据类型(修改后的长度)例句:ALTER TABLE test_table MODIFY COLUMN id INT(20)2.修改字段的名称alter table change 。例句:ALTER TABLE test_tableCHANGE attence_name NAME VARCHAR(20)3...._mysql 修改字段脚本
文章浏览阅读466次。---------------------- ASP.Net+Android+IO开发S、.Net培训、期待与您交流! -----------------------继承:用法。。父类派生子类;子类可以用父类的所有属性方法,还可以有自己的方法属性。子类除了不能继承父类的构造函数和析构函数外,可以继承父类所有的成员和方法。子类不能直接修改父类的私有成员,只能通过父类的公有方法对其
文章浏览阅读675次。题目描述求方程 的根,用三个函数分别求当b^2-4ac大于0、等于0、和小于0时的根,并输出结果。从主函数输入a、b、c的值。输入a b c输出x1=? x2=?样例输入4 1 1样例输出x1=-0.125+0.484i x2=-0.125-0.484i 1 #include <stdio.h> 2 #include <math.h..._1028:c语言程序设计教程(第三版)课后习题8.2
文章浏览阅读408次。虽然一直使用VC,但很久之前有使用过Qt写过一些小工具,但没有完整看过Qt一些相关书籍,最近看书发现《C++ GUI Qt4编程》(第14章)多线程互斥量使用好像有问题。 《C++ GUI Qt4编程》(第14章)14.2 这一节将QMutex定义在了 Thread 类下,编译调试书本中的例子,发现使用QMutex Lock()方法会报错: The infer_qt使用互斥锁编译出错
文章浏览阅读4.5k次,点赞2次,收藏6次。前言本文将整理腾讯GT各个性能测试项的测试方法,目的是为了帮助移动性能专项测试同学快速过一遍腾讯GT各个性能数据是如何获取的。 一.GT性能测试方案之CPU测试1.简要流程 初始化cpu的数据 提供了两种方法获取CPU数据 getCpuUsage: 整机的CPU使用水平,主要用于实时刷新GT上的CPU数据。通过读取/proc/stat的数据,将每一个核的cpu使用跟闲..._service call surfaceflinger
文章浏览阅读3.9k次。了解学习PHP[内存]垃圾回收机制, 帮助理解编程语言的运行机制, 提高开发水平._gc_collect_cycles 是分进程吗
文章浏览阅读4.6k次,点赞6次,收藏22次。其实这样的colorbar可以直接手动画出来,不多说,直接上代码cmap = [69 117 180 116 173 203 171 217 233 254 224 144 253 174 77 244 109 67 215 48 39 165 0 38]/255;%画图的部分代码colormap(ax,cmap);xmin = -1..._matlab画图例带尖角
文章浏览阅读1.6k次。前言之前在写Netty的时候,说过要写一篇关于PB的应用,所以现在兑现承诺。在应用的过程中,发现了很多问题,本文主要介绍两个最关键的问题。PB如何与java项目融合,自动刷新编译,以及pb文件如何与其他项目共用,互不影响。java中如何实现pb的Extension概述ProtocolBuff 是 google 提出的的一种数据交换格式,跨语言,跨平台,可扩展。基于这种特性广泛的用于网络数据通信。目..._java开发pb接口
文章浏览阅读230次。线程的同步1.背景2.同步代码块和同步方法package com.atguigu.java;/** * 例子:创建三个窗口卖票,总票数为100张.使用实现Runnable接口的方式 * * 1.问题:卖票过程中,出现了重票、错票 -->出现了线程的安全问题 * 2.问题出现的原因:当某个线程操作车票的过程中,尚未操作完成时,其他线程参与进来,也操作车票。 * 3.如何解决:当一个线程a在操作ticket的时候,其他线程不能参与进来。直到线程a操作完ticket时,其他_package com.atguigu.java1; //死锁的演示 class a { public synchronized void