combineByKey算子求解平均值实例_scala combinebykey求平均值-程序员宅基地

技术标签: 实例  

不同场景平均值算法


求平均值系列之一:

val input = sc.parallelize(Seq(("t1", 1), ("t1", 2), ("t1", 3), ("t2", 2), ("t2", 5)))
val result = input.combineByKey( 
(v) => (v, 1), 
(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) 
).map{ case (key, value) => (key, value._1 / value._2.toFloat) }
result.collectAsMap().foreach(println(_)) 


-----------------测试运行结果:--------------------
scala> val input = sc.parallelize(Seq(("t1", 1), ("t1", 2), ("t1", 3), ("t2", 2), ("t2", 5)))
input: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[17] at parallelize at <console>:25


scala> val result = input.combineByKey( 
     | (v) => (v, 1), 
     | (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
     | (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) 
     | ).map{ case (key, value) => (key, value._1 / value._2.toFloat) }
result: org.apache.spark.rdd.RDD[(String, Float)] = MapPartitionsRDD[19] at map at <console>:31


scala> result.collectAsMap().foreach(println(_)) 
(t1,2.0)
(t2,3.5)



求平均值系列之二:

val testData = sc.parallelize(Seq(("t1", (1,2)), ("t1", (2,4)), ("t1", (3,5)), ("t2", (2,1)), ("t2", (5,2))))
(t2,(6,3))(t1,(4,2))


val result = testData.combineByKey( 
(v) => (v._1, v._2), 
(acc: (Int, Int), v) => (acc._1 + v._1, acc._2 + v._2),
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) 
).map{ case (key, value) => (key, value._1 / value._2) }
result.collectAsMap().foreach(println(_)) 


val result = testData.combineByKey( 
(v) => (v._1, v._2), 
(acc: (Int, Int), v) => (acc._1 + v._1, acc._2 + v._2),
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) 
).map{ case (key, value) => (key, value._1 / value._2) }
result.collectAsMap().foreach(println(_)) 


求平均值算法验证,下面不能直接使用于生成环境


val testData = sc.parallelize(Seq(("t1", (1,2)), ("t1", (2,4)), ("t1", (3,5)), ("t2", (2,1)), ("t2", (5,2))))
 
 val tt = testData.combineByKey((_: String) => (0, 0),(pair: (Int, Int), value: String) =>(pair._1 + Integer.parseInt(value.split("#")(0)), pair._2 + Integer.parseInt(value.split("#")(1))),(pair1: (Int, Int), pair2: (Int, Int)) =>(pair1._1 + pair2._1, pair2._2 + pair2._2))


(t2,(5,2))(t1,(2,0))
val tt = testData.combineByKey((_: String) => (0, 0),(pair: (Int, Int), value: String) => (pair._1 + Integer.parseInt(value.split("#")(0)), pair._2 + Integer.parseInt(value.split("#")(1))),(pair1: (Int, Int), pair2: (Int, Int)) =>( pair1._1 + pair2._1, pair2._2 + pair2._2 ))




val averages: RDD[String, Double] = sumCountPairs.mapValues {
  case (sum, 0L) => 0D
  case (sum, count) => sum.toDouble / count
}


val sumCountPairs:RDD[(String, (Int, Long))] = testData.combineByKey(
  (_: Int) => (0, 0L),
  (pair: (Int, Long), value: Int) =>
    (pair._1 + value, pair._2 + 1L),
  (pair1: (Int, Long), pair2: (Int, Long)) =>
    (pair1._1 + pair2._1, pair2._2 + pair2._2)
)


val sumCountPairs = testData.combineByKey(
  (_: Int) => (0, 0L),
  (pair: (Int, Long), value: Int) =>
    (pair._1 + value, pair._2 + 1L),
  (pair1: (Int, Long), pair2: (Int, Long)) =>
    (pair1._1 + pair2._1, pair2._2 + pair2._2)
)


val averages = sumCountPairs.mapValues {
  case (sum, 0L) => 0D
  case (sum, count) => sum.toDouble / count
}



案例拾遗:


