Flink 读取Kafka数据示例_flink读取kafka中的ogg格式-程序员宅基地

技术标签: flink  Kafka  window水位  Flink  

1、目标

本例模拟中将集成Kafka与Flink:Flink实时从Kafka中获取消息,每隔10秒去统计机器当前可用的内存数并将结果写入到本地文件中或者打印出来。

2、环境

Apache Kafka 0.11.0.0

Apache Flink 1.3.2

Maven 3.5.3

本例运行在Windows环境本地,使用idea开发代码,代码进行的是本地测试,没有跑在flink集群上,参考博客中是跑在flink集群上,而且flink集群是以local模式启动的。

Maven工程完整pom文件:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>flinktest</groupId>
    <artifactId>flinktest</artifactId>
    <version>1.0-SNAPSHOT</version>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.7.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>



            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.ink.FlinkLambdaTest.FlinkToLambda</mainClass>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                            </transformers>
                            <relocations>
                                <relocation>
                                    <pattern>org.codehaus.plexus.util</pattern>
                                    <shadedPattern>org.shaded.plexus.util</shadedPattern>
                                    <excludes>
                                        <exclude>org.codehaus.plexus.util.xml.Xpp3Dom</exclude>
                                        <exclude>org.codehaus.plexus.util.xml.pull.*</exclude>
                                    </excludes>
                                </relocation>
                            </relocations>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <!--<dependency>-->
            <!--<groupId>org.apache.flink</groupId>-->
            <!--<artifactId>flink-table_2.10</artifactId>-->
            <!--<version>1.3.2</version>-->
        <!--</dependency>-->

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.10</artifactId>
            <version>1.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.10</artifactId>
            <version>1.3.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.10</artifactId>
            <version>1.3.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.10</artifactId>
            <version>1.3.2</version>
        </dependency>



        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.10</artifactId>
            <version>1.3.2</version>
        </dependency>
    </dependencies>
</project>

3、创建工程

代码目录结构

4、 代码开发

代码主要由两部分组成:

  • MessageSplitter类、MessageWaterEmitter类和KafkaMessageStreaming类:Flink
    streaming实时处理Kafka消息类
  • KafkaProducerTest类和MemoryUsageExtrator类:构建Kafka测试消息

本例中,Kafka消息格式固定为:时间戳,主机名,当前可用内存数。其中主机名固定设置为machine-1,而时间戳和当前可用内存数都是动态获取。由于本例只会启动一个Kafka producer来模拟单台机器发来的消息,因此在最终的统计结果中只会统计machine-1这一台机器的内存。下面我们先来看完整代码实现。

KafkaMessageStreaming:

   package kafkatoflink;
    
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
    import org.apache.flink.util.Collector;
    
    import java.util.Properties;
    
   /**
* @Description    KafkaMessageStreaming Flink入口类,封装了对于Kafka消息的处理逻辑。本例每1秒统计一次结果并写入到本地件或者打印出来
* @Author         0262000099 Hengtai Nie
* @CreateDate     2018/9/21 16:51
*/
    public class KafkaMessageStreaming {
    
      public static void main(String[] args) throws Exception {
    
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
        // 非常关键,一定要设置启动检查点!!
        env.enableCheckpointing(5000);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "10.47.85.158:9092");
        props.setProperty("group.id", "flink-group");
    
    //    args[0] = "test-0921";  //传入的是kafka中的topic
        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<>("test-0921", new SimpleStringSchema(), props);
        consumer.assignTimestampsAndWatermarks(new MessageWaterEmitter());
    
        DataStream<Tuple2<String, Long>> keyedStream = env
                .addSource(consumer)
                .flatMap(new MessageSplitter())
                .keyBy(0)
                .timeWindow(Time.seconds(2))
                .apply(new WindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple, TimeWindow>() {
                  public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<Tuple2<String, Long>> out) throws Exception {
                    long sum = 0L;
                    int count = 0;
                    for (Tuple2<String, Long> record: input) {
                      sum += record.f1;
                      count++;
                    }
                    Tuple2<String, Long> result = input.iterator().next();
                    result.f1 = sum / count;
                    out.collect(result);
                  }
                });
    
    //将结果打印出来
        keyedStream.print();
    //    将结果保存到文件中
    //    args[1] = "E:\\FlinkTest\\KafkaFlinkTest";//传入的是结果保存的路径
        keyedStream.writeAsText("E:\\FlinkTest\\KafkaFlinkTest");
        env.execute("Kafka-Flink Test");
      } 
    }

KafkaProducerTest:

package kafkatoflink;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

