Skip to content

Commit d032ab2

Browse files
authored
Introduce exponential backoff for stream handler errors (#260)
1 parent e569e96 commit d032ab2

File tree

7 files changed

+58
-36
lines changed

7 files changed

+58
-36
lines changed

.github/workflows/main.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ jobs:
3737
set +e
3838
files=$(find ${{ github.workspace }}/test -name "*.test.ts" | xargs grep -l 'test.only')
3939
set -e
40-
40+
4141
if [[ -n $files ]]; then
4242
echo "Error: Found /test.only/ in following test files:"
4343
echo "$files"

src/background-executor.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ export async function callBackgroundExecutes(adapter: Adapter, apiShutdownPromis
9595
`Finished background execute for endpoint "${endpoint.name}", calling it again in 10ms...`,
9696
)
9797
metricsTimer()
98-
timeoutsMap[endpoint.name] = setTimeout(handler, 10)
98+
timeoutsMap[endpoint.name] = setTimeout(handler, 10) // 10ms
9999
}
100100

101101
// Start recursive async calls

src/config/index.ts

Lines changed: 20 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,6 @@ import CensorList, { CensorKeyValue } from '../util/censor/censor-list'
22
import { Validator, validator } from '../validation/utils'
33

44
export const BaseSettingsDefinition = {
5-
// V2 compat
6-
// ADAPTER_URL: {
7-
// description: 'The URL of another adapter from which data needs to be retrieved',
8-
// type: 'string',
9-
// },
105
API_TIMEOUT: {
116
description:
127
'The number of milliseconds a request can be pending before returning a timeout error for data provider request',
@@ -63,11 +58,6 @@ export const BaseSettingsDefinition = {
6358
type: 'string',
6459
default: '127.0.0.1',
6560
},
66-
// CACHE_REDIS_MAX_QUEUED_ITEMS: {
67-
// description: 'Maximum length of the client internal command queue',
68-
// type: 'number',
69-
// default: 500,
70-
// },
7161
CACHE_REDIS_MAX_RECONNECT_COOLDOWN: {
7262
description: 'Max cooldown (in ms) before attempting redis reconnection',
7363
type: 'number',
@@ -112,6 +102,26 @@ export const BaseSettingsDefinition = {
112102
type: 'string',
113103
default: '',
114104
},
105+
STREAM_HANDLER_RETRY_MAX_MS: {
106+
type: 'number',
107+
description:
108+
'The maximum time (ms) to wait before running the stream handler (takes precedent over STREAM_HANDLER_RETRY_MIN_MS',
109+
default: 20 * 60 * 1000,
110+
validate: validator.integer({ min: 3_000, max: 24 * 60 * 60 * 1000 }),
111+
},
112+
STREAM_HANDLER_RETRY_MIN_MS: {
113+
type: 'number',
114+
description: 'The minimum/base time (ms) to wait before trying to run the stream handler',
115+
default: 100,
116+
validate: validator.integer({ min: 100, max: 10_000 }),
117+
},
118+
STREAM_HANDLER_RETRY_EXP_FACTOR: {
119+
type: 'number',
120+
description:
121+
'The factor for exponential back-off to wait before running the stream handler (1 = no change from STREAM_HANDLER_RETRY_MIN_MS)',
122+
default: 3,
123+
validate: validator.integer({ min: 1, max: 10 }),
124+
},
115125
SUBSCRIPTION_SET_MAX_ITEMS: {
116126
type: 'number',
117127
description: 'The maximum number of subscriptions set',
@@ -123,30 +133,18 @@ export const BaseSettingsDefinition = {
123133
type: 'boolean',
124134
default: true,
125135
},
126-
// DATA_PROVIDER_URL: {
127-
// description: 'Legacy variable that has the same functionality as ADAPTER_URL',
128-
// type: 'string',
129-
// },
130136
DEBUG: {
131137
description: 'Toggles debug mode',
132138
type: 'boolean',
133139
default: false,
134140
},
135-
// DEFAULT_WS_HEARTBEAT_INTERVAL: {
136-
// description: 'Interval between WS heartbeat pings (ms)',
137-
// type: 'number',
138-
// default: 30000,
139-
// },
140141
EA_PORT: {
141142
description:
142143
'Port through which the EA will listen for REST requests (if mode is set to "reader" or "reader-writer")',
143144
type: 'number',
144145
default: 8080,
145146
validate: validator.port(),
146147
},
147-
// ERROR_CAPACITY: {
148-
// type: 'number',
149-
// },
150148
EXPERIMENTAL_METRICS_ENABLED: {
151149
description:
152150
'Flag to specify whether or not to collect metrics. Used as fallback for METRICS_ENABLED',
@@ -250,12 +248,6 @@ export const BaseSettingsDefinition = {
250248
default: 10_000,
251249
validate: validator.integer({ min: 500, max: 30_000 }),
252250
},
253-
// WS_TIME_UNTIL_HANDLE_NEXT_MESSAGE_OVERRIDE: {
254-
// description: 'Time to wait until adapter should handle next WS message',
255-
// type: 'number',
256-
// },
257-
258-
// V3
259251
CACHE_POLLING_MAX_RETRIES: {
260252
description:
261253
'Max amount of times to attempt to find EA response in the cache after the Transport has been set up',

src/metrics/index.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,11 @@ export const metrics = new Metrics(() => ({
294294
help: 'The number of redis commands sent',
295295
labelNames: ['status', 'function_name'],
296296
}),
297+
streamHandlerErrors: new client.Counter({
298+
name: 'stream_handler_errors',
299+
help: 'The number of stream handler errors per endpoint x transport',
300+
labelNames: ['adapter_endpoint', 'transport'] as const,
301+
}),
297302
cacheWarmerCount: new client.Gauge({
298303
name: 'cache_warmer_get_count',
299304
help: 'The number of cache warmers running',

src/transports/abstract/streaming.ts

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { EndpointContext } from '../../adapter'
33
import { censorLogs, makeLogger } from '../../util'
44
import { TypeFromDefinition } from '../../validation/input-params'
55
import { SubscriptionTransport } from './subscription'
6+
import { metrics } from '../../metrics'
67

78
const logger = makeLogger('StreamingTransport')
89

@@ -37,10 +38,17 @@ export abstract class StreamingTransport<
3738
// This only tracks when the connection was established, not when the subscription was requested
3839
providerDataStreamEstablished = 0
3940

41+
retryCount = 0
42+
retryTime = 0
43+
4044
override async backgroundHandler(
4145
context: EndpointContext<T>,
4246
desiredSubs: TypeFromDefinition<T['Parameters']>[],
4347
): Promise<void> {
48+
if (this.retryTime > Date.now()) {
49+
return
50+
}
51+
4452
logger.debug('Generating delta (subscribes & unsubscribes)')
4553

4654
const desiredSubsSet = new Set(desiredSubs.map((s) => JSON.stringify(s)))
@@ -62,7 +70,25 @@ export abstract class StreamingTransport<
6270
censorLogs(() => logger.trace(`Will unsubscribe to: ${JSON.stringify(subscriptions.stale)}`))
6371
}
6472

65-
await this.streamHandler(context, subscriptions)
73+
try {
74+
await this.streamHandler(context, subscriptions)
75+
this.retryCount = 0
76+
} catch (error) {
77+
censorLogs(() => logger.error(error, (error as Error).stack))
78+
metrics
79+
.get('streamHandlerErrors')
80+
.labels({ adapter_endpoint: context.endpointName, transport: this.name })
81+
.inc()
82+
const timeout = Math.min(
83+
context.adapterSettings.STREAM_HANDLER_RETRY_MIN_MS *
84+
context.adapterSettings.STREAM_HANDLER_RETRY_EXP_FACTOR ** this.retryCount,
85+
context.adapterSettings.STREAM_HANDLER_RETRY_MAX_MS,
86+
)
87+
this.retryTime = Date.now() + timeout
88+
this.retryCount += 1
89+
logger.info(`Waiting ${timeout}ms before backgroundHandler retry #${this.retryCount}`)
90+
return
91+
}
6692

6793
logger.debug('Setting local state to subscription set value')
6894
this.localSubscriptions = desiredSubs

src/transports/abstract/subscription.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,7 @@ const logger = makeLogger('SubscriptionTransport')
1414
*
1515
* @typeParam T - all types related to the [[Transport]]
1616
*/
17-
export abstract class SubscriptionTransport<const T extends TransportGenerics>
18-
implements Transport<T>
19-
{
17+
export abstract class SubscriptionTransport<T extends TransportGenerics> implements Transport<T> {
2018
responseCache!: ResponseCache<T>
2119
subscriptionSet!: SubscriptionSet<TypeFromDefinition<T['Parameters']>>
2220
subscriptionTtl!: number
@@ -54,6 +52,7 @@ export abstract class SubscriptionTransport<const T extends TransportGenerics>
5452

5553
async backgroundExecute(context: EndpointContext<T>): Promise<void> {
5654
logger.debug('Starting background execute')
55+
5756
const entries = await this.subscriptionSet.getAll()
5857

5958
// Keep track of active subscriptions for background execute

test/transports/websocket.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -520,7 +520,7 @@ test.serial(
520520

521521
const metrics = await testAdapter.getMetrics()
522522
metrics.assert(t, {
523-
name: 'bg_execute_errors',
523+
name: 'stream_handler_errors',
524524
labels: {
525525
adapter_endpoint: 'test',
526526
transport: 'default_single_transport',
@@ -776,7 +776,7 @@ test.serial('does not hang the background execution if the open handler hangs',
776776

777777
const metrics = await testAdapter.getMetrics()
778778
metrics.assert(t, {
779-
name: 'bg_execute_errors',
779+
name: 'stream_handler_errors',
780780
labels: {
781781
adapter_endpoint: 'test',
782782
transport: 'default_single_transport',

0 commit comments

Comments
 (0)