优秀的编程知识分享平台

网站首页 > 技术文章 正文

php多进程通信,有名管道(pcntl学习)

nanyue 2024-08-07 18:56:46 技术文章 12 ℃

对于多进程间的通信,研究了一天的管道,封装了一个管道类,以及处理多进程的类。管道的理论,后期再补上,先上代码。

<?php
/**
* 管道封装类
*
* @author mingazi@163.com
* @link www.helloworldcoding.com
* @since 2017-04-18
*/
class fifoPipeClass
{
 /**
 * 管道资源
 */
 protected $handler;
 /**
 * 管道路径
 */
 protected $path;
 /**
 * 是否阻塞,false为非阻塞,true为阻塞
 */
 protected $block = false;
 /**
 * 创建管道
 */
 public function __construct($path = './weicool.pipe', $cover = false, $mode = 0666)
 {
 if (file_exists($path)) {
 if ($cover) {
 unlink($path);
 } else {
 $this->path = $path;
 return $this;
 }
 }
 if (posix_mkfifo($path,$mode)) {
 $this->path = $path;
 return $this;
 } else {
 $this->throwException('create pipe failed');
 }
 }
 /**
 * 抛异常方法
 */
 public function throwException($msg = 'failed')
 {
 throw new \Exception($msg);
 }
 /**
 * 设置阻塞方式
 *
 * @param bool $block false为非阻塞,true为阻塞
 */
 public function setBlock($block = false)
 {
 $this->block = $block;
 }
 /**
 * 指定pipe文件路径
 */
 public function setPath($path)
 {
 if (!file_exists($path)) {
 $msg = $path.' pipe does not exists';
 $this->throwException($msg);
 }
 $this->path = $path;
 }
 /**
 * 获取pipe文件路径
 */
 public function getPath()
 {
 return $this->path;
 }
 /**
 * 打开一个管道
 *
 * @param string $mode 打开类型
 */
 public function pipeOpen($mode = 'r')
 {
 $handler = fopen($this->path, $mode);
 if (!is_resource($handler)) {
 $msg = 'open pipe '.$this->path.' falied';
 $this->throwException($msg);
 }
 // 设置阻塞类型
 stream_set_blocking($handler, $this->block);
 $this->handler = $handler;
 return $this;
 }
 /**
 * 已读的方式打开管道
 *
 * @return resource
 */
 public function readOpen()
 {
 return $this->pipeOpen('r');
 }
 /**
 * 已写的方式打开管道
 *
 * @return resource
 */
 public function writeOpen()
 {
 return $this->pipeOpen('w');
 }
 /**
 * 读取一行,或给定的长度
 */
 public function readOne($byte = 1024)
 {
 $data = fread($this->handler,$byte);
 return $data;
 }
 /**
 * 读取所有的内容
 */
 public function readAll()
 {
 $hd = $this->handler;
 $data = '';
 while (!feof($hd)) {
 $data .= fread($hd,1024);
 }
 return $data;
 }
 /**
 * 写入数据
 */
 public function write($data)
 {
 $hd = $this->handler;
 try {
 fwrite($hd,$data);
 } catch(\Exception $e) {
 $this->throwException($e->getMessage());
 }
 return $this;
 }
 /**
 * 关闭管道
 */
 public function close()
 {
 return fclose($this->handler);
 }
 /**
 * 删除管道
 */
 public function remove()
 {
 return unlink($this->path);
 }
 /**
 * 一次性获取管道中所有的数据
 *
 */
 public function pipeGetContents()
 {
 $data = '';
 $this->readOpen();
 $handler = $this->handler;
 while( !feof($handler)) {
 $data .= fread($handler,1024);
 }
 $this->close();
 return $data;
 }
 /**
 * 一次性写入数据到管道中
 *
 * @param string $data 输入数据
 */
 public function pipePutContents($data)
 {
 return $this->writeOpen()->write($data)->close();
 }
}

多进程处理类,利用管道保存各个进程的返回结果,主进程处理最后的结果

<?php
require_once './fifoPipeClass.php';
class pipeMultiProcess
{
 protected $process = []; // 子进程
 protected $child = []; // 子进程pid数组
 protected $result = []; // 计算的结果
 public function __construct($process = [])
 {
 $this->process = $process;
 }
 /**
 * 设置子进程
 */
 public function setProcess($process)
 {
 $this->process = $process;
 }
 /**
 * fork 子进程
 */
 public function forkProcess()
 {
 $process = $this->process;
 foreach($process as $k => $item) {
 $pid = pcntl_fork();
 if ($pid == 0) {
 $pipe = new fifoPipeClass();
 $id = getmypid();
 $pipe->writeOpen();
 $pipe->write($k.' pid:'.$id.PHP_EOL);
 $pipe->close();
 exit(0);
 } else if ($pid > 0) {
 $this->child[] = $pid;
 }
 }
 return $this;
 }
 /**
 * 等待子进程结束
 */
 public function waiteProcess()
 {
 $child = $this->child;
 $pipe = new fifoPipeClass();
 $pipe->readOpen();
 echo 'get all begin'.PHP_EOL;
 while(count($child)) {
 foreach($child as $k => $pid){
 $res = pcntl_waitpid($pid,$status,WNOHANG);
 if ( -1 == $res || $res > 0 ) {
 unset($child[$k]);
 }
 }
 $data = $pipe->readOne();
 if ($data) {
 $this->result[] = $data;
 }
 }
 $pipe->close();
 echo 'get all end'.PHP_EOL;
 $pipe->remove();
 return $this;
 }
 /**
 * 获取返回结果
 */
 public function getResult()
 {
 return $this->result;
 }
}
$obj = new pipeMultiProcess();
$obj->setProcess(['name'=>1,'age'=>2,'sex'=>3]);
$res = $obj->forkProcess()->waiteProcess()->getResult();
print_r($res);

运行结果如下:

Array
(
 [0] => age pid:7436
 [1] => sex pid:7437
name pid:7435
)
最近发表
标签列表