控制Hive MAP个数详解_hive 字段map支持多少个key_suchy_sz的博客-程序员秘密

Hive的MAP数或者说MAPREDUCE的MAP数是由谁来决定的呢?inputsplit size,那么对于每一个inputsplit size是如何计算出来的,这是做MAP数调整的关键.

HADOOP给出了Inputformat接口用于描述输入数据的格式,其中一个关键的方法就是getSplits,对输入的数据进行分片.

Hive对InputFormat进行了封装:

213505502.png

而具体采用的实现是由参数hive.input.format来决定的,主要使用2中类型HiveInputFormat和CombineHiveInputFormat.

对于HiveInputFormat来说:

 

 

public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
    //扫描每一个分区
    for (Path dir : dirs) {
      PartitionDesc part = getPartitionDescFromPath(pathToPartitionInfo, dir);
    //获取分区的输入格式
      Class inputFormatClass = part.getInputFileFormatClass();
      InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
    //按照相应格式的分片算法获取分片
    //注意:这里的Inputformat只是old version API:org.apache.hadoop.mapred而不是org.apache.hadoop.mapreduce,因此不能采用新的API,否则在查询时会报异常:Input format must implement InputFormat.区别就是新的API的计算inputsplit size(Math.max(minSize, Math.min(maxSize, blockSize))和老的(Math.max(minSize, Math.min(goalSize, blockSize)))不一样;
      InputSplit[] iss = inputFormat.getSplits(newjob, numSplits / dirs.length);
      for (InputSplit is : iss) {
    //封装结果,返回
        result.add(new HiveInputSplit(is, inputFormatClass.getName()));
      }
    }
    return result.toArray(new HiveInputSplit[result.size()]);
}

 

对于CombineHiveInputFormat来说的计算就比较复杂了:

 

public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
    //加载CombineFileInputFormatShim,这个类继承了org.apache.hadoop.mapred.lib.CombineFileInputFormat
    CombineFileInputFormatShim combine = ShimLoader.getHadoopShims()
        .getCombineFileInputFormat();
if (combine == null) {
//若为空则采用HiveInputFormat的方式,下同
      return super.getSplits(job, numSplits);
    }
    Path[] paths = combine.getInputPathsShim(job);
for (Path path : paths) {
//若是外部表,则按照HiveInputFormat方式分片
      if ((tableDesc != null) && tableDesc.isNonNative()) {
        return super.getSplits(job, numSplits);
      }
      Class inputFormatClass = part.getInputFileFormatClass();
      String inputFormatClassName = inputFormatClass.getName();
      InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
      if (this.mrwork != null && !this.mrwork.getHadoopSupportsSplittable()) {
        if (inputFormat instanceof TextInputFormat) {
         if ((new CompressionCodecFactory(job)).getCodec(path) != null)
//在未开启hive.hadoop.supports.splittable.combineinputformat(MAPREDUCE-1597)参数情况下,对于TextInputFormat并且为压缩则采用HiveInputFormat分片算法
                    return super.getSplits(job, numSplits);
        }
      }
    //对于连接式同上
      if (inputFormat instanceof SymlinkTextInputFormat) {
        return super.getSplits(job, numSplits);
      }
      CombineFilter f = null;
      boolean done = false;
Path filterPath = path;
//由参数hive.mapper.cannot.span.multiple.partitions控制,默认false;如果没true,则对每一个partition创建一个pool,以下省略为true的处理;对于同一个表的同一个文件格式的split创建一个pool为combine做准备;
      if (!mrwork.isMapperCannotSpanPartns()) {
        opList = HiveFileFormatUtils.doGetWorksFromPath(
                   pathToAliases, aliasToWork, filterPath);
        f = poolMap.get(new CombinePathInputFormat(opList, inputFormatClassName));
      }
      if (!done) {
        if (f == null) {
          f = new CombineFilter(filterPath);
          combine.createPool(job, f);
        } else {
          f.addPath(filterPath);
        }
      }
    }
if (!mrwork.isMapperCannotSpanPartns()) {
//到这里才调用combine的分片算法,继承了org.apache.hadoop.mapred.lib.CombineFileInputFormat extends 新版本CombineFileInputformat
      iss = Arrays.asList(combine.getSplits(job, 1));
}
//对于sample查询特殊处理
    if (mrwork.getNameToSplitSample() != null && !mrwork.getNameToSplitSample().isEmpty()) {
      iss = sampleSplits(iss);
}
//封装结果返回
    for (InputSplitShim is : iss) {
      CombineHiveInputSplit csplit = new CombineHiveInputSplit(job, is);
      result.add(csplit);
    }
    return result.toArray(new CombineHiveInputSplit[result.size()]);
  }

