Spark Tutorial
Author: 时海 风自在
Constructing RDDs

Scala program skeleton:

 import org.apache.spark.sql.SparkSession

 object RddTest {

   def main(args: Array[String]): Unit = {

     val spark = SparkSession
       .builder()
       .appName("RddTest")
       .master("local[4]")
       .getOrCreate()

     val sc = spark.sparkContext

     val rdd1 = sc.makeRDD(Seq(8, 3, 6, 2, 5))

     rdd1.collect().foreach(x => println(x))

     spark.stop()

   }
 }
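To compile and run this outside the spark-shell, the spark-sql artifact (which provides SparkSession) must be on the classpath. A minimal sbt line; the version number is a placeholder to match your Spark installation:

  // build.sbt -- the version below is an assumption, align it with your cluster
  libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.5.0"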

sc.parallelize

scala> // create from a Seq

scala> val rdd1 = sc.parallelize(Seq(1,2,3,4,5,6))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at parallelize at <console>:24

scala> // create from a List

scala> val rdd2 = sc.parallelize(List(1,2,3,4,5,6))
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:24

scala> // create from an Array

scala> val rdd3 = sc.parallelize(Array(1,2,3,4,5,6))
rdd3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[25] at parallelize at <console>:24
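parallelize also accepts an optional second argument, numSlices, which sets how many partitions the collection is split into (see its source further below). A small sketch, continuing the same shell session:

  // split the collection into 3 partitions instead of the default
  val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5, 6), 3)
  println(rdd.getNumPartitions)                              // prints 3
  // glom() turns each partition into an Array so we can inspect the slicing
  rdd.glom().collect().foreach(p => println(p.mkString(",")))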

sc.makeRDD

Under the hood it calls sc.parallelize. The source of sc.makeRDD is as follows:


  /** Distribute a local Scala collection to form an RDD.
   *
   * This method is identical to `parallelize`.
   */
  def makeRDD[T: ClassTag](
      seq: Seq[T],
      numSlices: Int = defaultParallelism): RDD[T] = withScope {
    parallelize(seq, numSlices)
  }
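Worth noting: makeRDD also has a second overload, not shared by parallelize, that attaches preferred locations (hostnames) to each element's partition. A hedged sketch; the hostnames are placeholders:

  // overload: def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T]
  // each element is paired with the hosts it prefers to be computed on
  val rddWithPrefs = sc.makeRDD(Seq(
    (1, Seq("host1")),   // "host1"/"host2" are placeholder hostnames
    (2, Seq("host2"))
  ))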

And the source of sc.parallelize:

  /** Distribute a local Scala collection to form an RDD.
   *
   * @note Parallelize acts lazily. If `seq` is a mutable collection and is altered after the call
   * to parallelize and before the first action on the RDD, the resultant RDD will reflect the
   * modified collection. Pass a copy of the argument to avoid this.
   * @note avoid using `parallelize(Seq())` to create an empty `RDD`. Consider `emptyRDD` for an
   * RDD with no partitions, or `parallelize(Seq[T]())` for an RDD of `T` with empty partitions.
   */
  def parallelize[T: ClassTag](
      seq: Seq[T],
      numSlices: Int = defaultParallelism): RDD[T] = withScope {
    assertNotStopped()
    new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
  }
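The two notes in the docstring above are easy to miss: because parallelize acts lazily, mutating the source collection before the first action changes what the RDD sees, and an empty RDD should be created with emptyRDD rather than parallelize(Seq()). A small sketch of both:

  import scala.collection.mutable.ArrayBuffer

  val buf = ArrayBuffer(1, 2, 3)
  val lazyRdd = sc.parallelize(buf)         // nothing is computed yet
  buf += 4                                  // mutate before the first action...
  lazyRdd.collect()                         // ...per the note, reflects the mutation: 1,2,3,4

  val safeRdd = sc.parallelize(buf.toList)  // pass a copy to decouple

  val empty = sc.emptyRDD[Int]              // preferred over parallelize(Seq())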

Examples using sc.makeRDD:

scala> // create from a Seq

scala> val rdd1 = sc.makeRDD(Seq(1,2,3,4,5,6))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[18] at makeRDD at <console>:24

scala> // create from a List

scala> val rdd2 = sc.makeRDD(List(1,2,3,4,5,6))
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[19] at makeRDD at <console>:24

scala> // create from an Array

scala> val rdd3 = sc.makeRDD(Array(1,2,3,4,5,6))
rdd3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at makeRDD at <console>:24

scala> // read from a file

scala> val rdd4 = sc.textFile("file:///home/k6k4/data.txt", 1)
rdd4: org.apache.spark.rdd.RDD[String] = file:///home/k6k4/data.txt MapPartitionsRDD[22] at textFile at <console>:24
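The second argument to textFile is minPartitions, the minimum number of partitions for the resulting RDD; Spark may create more depending on the input splits. A quick follow-up on the RDD above:

  // the file path is reused from the example above
  val lines = sc.textFile("file:///home/k6k4/data.txt", 1)
  println(lines.count())           // number of lines in the file
  lines.take(3).foreach(println)   // print the first three lines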


Tags: parallelize, rdd, scala, makerdd, sc