
Commit 6dba4f5

Add API credit support to BurstRateLimiter (#205)
1 parent ea64374 commit 6dba4f5

6 files changed, +48 -27 lines

src/rate-limiting/burst.ts (+17 -16)
@@ -6,6 +6,8 @@ const logger = makeLogger('BurstRateLimiter')
 
 /**
  * This rate limiter is the simplest stateful option.
+ * It will keep track of used API credits (by default, one request equals one credit) and
+ * upon reaching the limits in a fixed time window, will block the requests until API credits are available again.
  * On startup, it'll compare the different thresholds for each tier, calculate them all
  * in the finest window we'll use (seconds), and use the most restrictive one.
  * This is so if the EA were to restart, we don't need to worry about persisting state
@@ -14,9 +16,9 @@ const logger = makeLogger('BurstRateLimiter')
  */
 export class BurstRateLimiter implements RateLimiter {
   latestSecondInterval = 0
-  requestsThisSecond = 0
+  creditsThisSecond = 0
   latestMinuteInterval = 0
-  requestsThisMinute = 0
+  creditsThisMinute = 0
   perSecondLimit!: number
   perMinuteLimit!: number
 
@@ -29,9 +31,8 @@ export class BurstRateLimiter implements RateLimiter {
     this.perMinuteLimit = Math.min(limits?.rateLimit1m || Infinity, perHourLimit)
     this.perSecondLimit = limits?.rateLimit1s || Infinity
     logger.debug(
-      `Using rate limiting settings: perMinute = ${this.perMinuteLimit} | perSecond: = ${this.perSecondLimit}`,
+      `Using API credit limit settings: perMinute = ${this.perMinuteLimit} | perSecond: = ${this.perSecondLimit}`,
     )
-
     return this
   }
 
@@ -46,18 +47,18 @@ export class BurstRateLimiter implements RateLimiter {
     // Ops should be "thread safe". Thank JS and its infinite single threaded dumbness.
     if (nearestSecondInterval !== this.latestSecondInterval) {
       logger.trace(
-        `Clearing latest second interval, # of requests logged was: ${this.requestsThisSecond} `,
+        `Clearing latest second interval, # of credits logged was: ${this.creditsThisSecond} `,
       )
       this.latestSecondInterval = nearestSecondInterval
-      this.requestsThisSecond = 0
+      this.creditsThisSecond = 0
     }
 
     if (nearestMinuteInterval !== this.latestMinuteInterval) {
       logger.trace(
-        `Clearing latest second minute, # of requests logged was: ${this.requestsThisMinute} `,
+        `Clearing latest second minute, # of credits logged was: ${this.creditsThisMinute} `,
       )
       this.latestMinuteInterval = nearestMinuteInterval
-      this.requestsThisMinute = 0
+      this.creditsThisMinute = 0
     }
 
     return {
@@ -76,33 +77,33 @@ export class BurstRateLimiter implements RateLimiter {
     const { now, nextSecondInterval, nextMinuteInterval } = this.updateIntervals()
 
     const timeToWaitForNextSecond =
-      this.requestsThisSecond < this.perSecondLimit ? 0 : nextSecondInterval - now
+      this.creditsThisSecond < this.perSecondLimit ? 0 : nextSecondInterval - now
     const timeToWaitForNextMinute =
-      this.requestsThisMinute < this.perMinuteLimit ? 0 : nextMinuteInterval - now
+      this.creditsThisMinute < this.perMinuteLimit ? 0 : nextMinuteInterval - now
     const timeToWait = Math.max(timeToWaitForNextSecond, timeToWaitForNextMinute)
 
     return timeToWait
   }
 
-  async waitForRateLimit(): Promise<void> {
+  async waitForRateLimit(creditCost = 1): Promise<void> {
     const timeToWait = this.msUntilNextExecution()
 
     if (timeToWait === 0) {
       logger.trace(
-        `Request under limits, current count: (S = ${this.requestsThisSecond} | M = ${this.requestsThisMinute})`,
+        `API credits under limits, current count: (S = ${this.creditsThisSecond} | M = ${this.creditsThisMinute}, | C = ${creditCost})`,
       )
     } else {
       logger.trace(
-        `Capacity for provider requests has been reached this interval (S = ${this.requestsThisSecond} | M = ${this.requestsThisMinute}), need to wait ${timeToWait}ms`,
+        `Capacity for provider API credits has been reached this interval (S = ${this.creditsThisSecond} | M = ${this.creditsThisMinute} | C = ${creditCost}), need to wait ${timeToWait}ms`,
      )
       await sleep(timeToWait)
       this.updateIntervals()
     }
+    this.creditsThisSecond += creditCost
+    this.creditsThisMinute += creditCost
 
-    this.requestsThisSecond++
-    this.requestsThisMinute++
     logger.trace(
-      `Request is now ready to go, updated count: (S = ${this.requestsThisSecond} | M = ${this.requestsThisMinute})`,
+      `Request is now ready to go, updated count: (S = ${this.creditsThisSecond} | M = ${this.creditsThisMinute})`,
     )
 
     return
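
With this change the limiter accumulates API credit cost instead of a flat request count, so a single expensive request can consume most of a window. A minimal usage sketch (the limit values are hypothetical; RateLimiterFactory and RateLimitingStrategy come from the framework's rate-limiting module, as in test/rate-limit-config.test.ts further down):

const limiter = RateLimiterFactory.buildRateLimiter(RateLimitingStrategy.BURST).initialize(
  [], // no named tiers
  { rateLimit1s: 10, rateLimit1m: 100 }, // hypothetical explicit limits
)

// Consumes 3 of the 10 per-second credits; blocks only once a window is exhausted.
await limiter.waitForRateLimit(3)

// With no argument the cost defaults to 1, preserving the old request-counting behavior.
await limiter.waitForRateLimit()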

src/rate-limiting/index.ts (+3 -1)
@@ -31,8 +31,10 @@ export interface RateLimiter {
   /**
    * This method will block (if necessary) until the rate limiter can make sure the
    * next outbound request will be within the specified limits.
+   *
+   * @param cost - credit cost of an API request sent to Data Provider. Used for burst rate limiter
    */
-  waitForRateLimit(): Promise<void>
+  waitForRateLimit(cost?: number): Promise<void>
 
   /**
    * Returns the time in milliseconds until the next request would be able to be fired
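
Because the new parameter is optional, existing call sites compile unchanged, and strategies with no notion of credits can simply ignore it. The two call shapes, sketched with an illustrative rateLimiter variable:

await rateLimiter.waitForRateLimit() // the burst limiter treats a missing cost as 1 credit
await rateLimiter.waitForRateLimit(5) // a request the provider bills at 5 credits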

src/transports/http.ts (+5 -0)
@@ -47,6 +47,10 @@ export type ProviderRequestConfig<T extends HttpTransportGenerics> = {
 
   /** An endpoint name override for the requester key to allow the coalescing of requests across endpoints */
   endpointNameOverride?: string
+
+  /** The API credit cost of the request sent to the data provider.
+   * Applies only for burst rate limit strategy and is ignored if another strategy is used. */
+  cost?: number
 }
 
 /**
@@ -235,6 +239,7 @@ export class HttpTransport<T extends HttpTransportGenerics> extends Subscription
         transportName: this.name,
       }),
       requestConfig.request,
+      requestConfig.cost,
     )
 
     // Parse responses and apply timestamps
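
An endpoint can now tag each outgoing request with its credit cost from prepareRequests. A sketch (the URL and payload are illustrative; the shape follows the mock transport in test/transports/http.test.ts below):

prepareRequests: (params) => ({
  params,
  request: {
    baseURL: 'https://api.example.com', // hypothetical provider
    url: '/price',
    method: 'POST',
    data: { pairs: params.map((p) => ({ base: p.from, quote: p.to })) },
  },
  cost: 2, // billed at 2 API credits; ignored unless the burst strategy is active
}),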

src/util/requester.ts (+9 -2)
@@ -66,6 +66,7 @@ interface QueuedRequest<T = unknown> {
   key: string
   config: AxiosRequestConfig
   retries: number
+  cost?: number
   promise: Promise<RequesterResult<T>>
   reject: (err: unknown) => void
   resolve: (req: RequesterResult<T>) => void
@@ -158,9 +159,14 @@
    *
    * @param key - a key to uniquely identify this request, and coalesce new ones that match
    * @param req - a request to send to a data provider
+   * @param cost - Data Provider API credit cost of the request
    * @returns a promise that will resolve whenever the request is popped from the queue, sent, and a response is received
    */
-  async request<T>(key: string, req: AxiosRequestConfig): Promise<RequesterResult<T>> {
+  async request<T>(
+    key: string,
+    req: AxiosRequestConfig,
+    cost?: number,
+  ): Promise<RequesterResult<T>> {
     // If there's already a queued request, reuse it's existing promise
     const existingQueuedRequest = this.map[key]
     if (existingQueuedRequest) {
@@ -174,6 +180,7 @@
       key,
       config: req,
       retries: 0,
+      cost,
     } as QueuedRequest<T>
 
     // This dual promise layer is built so the queuedRequest can hold both the resolve and reject handlers,
@@ -212,7 +219,7 @@
     )
 
     // Wait until the rate limiter allows the request to be executed
-    await this.rateLimiter.waitForRateLimit()
+    await this.rateLimiter.waitForRateLimit(next.cost)
 
     // Fire off to complete in the background. We don't wait here to be able to fire off multiple requests concurrently
     this.executeRequest.bind(this)(next)
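
The cost rides along on the queued entry and is only consulted here, when the request is dequeued and about to fire, so coalesced callers share a single charge for the shared request. A caller-side sketch (key, config, and response type are illustrative):

const result = await requester.request<PriceResponse>( // PriceResponse is a hypothetical type
  'ETH-USD-price', // coalescing key; concurrent calls with the same key share one request
  { baseURL: 'https://api.example.com', url: '/price', method: 'POST' },
  2, // API credit cost; if omitted, the burst limiter falls back to 1
)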

test/rate-limit-config.test.ts (+1 -1)
@@ -487,7 +487,7 @@ test('test build highest rate limits from config second, minute)', async (t) =>
   t.is(highestRateLimitTier, 4)
 })
 
-test('returns 0 when limit is set to infinity, no tier limit specified', async (t) => {
+test('returns 0 when limit is set to infinity, no tier limit specified (burst rate limiter)', async (t) => {
   const burstRateLimiter = RateLimiterFactory.buildRateLimiter(
     RateLimitingStrategy.BURST,
   ).initialize([], {})

test/transports/http.test.ts (+13 -7)
@@ -104,6 +104,7 @@ class MockHttpTransport extends HttpTransport<HttpTransportTypes> {
          pairs: params.map((p) => ({ base: p.from, quote: p.to })),
        },
      },
+      cost: requestCost,
    }),
    parseResponse: (
      params,
@@ -145,6 +146,7 @@ const from = 'ETH'
 const to = 'USD'
 const price = 1234
 const volume = 4567
+const requestCost = 2 // Static API credit cost of the request
 
 axiosMock
   .onPost(URL + endpoint, {
@@ -254,16 +256,18 @@ test.serial(
       t.is(transport.backgroundExecuteCalls, expectedValue)
     }
 
+    let expected = rateLimit1m * Math.ceil(WARMUP_SUBSCRIPTION_TTL / 60_000)
+    await subtest(RateLimitingStrategy.FIXED_INTERVAL, expected)
+
     /**
-     * The expected number of requests is at follows:
-     * - At minute 0, a burst of `rateLimit1m` requests will be fired.
+     * The expected number of requests using burst limiter is at follows:
+     * - At minute 0, a burst of N requests will be fired, until the number of used API credits equals or exceeds `rateLimit1m`.
      * - The next request after those will be attempted immediately, only to hit the rate limiter blocking sleep.
      * - This will be repeated at the beginning of each minute interval (hence the rounding up when dividing the TTL).
      * - Finally, the +1 for the burst strategy will be the last request that was attempted to be fired then blocked,
      *   and by the time it's fired the subscription is no longer active.
     */
-    const expected = rateLimit1m * Math.ceil(WARMUP_SUBSCRIPTION_TTL / 60_000)
-    await subtest(RateLimitingStrategy.FIXED_INTERVAL, expected)
+    expected = Math.ceil(rateLimit1m / requestCost) * Math.ceil(WARMUP_SUBSCRIPTION_TTL / 60_000)
     await subtest(RateLimitingStrategy.BURST, expected + 1)
   },
 )
@@ -319,8 +323,10 @@ test.serial(
       t.is(transport.backgroundExecuteCalls, expectedValue)
     }
 
-    const expected = 60 * rateLimit1s + 1
+    let expected = 60 * rateLimit1s + 1
     await subtest(RateLimitingStrategy.FIXED_INTERVAL, expected)
+    // For burst rate limiter, we need to divide the credit limit to the cost of one request
+    expected = 60 * Math.ceil(rateLimit1s / requestCost) + 1
     await subtest(RateLimitingStrategy.BURST, expected)
   },
 )
@@ -871,9 +877,9 @@ test.serial(
       await testAdapter.api.close()
     }
 
-    // With the burst rate limiter, the result should be 2 requests, since we're advancing the clock
+    // With the burst rate limiter, the result should be 2 requests divided by the cost, since we're advancing the clock
     // less time than the per minute interval, but we're still in the same minute window and have capacity
-    await subtest(RateLimitingStrategy.BURST, 2)
+    await subtest(RateLimitingStrategy.BURST, 2 / requestCost)
     // The fixed interval rate limiter on the other hand should not have executed the same request yet, since it
     // will wait for 30s before actually getting the result
     // await subtest(RateLimitingStrategy.FIXED_INTERVAL, 1)
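
A worked example of the adjusted expectations, using hypothetical numbers (rateLimit1m = 10, requestCost = 2, WARMUP_SUBSCRIPTION_TTL = 120_000 ms):

// Fixed interval: 10 * Math.ceil(120_000 / 60_000)                    = 20 requests
// Burst:          Math.ceil(10 / 2) * Math.ceil(120_000 / 60_000) + 1 = 11 requests
// Doubling the per-request credit cost roughly halves how many requests fit in each minute window.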
