Skip to content

Commit 15ce3fc

Browse files
committed
Bump version to v1.1.2: README enhancements, adding a message queue example
1 parent 929a903 commit 15ce3fc

File tree

3 files changed

+60
-7
lines changed

3 files changed

+60
-7
lines changed

README.md

Lines changed: 57 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ Each use case necessitates distinct handling capabilities, which will be discuss
1313

1414
Traditional semaphore APIs require explicit *acquire* and *release* steps, adding overhead and responsibility for the user. Additionally, they introduce the risk of deadlocking the application if one forgets to *release*, for example, due to a thrown exception.
1515

16-
In contrast, `ZeroBackpressureSemaphore` manages job execution, abstracting away these details and reducing user responsibility. The *acquire* and *release* steps are handled implicitly by the execution methods, similarly to the RAII idiom in C++.
16+
In contrast, `ZeroBackpressureSemaphore` manages job execution, abstracting away these details and reducing user responsibility. The *acquire* and *release* steps are handled implicitly by the execution methods, reminiscent of the RAII idiom in C++.
1717

1818
Method names are chosen to clearly convey their functionality.
1919

@@ -25,7 +25,7 @@ npm i zero-backpressure-semaphore-typescript
2525

2626
## Key Features
2727

28-
- __Backpressure Control__: Ideal for job workers and background services. Concurrency control alone isn't sufficient to ensure stability and performance if backpressure control is overlooked.
28+
- __Backpressure Control__: Ideal for job workers and background services. Concurrency control alone isn't sufficient to ensure stability and performance if backpressure control is overlooked. Without backpressure control, the heap can become overloaded, resulting in space complexity of O(*semaphore-rooms* + *pending-jobs*) instead of O(*semaphore-rooms*).
2929
- __Graceful Termination__: Await the completion of all currently executing jobs via the `waitTillAllExecutingJobsAreSettled` method.
3030
- __High Efficiency__: All state-altering operations have a constant time complexity, O(1).
3131
- __Comprehensive documentation__: The class is thoroughly documented, enabling IDEs to provide helpful tooltips that enhance the coding experience.
@@ -90,7 +90,8 @@ async function handleDataAggregation(sensorUID): Promise<void> {
9090
}
9191
```
9292

93-
If the jobs might throw errors, you don't need to worry about these errors propagating up to the event loop and potentially crashing the application. Uncaught errors from jobs triggered by `startExecution` are captured by the semaphore and can be safely accessed for post-processing purposes (e.g., metrics). See the following adaptation of the above example, now utilizing the semaphore's error handling capabilities:
93+
If the jobs might throw errors, you don't need to worry about these errors propagating up to the event loop and potentially crashing the application. Uncaught errors from jobs triggered by `startExecution` are captured by the semaphore and can be safely accessed for post-processing purposes (e.g., metrics).
94+
Refer to the following adaptation of the above example, now utilizing the semaphore's error handling capabilities:
9495

9596
```ts
9697
import { ZeroBackpressureSemaphore } from 'zero-backpressure-semaphore-typescript';
@@ -145,7 +146,59 @@ async function handleDataAggregation(sensorUID): Promise<void> {
145146
```
146147

147148
Please note that in a real-world scenario, sensor UIDs are more likely to be consumed from a message queue (e.g., RabbitMQ, Kafka, AWS SNS) rather than from an in-memory array. This setup **highlights the benefits** of avoiding backpressure:
148-
We should avoid consuming a message if we cannot start processing it immediately. Working with message queues typically involves acknowledgements, which have timeout mechanisms. Therefore, immediate processing is crucial to ensure efficient and reliable handling of messages.
149+
We should avoid consuming a message if we cannot start processing it immediately. Working with message queues typically involves acknowledgements, which have timeout mechanisms. Therefore, immediate processing is crucial to ensure efficient and reliable handling of messages.
150+
Refer to the following adaptation of the previous example, where sensor UIDs are consumed from a message queue. This example overlooks error handling and message validation, for simplicity.
151+
152+
```ts
153+
import { ZeroBackpressureSemaphore } from 'zero-backpressure-semaphore-typescript';
154+
155+
const maxConcurrentAggregationJobs = 24;
156+
const sensorAggregationSemaphore =
157+
new ZeroBackpressureSemaphore<void, SensorAggregationError>(
158+
maxConcurrentAggregationJobs
159+
);
160+
161+
const SENSOR_UIDS_TOPIC = "IOT_SENSOR_UIDS";
162+
const mqClient = new MessageQueueClient(SENSOR_UIDS_TOPIC);
163+
164+
async function processConsumedMessages(): Promise<void> {
165+
let numberOfProcessedMessages = 0;
166+
167+
do {
168+
const message = await mqClient.receiveOneMessage();
169+
if (!message) {
170+
// Consider the queue as empty.
171+
break;
172+
}
173+
174+
++numberOfProcessedMessages;
175+
const { uid } = message.data;
176+
await sensorAggregationSemaphore.startExecution(
177+
(): Promise<void> => handleDataAggregation(uid);
178+
);
179+
180+
await mqClient.removeMessageFromQueue(message);
181+
} while (true);
182+
// Note: at this stage, jobs might be still executing, as we did not wait for
183+
// their completion.
184+
185+
// Graceful termination: await the completion of all currently executing jobs.
186+
await sensorAggregationSemaphore.waitTillAllExecutingJobsAreSettled();
187+
188+
// Post processing.
189+
const errors = sensorAggregationSemaphore.extractUncaughtErrors();
190+
if (errors.length > 0) {
191+
await updateFailedAggregationMetrics(errors);
192+
}
193+
194+
// Summary.
195+
const successfulJobsCount = numberOfProcessedMessages - errors.length;
196+
logger.info(
197+
`Successfully aggregated data from ${successfulJobsCount} IoT sensors, ` +
198+
`with failures in aggregating data from ${errors.length} IoT sensors`
199+
);
200+
}
201+
```
149202

150203
## 2nd use-case: Single Job Execution
151204

package-lock.json

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "zero-backpressure-semaphore-typescript",
3-
"version": "1.1.1",
3+
"version": "1.1.2",
44
"description": "A classic semaphore with modern API, inspired by the RAII idiom. Offering backpressure control for enhanced efficiency, and includes a built-in method to gracefully await the completion of all currently executing jobs.",
55
"repository": {
66
"type": "git",

0 commit comments

Comments
 (0)