题主示例代码中 testData 这个 RDD 的类型是已经确定为 RDD[(String, Int)],然后通过 RDD.rddToRDDPairFunctions 这个隐式类型转换转为 PairRDDFunctions[String, Int],从而获得 reduceByKey 和 combineByKey 这两个 methods。然后来对比下二者的函数签名: class PairRDDFunctions[K, V](...) {
  def reduceByKey(func: (V, V) => V): RDD[(K, V)]


  def combineByKey[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): RDD[(K, C)]
}
可以看到 reduceByKey 的 func 参数的类型只依赖于 PairRDDFunction 的类型参数 V,在这个例子里也就是 Int。于是 func 的类型已经确定为 (Int, Int) => Int,所以就不需要额外标识类型了。而 combineByKey 比 reduceByKey 更加通用,它允许各个 partition 在 shuffle 前先做 local reduce 得到一个类型为 C 的中间值,待 shuffle 后再做合并得到各个 key 对应的 C。以求均值为例,我们可以让每个 partiton 先求出单个 partition 内各个 key 对应的所有整数的和 sum 以及个数 count,然后返回一个 pair (sum, count)。在 shuffle 后累加各个 key 对应的所有 sum 和 count,再相除得到均值:val sumCountPairs: RDD[(String, (Int, Long))] = testData.combineByKey(
  (_: Int) => (0, 0L),


  (pair: (Int, Long), value: Int) =>
    (pair._1 + value, pair._2 + 1L),


  (pair1: (Int, Long), pair2: (Int, Long)) =>
    (pair1._1 + part2._1, pair2._2 + pair2._2)
)


val averages: RDD[String, Double] = sumCountPairs.mapValues {
  case (sum, 0L) => 0D
  case (sum, count) => sum.toDouble / count
}
由于 C 这个 类型参数是任意的,并不能从 testData 的类型直接推导出来,所以必须明确指定。只不过题主的例子是最简单的用 reduceByKey 就可以搞定的情况,也就是 V 和 C 完全相同,于是就看不出区别了。




val listRDD = sc.parallelize(List(1,2,3,4,4,5)).map(x => (x,1))
def combineByKey[C](createCombiner: Int => C,mergeValue: (C, Int) => C,mergeCombiners: (C, C) => C): org.apache.spark.rdd.RDD[(Int, C)]


val sumandcnt = listRDD.combineByKey((_: Int)=>(0, 0),(pair:(Int,Int),value:Int)=>(pair._1 + value, pair._2 + 1),(pair1:(Int, Int),pair2:(Int, Int))=>(pair1._1 + pair2._1, pair2._2 + pair2._2))
val ll =sumandcnt.mapValues {
  case (sum, 0) => 0D
  case (sum, count) => sum.toDouble / count
}


