Go 控制协程(goroutine)的并发数量

news/2024/7/25 2:29:32/文章来源:https://blog.csdn.net/qq_39335595/article/details/139126137

在使用协程并发处理某些任务时, 其并发数量往往因为各种因素的限制不能无限的增大. 例如网络请求、数据库查询等等。

从运行效率角度考虑,在相关服务可以负载的前提下(限制最大并发数),尽可能高的并发。

在Go语言中,可以使用一些方法来控制协程(goroutine)的并发数量,以防止并发过多导致资源耗尽或性能下降

1、使用信号量(Semaphore)

可以使用 Go 语言中的 channel 来实现简单的信号量,限制并发数量

package mainimport ("fmt""sync"
)func worker(id int, sem chan struct{}) {sem <- struct{}{} // 占用一个信号量defer func() {<-sem // 方法运行结束释放信号量}()// 执行工作任务fmt.Printf("Worker %d: Working...\n", id)
}func main() {concurrency := 3sem := make(chan struct{}, concurrency)var wg sync.WaitGroupfor i := 0; i < 10; i++ {wg.Add(1)go func(id int) {defer wg.Done()worker(id, sem)}(i)}wg.Wait()close(sem)
}

sem 是一个有缓冲的 channel,通过控制 channel 中元素的数量,实现了一个简单的信号量机制 

 2、使用协程池

可以创建一个固定数量的协程池,将任务分发给这些协程执行。 

package mainimport ("fmt""sync"
)func worker(id int, jobs <-chan int, results chan<- int) {//jobs等待主要协程往jobs放数据for j := range jobs {fmt.Printf("协程池 %d: 协程池正在工作 %d\n", id, j)results <- j}
}func main() {const numJobs = 5    //协程要做的工作数量const numWorkers = 3 //协程池数量jobs := make(chan int, numJobs)results := make(chan int, numJobs)var wg sync.WaitGroup// 启动协程池for i := 1; i <= numWorkers; i++ {wg.Add(1)go func(id int) {defer wg.Done()worker(id, jobs, results)}(i)}// 提交任务for j := 1; j <= numJobs; j++ {jobs <- j}close(jobs)// 等待所有工作完成go func() {wg.Wait()close(results)}()// 处理结果for result := range results {fmt.Println("Result:", result)}
}

jobs 通道用于存储任务,results 通道用于存储处理结果。通过创建固定数量的工作协程,可以有效地控制并发数量。

3、使用其他包

Go 1.16 引入了 golang.org/x/sync/semaphore 包,它提供了一个更为灵活的信号量实现。

案例一

限制对外部API的并发请求 

假设我们有一个外部API,它对并发请求有限制,我们希望确保不超过这个限制。我们可以使用semaphore.Weighted来控制对API的并发访问

package mainimport ("context""fmt""golang.org/x/sync/semaphore""sync""time"
)func main() {/*1、在并发量一定的情况下,通过改变允许并发请求数可以更快处理请求任务(在CPU够用的前提下)2、sem := semaphore.NewWeighted(n),参数n就是权重量3、当一个协程需要获取的单位的权重越多,运行就会慢(比如权重总量n=5个,一个协程分配了2个,跟一个协程分配1个效率是不一样的)4、信号量没有足够的可用权重的情况发生在所有已分配的权重单位都已经被占用,即信号量的当前权重计数达到了它的总容量。在这种情况下,任何尝试通过Acquire方法获取更多权重的调用都将无法立即完成,从而导致调用者(通常是goroutine)阻塞,直到其他调用者释放一些权重单位。*//*1、权权重较大的任务在资源竞争时有更高的优先级,更容易获得执行的机会2、如果当前资源足够满足高权重任务的需求,这些任务将立即执行;若资源不足,则按照权重高低顺序排队等待3、一旦任务开始执行,其完成的速度主要取决于任务自身的逻辑复杂度、所需资源以及系统的当前负载等因素,与任务在信号量中的权重无关3、高权重的任务并不会中断已经在执行的低权重任务,而是等待这些任务自行释放资源。一旦资源释放,等待队列中的高权重任务将优先被唤醒4、Acquire 方法会检查当前信号量的可用资源量是否满足请求的权重,如果满足,则立即减少信号量的资源计数并返回,允许任务继续执行。如果不满足,任务将阻塞等待,直到有足够的资源被释放*/// 记录开始时间startTime := time.Now()// 假设外部API允许的最大并发请求为5(信号量的总容量是5个权重单位)const (maxConcurrentRequests = 5)sem := semaphore.NewWeighted(maxConcurrentRequests)var wg sync.WaitGroup// 模拟对外部API的10个并发请求for i := 0; i < 10; i++ {wg.Add(1)go func(requestId int) {defer wg.Done()// 假设我们想要获取2个单位的权重if err := sem.Acquire(context.Background(), 2); err != nil {fmt.Printf("请求 %d 无法获取信号量: %v\n", requestId, err)return}defer sem.Release(2) // 请求完成后释放信号量// 模拟对API的请求处理fmt.Printf("请求 %d 开始...\n", requestId)time.Sleep(2 * time.Second) // 模拟网络延迟fmt.Printf("请求 %d 完成。\n", requestId)}(i)}wg.Wait()// 记录结束时间endTime := time.Now()// 计算并打印总耗时fmt.Printf("程序总耗时: %v\n", endTime.Sub(startTime))
}

