Spark 算子大致可分为三大类:
Value 数据类型的 Transformation 算子:这类变换不会触发作业提交,处理的数据项是 Value 型数据。
Key-Value 数据类型的 Transformation 算子:这类变换不会触发作业提交,处理的数据项是 Key-Value 型数据。
Action 算子:这类算子会触发 SparkContext 提交作业。
Value型Transformation算子
1)map
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.map(_.length)
val c = a.zip(b)
c.collect
res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8))

2)flatMap
val a = sc.parallelize(1 to 10, 5)
(原文示例到此截断,完整示例见文末参考链接)

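3)mapPartitions(以分区为单位处理数据:传入的函数接收一个分区的 Iterator,并返回一个新的 Iterator)
(注意:下面的示例实际演示的是 flatMap——为每个元素生成 0~9 个随机数量的副本,并不是 mapPartitions 的用法。)
val x = sc.parallelize(1 to 10, 3)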
x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect
res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10)

4)glom(将每个分区中的所有元素合并成一个 Array)
val a = sc.parallelize(1 to 100, 3)
(原文示例到此截断,完整示例见文末参考链接)

5)union(a ++ b 等价于 a.union(b))
val a = sc.parallelize(1 to 3, 1)
val b = sc.parallelize(5 to 7, 1)
(a ++ b).collect
res0: Array[Int] = Array(1, 2, 3, 5, 6, 7)

6)cartesian(笛卡尔积)
val x = sc.parallelize(List(1,2,3,4,5))
val y = sc.parallelize(List(6,7,8,9,10))
x.cartesian(y).collect
res0: Array[(Int, Int)] = Array((1,6), (1,7), (1,8), (1,9), (1,10), (2,6), (2,7), (2,8), (2,9), (2,10), (3,6), (3,7), (3,8), (3,9), (3,10), (4,6), (5,6), (4,7), (5,7), (4,8), (5,8), (4,9), (4,10), (5,9), (5,10))

7)groupBy(用传入的函数为每个元素生成 key,key 相同的元素归为一组)
val a = sc.parallelize(1 to 9, 3)
a.groupBy(x => { if (x % 2 == 0) "even" else "odd" }).collect
res42: Array[(String, Seq[Int])] = Array((even,ArrayBuffer(2, 4, 6, 8)), (odd,ArrayBuffer(1, 3, 5, 7, 9)))

8)filter
val a = sc.parallelize(1 to 10, 3)
val b = a.filter(_ % 2 == 0)
b.collect
res3: Array[Int] = Array(2, 4, 6, 8, 10)

9)distinct(去重)
val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
c.distinct.collect
res6: Array[String] = Array(Dog, Gnu, Cat, Rat)

10)subtract(差集:返回在 a 中出现、但不在 b 中出现的元素)
val a = sc.parallelize(1 to 9, 3)
val b = sc.parallelize(1 to 3, 3)
val c = a.subtract(b)
c.collect
res3: Array[Int] = Array(6, 9, 4, 7, 5, 8)

11)sample
val a = sc.parallelize(1 to 10000, 3)
a.sample(false, 0.1, 0).count
res24: Long = 960

12)takeSample
val x = sc.parallelize(1 to 1000, 3)
(原文示例到此截断,完整示例见文末参考链接)

13)cache、persist
val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
(原文示例到此截断,完整示例见文末参考链接)

Key-Value型Transformation算子
1)mapValues
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x => (x.length, x))
b.mapValues("x" + _ + "x").collect
res5: Array[(Int, String)] = Array((3,xdogx), (5,xtigerx), (4,xlionx), (3,xcatx), (7,xpantherx), (5,xeaglex))

2)combineByKey
val a = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
(原文示例到此截断,完整示例见文末参考链接)

3)reduceByKey
val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)
(原文示例到此截断,完整示例见文末参考链接)

4)partitionBy
(使用指定的 Partitioner 对 Key-Value 型 RDD 重新分区;原文未给出示例)

5)cogroup
val a = sc.parallelize(List(1, 2, 1, 3), 1)
(原文示例到此截断,完整示例见文末参考链接)

6)join
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
(原文示例到此截断,完整示例见文末参考链接)

7)leftOuterJoin
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
(原文示例到此截断,完整示例见文末参考链接)

8)rightOuterJoin
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
(原文示例到此截断,完整示例见文末参考链接)

Actions算子
1)foreach
val c = sc.parallelize(List("cat", "dog", "tiger", "lion", "gnu", "crocodile", "ant", "whale", "dolphin", "spider"), 3)
(原文示例到此截断,完整示例见文末参考链接)

2)saveAsTextFile
val a = sc.parallelize(1 to 10000, 3)
a.saveAsTextFile("mydata_a")
14/04/03 21:11:36 INFO FileOutputCommitter: Saved output of task 'attempt_201404032111_0000_m_000002_71' to file:/home/cloudera/Documents/spark-0.9.0-incubating-bin-cdh4/bin/mydata_a

3)saveAsObjectFile
val x = sc.parallelize(1 to 100, 3)
(原文示例到此截断,完整示例见文末参考链接)

4)collect
val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
c.collect
res29: Array[String] = Array(Gnu, Cat, Rat, Dog, Gnu, Rat)

5)collectAsMap
val a = sc.parallelize(List(1, 2, 1, 3), 1)
val b = a.zip(a)
b.collectAsMap
res1: scala.collection.Map[Int,Int] = Map(2 -> 2, 1 -> 1, 3 -> 3)

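6)reduceByKeyLocally
(注意:下面的示例调用的是 reduceByKey(_ + _).collect,返回的是数组;reduceByKeyLocally 的写法是 b.reduceByKeyLocally(_ + _),它会把归约结果以 Map 的形式直接返回到 Driver 端,而不是生成新的 RDD。)
val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)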
val b = a.map(x => (x.length, x))
b.reduceByKey(_ + _).collect
res86: Array[(Int, String)] = Array((3,dogcatowlgnuant))

7)lookup
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x => (x.length, x))
b.lookup(5)
res0: Seq[String] = WrappedArray(tiger, eagle)

8)count
val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2)
c.count
res2: Long = 4

9)top
val c = sc.parallelize(Array(6, 9, 4, 7, 5, 8), 2)
c.top(2)
res28: Array[Int] = Array(9, 8)

10)reduce
val a = sc.parallelize(1 to 100, 3)
(原文示例到此截断,完整示例见文末参考链接)

11)fold
val a = sc.parallelize(List(1,2,3), 3)
(原文示例到此截断,完整示例见文末参考链接)

12)aggregate
(原文此处示例缺失,完整示例见文末参考链接)
参考:http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html