Flink实战3-数据实时写入HBase的Sink方式_flink sink hbase-程序员宅基地

技术标签: flink  spark  数据中台  实时数仓  Flink  大数据  

背景

  • 接入Kafka实时数据经过数据处理写入HBase,后续会应用于类似变量系统以及实时日志中,对于变量系统这类中间需要做实时缓存宽表可能使用HBase连接极其频繁,所以是使用客户端还是Sink的方式就看实际情况而定,具体数据处理后的落库Sink还是比较方便的;

摘要

关键字

  • Flink,Sink,HBase,数据处理,数据流转

设计

  • 使用的是Max Well数据源,将业务数据接入Kafka,Flink-Source接入Kafka,中间经过数据流转将数据存储到HBase作实时表;

实现

  • 说明

    此处的处理没有写成项目中使用的比较复杂的可配置化的形式,所以在invoke中做了过滤,也就是只针对单表的操作;

  • 依赖

    <scala.main.version>2.11</scala.main.version>
    <flink.version>1.9.1</flink.version>
    <hbase.version>2.1.0</hbase.version>
    
    <!--flink-->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-scala_${scala.main.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_${scala.main.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
    
            <!--flink kafka-->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka_${scala.main.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
    
            <!--flink table & sql-->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-scala-bridge_${scala.main.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner_${scala.main.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <!--导入flink连接redis的文件-->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-redis_${scala.main.version}</artifactId>
                <version>${flink.redis.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-elasticsearch6_${scala.main.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <!--rocksdb 与flink 进行整合依赖-->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
                <version>1.9.1</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-cep-scala_2.11</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-client</artifactId>
                <version>${hbase.version}</version>
    <!--            <scope>provided</scope>-->
            </dependency>
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-server</artifactId>
                <version>1.2.1</version>
    <!--            <scope>provided</scope>-->
            </dependency>
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-common</artifactId>
                <version>${hbase.version}</version>
    <!--            <scope>provided</scope>-->
            </dependency>
    
  • main

    object MyHbaseSinkTest {
          
      def main(args: Array[String]): Unit = {
          
        //环境
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        /**
         * 获取基础参数
         */
        val bootstrapserversnew = Contant.BOOTSTRAP_SERVERS_NEW
        import org.apache.flink.api.scala._
    
        /**
         * 定义kafka-source得到DataStream
         */
        val topics = "vs_merging_middle_topic"
        //将kafka中数据反序列化,
        val valueDeserializer: DeserializationSchema[String] = new SimpleStringSchema()
    
        val properties = new Properties()
        properties.put("bootstrap.servers", bootstrapserversnew)
        properties.put("group.id", "flink_hbase_sink_consumer2")
        properties.put("auto.offset.reset", Contant.AUTO_OFFSET_RESET_VALUE)
    
        println(Contant.BOOTSTRAP_SERVERS_NEW)
        val kafkaSinkDStream = env.addSource(new FlinkKafkaConsumer[String](topics, valueDeserializer, properties))
    
        kafkaSinkDStream.flatMap(data => {
          
          val jsonData = JSON.parseObject(data)
          val table = jsonData.getString("table")
          val rowkey = jsonData.getJSONObject("data").get("id").toString
          val jsonResult = jsonData.getJSONObject("data")
          import scala.collection.JavaConversions._
          jsonResult.keySet().map(key => {
          
            HbaseValueBean("variable_system:" + table, rowkey, "info", key, jsonResult.get(key).toString)
          })
        })
          .addSink(MyHbaseSinkSingle)
    
        env.execute(this.getClass.getSimpleName)
      }
    }
    
  • bean

    case class HbaseValueBean(
                             tableName : String ,
                             rowkey : String ,
                             family : String ,
                             column : String ,
                             value : String
                             )
    
  • HBase-Sink

    object MyHbaseSinkSingle extends RichSinkFunction[HbaseValueBean] {
          
      private var putsResult: List[Put] = collection.immutable.List[Put]()
      private var hbaseConn: Connection = null
      private var maxSize = 100
    
      /**
       * 开始,打开连接
       *
       * @param parameters
       */
      override def open(parameters: Configuration): Unit = {
          
        hbaseConn = HBaseUtils.getHbaseConnNotPoolNew
        maxSize = parameters.getInteger("maxSize" , 100)
        //    val lastInvokeTime = System.currentTimeMillis
      }
    
      /**
       * 数据处理
       *
       * @param value
       * @param context
       */
      override def invoke(value: HbaseValueBean, context: SinkFunction.Context[_]): Unit = {
          
        if (value.tableName == "variable_system:risk_task") {
          
    
          import scala.collection.JavaConversions._
          /**
           * rowkey与put
           */
          val put = new Put(value.rowkey.getBytes())
          PutUtils.setDataString(put , value.family , value.column , value.value)
          putsResult = putsResult :+ put
          println(s"puts-size:${putsResult.size}")
    
          /**
           * 判断输出
           */
          if (putsResult.size() >= maxSize) {
          
            val table = hbaseConn.getTable(TableName.valueOf(value.tableName))
            table.put(putsResult)
    
            println("进行sink")
            println(s"table:${value.tableName} , 已经达到:${maxSize} , 已存储;")
            println(s"puts:${putsResult}")
    
            /**
             * 因为Java与Scala集合转换,所以这里是没有scala的清除方法的
             */
            putsResult = collection.immutable.List[Put]()
            table.close()
          }
        }
      }
    
      /**
       * 满足条件输出数据并且关闭连接
       */
      override def close(): Unit = {
          
        hbaseConn.close()
      }
    }
    
  • Hbase-Connect

    def getHbaseConnNotPoolNew: Connection = {
          
      var conf: Configuration = HBaseConfiguration.create
      conf.set("hbase.zookeeper.quorum", "host:port")
      conf.set("hbase.zookeeper.property.clientPort", "port")
      conf.set("hbase.master", "16000")
      conf.set("hbase.rootdir", "dir")
      val conn = ConnectionFactory.createConnection(conf)
      conn
    }
    
  • HBase-Utiles

    object PutUtils {
          
      def setDataString(put: Put,cf:String,col:String,data:String): Unit ={
          
        put.addColumn(Bytes.toBytes(cf) , Bytes.toBytes(col) , Bytes.toBytes(data))
      }
    
      def setDataStringGetPut(put: Put,cf:String,col:String,data:String): Put ={
          
        put.addColumn(Bytes.toBytes(cf) , Bytes.toBytes(col) , Bytes.toBytes(if(data!= null && data != "") data else "null"))
      }
    }
    

部署

#!/bin/bash
flink run -m yarn-cluster \
-c com.xxxx.flink.MyHbaseSinkTest \
-p 8 \
/home/cdh/xxxx/2020/11/FlinkToKuduDemo/realtime_source_dw.jar

注意事项

  1. 不能够以Spark Streaming的方式来理解Fink的Source和Sink,如果使用客户端是无法针对每个分区进行连接数据处理的,使用Sink可以建立全局连接进行数据存储;
  2. 由上面,全局连接,全局数据处理导致的问题就是不能够每个分区建立线程不安全的集合进行数据存储,必须使用线程安全的集合,也就是不可变的集合进行数据处理,那么使用了Scala之间的集合转换就要注意方法的使用,很多Java结合的方法Scala是没有的,所以一般的清空操作还是使用地址替换重新复制覆盖的方式来进行
  3. addSink的之前的DataStream的数据类型一定是与自定义Sink的操作类型一致的,这个刚开始如果没有注意到就很没有头绪,所以是针对最后的数据类型进行处理存储的;
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/Kevin__Durant/article/details/110731519

智能推荐

IDEA 出现问题:报错unable to establish loopback connection解决方案_caused by: java.io.ioexception: unable to establis-程序员宅基地

文章浏览阅读2.4w次。错误信息:Caused by: java.io.IOException: Unable to establish loopback connectionat sun.nio.ch.PipeImpl$Initializer.run(PipeImpl.java:101)at sun.nio.ch.PipeImpl$Initializer.run(PipeImpl.java:68)at java.security.AccessController.解决方案_caused by: java.io.ioexception: unable to establish loopback connection

酒店智能门锁方案功能及其特点介绍_门锁发卡系统用什么语言开发出来的-程序员宅基地

文章浏览阅读1.1k次。世界上自有门锁以来,门锁的变革经历了:机械锁—光孔锁—磁片锁—电子锁—智能感应卡锁,门锁不断地更新换代。进入九十年代后,智能卡锁以其无余伦比的优势创建了门锁新概念。计算机、电子机械和智能卡的诞生,门锁形成了完美的梦幻组合。电子门锁集计算机、电子机械、磁电技术为一体,以智能来控制门锁的开启,即给管理者带来安全、迅捷、自动化的管理模式,也给使用者提供了极大方便。科技以人为本,智能电子门锁的产生,源于人..._门锁发卡系统用什么语言开发出来的

IDEA没有自动检查语法,自动关联报错_idea的自动检查不触发-程序员宅基地

文章浏览阅读4.4k次。1 问题藐视idea没有自动报错.例如随便写一段错误代码,其关联类没有报错;新加一个接口方法其实现类没有实现,没有报错;其实就是没有自动检查语法合法性、2 产生原因2.1 开启了省电模式开启了省电模式,关掉即可。2.2 没有开启自动编译项目 2.3 inspection功能故障单击右下角小人头,检查高亮等级是否在none(没有)上,移到Inspections.以上三个问题检查一遍,基本可以解决问题。..._idea的自动检查不触发

mysql修改字段名称脚本_mysql数据库修改字段及新增字段脚本-程序员宅基地

文章浏览阅读1.3k次。1.修改字段的长度ALTER TABLE MODIFY COLUMN 字段名 数据类型(修改后的长度)例句:ALTER TABLE test_table MODIFY COLUMN id INT(20)2.修改字段的名称alter table change 。例句:ALTER TABLE test_tableCHANGE attence_name NAME VARCHAR(20)3...._mysql 修改字段脚本

黑马程序员学习日记(4)--继承、多态-程序员宅基地

文章浏览阅读466次。---------------------- ASP.Net+Android+IO开发S、.Net培训、期待与您交流! -----------------------继承:用法。。父类派生子类;子类可以用父类的所有属性方法,还可以有自己的方法属性。子类除了不能继承父类的构造函数和析构函数外,可以继承父类所有的成员和方法。子类不能直接修改父类的私有成员,只能通过父类的公有方法对其

1028: C语言程序设计教程(第三版)课后习题8.2-程序员宅基地

文章浏览阅读675次。题目描述求方程 的根,用三个函数分别求当b^2-4ac大于0、等于0、和小于0时的根,并输出结果。从主函数输入a、b、c的值。输入a b c输出x1=? x2=?样例输入4 1 1样例输出x1=-0.125+0.484i x2=-0.125-0.484i 1 #include <stdio.h> 2 #include <math.h..._1028:c语言程序设计教程(第三版)课后习题8.2

随便推点

Qt学习之《C++ GUI Qt4编程》(第14章)多线程互斥量使用错误_qt使用互斥锁编译出错-程序员宅基地

文章浏览阅读408次。虽然一直使用VC,但很久之前有使用过Qt写过一些小工具,但没有完整看过Qt一些相关书籍,最近看书发现《C++ GUI Qt4编程》(第14章)多线程互斥量使用好像有问题。 《C++ GUI Qt4编程》(第14章)14.2 这一节将QMutex定义在了 Thread 类下,编译调试书本中的例子,发现使用QMutex Lock()方法会报错: The infer_qt使用互斥锁编译出错

源码解读腾讯 GT 的性能测试方案_service call surfaceflinger-程序员宅基地

文章浏览阅读4.5k次,点赞2次,收藏6次。前言本文将整理腾讯GT各个性能测试项的测试方法,目的是为了帮助移动性能专项测试同学快速过一遍腾讯GT各个性能数据是如何获取的。 一.GT性能测试方案之CPU测试1.简要流程 初始化cpu的数据 提供了两种方法获取CPU数据 getCpuUsage: 整机的CPU使用水平,主要用于实时刷新GT上的CPU数据。通过读取/proc/stat的数据,将每一个核的cpu使用跟闲..._service call surfaceflinger

PHP - 垃圾回收机制收集_gc_collect_cycles 是分进程吗-程序员宅基地

文章浏览阅读3.9k次。了解学习PHP[内存]垃圾回收机制, 帮助理解编程语言的运行机制, 提高开发水平._gc_collect_cycles 是分进程吗

Matlab 绘制两端带尖角(两端尖尖,尖色头条)的colorbar_matlab画图例带尖角-程序员宅基地

文章浏览阅读4.6k次,点赞6次,收藏22次。其实这样的colorbar可以直接手动画出来,不多说,直接上代码cmap = [69 117 180 116 173 203 171 217 233 254 224 144 253 174 77 244 109 67 215 48 39 165 0 38]/255;%画图的部分代码colormap(ax,cmap);xmin = -1..._matlab画图例带尖角

pb与java_Java中使用PB教程-程序员宅基地

文章浏览阅读1.6k次。前言之前在写Netty的时候,说过要写一篇关于PB的应用,所以现在兑现承诺。在应用的过程中,发现了很多问题,本文主要介绍两个最关键的问题。PB如何与java项目融合,自动刷新编译,以及pb文件如何与其他项目共用,互不影响。java中如何实现pb的Extension概述ProtocolBuff 是 google 提出的的一种数据交换格式,跨语言,跨平台,可扩展。基于这种特性广泛的用于网络数据通信。目..._java开发pb接口

线程的同步_package com.atguigu.java1; //死锁的演示 class a { publi-程序员宅基地

文章浏览阅读230次。线程的同步1.背景2.同步代码块和同步方法package com.atguigu.java;/** * 例子:创建三个窗口卖票,总票数为100张.使用实现Runnable接口的方式 * * 1.问题:卖票过程中,出现了重票、错票 -->出现了线程的安全问题 * 2.问题出现的原因:当某个线程操作车票的过程中,尚未操作完成时,其他线程参与进来,也操作车票。 * 3.如何解决:当一个线程a在操作ticket的时候,其他线程不能参与进来。直到线程a操作完ticket时,其他_package com.atguigu.java1; //死锁的演示 class a { public synchronized void