信号量没有足够的可用权重的情况发生在所有已分配的权重单位都已经被占用,即信号量的当前权重计数达到了它的总容量。在这种情况下,任何尝试通过Acquire方法获取更多权重的调用都将无法立即完成,从而导致调用者(通常是goroutine)阻塞,直到其他调用者释放一些权重单位。

以下是一些导致信号量没有足够可用权重的具体情况:

  1. 信号量初始化容量较小:如果信号量的总容量设置得较小,而并发请求的数量较大,那么很快就会出现权重不足的情况。

  2. 长时间占用权重:如果某些goroutine长时间占用权重单位而不释放,这会导致其他goroutine无法获取到权重,即使这些goroutine只是少数。

  3. 权重分配不均:在某些情况下,可能存在一些goroutine占用了不成比例的权重单位,导致其他goroutine无法获取足够的权重。

  4. 权重释放不及时:如果goroutine因为错误或异常情况提前退出,而没有正确释放它们所占用的权重,那么这些权重单位将不会被回收到信号量中。

  5. 高频率的请求:在短时间内有大量goroutine请求权重,即使它们请求的权重不大,累积起来也可能超过信号量的总容量。

  6. 信号量权重未正确管理:如果信号量的权重管理逻辑存在缺陷,例如错误地释放了过多的权重,或者在错误的时间点释放权重,也可能导致可用权重不足。

为了避免信号量没有足够的可用权重,可以采取以下措施:

  • 合理设置信号量容量:根据资源限制和并发需求合理设置信号量的总容量。
  • 及时释放权重:确保在goroutine完成工作后及时释放权重。
  • 使用超时:在Acquire调用中使用超时,避免无限期地等待权重。
  • 监控和日志记录:监控信号量的使用情况,并记录关键信息,以便及时发现和解决问题。
  • 权重分配策略:设计合理的权重分配策略,确保权重的公平和高效分配。

通过这些措施,可以更好地管理信号量的使用,避免因权重不足导致的并发问题。

案例二

假设有一个在线视频平台,它需要处理不同分辨率的视频转码任务。由于高清视频转码比标清视频更消耗计算资源,因此平台希望设计一个系统,能够优先处理更多标清视频转码请求,同时又不完全阻塞高清视频的转码,以保持整体服务质量和资源的有效利用。