具体combine的getSplits算法如下:

 

public List<InputSplit> getSplits(JobContext job)
    throws IOException {
        //决定切分的几个参数
    if (minSplitSizeNode != 0) {
      minSizeNode = minSplitSizeNode;
    } else {
      minSizeNode = conf.getLong(SPLIT_MINSIZE_PERNODE, 0);
    }
    if (minSplitSizeRack != 0) {
      minSizeRack = minSplitSizeRack;
    } else {
      minSizeRack = conf.getLong(SPLIT_MINSIZE_PERRACK, 0);
    }
    if (maxSplitSize != 0) {
      maxSize = maxSplitSize;
    } else {
      maxSize= = conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0);
    }
        for (MultiPathFilter onepool : pools) {
      ArrayList<Path> myPaths = new ArrayList<Path>();
      // create splits for all files in this pool.
      getMoreSplits(job, myPaths.toArray(new Path[myPaths.size()]),
                    maxSize, minSizeNode, minSizeRack, splits);
    }
}

跳到getMoreSplits:主要是填充如下数据结构,

 

// all blocks for all the files in input set
    OneFileInfo[] files;
    // mapping from a rack name to the list of blocks it has
    HashMap<String, List<OneBlockInfo>> rackToBlocks = new HashMap<String, List<OneBlockInfo>>();
    // mapping from a block to the nodes on which it has replicas
    HashMap<OneBlockInfo, String[]> blockToNodes = new HashMap<OneBlockInfo, String[]>();
    // mapping from a node to the list of blocks that it contains
    HashMap<String, List<OneBlockInfo>> nodeToBlocks = new HashMap<String, List<OneBlockInfo>>();

 

 

大概流程则是(这里blockInfo生成略过不表,可以参考MAPREDUCE-2046):

1.首先处理每个Datanode的blockInfo,先按照>=maxsplitsize来切分split,剩余的再按照blockinfo>=minSplitSizeNode切分,其余的等和rack的其余blockinfo进行合并

2.其次对每个Rack进行处理:先按照>=maxsplitsize来切分split,剩余的再按照blockinfo>=minSplitSizeRack切分,其余的等和overflow的其余blockinfo进行合并

3.对于overflow blockInfo直接根据maxsplitsize来进行切分.

 

其余影响MAP数的参数比较好理解了:

1.影响在MAPREDUCE后是否会启动MAP进行文件合并

hive.merge.mapfiles,hive.merge.mapredfiles,hive.merge.size.per.task(default=256 * 1000 * 1000),hive.merge.smallfiles.avgsize(default=16 * 1000 * 1000)

2.影响是否存在skew开启多MAP:

hive.groupby.skewindata=false:

当该参数有true时会生成2个MR:

第一个MR的分区键是grouping key+distinct key,通过hash分配到reduce进行第一次聚合操作

第二个MR的分区键则是grouping key进行第二次聚合;(2个MR的sort key都是grouping key+distinct key)

https://issues.apache.org/jira/browse/HIVE-5118

hive.optimize.skewjoin=false

hive.optimize.skewjoin.compiletime=false

hive.skewjoin.key=100000

hive.skewjoin.mapjoin.map.tasks=10000

hive.skewjoin.mapjoin.min.split=33554432

3.mapreduce参数,是否开启map speculative

4.bucket table.

对于MAP/REDUCE的性能分析放到下一篇再说吧

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/shuyun123456789/article/details/90353368

智能推荐

虚拟机和主机之间互相复制粘贴文本_iteye_14608的博客-程序员秘密

就在下面的blog里面。讲的很清楚。http://sdbaby.blog.51cto.com/149645/301494

mysql之slave_skip_errors选项_weixin_30389003的博客-程序员秘密

要说slave_skip_errors选项,就不得不提mysql的replication机制,总的来说它分了三步来实现mysql主从库的同步master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events);slave将master的binary log events拷贝到它的中继日志(relay log);slave重做...

activemq无法访问_-星雨-的博客-程序员秘密

activemq无法访问服务器安装activemq无法访问服务器安装activemq无法访问服务器安装activemq,在本地浏览器无法访问8161端口,这是因为activemq默认是本地回环监听127.0.0.1,修改conf/jetty.xml文件,把127.0.0.1修改成0.0.0.0就可以了输入./bin/activemq restart重启即可curl 127.0.0.1:8161 //查看服务器是否可以连接本地的activemqcurl x.x.x.x:8161

一个接口存在多个实现类,动态调用不同的实现类_黑衣如墨剑如雪的博客-程序员秘密

