JobPlus知识库 IT 工业智能4.0 文章
理解Spark RDD中的aggregate函数

def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): U


Aggregate the elements of each partition, and then the results for all the partitions, using given combine functions and a neutral “zero value”. This function can return a different result type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U and one operation for merging two U’s, as in scala.TraversableOnce. Both of these functions are allowed to modify and return their first argument instead of creating a new U to avoid memory allocation.   seqOp操作会聚合各分区中的元素,然后combOp操作把所有分区的聚合结果再次聚合,两个操作的初始值都是zeroValue.   seqOp的操作是遍历分区中的所有元素(T),第一个T跟zeroValue做操作,结果再作为与第二个T做操作的zeroValue,直到遍历完整个分区。combOp操作是把各分区聚合的结果,再聚合。aggregate函数返回一个跟RDD不同类型的值。因此,需要一个操作seqOp来把分区中的元素T合并成一个U,另外一个操作combOp把所有U聚合。

  • zeroValue
  • the initial value for the accumulated result of each partition for the seqOp operator, and also the initial value for the combine results from different partitions for the combOp operator - this will typically be the neutral element (e.g. Nil for list concatenation or 0 for summation)

  • seqOp
  • an operator used to accumulate results within a partition

  • combOp
  • an associative operator used to combine results from different partitions

举个例子。假如List(1,2,3,4,5,6,7,8,9,10),对List求平均数,使用aggregate可以这样操作。

C:\Windows\System32>scala
Welcome to Scala 2.11.8 (Java HotSpot(TM) Client VM, Java 1.8.0_91).
Type in expressions for evaluation. Or try :help.

scala> val rdd = List(1,2,3,4,5,6,7,8,9)
rdd: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9)

scala> rdd.par.aggregate((0,0))(

(acc,number) => (acc._1 + number, acc._2 + 1),

(par1,par2) => (par1._1 + par2._1, par1._2 + par2._2)

)
res0: (Int, Int) = (45,9)

scala> res0._1 / res0._2
res1: Int = 5


过程大概这样:

首先,初始值是(0,0),这个值在后面2步会用到。

然后,(acc,number) => (acc._1 + number, acc._2 + 1),number即是函数定义中的T,这里即是List中的元素。所以acc._1 + number, acc._2 + 1的过程如下。

1.   0+1,  0+1

2.  1+2,  1+1

3.  3+3,  2+1

4.  6+4,  3+1

5.  10+5,  4+1

6.  15+6,  5+1

7.  21+7,  6+1

8.  28+8,  7+1

9.  36+9,  8+1

结果即是(45,9)。这里演示的是单线程计算过程,实际Spark执行中是分布式计算,可能会把List分成多个分区,假如3个,p1(1,2,3,4),p2(5,6,7,8),p3(9),经过计算各分区的的结果(10,4),(26,4),(9,1),这样,执行(par1,par2) => (par1._1 + par2._1, par1._2 + par2._2)就是(10+26+9,4+4+1)即(45,9).再求平均值就简单了。


如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!

¥ 打赏支持
51人赞 举报
分享到
用户评价(0)

暂无评价,你也可以发布评价哦:)

扫码APP

扫描使用APP

扫码使用

扫描使用小程序