// 在本地启动名为SimpleDemo的SparkStreaming应用 // 该应用拥有两个线程,其批处理时间间隔为1s // 创建SparkConf val conf = new SparkConf().setMaster("local[2]").setAppName("SimpleDemo") // 从SparkConf创建StreamingContext并指定1秒钟的批处理大小 val ssc = new StreamingContext(conf, Seconds(1)) // 创建ReceiverInputDStream,该InputDStream的Receiver监听本地机器的7777端口 val lines = ssc.socketTextStream("localhost", 7777) // 类型是ReceiverInputDStream // 从DStream中筛选出包含字符串"error"的行,构造出了 // lines -> errorLines -> .print()这样一个DStreamGraph val errorLines = lines.filter(_.contains("error")) // 打印出含有"error"的行 errorLines.print() // 启动流计算环境StreamingContext并等待它"完成" ssc.start() // 等待作业完成 ssc.awaitTermination()