SparkStreaming学习——读取socket的数据和kafka生产者的消息

news/2024/4/29 23:01:27/文章来源:https://blog.csdn.net/Helen_1997_1997/article/details/130344380

目录

一、Spark Streaming概述

二、添加依赖

三、配置log4j

1.依赖下载好后打开IDEA最左侧的外部库

2.找到spark-core

3.找到apache.spark目录

4.找到log4j-defaults.properties文件

5.将该文件放在资源目录下,并修改文件名

6.修改log4j.properties第19行的内容

四、Spark Streaming读取Socket数据流

1.代码编写

2.开启nc -lk

3.启动Scala程序

五、Spark Streaming读取kafka消息

1.代码编写

2.开启生产者sparkkafkastu并生产消息

3. 运行scala代码


一、Spark Streaming概述

        Spark Streaming 用于流式数据的处理。Spark Streaming 支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ 和简单的 TCP 套接字等等。数据输入后可以用 Spark 的RDD如:map、reduce、join、window 等进行运算。而结果也能保存在很多地方,如 HDFS,数据库等。

         Spark Streaming与Flink的区别:Spark Streaming是基于秒级别,而Flink是基于毫秒级别,是真正的实时流,Spark Streaming属于伪实时。因此,在选择实时流计算框架时,如果对实时速度要求不高的话,选择Spark Streaming基本足够。

        Spark Streaming的编程抽象是离散化流,也就是DStream。它是一个 RDD 序列,每个RDD代表数据流中一个时间片内的数据。

        应用于 DStream 上的转换操作都会转换为底层RDD上的操作。如对行 DStream中的每个RDD应用flatMap操作以生成单词 DStream 的RDD。

二、添加依赖

<properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><spark.version>3.1.2</spark.version><mysql.version>8.0.29</mysql.version></properties><dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.11</version><scope>test</scope></dependency><!--  https://mvnrepository.com/artifact/org.apache.spark/spark-core  --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>${spark.version}</version></dependency></dependencies>

三、配置log4j

1.依赖下载好后打开IDEA最左侧的外部库

2.找到spark-core

3.找到apache.spark目录

4.找到log4j-defaults.properties文件

5.将该文件放在资源目录下,并修改文件名

6.修改log4j.properties第19行的内容

log4j.rootCategory=ERROR, console

四、Spark Streaming读取Socket数据流

1.代码编写

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}object SparkStreamDemo1 {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("sparkstream1")// 定义流,采集周期3秒val streamingContext = new StreamingContext(conf, Seconds(3))// TODO 配置数据源为指定机器和端口val socketLineStream: ReceiverInputDStream[String] = streamingContext.socketTextStream("lxm147", 8888)// TODO 业务处理val wordStream: DStream[String] = socketLineStream.flatMap(_.split("\\s+"))val mapStream: DStream[(String, Int)] = wordStream.map((_, 1))val wordCountStream: DStream[(String, Int)] = mapStream.reduceByKey(_ + _)// TODO 输出结果wordCountStream.print()// TODO 启动采集器streamingContext.start()streamingContext.awaitTermination()}
}

2.开启nc -lk

3.启动Scala程序

五、Spark Streaming读取kafka消息

