A very simple Spark Streaming example for a first hands-on try.
Prepare the TCP connection
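- From the command line, start a listener on port 8888 and leave it waiting for Spark Streaming to connect (`-l` listens for an incoming connection, `-k` keeps listening after a client disconnects). Once the Spark Streaming program is running, anything typed here is picked up by its receiver right away, and the counts are printed when the current 5-second batch completes.
```shell
$ nc -lk 8888
```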
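`socketTextStream` in the program below connects to this port as a client and, per its Scala API docs, treats the incoming bytes as UTF-8 text delimited by `\n`. Where `nc` is not available (for example on a stock Windows machine), a bare-bones stand-in can be written in Scala. This is a sketch under stated assumptions, not part of the original setup: `LineServer` and `serveOnce` are hypothetical names, and unlike `nc -lk` it serves a single connection and then exits.

```scala
import java.io.{OutputStreamWriter, PrintWriter}
import java.net.ServerSocket
import java.nio.charset.StandardCharsets
import scala.io.Source

// Hypothetical stand-in for `nc -lk 8888`; not part of the original post
object LineServer {
  // Accept one client (e.g. Spark's socket receiver) and send each line to it, newline-terminated
  def serveOnce(server: ServerSocket, lines: Iterator[String]): Unit = {
    val client = server.accept() // blocks until the receiver connects
    // UTF-8 to match what socketTextStream expects; autoflush after every println
    val out = new PrintWriter(
      new OutputStreamWriter(client.getOutputStream, StandardCharsets.UTF_8), true)
    try lines.foreach(line => out.println(line))
    finally client.close()
  }

  def main(args: Array[String]): Unit = {
    val server = new ServerSocket(8888)
    // Forward lines typed on stdin until EOF; only one client is served (no -k behaviour)
    try serveOnce(server, Source.stdin.getLines())
    finally server.close()
  }
}
```

Start it instead of `nc`, then launch the Spark program and type lines into this console.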
Prepare the Spark Streaming program
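- Add the Spark Streaming dependency. The code below also builds a `SparkSession`, which lives in `spark-sql` and is not pulled in by `spark-streaming`, so add that artifact too.
```xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_${scala.version.major}</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${scala.version.major}</artifactId>
    <version>${spark.version}</version>
</dependency>
```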
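The artifact coordinates above rely on two Maven properties, `scala.version.major` and `spark.version`, whose values this post does not show. A minimal sketch of the matching `<properties>` section, assuming Scala 2.12 and Spark 3.1.1 (both values are assumptions; use the Scala binary version and Spark release your environment actually runs):

```xml
<properties>
    <!-- Assumed values, not from the original post -->
    <scala.version.major>2.12</scala.version.major>
    <spark.version>3.1.1</spark.version>
</properties>
```

With these values the streaming artifact resolves to `spark-streaming_2.12`. The Scala suffix must match the Scala version the project compiles with, otherwise the Spark classes fail to load at runtime.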
- Write the code
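```scala
package com.yuchaoshui

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    // local[*] uses all local cores; the socket receiver permanently occupies one,
    // so at least 2 cores are needed for the batches to actually be processed
    val spark = SparkSession.builder().master("local[*]").appName("myApp").getOrCreate()
    // Reuse the existing SparkContext with a 5-second batch interval
    val ssc: StreamingContext = new StreamingContext(spark.sparkContext, Seconds(5))
    ssc.sparkContext.setLogLevel("WARN")
    // Receive lines of text from the nc listener on localhost:8888
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 8888)
    // Split into words, count each word within the batch, print the first 10 pairs
    lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
    // Start receiving and processing, then block until the job is stopped
    ssc.start()
    ssc.awaitTermination()
  }
}
```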
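To see what each step of the chain does without starting Spark, the same logic can be replayed on a plain Scala collection that stands in for the lines of one 5-second batch. This is an illustration only: `WordCountSketch` and `countBatch` are hypothetical names, and `groupBy` followed by a sum plays the role that `reduceByKey(_ + _)` plays on a DStream.

```scala
// Hypothetical local replay of the DStream pipeline for a single batch; no Spark involved
object WordCountSketch {
  def countBatch(batch: Seq[String]): Map[String, Int] =
    batch
      .flatMap(_.split(" ")) // "a a b" -> "a", "a", "b"
      .map((_, 1))           // "a" -> ("a", 1)
      .groupBy(_._1)         // local stand-in for reduceByKey's grouping by key
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) } // the (_ + _) reduction

  def main(args: Array[String]): Unit = {
    val batch = Seq("a a b b b b b b", "we are good we are bad")
    // Sort by word only to make the output deterministic
    countBatch(batch).toSeq.sortBy(_._1).foreach(println)
  }
}
```

Fed the two lines typed into `nc` in the output below, it prints the same six pairs Spark reported, only sorted. The replay also exposes a quirk: `split(" ")` keeps empty strings between consecutive spaces, so `"a  b"` would produce an empty-string key; adding `.filter(_.nonEmpty)` after the split avoids that.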
- Output
Input typed into the `nc` session:
```shell
$ nc -lk 8888
a a b b b b b b
we are good we are bad
```
Output printed in the IntelliJ IDEA console:
```
-------------------------------------------
(are,2)
(a,2)
(bad,1)
(b,6)
(good,1)
(we,2)
-------------------------------------------
Time: 1618047110000 ms
-------------------------------------------
```
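The `Time:` value is the batch time in epoch milliseconds. `print()` writes a separator, the `Time:` line and a second separator before each batch's records, so the header at the end of this excerpt most likely opens the following batch. Because Spark schedules batches on multiples of the batch interval, every such value should be divisible by 5000 with `Seconds(5)`. A small `java.time` check; `BatchTime` and `describe` are illustrative names, not part of the original program:

```scala
import java.time.Instant

// Illustrative helper: decode a "Time: ... ms" header from DStream.print()
object BatchTime {
  // Returns the UTC instant of the batch and whether it falls on an interval boundary
  def describe(batchTimeMs: Long, intervalMs: Long): (String, Boolean) =
    (Instant.ofEpochMilli(batchTimeMs).toString, batchTimeMs % intervalMs == 0)

  def main(args: Array[String]): Unit = {
    val (utc, aligned) = describe(1618047110000L, 5000L) // header value above, Seconds(5)
    println(utc)
    println(aligned)
  }
}
```

For the header above this prints `2021-04-10T09:31:50Z` and `true`.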