package mainimport ("fmt""golang.org/x/net/context""golang.org/x/sync/semaphore""runtime""sync""time"
)// VideoTranscodeJob 视频转码任务
type VideoTranscodeJob struct {resolution stringweight     int64
}func main() {cpuCount := runtime.NumCPU()fmt.Printf("当前CPU个数%v\n", cpuCount)/*1、权权重较大的任务在资源竞争时有更高的优先级,更容易获得执行的机会2、如果当前资源足够满足高权重任务的需求,这些任务将立即执行;若资源不足,则按照权重高低顺序排队等待3、一旦任务开始执行,其完成的速度主要取决于任务自身的逻辑复杂度、所需资源以及系统的当前负载等因素,与任务在信号量中的权重无关3、高权重的任务并不会中断已经在执行的低权重任务,而是等待这些任务自行释放资源。一旦资源释放,等待队列中的高权重任务将优先被唤醒4、Acquire 方法会检查当前信号量的可用资源量是否满足请求的权重,如果满足,则立即减少信号量的资源计数并返回,允许任务继续执行。如果不满足,任务将阻塞等待,直到有足够的资源被释放*/// 初始化两个信号量,一个用于标清,一个用于高清,假设总共有8个CPU核心可用normalSem := semaphore.NewWeighted(6)  // 标清任务,分配6个单位权重,因为它们消耗资源较少highDefSem := semaphore.NewWeighted(2) // 高清任务,分配2个单位权重,因为它们更消耗资源var wg sync.WaitGroup//假设有20个需要转码的视频videoJobs := []VideoTranscodeJob{{"HD", 2}, {"HD", 2}, {"SD", 1}, {"SD", 1}, {"HD", 2},{"SD", 1}, {"SD", 1}, {"HD", 2}, {"SD", 1}, {"HD", 2},{"SD", 4}, {"SD", 4}, {"HD", 2}, {"SD", 1}, {"HD", 2},{"SD", 1}, {"SD", 4}, {"HD", 2}, {"SD", 6}, {"HD", 2},}for _, job := range videoJobs {wg.Add(1)go func(job VideoTranscodeJob) {defer wg.Done()var sem *semaphore.Weightedswitch job.resolution {case "SD":sem = normalSem //分配权重大,当前为6,任务在获取执行机会上有优势,但并不直接意味着执行速度快case "HD":sem = highDefSemdefault:panic("无效的分辨率")}if err := sem.Acquire(context.Background(), job.weight); err != nil {fmt.Printf("名为 %s 视频无法获取信号量: %v\n", job.resolution, err)return}defer sem.Release(job.weight) //释放权重对应的信号量// 模拟转码任务执行fmt.Printf("转码 %s 视频 (权重: %d)...\n", job.resolution, job.weight)//通过利用VideoTranscodeJob的weight值来模拟转码时间的长短,HD用时长则设置2比SD的1大,*时间就自然长,运行就时间长time.Sleep(time.Duration(job.weight*100) * time.Millisecond) // 模拟不同分辨率视频转码所需时间fmt.Printf("------------------------%s 视频转码完成。。。\n", job.resolution)}(job)}wg.Wait()
}

标清(SD)和高清(HD),分别分配了不同的权重(1和2)。通过创建两个不同权重的信号量,我们可以控制不同类型任务的同时执行数量,从而优先保证标清视频的快速处理,同时也确保高清视频能够在不影响系统稳定性的情况下进行转码。这展示了带权重的并发控制如何帮助在资源有限的情况下优化任务调度和执行效率。

 注意:对协程分配的权重单位数不能大于对应上下文semaphore.NewWeighted(n)中参数n的单位数

案例三

