Skip to content

Commit a8f200c

Browse files
add docs for abstract transports (#235)
* add docs for abstract transports * prettier * add subscription and streaming transports to main readme * add abstract transport section to transports doc * prettier
1 parent d331ff9 commit a8f200c

File tree

5 files changed

+124
-3
lines changed

5 files changed

+124
-3
lines changed

README.md

+2
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ yarn # Install yarn dependencies
4040
- [HTTP](./docs/components/transport-types/http-transport.md)
4141
- [WebSocket](./docs/components/transport-types/websocket-transport.md)
4242
- [SSE](./docs/components/transport-types/sse-transport.md)
43+
- [Subscription](./docs/components/transport-types/subscription-transport.md)
44+
- [Streaming](./docs/components/transport-types/streaming-transport.md)
4345
- [Custom](./docs/components/transport-types/custom-transport.md)
4446
- Guides
4547
- [Porting a v2 EA to v3](./docs/guides/porting-a-v2-ea-to-v3.md)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
# Streaming Transport
2+
3+
The `StreamingTransport` is an **abstract transport** (class) that extends the [SubscriptionTransport](./subscription-transport.md) and provides a foundation for implementing streaming-based transports. It handles incoming requests, manages subscriptions, and defines an abstract `streamHandler` method to process subscription deltas. This class is intended to be extended by specific transport implementations.
4+
5+
All incoming requests to the adapter for an endpoint that uses stream-based transport are stored in a cached set (`SubscriptionSet`). Periodically, the background execute loop of the adapter will read the entire subscription set and call the `backgroundHandler` method of the transport.`backgroundHandler` method is already implemented in `StreamingTransport`. It calculates subscription deltas (new subscriptions and subscriptions to unsubscribe) based on the all subscriptions and the current local subscriptions. The deltas are then passed to the `streamHandler` method for further processing.
6+
7+
When extending `StreamingTransport` there are two abstract methods that should be implemented by subclasses.
8+
9+
1. `streamHandler` receives endpoint context as first argument and object containing details for the desired, new, and stale subscriptions as second argument and is responsible for handling the streaming connection, sending messages to the streaming source, and processing subscription deltas.
10+
2. `getSubscriptionTtlFromConfig` is an abstract method from `SubscriptionTransport`. It receives adapter settings and should return time-to-live (TTL) value for subscription set.
11+
12+
An example of `StreamingTransport` is built-in [Websocket Transport](./websocket-transport.md) and [SSE Transport](./sse-transport.md)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
# Subscription Transport
2+
3+
The `SubscriptionTransport` is an **abstract transport** (class) that serves as the foundation for implementing subscription-based transports. It handles incoming requests, adds them to a subscription set, and provides those entries to a background handler method. This class is intended to be extended by specific transport implementations.
4+
5+
All incoming requests to the adapter for an endpoint that uses subscription-based transport are stored in a cached set (`SubscriptionSet`).
6+
Periodically, the background execute loop of the adapter will read the entire subscription set and call the `backgroundHandler` method of the transport.
7+
8+
`SubscriptionTransport` has two abstract methods that should be implemented by subclasses.
9+
10+
1. `backgroundHandler` is called on each background execution iteration. It receives endpoint context as first argument and an array of all the entries in the subscription set as second argument. Sub-transport logic should be defined in this method.
11+
2. `getSubscriptionTtlFromConfig` receives adapter settings and should return time-to-live (TTL) value for subscription set.
12+
13+
## Example implementation of SubscriptionTransport
14+
15+
```typescript
16+
// `AddressTransport` is a custom subscription-based transport that extends `SubscriptionTransport`
17+
// It uses `ethers` library to fetch data from a contract
18+
export class AddressTransport extends SubscriptionTransport<AddressTransportTypes> {
19+
// JsonRpcProvider provider instance to be used for contract calls in this example
20+
provider!: ethers.providers.JsonRpcProvider
21+
22+
// Initialize the transport with necessary dependencies, adapter settings, endpoint name, and a transport name.
23+
// You can initialize additional properties here as well, like in this case `this.provider`
24+
async initialize(
25+
dependencies: TransportDependencies<AddressTransportTypes>,
26+
adapterSettings: AddressTransportTypes['Settings'],
27+
endpointName: string,
28+
transportName: string,
29+
): Promise<void> {
30+
// when initializing additional properties don't forget to call super.initialize()
31+
await super.initialize(dependencies, adapterSettings, endpointName, transportName)
32+
33+
this.provider = new ethers.providers.JsonRpcProvider(
34+
adapterSettings.RPC_URL,
35+
adapterSettings.CHAIN_ID,
36+
)
37+
}
38+
39+
// backgroundHandler receives endpoint context and entries in subscription set and should implement the transport logic
40+
async backgroundHandler(
41+
context: EndpointContext<AddressTransportTypes>,
42+
entries: RequestParams[],
43+
) {
44+
// Processes each entry in subscription set
45+
await Promise.all(entries.map(async (param) => this.handleRequest(param)))
46+
// Sleeps for BACKGROUND_EXECUTE_MS miliseconds after processing all entries in subscription set
47+
await sleep(context.adapterSettings.BACKGROUND_EXECUTE_MS)
48+
}
49+
50+
// helper method that takes params in subscription set, cocnstructs and saves a response object into a cache.
51+
private async handleRequest(param: RequestParams) {
52+
let response: AdapterResponse<BaseEndpointTypes['Response']>
53+
try {
54+
response = await this._handleRequest(param)
55+
} catch (e) {
56+
const errorMessage = e instanceof Error ? e.message : 'Unknown error occurred'
57+
response = {
58+
statusCode: 502,
59+
errorMessage,
60+
timestamps: {
61+
providerDataRequestedUnixMs: 0,
62+
providerDataReceivedUnixMs: 0,
63+
providerIndicatedTimeUnixMs: undefined,
64+
},
65+
}
66+
}
67+
// save response to cache
68+
await this.responseCache.write(this.name, [{ params: param, response }])
69+
}
70+
71+
// helper method that gets the data from a contract and returns as AdapterResponse object
72+
private async _handleRequest(
73+
param: RequestParams,
74+
): Promise<AdapterResponse<AddressTransportTypes['Response']>> {
75+
const { contractAddress } = param
76+
const contract = new ethers.Contract(contractAddress, ABI, this.provider)
77+
78+
const providerDataRequestedUnixMs = Date.now()
79+
const addressList = await contract.getAddressList()
80+
81+
return {
82+
data: {
83+
result: addressList,
84+
},
85+
statusCode: 200,
86+
result: null,
87+
timestamps: {
88+
providerDataRequestedUnixMs,
89+
providerDataReceivedUnixMs: Date.now(),
90+
providerIndicatedTimeUnixMs: undefined,
91+
},
92+
}
93+
}
94+
95+
// getSubscriptionTtlFromConfig method should return TTL number for subscription sets in this transport
96+
getSubscriptionTtlFromConfig(adapterSettings: BaseEndpointTypes['Settings']): number {
97+
return adapterSettings.WARMUP_SUBSCRIPTION_TTL
98+
}
99+
}
100+
```
101+
102+
Another example of `SubscriptionTransport` is built-in [HTTP Transport](./http-transport.md).

docs/components/transports.md

+7-2
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,14 @@ Define transport file in a seperate folder called `transport`. The name of the f
1010

1111
The v3 framework provides transports to fetch data from a Provider using the common protocols they might use. Please refer to the guides listed below for the relevant transports your adapter endpoints need.
1212

13-
If you find that the built-in features of a transport do not meet your endpoint's requirements, you can define a custom transport extending one of the existing ones to include the custom functionality yourself.
14-
1513
- [HTTP Transport](./transport-types/http-transport.md)
1614
- [Websocket Transport](./transport-types/websocket-transport.md)
1715
- [SSE Transport](./transport-types/sse-transport.md)
1816
- [Custom Transport](./transport-types/custom-transport.md)
17+
18+
### Abstract Transports
19+
20+
If you find that the built-in features of a transport do not meet your endpoint's requirements, you can define a custom transport extending one of the abstract transports or existing basic ones to include the custom functionality yourself.
21+
22+
- [Subscription Transport](./transport-types/subscription-transport.md)
23+
- [Streaming Transport](./transport-types/streaming-transport.md)

docs/guides/creating-a-new-v3-ea.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ This guide carries stylistic biases, mainly in the context of the collection of
77
## Creating A New Adapter
88

99
The framework provides an interactive EA generator script to help create new adapters.
10-
To create a new adapter in the [External Adapters Monorepo](https://github.com/smartcontractkit/external-adapters-js) run `yarn generate-v3-adapter`. It will ask several questions regarding adapter and endpoints and will generate the file structure with all the boilerplate code.
10+
To create a new adapter in the [External Adapters Monorepo](https://github.com/smartcontractkit/external-adapters-js) run `yarn new`. It will ask several questions regarding adapter and endpoints and will generate the file structure with all the boilerplate code.
1111

1212
## Steps to create a new Adapter manually
1313

0 commit comments

Comments
 (0)