实现动态调用,方法有很多种,在这里记下一种我觉得比较简单的,方便以后翻阅。接口类package com.su.mybatis.oracle.service;public interface TestService { String firstStep(); String secondStep(); String thirdStep();...

iTop系统使用手册_itop使用手册_杨治中的博客-程序员秘密

目录CMDB概述 iTop系统概述 iTop功能操作 3.1. 配置管理 3.2. 变更管理 3.3. 事件管理 3.4. 问题管理 3.5. 服务管理1. CMDB概述随着信息技术的发展, IT系统已经成为企业业务发展不可或缺的支撑基础。IT运维管理系统是以CMDB为核心,以网络、服务器、应用的监控为基础,操作行为审计为安全准则,上层整合了符合ITIL管理思想的服务台、事件管理、问题管理、变更管理等流程,从而使IT管理从日常的运营监控、统计分析、发现问题、解决问题...

Thinkphp5使用ZipArchive批量打包下载图片_半途爱人的博客-程序员秘密

thinkphp5中自带了ZipArchive,我们可以直接调用打包文件;第一步: 因为是批量打包文件,所以我们需要传入数据id,我是在页面中使用js方法拼接的id字符串,所以在后台我需要explode将字符串转换为数组才可以,如果你们的方法与我的不一样,可省掉这一部分;第二步: 循环id,查找要打包的文件路径,并且将所有的文件路径保存在同一个数组中(代码中的$files[]);第三步: 参数解释:$filename: 设置压缩文件的名称以及位置,这样我们下载的时候才能下载成功;array_po

随便推点

Qt下libusb-win32的使用(二)批量读写操作_weixin_34122810的博客-程序员秘密

一、概述 学习libusb-win32的使用。使用批量传输方式与USB开发板进行数据读、写操作。上位机使用Qt做界面,使用USB开发板的端点2作为批量传输端点。 二、实现 代码比较简单,直接给出,如下: 1 #include "testlibusb.h" 2 3 //for Tiny6410 4 //#defi...

Java实现一棵二叉树的构建以及其深度优先遍历算法_林志鹏JAVA的博客-程序员秘密

Java实现二叉树树这个数据结构在平时的实际开发中其实十分常见,例如,要表示级联关系时,通常使用树来表达。例如,生活中的家族族谱关系,组织架构之间的关系,系统菜单中的级联关系,省市区的一对多关系,业务数据的父子关系等,可见,掌握树的基本数据结构以及如何去应用他,对我们平时的开发是十分有利的。树的定义树是 n 个节点的有限集,这里n&gt;=0。当n=0时,称之为一棵空树。树的特点有且仅有一个特定的称为根的节点。当n&gt;1时,其余的节点可分为 m 个互不相交的有限集,每一个集合本身又是

罗马数字的再解释_Ⅹx.96.cc_五道口纳什的博客-程序员秘密

罗马数字与罗马字母1. 构成规律IVXLCDMI\quad V\quad X\quad L\quad C \quad D\quad M5 个 II 是 1 个 VV,2 个 VV 是 1 个 XX5 个 XX 是 1 个 LL,2 个 LL 是 1 个 CC 5 个 CC 是 1 个 DD,2 个 DD 是 1 个 MM

嵌入式工程师培训技能 嵌入式开发学什么?_weixin_34008805的博客-程序员秘密

  想知道嵌入式软件开发编程学的是什么?我们得先知道什么是嵌入式开发,今天就来整理整理嵌入式工程师培训技能。  一般来说它负责上层应用软件,主要要用:  (1)CPU:51/MSP430/ARM7/ARM9;  (2)外设:AD/DA、UART、USB、LCD;  (3)总线和通讯接口:I2C,SPI,RS232,RS484,CAN,USB,MODBUS(或有实时性要求)  ...

TSE-定义Page类_deepfuture的博客-程序员秘密

有了URL,搜集系统就可以按照URL标识抓取其所对应的网页,网页信息保存在Page类中。下面是Page类的定义,对应文件Page.h。class CPage{public:string m_sUrl;// 网页头信息string m_sHeader;int m_nLenHeader;int m_nStatusCode;int m_nContentLength;string m_sLocation;

Python MySQL 删除表_python sql 删除表 execute_八狐云|酷画册|二维码生成的博客-程序员秘密

Python MySQL 教程Python MySQL 入门 Python MySQL 创建数据库 Python MySQL 创建表 Python MySQL 插入表 Python MySQL Select Python MySQL Where Python MySQL Order By Python MySQL Delete Python MySQL 删除表 Python ...

推荐文章

热门文章

相关标签