Skip to content

Commit 1040e9c

Browse files
committed
Bump version to v3.1.3: add waitForAvailability example to README
1 parent 6868ff7 commit 1040e9c

File tree

3 files changed

+77
-34
lines changed

3 files changed

+77
-34
lines changed

README.md

Lines changed: 68 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ If your use case requires a concurrency of 1, consider using the lock variant of
3838
- __Graceful & Deterministic Teardown :hourglass_flowing_sand:__: Await the completion of all currently executing jobs via the `waitForAllExecutingJobsToComplete` method. This guarantees **smooth resource cleanup**, making it well-suited for production environments (e.g., `onModuleDestroy` in NestJS) and maintaining a clean state between unit tests.
3939
- __High Efficiency :gear:__: All state-altering operations have a constant time complexity, O(1).
4040
- __Comprehensive documentation :books:__: The class is thoroughly documented, enabling IDEs to provide helpful tooltips that enhance the coding experience.
41-
- __Robust Error Handling__: Uncaught errors from background jobs triggered by `startExecution` are captured and can be accessed using the `extractUncaughtErrors` method.
41+
- __Robust Error Handling :warning:__: Uncaught errors from background jobs triggered by `startExecution` are captured and can be accessed using the `extractUncaughtErrors` method.
4242
- __Metrics :bar_chart:__: The class offers various metrics through getter methods, such as `amountOfCurrentlyExecutingJobs`, providing insights into the semaphore's current state. These metrics can be used for periodic logging or to collect statistics from real-world usage.
4343
- __Tests :test_tube:__: Fully covered by rigorous unit tests.
4444
- Self-explanatory method names.
@@ -100,29 +100,29 @@ Note: method `waitForAllExecutingJobsToComplete` can be used to perform post-pro
100100
```ts
101101
import { ZeroBackpressureSemaphore } from 'zero-backpressure-semaphore-typescript';
102102

103-
const maxConcurrentAggregationJobs = 24;
104-
const sensorAggregationSemaphore = new ZeroBackpressureSemaphore<void>(
105-
maxConcurrentAggregationJobs
106-
);
103+
const MAX_CONCURRENT_AGGREGATIONS_PER_CUSTOMER = 24;
107104

108-
async function aggregateSensorsData(sensorUIDs: AsyncGenerator<string>) {
109-
let fetchedSensorsCounter = 0;
105+
async function aggregateCustomerSensorsData(sensorUIDs: AsyncGenerator<string>) {
106+
const aggregationSemaphore = new ZeroBackpressureSemaphore<void>(
107+
MAX_CONCURRENT_AGGREGATIONS_PER_CUSTOMER
108+
);
109+
let sensorCount = 0;
110110

111111
for await (const uid of sensorUIDs) {
112-
++fetchedSensorsCounter;
112+
++sensorCount;
113113

114114
// Until the semaphore can start aggregating data from the current sensor,
115115
// adding more jobs won't make sense as this would induce unnecessary backpressure.
116-
await sensorAggregationSemaphore.startExecution(
116+
await aggregationSemaphore.startExecution(
117117
(): Promise<void> => handleDataAggregation(uid)
118118
);
119119
}
120-
// Note: at this stage, jobs might be still executing, as we did not wait for
121-
// their completion.
120+
// Note: at this stage, jobs might be still executing, as we did not wait
121+
// for their completion.
122122

123123
// Graceful termination: await the completion of all currently executing jobs.
124-
await sensorAggregationSemaphore.waitForAllExecutingJobsToComplete();
125-
console.info(`Finished aggregating data from ${fetchedSensorsCounter} IoT sensors`);
124+
await aggregationSemaphore.waitForAllExecutingJobsToComplete();
125+
console.info(`Finished aggregating data from ${sensorCount} IoT sensors`);
126126
}
127127

128128
/**
@@ -136,46 +136,84 @@ async function handleDataAggregation(sensorUID): Promise<void> {
136136
}
137137
```
138138

139+
The `waitForAvailability` method is ideal for scenarios where you prefer a `while` loop structure or need to defer fetching the next job's metadata **until** the semaphore confirms that at least one execution slot is available.
140+
This pattern is especially useful when you want to minimize the **delay between acquiring job metadata and dispatching its execution**, ensuring timely and efficient job handling.
141+
Below is a tailored example illustrating this approach:
142+
```ts
143+
import { ZeroBackpressureSemaphore } from 'zero-backpressure-semaphore-typescript';
144+
145+
const MAX_CONCURRENT_AGGREGATIONS_PER_CUSTOMER = 24;
146+
147+
async function aggregateCustomerSensorsData(sensorUIDs: AsyncGenerator<string>) {
148+
const aggregationSemaphore = new ZeroBackpressureSemaphore<void>(
149+
MAX_CONCURRENT_AGGREGATIONS_PER_CUSTOMER
150+
);
151+
let sensorCount = 0;
152+
153+
do {
154+
// Ensure a slot is available before fetching the next UID.
155+
await aggregationSemaphore.waitForAvailability();
156+
157+
const { done, value: uid } = await sensorUIDs.next();
158+
if (done) break;
159+
160+
++sensorCount;
161+
162+
// Since availability was confirmed beforehand, the job will
163+
// start execution immediately.
164+
await aggregationSemaphore.startExecution(
165+
() => handleDataAggregation(uid)
166+
);
167+
} while (true);
168+
// Note: at this stage, jobs might be still executing, as we did not wait
169+
// for their completion.
170+
171+
// Final step: wait for all in-flight jobs to complete.
172+
await aggregationSemaphore.waitForAllExecutingJobsToComplete();
173+
console.info(`Finished aggregating data from ${sensorCount} IoT sensors`);
174+
}
175+
```
176+
139177
If jobs might throw errors, you don't need to worry about these errors propagating to the event loop and potentially crashing the application. Uncaught errors from jobs triggered by `startExecution` are captured by the semaphore and can be safely accessed for post-processing purposes (e.g., metrics).
140178
Refer to the following adaptation of the above example, now utilizing the semaphore's error handling capabilities:
141179

