
Spark Streaming Scala Example


A very simple example for getting started with Spark Streaming.

Prepare the TCP connection

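  • From the command line, start a listener on port 8888 (-l listens, -k keeps listening after a client disconnects) and leave it waiting for Spark Streaming to connect. Once the Spark Streaming program is running, whatever you type here is received by Spark Streaming and processed in the next batch.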
$ nc -lk 8888

Prepare the Spark Streaming program

  • Add the Spark Streaming dependency
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_${scala.version.major}</artifactId>
    <version>${spark.version}</version>
</dependency>
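If you build with sbt instead of Maven, the same dependency can likely be declared as in the sketch below. The value of sparkVersion is a placeholder (an assumption, not from the original) that you must set to the Spark version you actually run. The %% operator appends the Scala binary version to the artifact name, which plays the same role as _${scala.version.major} in the Maven coordinates above.

```scala
// build.sbt (sketch): Spark Streaming dependency for sbt users
// Placeholder: replace "..." with the Spark version you actually use
val sparkVersion = "..."

// %% appends the Scala binary version suffix, e.g. spark-streaming_2.12
libraryDependencies += "org.apache.spark" %% "spark-streaming" % sparkVersion
```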
  • Write the code
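package com.yuchaoshui

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    // SparkConf comes from spark-core, which spark-streaming already depends on;
    // SparkSession would additionally require the spark-sql dependency.
    // The socket receiver permanently occupies one core, so local mode needs at least 2 cores.
    val conf = new SparkConf().setMaster("local[*]").setAppName("myApp")
    // Batch interval: the input stream is cut into a new micro-batch every 5 seconds
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(5))
    ssc.sparkContext.setLogLevel("WARN")

    // Connect to the listener started with `nc -lk 8888` and read it line by line
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 8888)
    // Split lines into words, pair each word with 1, sum the counts per word within each batch, print
    lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()            // start receiving and processing
    ssc.awaitTermination() // block until the streaming job is stopped
  }
}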
  • Output (word counts are computed per 5-second batch, not accumulated across batches)
// Input typed into nc
$ nc -lk 8888
a a b b b b b b
we are good we are bad


// Output printed in the IntelliJ IDEA console
-------------------------------------------
(are,2)
(a,2)
(bad,1)
(b,6)
(good,1)
(we,2)

-------------------------------------------
Time: 1618047110000 ms
-------------------------------------------