php rabbitMq异常消息重新入队或垃圾处理
主要解决一些异常消息导致队列堵塞,正常消息无法被进行消费。
下面为PHP实例代码:
/**
* @param $queueName
* @return mixed
* @author: pengjch 2019-10-10 16:45
*/
public function rabbitMqMsgProcess($queueName){
/**@var $rabbitMq RabbitMq**/
$rabbitMq = Yii::app()->rabbitmq->getInstance();
$exchangeName = $queueName."_exchange";
$routingKey = $queueName."_routingKey";
$rabbitMq->setExchange($exchangeName, AMQP_EX_TYPE_TOPIC);
$rabbitMq->setQueue( $queueName, $routingKey);
return $rabbitMq->run( array( $this, "rabbitMqMsgHandle" ), false );
}
/**
* @param $envelope
* @param $queue AMQPQueue
* @return bool
* @throws MongoCursorException
* @throws MongoCursorTimeoutException
* @throws MongoException
* @author: pengjch 2019-10-10 14:42
*/
public function rabbitMqMsgHandle( $envelope, $queue ){
$json = $envelope->getBody();
$data = CJSON::decode($json);
if($data['className']){
$class = new $data['className'];
}else{
$class = $this;
}
$method = $data['method'];
if(!method_exists($class,$method)){
echo '未定义'.$data['method'].'方法!'."\n";
$queue->ack($envelope->getDeliveryTag()); //未定义的方法直接抛弃
}else{
if($class->$method($data)){
//$queue->ack()。当工作者(worker)完成了任务,就发送一个响应。
//当工作者(worker)挂掉这后,所有没有响应的消息都会重新发送。
$queue->ack($envelope->getDeliveryTag());
}else{
$cacheKey = md5(json_encode($data));
$retryTime = CommonUtils::getCache($cacheKey)?:0;
if($retryTime < WK::RABBIT_MQ_QUEUE_RETRY_TIME){ //防止mq队列死循环崩溃 重新入列10次后抛弃此次任务
$retryTime += 1;
echo '重新进入列队'.$retryTime.'次!'."\n";
CommonUtils::setCache($cacheKey,$retryTime,3600);
$queue->nack($envelope->getDeliveryTag(),AMQP_REQUEUE);
}else{
echo "放弃当前数据进入Unacked,处理下一条消息"."\n";
$queue->ack($envelope->getDeliveryTag()); //不能直接响应,响应则视为垃圾处理(垃圾处理)
}
}
}
}