技术标签: 实例
不同场景平均值算法
求平均值系列之一:
val input = sc.parallelize(Seq(("t1", 1), ("t1", 2), ("t1", 3), ("t2", 2), ("t2", 5)))
val result = input.combineByKey(
(v) => (v, 1),
(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
).map{ case (key, value) => (key, value._1 / value._2.toFloat) }
result.collectAsMap().foreach(println(_))
-----------------测试运行结果:--------------------
scala> val input = sc.parallelize(Seq(("t1", 1), ("t1", 2), ("t1", 3), ("t2", 2), ("t2", 5)))
input: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[17] at parallelize at <console>:25
scala> val result = input.combineByKey(
| (v) => (v, 1),
| (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
| (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
| ).map{ case (key, value) => (key, value._1 / value._2.toFloat) }
result: org.apache.spark.rdd.RDD[(String, Float)] = MapPartitionsRDD[19] at map at <console>:31
scala> result.collectAsMap().foreach(println(_))
(t1,2.0)
(t2,3.5)
求平均值系列之二:
val testData = sc.parallelize(Seq(("t1", (1,2)), ("t1", (2,4)), ("t1", (3,5)), ("t2", (2,1)), ("t2", (5,2))))
(t2,(6,3))(t1,(4,2))
val result = testData.combineByKey(
(v) => (v._1, v._2),
(acc: (Int, Int), v) => (acc._1 + v._1, acc._2 + v._2),
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
).map{ case (key, value) => (key, value._1 / value._2) }
result.collectAsMap().foreach(println(_))
val result = testData.combineByKey(
(v) => (v._1, v._2),
(acc: (Int, Int), v) => (acc._1 + v._1, acc._2 + v._2),
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
).map{ case (key, value) => (key, value._1 / value._2) }
result.collectAsMap().foreach(println(_))
求平均值算法验证,下面不能直接使用于生成环境
val testData = sc.parallelize(Seq(("t1", (1,2)), ("t1", (2,4)), ("t1", (3,5)), ("t2", (2,1)), ("t2", (5,2))))
val tt = testData.combineByKey((_: String) => (0, 0),(pair: (Int, Int), value: String) =>(pair._1 + Integer.parseInt(value.split("#")(0)), pair._2 + Integer.parseInt(value.split("#")(1))),(pair1: (Int, Int), pair2: (Int, Int)) =>(pair1._1 + pair2._1, pair2._2 + pair2._2))
(t2,(5,2))(t1,(2,0))
val tt = testData.combineByKey((_: String) => (0, 0),(pair: (Int, Int), value: String) => (pair._1 + Integer.parseInt(value.split("#")(0)), pair._2 + Integer.parseInt(value.split("#")(1))),(pair1: (Int, Int), pair2: (Int, Int)) =>( pair1._1 + pair2._1, pair2._2 + pair2._2 ))
val averages: RDD[String, Double] = sumCountPairs.mapValues {
case (sum, 0L) => 0D
case (sum, count) => sum.toDouble / count
}
val sumCountPairs:RDD[(String, (Int, Long))] = testData.combineByKey(
(_: Int) => (0, 0L),
(pair: (Int, Long), value: Int) =>
(pair._1 + value, pair._2 + 1L),
(pair1: (Int, Long), pair2: (Int, Long)) =>
(pair1._1 + pair2._1, pair2._2 + pair2._2)
)
val sumCountPairs = testData.combineByKey(
(_: Int) => (0, 0L),
(pair: (Int, Long), value: Int) =>
(pair._1 + value, pair._2 + 1L),
(pair1: (Int, Long), pair2: (Int, Long)) =>
(pair1._1 + pair2._1, pair2._2 + pair2._2)
)
val averages = sumCountPairs.mapValues {
case (sum, 0L) => 0D
case (sum, count) => sum.toDouble / count
}
案例拾遗:
题主示例代码中 testData 这个 RDD 的类型是已经确定为 RDD[(String, Int)],然后通过 RDD.rddToRDDPairFunctions 这个隐式类型转换转为 PairRDDFunctions[String, Int],从而获得 reduceByKey 和 combineByKey 这两个 methods。然后来对比下二者的函数签名: class PairRDDFunctions[K, V](...) {
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C): RDD[(K, C)]
}
可以看到 reduceByKey 的 func 参数的类型只依赖于 PairRDDFunction 的类型参数 V,在这个例子里也就是 Int。于是 func 的类型已经确定为 (Int, Int) => Int,所以就不需要额外标识类型了。而 combineByKey 比 reduceByKey 更加通用,它允许各个 partition 在 shuffle 前先做 local reduce 得到一个类型为 C 的中间值,待 shuffle 后再做合并得到各个 key 对应的 C。以求均值为例,我们可以让每个 partiton 先求出单个 partition 内各个 key 对应的所有整数的和 sum 以及个数 count,然后返回一个 pair (sum, count)。在 shuffle 后累加各个 key 对应的所有 sum 和 count,再相除得到均值:val sumCountPairs: RDD[(String, (Int, Long))] = testData.combineByKey(
(_: Int) => (0, 0L),
(pair: (Int, Long), value: Int) =>
(pair._1 + value, pair._2 + 1L),
(pair1: (Int, Long), pair2: (Int, Long)) =>
(pair1._1 + part2._1, pair2._2 + pair2._2)
)
val averages: RDD[String, Double] = sumCountPairs.mapValues {
case (sum, 0L) => 0D
case (sum, count) => sum.toDouble / count
}
由于 C 这个 类型参数是任意的,并不能从 testData 的类型直接推导出来,所以必须明确指定。只不过题主的例子是最简单的用 reduceByKey 就可以搞定的情况,也就是 V 和 C 完全相同,于是就看不出区别了。
val listRDD = sc.parallelize(List(1,2,3,4,4,5)).map(x => (x,1))
def combineByKey[C](createCombiner: Int => C,mergeValue: (C, Int) => C,mergeCombiners: (C, C) => C): org.apache.spark.rdd.RDD[(Int, C)]
val sumandcnt = listRDD.combineByKey((_: Int)=>(0, 0),(pair:(Int,Int),value:Int)=>(pair._1 + value, pair._2 + 1),(pair1:(Int, Int),pair2:(Int, Int))=>(pair1._1 + pair2._1, pair2._2 + pair2._2))
val ll =sumandcnt.mapValues {
case (sum, 0) => 0D
case (sum, count) => sum.toDouble / count
}
val rdd = List(1,2,3,4)
val input = sc.parallelize(rdd)
val result = input.aggregate((0,0))(
(acc,value) => (acc._1 + value, acc._2 + 1),
(acc1,acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)
result: (Int, Int) = (10, 4)
val avg = result._1 / result._2
avg: Int = 2.5
程序的详细过程大概如下:
首先定义一个初始值 (0, 0),即我们期待的返回类型的初始值。
(acc,value) => (acc._1 + value, acc._2 + 1), value是函数定义里面的T,这里是List里面的元素。所以acc._1 + value, acc._2 + 1的过程如下:
0+1, 0+1
1+2, 1+1
3+3, 2+1
6+4, 3+1
结果为 (10,4)。在实际Spark执行中是分布式计算,可能会把List分成多个分区,假如3个,p1(1,2), p2(3), p3(4),经过计算各分区的的结果 (3,2), (3,1), (4,1),这样,执行 (acc1,acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2) 就是 (3+3+4,2+1+1) 即 (10,4),然后再计算平均值。
文章浏览阅读1.1w次,点赞7次,收藏34次。vue-grid-layout的使用、实例、遇到的问题和解决方案_vue-grid-layout
文章浏览阅读218次。然后连接一个数据源,就会在下面自动产生一个添加附件的组件。把这个控件复制粘贴到页面里,就可以单独使用来上传了。插入一个“编辑”窗体。_powerapps点击按钮上传附件
文章浏览阅读264次。(1) Abstraction (抽象)(2) Polymorphism (多态)(3) Inheritance (继承)(4) Encapsulation (封装)_"object(cnofd[\"ofdrender\"])十条"
文章浏览阅读133次。删除node_modules,重新npm install看是否成功。在 package.json 文件中的 scripts 中加入。修改你的第三方库的bug等。然后目录会多出一个目录文件。_修改 node_modules
文章浏览阅读883次。【代码】【】kali--password:su的 Authentication failure问题,&sudo passwd root输入密码时Sorry, try again._password: su: authentication failure
文章浏览阅读1w次,点赞13次,收藏97次。整理5个优秀的微信小程序开源项目。收集了微信小程序开发过程中会使用到的资料、问题以及第三方组件库。_微信小程序开源模板
文章浏览阅读128次。Centos7最简搭建NFS服务器_centos7 搭建nfs server
文章浏览阅读1.2k次,点赞2次,收藏3次。前言mybatis在持久层框架中还是比较火的,一般项目都是基于ssm。虽然mybatis可以直接在xml中通过SQL语句操作数据库,很是灵活。但正其操作都要通过SQL语句进行,就必须写大量的xml文件,很是麻烦。mybatis-plus就很好的解决了这个问题。..._mybaitis-plus ruledataobjectattributemapper' and 'com.picc.rule.management.d
文章浏览阅读325次。EECE 1080C / Programming for ECESummer 2022Laboratory 4: Global Functions PracticePlagiarism will not be tolerated:Topics covered:function creation and call statements (emphasis on global functions)Objective:To practice program development b_eece1080c
文章浏览阅读53次。被同机房早就1年前就学过的东西我现在才学,wtcl。设要求的数为\(x\)。设当前处理到第\(k\)个同余式,设\(M = LCM ^ {k - 1} _ {i - 1}\) ,前\(k - 1\)个的通解就是\(x + i * M\)。那么其实第\(k\)个来说,其实就是求一个\(y\)使得\(x + y * M ≡ a_k(mod b_k)\)转化一下就是\(y * M ...
文章浏览阅读1.3k次。首先,问题是如何出现的?晚上复查代码,发现一个activity没有调用自己的ondestroy方法我表示非常的费解,于是我检查了下代码。发现再finish代码之后接了如下代码finish();System.exit(0);//这就是罪魁祸首为什么这样写会出现问题System.exit(0);////看一下函数的原型public static void exit (int code)//Added ..._android 手动杀死app,activity不执行ondestroy
文章浏览阅读894次。Q: SylixOS 版权是什么形式, 是否分为<开发版税>和<运行时版税>.A: SylixOS 是开源并免费的操作系统, 支持 BSD/GPL 协议(GPL 版本暂未确定). 没有任何的运行时版税. 您可以用她来做任何 您喜欢做的项目. 也可以修改 SylixOS 的源代码, 不需要支付任何费用. 当然笔者希望您可以将使用 SylixOS 开发的项目 (不需要开源)或对 SylixOS 源码的修改及时告知笔者.需要指出: SylixOS 本身仅是笔者用来提升自己水平而开发的_select函数 导致堆栈溢出 sylixos