Spark源码(二)-Netty简介

news/2024/7/26 10:52:01/文章来源:https://blog.csdn.net/weixin_37901366/article/details/137190893

一、Netty简介

Netty 是一个异步事件驱动的网络通信应用框架,用于快速开发可维护的高性能服务器和客户端。简单地说Netty封装了JDK的NIO,不用再写一大堆复杂的代码,从NIO各种繁复的细节中脱离出来,让开发者重点关心业务逻辑。

二、Netty重要API

说明:大部分是模板代码,重点应该关注的是处理业务逻辑的Handler, 其按照角色不同分为ServerHandler和ClientHandler,按照数据处理流向不同,分为InboundHandler(对应接口是ChannelInboundHandlerAdapter)-处理输入数据,比如从客户端的角度,是处理从服务端发送过来的数据, 和OutboundHandler(对应接口是ChannelOutboundHandlerAdapter)-处理输出数据,比如从客户端的角度,是处理其发送给服务端的数据。

1、服务端

//用于接收客户端的连接请求的线程池
val bossGroup = new NioEventLoopGroup()
//用与处理客户端SocketChannel的网络读写的线程池
val workerGroup = new NioEventLoopGroup()
//是Netty用户启动NIO服务端的辅助启动类,降低服务端的开发复杂度
val bootstrap = new ServerBootstrap()
//将两个NIO线程组作为参数传入到ServerBootstrap
bootstrap.group(bossGroup, workerGroup)//创建NioServerSocketChannel.channel(classOf[NioServerSocketChannel])//绑定事件处理类.childHandler(new ChannelInitializer[SocketChannel] {override def initChannel(ch: SocketChannel): Unit = {// 日常主要要写的是ServerHandler逻辑 // 并且通常会添加多个ServerHandlerch.pipeline().addLast(new ServerHandler1)}})
//绑定端口地址端口
bootstrap.bind(host, port)

2、客户端

//创建客户端NIO线程组
val eventGroup = new NioEventLoopGroup
//创建客户端辅助启动类
val bootstrap = new Bootstrap
//将NIO线程组传入到Bootstrap
bootstrap.group(eventGroup)//创建NioSocketChannel.channel(classOf[NioSocketChannel])//绑定事件处理类.handler(new ChannelInitializer[SocketChannel] {override def initChannel(ch: SocketChannel): Unit = {// 日常主要要写的是ClientHandler逻辑 // 并且通常会添加多个ClientHandlerch.pipeline().addLast(new ClientHandler1)}})
//发送连接操作
bootstrap.connect(host, port)

三、Handler执行顺序

如第二部分所述,日常开发主要是写Handler,用于处理不同的业务逻辑,并且通常需要添加多个Handler,那么多个Handler的执行顺序如何呢?示例如下

比如一个客户端添加了5个Handler, 其中1和2是inBound,3和4是OutBound,5是InboundOutBound,

该客户端向外发送的消息会依次经过如下Handle的处理:5->4->3,而其接收到的外部信息会依次经过如下Handle的处理:1->2->5

四、应用举例

1、案例1

描述:客户端与服务端建立连接,会各自执行Handler中的channelActive方法

客户端:ClientHandler1d的channelActive方法被调用[已跟服务器建立连接]

服务端: ServerHandler1的channelActive方法被调用[一个客户端连接上]

(1) 服务端代码

① NettyServer

package top.doe.netty.demo1import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.ChannelInitializer
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioServerSocketChannelclass NettyServer1 {def bind(host: String, port: Int): Unit = {//用于接收客户端的连接请求的线程池val bossGroup = new NioEventLoopGroup()//用与处理客户端SocketChannel的网络读写的线程池val workerGroup = new NioEventLoopGroup()//是Netty用户启动NIO服务端的辅助启动类,降低服务端的开发复杂度val bootstrap = new ServerBootstrap()//将两个NIO线程组作为参数传入到ServerBootstrapbootstrap.group(bossGroup, workerGroup)//创建NioServerSocketChannel.channel(classOf[NioServerSocketChannel])//绑定事件处理类.childHandler(new ChannelInitializer[SocketChannel] {override def initChannel(ch: SocketChannel): Unit = {ch.pipeline().addLast(new ServerHandler1)}})//绑定端口地址端口bootstrap.bind(host, port)}
}object NettyServer1 {def main(args: Array[String]) {val host = "localhost"val port = 8888val server = new NettyServer1server.bind(host, port)}
}

②ServerHandler

package top.doe.netty.demo1import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter}class ServerHandler1 extends ChannelInboundHandlerAdapter {/*** 有客户端与服务端建立连接后调用*/override def channelActive(ctx: ChannelHandlerContext): Unit = {println("ServerHandler的channelActive方法被调用【一个客户端连接上】")}/*** 有客户端与服务端断开连接后调用*/override def channelInactive(ctx: ChannelHandlerContext): Unit = {println("ServerHandler的channelInactive方法被调用【一个客户端与服务端断开连接了】")}/*** 接受客户端发送来的消息*/override def channelRead(ctx: ChannelHandlerContext, msg: scala.Any): Unit = {println("ServerHandler的channelRead方法被调用【收到客户端发送的消息了】")}}

