123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215 |
- <?php
- /*
- * This file is a part of the DiscordPHP-Http project.
- *
- * Copyright (c) 2021-present David Cole <david.cole1340@gmail.com>
- *
- * This file is subject to the MIT license that is bundled
- * with this source code in the LICENSE file.
- */
- namespace Discord\Http;
- use Psr\Http\Message\ResponseInterface;
- use Psr\Log\LoggerInterface;
- use React\EventLoop\LoopInterface;
- use React\EventLoop\TimerInterface;
- use SplQueue;
/**
 * Represents a rate-limit bucket.
 *
 * Requests are held in a FIFO queue and handed to the `runRequest` callback
 * one at a time. The `X-Ratelimit-*` headers of every response are recorded so
 * the bucket stops sending once no requests remain and resumes when its reset
 * timer fires.
 *
 * @author David Cole <david.cole1340@gmail.com>
 */
class Bucket
{
    /**
     * Request queue.
     *
     * @var SplQueue
     */
    protected $queue;

    /**
     * Bucket name.
     *
     * @var string
     */
    protected $name;

    /**
     * ReactPHP event loop.
     *
     * @var LoopInterface
     */
    protected $loop;

    /**
     * HTTP logger.
     *
     * @var LoggerInterface
     */
    protected $logger;

    /**
     * Callback for when a request is ready.
     *
     * Called with the dequeued Request. Its return value must expose
     * `done($onFulfilled, $onRejected)`.
     *
     * @var callable
     */
    protected $runRequest;

    /**
     * Whether we are checking the queue.
     *
     * @var bool
     */
    protected $checkerRunning = false;

    /**
     * Number of requests allowed before reset.
     *
     * Null until a response carrying `X-Ratelimit-Limit` has been seen.
     *
     * @var int|null
     */
    protected $requestLimit;

    /**
     * Number of remaining requests before reset.
     *
     * Null while the remaining count is unknown.
     *
     * @var int|null
     */
    protected $requestRemaining;

    /**
     * Timer to reset the bucket.
     *
     * Null when no reset is scheduled.
     *
     * @var TimerInterface|null
     */
    protected $resetTimer;

    /**
     * Bucket constructor.
     *
     * @param string          $name       Name used when the bucket is logged.
     * @param LoopInterface   $loop       Event loop used to schedule reset and retry timers.
     * @param LoggerInterface $logger     Receives debug/info messages about the queue.
     * @param callable        $runRequest Called with a Request; must return an object
     *                                    exposing `done($onFulfilled, $onRejected)`.
     */
    public function __construct(string $name, LoopInterface $loop, LoggerInterface $logger, callable $runRequest)
    {
        $this->queue = new SplQueue;
        $this->name = $name;
        $this->loop = $loop;
        $this->logger = $logger;
        $this->runRequest = $runRequest;
    }

    /**
     * Enqueue a request.
     *
     * The request is appended to the queue and the queue checker is started
     * if it is not already running.
     *
     * @param Request $request
     */
    public function enqueue(Request $request)
    {
        $this->queue->enqueue($request);
        $this->logger->debug($this.' queued '.$request);
        $this->checkQueue();
    }

    /**
     * Checks for requests in the bucket.
     *
     * Starts the queue checker unless one is already running. The checker
     * keeps sending queued requests until the queue is empty or a rate limit
     * pauses it.
     */
    public function checkQueue()
    {
        // We are already checking the queue.
        if ($this->checkerRunning) {
            return;
        }

        $this->checkerRunning = true;
        $this->processQueue();
    }

    /**
     * Sends the next queued request, or stops the checker when there is
     * nothing to send or the bucket has no requests remaining.
     */
    private function processQueue()
    {
        // Check for rate-limits
        if ($this->requestRemaining !== null && $this->requestRemaining < 1) {
            if ($this->resetTimer !== null) {
                $interval = $this->resetTimer->getInterval();
                $this->logger->info($this.' expecting rate limit, timer interval '.($interval * 1000).' ms');
                // The reset timer calls checkQueue() again once it fires.
                $this->checkerRunning = false;

                return;
            }

            // Nothing is scheduled to restore the count, so waiting would
            // stall the bucket forever. Forget the stale count and try again;
            // a still-active limit comes back as a RateLimit rejection.
            $this->logger->debug($this.' has no requests remaining and no reset timer pending, clearing the count');
            $this->requestRemaining = null;
        }

        // Queue is empty, job done.
        if ($this->queue->isEmpty()) {
            $this->checkerRunning = false;

            return;
        }

        /** @var Request */
        $request = $this->queue->dequeue();

        ($this->runRequest)($request)->done(
            function (ResponseInterface $response) {
                $this->handleResponse($response);

                // Check for more requests
                $this->processQueue();
            },
            function ($reason) use ($request) {
                $this->handleFailure($reason, $request);
            }
        );
    }

    /**
     * Records the rate-limit headers of a response and (re)arms the reset timer.
     *
     * @param ResponseInterface $response
     */
    private function handleResponse(ResponseInterface $response)
    {
        $resetAfter = (float) $response->getHeaderLine('X-Ratelimit-Reset-After');
        $limit = $response->getHeaderLine('X-Ratelimit-Limit');
        $remaining = $response->getHeaderLine('X-Ratelimit-Remaining');

        if ($resetAfter) {
            // Only one reset timer is kept; the newest header wins.
            if ($this->resetTimer !== null) {
                $this->loop->cancelTimer($this->resetTimer);
            }

            $this->resetTimer = $this->loop->addTimer($resetAfter, function () {
                // Reset requests remaining and check queue
                $this->requestRemaining = $this->requestLimit;
                $this->resetTimer = null;
                $this->checkQueue();
            });
        }

        // Check if rate-limit headers are present and store
        if (is_numeric($limit)) {
            $this->requestLimit = (int) $limit;
        }

        if (is_numeric($remaining)) {
            $this->requestRemaining = (int) $remaining;
        }
    }

    /**
     * Handles a rejected request.
     *
     * A RateLimit rejection puts the request back at the head of the queue so
     * it is retried before anything queued after it. Any other rejection
     * moves on to the next request.
     *
     * @param mixed   $reason  Rejection value passed by the `runRequest` result.
     * @param Request $request The request that was rejected.
     */
    private function handleFailure($reason, Request $request)
    {
        if (! $reason instanceof RateLimit) {
            $this->processQueue();

            return;
        }

        // unshift() rather than enqueue(): keeps the original send order.
        $this->queue->unshift($request);

        // Stop the queue checker for a global rate-limit.
        // Will be restarted when global rate-limit finished.
        if ($reason->isGlobal()) {
            $this->checkerRunning = false;
            $this->logger->debug($this.' stopping queue checker');

            return;
        }

        // Bucket-specific rate-limit: wait the retry after time, then resume.
        $this->loop->addTimer($reason->getRetryAfter(), function () {
            $this->processQueue();
        });
    }

    /**
     * Converts a bucket to a user-readable string.
     *
     * @return string
     */
    public function __toString()
    {
        return 'BUCKET '.$this->name;
    }
}
|