Skip to content

Commit ba6a8cc

Browse files
authored
feat(core): tracing (#265)
Introduce `TraceOptions` in `PublishOptions` to enable message tracing with NATS. Messages can now be traced using `traceDestination` and optionally flagged as `traceOnly` to avoid delivery and generate trace data only. Updated `publish` implementation and added corresponding tests. Signed-off-by: Alberto Ricart <[email protected]>
1 parent 355e554 commit ba6a8cc

File tree

4 files changed

+150
-6
lines changed

4 files changed

+150
-6
lines changed

core/src/core.ts

+21-3
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,7 @@ export interface MsgHdrs extends Iterable<[string, string[]]> {
288288
last(k: string, match?: Match): string;
289289
}
290290

291-
export interface RequestOptions {
291+
export interface RequestOptions extends TraceOptions {
292292
/**
293293
* number of milliseconds before the request will timeout.
294294
*/
@@ -311,7 +311,7 @@ export interface RequestOptions {
311311

312312
export type RequestStrategy = "timer" | "count" | "stall" | "sentinel";
313313

314-
export interface RequestManyOptions {
314+
export interface RequestManyOptions extends TraceOptions {
315315
strategy: RequestStrategy;
316316
maxWait: number;
317317
headers?: MsgHdrs;
@@ -653,7 +653,25 @@ export interface Subscription extends AsyncIterable<Msg> {
653653
getMax(): number | undefined;
654654
}
655655

656-
export interface PublishOptions {
656+
/**
657+
* These options enable message tracing through NATS.
658+
*/
659+
export interface TraceOptions {
660+
/**
661+
* If set, the server will send events representing the flow of the
662+
* message as it moves through the system to this subject.
663+
*/
664+
traceDestination?: string;
665+
/**
666+
* If true, the message will NOT be delivered, and instead will just
667+
* generate trace information. Note that in the context of requests,
668+
* this means the service will not be triggered so the operation will
669+
* timeout or return no responses.
670+
*/
671+
traceOnly?: boolean;
672+
}
673+
674+
export interface PublishOptions extends TraceOptions {
657675
/**
658676
* An optional subject where a response should be sent.
659677
* Note you must have a subscription listening on this subject

core/src/nats.ts

+30-3
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import { deferred } from "./util.ts";
1717
import { ProtocolHandler, SubscriptionImpl } from "./protocol.ts";
1818
import { Empty } from "./encoders.ts";
19+
import { headers } from "./headers.ts";
1920

2021
import type { Features, SemVer } from "./semver.ts";
2122
import { parseSemVer } from "./semver.ts";
@@ -103,6 +104,18 @@ export class NatsConnectionImpl implements NatsConnection {
103104
if (options?.reply) {
104105
this._check(options.reply, false, true);
105106
}
107+
108+
if (typeof options?.traceOnly === "boolean") {
109+
const hdrs = options.headers || headers();
110+
hdrs.set("Nats-Trace-Only", "true");
111+
options.headers = hdrs;
112+
}
113+
if (typeof options?.traceDestination === "string") {
114+
const hdrs = options.headers || headers();
115+
hdrs.set("Nats-Trace-Dest", options.traceDestination);
116+
options.headers = hdrs;
117+
}
118+
106119
this.protocol.publish(subject, data, options);
107120
}
108121

@@ -293,8 +306,14 @@ export class NatsConnectionImpl implements NatsConnection {
293306
sub?.unsubscribe();
294307
});
295308

309+
const { headers, traceDestination, traceOnly } = opts;
296310
try {
297-
this.publish(subject, data, { reply: sub.getSubject() });
311+
this.publish(subject, data, {
312+
reply: sub.getSubject(),
313+
headers,
314+
traceDestination,
315+
traceOnly,
316+
});
298317
} catch (err) {
299318
cancel(err as Error);
300319
}
@@ -322,13 +341,17 @@ export class NatsConnectionImpl implements NatsConnection {
322341
const r = new RequestMany(this.protocol.muxSubscriptions, subject, rmo);
323342
this.protocol.request(r);
324343

344+
const { headers, traceDestination, traceOnly } = opts;
345+
325346
try {
326347
this.publish(
327348
subject,
328349
data,
329350
{
330351
reply: `${this.protocol.muxSubscriptions.baseInbox}${r.token}`,
331-
headers: opts.headers,
352+
headers,
353+
traceDestination,
354+
traceOnly,
332355
},
333356
);
334357
} catch (err) {
@@ -415,13 +438,17 @@ export class NatsConnectionImpl implements NatsConnection {
415438
);
416439
this.protocol.request(r);
417440

441+
const { headers, traceDestination, traceOnly } = opts;
442+
418443
try {
419444
this.publish(
420445
subject,
421446
data,
422447
{
423448
reply: `${this.protocol.muxSubscriptions.baseInbox}${r.token}`,
424-
headers: opts.headers,
449+
headers,
450+
traceDestination,
451+
traceOnly,
425452
},
426453
);
427454
} catch (err) {

core/tests/basics_test.ts

+53
Original file line numberDiff line numberDiff line change
@@ -1726,3 +1726,56 @@ Deno.test("basics - internal close listener", async () => {
17261726

17271727
await ns.stop();
17281728
});
1729+
1730+
Deno.test("basics - publish tracing", async () => {
1731+
const { ns, nc } = await setup();
1732+
const sub = nc.subscribe("foo", { callback: () => {} });
1733+
1734+
const traces = nc.subscribe("traces", {
1735+
callback: () => {},
1736+
max: 2,
1737+
});
1738+
nc.flush();
1739+
1740+
nc.publish("foo", Empty, { traceDestination: "traces" });
1741+
nc.publish("foo", Empty, { traceDestination: "traces", traceOnly: true });
1742+
1743+
await traces.closed;
1744+
assertEquals(sub.getReceived(), 1);
1745+
assertEquals(traces.getReceived(), 2);
1746+
1747+
await cleanup(ns, nc);
1748+
});
1749+
1750+
Deno.test("basics - request tracing", async () => {
1751+
const { ns, nc } = await setup();
1752+
const sub = nc.subscribe("foo", {
1753+
callback: (_, m) => {
1754+
m.respond();
1755+
},
1756+
});
1757+
1758+
const traces = nc.subscribe("traces", {
1759+
callback: () => {},
1760+
max: 2,
1761+
});
1762+
nc.flush();
1763+
1764+
await nc.request("foo", Empty, {
1765+
timeout: 2_000,
1766+
traceDestination: "traces",
1767+
});
1768+
await assertRejects(() => {
1769+
return nc.request("foo", Empty, {
1770+
timeout: 2_000,
1771+
traceDestination: "traces",
1772+
traceOnly: true,
1773+
});
1774+
});
1775+
1776+
await traces.closed;
1777+
assertEquals(sub.getReceived(), 1);
1778+
assertEquals(traces.getReceived(), 2);
1779+
1780+
await cleanup(ns, nc);
1781+
});

core/tests/mrequest_test.ts

+46
Original file line numberDiff line numberDiff line change
@@ -511,3 +511,49 @@ Deno.test("mreq - no mux request no perms doesn't leak subs", async () => {
511511

512512
await cleanup(ns, nc);
513513
});
514+
515+
Deno.test("basics - request many tracing", async () => {
516+
const { ns, nc } = await setup();
517+
const sub = nc.subscribe("foo", {
518+
callback: (_, m) => {
519+
m.respond();
520+
m.respond();
521+
},
522+
});
523+
524+
const traces = nc.subscribe("traces", {
525+
callback: () => {},
526+
max: 2,
527+
});
528+
nc.flush();
529+
530+
let iter = await nc.requestMany("foo", Empty, {
531+
strategy: "stall",
532+
maxWait: 2_000,
533+
traceDestination: "traces",
534+
});
535+
let count = 0;
536+
for await (const _ of iter) {
537+
count++;
538+
}
539+
assertEquals(count, 2);
540+
541+
iter = await nc.requestMany("foo", Empty, {
542+
strategy: "stall",
543+
maxWait: 2_000,
544+
traceDestination: "traces",
545+
traceOnly: true,
546+
});
547+
548+
count = 0;
549+
for await (const _ of iter) {
550+
count++;
551+
}
552+
assertEquals(count, 0);
553+
554+
await traces.closed;
555+
assertEquals(sub.getReceived(), 1);
556+
assertEquals(traces.getReceived(), 2);
557+
558+
await cleanup(ns, nc);
559+
});

0 commit comments

Comments
 (0)