php rabbitMq异常消息重新入队或垃圾处理
主要解决一些异常消息导致队列堵塞,正常消息无法被进行消费。
下面为PHP实例代码:
/** * @param $queueName * @return mixed * @author: pengjch 2019-10-10 16:45 */ public function rabbitMqMsgProcess($queueName){ /**@var $rabbitMq RabbitMq**/ $rabbitMq = Yii::app()->rabbitmq->getInstance(); $exchangeName = $queueName."_exchange"; $routingKey = $queueName."_routingKey"; $rabbitMq->setExchange($exchangeName, AMQP_EX_TYPE_TOPIC); $rabbitMq->setQueue( $queueName, $routingKey); return $rabbitMq->run( array( $this, "rabbitMqMsgHandle" ), false ); } /** * @param $envelope * @param $queue AMQPQueue * @return bool * @throws MongoCursorException * @throws MongoCursorTimeoutException * @throws MongoException * @author: pengjch 2019-10-10 14:42 */ public function rabbitMqMsgHandle( $envelope, $queue ){ $json = $envelope->getBody(); $data = CJSON::decode($json); if($data['className']){ $class = new $data['className']; }else{ $class = $this; } $method = $data['method']; if(!method_exists($class,$method)){ echo '未定义'.$data['method'].'方法!'."\n"; $queue->ack($envelope->getDeliveryTag()); //未定义的方法直接抛弃 }else{ if($class->$method($data)){ //$queue->ack()。当工作者(worker)完成了任务,就发送一个响应。 //当工作者(worker)挂掉这后,所有没有响应的消息都会重新发送。 $queue->ack($envelope->getDeliveryTag()); }else{ $cacheKey = md5(json_encode($data)); $retryTime = CommonUtils::getCache($cacheKey)?:0; if($retryTime < WK::RABBIT_MQ_QUEUE_RETRY_TIME){ //防止mq队列死循环崩溃 重新入列10次后抛弃此次任务 $retryTime += 1; echo '重新进入列队'.$retryTime.'次!'."\n"; CommonUtils::setCache($cacheKey,$retryTime,3600); $queue->nack($envelope->getDeliveryTag(),AMQP_REQUEUE); }else{ echo "放弃当前数据进入Unacked,处理下一条消息"."\n"; $queue->ack($envelope->getDeliveryTag()); //不能直接响应,响应则视为垃圾处理(垃圾处理) } } } }