记一次hyperf框架封装swoole自定义进程

news/2024/5/16 8:20:09/文章来源:https://blog.csdn.net/t_fengyun/article/details/133071853

背景

公司准备引入swoole和rabbitmq来处理公司业务。因此,我引入hyperf框架,想用swoole的多进程来实现。

自定义启动服务封装

<?php
/*** 进程启动服务【manager】*/
declare(strict_types=1);namespace App\Command;use Swoole;
use Swoole\Process;
use Swoole\Process\Pool;
use App\Process\BaseProcess;
use Hyperf\Command\Command as HyperfCommand;
use Psr\Container\ContainerInterface;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputOption;/*** @Command*/
#[Command]
class TaskProcessCommand extends HyperfCommand
{const MANAGER_PROCESS_PID_PATH = BASE_PATH . '/runtime/taskProcess.pid';/*** @var ContainerInterface*/protected $container;protected $coroutine = false;public function __construct(ContainerInterface $container){$this->container = $container;parent::__construct('task');}public function configure(){parent::configure();$this->setDescription('自定义进程任务');$this->addOption('daemonize', 'd', InputOption::VALUE_NONE, '守护进程化');$this->addArgument('action', InputArgument::REQUIRED, 'start/stop/restart 启动/关闭/重启');}public function handle(){$action = $this->input->getArgument('action');if ($action === 'start') {$this->start();} elseif ($action === 'stop') {$this->stop();} elseif ($action === 'restart') {$this->restart();} else {echo "不支持的action, 请输入 -h 参数查看" . PHP_EOL;}}/*** 重启:php bin/hyperf.php task restart*/protected function restart(){$this->stop();$this->start();}/*** 停止:php bin/hyperf.php task stop*/protected function stop(){if (file_exists(self::MANAGER_PROCESS_PID_PATH)) {//后期可以写入数据表,根据状态进行重启$managerPid = file_get_contents(self::MANAGER_PROCESS_PID_PATH);echo "stopping...\n";echo "kill pid $managerPid \n";$managerPid = intval($managerPid);$startTime = time();$timeout = config('server.settings.max_wait_time', 10);@Process::kill($managerPid);//等待主进程结束while (@Process::kill($managerPid, 0)) {//waiting process stopecho "waiting...\r";usleep(100000);echo "              \r";echo "waiting.\r";usleep(100000);echo "              \r";//超时 强杀所有子进程if ($managerPid > 0 && time() - $startTime >= $timeout) {echo "wait timeout, kill -9 child process, pid: $managerPid \n";echo shell_exec("ps -ef|awk '$3~/^{$managerPid}$/'") . PHP_EOL;echo shell_exec("ps -ef|awk '$3~/^{$managerPid}$/ {print $2}'|xargs kill -9") . PHP_EOL;}}unlink(self::MANAGER_PROCESS_PID_PATH);echo "stopped. \n";} else {echo "找不到manager pid, path: " . self::MANAGER_PROCESS_PID_PATH;}}/*** 启动:php bin/hyperf.php task start* 守护进程启动:php bin/hyperf.php task start -d*/protected function start(){$processConfig = config('processes');if ($processConfig) {echo "start now.\n";$daemonize = $this->input->getOption('daemonize');if ($daemonize) {//重定向标准输出到指定日志文件fclose(STDOUT);fclose(STDERR);$STDOUT = fopen(BASE_PATH . '/runtime/logs/taskProcess_output.log', 'ab');$STDERR = fopen(BASE_PATH . '/runtime/logs/taskProcess_error.log', 'ab');Process::daemon(true, true);}//save pidfile_put_contents(self::MANAGER_PROCESS_PID_PATH, getmypid());//TODO 后期可以根据需要写入配置或者数据表,开启多个主进程、挂载多个子进程BaseProcess::setProcessName('manager');//主进程$startFuncMap = [];foreach ($processConfig as $processClass) {$processObj = new $processClass;if ($processObj->isEnable && ($processObj instanceof BaseProcess) && isset($processObj->nums) && $processObj->nums > 0) {for ($i = 0; $i < $processObj->nums; $i++) {$startFuncMap[] = [[$processObj, 'handle'],$processObj->enableCoroutine ?? false,$i,];}}}$pool = new Pool(count($startFuncMap), SWOOLE_IPC_UNIXSOCK, 0, false);$pool->on('workerStart', function (Pool $pool, int $workerId) use ($startFuncMap) {[$func, $enableCoroutine, $idx] = $startFuncMap[$workerId];if ($enableCoroutine) {run(function () use ($func, $pool, $workerId, $idx) {$pm = $func[0];//process下的类$idx += 1;BaseProcess::setProcessName($pm->name . "[{$idx}/{$pm->nums}]");//多个子进程call_user_func($func, $pool, $workerId);});} else {$func($pool, $workerId);//baseProcess下的handle}});$pool->on('Message', function (Swoole\Process\Pool $pool, string $data) {echo 'process Message,data=' .json_encode($data). PHP_EOL;});//进程关闭$pool->on("WorkerStop", function (Swoole\Process\Pool $pool, int $workerId) {echo "process WorkerId={$workerId} is stopped". PHP_EOL;});$pool->start();} else {printf("没有可启动的自定义进程, 请在配置task_process中声明,且继承%s\n", BaseProcess::class);}}/*** 查看运行状态:php bin/hyperf.php task status*/protected function status(){//TODO 查看任务执行状态}public function getProcess($pid = -1){if ($pid === -1) {$pid = getmypid();}return static::$process[$pid] ?? null;}public function getAllProcess(){return static::$process;}
}

基础process封装

此处可以用hyperf框架自带的,也可以自己封装

<?phpdeclare (strict_types = 1);namespace App\Process;use Swoole;
use Swoole\Process\Pool;abstract class BaseProcess {/*** 进程数* @var integer*/public $nums = 0;/*** 进程名称* @var string*/public $name = '';/*** 是否启用协程* @var bool*/public $enableCoroutine = true;/*** 是否随进程启动服务* @var bool*/public $isEnable = true;protected $isRunning = true;protected $process;static $signal = 0;function __construct() {//进程自动命名if (empty($this->name)) {$this->name = trim(str_replace('\\', '.', str_replace(__NAMESPACE__, '', get_called_class())), '.');}}final public function handle(Pool $pool, int $workerId): void {try {$this->processInit($pool->getProcess());$this->beforeRun();while (true) {//进程结束信号if (BaseProcess::$signal === SIGTERM) {$this->onProcessExit();break;}$this->run();}} catch (\Throwable $e) {throw $e;}}protected function onProcessExit() {$this->isRunning = false;}protected function processInit($process) {$this->process = $process;echo "process {$this->name} start, pid: " . getmypid().PHP_EOL;//注册信号处理器,实现优雅重启(等待任务执行完后或者等待超时)pcntl_signal(SIGTERM, function () {BaseProcess::$signal = SIGTERM;$maxWaitTime = config('server.settings.max_wait_time', 5);$sTime = time();//检查进程任务状态Swoole\Timer::tick(500, function () use ($sTime, $maxWaitTime) {$coStat = \Swoole\Coroutine::stats();//如果主循环结束,且其它协程任务执行完,清理定时器以退出进程if (!$this->isRunning && $coStat['coroutine_num'] <= 1) {Swoole\Timer::clearAll();$this->process->exit();}//等待超时,强制结束进程elseif (time() - $sTime >= $maxWaitTime) {Swoole\Timer::clearAll();if ($this->isRunning) {$this->onProcessExit();}$this->process->exit();}});});}public static function setProcessName(string $name) {swoole_set_process_name(env('APP_NAME', 'app') . '.taskProcess.' . $name);}/*** 事件循环前调用* @return [type] [description]*/abstract function beforeRun();/*** 事件循环,注意这里不能使用死循环* @return [type] [description]*/abstract function run();}

使用demo

demo1

<?phpdeclare (strict_types = 1);namespace App\Process;/*** test*/
class TestProcess extends BaseProcess {/*** 进程数* @var integer*/public $nums = 5;public $enableCoroutine = true;/*** 不随服务启动进程* @var bool */public $isEnable = false;public function beforeRun() {//事件循环前执行,比如一些初始化工作}public function run() {//事件循环主体echo date('Y-m-d H:i:s').PHP_EOL;usleep(1000);}}

demo2

<?phpnamespace App\Process;use App\Amqp\Producer\JbtyProducer;
use App\Amqp\Producer\UpdateZeroStockProducer;
use App\Library\Jbchip\JbchipRequest;
use App\Model\HqchipGoodsModel;
use App\Model\IcbaseGoodsModel;
use App\Model\JbtyGoodsModel;
use App\Model\LcscGoodsModel;
use App\Model\OneyacGoodsModel;
use Hyperf\Amqp\Producer;
use Hyperf\Redis\Redis;
use Hyperf\Utils\ApplicationContext;class UpdateZeroStock extends BaseProcess
{const ZERO_STOCK_KEY = 'platform_zero_stock_cache_key';/*** 进程数* @var integer*/public $nums = 1;public $enableCoroutine = true;/*** 随服务启动进程* @var bool */public $isEnable=true;public function beforeRun() {//事件循环前执行,比如一些初始化工作}public function run() {//事件循环主体$this->updateZeroStock();echo date('Y-m-d H:i:s').PHP_EOL;sleep(300);}public function updateZeroStock(){// 1.全量更新$list_hq = HqchipGoodsModel::select(['id','spu','stock','manufacturer'])->where('excute_time','<',8)->limit(1000)->get();$container = ApplicationContext::getContainer();$redis = $container->get(Redis::class);$producer = ApplicationContext::getContainer()->get(Producer::class);$today = date('Y-m-d');if($list_hq){foreach ($list_hq as $item){$spu = trim($item['spu']);$zeroStockKey =  $this->getZeroStockKey($today,'hqchip',$item['manufacturer']);if($redis->exists($zeroStockKey) && !$redis->hGet($zeroStockKey,$spu)){$sendData = $item;$sendData['appKey'] = $this->appSecretKey();$sendData['platform'] = 'hqchip';$message = new UpdateZeroStockProducer($sendData);$res = $producer->produce($message);echo date('Y-m-d H:i:s') . 'rabbitmq hqchip sendMq: ' .$res . PHP_EOL;}}}}/*** 零库存缓存KEY* @param $brand* @param $sku* @return string*/private function getZeroStockKey($day,$platfrom,$brand){return self::ZERO_STOCK_KEY .":". $platfrom .":" . $day . ":" . $brand;}/*** 密钥生产* @return string*/private function appSecretKey(){$a = 'chipmall-spider&V2&' . date('Y-m-d');$appKey = base64_encode(md5($a)   .'||'. base64_encode(time() . '|' . $a));return $appKey;}
}

在配置中进程需要执行的服务

在这里插入图片描述

以守护进程方式启动服务

php bin/hyperf.php task start -d

在这里插入图片描述
查看进程命令

ps -ef|grep taskProcess

在这里插入图片描述

疑惑

这次封装还存在两个点需要完善!!!
1.重复执行:

php bin/hyperf.php task start -d

会启动多个manager进程

2、没有封装查看进程状态的status方法

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_173537.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

StarRocks 社区:从初生到两周年的进化之路

2021 年 9 月 8 日&#xff0c;StarRocks 开源社区诞生。从第一天开始&#xff0c;我们怀揣着“打造世界一流的数据分析产品”的梦想&#xff0c;踏上了星辰大海的征途。 两年间&#xff0c;StarRocks 在 GitHub 上收获了 5.4K Stars&#xff0c;产品共迭代发布了 90 余个版本&…

Java下打印九九乘法表

这个算法是基于打直角三角型演变而来&#xff0c;代码如下&#xff1a; public class MyWork {public static void main(String[] args) {for (int i 1; i < 10; i) {for (int j 1; j < i; j) {System.out.print(j "x" i "" i*j "\t&qu…

以太网媒体接口MII/RMII/SMII/GMII/RGMII/SGMII

以太网媒体接口MII/RMII/SMII/GMII/RGMII/SGMII GMAC系统框架&#xff08;EMAC是百兆mac&#xff0c; GMAC是千兆mac&#xff09;网卡网卡系统框架结构 PHY&#xff08;Physical Layer&#xff0c;物理层&#xff09;MAC(Media Access Control、媒体访问控制器)以太网结构大框架…

HTTPS 证书生成脚本详细讲解

前言 HTTPS证书的作用是用于保障网站的安全性。在HTTPS协议中&#xff0c;通过使用证书来实现客户端与服务器之间的认证和数据加密&#xff0c;防止中间人攻击、信息泄漏等安全问题的发生。https证书也就是SSL证书&#xff0c;我们首先要确定好需要 https 安全连接的域名&…

Springboot整合jdbc和Mybatis

目录 整合jdbc 1. 新建项目 2. 编写yaml配置文件连接数据库 3. 测试类 使用原生的jdbcTemplate进行访问测试 使用Druid连接池 1. 添加类型 2. 初始化连接池 3. 编写config类 配置Druid数据源监视 整合Mybatis 1. 导入依赖 2. 编写mapper接口 3. 编写实体类 4. 编…

【LeetCode热题100】--560.和为K的子数组

560.和为K的子数组 示例2的结果&#xff1a; 输入&#xff1a;nums [1,2,3] ,k3的时候 连续子数组有[1,2],[3]&#xff0c;一共有2个 利用枚举法&#xff1a; 枚举[0,…i]里所有的下标j来判断是否符合条件 class Solution {public int subarraySum(int[] nums, int k) {i…

HiEV独家 | 接棒余承东,华为光产品线总裁靳玉志出任车BU CEO

作者 | 德新 编辑 | 王博 HiEV从多个信息源获悉&#xff0c;华为光产品线总裁靳玉志已于近期接任智能汽车解决方案BU CEO一职&#xff0c;而余承东担任智能汽车解决方案BU&#xff08;以下简称「车BU」&#xff09;董事长一职。 华为光产品线又称华为光传输与接入产品线&#…

极光笔记 | 大语言模型插件

在人工智能领域&#xff0c;大语言模型&#xff08;LLMs&#xff09;是根据预训练数据集进行”学习“&#xff0c;获取可以拟合结果的参数&#xff0c;虽然随着参数的增加&#xff0c;模型的功能也会随之增强。但无论专业领域的小模型&#xff0c;还是当下最火、效果最好的大模…

rtsp转webrtc的其他几个项目

1&#xff09; mpromonet/webrtc-streamer &#xff08;c开发&#xff09; 把rtsp转webrtc&#xff0c; 通过 load urls from JSON config file ./webrtc-streamer -C config.json 通过exe文件和docker项目实际测试可以显示&#xff0c;但不太稳定加载慢,有时候出错后很难…

Unity制作曲线进度条

unity制作曲线进度条 大家好&#xff0c;我是阿赵。   在使用Unity引擎做进度条的时候&#xff0c;有时会遇到一个问题&#xff0c;如果进度条不是简单的横向、纵向或者圆形&#xff0c;而是任意的不规则形状&#xff0c;那该怎么办呢&#xff1f;比如这样的&#xff1a; 一…

基于Xml方法的Bean的配置-实例化Bean的方法-构造方法

SpringBean的配置详解 Bean的实例化配置 Spring的实例化方法主要由以下两种 构造方法实例化 底层通过构造方法对bean进行实例化 构造方法实例化bean又分为无参方法实例化和有参方法实例化&#xff0c;在Spring中配置的<bean>几乎都是无参构造该方式&#xff0c;默认是无…

DevSecOps内置安全保护

前言 随着DevOps的发展&#xff0c;DevOps大幅提升了企业应用迭代的速度。但同时&#xff0c;安全如果不能跟上步伐&#xff0c;不仅会抵消DevOps变革带来的提升&#xff0c;拖慢企业数字化转型进程&#xff0c;还会导致漏洞与风险不约而至。所以安全能力在全球范围内受到的重…

配置HBase和zookeeper

一、上传文件 二、解压 tar -zxf ./zookeeper-3.4.5-cdh5.14.2.tar.gz -C /opt/soft/ tar -zxf ./hbase-2.3.5-bin.tar.gz -C ../soft/ 三、改名字 mv ./zookeeper-3.4.5-cdh5.14.2/ zk345 mv ./hbase-2.3.5/ hbase235 四、配置映射 vim /etc/profile#ZK export ZOOKEEPE…

1999-2018年地级市一般公共预算收入、支出(教育事业费、科技支出)

1999-2018年地级市一般公共预算收入、支出&#xff08;教育事业费、科技支出&#xff09; 1、时间&#xff1a;1999-2018年 2、来源&#xff1a;城市年鉴 3、指标&#xff1a;行政区划代码、城市、年份、地方一般公共预算收入_市辖区_万元、地方一般公共预算支出_市辖区_万元…

山石网科国产化防火墙,打造全方位边界安全解决方案

互联网的快速发展促进了各行各业的信息化建设&#xff0c;但也随之带来了诸多网络安全风险。大部分组织机构采用统一互联网接入方案&#xff0c;互联网出口承担着内部用户访问互联网的统一出口和对外信息服务的入口&#xff0c;因此在该区域部署相匹配的安全防护手段必不可少。…

iOS加固保护技术:保护你的iOS应用免受恶意篡改

目录 转载&#xff1a;开始使用ipaguard 前言 下载ipa代码混淆保护工具 获取ipaguard登录码 代码混淆 文件混淆 IPA重签名与安装测试 转载&#xff1a;开始使用ipaguard 前言 iOS加固保护是直接针对ios ipa二进制文件的保护技术&#xff0c;可以对iOS APP中的可执行文件…

pycharm中恢复原始界面布局_常用快捷键_常用设置

文章目录 1 恢复默认布局1 .1直接点击file→Manage IDE Settings→Restore Default Settings&#xff08;如下图所示&#xff09;&#xff1a;1.2 直接点击Restore and Restart&#xff0c; 然后Pycharm就会自动重启&#xff0c;重启之后的界面就是最原始的界面了 2 改变主题2.…

在服务器上创建git仓库

1、在服务器上创建git仓库 选择一个创建文件夹的地方&#xff0c;这个地方不会将源码存放在这里&#xff0c;只用于版本控制 # 创建一个专门放置git的文件夹&#xff0c;也可以叫其它名 mkdir git && cd git # 创建自己项目的文件夹&#xff0c;文件夹后面要带 .git…

传统的经典问题 Java 的 Interface 是干什么的

传统的经典问题 Java 的 Interface 是干什么 解答 上面的这个问题应该还是比较好回答的吧。 只要你做过 Java &#xff0c;通常 Interface 的问题多多少少会遇到&#xff0c;而且可能会遇到一大堆。 在JAVA编程语言中是一个抽象类型&#xff08;Abstract Type&#xff09;&…

基于Android+OpenCV+CNN+Keras的智能手语数字实时翻译——深度学习算法应用(含Python、ipynb工程源码)+数据集(一)

目录 前言总体设计系统整体结构图系统流程图 运行环境Python环境TensorFlow环境Keras环境Android环境1. 安装AndroidStudio2. 导入TensorFlow的jar包和so库3. 导入OpenCV库 相关其它博客工程源代码下载其它资料下载 前言 本项目依赖于Keras深度学习模型&#xff0c;旨在对手语…