PHP 队列的实现

队列,很简单的一个东西,但往往就是有那么多的麻烦。
  比如PHP发送邮件的时候,如果在用户注册,你是注册的时候发送邮件呢,还是注册成功之后发送呢,很显然,大多数时候都是在注册完成之后发送邮件,除非特殊情况,但是怎么让注册之后直接返回结果而不管是否发送了邮件呢。
  这里就需要这样一个东西,单独处理一个队列,一般情况有两种方式来实现,定时执行网页,还有就是使用PHP的cli模式。

  首先讨论队列的实现。使用数据库,这点很重要。比如这里我建了这样一个表。

CREATE  TABLE IF NOT EXISTS `pitus`.`queue` (
  `id` BIGINT NOT NULL AUTO_INCREMENT COMMENT ‘队列唯一ID‘ ,
  `time` DATETIME NOT NULL COMMENT ‘队列创建时间‘ ,
  `up_time` DATETIME NOT NULL COMMENT ‘队列更新时间‘ ,
  `status` INT NOT NULL DEFAULT 0 COMMENT ‘该队列的状态‘ ,
  `callback` LONGTEXT NOT NULL COMMENT ‘队列的回调函数‘ ,
  `param` LONGTEXT NOT NULL COMMENT ‘回调函数接受的参数‘ ,
  `library` LONGTEXT NOT NULL COMMENT ‘回调函数需要的类库‘ ,
  `message` VARCHAR(1024) NULL COMMENT ‘队列执行信息‘ ,
  PRIMARY KEY (`id`) )
ENGINE = InnoDB

  队列如何来处理呢,简单的可以分为几个方法,还是直接贴代码比较现实,分为一个队列的处理类,和一个接口。每个回调对象必须实现此接口,否则无法调用。

 

/**
 * 队列处理
 * Class Queue
 * @package ULib
 */
class Queue{
    /**
     * 开始执行队列
     */
    public function run(){
        if(substr(php_sapi_name(), 0, 3) === "cli"){
            //命令行模式循环等待执行
            while(true){
                $this->run_list($this->get_list());
                sleep(5);
            }
        } else{
            $this->run_list($this->get_list());
        }
    }

    /**
     * 执行指定的队列列表
     * @param array $list
     */
    public function run_list($list){
        for($i = 0, $l = count($list); $i < $l; $i++){
            $v = $list[$i];
            $status = intval($v[‘status‘]);
            $message = NULL;
            try{
                //执行回调函数,如果没有返回异常则为成功执行
                $this->exec($v[‘callback‘], $v[‘param‘], $v[‘library‘]);
                $status = 1;
            } catch(\Exception $ex){
                --$status;
                $message = $ex->getMessage();
                echo "ERROR:", $message;
            }
            //更新队列信息
            db()->update("queue", [
                ‘status‘ => $status,
                ‘up_time‘ => date("Y-m-d H:i:s"),
                ‘message‘ => $message
            ], [‘id‘ => $v[‘id‘]]);
            $list[$i] = NULL;
        }
   
 
}     /**      * 获取队列      * @return array      */     private function get_list(){         return db()->select("queue", [             ‘id‘,             ‘callback‘,             ‘param‘,             ‘library‘,             ‘status‘         ], [             ‘status[<]‘ => 1,             ‘ORDER‘ => ‘up_time DESC‘         ]);     }     /**      * @param QueueCallback $call  回调类      * @param mixed         $param 参数      * @param string        $lib   Lib名称      */     public function add($call, $param, $lib){         $time = date("Y-m-d H:i:s");         //新将对应的数据序列化后存储到数据库中         if(db()->insert("queue", [                 ‘time‘ => $time,                 ‘up_time‘ => $time,                 ‘callback‘ => serialize($call),                 ‘param‘ => serialize($param),                 ‘library‘ => serialize($lib)             ]) < 0         ){             //添加错误记录             $this->record_error("Add queue error on sql." . debug("SQL error:" . implode(",", db()->error()[‘write‘])));         }     }     /**      * 记录错误信息      * @param $err      */     private function record_error($err){     }     /**      * 执行回调      * @param string $callback      * @param string $param      * @param string $library      * @throws \Exception      */     public function exec($callback, $param, $library){         $lib = @unserialize($library);         //首先加载反序列化所需的类库         if(isset($lib[‘lib‘]) && is_array($lib[‘lib‘])){             call_user_func_array([                 lib(),                 ‘load‘             ], $lib[‘lib‘]);         }         if(isset($lib[‘c_lib‘]) && is_array($lib[‘c_lib‘])){             call_user_func_array([                 c_lib(),                 ‘load‘             ], $lib[‘c_lib‘]);         }         /**          * 对回调函数反序列化          * @var QueueCallback $call          */         $call = @unserialize($callback);         if(!is_object($call)){             //初步判断是否为对象             throw new \Exception("unserialize error");         }         $ref = new \ReflectionClass($call);         if(!in_array("ULib\\QueueCallback", $ref->getInterfaceNames())){             //检测是否为正确的实现了接口             throw new \Exception("callback class error.");         }         //最后执行,并使用对应的参数         @$call->run(@unserialize($param));     } } /**  * 队列的接口  * Interface QueueCallback  * @package ULib  */ interface QueueCallback{     /**      * 执行回调函数      * @param $param      * @return mixed      */     public function run($param); }

这样使用回调对象的一个好处就是可以使队列处理的内容扩大,而不仅仅限于邮件的处理,还比如一些其他耗时的操作,当然这里也可以更改为多线程处理队列,如果你需要的话。   最后就是如何实现队列的处理,必须有一个前提就是,同一个队列不能同时有多个线程去处理。这里需要用到一个其他的东西,文件锁,这个实现起来相对容易,而已跨平台性好。如果使用信号那么windows下就不行,代码如下。某些时候一个比较特殊的操作可能也遇得到吧。

 

<?php
$lock_file = __DIR__ . "/config/queue.lock";
$fp = fopen($lock_file, ‘w‘);//写模式打开,文件不存在直接创建
if(!flock($fp, LOCK_EX | LOCK_NB)){
    //如果当前文件无法锁定,表示被其他进程锁定,所以结束执行
    //LOCK_EX为独享锁,LOCK_NB为非阻塞
    fclose($fp);
    die("Queue must be a single run.\n");
} else{
    echo "LOCK\n";
}
set_time_limit(0);
require_once("sys/config.php");
cfg()->load(‘config/all.php‘); //加载其他配置文件
lib()->load(‘Queue‘, ‘Hook‘);
$hook = new \ULib\Hook();
if(db()->status()){
    $queue = new \ULib\Queue();
    $queue->run();
} else{
    echo("Cannot connect to the database.");
}
flock($fp, LOCK_UN);
fclose($fp);
?>

PHP 队列的实现,古老的榕树,5-wow.com

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。