Bucket.php 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
  1. <?php
  2. /*
  3. * This file is a part of the DiscordPHP-Http project.
  4. *
  5. * Copyright (c) 2021-present David Cole <david.cole1340@gmail.com>
  6. *
  7. * This file is subject to the MIT license that is bundled
  8. * with this source code in the LICENSE file.
  9. */
  10. namespace Discord\Http;
  11. use Psr\Http\Message\ResponseInterface;
  12. use Psr\Log\LoggerInterface;
  13. use React\EventLoop\LoopInterface;
  14. use React\EventLoop\TimerInterface;
  15. use SplQueue;
  /**
   * Represents a rate-limit bucket.
   *
   * Requests are queued and dispatched one at a time through the `runRequest`
   * callback. The `X-Ratelimit-*` headers of every response are recorded so the
   * bucket can pause before exhausting its allowance and resume once the reset
   * timer fires.
   *
   * @author David Cole <david.cole1340@gmail.com>
   */
  class Bucket
  {
      /**
       * Request queue.
       *
       * @var SplQueue
       */
      protected $queue;

      /**
       * Bucket name.
       *
       * @var string
       */
      protected $name;

      /**
       * ReactPHP event loop.
       *
       * @var LoopInterface
       */
      protected $loop;

      /**
       * HTTP logger.
       *
       * @var LoggerInterface
       */
      protected $logger;

      /**
       * Callback for when a request is ready.
       *
       * Invoked with the dequeued request; the returned object must expose
       * `done($onFulfilled, $onRejected)`.
       *
       * @var callable
       */
      protected $runRequest;

      /**
       * Whether we are checking the queue.
       *
       * @var bool
       */
      protected $checkerRunning = false;

      /**
       * Number of requests allowed before reset, or null until a response
       * has reported it.
       *
       * @var int|null
       */
      protected $requestLimit;

      /**
       * Number of remaining requests before reset, or null when unknown.
       *
       * @var int|null
       */
      protected $requestRemaining;

      /**
       * Timer to reset the bucket, or null when no reset is scheduled.
       *
       * @var TimerInterface|null
       */
      protected $resetTimer;

      /**
       * Bucket constructor.
       *
       * @param string          $name       Bucket name, used in log messages.
       * @param LoopInterface   $loop       Event loop used to schedule timers.
       * @param LoggerInterface $logger     Logger for queue activity.
       * @param callable        $runRequest Callback that executes a request.
       */
      public function __construct(string $name, LoopInterface $loop, LoggerInterface $logger, callable $runRequest)
      {
          $this->queue = new SplQueue();
          $this->name = $name;
          $this->loop = $loop;
          $this->logger = $logger;
          $this->runRequest = $runRequest;
      }

      /**
       * Enqueue a request.
       *
       * The request is appended to the queue and the queue checker is started
       * if it is not already running.
       *
       * @param Request $request
       */
      public function enqueue(Request $request)
      {
          $this->queue->enqueue($request);
          $this->logger->debug($this.' queued '.$request);
          $this->checkQueue();
      }

      /**
       * Checks for requests in the bucket.
       *
       * Does nothing when a check is already in progress; otherwise marks the
       * checker as running and starts draining the queue.
       */
      public function checkQueue()
      {
          // We are already checking the queue.
          if ($this->checkerRunning) {
              return;
          }

          $this->checkerRunning = true;
          $this->processQueue();
      }

      /**
       * Dispatches the next queued request, or stops the checker when the
       * bucket is exhausted or the queue is empty.
       */
      private function processQueue()
      {
          // Check for rate-limits
          if ($this->requestRemaining !== null && $this->requestRemaining < 1) {
              if ($this->resetTimer) {
                  $interval = $this->resetTimer->getInterval() ?? 0;
                  $this->logger->info($this.' expecting rate limit, timer interval '.($interval * 1000).' ms');

                  // The reset timer restarts the checker when it fires.
                  $this->checkerRunning = false;

                  return;
              }

              // Exhausted but no reset is scheduled: nothing would ever wake
              // this bucket up again, so treat the remaining count as unknown
              // and carry on. A real rate limit is still handled when the
              // request is rejected.
              $this->logger->debug($this.' has no pending reset timer, resuming queue');
              $this->requestRemaining = null;
          }

          // Queue is empty, job done.
          if ($this->queue->isEmpty()) {
              $this->checkerRunning = false;

              return;
          }

          /** @var Request */
          $request = $this->queue->dequeue();

          ($this->runRequest)($request)->done(
              function (ResponseInterface $response) {
                  $this->handleResponse($response);

                  // Check for more requests
                  $this->processQueue();
              },
              function ($reason) use ($request) {
                  $this->handleRejection($reason, $request);
              }
          );
      }

      /**
       * Records the rate-limit headers of a response and (re)schedules the
       * bucket reset timer.
       *
       * @param ResponseInterface $response
       */
      private function handleResponse(ResponseInterface $response)
      {
          $resetAfter = (float) $response->getHeaderLine('X-Ratelimit-Reset-After');
          $limit = $response->getHeaderLine('X-Ratelimit-Limit');
          $remaining = $response->getHeaderLine('X-Ratelimit-Remaining');

          if ($resetAfter > 0.0) {
              // Only one reset timer may be pending at a time.
              if ($this->resetTimer) {
                  $this->loop->cancelTimer($this->resetTimer);
              }

              $this->resetTimer = $this->loop->addTimer($resetAfter, function () {
                  // Reset requests remaining and check queue
                  $this->requestRemaining = $this->requestLimit;
                  $this->resetTimer = null;
                  $this->checkQueue();
              });
          }

          // Check if rate-limit headers are present and store
          if (is_numeric($limit)) {
              $this->requestLimit = (int) $limit;
          }

          if (is_numeric($remaining)) {
              $this->requestRemaining = (int) $remaining;
          }
      }

      /**
       * Handles a rejected request.
       *
       * A RateLimit rejection re-queues the request (at the back of the queue);
       * any other rejection simply moves on to the next request.
       *
       * @param mixed $reason  Rejection value passed by the promise.
       * @param mixed $request The request that was rejected.
       */
      private function handleRejection($reason, $request)
      {
          if (! $reason instanceof RateLimit) {
              $this->processQueue();

              return;
          }

          $this->queue->enqueue($request);

          if ($reason->isGlobal()) {
              // Stop the queue checker for a global rate-limit.
              // Will be restarted when global rate-limit finished.
              $this->checkerRunning = false;
              $this->logger->debug($this.' stopping queue checker');

              return;
          }

          // Bucket-specific rate-limit: wait the retry after time, then resume.
          // The checker stays marked as running while waiting.
          $this->loop->addTimer($reason->getRetryAfter(), function () {
              $this->processQueue();
          });
      }

      /**
       * Converts a bucket to a user-readable string.
       *
       * @return string
       */
      public function __toString()
      {
          return 'BUCKET '.$this->name;
      }
  }