142180
```ts
143181
import { ZeroBackpressureSemaphore } from 'zero-backpressure-semaphore-typescript';
144182

145-
const maxConcurrentAggregationJobs = 24;
146-
const sensorAggregationSemaphore =
147-
// Notice the 2nd generic parameter (Error by default).
148-
new ZeroBackpressureSemaphore<void, SensorAggregationError>(
149-
maxConcurrentAggregationJobs
150-
);
183+
const MAX_CONCURRENT_AGGREGATIONS_PER_CUSTOMER = 24;
151184

152185
async function aggregateSensorsData(sensorUIDs: AsyncGenerator<string>) {
153-
let fetchedSensorsCounter = 0;
186+
const aggregationSemaphore =
187+
// Notice the 2nd generic parameter (Error by default).
188+
new ZeroBackpressureSemaphore<void, SensorAggregationError>(
189+
MAX_CONCURRENT_AGGREGATIONS_PER_CUSTOMER
190+
);
191+
let sensorCount = 0;
154192

155193
for await (const uid of sensorUIDs) {
156-
++fetchedSensorsCounter;
194+
++sensorCount;
157195

158196
// Until the semaphore can start aggregating data from the current sensor,
159197
// adding more jobs won't make sense as this would induce unnecessary backpressure.
160-
await sensorAggregationSemaphore.startExecution(
198+
await aggregationSemaphore.startExecution(
161199
(): Promise<void> => handleDataAggregation(uid)
162200
);
163201
}
164202
// Note: at this stage, jobs might be still executing, as we did not wait for
165203
// their completion.
166204

167205
// Graceful termination: await the completion of all currently executing jobs.
168-
await sensorAggregationSemaphore.waitForAllExecutingJobsToComplete();
206+
await aggregationSemaphore.waitForAllExecutingJobsToComplete();
169207

170208
// Post processing.
171-
const errors = sensorAggregationSemaphore.extractUncaughtErrors();
209+
const errors = aggregationSemaphore.extractUncaughtErrors();
172210
if (errors.length > 0) {
173211
await updateFailedAggregationMetrics(errors);
174212
}
175213

176214
// Summary.
177-
const successfulJobsCount = fetchedSensorsCounter - errors.length;
178-
logger.info(
215+
const successfulJobsCount = sensorCount - errors.length;
216+
console.info(
179217
`Successfully aggregated data from ${successfulJobsCount} IoT sensors, ` +
180218
`with failures in aggregating data from ${errors.length} IoT sensors`
181219
);
@@ -208,11 +246,13 @@ import { SemaphoreJob, ZeroBackpressureSemaphore } from 'zero-backpressure-semap
208246
type UserInfo = Record<string, string>;
209247

210248
const maxConcurrentDbRequests = 32;
211-
const dbAccessSemaphore = new ZeroBackpressureSemaphore<UserInfo>(maxConcurrentDbRequests);
249+
const dbAccessSemaphore = new ZeroBackpressureSemaphore<UserInfo>(
250+
maxConcurrentDbRequests
251+
);
212252

213253
app.get('/user/', async (req, res) => {
214254
// Define the sub-prodecure.
215-
const fetchUserInfo: SemaphoreJob<UserInfo> = async (): Promise<UserInfo> => {
255+
const fetchUserInfo = async (): Promise<UserInfo> => {
216256
const userInfo: UserInfo = await usersDbClient.get(req.userID);
217257
return userInfo;
218258
}

package-lock.json

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "zero-backpressure-semaphore-typescript",
3-
"version": "3.1.2",
3+
"version": "3.1.3",
44
"description": "A modern Promise-semaphore for Node.js projects, enabling users to limit the number of concurrently executing promises. Offering backpressure control for enhanced efficiency, utilizing a communicative API that signals availability, promoting a just-in-time approach. Additionally, it incorporates mechanisms for graceful termination and error handling, making it suitable for complex scenarios.",
55
"repository": {
66
"type": "git",
@@ -37,14 +37,17 @@
3737
"graceful-termination",
3838
"graceful-teardown",
3939
"graceful-shutdown",
40-
"scheduler",
41-
"executor",
40+
"wait-for-availability",
41+
"notify-availability",
42+
"free-slot",
43+
"execution-slot",
44+
"wait-for-completion",
45+
"wait-in-flight",
4246
"error-handling",
4347
"uncaught-error",
4448
"uncaught-rejection",
4549
"async",
4650
"nodejs",
47-
"Node.js",
4851
"typescript",
4952
"ts",
5053
"ES2020"

0 commit comments

Comments
 (0)