Spark Tutorial

Scala program skeleton:
import org.apache.spark.sql.SparkSession

object RddTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("RddTest")
      .master("local[4]")   // run locally with 4 threads
      .getOrCreate()
    val sc = spark.sparkContext

    val rdd1 = sc.makeRDD(Seq(8, 3, 6, 2, 5))
    rdd1.collect().foreach(println)

    spark.stop()
  }
}
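The skeleton above only collects and prints the RDD; in practice, transformations are chained on it before an action triggers the job. A minimal sketch reusing the sc and rdd1 defined above (the specific operations are illustrative, not part of the original example):

// Transformations are lazy; nothing executes until an action is called.
val sorted  = rdd1.sortBy(x => x)    // ascending: 2, 3, 5, 6, 8
val doubled = sorted.map(_ * 2)      // element-wise: 4, 6, 10, 12, 16
println(doubled.collect().mkString(", "))   // action: triggers the job
println(s"sum = ${rdd1.sum()}")             // action: 24.0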
Examples using sc.parallelize:

scala> // create from a Seq
scala> val rdd1 = sc.parallelize(Seq(1,2,3,4,5,6))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at parallelize at <console>:24

scala> // create from a List
scala> val rdd2 = sc.parallelize(List(1,2,3,4,5,6))
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:24

scala> // create from an Array
scala> val rdd3 = sc.parallelize(Array(1,2,3,4,5,6))
rdd3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[25] at parallelize at <console>:24
sc.makeRDD essentially just calls sc.parallelize. The source of sc.makeRDD is:
/** Distribute a local Scala collection to form an RDD.
 *
 * This method is identical to `parallelize`.
 */
def makeRDD[T: ClassTag](
    seq: Seq[T],
    numSlices: Int = defaultParallelism): RDD[T] = withScope {
  parallelize(seq, numSlices)
}
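Because makeRDD simply forwards to parallelize, the two are interchangeable and accept the same optional numSlices argument, which controls how many partitions the data is split into. A quick sketch to confirm this in spark-shell (the no-argument case depends on your sc.defaultParallelism):

// makeRDD forwards to parallelize, so both honour the same numSlices argument.
val a = sc.makeRDD(Seq(1, 2, 3, 4, 5, 6), 3)
val b = sc.parallelize(Seq(1, 2, 3, 4, 5, 6), 3)
println(a.getNumPartitions)   // 3
println(b.getNumPartitions)   // 3
// With numSlices omitted, both fall back to sc.defaultParallelism.
println(sc.makeRDD(Seq(1, 2, 3)).getNumPartitions == sc.defaultParallelism)   // true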
The source of sc.parallelize, in turn, is:
/** Distribute a local Scala collection to form an RDD.
 *
 * @note Parallelize acts lazily. If `seq` is a mutable collection and is altered after the call
 * to parallelize and before the first action on the RDD, the resultant RDD will reflect the
 * modified collection. Pass a copy of the argument to avoid this.
 * @note avoid using `parallelize(Seq())` to create an empty `RDD`. Consider `emptyRDD` for an
 * RDD with no partitions, or `parallelize(Seq[T]())` for an RDD of `T` with empty partitions.
 */
def parallelize[T: ClassTag](
    seq: Seq[T],
    numSlices: Int = defaultParallelism): RDD[T] = withScope {
  assertNotStopped()
  new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
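The @note about laziness is easy to verify in spark-shell: the collection is only sliced into partitions when the first action runs, so mutating a mutable source collection before that point is reflected in the RDD. A short sketch of what the note describes:

import scala.collection.mutable.ArrayBuffer

val buf = ArrayBuffer(1, 2, 3)
val rdd = sc.parallelize(buf)          // nothing is computed yet
buf += 4                               // mutate before the first action
println(rdd.collect().mkString(","))   // 1,2,3,4 -- the mutation is visible, as the @note warns
// Passing a copy decouples the RDD from later mutations:
val safe = sc.parallelize(buf.toList)
buf += 5
println(safe.collect().mkString(","))  // 1,2,3,4
// For an empty RDD, prefer sc.emptyRDD over parallelize(Seq()):
val empty = sc.emptyRDD[Int]           // RDD with no partitions
println(empty.getNumPartitions)        // 0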
Examples using sc.makeRDD:
scala> // create from a Seq
scala> val rdd1 = sc.makeRDD(Seq(1,2,3,4,5,6))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[18] at makeRDD at <console>:24

scala> // create from a List
scala> val rdd2 = sc.makeRDD(List(1,2,3,4,5,6))
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[19] at makeRDD at <console>:24

scala> // create from an Array
scala> val rdd3 = sc.makeRDD(Array(1,2,3,4,5,6))
rdd3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at makeRDD at <console>:24

scala> // read from a file
scala> val rdd4 = sc.textFile("file:///home/k6k4/data.txt", 1)
rdd4: org.apache.spark.rdd.RDD[String] = file:///home/k6k4/data.txt MapPartitionsRDD[22] at textFile at <console>:24
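Note that the second argument to textFile is a minimum number of partitions; the actual count may be higher depending on the file size and the Hadoop input-split settings. A short sketch building on rdd4 (the data.txt path is the one from the example above and is assumed to exist; the word-count step is just an illustration):

// textFile returns an RDD[String] with one element per line.
val rdd4 = sc.textFile("file:///home/k6k4/data.txt", 1)
println(rdd4.getNumPartitions)   // at least 1
println(rdd4.count())            // number of lines in the file
// A typical follow-up: split lines into words and count occurrences.
val words = rdd4.flatMap(_.split("\\s+")).filter(_.nonEmpty)
words.map((_, 1)).reduceByKey(_ + _).take(10).foreach(println)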