1.代码编写

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}object SparkStreamingKafkaSource {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("sparkKafkaStream").setMaster("local[*]")val streamingContext = new StreamingContext(conf, Seconds(5))val kafkaParams = Map((ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "lxm147:9092"),(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),(ConsumerConfig.GROUP_ID_CONFIG -> "sparkstreamgroup1"))val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(streamingContext,LocationStrategies.PreferConsistent,// 如果没有topic需要创建// kafka-topics.sh --create --zookeeper lxm147:2181 --topic sparkkafkastu --partitions 1 --replication-factor 1ConsumerStrategies.Subscribe(Set("sparkkafkastu"), kafkaParams))// KeyValue(key,value)val wordCountStream: DStream[(String, Int)] = kafkaStream.flatMap(_.value().toString.split("\\s+")).map((_, 1)).reduceByKey(_ + _)wordCountStream.print()streamingContext.start()streamingContext.awaitTermination()}
}

2.开启生产者sparkkafkastu并生产消息

3. 运行scala代码

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_104289.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

通达信结构紧凑形态选股公式编写思路

在威廉欧奈尔的《笑傲股市》、马克米勒维尼的《股票魔法师》等书籍中都有结构紧凑形态的相关描述&#xff0c;股票在形成基底时&#xff0c;价格波动幅度逐渐减小&#xff0c;量能逐步萎缩&#xff0c;同时价格相对强度较高。 结构紧凑的形态通过眼睛观察&#xff0c;一般可以…

搭建家庭影音媒体中心 --公网远程连接Jellyfin流媒体服务器

文章目录 前言1. 安装Home Assistant2. 配置Home Assistant3. 安装cpolar内网穿透3.1 windows系统3.2 Linux系统3.3 macOS系统 4. 映射Home Assistant端口5. 公网访问Home Assistant6. 固定公网地址6.1 保留一个固定二级子域名6.2 配置固定二级子域名 转载自远程穿透的文章&…

如何编写高质量代码、提高编程效率?

一、 前言 高质量代码是指在满足功能需求的基础上&#xff0c;具备高性能、安全、可扩展、易维护、可测试等特点的代码。它不仅可以提高开发效率和代码质量&#xff0c;更能有效减少代码维护成本&#xff0c;促进团队协作和项目成功。因此&#xff0c;编写高质量代码对程序员来…

CHAPTER 5: 《DESIGN CONSISTENT HASHING》 第5章 《设计一致的哈希》

CHAPTER 5: DESIGN CONSISTENT HASHING 为了实现水平扩展&#xff0c;有效且均匀地分发请求/数据是很重要的在服务器上。一致散列是实现这一目标的常用技术。但首先&#xff0c;让我们深入了解一下这个问题。 重组问题 如果您有n个缓存服务器&#xff0c;那么平衡负载的常用…

【LeetCode】297. 二叉树的序列化与反序列化

1.问题 序列化是将一个数据结构或者对象转换为连续的比特位的操作&#xff0c;进而可以将转换后的数据存储在一个文件或者内存中&#xff0c;同时也可以通过网络传输到另一个计算机环境&#xff0c;采取相反方式重构得到原数据。 请设计一个算法来实现二叉树的序列化与反序列…

代码审计实战3-android java

jks java keystore 作用&#xff1a;保证应用的唯一性 简介&#xff1a;可以理解为java的密钥库&#xff0c;是一个用来存放密钥和证书的仓库。 &#xff08;而keytool就是密钥和证书的管理工具&#xff0c;它把key&#xff08;密钥&#xff09;和certificate&#xff08;证…

Android性能优化—ViewPagers + Fragment缓存优化

大家看标题&#xff0c;可能会有点儿懵&#xff0c;什么是ViewPagers&#xff0c;因为在很久之前&#xff0c;我们使用的都是ViewPager&#xff0c;但是现在更多的是在用ViewPager2&#xff0c;因此用ViewPagers&#xff08;ViewPager、ViewPager2&#xff09;来代替两者&#…

Camtasia2023简体中文标准版免费更新下载

Camtasia专业的 屏幕录制和视频剪辑软件3000多万专业人士在全球范围内使用Camtasia展示产品&#xff0c;教授课程&#xff0c;培训他人&#xff0c;以更快的速度和更吸引人的方式进行沟通和屏幕分享。使您在Windows和Mac上进行录屏和剪辑创作专业外观的视频变得更为简单。 Camt…

一家传统制造企业的上云之旅,怎样成为了数字化转型典范?

众所周知&#xff0c;中国是一个制造业大国。在想要上云以及正在上云的企业当中&#xff0c;传统制造企业也占据了相当大的比例。 那么这类企业在实施数字化转型的时候&#xff0c;应该如何着手&#xff1f;我们不妨来看看一家传统制造企业的现身说法。 国茂股份的数字化转型诉…

mysql免安装版本(简化版)

1&#xff1a;解压mysql-5.7.26-winx64 2&#xff1a;添加data文件夹 3&#xff1a;添加my.ini文件 内容如下&#xff1a; port "3306" # 设置mysql的安装目录 basedir "D://tools\mysql-5.7.26-winx64\mysql-5.7.26-winx64\" # 设置mysql数据库的数…

软件测试面试一定要看的面试题和笔试题全套教程

1、什么是软件测试&#xff1f;2’ 【要点】 在规定条件下对程序进行操作&#xff0c;以发现错误&#xff0c;对软件质量进行评估&#xff0c;包括对软件形成过程的文档、数据以及程序进行测试。 【详解】 软件测试就是在软件投入运行前对软件需求分析、软件设计规格说明书…

【社区图书馆】PyTorch高级机器学习实战

PyTorch高级机器学习实战 作者&#xff1a;王宇龙&#xff0c;清华大学计算机博士&#xff0c;大型互联网公司算法专家&#xff0c;在国际学术会议及期刊发表过多篇论曾出版书籍《PyTorch深度学习入门与实战》&#xff0c;知乎"机器学习”话题优秀回答者。 亮点&#xf…

ssm+java企业公司产品分销商管理系统

一、 二、经营管理&#xff1a; ①分销商每月提交自己进多少货物&#xff08;从总部进购了多少“鹊巢”的商品给自己负责区的大型商超&#xff09;——对应的种类一共进多少货物&#xff1b;该种类中具体的产品又进了多少货物具体到&#xff08;参考三产品管理模块&#xff09;…

PCIE内核注册详解

代码结构 在Linux内核中&#xff0c;PCIe驱动程序的注册和处理涉及到许多文件&#xff0c;其中一些主要的文件包括&#xff1a; drivers/pci/pci.h&#xff1a;这个文件定义了PCIe驱动程序结构体和相关的函数。驱动程序需要包含这个头文件才能使用PCIe相关的函数和结构体。 d…

李宏毅 深度学习

目录 深度学习与自然语言处理 | 斯坦福CS224n 课程带学与全套笔记解读&#xff08;NLP通关指南完结&#xff09;pytorch快速入门csdn快速入门OS包PIL包Opencv包Dataset类Tensorboard的使用torchvision.transforms 的使用torchvision中数据集的使用DataLoader的使用(torch.util…

Git在工作中的使用流程

Git中的分支 master分支&#xff1a;所有用户可见的正式版本&#xff0c;都从master发布&#xff08;也是用于部署生产环境的分支&#xff0c;确保master分支稳定性&#xff09;。主分支作为稳定的唯一代码库&#xff0c;不做任何开发使用。master 分支一般由develop以及hotfi…

从“恰当”的项目管理工具中,了解自己的缺点

项目管理工具是为了帮助管理者&#xff0c;但管理者需要了解自己在特定情况下的“缺点”&#xff0c;才能从“恰当”的工具中获得“恰当”的帮助。如果你不知道在某个特定项目中自己&#xff08;作为项目经理&#xff09;的缺点&#xff0c;也不知道自己需要利用哪些好用的项目…

记录-因为写不出拖拽移动效果,我恶补了一下Dom中的各种距离

这里给大家分享我在网上总结出来的一些知识&#xff0c;希望对大家有所帮助 背景 最近在项目中要实现一个拖拽头像的移动效果&#xff0c;一直对JS Dom拖拽这一块不太熟悉&#xff0c;甚至在网上找一个示例&#xff0c;都看得云里雾里的&#xff0c;发现遇到最大的拦路虎就是JS…

2022年NOC大赛创客智慧编程赛道Python初赛题,包含答案

目录 一、单选题 二、多选题 三、判断题 下载文档打印做题: NOC Python 初赛考题 一、单选题 <

【大数据之Hadoop】十八、MapReduce之压缩

1 概述 优点&#xff1a;减少磁盘IO、减少磁盘存储空间。 缺点&#xff1a;因为压缩解压缩都需要cpu处理&#xff0c;所以增加CPU开销。 原则&#xff1a;运算密集型的Job&#xff0c;少用压缩&#xff1b;IO密集型的Job&#xff0c;多用压缩。 2 压缩算法对比 压缩方式选择时…