主要解决一些异常消息导致队列堵塞,正常消息无法被进行消费。


下面为PHP实例代码:

/**
 * @param $queueName
 * @return mixed
 * @author: pengjch 2019-10-10 16:45
 */
public function rabbitMqMsgProcess($queueName){
    /**@var $rabbitMq RabbitMq**/
    $rabbitMq = Yii::app()->rabbitmq->getInstance();
    $exchangeName = $queueName."_exchange";
    $routingKey = $queueName."_routingKey";
    $rabbitMq->setExchange($exchangeName, AMQP_EX_TYPE_TOPIC);
    $rabbitMq->setQueue( $queueName, $routingKey);
    return $rabbitMq->run( array( $this, "rabbitMqMsgHandle" ), false );
}
/**
 * @param $envelope
 * @param $queue AMQPQueue
 * @return bool
 * @throws MongoCursorException
 * @throws MongoCursorTimeoutException
 * @throws MongoException
 * @author: pengjch 2019-10-10 14:42
 */
public function rabbitMqMsgHandle( $envelope, $queue ){
    $json = $envelope->getBody();
    $data = CJSON::decode($json);
    if($data['className']){
        $class = new $data['className'];
    }else{
        $class = $this;
    }
    $method = $data['method'];
    if(!method_exists($class,$method)){
        echo '未定义'.$data['method'].'方法!'."\n";
        $queue->ack($envelope->getDeliveryTag());   //未定义的方法直接抛弃
    }else{
        if($class->$method($data)){
            //$queue->ack()。当工作者(worker)完成了任务,就发送一个响应。
            //当工作者(worker)挂掉这后,所有没有响应的消息都会重新发送。
            $queue->ack($envelope->getDeliveryTag());
        }else{
            $cacheKey = md5(json_encode($data));
            $retryTime = CommonUtils::getCache($cacheKey)?:0;
            if($retryTime < WK::RABBIT_MQ_QUEUE_RETRY_TIME){   //防止mq队列死循环崩溃   重新入列10次后抛弃此次任务
                $retryTime += 1;
                echo '重新进入列队'.$retryTime.'次!'."\n";
                CommonUtils::setCache($cacheKey,$retryTime,3600);
                $queue->nack($envelope->getDeliveryTag(),AMQP_REQUEUE);
            }else{
                echo "放弃当前数据进入Unacked,处理下一条消息"."\n";
                $queue->ack($envelope->getDeliveryTag()); //不能直接响应,响应则视为垃圾处理(垃圾处理)
            }
        }
    }
}