(2) 客户端代码

① NettyClient

package top.doe.netty.demo1import io.netty.bootstrap.Bootstrap
import io.netty.channel.ChannelInitializer
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioSocketChannelclass NettyClient1 {def connect(host: String, port: Int): Unit = {//创建客户端NIO线程组val eventGroup = new NioEventLoopGroup//创建客户端辅助启动类val bootstrap = new Bootstrap//将NIO线程组传入到Bootstrapbootstrap.group(eventGroup)//创建NioSocketChannel.channel(classOf[NioSocketChannel])//绑定事件处理类.handler(new ChannelInitializer[SocketChannel] {override def initChannel(ch: SocketChannel): Unit = {ch.pipeline().addLast(new ClientHandler1)}})//发送连接操作bootstrap.connect(host, port)}
}object NettyClient1 {def main(args: Array[String]) {val host = "localhost"val port = 8888val client = new NettyClient1client.connect(host, port)}
}

②ClientHandler

package top.doe.netty.demo1import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter}class ClientHandler1 extends ChannelInboundHandlerAdapter {/*** 一旦跟服务端建立上连接,channelActive方法将被调用*/override def channelActive(ctx: ChannelHandlerContext): Unit = {println("ClientHandler的channelActive方法被调用!【已经跟服务端连接上了】")}/*** 服务端返回消息后,channelRead方法被调用,该方法用于接送服务端返回的消息*/override def channelRead(ctx: ChannelHandlerContext, msg: scala.Any): Unit = {println("ClientHandler的channelRead方法被调用!")}}

(3) 执行结果

>>客户端

>>服务端

2、案例2

在案例1的基础上,服务端和客户端给彼此发送一条消息。

(1) 基本流程

 (2) 服务端代码

① NettyServer

package com.wakedata.demo2import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.ChannelInitializer
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioServerSocketChannelclass NettyServer2 {def bind(host:String,port:Int) = {val bossGroup = new NioEventLoopGroup()val workerGroup = new NioEventLoopGroup()val bootstrap = new ServerBootstrap()bootstrap.group(bossGroup,workerGroup).channel(classOf[NioServerSocketChannel]).childHandler(new ChannelInitializer[SocketChannel] {override def initChannel(ch: SocketChannel): Unit = {ch.pipeline().addLast(new ServerHandler2)}})bootstrap.bind(host,port)}}object NettyServer2{def main (args: Array[String] ): Unit = {val host = "localhost"val port = 8888val server2 = new NettyServer2server2.bind(host,port)
}
}

 ②ServerHandler

