从一次 RPC 请求,探索 MOSN 的工作流程

news/2024/4/28 17:52:27/文章来源:https://blog.csdn.net/SOFAStack/article/details/137063041

589492c7a2e334cafda9cd3b4408165a.gif

王程铭(呈铭)

蚂蚁集团技术工程师,Apache Committer

专注 RPC、Service Mesh 和云原生等领域。

本文  7368  字,预计阅读  15  分钟

前言

MOSN(Modular Open Smart Network)是一款主要使用 Go 语言开发的云原生网络代理平台,由蚂蚁集团开源并经过了双十一大促几十万容器的生产级验证。

MOSN 为服务提供多协议、模块化、智能化、安全的代理能力,融合了大量云原生通用组件,同时也可以集成 Envoy 作为网络库,具备高性能、易扩展的特点。MOSN 可以和 Istio 集成构建 Service Mesh,也可以作为独立的四、七层负载均衡,API Gateway、云原生 Ingress 等使用。

MOSN 作为数据面,整体由 NET/IO、Protocol、Stream、Proxy 四个层次组成,其中:

  • NET/IO 用于底层的字节流传输

  • Protocol 用于协议的 decode/encode

  • Stream 用于封装请求和响应,在一个 conn 上做连接复用

  • Proxy 做 downstream 和 upstream 之间 stream 的转发

那么 MOSN 是如何工作的呢?下图展示的是使用 Sidecar 方式部署运行 MOSN 的示意图,服务和 MOSN 分别部署在同机部署的 Pod 上, 您可以在配置文件中设置 MOSN 的上游和下游协议,协议可以在 HTTP、HTTP2.0 以及 SOFARPC 等中选择。

84b5d708858154b33caff68a61c4da4d.jpeg

以上内容来自官网 https://mosn.io

RPC 场景下 MOSN 的工作机制

RPC 场景下,MOSN 的工作机制示意图如下:

6bd8e2578ca772f43bb356bd27320891.png

我们简单理解一下上面这张图的意义:

1. Server 端 MOSN 会将自身 ingress 的协议端口写入到注册中心;

2. Client 端 MOSN 会从注册中心订阅地址列表,第一次订阅也会返回全量地址列表,端口号是 Server 端 ingress 绑定的端口号;

3. 注册中心会实时推送地址列表变更到 Client 端(全量)

4. Client 端发起 rpc 调用时,请求会被 SDK 打到本地 Client 端 MOSN 的 egress 端口上;

5. Client 端 MOSN 将 RPC 请求通过网络转发,将流量通过负载均衡转发到某一台 Server 端 MOSN 的 ingress 端口处理;

6. 最终到了 Server 端 ingress listener,会转发给本地 Server 应用;

7. 最终会根据原来的 TCP 链路返回。

全局视野下的 MOSN 工作流程

现在我们已经了解了 MOSN 的工作机制,那么 MOSN 作为 MESH 的数据面,是怎么进行流量拦截并且将响应原路返回的呢?

outside_default.png

为了方便大家理解,我将以上时序图内容进行拆分,我们一一攻破。

3.1 建立连接

MOSN 在启动期间,会暴露本地 egress 端口接收 Client 的请求。MOSN 会开启 2 个协程,分别死循环去对 TCP 进行读取和写处理。MOSN 会通过读协程获取到请求字节流,进入 MOSN 的协议层处理。

// 代码路径 mosn.io/mosn/pkg/network/connection.go
func (c *connection) Start(lctx context.Context) {// udp downstream connection do not use read/write loopif c.network == "udp" && c.rawConnection.RemoteAddr() == nil {return}c.startOnce.Do(func() {// UseNetpollMode = falseif UseNetpollMode {c.attachEventLoop(lctx)} else {// 启动读/写循环c.startRWLoop(lctx)}})
}func (c *connection) startRWLoop(lctx context.Context) {// 标记读循环已经启动c.internalLoopStarted = trueutils.GoWithRecover(func() {// 开始读操作c.startReadLoop()}, func(r interface{}) {c.Close(api.NoFlush, api.LocalClose)})// 省略。。。
}

3.2  Protocol 处理

Protocol 作为多协议引擎层,对数据包进行检测,并使用对应协议做 decode/encode 处理。MOSN 会循环解码,一旦收到完整的报文就会创建与其关联的 xstream,用于保持 tcp 连接用于后续响应。

