Example: Connecting Spark Streaming to Kafka
Configure Kafka
Configure Kafka and verify that it can produce and consume messages normally.
# Create topic1
./bin/kafka-topics.sh --zookeeper node1:2181 --create --replication-factor 2 --partitions 3 --topic topic1
# Send a few messages to topic1
./bin/kafka-console-producer.sh --broker-list node1:6667 --topic topic1
>wh wh wh
>a b c
>
# In another terminal, consume the messages
./bin/kafka-console-consumer.sh --bootstrap-server node1:6667 --from-beginning --topic topic1
wh wh wh
a b c
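The console tools are enough for a smoke test, but the same messages can also be sent programmatically. Below is a minimal sketch using the plain Kafka producer client, assuming the same broker (node1:6667) and topic (topic1) as the commands above; the object name is purely illustrative.
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Sketch: send the same sample messages with the Kafka producer API.
object ProduceSample {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "node1:6667")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    Seq("wh wh wh", "a b c").foreach { line =>
      producer.send(new ProducerRecord[String, String]("topic1", line))
    }
    producer.close() // flushes any pending records before exiting
  }
}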
Dependencies (pom.xml)
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <spark.version>2.3.0</spark.version>
    <scala.version.major>2.11</scala.version.major>
    <scala.version.minor>2.11.12</scala.version.minor>
    <hadoop.version>3.1.1</hadoop.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.version.major}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_${scala.version.major}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_${scala.version.major}</artifactId>
        <version>${spark.version}</version>
    </dependency>
</dependencies>
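A common pitfall with these coordinates is a Scala binary version mismatch: the _2.11 suffix in the artifact IDs must agree with the Scala version the project actually compiles against (2.11.12 here). A tiny sketch for a runtime sanity check:
import scala.util.Properties

// Print the Scala version on the classpath; it should report 2.11.x
// to match the _2.11 Spark artifacts declared above.
object ScalaVersionCheck {
  def main(args: Array[String]): Unit =
    println(Properties.versionString)
}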
Spark Streaming
This uses the direct-stream approach to Kafka: each Spark Streaming task corresponds to exactly one Kafka partition, and that task handles not only reading the data but also the processing logic.
package com.yuchaoshui

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("myApp").getOrCreate()
    // One micro-batch every 5 seconds
    val ssc: StreamingContext = new StreamingContext(spark.sparkContext, Seconds(5))
    ssc.sparkContext.setLogLevel("WARN")

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "node1:6667,node2:6667,node3:6667",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "yzy.group",
      // Start from the earliest offset when the group has no committed offset
      "auto.offset.reset" -> "earliest",
      // Disable auto-commit so offsets can be managed manually if needed
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Array("topic1")

    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )

    // Inside map you can inspect which topic a record came from:
    // kafkaDStream.map(record => record.topic()).print()

    // Per-batch word count over the message values
    kafkaDStream.map(_.value()).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
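One caveat about the job above: with enable.auto.commit set to false, offsets are never committed, so on restart the consumer group falls back to auto.offset.reset ("earliest" here) and replays the topic. If Kafka-managed offsets are wanted instead, each batch's offset ranges can be committed back after processing. A sketch of that pattern (it would sit inside main before ssc.start(); the processing body is a placeholder):
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

// Commit each batch's offsets back to Kafka once the batch is processed.
kafkaDStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process rdd here ...
  kafkaDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}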
- Console output
-------------------------------------------
Time: 1618050870000 ms
-------------------------------------------
(a,1)
(wh,3)
(b,2)
(c,1)
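Note that reduceByKey counts within each 5-second batch only, so every batch prints fresh totals. For a running total across batches, one option is updateStateByKey; a minimal sketch, assuming a checkpoint directory is acceptable (the path below is illustrative):
// Cumulative word count across batches; updateStateByKey requires checkpointing.
ssc.checkpoint("/tmp/spark-ckpt") // illustrative path

kafkaDStream
  .map(_.value())
  .flatMap(_.split(" "))
  .map((_, 1))
  .updateStateByKey[Int]((newCounts: Seq[Int], state: Option[Int]) =>
    Some(newCounts.sum + state.getOrElse(0)))
  .print()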