val rdd = List(1,2,3,4)
val input = sc.parallelize(rdd)
val result = input.aggregate((0,0))(
(acc,value) => (acc._1 + value, acc._2 + 1),
(acc1,acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)




result: (Int, Int) = (10, 4)
val avg = result._1 / result._2
avg: Int = 2.5




程序的详细过程大概如下:
首先定义一个初始值 (0, 0),即我们期待的返回类型的初始值。
(acc,value) => (acc._1 + value, acc._2 + 1), value是函数定义里面的T,这里是List里面的元素。所以acc._1 + value, acc._2 + 1的过程如下:
0+1, 0+1
1+2, 1+1
3+3, 2+1
6+4, 3+1
结果为 (10,4)。在实际Spark执行中是分布式计算,可能会把List分成多个分区,假如3个,p1(1,2), p2(3), p3(4),经过计算各分区的的结果 (3,2), (3,1), (4,1),这样,执行 (acc1,acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2) 就是 (3+3+4,2+1+1) 即 (10,4),然后再计算平均值。




版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/andyliuzhii/article/details/76851197

智能推荐

前端开发之vue-grid-layout的使用和实例-程序员宅基地

文章浏览阅读1.1w次,点赞7次,收藏34次。vue-grid-layout的使用、实例、遇到的问题和解决方案_vue-grid-layout

Power Apps-上传附件控件_powerapps点击按钮上传附件-程序员宅基地

文章浏览阅读218次。然后连接一个数据源,就会在下面自动产生一个添加附件的组件。把这个控件复制粘贴到页面里,就可以单独使用来上传了。插入一个“编辑”窗体。_powerapps点击按钮上传附件

C++ 面向对象(Object-Oriented)的特征 & 构造函数& 析构函数_"object(cnofd[\"ofdrender\"])十条"-程序员宅基地

文章浏览阅读264次。(1) Abstraction (抽象)(2) Polymorphism (多态)(3) Inheritance (继承)(4) Encapsulation (封装)_"object(cnofd[\"ofdrender\"])十条"

修改node_modules源码,并保存,使用patch-package打补丁,git提交代码后,所有人可以用到修改后的_修改 node_modules-程序员宅基地

文章浏览阅读133次。删除node_modules,重新npm install看是否成功。在 package.json 文件中的 scripts 中加入。修改你的第三方库的bug等。然后目录会多出一个目录文件。_修改 node_modules

【】kali--password:su的 Authentication failure问题,&sudo passwd root输入密码时Sorry, try again._password: su: authentication failure-程序员宅基地

文章浏览阅读883次。【代码】【】kali--password:su的 Authentication failure问题,&sudo passwd root输入密码时Sorry, try again._password: su: authentication failure

整理5个优秀的微信小程序开源项目_微信小程序开源模板-程序员宅基地

文章浏览阅读1w次,点赞13次,收藏97次。整理5个优秀的微信小程序开源项目。收集了微信小程序开发过程中会使用到的资料、问题以及第三方组件库。_微信小程序开源模板

随便推点

Centos7最简搭建NFS服务器_centos7 搭建nfs server-程序员宅基地

文章浏览阅读128次。Centos7最简搭建NFS服务器_centos7 搭建nfs server

Springboot整合Mybatis-Plus使用总结(mybatis 坑补充)_mybaitis-plus ruledataobjectattributemapper' and '-程序员宅基地

文章浏览阅读1.2k次,点赞2次,收藏3次。前言mybatis在持久层框架中还是比较火的,一般项目都是基于ssm。虽然mybatis可以直接在xml中通过SQL语句操作数据库,很是灵活。但正其操作都要通过SQL语句进行,就必须写大量的xml文件,很是麻烦。mybatis-plus就很好的解决了这个问题。..._mybaitis-plus ruledataobjectattributemapper' and 'com.picc.rule.management.d

EECE 1080C / Programming for ECESummer 2022 Laboratory 4: Global Functions Practice_eece1080c-程序员宅基地

文章浏览阅读325次。EECE 1080C / Programming for ECESummer 2022Laboratory 4: Global Functions PracticePlagiarism will not be tolerated:Topics covered:function creation and call statements (emphasis on global functions)Objective:To practice program development b_eece1080c

洛谷p4777 【模板】扩展中国剩余定理-程序员宅基地

文章浏览阅读53次。被同机房早就1年前就学过的东西我现在才学,wtcl。设要求的数为\(x\)。设当前处理到第\(k\)个同余式,设\(M = LCM ^ {k - 1} _ {i - 1}\) ,前\(k - 1\)个的通解就是\(x + i * M\)。那么其实第\(k\)个来说,其实就是求一个\(y\)使得\(x + y * M ≡ a_k(mod b_k)\)转化一下就是\(y * M ...

android 退出应用没有走ondestory方法,[Android基础论]为何Activity退出之后,系统没有调用onDestroy方法?...-程序员宅基地

文章浏览阅读1.3k次。首先,问题是如何出现的?晚上复查代码,发现一个activity没有调用自己的ondestroy方法我表示非常的费解,于是我检查了下代码。发现再finish代码之后接了如下代码finish();System.exit(0);//这就是罪魁祸首为什么这样写会出现问题System.exit(0);////看一下函数的原型public static void exit (int code)//Added ..._android 手动杀死app,activity不执行ondestroy

SylixOS快问快答_select函数 导致堆栈溢出 sylixos-程序员宅基地

文章浏览阅读894次。Q: SylixOS 版权是什么形式, 是否分为<开发版税>和<运行时版税>.A: SylixOS 是开源并免费的操作系统, 支持 BSD/GPL 协议(GPL 版本暂未确定). 没有任何的运行时版税. 您可以用她来做任何 您喜欢做的项目. 也可以修改 SylixOS 的源代码, 不需要支付任何费用. 当然笔者希望您可以将使用 SylixOS 开发的项目 (不需要开源)或对 SylixOS 源码的修改及时告知笔者.需要指出: SylixOS 本身仅是笔者用来提升自己水平而开发的_select函数 导致堆栈溢出 sylixos

推荐文章

热门文章

相关标签