/**
* @Description    KafkaProducerTest 发送Kafka消息
* @Author         0262000099 Hengtai Nie
* @CreateDate     2018/9/21 11:29
*/
public class KafkaProducerTest {

  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put("bootstrap.servers", "10.47.85.158:9092");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, String> producer = new KafkaProducer<>(props);
    int totalMessageCount = 100;
    for (int i = 0; i < totalMessageCount; i++) {
      String value = String.format("%d,%s,%d", System.currentTimeMillis(), "machine-1", currentMemSize());
      producer.send(new ProducerRecord<>("test-0921", value), new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
          if (exception != null) {
            System.out.println("Failed to send message with exception " + exception);
          }
        }
      });
      Thread.sleep(100L);
    }
    producer.close();
  }

  private static long currentMemSize() {
    return MemoryUsageExtrator.currentFreeMemorySizeInBytes();
  }
}

MemoryUsageExtrator:

        package kafkatoflink;  
        
        import com.sun.management.OperatingSystemMXBean;
        
        import java.lang.management.ManagementFactory;
        
        /**
        * @Description    MemoryUsageExtrator 很简单的工具类,提取当前可用内存字节数
        * @Author         0262000099 Hengtai Nie
        * @CreateDate     2018/9/21 11:28
        */
        public class MemoryUsageExtrator {
        
          private static OperatingSystemMXBean mxBean =
                  (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
        
          /**
           * Get current free memory size in bytes
           * @return  free RAM size
           */
          public static long currentFreeMemorySizeInBytes() {
            return mxBean.getFreePhysicalMemorySize();
          }
        }

MessageSplitter:

package kafkatoflink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
* @Description    MessageSplitter 将获取到的每条Kafka消息根据“,”分割取出其中的主机名和内存数信息
* @Author         0262000099 Hengtai Nie
* @CreateDate     2018/9/21 11:27
*/
public class MessageSplitter implements FlatMapFunction<String, Tuple2<String, Long>> {

  @Override
  public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
    if (value != null && value.contains(",")) {
      String[] parts = value.split(",");
      out.collect(new Tuple2<>(parts[1], Long.parseLong(parts[2])));
    }
  }
}

MessageWaterEmitter:

package kafkatoflink;

import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
/**
* @Description    MessageWaterEmitter 根据Kafka消息确定Flink的水位
* @Author         0262000099 Hengtai Nie
* @CreateDate     2018/9/21 11:26
*/
public class MessageWaterEmitter implements AssignerWithPunctuatedWatermarks<String> {

  public Watermark checkAndGetNextWatermark(String lastElement, long extractedTimestamp) {
    if (lastElement != null && lastElement.contains(",")) {
      String[] parts = lastElement.split(",");
      return new Watermark(Long.parseLong(parts[0]));
    }
    return null;
  }

  public long extractTimestamp(String element, long previousElementTimestamp) {
    if (element != null && element.contains(",")) {
      String[] parts = element.split(",");
      return Long.parseLong(parts[0]);
    }
    return 0L;
  }
}

5、参考博客

https://www.cnblogs.com/huxi2b/p/7219792.html

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/weixin_41939278/article/details/82803773

智能推荐

使用 IMQ+HTB+iptable 统一流量控制心得_imqhiot-程序员宅基地

文章浏览阅读904次。IMQ 是中介队列设备的简称,是一个虚拟的网卡设备,与物理网卡不同的是,通过它可以进行全局的流量整形,不需要一个网卡一个网卡地限速。这对有多个ISP接入的情况特别方便。配合 Iptables,可以非常方便地进行上传和下载限速。一、网络环境简介目的站点 (Internet) IP: 218.x.x.x (用 TARGET_IP 表示)路由器 (Router) eth1_imqhiot

C++将UTF-8编码的文件转化为GB2312编码_c++ utf-8转gb2312-程序员宅基地

文章浏览阅读1.9k次。C++将UTF-8编码的文件转化为GB2312编码我需要对一个html网页进行解析,html是使用UTF-8编码的。但是,我使用的visual Studio 19是使用gb2312进行编码的。当读入html文件并在控制台上输出时,中文自然全部变成了乱码所以,需要通过某些操作,对读入的字符串进行转化,将UTF-8编码转为GB2312编码在网上看了很多文章,都没有解决问题,或者过于复..._c++ utf-8转gb2312

信安软考 第十四章 恶意代码防范技术原理_第14章恶意代码防范技术原理-程序员宅基地

文章浏览阅读2k次,点赞2次,收藏9次。恶意代码(Malicious code),是一种违背目标系统安全策略的程序代码,会造成目标系统信息泄露、资源滥用,破坏系统的完整性及可用性。它能够经过存储介质或网络进行传播,从一台计算机系统传到另一外一台计算机系统,未经授权认证访问或破坏计算机系统。通常许多人认为“病毒”代表了所有感染计算机并造成破坏的程序,但实际上,换成“恶意代码”更为贴切,病毒只是恶意代码的一种。恶意代码的种类包括。_第14章恶意代码防范技术原理

【IoT】产品设计:硬件成本核算,这篇文章就够了_硬件产品的成本能算得出来吗-程序员宅基地

