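Contents
I. Spark Streaming Overview
II. Adding Dependencies
III. Configuring log4j
1. Once the dependencies are downloaded, open External Libraries on the far left of IDEA
2. Find spark-core
3. Find the apache.spark directory
4. Find the log4j-defaults.properties file
5. Copy the file into the resources directory and rename it
6. Edit line 19 of log4j.properties
IV. Reading a Socket Data Stream with Spark Streaming
1. Writing the code
2. Start nc -lk
3. Run the Scala program
V. Reading Kafka Messages with Spark Streaming
1. Writing the code
2. Start a producer for sparkkafkastu and send messages
3. Run the Scala code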
I. Spark Streaming Overview
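Spark Streaming is Spark's module for processing streaming data. It supports many input sources, such as Kafka, Flume, Twitter, ZeroMQ, and plain TCP sockets. In Spark 3.x, the Flume, Twitter, and ZeroMQ connectors are no longer shipped with Spark itself. Once the data has been ingested, you can process it with high-level operators such as map, reduce, join, and window. The results can be written to many destinations, such as HDFS or a database.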
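Spark Streaming vs. Flink: Spark Streaming processes data in micro-batches, so its latency is typically on the order of seconds. Flink processes each event as it arrives, with millisecond-level latency, which makes it true real-time streaming. Spark Streaming is therefore often described as near-real-time (pseudo-real-time). When choosing a stream-processing framework, Spark Streaming is usually sufficient if the application does not need very low latency.
The programming abstraction of Spark Streaming is the discretized stream, or DStream. A DStream is a sequence of RDDs, and each RDD holds the data received during one time slice (batch interval).
Every transformation applied to a DStream is translated into operations on its underlying RDDs. For example, applying flatMap to a DStream of lines applies flatMap to each RDD in that DStream, which produces the RDDs of a DStream of words.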
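The following toy model makes the "sequence of RDDs" idea concrete in plain Scala, with no Spark required. It is a hypothetical illustration, not the Spark API. Each inner Seq stands in for the RDD of one batch interval, and each operation is applied to every batch independently, which is how DStream transformations behave. The names ToyDStream, Batches, flatMapEach, mapEach, and reduceByKeyEach are invented for this sketch.

```scala
// Toy model of a DStream (NOT the Spark API): one Seq per batch interval,
// each standing in for the RDD that holds that interval's data.
object ToyDStream {
  type Batches[A] = Seq[Seq[A]]

  // Like DStream.flatMap: apply f inside every batch, keeping batch boundaries.
  def flatMapEach[A, B](stream: Batches[A])(f: A => Seq[B]): Batches[B] =
    stream.map(batch => batch.flatMap(f))

  // Like DStream.map: apply f to every element of every batch.
  def mapEach[A, B](stream: Batches[A])(f: A => B): Batches[B] =
    stream.map(batch => batch.map(f))

  // Like DStream.reduceByKey: values are combined per key within a batch, never across batches.
  def reduceByKeyEach[K, V](stream: Batches[(K, V)])(op: (V, V) => V): Batches[(K, V)] =
    stream.map { batch =>
      batch.groupBy(_._1).map { case (k, pairs) => (k, pairs.map(_._2).reduce(op)) }.toSeq
    }

  def main(args: Array[String]): Unit = {
    // Two batch intervals of text lines (the "lines" DStream)
    val lines: Batches[String] = Seq(Seq("a b", "b c"), Seq("a a"))
    val words = flatMapEach(lines)(line => line.split("\\s+").toSeq)
    val counts = reduceByKeyEach(mapEach(words)(w => (w, 1)))(_ + _)
    counts.zipWithIndex.foreach { case (batch, i) => println(s"batch $i: ${batch.toMap}") }
  }
}
```

The word "a" is counted separately in each batch (once in the first, twice in the second). reduceByKey on a DStream aggregates only within a batch interval.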
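II. Adding Dependencies
Add the following properties and dependencies to the project's pom.xml. All Spark artifacts use the Scala 2.12 builds (the _2.12 suffix) and the same Spark version.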
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <spark.version>3.1.2</spark.version>
    <mysql.version>8.0.29</mysql.version>
</properties>

<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>test</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
</dependencies>
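Every Spark artifact above carries the _2.12 suffix, so the project's Scala SDK must also be 2.12.x. Mixing Scala binary versions typically fails at runtime with NoSuchMethodError or similar linkage errors. The sketch below is a quick sanity check that prints both versions so a mismatch is easy to spot. It assumes the dependencies above are on the classpath; the object name VersionCheck and its helper are hypothetical.

```scala
import scala.util.Properties

// Hypothetical helper object: prints the Scala and Spark versions actually on the classpath.
object VersionCheck {
  // "2.12.15" -> "2.12": the binary version that must match the artifact suffix (_2.12).
  def binaryVersion(full: String): String = full.split('.').take(2).mkString(".")

  def main(args: Array[String]): Unit = {
    val scalaBinary = binaryVersion(Properties.versionNumberString)
    println(s"Scala binary version: $scalaBinary (expected 2.12 for the _2.12 artifacts)")
    // SPARK_VERSION is defined in the org.apache.spark package object (spark-core).
    println(s"Spark version: ${org.apache.spark.SPARK_VERSION} (expected 3.1.2)")
    if (scalaBinary != "2.12") println("Warning: Scala SDK does not match the _2.12 Spark artifacts")
  }
}
```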
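III. Configuring log4j
By default Spark prints a large amount of INFO logging to the console, which buries the batch results printed by the streaming programs below. The following steps lower the console log level to ERROR.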
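1. Once the dependencies are downloaded, open External Libraries on the far left of IDEA
2. Find spark-core
It is listed as the spark-core_2.12 jar under External Libraries.
3. Find the apache.spark directory
This is the org/apache/spark directory inside the spark-core jar.
4. Find the log4j-defaults.properties file
5. Copy the file into the resources directory and rename it
Put the copy under src/main/resources and rename it to log4j.properties. log4j then loads it from the classpath, and Spark does not fall back to its built-in defaults.
6. Edit line 19 of log4j.properties
Change the log4j.rootCategory entry (INFO in the default file) to ERROR: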
log4j.rootCategory=ERROR, console
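If you would rather not copy a properties file, SparkContext.setLogLevel changes the root log level from code. Below is a minimal sketch; the object name QuietLogging and the quietContext helper are hypothetical. The call only takes effect once the context exists, so the INFO messages printed while Spark starts up still appear. That is one reason to prefer the properties-file approach above.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical example: lower the log level programmatically instead of via log4j.properties.
object QuietLogging {
  // Builds a StreamingContext and limits console logging to ERROR from this point on.
  def quietContext(appName: String): StreamingContext = {
    val conf = new SparkConf().setMaster("local[2]").setAppName(appName)
    val ssc = new StreamingContext(conf, Seconds(3))
    // Valid levels include ALL, DEBUG, INFO, WARN, ERROR, FATAL, OFF, TRACE.
    // Log lines emitted before this call (SparkContext startup) are not affected.
    ssc.sparkContext.setLogLevel("ERROR")
    ssc
  }

  def main(args: Array[String]): Unit = {
    val ssc = quietContext("quietLogging")
    // ... define DStreams here, then call ssc.start() and ssc.awaitTermination() ...
    ssc.stop()
  }
}
```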
IV. Reading a Socket Data Stream with Spark Streaming
1. Writing the code
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamDemo1 {
  def main(args: Array[String]): Unit = {
    // The socket receiver occupies one thread, so local mode needs at least 2 threads
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("sparkstream1")
    // Define the stream with a batch (collection) interval of 3 seconds
    val streamingContext = new StreamingContext(conf, Seconds(3))

    // TODO Use the given host and port as the data source
    val socketLineStream: ReceiverInputDStream[String] =
      streamingContext.socketTextStream("lxm147", 8888)

    // TODO Business logic: split each line into words and count them per batch
    val wordStream: DStream[String] = socketLineStream.flatMap(_.split("\\s+"))
    val mapStream: DStream[(String, Int)] = wordStream.map((_, 1))
    val wordCountStream: DStream[(String, Int)] = mapStream.reduceByKey(_ + _)

    // TODO Print the results of each batch
    wordCountStream.print()

    // TODO Start the receiver and block until the job is stopped
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
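To check the logic without a cluster, you can mirror the per-batch computation with ordinary Scala collections. The sketch below does for one batch of lines what flatMap, map, and reduceByKey do for each 3-second RDD; the object BatchWordCount is hypothetical. It also drops empty strings. For a line with leading whitespace, split("\\s+") returns an empty first element, and the streaming job above would count that as a word.

```scala
// Hypothetical offline mirror of one batch of the socket word count (no Spark needed).
object BatchWordCount {
  def countWords(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split("\\s+"))   // same tokenization as the DStream job
      .filter(_.nonEmpty)         // drop the "" produced by leading whitespace
      .map(word => (word, 1))     // same as wordStream.map((_, 1))
      .groupBy(_._1)              // group pairs by word, like reduceByKey's shuffle
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }

  def main(args: Array[String]): Unit =
    println(countWords(Seq("hello spark", "  hello streaming")))
}
```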
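2. Start nc -lk
On lxm147, run nc -lk 8888 so that netcat listens on port 8888, which is the host and port passed to socketTextStream. The -l flag listens for an incoming connection, and -k keeps listening after a client disconnects.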
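3. Run the Scala program
Type a few lines of space-separated words into the nc session. Every 3 seconds the program prints the word counts for the lines received during that batch only. Counts are not accumulated across batches, because reduceByKey works within each batch's RDD.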
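If nc is not available, you can exercise the same pipeline with StreamingContext.queueStream, which turns a queue of pre-built RDDs into a DStream (one RDD per batch by default). The sketch below is a test harness, not part of the original program. The object name, the 1-second batch interval, and the fixed waiting time are arbitrary assumptions. Results are collected on the driver with foreachRDD only because the test data is tiny.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

// Hypothetical offline harness: feeds fixed batches through the word-count pipeline.
object OfflineWordCount {
  def run(batches: Seq[Seq[String]], waitMs: Long = 5000L): List[(String, Int)] = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("offlineWordCount")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.sparkContext.setLogLevel("ERROR")

    // Each queued RDD becomes the input of exactly one batch (oneAtATime = true by default).
    val queue = mutable.Queue[RDD[String]]()
    batches.foreach(lines => queue += ssc.sparkContext.parallelize(lines))

    val wordCounts = ssc.queueStream(queue)
      .flatMap(_.split("\\s+"))
      .map((_, 1))
      .reduceByKey(_ + _)

    // foreachRDD runs on the driver; collect() is acceptable here because the data is tiny.
    val results = mutable.ArrayBuffer[(String, Int)]()
    wordCounts.foreachRDD(rdd => results.synchronized { results ++= rdd.collect() })

    ssc.start()
    ssc.awaitTerminationOrTimeout(waitMs) // give the queued batches time to run
    ssc.stop(stopSparkContext = true, stopGracefully = false)
    results.synchronized(results.toList)
  }

  def main(args: Array[String]): Unit =
    println(run(Seq(Seq("a b a"), Seq("b"))))
}
```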
V. Reading Kafka Messages with Spark Streaming
1. Writing the code
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamingKafkaSource {
  def main(args: Array[String]): Unit = {
    // The direct stream uses no receiver, so local[*] works here
    val conf: SparkConf = new SparkConf().setAppName("sparkKafkaStream").setMaster("local[*]")
    // Batch interval of 5 seconds
    val streamingContext = new StreamingContext(conf, Seconds(5))

    // Kafka consumer settings: broker address, key/value deserializers, consumer group
    val kafkaParams: Map[String, Object] = Map(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "lxm147:9092",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.GROUP_ID_CONFIG -> "sparkstreamgroup1"
    )

    val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      LocationStrategies.PreferConsistent,
      // Create the topic first if it does not exist:
      // kafka-topics.sh --create --zookeeper lxm147:2181 --topic sparkkafkastu --partitions 1 --replication-factor 1
      // (on Kafka 2.2 and later, --bootstrap-server lxm147:9092 can replace --zookeeper lxm147:2181)
      ConsumerStrategies.Subscribe[String, String](Set("sparkkafkastu"), kafkaParams)
    )

    // Each ConsumerRecord carries a key and a value; the message text is in value()
    val wordCountStream: DStream[(String, Int)] = kafkaStream
      .flatMap(_.value().split("\\s+"))
      .map((_, 1))
      .reduceByKey(_ + _)

    wordCountStream.print()
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
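The direct stream also exposes which Kafka offsets each batch covered, which helps you confirm that messages are actually being consumed. The sketch below follows the pattern from the spark-streaming-kafka-0-10 integration guide. It casts each batch's RDD to HasOffsetRanges and, after the batch is processed, commits those offsets back to Kafka with CanCommitOffsets.commitAsync. It reuses the broker, topic, and group from the code above and additionally assumes enable.auto.commit is set to false, so offsets are committed only after processing. The object name and the describe helper are hypothetical.

```scala
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical variant of SparkStreamingKafkaSource that logs and commits offset ranges.
object KafkaOffsetTracking {
  // Formats one partition's range as "topic-partition: [from, until)".
  def describe(o: OffsetRange): String =
    s"${o.topic}-${o.partition}: [${o.fromOffset}, ${o.untilOffset})"

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("kafkaOffsetTracking").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))

    val kafkaParams: Map[String, Object] = Map(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "lxm147:9092",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.GROUP_ID_CONFIG -> "sparkstreamgroup1",
      // Assumption: commit manually (below) instead of letting the consumer auto-commit.
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
    )

    val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("sparkkafkastu"), kafkaParams)
    )

    stream.foreachRDD { rdd =>
      // Only RDDs created directly by the Kafka stream implement HasOffsetRanges.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      offsetRanges.foreach(o => println(describe(o)))
      println(s"records in this batch: ${rdd.count()}")
      // Commit only after this batch's work is done.
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```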
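2. Start a producer for sparkkafkastu and send messages
Start a console producer on the sparkkafkastu topic (for example, with Kafka's kafka-console-producer.sh script) and type a few lines of space-separated words.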
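Instead of the console producer, you can send test messages from Scala with the Kafka client library (kafka-clients), which spark-streaming-kafka-0-10 already pulls in. Below is a minimal sketch that assumes the broker lxm147:9092 and the topic sparkkafkastu from the code above. The object name, the records helper, and the sample lines are hypothetical.

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

// Hypothetical test producer for the sparkkafkastu topic.
object SparkKafkaStuProducer {
  // One record per line; without a key, the producer's partitioner chooses the partition.
  def records(topic: String, lines: Seq[String]): Seq[ProducerRecord[String, String]] =
    lines.map(line => new ProducerRecord[String, String](topic, line))

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "lxm147:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)

    val producer = new KafkaProducer[String, String](props)
    try {
      records("sparkkafkastu", Seq("hello spark", "hello kafka streaming"))
        .foreach(record => producer.send(record))
      producer.flush() // push buffered records to the broker before closing
    } finally {
      producer.close()
    }
  }
}
```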
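3. Run the Scala code
Run SparkStreamingKafkaSource. Every 5 seconds it prints the word counts for the messages consumed in that batch. The code does not set auto.offset.reset, so a consumer group with no committed offsets (such as sparkstreamgroup1 on its first run) starts from the latest offset by default. Messages produced before that first run are therefore skipped, so keep the producer open and send new messages after the program has started.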