
Spark Streaming + Kafka Connection Example


· 2 min read

Configuring Kafka

First configure Kafka and verify that it can produce and consume messages normally.

# Create topic1
./bin/kafka-topics.sh --zookeeper node1:2181 --create --replication-factor 2 --partitions 3 --topic topic1

# Send some test messages to topic1
./bin/kafka-console-producer.sh --broker-list node1:6667 --topic topic1
>wh wh wh 
>a b c
>

# In another terminal, consume the messages
./bin/kafka-console-consumer.sh --bootstrap-server node1:6667 --from-beginning --topic topic1
wh wh wh 
a b c

Maven Dependencies (pom.xml)

<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <spark.version>2.3.0</spark.version>
    <scala.version.major>2.11</scala.version.major>
    <scala.version.minor>2.11.12</scala.version.minor>
    <hadoop.version>3.1.1</hadoop.version>
</properties>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_${scala.version.major}</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_${scala.version.major}</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_${scala.version.major}</artifactId>
    <version>${spark.version}</version>
</dependency>

Spark Streaming

We connect to Kafka using the direct approach: each Spark Streaming task corresponds to one Kafka partition, and the task not only reads the data but also runs the processing logic.
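Before the full streaming program below, the core word-count chain (`flatMap` → `map` → `reduceByKey`) can be checked on plain Scala collections, without Kafka or Spark running. This is an illustrative sketch: `groupBy` plus a sum stands in for `reduceByKey`, and the object name is made up here.

```scala
object WordCountSketch {
  // Mirrors the DStream chain: flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
  def count(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))                             // split each line into words
      .map((_, 1))                                       // pair each word with a count of 1
      .groupBy(_._1)                                     // group pairs by word
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) } // sum the counts

  def main(args: Array[String]): Unit =
    println(count(Seq("wh wh wh", "a b c")))
}
```

Feeding in the two producer lines from above yields counts such as `wh -> 3`, which is exactly what the streaming job prints per batch.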

package com.yuchaoshui

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}


object WordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("myApp").getOrCreate()
    val ssc: StreamingContext = new StreamingContext(spark.sparkContext, Seconds(5))
    ssc.sparkContext.setLogLevel("WARN")

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "node1:6667,node2:6667,node3:6667",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "yzy.group",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Array("topic1")
    val KafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )

    // Inside map you can check which topic each record came from:
    //KafkaDStream.map(record => {
    //  record.topic()
    //}).print()

    KafkaDStream.map(_.value()).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
  • Console output
-------------------------------------------
Time: 1618050870000 ms
-------------------------------------------
(a,1)
(wh,3)
(b,2)
(c,1)
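Note that the program sets `enable.auto.commit` to `false` but never commits offsets, so on restart it falls back to `auto.offset.reset`. A common extension is to commit offsets back to Kafka after each batch using the `HasOffsetRanges` and `CanCommitOffsets` interfaces from `spark-streaming-kafka-0-10`. The fragment below is a sketch, meant to replace the `print()` line in the program above and reusing its `KafkaDStream`; it is not runnable on its own without the surrounding program and a Kafka cluster.

```scala
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, OffsetRange}

KafkaDStream.foreachRDD { rdd =>
  // Capture the Kafka offset ranges of this batch before any transformation
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // Process the batch (same word count as above)
  rdd.map(_.value()).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    .collect().foreach(println)

  // Only after processing succeeds, commit the offsets back to Kafka asynchronously
  KafkaDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
```

Committing after processing gives at-least-once semantics: if the application crashes mid-batch, the uncommitted batch is simply re-read on restart.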