package mainimport ("context""fmt""sync""time""golang.org/x/sync/semaphore"
)type weightedTask struct {id     intweight int64
}func main() {const (maxTotalWeight = 20 // 最大总权重)sem := semaphore.NewWeighted(maxTotalWeight)var wg sync.WaitGrouptasksCh := make(chan weightedTask, 10)// 发送任务for i := 1; i <= 10; i++ {tasksCh <- weightedTask{id: i, weight: int64(i)} // 假设任务ID即为其权重}close(tasksCh)// 启动任务处理器for task := range tasksCh {wg.Add(1)go func(task weightedTask) {defer wg.Done()if err := sem.Acquire(context.Background(), int64(task.id)); err != nil {fmt.Printf("任务 %d 无法获取信号量: %v\n", task.id, err)return}defer sem.Release(int64(task.id)) //释放// 模拟任务执行fmt.Printf("任务 %d (权重: %d) 正在运行...\n", task.id, task.weight)time.Sleep(time.Duration(task.weight*100) * time.Millisecond) // 示例中简单用时间模拟权重影响fmt.Printf("任务 %d 完成.\n", task.id)}(task)}wg.Wait()
}

 

总结

选择哪种方法取决于具体的应用场景和需求。使用信号量是一种简单而灵活的方法,而协程池则更适用于需要批量处理任务的情况。golang.org/x/sync/semaphore 包提供了一个标准库外的更灵活的信号量实现         

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_1052877.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux 一键部署alfresco 6

alfresco 前言 Alfresco是一个流行的企业级开源内容管理系统和协作平台。它提供了丰富的功能,包括文档管理、记录管理、协作工具、工作流管理、搜索和版本控制等。Alfresco还具有灵活的部署选项,可以作为本地部署的软件或云服务来使用。 该平台可以帮助组织管理和存储各种类…

WPS文件没有保存怎么恢复?5个解决方案轻松恢复!

“我在WPS上编辑了一个文件&#xff0c;但是还没来得及将它保存&#xff0c;我不小心就退出软件了&#xff0c;现在不知道有什么方法可以恢复WPS文件呢&#xff1f;大家可以帮帮我吗” WPS作为一款功能强大且用户友好的软件&#xff0c;给我们的工作带来了很多的便利。但我们在…

适用于Android的最佳数据恢复软件

如果您的 Android 设备崩溃&#xff0c;您需要找到一种方法来取回您的数据。幸运的是&#xff0c;有许多数据恢复程序可以帮助您恢复丢失的文件。有些是免费的&#xff0c;而另一些则需要付费。这是适用于Android设备的最佳数据恢复软件列表。 什么是数据恢复软件&#xff1f; …

紫光展锐前沿探索 | 满足未来6G多差异化应用场景的技术体系思考

在6G架构/系统设计中&#xff0c;紫光展锐提出了未来6G空口“一体多翼”的技术体系概念&#xff0c;即“Big-Lite Multi-RAT”。本文将详细对该技术体系展开介绍。 “一体多翼”技术体系通过 “体”&#xff08;Big RAT&#xff09;和“翼”&#xff08;Lite RAT&#xff09;的…

Java语言ADR药物不良反应系统源码Java+IntelliJ+IDEA+MySQL一款先进的药物警戒系统

Java语言ADR药物不良反应系统源码JavaIntelliJIDEAMySQL一款先进的药物警戒系统源码 ADR药物不良反应监测系统是一个综合性的监测平台&#xff0c;旨在收集、报告、分析和评价药品在使用过程中可能出现的不良反应&#xff0c;以确保药品的安全性和有效性。 以下是对该系统的详细…

C#【进阶】俄罗斯方块

俄罗斯方块 文章目录 Test1_场景切换相关BeginScene.csBegionOrEndScene.csEndScene.csGame.csGameScene.csISceneUpdate.cs Test2_绘制对象基类和枚举信息DrawObject.csIDraw.csPosition.cs Test3_地图相关Map.cs Test4_坐标信息类BlockInfo.cs Test5_板砖工人类BlockWorker.…

04_前端三大件JS

文章目录 JavaScript1.JS的组成部分2.JS引入2.1 直接在head中通过一对script标签定义脚本代码2.2创建JS函数池文件&#xff0c;所有html文件共享调用 3.JS的数据类型和运算符4.分支结构5.循环结构6.JS函数的声明7.JS中自定义对象8.JS_JSON在客户端使用8.1JSON串格式8.2JSON在前…

Python与OpenCV:图像处理与计算机视觉实战指南

前言 OpenCV&#xff08;Open Source Computer Vision Library&#xff09;是一个开源的计算机视觉和机器学习软件库&#xff0c;它包含了数百种计算机视觉算法&#xff0c;包括图像处理、视频分析、物体检测、面部识别等。结合Python语言的强大功能&#xff0c;OpenCV可以用于…

java医院管理系统源码(springboot+vue+mysql)

风定落花生&#xff0c;歌声逐流水&#xff0c;大家好我是风歌&#xff0c;混迹在java圈的辛苦码农。今天要和大家聊的是一款基于springboot的医院管理系统。项目源码以及部署相关请联系风歌&#xff0c;文末附上联系信息 。 项目简介&#xff1a; 医院管理系统的主要使用者分…

【AJAX前端框架】Asynchronous Javascript And Xml

1 传统请求及缺点 传统的请求都有哪些&#xff1f; 直接在浏览器地址栏上输入URL。点击超链接提交form表单使用JS代码发送请求 window.open(url)document.location.href urlwindow.location.href url… 传统请求存在的问题 页面全部刷新导致了用户的体验较差。传统的请求导…

Android15 Beta更新速览

Android15 Beta更新速览 前台服务变更 前台服务使应用保持活动状态&#xff0c;以便它们可以执行关键且对用户可见的操作&#xff0c;通常以牺牲电池寿命为代价。在 Android 15 Beta 2 中&#xff0c;dataSync 和 mediaProcessing 前台服务类型现在具有约 6 小时的超时时间&a…

Python 植物大战僵尸

文章目录 效果图项目结构实现思路源代码 效果图 项目结构 实现思路 下面是代码的实现思路&#xff1a; 导入必要的库和模块&#xff1a;首先&#xff0c;我们导入了Python的os、time库以及pygame库&#xff0c;还有植物大战僵尸游戏中用到的各个植物和僵尸的类。 初始化游戏和…

AI写作工具的革命:AIGC如何提升内容生产效率

AIGC&#xff0c;即人工智能生成内容&#xff0c;是一种新兴的内容生产方式&#xff0c;它利用人工智能技术来自动生成文本、图像、音频、视频等多种形式的内容即进入实际应用层面。 所以AI不再是高深的、让人望尘莫及的算力算法&#xff0c;而是真实地贴近了我们的生活&#…

VectorDBBench在windows的调试

VectorDBBench在windows的调试 VectorDBBench是一款向量数据库基准测试工具&#xff0c;支持milvus、Zilliz Cloud、Elastic Search、Qdrant Cloud、Weaviate Cloud 、 PgVector、PgVectorRS等&#xff0c;可以测试其QPS、时延、recall。 VectorDBBench是一款使用python编写的…

鸿蒙ArkUI-X跨语言调用说明:【平台桥接(@arkui-x.bridge)】

平台桥接(arkui-x.bridge) 简介 平台桥接用于客户端&#xff08;ArkUI&#xff09;和平台&#xff08;Android或iOS&#xff09;之间传递消息&#xff0c;即用于ArkUI与平台双向数据传递、ArkUI侧调用平台的方法、平台调用ArkUI侧的方法。 以Android平台为例&#xff0c;Ark…

OM电商系统asp.net

OM电商系统&#xff0c;可以让顾客全面了解商品的详细信息&#xff0c;消除网上购物的信息不对称问题。通过商品分类来组织众多的商品&#xff0c;方便顾客找到所需要的商品。提供客服顾客互动机制&#xff0c;提高顾客的参与度。通过设计合理的订单处理流程&#xff0c;提高顾…

第一个Flutter3项目

配置flutter国内源 首先&#xff0c;配置flutter的国内源&#xff1a; env:PUB_HOSTED_URL"https://pub.flutter-io.cn"; env:FLUTTER_STORAGE_BASE_URL"https://storage.flutter-io.cn"配置gradle国内源 修改gradle\wrapper\gradle-wrapper.properties…

使用Python操作Jenkins

大家好&#xff0c;Python作为一种简洁、灵活且功能丰富的编程语言&#xff0c;可以与各种API轻松集成&#xff0c;Jenkins的API也不例外。借助于Python中的python-jenkins模块&#xff0c;我们可以轻松地编写脚本来连接到Jenkins服务器&#xff0c;并执行各种操作&#xff0c;…

MySQL触发器实战:自动执行的秘密

欢迎来到我的博客&#xff0c;代码的世界里&#xff0c;每一行都是一个故事 &#x1f38f;&#xff1a;你只管努力&#xff0c;剩下的交给时间 &#x1f3e0; &#xff1a;小破站 MySQL触发器实战&#xff1a;自动执行的秘密 前言触发器的定义和作用触发器的定义和作用触发器的…

PostgreSQL发展史

PostgreSQL是一个开源的对象-关系型数据库管理系统&#xff08;ORDBMS&#xff09;&#xff0c;其历史可以追溯到上世纪80年代。以下是对PostgreSQL发展史的深入解析&#xff1a; 1980年代&#xff1a;起源 1.Ingres 项目 1977年&#xff0c;Michael Stonebraker 和他的团队…