文章浏览阅读4.5k次,点赞12次,收藏29次。今天主要谈谈硬件产品的成本核算问题。一款新产品上市,面临的直接问题就是「定价」,尤其对于初创企业而言,现金流意味着企业的生命线,定价会直接影响到公司的「毛利润」。毛利润直接决定你银行账户里的收益,是指你卖产品给用户获得的钱与你将产品交付到用户手中需要花费钱的差值。不同类型产品的毛利润差别很大,一般会通过计算毛利率代替。相比于小米硬件成本定价,靠增值服务收费的商业模式,多数硬件公司必须获取足够高的毛利润才能生存下去。无论一款产品要走什么样的定价策略,定价绕不开「成本核算..._硬件产品的成本能算得出来吗

使用IntrospectorCleanupListener 解决quartz引起的内存泄漏问题_quartz 定时任务 报错会引起内存增长吗-程序员宅基地

文章浏览阅读1.7k次。"在服务器运行过程中,Spring不停的运行的计划任务和OpenSessionInViewFilter,使得Tomcat反复加载对象而产生框架并用时可能产生的内存泄漏,则使用IntrospectorCleanupListener作为相应的解决办法。"对于这一句话,引用关于IntrospectorCleanupListener一段解释:spring中的提供了一个名为 org.spring_quartz 定时任务 报错会引起内存增长吗

cordova + ionic混合开发常见问题_ionic4 import org.apache.cordova.file.fileutils;-程序员宅基地

文章浏览阅读1.3k次。1.ion-content 内置元素边距问题在使用的过程中发现ion-content里的元素都距四周有一定的距离,在实际需求中不需要四周距离,经查阅文档去掉ion-content 里面的padding属性即可。2.ionic app 国际化经查阅文档和博客目前ionic+cordova混合开发的app目前国际化较好的方案是ngx-translate2.1 插件安装Angular5安装..._ionic4 import org.apache.cordova.file.fileutils;

随便推点

python在windows下载非最新的安装包_python windows 安装文件下载-程序员宅基地

文章浏览阅读340次。python在windows下载非最新的安装包_python windows 安装文件下载

基于RTMP的视频采集上报播放预警方案设计与实现_小米摄像头流地址-程序员宅基地

文章浏览阅读3.5k次。摘 要为了满足人们日益增长的家庭安防需求,结合基本每家每户都有闲置的智能手机的现状,本文提出了一种基于移动智能端的家庭安防系统构造方案。系统充分利用了智能手机的传感器、摄像头、麦克风、闪光灯以及3G/4G/wifi等网络通讯模块,构造了一个集视频直播、视频点播、远程报警为一体的家庭安防系统。系统采用C/S的架构设计,分设三台客户端和两台服务器。客户端以Android系统为依托分别实现传感器数据采集、视频数据采集、视频播放功能。服务器分为事务服务器和流媒体服务器,事务服务器使用python语言搭建_小米摄像头流地址

sja1000 CAN控制器波特率计算方法详解_bus timing register-程序员宅基地

文章浏览阅读5.4k次。这段时间调试公司处理器can总线,实现最基本的对发实验,can控制器是sja1000,起初因为是对发实验,同样2块开发板,同样内核配置相同,因此时钟以及波特率肯定一样,也没有仔细研究can的时钟以及波特率,今天有客户问can控制器的时钟以及波特率,下午仔细看了一下sja1000手册,并且写了一个由波特率和时钟来计算分频值的小程序,这里总结一下。 sja1000 can工作频率和波特率之间分频_bus timing register

arm处理器异常处理-swi_arm 进入异常之后 lr = pc -4-程序员宅基地

文章浏览阅读1w次。ARM处理器共有7中运行模式: 用户模式(usr) -- 正常程序执行模式 |-- |-- 快速中断模式(fiq) -- 用于高速数据传输和通道处理 特 | 异 | 外部中断模式(irq) _arm 进入异常之后 lr = pc -4

IT行业就只是程序员吗,不要局限于敲代码_ⅰt是程序员吗-程序员宅基地

文章浏览阅读2k次。IT这个行业太广泛了,虽然写代码编程占了其中很重要的一个部分,但是真的不是全部!在IT行业有很多不同的工作角色。开始是一个程序猿,是的,我入行第一份工作确实就是coding,然后做企业级产品的硬件安装工程师,然后是做软件安装工程师,接下来是做解决方案工程师,再然后是系统架构师,再然后是解决方案顾问,现在我在一家外企IT公司负责公司相关产品在几个重点行业的技术支持工作。我除了入行的最开始写了两年程序,其他的职位都不是程序猿!大家不要再把IT局限到写程序了好么,有大把的工作岗..._ⅰt是程序员吗

Tomcat安装与配置(详细教程)_tomcat安装及配置教程-程序员宅基地

文章浏览阅读10w+次,点赞161次,收藏1k次。Tomcat安装与配置,Eclipse集成Tomcat,Eclipse如何配置Tomcat_tomcat安装及配置教程

推荐文章

热门文章

相关标签