// 代码路径 mosn.io/mosn/pkg/stream/xprotocol/conn.go
func (sc *streamConn) Dispatch(buf types.IoBuffer) {// decode framesfor {// 协议 decode,比如 dubbo、bolt 协议等frame, err := sc.protocol.Decode(streamCtx, buf)if frame != nil {// 创建和请求 frame 关联的 xstream,用于保持 tcp 连接用于后续响应sc.handleFrame(streamCtx, xframe)}}
}func (sc *streamConn) handleFrame(ctx context.Context, frame api.XFrame) {switch frame.GetStreamType() {case api.Request:// 创建和请求 frame 关联的 xstream,用于保持 tcp 连接用于后续响应,之后进入 proxy 层sc.handleRequest(ctx, frame, false)}
}func (sc *streamConn) handleRequest(ctx context.Context, frame api.XFrame, oneway bool) {// 创建和请求 frame 关联的 xstreamserverStream := sc.newServerStream(ctx, frame)// 进入 proxy 层并创建 downstreamserverStream.receiver = sc.serverCallbacks.NewStreamDetect(serverStream.ctx, sender, span)serverStream.receiver.OnReceive(serverStream.ctx, frame.GetHeader(), frame.GetData(), nil)
}

3.3  Proxy 层处理

proxy 层负责 filter 请求/响应链、路由匹配、负载均衡最终将请求转发到集群的某台机器上。

  downstream 部分  

// 代码路径 mosn.io/mosn/pkg/proxy/downstream.go
func (s *downStream) OnReceive(ctx context.Context, headers types.HeaderMap, data types.IoBuffer, trailers types.HeaderMap) {s.downstreamReqHeaders = headers// filter 请求/响应链、路由匹配、负载均衡phase = s.receive(s.context, id, phase)
}func (s *downStream) receive(ctx context.Context, id uint32, phase types.Phase) types.Phase {for i := 0; i <= int(types.End-types.InitPhase); i++ {s.phase = phaseswitch phase {// downstream filter 相关逻辑case types.DownFilter:s.printPhaseInfo(phase, id)s.tracks.StartTrack(track.StreamFilterBeforeRoute)s.streamFilterChain.RunReceiverFilter(s.context, api.BeforeRoute,s.downstreamReqHeaders, s.downstreamReqDataBuf, s.downstreamReqTrailers, s.receiverFilterStatusHandler)s.tracks.EndTrack(track.StreamFilterBeforeRoute)if p, err := s.processError(id); err != nil {return p}phase++// route 相关逻辑case types.MatchRoute:s.printPhaseInfo(phase, id)s.tracks.StartTrack(track.MatchRoute)s.matchRoute()s.tracks.EndTrack(track.MatchRoute)if p, err := s.processError(id); err != nil {return p}phase++// 在集群中选择一个机器、包含cluster和loadblancecase types.ChooseHost:s.printPhaseInfo(phase, id)s.tracks.StartTrack(track.LoadBalanceChooseHost)// 这里很重要,在选中一个机器之后,这里upstreamRequest对象有两个作用// 1. 这里通过持有downstream保持着对客户端app的tcp引用,用来接收请求// 2. 转发服务端tcp引用,转发客户端app请求以及响应服务端response时的通知s.chooseHost(s.downstreamReqDataBuf == nil && s.downstreamReqTrailers == nil)s.tracks.EndTrack(track.LoadBalanceChooseHost)if p, err := s.processError(id); err != nil {return p}phase++}}
}

  upstream 部分  

至此已经选中一台服务端的机器,开始准备转发。

// 代码路径 mosn.io/mosn/pkg/proxy/upstream.go
func (r *upstreamRequest) appendHeaders(endStream bool) {if r.downStream.oneway {_, streamSender, failReason = r.connPool.NewStream(r.downStream.context, nil)} else {// 会使用 ChooseHost 中选中的机器 host 创建 sender,xstream 是客户端的流对象_, streamSender, failReason = r.connPool.NewStream(r.downStream.context, r)}
}

接下来会到达 conn.go 的 handleFrame 的 handleResponse 方法,此时 handleResponse 方法继续调用 downStream 的 receiveData 方法接收数据。

//代码路径 mosn.io/mosn/pkg/stream/xprotocol/conn.go
func (sc *streamConn) handleFrame(ctx context.Context, frame api.XFrame) {switch frame.GetStreamType() {case api.Response:// 调用 downStream 的 receiveData 方法接收数据// 因为 mosn 在转发之前修改了请求id,因此会重新 encode 请求sc.handleResponse(ctx, frame)}
}

一旦准备好转发就会通过 upstreamRequest 选择的下游主机直接发送 write 请求,请求的协程此时会被阻塞。

// 代码路径 mosn.io/mosn/pkg/stream/xprotocol/stream.go
func (s *xStream) endStream() {defer func() {if s.direction == stream.ServerStream {s.DestroyStream()}}()if log.Proxy.GetLogLevel() >= log.DEBUG {log.Proxy.Debugf(s.ctx, "[stream] [xprotocol] connection %d endStream, direction = %d, requestId = %v", s.sc.netConn.ID(), s.direction, s.id)}if s.frame != nil {// replace requestIDs.frame.SetRequestId(s.id)// 因为 mosn 在转发之前修改了请求 id,因此会重新 encode 请求buf, err := s.sc.protocol.Encode(s.ctx, s.frame)if err != nil {log.Proxy.Errorf(s.ctx, "[stream] [xprotocol] encode error:%s, requestId = %v", err.Error(), s.id)s.ResetStream(types.StreamLocalReset)return}tracks := track.TrackBufferByContext(s.ctx).Trackstracks.StartTrack(track.NetworkDataWrite)// 一旦准备好转发就会通过upstreamRequest选择的下游主机直接发送 write 请求,请求的协程此时会被阻塞err = s.sc.netConn.Write(buf)tracks.EndTrack(track.NetworkDataWrite)}}
}

3.4  准备将响应写回客户端

接下来客户端 xstream 将通过读协程接收响应的字节流,proxy.go 的 OnData 方法作为 proxy 层的数据接收点。

// 代码位置 mosn.io/mosn/pkg/proxy/proxy.go
func (p *proxy) OnData(buf buffer.IoBuffer) api.FilterStatus {// 这里会做两件事// 1. 调用 protocol 层进行decode// 2. 完成后通知upstreamRequest对象,唤醒downstream阻塞的协程p.serverStreamConn.Dispatch(buf)return api.Stop
}// 代码位置 mosn.io/mosn/pkg/proxy/upstream.go
func (r *upstreamRequest) OnReceive(ctx context.Context, headers types.HeaderMap, data types.IoBuffer, trailers types.HeaderMap) {// 结束当前streamr.endStream()// 唤醒r.downStream.sendNotify()
}

downstream 被唤醒处理收到的响应,重新替换回正确的请求 ID,并调用 protocol 层重新编码成字节流写回客户端,最后销毁请求相关的资源,流程执行完毕。

// 比如我的 demo 是 dubbo 协议
func encodeFrame(ctx context.Context, frame *Frame) (types.IoBuffer, error) {// 1. fast-path, use existed raw dataif frame.rawData != nil {// 1.1 replace requestIdbinary.BigEndian.PutUint64(frame.rawData[IdIdx:], frame.Id)// hack: increase the buffer count to avoid premature recycleframe.data.Count(1)return frame.data, nil}// alloc encode bufferframeLen := int(HeaderLen + frame.DataLen)buf := buffer.GetIoBuffer(frameLen)// encode headerbuf.WriteByte(frame.Magic[0])buf.WriteByte(frame.Magic[1])buf.WriteByte(frame.Flag)buf.WriteByte(frame.Status)buf.WriteUint64(frame.Id)buf.WriteUint32(frame.DataLen)// encode payloadbuf.Write(frame.payload)return buf, nil
}

总结

本文以工作中非常常见的一个思路为出发点,详细描述了 MOSN 内部网络转发的详细流程,希望可以帮助小伙伴加深对 MOSN 的理解。

MOSN 是一款非常优秀的开源产品。MOSN 支持多种网络协议(如HTTP/2, gRPC, Dubbo 等),并且能够很容易地增加对新协议的支持;MOSN 提供了丰富的流量治理功能,例如限流、熔断、重试、负载均衡等;MOSN 在性能方面进行了大量优化,比如内存零拷贝、自适应缓冲区、连接池、协程池等,这些都有助于提升其在高并发环境下的表现。此外在连接管理方面,MOSN 设计了多协议连接池;在内存管理方面,MOSN 在 sync.Pool 之上封装了一层资源对的注册管理模块,可以方便的扩展各种类型的对象进行复用和管理。总的来说,MOSN 的设计体现了可扩展性、高性能、安全性以及对现代云环境的适应性等多方面的考虑。

对于开发者来说,深入研究 MOSN 的代码和架构,无疑可以学到很多关于高性能网络编程和云原生技术的知识。MOSN 社区欢迎您的加入!

  • MOSN 官网:https://mosn.io/

  • MOSN Github:https://github.com/mosn/mosn

f953581bce51073cc9fb8ab2e55c74dc.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_1026334.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

吴恩达深度学习笔记:神经网络的编程基础2.5-2.8

目录 第一门课&#xff1a;神经网络和深度学习 (Neural Networks and Deep Learning)第二周&#xff1a;神经网络的编程基础 (Basics of Neural Network programming)2.5 导数&#xff08;Derivatives&#xff09;2.6 更多的导数例子&#xff08;More Derivative Examples&…

Node.js学习(一)

版权声明 本文章由B站上的黑马课程整理所得&#xff0c;仅供个人学习交流使用。如涉及侵权问题&#xff0c;请立即与本人联系&#xff0c;本人将积极配合删除相关内容。感谢理解和支持&#xff0c;本人致力于维护原创作品的权益&#xff0c;共同营造一个尊重知识产权的良好环境…

【二叉树】Leetcode 543. 二叉树的直径【简单】

二叉树的直径 给你一棵二叉树的根节点&#xff0c;返回该树的 直径 。 二叉树的 直径 是指树中任意两个节点之间最长路径的 长度 。这条路径可能经过也可能不经过根节点 root 。 两节点之间路径的 长度 由它们之间边数表示。 示例1&#xff1a; 输入&#xff1a;root [1,2…

C语言实现顺序表(增,删,改,查)

目录 一.概念&#xff1a; 1.静态顺序表&#xff1a;使用定长数组存储元素。 2.动态顺序表&#xff1a;使用动态开辟的数组存储。 二.顺序表的实现: 1.顺序表增加元素 1.检查顺序表 2.头插 3.尾插 2.顺序表删除元素 1.头删 2.尾删 3.指定位置删 3.顺序表查找元素 …

使用Qt生成图片

Qt之生成png/jpg/bmp格式图片_qt生成图片-CSDN博客 (1)使用QPainter 示例关键代码&#xff1a; QImage image(QSize(this->width(),this->height()),QImage::Format_ARGB32);image.fill("white");QPainter *painter new QPainter(&image);painter->…

深入浅出:探索Hadoop生态系统的核心组件与技术架构

目录 前言 HDFS Yarn Hive HBase Spark及Spark Streaming 书本与课程推荐 关于作者&#xff1a; 推荐理由&#xff1a; 作者直播推荐&#xff1a; 前言 进入大数据阶段就意味着 进入NoSQL阶段&#xff0c;更多的是面向OLAP场景&#xff0c;即数据仓库、BI应用等。 …

TCPView下载安装使用教程(图文教程)超详细

「作者简介」&#xff1a;CSDN top100、阿里云博客专家、华为云享专家、网络安全领域优质创作者 「推荐专栏」&#xff1a;更多干货&#xff0c;请关注专栏《网络安全自学教程》 TCPView是微软提供的一款「查看网络连接」和进程的工具&#xff0c;常用来查看电脑上的TCP/UDP连接…

【Leetcode】2580. 统计将重叠区间合并成组的方案数

文章目录 题目思路代码复杂度分析时间复杂度空间复杂度 结果总结 题目 题目链接&#x1f517; 给你一个二维整数数组 ranges &#xff0c;其中 ranges[i] [starti, endi] 表示 starti 到 endi 之间&#xff08;包括二者&#xff09;的所有整数都包含在第 i 个区间中。 你需要…

MappedByteBuffer VS FileChannel:从内核层面对比两者的性能差异

本文基于 Linux 内核 5.4 版本进行讨论 自上篇文章《从 Linux 内核角度探秘 JDK MappedByteBuffer》 发布之后&#xff0c;很多读者朋友私信我说&#xff0c;文章的信息量太大了&#xff0c;其中很多章节介绍的内容都是大家非常想要了解&#xff0c;并且是频繁被搜索的内容&…

ubuntu 中安装docker

1 资源地址 进入ubuntu官网下载Ubuntu23.04的版本的镜像 2 安装ubuntu 这里选择再Vmware上安装Ubuntu23.04.6 创建一个虚拟机&#xff0c;下一步下一步 注意虚拟机配置网络桥接&#xff0c;CD/DVD选择本地的镜像地址 开启此虚拟机&#xff0c;下一步下一步等待镜像安装。 3…

Git bash获取ssh key

目录 1、获取密钥 2、查看密钥 3、在vs中向GitHub推送代码 4、重新向GitHub推送修改过的代码 1、获取密钥 指令&#xff1a;ssh-keygen -t rsa -C "邮箱地址" 连续按三次回车&#xff0c;直到出现类似以下界面&#xff1a; 2、查看密钥 路径&#xff1a;C:\U…

银行监管报送系统介绍(十一):金融基础数据报送系统

为了全面落实和实现国务院办公厅下发《关于全面推进金融业综合统计工作的意见》中的综合统计工作的总体目标&#xff0c;中国人民银行调查统计司于2020年6月12日下发了《关于建立金融基础数据统计制度的通知&#xff08;试行&#xff09;》。 2020金融基础数据采集报送 报送时…

Kubernetes概念:服务、负载均衡和联网:2. Gateway API

Gateway API 官方文档&#xff1a;https://kubernetes.io/zh-cn/docs/concepts/services-networking/gateway/ Gateway API 通过使用可扩展的、角色导向的、 协议感知的配置机制来提供网络服务。它是一个附加组件&#xff0c; 包含可提供动态基础设施配置和高级流量路由的 API…

9.windows ubuntu 子系统,centrifuge:微生物物种分类。

上次我们用了karken2和bracken进行了物种分类&#xff0c;这次我们使用centrifuge. Centrifuge 是一种用于快速和准确进行微生物分类和物种鉴定的软件。其主要功能包括&#xff1a; 快速分类和物种鉴定: Centrifuge 可以对高通量测序数据&#xff08;如 metagenomic 或 RNA-Se…

[NLP] 初窥000001

NL(natural language)–自然语言 人类的语言–中文&#xff0c;英语&#xff0c;法语 NLP(Natural Language Processing)–自认语言处理 计算机处理人类语言的技术&#xff0c;它包含翻译、智能问答、文本分类、情感分析等常见应用。 CV(Computational Vision) 感知NLP 认知…

【Java程序设计】【C00388】基于(JavaWeb)Springboot的校园竞赛管理系统(有论文)

Springboot的校园竞赛管理系统&#xff08;有论文&#xff09; 项目简介项目获取开发环境项目技术运行截图 博主介绍&#xff1a;java高级开发&#xff0c;从事互联网行业六年&#xff0c;已经做了六年的毕业设计程序开发&#xff0c;开发过上千套毕业设计程序&#xff0c;博客…

2024/3/27打卡更小的数(十四届蓝桥杯)——区间DP

目录 题目 思路 代码 题目 思路 题目说求数组某个区间中的数进行翻转&#xff0c;由于区间选择多&#xff0c;首先想到DP问题。 第一版想到的方法&#xff08;错误的&#xff09;&#xff0c;当进行状态计算的时候&#xff0c;无法判定区间是否翻转后满足要求&#xff0c;…

js改变图片曝光度(高亮度)

方法一&#xff1a; 原理&#xff1a; 使用canvas进行滤镜操作&#xff0c;通过改变图片数据每个像素点的RGB值来提高图片亮度。 缺点 当前项目使用的是svg&#xff0c;而不是canvas 调整出来的效果不是很好&#xff0c;图片不是高亮&#xff0c;而是有些发白 效果 代码 …

阿里云ECS选型推荐配置

本文介绍构建Kubernetes集群时该如何选择ECS类型以及选型的注意事项。 集群规格规划 目前在创建Kubernetes集群时&#xff0c;存在着使用很多小规格ECS的现象&#xff0c;这样做有以下弊端&#xff1a; 网络问题&#xff1a;小规格Worker ECS的网络资源受限。 容量问题&…

验证码/数组元素的复制.java

1&#xff0c;验证码 题目&#xff1a;定义方法实现随机产生一个5位的验证码&#xff0c;前面四位是大写或小写的英文字母&#xff0c;最后一位是数字 分析&#xff1a;定义一个包含所有大小写字母的数组&#xff0c;然后对数组随机抽取4个索引&#xff0c;将索引对应的字符拼…