File tree 4 files changed +57
-3
lines changed
4 files changed +57
-3
lines changed Original file line number Diff line number Diff line change @@ -718,6 +718,14 @@ export type PurgeResponse = Success & {
718
718
purged : number ;
719
719
} ;
720
720
721
+ /**
722
+ * Additional options that express the intention of the API
723
+ */
724
+ export type ConsumerApiOptions = ConsumerApiAction | {
725
+ action ?: ConsumerApiAction ;
726
+ pedantic ?: boolean ;
727
+ } ;
728
+
721
729
export const ConsumerApiAction = {
722
730
CreateOrUpdate : "" ,
723
731
Update : "update" ,
@@ -730,6 +738,7 @@ export type CreateConsumerRequest = {
730
738
"stream_name" : string ;
731
739
config : Partial < ConsumerConfig > ;
732
740
action ?: ConsumerApiAction ;
741
+ pedantic ?: boolean ;
733
742
} ;
734
743
735
744
export type StreamMsgResponse = ApiResponse & {
Original file line number Diff line number Diff line change @@ -30,6 +30,7 @@ import {
30
30
InvalidArgumentError ,
31
31
} from "@nats-io/nats-core/internal" ;
32
32
import type {
33
+ ConsumerApiOptions ,
33
34
ConsumerConfig ,
34
35
ConsumerInfo ,
35
36
ConsumerListResponse ,
@@ -55,10 +56,14 @@ export class ConsumerAPIImpl extends BaseApiClientImpl implements ConsumerAPI {
55
56
async add (
56
57
stream : string ,
57
58
cfg : ConsumerConfig ,
58
- action : ConsumerApiAction = ConsumerApiAction . Create ,
59
+ opts : ConsumerApiOptions = ConsumerApiAction . Create ,
59
60
) : Promise < ConsumerInfo > {
60
61
validateStreamName ( stream ) ;
61
62
63
+ if ( typeof opts === "string" ) {
64
+ opts = { action : opts as ConsumerApiAction } ;
65
+ }
66
+
62
67
if ( cfg . deliver_group && cfg . flow_control ) {
63
68
throw InvalidArgumentError . format (
64
69
[ "flow_control" , "deliver_group" ] ,
@@ -89,7 +94,8 @@ export class ConsumerAPIImpl extends BaseApiClientImpl implements ConsumerAPI {
89
94
const cr = { } as CreateConsumerRequest ;
90
95
cr . config = cfg ;
91
96
cr . stream_name = stream ;
92
- cr . action = action ;
97
+ cr . action = opts . action ;
98
+ cr . pedantic = opts . pedantic ;
93
99
94
100
if ( cr . config . durable_name ) {
95
101
validateDurableName ( cr . config . durable_name ) ;
Original file line number Diff line number Diff line change @@ -23,6 +23,7 @@ import type {
23
23
} from "@nats-io/nats-core/internal" ;
24
24
25
25
import type {
26
+ ConsumerApiOptions ,
26
27
DeliverPolicy ,
27
28
DirectLastFor ,
28
29
PullOptions ,
@@ -271,7 +272,11 @@ export type ConsumerAPI = {
271
272
* @param stream
272
273
* @param cfg
273
274
*/
274
- add ( stream : string , cfg : Partial < ConsumerConfig > ) : Promise < ConsumerInfo > ;
275
+ add (
276
+ stream : string ,
277
+ cfg : Partial < ConsumerConfig > ,
278
+ opts ?: ConsumerApiOptions ,
279
+ ) : Promise < ConsumerInfo > ;
275
280
276
281
/**
277
282
* Updates the consumer configuration for the specified consumer on the specified
Original file line number Diff line number Diff line change @@ -2599,6 +2599,40 @@ Deno.test("jsm - pause/unpause", async () => {
2599
2599
await cleanup ( ns , nc ) ;
2600
2600
} ) ;
2601
2601
2602
+ Deno . test ( "jsm - consumer pedantic" , async ( ) => {
2603
+ const { ns, nc } = await setup ( jetstreamServerConf ( ) ) ;
2604
+ const jsm = await jetstreamManager ( nc ) ;
2605
+ await jsm . streams . add ( {
2606
+ name : "A" ,
2607
+ subjects : [ "a" ] ,
2608
+ storage : StorageType . Memory ,
2609
+ consumer_limits : {
2610
+ max_ack_pending : 10 ,
2611
+ } ,
2612
+ } ) ;
2613
+
2614
+ // this should work
2615
+ await jsm . consumers . add ( "A" , {
2616
+ name : "a" ,
2617
+ ack_policy : AckPolicy . Explicit ,
2618
+ } ) ;
2619
+
2620
+ // but this should reject
2621
+ await assertRejects (
2622
+ ( ) => {
2623
+ return jsm . consumers . add ( "A" , {
2624
+ name : "b" ,
2625
+ ack_policy : AckPolicy . Explicit ,
2626
+ max_ack_pending : 0 ,
2627
+ } , { pedantic : true } ) ;
2628
+ } ,
2629
+ Error ,
2630
+ "pedantic mode: max_ack_pending must be set if it's configured in stream limits" ,
2631
+ ) ;
2632
+
2633
+ await cleanup ( ns , nc ) ;
2634
+ } ) ;
2635
+
2602
2636
Deno . test ( "jsm - storage" , async ( ) => {
2603
2637
const { ns, nc } = await setup ( jetstreamServerConf ( ) ) ;
2604
2638
You can’t perform that action at this time.
0 commit comments