package com.wakedata.demo2import io.netty.buffer.{ByteBuf, Unpooled}
import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter}class ServerHandler2 extends ChannelInboundHandlerAdapter {override def channelActive(ctx: ChannelHandlerContext): Unit = {println("ServerHandler1的channelActive方法被调用[一个客户端连接上]")}override def channelInactive(ctx: ChannelHandlerContext): Unit = {println("ServerHandler1的channelInactive方法被调用[一个客户端断开连接]")}override def channelRead(ctx: ChannelHandlerContext, msg: scala.Any): Unit = {//接收客户端发送过来的消息val byteBuf = msg.asInstanceOf[ByteBuf]val bytes = new Array[Byte](byteBuf.readableBytes())byteBuf.readBytes(bytes)val message = new String(bytes, "UTF-8")println("ServerHandler2的channelRead方法被调用[收到客户端发送的消息了]" + message)//将数据发送到客户端val back = "你好"val resp = Unpooled.copiedBuffer(back.getBytes("UTF-8"))ctx.writeAndFlush(resp)}
}

 (3) 客户端代码

① NettyClient

package com.wakedata.demo2import io.netty.bootstrap.Bootstrap
import io.netty.channel.ChannelInitializer
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioSocketChannelclass NettyClient2 {def connect(host:String,port:Int):Unit = {//创建客户端的NIO线程组val eventGroup = new NioEventLoopGroup()//创建客户端辅助启动类val bootstrap = new Bootstrap()//将NIO线程组传入到Bootstrapbootstrap.group(eventGroup).channel(classOf[NioSocketChannel])//绑定事件处理类.handler(new ChannelInitializer[SocketChannel] {override def initChannel(ch: SocketChannel): Unit = {ch.pipeline().addLast(new ClientHandler2)}})//发送连接操作bootstrap.connect(host,port)}
}object NettyClient2{def main(args: Array[String]): Unit = {val host = "localhost"val port = 8888val client = new NettyClient2()client.connect(host,port)}
}

 ②ClientHandler

package com.wakedata.demo2import io.netty.buffer.{ByteBuf, Unpooled}
import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter}class ClientHandler2 extends ChannelInboundHandlerAdapter{override def channelActive(ctx: ChannelHandlerContext): Unit = {println("ClientHandler2的channelActive方法被调用[已跟服务器建立连接]")//向服务端发送消息val msg = "hello"ctx.writeAndFlush(Unpooled.copiedBuffer(msg.getBytes("UTF-8")))}override def channelRead(ctx: ChannelHandlerContext, msg: Any): Unit = {//读取服务端返回的消息val byteBuf = msg.asInstanceOf[ByteBuf]val bytes = new Array[Byte](byteBuf.readableBytes())byteBuf.readBytes(bytes)val message = new String(bytes, "UTF-8")print("ClientHandler2的channelRead方法被调用,接收到服务器端发送过来的消息:" + message)}
}

(4) 执行结果

>> 客户端

 >> 服务端

3、案例3

在案例2的基础上,添加多个Handler处理器(共计3个)。因为发送的case class对象,所以消息发出之前,会经过一个OurBoundHandler进行序列化,然后接受消息时,首先经过一个InBoundHandler进行解码,然后再经过另外一个 InBoundHandler对解码后的数据进行读取。

 (1) 服务端代码

① NettyServer

package com.wakedata.demo3import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.ChannelInitializer
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.handler.codec.serialization.{ClassResolvers, ObjectDecoder, ObjectEncoder}class NettyServer3 {def bind(host: String, port: Int): Unit = {//配置服务端线程池组//用于服务器接收客户端的连接val bossGroup = new NioEventLoopGroup()//用户进行SocketChannel的网络读写val workerGroup = new NioEventLoopGroup()//是Netty用户启动NIO服务端的辅助启动类,降低服务端的开发复杂度val bootstrap = new ServerBootstrap()//将两个NIO线程组作为参数传入到ServerBootstrapbootstrap.group(bossGroup, workerGroup)//创建NioServerSocketChannel.channel(classOf[NioServerSocketChannel])//绑定I/O事件处理类.childHandler(new ChannelInitializer[SocketChannel] {override def initChannel(ch: SocketChannel): Unit = {//处理输入的数据执行顺序 decoder -> handler//处理返回的数据执行顺序 encoderch.pipeline().addLast("encoder", new ObjectEncoder) //实现了ChannelOutboundHandlerch.pipeline().addLast("decoder", new ObjectDecoder(ClassResolvers.cacheDisabled(getClass.getClassLoader))) //实现了ChannelInboundHandlerch.pipeline().addLast("handler", new ServerHandler3) //实现了ChannelInboundHandler}})val channelFuture = bootstrap.bind(host, port)channelFuture.syncUninterruptibly}
}object NettyServer3 {def main(args: Array[String]) {val host = "localhost"val port = 8888val server = new NettyServer3server.bind(host, port)}
}

② ServerHandler

package com.wakedata.demo3import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter}class ServerHandler3 extends ChannelInboundHandlerAdapter {/*** 有客户端建立连接后调用*/override def channelActive(ctx: ChannelHandlerContext): Unit = {println("一个客户端连接上了...")}/*** 接受客户端发送来的消息*/override def channelRead(ctx: ChannelHandlerContext, msg: scala.Any): Unit = {//进行模式匹配msg match {case RequestMsg(msg) => {println("收到客户端发送的消息:" + msg)//将数据发送到客户端ctx.writeAndFlush(ResponseMsg("haha"))}}}}

③ ResponseMsg

package com.wakedata.demo3case class ResponseMsg(msg: String)

 (2) 服务端代码

① NettyClient

package com.wakedata.demo3import io.netty.bootstrap.Bootstrap
import io.netty.channel.ChannelInitializer
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.codec.serialization.{ClassResolvers, ObjectDecoder, ObjectEncoder}class NettyClient3 {def connect(host: String, port: Int): Unit = {//创建客户端NIO线程组val eventGroup = new NioEventLoopGroup//创建客户端辅助启动类val bootstrap = new Bootstrap//将NIO线程组传入到Bootstrapbootstrap.group(eventGroup)//创建NioSocketChannel.channel(classOf[NioSocketChannel])//绑定I/O事件处理类.handler(new ChannelInitializer[SocketChannel] {override def initChannel(ch: SocketChannel): Unit = {ch.pipeline().addLast("encoder", new ObjectEncoder)ch.pipeline().addLast("decoder", new ObjectDecoder(ClassResolvers.cacheDisabled(getClass.getClassLoader)))ch.pipeline().addLast("handler", new ClientHandler3)}})//发起异步连接操作val channelFuture = bootstrap.connect(host, port)channelFuture.syncUninterruptibly}
}object NettyClient3 {def main(args: Array[String]) {val host = "localhost"val port = 8888val client = new NettyClient3client.connect(host, port)}
}

② ClientHandler

package com.wakedata.demo3import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter}class ClientHandler3 extends ChannelInboundHandlerAdapter {//一旦跟服务端建立上连接channelActive将被调用override def channelActive(ctx: ChannelHandlerContext): Unit = {println("已经跟服务端连接上了")//向服务端发送case class实例ctx.writeAndFlush(RequestMsg("hello"))}override def channelRead(ctx: ChannelHandlerContext, msg: scala.Any): Unit = {msg match {case ResponseMsg(msg) => {println("收到服务端返回的消息:" + msg)}}}
}

③ RequestMsg

package com.wakedata.demo3case class RequestMsg(content: String)

(3) 执行结果

>>客户端

>>服务端

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_1034306.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java 设计模式系列:备忘录模式

简介 备忘录模式是一种软件设计模式,用于在不破坏封闭的前提下捕获一个对象的内部状态,并在该对象之外保存这个状态。这样以后就可将该对象恢复到原先保存的状态。 备忘录模式提供了一种状态恢复的实现机制,使得用户可以方便地回到一个特定…

Micron FY24 Q2业绩强劲,凭内存实现翻盘

根据TechInsights数据显示,美光科技24财年第二季度业绩强劲,公司通过技术创新和产能优化,成功抓住了AI服务器和其他高性能应用带来的市场需求增长机遇。尽管短期内面临供应紧张的问题,但美光通过加大研发投入和产能转换力度&#…

计算机网络基础——网络安全/ 网络通信介质

chapter3 网络安全与管理 1. 网络安全威胁 网络安全:目的就是要让网络入侵者进不了网络系统,及时强行攻入网络,也拿不走信息,改不了数据,看不懂信息。 事发后能审查追踪到破坏者,让破坏者跑不掉。 网络…

基于java+SpringBoot+Vue的时间管理系统设计与实现

基于javaSpringBootVue的时间管理系统设计与实现 开发语言: Java 数据库: MySQL技术: SpringBoot MyBatis工具: IDEA/Eclipse、Navicat、Maven 系统展示 前台展示 后台展示 系统简介 整体功能包含: 时间管理系统是一个基于Internet的应用程序,旨在…

JavaBean是什么?

Bean的本意为豌豆、子实,在这里引申为一种实体。JavaBean 是一种JAVA语言写成的可重用组件。为写成JavaBean,类必须是具体的和公共的,并且具有无参数的构造器。JavaBean 通过提供符合一致性设计模式的公共方法将内部域暴露成员属性&#xff0…

【unity】unity安装及路线图

学习路线图 二、有关unity的下载 由于unity公司是在国外,所以.com版(https://developer.unity.cn/)不一定稳定,学习时推荐从.cn国内版(https://developer.unity.cn/)前往下载,但是后期仍需回…

React Native框架开发APP,安装免费的图标库(react-native-vector-icons)并使用详解

一、安装图标库 要使用免费的图标库,你可以使用 React Native Vector Icons 库。 首先,确保你已经安装了 react-native-vector-icons: npm install --save react-native-vector-iconsnpm install --save-dev types/react-native-vector-ic…

最新AI工具系统ChatGPT网站运营源码SparkAi系统V6.0版本,GPTs应用、AI绘画、AI换脸、垫图混图、Suno-v3-AI音乐生成大模型全支持

一、前言 SparkAi创作系统是基于ChatGPT进行开发的Ai智能问答系统和Midjourney绘画系统,支持OpenAI-GPT全模型国内AI全模型。本期针对源码系统整体测试下来非常完美,那么如何搭建部署AI创作ChatGPT?小编这里写一个详细图文教程吧。已支持GPT…

bugku-web-cookies

进来以后看到一个巨长的字符串 源码同样 rfrgrggggggoaihegfdiofi48ty598whrefeoiahfeiafehbaienvdivrbgtubgtrsgbvaerubaufibryrfrgrggggggoaihegfdiofi48ty598whrefeoiahfeiafehbaienvdivrbgtubgtrsgbvaerubaufibryrfrgrggggggoaihegfdiofi48ty598whrefeoiahfeiafehbaienvdi…

景顺长城:《重塑与创造——2024 ai+洞察报告》

近期,景顺长城发布了《重塑与创造——2024 ai洞察报告》,报告深入探讨了人工智能(AI)产业的发展现状、未来趋势以及对各行业的潜在影响。报告认为,AI产业发展是多层次、多浪潮的,目前我们处于第二阶段但未来将持续伴随…

Windows 11 专业版 23H2 Docker Desktop 下载 安装 配置 使用

博文目录 文章目录 Docker Desktop准备系统要求 (WSL 2 backend)在 Windows 上打开 WSL 2 功能先决条件开启 WSL 2 WSL下载安装启动配置使用镜像 Image卷积 Volumes容器 Containers 命令RedisMySQLPostGreSQL Docker Desktop Overview of Docker Desktop Docker Desktop 疑难解…

vscode安装

🌈个人主页:Rookie Maker 🏆🏆关注博主,随时获取更多关于IT的优质内容!🏆🏆 😀欢迎来到小田代码世界~ 😁 喜欢的小伙伴记得一键三连哦 ૮(˶ᵔ ᵕ ᵔ˶)ა …

coooooode

1.局部变量在栈上初始化:.stack .const 2.未初始化的全局变量在.bss区 3.初始化的全局变量在.data和.const区

【经典算法】LeetCode14:最长公共前缀(Java/C/Python3实现含注释说明,Easy)

最长公共前缀 题目思路及实现方式一:横向扫描思路代码实现Java版本C语言版本Python3版本 复杂度分析 方式二:纵向扫描思路代码实现Java版本C语言版本Python3版本 复杂度分析 方式三:分治思路代码实现Java版本C语言版本Python3版本 复杂度分析…

【Go】八、常用字符串函数与时间函数

文章目录 1、字符串常用的函数2、常用的时间函数3、内置函数 1、字符串常用的函数 核心包strings 求字符串长度,按字节(len函数内置,不用导包) 字符串遍历 //转切片 r:[]rune(str)字符串与整数的互转 查找是否包含子字符串 re…

Nginx-记

Nginx是一个高性能的web服务器和反向代理服务器,用于HTTP、HTTPS、SMTP、POP3和IMAP协议。因它的稳定性、丰富的功能集、示例配置文件和低系统资源的消耗而闻名。 (1)更快 这表现在两个方面:一方面,在正常情况下&…

elementui日期时间选择框自定义组件

1.需求场景 业务中需要&#xff0c;日期选择框方便客户对日期的选择&#xff08;比如近5天&#xff0c;本周&#xff0c;本月&#xff0c;本年等等&#xff09;&#xff0c;并按小时展示。 2.组件代码MyDateTimeChange.vue <template><el-date-pickerv-model"…

SinoDB备份恢复工具之onbar

onbar是SinoDB数据库的备份工具之一&#xff0c;它可以根据用户选择的线程数量并行地运行备份或恢复。不同于 ontape&#xff0c;onbar 必须先安装和配置存储管理器&#xff0c;进行才能备份和恢复。 1. onbar功能特性 支持选择具体的存储空间进行备份或恢复 支持基于时间点的…

Codeforces Round #818 (Div. 2) A-C

人类智慧 A. 题意&#xff1a;求满足1<a,b<n且lcm(a,b)/gcd(a,b)<3的(a,b)的个数 转化 a/gcd*b*gcd<3 可以划归为1*2 1*1 2*1 3*1 1*3 则可以转变成一个统计倍数问题 #include<bits/stdc.h> using namespace std; using ll long long; using pii pair&…

【总结】在嵌入式设备上可以离线运行的LLM--Llama

文章目录 Llama 简介运用另一种&#xff1a;MLC-LLM 一个令人沮丧的结论在资源受限的嵌入式设备上无法运行LLM&#xff08;大语言模型&#xff09;。 一丝曙光&#xff1a;tinyLlama-1.1b&#xff08;10亿参数&#xff0c;需要至少2.98GB的RAM&#xff09; Llama 简介 LLaMA…