Skip to content

DF-20362 Update/cbp delay balance POC #3404

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions packages/sources/coinbase-prime/src/config/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ export const config = new AdapterConfig({
required: true,
sensitive: true,
},
DELAYED_RESPONSE_MS: {
description: 'The amount of time to delay the new response in milliseconds',
type: 'number',
default: 180_000,
},
BACKGROUND_EXECUTE_MS: {
description:
'The amount of time the background execute should sleep before performing the next request',
Expand Down
66 changes: 66 additions & 0 deletions packages/sources/coinbase-prime/src/endpoint/balance2.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import { AdapterEndpoint } from '@chainlink/external-adapter-framework/adapter'
import { InputParameters } from '@chainlink/external-adapter-framework/validation'
import { AdapterError } from '@chainlink/external-adapter-framework/validation/error'
import { SingleNumberResultResponse } from '@chainlink/external-adapter-framework/util'
import { config } from '../config'
import { balanceTransport } from '../transport/balance2'
import { getApiKeys } from '../transport/utils'

export const inputParameters = new InputParameters(
{
portfolio: {
required: true,
type: 'string',
description: 'The portfolio ID to query the balance of',
},
symbol: {
required: true,
type: 'string',
description: 'The symbol to return the balance for',
},
type: {
type: 'string',
description: 'The balance type to return',
default: 'total',
options: ['total', 'vault', 'trading'],
},
apiKey: {
type: 'string',
description:
'Alternative api keys to use for this request, {$apiKey}_ACCESS_KEY {$apiKey}_PASSPHRASE {$apiKey}_SIGNING_KEY required in environment variables',
default: '',
},
acceptDelay: {
type: 'boolean',
description: 'Delay ',
default: false,
},
},
[
{
portfolio: 'abcd1234-123a-1234-ab12-12a34bcd56e7',
symbol: 'BTC',
type: 'total',
apiKey: '',
acceptDelay: false,
},
],
)

export type BaseEndpointTypes = {
Parameters: typeof inputParameters.definition
Response: SingleNumberResultResponse
Settings: typeof config.settings
}

export const endpoint = new AdapterEndpoint({
name: 'balance2',
transport: balanceTransport,
inputParameters,
customInputValidation: (request, settings): AdapterError | undefined => {
if (request.requestContext.data.apiKey) {
getApiKeys(request.requestContext.data.apiKey, settings)
}
return
},
})
1 change: 1 addition & 0 deletions packages/sources/coinbase-prime/src/endpoint/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export { endpoint as balance } from './balance'
export { endpoint as balance2 } from './balance2'
export { endpoint as wallet } from './wallet'
4 changes: 2 additions & 2 deletions packages/sources/coinbase-prime/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import { expose, ServerInstance } from '@chainlink/external-adapter-framework'
import { config } from './config'
import { balance, wallet } from './endpoint'
import { balance, wallet, balance2 } from './endpoint'
import { PoRAdapter } from '@chainlink/external-adapter-framework/adapter/por'

export const adapter = new PoRAdapter({
defaultEndpoint: balance.name,
name: 'COINBASE_PRIME',
config,
endpoints: [balance, wallet],
endpoints: [balance, wallet, balance2],
rateLimiting: {
tiers: {
default: {
Expand Down
256 changes: 256 additions & 0 deletions packages/sources/coinbase-prime/src/transport/balance2.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
import { TransportDependencies } from '@chainlink/external-adapter-framework/transports'
import { BaseEndpointTypes, inputParameters } from '../endpoint/balance2'
import { sign, getApiKeys, errorResponse } from './utils'
import {
calculateCacheKey,
calculateHttpRequestKey,
} from '@chainlink/external-adapter-framework/cache'
import { EndpointContext } from '@chainlink/external-adapter-framework/adapter'
import { sleep, AdapterResponse, makeLogger } from '@chainlink/external-adapter-framework/util'
import { Requester } from '@chainlink/external-adapter-framework/util/requester'
import { SubscriptionTransport } from '@chainlink/external-adapter-framework/transports/abstract/subscription'

const logger = makeLogger('Balance2Transport')

export interface ResponseSchema {
balances: {
symbol: string
amount: string
holds: string
bonded_amount: string
reserved_amount: string
unbonding_amount: string
unvested_amount: string
pending_rewards_amount: string
past_rewards_amount: string
bondable_amount: string
withdrawable_amount: string
fiat_amount: string
}[]
type: string
trading_balances: {
total: string // Returns total in fiat amount
holds: string
}
vault_balances: {
total: string // Returns total in fiat amount
holds: string
}
}

export type BalanceTransportTypes = BaseEndpointTypes & {
Provider: {
RequestBody: never
ResponseBody: ResponseSchema
}
}

type RequestParams = typeof inputParameters.validated

// revisit if we have >100 separate portfolios using this EA
type BlipCacheValue = {
result: number
timestamp: number
}
const blipCache = new Map<string, BlipCacheValue>()

export class BalanceTransport extends SubscriptionTransport<BalanceTransportTypes> {
settings!: BalanceTransportTypes['Settings']
requester!: Requester
endpointName!: string

async initialize(
dependencies: TransportDependencies<BalanceTransportTypes>,
adapterSettings: BalanceTransportTypes['Settings'],
endpointName: string,
transportName: string,
): Promise<void> {
await super.initialize(dependencies, adapterSettings, endpointName, transportName)
this.settings = adapterSettings
this.requester = dependencies.requester
this.endpointName = endpointName
}

async backgroundHandler(
context: EndpointContext<BalanceTransportTypes>,
entries: RequestParams[],
) {
await Promise.all(entries.map(async (param) => this.handleRequest(param)))
await sleep(context.adapterSettings.BACKGROUND_EXECUTE_MS)
}

async handleRequest(param: RequestParams) {
let response: AdapterResponse<BaseEndpointTypes['Response']>
try {
response = await this._handleRequest(param)
} catch (e) {
const errorMessage = e instanceof Error ? e.message : 'Unknown error occurred'
logger.error(e, errorMessage)
response = {
statusCode: 502,
errorMessage,
timestamps: {
providerDataRequestedUnixMs: 0,
providerDataReceivedUnixMs: 0,
providerIndicatedTimeUnixMs: undefined,
},
}
}
await this.responseCache.write(this.name, [{ params: param, response }])
}

async _handleRequest(
param: RequestParams,
): Promise<AdapterResponse<BalanceTransportTypes['Response']>> {
const { portfolio, symbol, type, apiKey, acceptDelay } = param
const providerDataRequestedUnixMs = Date.now()

const response = await this.sendBalanceRequest(portfolio, symbol, type, apiKey)
if (!response) {
return errorResponse(
`The data provider did not return data for Portfolio: ${param.portfolio}, Balance Type: ${param.type}, Symbol: ${param.symbol}`,
providerDataRequestedUnixMs,
)
}

if (!response.balances) {
return errorResponse(
`The data provider response does not contain a balances list for Portfolio: ${param.portfolio}, Balance Type: ${param.type}, Symbol: ${param.symbol}`,
providerDataRequestedUnixMs,
)
}

// The adapter only supports querying one asset at a time so the balances list should only contain 1 element
if (response.balances.length !== 1) {
return errorResponse(
`The data provider response does not contain exactly one element in the balances list for Portfolio: ${param.portfolio}, Balance Type: ${param.type}, Symbol: ${param.symbol}`,
providerDataRequestedUnixMs,
)
}

const result = Number(response.balances[0].amount)
if (isNaN(result)) {
return errorResponse(
`The data provider returned non-numeric balance: ${response.balances[0].amount}`,
providerDataRequestedUnixMs,
)
}

const generateResponseBody = (r: number = result) => {
return {
result: r,
data: {
result: r,
},
statusCode: 200,
timestamps: {
providerDataRequestedUnixMs,
providerDataReceivedUnixMs: Date.now(),
providerIndicatedTimeUnixMs: undefined,
},
}
}

// If acceptDelay is false, return the new result right away
if (!acceptDelay) {
return generateResponseBody()
}

const cacheKey = calculateCacheKey({
transportName: this.name,
data: param,
adapterName: this.responseCache.adapterName,
endpointName: this.responseCache.endpointName,
adapterSettings: this.responseCache.adapterSettings,
})

// If `result` doesn't match already cached response, don't update the response cache right away.
// We want to delay returning the new value by time = DELAYED_RESPONSE_MS by caching this value
// in a separate map for DELAYED_RESPONSE_MS.
const cachedResponse = await this.responseCache.cache.get(cacheKey)
if (!cachedResponse?.result || result === cachedResponse.result) {
// If no cached result or the new result is the same as the cached result,
// return the new result, which writes to or refreshes the response cache TTL
// Clear the blipCache to avoid edge case where the value goes from x to y, then back to x, then back to y
// which would maintain a value in the cache, registering the second y as having passed the cache threshold immediately
logger.trace(`Preventatively deleting blipCache for ${cacheKey}`)
blipCache.delete(cacheKey)
return generateResponseBody()
}

const blipCacheValue = blipCache.get(cacheKey)

// If the result is the same as the temporarily cached value in blipCache, we want to check if
// the value in blipCache has been cached long enough to be considered "good"
if (result === blipCacheValue?.result) {
const isBlipCacheStale =
blipCacheValue?.timestamp <= Date.now() - this.settings.DELAYED_RESPONSE_MS
if (isBlipCacheStale) {
// blipCache value has been cached long enough and seems like a good value, update the response cache
logger.debug(`Deleting blipCache for ${cacheKey}`)
blipCache.delete(cacheKey)
return generateResponseBody()
}
} else {
// blipCache value is missing or is not the same as the result, overwrite
logger.debug(`Setting blipCache for ${cacheKey} to ${result}`)
blipCache.set(cacheKey, { result, timestamp: providerDataRequestedUnixMs })
}

// At this point, we have a new result that is different from the cached result
// and the blipCache value is still under the DELAYED_RESPONSE_MS threshold.
// return the cached result
return generateResponseBody(cachedResponse.result)
}

async sendBalanceRequest(
portfolio: string,
symbol: string,
type: string,
apiKey: string,
): Promise<ResponseSchema> {
const [signingKey, accessKey, passPhrase] = getApiKeys(apiKey, this.settings)
const timestamp = Math.floor(Date.now() / 1000)
const method = 'GET'
const path = `/v1/portfolios/${portfolio}/balances`
const message = `${timestamp}${method}${path}`
const signature = sign(message, signingKey)

const requestConfig = {
baseURL: this.settings.API_ENDPOINT,
url: path,
headers: {
'X-CB-ACCESS-KEY': accessKey,
'X-CB-ACCESS-PASSPHRASE': passPhrase,
'X-CB-ACCESS-SIGNATURE': signature,
'X-CB-ACCESS-TIMESTAMP': timestamp,
'Content-Type': 'application/json',
},
params: {
symbols: symbol.toUpperCase(),
balance_type: `${type.toUpperCase()}_BALANCES`,
},
}

const res = await this.requester.request<ResponseSchema>(
calculateHttpRequestKey<BalanceTransportTypes>({
context: {
adapterSettings: this.settings,
inputParameters,
endpointName: this.endpointName,
},
data: requestConfig.params,
transportName: this.name,
}),
requestConfig,
)

return res.response.data
}

getSubscriptionTtlFromConfig(adapterSettings: BaseEndpointTypes['Settings']): number {
return adapterSettings.WARMUP_SUBSCRIPTION_TTL
}
}

export const balanceTransport = new BalanceTransport()
16 changes: 16 additions & 0 deletions packages/sources/coinbase-prime/src/transport/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,19 @@ export const getApiKeys = (apiKey: string, config: BaseEndpointTypes['Settings']
return [config.SIGNING_KEY, config.ACCESS_KEY, config.PASSPHRASE]
}
}

export const errorResponse = (
message: string,
providerDataRequestedUnixMs: number,
statusCode = 502,
) => {
return {
errorMessage: message,
statusCode,
timestamps: {
providerDataRequestedUnixMs,
providerDataReceivedUnixMs: Date.now(),
providerIndicatedTimeUnixMs: undefined,
},
}
}
Loading