加入收藏 | 设为首页 | 会员中心 | 我要投稿 阳江站长网 (https://www.0662zz.cn/)- 办公协同、云通信、区块链、物联平台、高性能计算!
当前位置: 首页 > 站长学院 > PHP教程 > 正文

PHP 框架 Hyperf 实现处理超时未支付订单和延时队列

发布时间:2022-06-30 09:39:35 所属栏目:PHP教程 来源:互联网
导读:延时队列 Delayproducer.Php Amqpbuilder.Php AmqpBuilder.php ?php declare(strict_types = 1); namespace AppComponentsAmqp; use HyperfAmqpBuilderBuilder; use HyperfAmqpBuilderQueueBuilder; class AmqpBuilder extends QueueBuilder { /** *
  延时队列
 
  Delayproducer.Php
 
  Amqpbuilder.Php
 
  AmqpBuilder.php
 
  <?php
   
  declare(strict_types = 1);
   
  namespace AppComponentsAmqp;
   
  use HyperfAmqpBuilderBuilder;
   
  use HyperfAmqpBuilderQueueBuilder;
   
  class AmqpBuilder extends QueueBuilder
   
  {
   
      /**
   
       * @param array|PhpAmqpLibWireAMQPTable $arguments
   
       *
   
       * @return HyperfAmqpBuilderBuilder
   
       */
   
      public function setArguments($arguments) : Builder
   
      {
   
          $this->arguments = array_merge($this->arguments, $arguments);
   
          return $this;
   
      }
   
      /**
   
       * 设置延时队列相关参数
   
       *
   
       * @param string $queueName
   
       * @param int    $xMessageTtl
   
       * @param string $xDeadLetterExchange
   
       * @param string $xDeadLetterRoutingKey
   
       *
   
       * @return $this
   
       */
   
      public function setDelayedQueue(string $queueName, int $xMessageTtl, string $xDeadLetterExchange, string $xDeadLetterRoutingKey) : self
   
      {
   
          $this->setArguments([
   
              'x-message-ttl'             => ['I', $xMessageTtl * 1000], // 毫秒
   
              'x-dead-letter-exchange'    => ['S', $xDeadLetterExchange],
   
              'x-dead-letter-routing-key' => ['S', $xDeadLetterRoutingKey],
   
          ]);
   
          $this->setQueue($queueName);
   
          return $this;
   
      }
   
  }
  DelayProducer.php
 
  <?php
   
  declare(strict_types = 1);
   
  namespace AppComponentsAmqp;
   
  use HyperfAmqpAnnotationProducer;
   
  use HyperfAmqpBuilder;
   
  use HyperfAmqpMessageProducerMessageInterface;
   
  use HyperfDiAnnotationAnnotationCollector;
   
  use PhpAmqpLibMessageAMQPMessage;
   
  use Throwable;
   
  class DelayProducer extends Builder
   
  {
   
      /**
   
       * @param ProducerMessageInterface $producerMessage
   
       * @param AmqpBuilder              $queueBuilder
   
       * @param bool                     $confirm
   
       * @param int                      $timeout
   
       *
   
       * @return bool
   
       * @throws Throwable
   
       */
   
      public function produce(ProducerMessageInterface $producerMessage, AmqpBuilder $queueBuilder, bool $confirm = false, int $timeout = 5) : bool
   
      {
   
          return retry(1, function () use ($producerMessage, $queueBuilder, $confirm, $timeout)
   
          {
   
              return $this->produceMessage($producerMessage, $queueBuilder, $confirm, $timeout);
   
          });
   
      }
   
      /**
   
       * @param ProducerMessageInterface $producerMessage
   
       * @param AmqpBuilder              $queueBuilder
   
       * @param bool                     $confirm
   
       * @param int                      $timeout
   
       *
   
       * @return bool
   
       * @throws Throwable
   
       */
   
      private function produceMessage(ProducerMessageInterface $producerMessage, AmqpBuilder $queueBuilder, bool $confirm = false, int $timeout = 5) : bool
   
      {
   
          $result = false;
   
          $this->injectMessageProperty($producerMessage);
   
          $message = new AMQPMessage($producerMessage->payload(), $producerMessage->getProperties());
   
          $pool    = $this->getConnectionPool($producerMessage->getPoolName());
   
          /** @var HyperfAmqpConnection $connection */
   
          $connection = $pool->get();
   
          if ($confirm) {
   
              $channel = $connection->getConfirmChannel();
   
          } else {
   
              $channel = $connection->getChannel();
   
          }
   
          $channel->set_ack_handler(function () use (&$result)
   
          {
   
              $result = true;
   
          });
   
          try {
   
              // 处理延时队列
   
              $exchangeBuilder = $producerMessage->getExchangeBuilder();
   
              // 队列定义
   
              $channel->queue_declare($queueBuilder->getQueue(), $queueBuilder->isPassive(), $queueBuilder->isDurable(), $queueBuilder->isExclusive(), $queueBuilder->isAutoDelete(), $queueBuilder->isNowait(), $queueBuilder->getArguments(), $queueBuilder->getTicket());
   
              // 路由定义
   
              $channel->exchange_declare($exchangeBuilder->getExchange(), $exchangeBuilder->getType(), $exchangeBuilder->isPassive(), $exchangeBuilder->isDurable(), $exchangeBuilder->isAutoDelete(), $exchangeBuilder->isInternal(), $exchangeBuilder->isNowait(), $exchangeBuilder->getArguments(), $exchangeBuilder->getTicket());
   
              // 队列绑定
   
              $channel->queue_bind($queueBuilder->getQueue(), $producerMessage->getExchange(), $producerMessage->getRoutingKey());
   
              // 消息发送
   
              $channel->basic_publish($message, $producerMessage->getExchange(), $producerMessage->getRoutingKey());
   
              $channel->wait_for_pending_acks_returns($timeout);
   
          } catch (Throwable $exception) {
   
              // Reconnect the connection before release.
   
              $connection->reconnect();
   
              throw $exception;
   
          }
   
          finally {
   
              $connection->release();
   
          }
   
          return $confirm ? $result : true;
   
      }
   
      /**
   
       * @param ProducerMessageInterface $producerMessage
   
       */
   
      private function injectMessageProperty(ProducerMessageInterface $producerMessage) : void
   
      {
   
          if (class_exists(AnnotationCollector::class)) {
   
              /** @var HyperfAmqpAnnotationProducer $annotation */
   
              $annotation = AnnotationCollector::getClassAnnotation(get_class($producerMessage), Producer::class);
   
              if ($annotation) {
   
                  $annotation->routingKey && $producerMessage->setRoutingKey($annotation->routingKey);
   
                  $annotation->exchange && $producerMessage->setExchange($annotation->exchange);
   
              }
   
          }
   
      }
   
  }
  处理超时订单
 
  Orderqueueconsumer.Php
 
  Orderqueueproducer.Php
 
  Orderqueueproducer.php
 
  <?php
   
  declare(strict_types = 1);
   
  namespace AppAmqpProducer;
   
  use HyperfAmqpAnnotationProducer;
   
  use HyperfAmqpBuilderExchangeBuilder;
   
  use HyperfAmqpMessageProducerMessage;
   
  /**
   
   * @Producer(exchange="order_exchange", routingKey="order_exchange")
   
   */
   
  class OrderQueueProducer extends ProducerMessage
   
  {
   
      public function __construct($data)
   
      {
   
          $this->payload = $data;
   
      }
   
      public function getExchangeBuilder() : ExchangeBuilder
   
      {
   
          return parent::getExchangeBuilder(); // TODO: Change the autogenerated stub
   
      }
   
  }
  Orderqueueconsumer.php
 
  <?php
   
  declare(strict_types = 1);
   
  namespace AppAmqpConsumer;
   
  use AppServiceCityTransportOrderService;
   
  use HyperfAmqpResult;
   
  use HyperfAmqpAnnotationConsumer;
   
  use HyperfAmqpMessageConsumerMessage;
   
  /**
   
   * @Consumer(exchange="delay_exchange", routingKey="delay_route", queue="delay_queue", name ="OrderQueueConsumer", nums=1)
   
   */
   
  class OrderQueueConsumer extends ConsumerMessage
   
  {
   
      public function consume($data) : string
   
      {
   
         ##业务处理
   
      }
   
      public function isEnable() : bool
   
      {
   
          return true;
   
      }
   
  }
  Demo
 
  $builder = new AmqpBuilder();
   
          $builder->setDelayedQueue('order_exchange', 1, 'delay_exchange', 'delay_route');
   
          $que = ApplicationContext::getContainer()->get(DelayProducer::class);
   
          var_dump($que->produce(new OrderQueueProducer(['order_sn' => (string)mt_rand(10000, 90000)]), $builder))

(编辑:阳江站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    热点阅读