Ephemeral JetStream consumer disconnected without any exception in IAsyncEnumerable #791

Open
ealeykin opened this issue Mar 24, 2025 · 10 comments

ealeykin commented Mar 24, 2025

Observed behavior

I have a JetStream ephemeral consumer and iterate over its messages with IAsyncEnumerable. At some point I noticed the stream had 0 consumers, but no exceptions were reported from the background service.

I also noticed that an exception was raised in a NATS client internal background thread, but it wasn't propagated to the background service iterating the IAsyncEnumerable.

That left the service stuck on the IAsyncEnumerable without any messages being delivered.

I suspected a slow consumer issue, but MaxAckPending is set to 1.

Additionally, what is the expected behavior from the client's perspective for slow consumers?

exception: System.IO.IOException: Unable to write data to the transport connection: Connection reset by peer.
 ---> System.Net.Sockets.SocketException (104): Connection reset by peer
   at System.Net.Sockets.Socket.AwaitableSocketAsyncEventArgs.CreateException(SocketError error, Boolean forAsyncThrow)
   at System.Net.Sockets.NetworkStream.WriteAsync(ReadOnlyMemory`1 buffer, CancellationToken cancellationToken)
   at System.Net.Security.SslStream.WriteSingleChunk[TIOAdapter](ReadOnlyMemory`1 buffer, CancellationToken cancellationToken)
   at System.Net.Security.SslStream.WriteAsyncInternal[TIOAdapter](ReadOnlyMemory`1 buffer, CancellationToken cancellationToken)
   at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[TStateMachine](TStateMachine& stateMachine)
   at NATS.Client.Core.Internal.SslStreamConnection.SendAsync(ReadOnlyMemory`1 buffer)
   at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[TStateMachine](TStateMachine& stateMachine)
   at NATS.Client.Core.Commands.CommandWriter.ReaderLoopAsync(ILogger`1 logger, ISocketConnection connection, PipeReader pipeReader, Channel`1 channelSize, Memory`1 consolidateMem, PartialSendFailureCounter partialSendFailureCounter, CancellationToken cancellationToken)
   at System.Threading.ExecutionContext.RunFromThreadPoolDispatchLoop(Thread threadPoolThread, ExecutionContext executionContext, ContextCallback callback, Object state)
   at System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1.AsyncStateMachineBox`1.MoveNext(Thread threadPoolThread)
   at System.Threading.ThreadPoolWorkQueue.Dispatch()
   at System.Threading.PortableThreadPool.WorkerThread.WorkerThreadStart()
--- End of stack trace from previous location ---

   --- End of inner exception stack trace ---
   at NATS.Client.Core.Commands.CommandWriter.ReaderLoopAsync(ILogger`1 logger, ISocketConnection connection, PipeReader pipeReader, Channel`1 channelSize, Memory`1 consolidateMem, PartialSendFailureCounter partialSendFailureCounter, CancellationToken cancellationToken)

   message: Unexpected error in send buffer reader loop
     }
     SourceContext: NATS.Client.Core.Commands.CommandWriter

Expected behavior

IAsyncEnumerable throws

Server and client version

NATS Client: 2.5.7
NATS Server: 2.10.25

Host environment

AWS ECS

Steps to reproduce

@mtmk mtmk transferred this issue from nats-io/nats-server Mar 24, 2025
@mtmk
Member

mtmk commented Mar 24, 2025

Thanks @ealeykin for the report. Could you share the relevant part of your code?

@ealeykin
Author

ealeykin commented Mar 25, 2025

@mtmk

Updated the client to the latest version, 2.7.12. The background service looks roughly like this:

// Ephemeral WQ consumer
var consumer = await js.CreateOrUpdateConsumerAsync(streamName, props.Config, ct);
var messages = consumer.ConsumeAsync<TData>(cancellationToken: ct);

try
{
    await foreach (var msg in messages.WithCancellation(stoppingToken))
    {
        try
        {
            // http request with timeout (timeout hits frequently and throws)
            // ack on success
        }
        catch (Exception e)
        {
            // nak on failure
        }
    }
}
catch (Exception e)
{
    // never hit
}

In addition to the above, I see the following exceptions in the NATS client:

The exceptions other than NullReferenceException appear in the client logs from time to time, and it looks like nothing happens at all after the NullReferenceException. However, no exceptions are propagated to the Nak or IAsyncEnumerable invocations.

System.NullReferenceException: Object reference not set to an instance of an object.
   at NATS.Client.Core.Commands.CommandWriter.ReaderLoopAsync(ILogger`1 logger, ISocketConnection connection, PipeReader pipeReader, Channel`1 channelSize, Memory`1 consolidateMem, PartialSendFailureCounter partialSendFailureCounter, CancellationToken cancellationToken) 
 message: Unexpected error in send buffer reader loop 
 exception: System.InvalidOperationException: Write operations are not allowed after the channel was shutdown.
   at NATS.Client.Core.Commands.CommandWriter.ReaderLoopAsync(ILogger`1 logger, ISocketConnection connection, PipeReader pipeReader, Channel`1 channelSize, Memory`1 consolidateMem, PartialSendFailureCounter partialSendFailureCounter, CancellationToken cancellationToken)
  • HttpTimeout set to 10min with 3 retries, so 30 min in total

Consumer config

consumerBuilder.Config.MaxAckPending = 1;
consumerBuilder.Config.DurableName = null;
consumerBuilder.Config.AckWait = TimeSpan.FromMinutes(30); // maybe this value should be higher than http total timeout with retries
consumerBuilder.Config.MaxDeliver = -1;

UPD: Restarting the service resumes consumption.

@mtmk mtmk self-assigned this Mar 25, 2025
@mtmk
Member

mtmk commented Mar 25, 2025

thanks @ealeykin do the exceptions happen around reconnect events?

As for slow consumers, you can detect them from the NATS .NET client by subscribing to MessageDropped events on the connection and signaling your application to react.
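
A minimal sketch of that MessageDropped subscription, assuming the NATS.Net 2.x event shape (a handler returning ValueTask, with event args exposing Subject and Pending; these member names are from memory and should be checked against the installed version). The `slowConsumerDetected` signal and the server URL are hypothetical and not part of the client:

```csharp
using NATS.Client.Core;

await using var nats = new NatsConnection(new NatsOpts { Url = "nats://localhost:4222" });

// Hypothetical application-side signal; the rest of the app can await or poll it.
var slowConsumerDetected = new TaskCompletionSource();

// Assumption: the event fires when a message is discarded because the
// subscription could not keep up with delivery.
nats.MessageDropped += (sender, args) =>
{
    Console.WriteLine($"Dropped message on '{args.Subject}', pending: {args.Pending}");
    slowConsumerDetected.TrySetResult();
    return ValueTask.CompletedTask;
};
```

The handler only records that a drop happened; what the application does in response (restart the consumer, alert, scale out) is left to the caller.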

@ealeykin
Author

ealeykin commented Mar 25, 2025

The errors in the NATS client are caused by service restarts and are not that relevant.

The issue appears to be with the ephemeral WQ consumer, when two services create an ephemeral consumer with the same name (during a rolling update of the service).

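using System.Text.Json;
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
using NATS.Client.Serializers.Json;

var nats1 = new NatsConnection(new NatsOpts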
{
    Url = "localhost",
    SerializerRegistry = new NatsJsonSerializerRegistry()
});

var nats2 = new NatsConnection(new NatsOpts
{
    Url = "localhost",
    SerializerRegistry = new NatsJsonSerializerRegistry()
});

var js1 = new NatsJSContext(nats1);
var js2 = new NatsJSContext(nats2);

await Task.WhenAll(
    Enumerable
        .Range(0, 10)
        .Select(x => js1
            .PublishAsync("$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.plk-events.notification-service-notifications", new { Value = x })
            .AsTask())
        .ToArray());

var consumer1 = await js1.CreateOrUpdateConsumerAsync("dlq", new ConsumerConfig
{
    Name = "consumer-1",
    DurableName = null,
    MaxWaiting = 10,
    FilterSubject = "dlq.*.notification-service-notifications",
});

var consumer2 = await js2.CreateOrUpdateConsumerAsync("dlq", new ConsumerConfig
{
    Name = "consumer-1",
    DurableName = null,
    MaxWaiting = 10,
    FilterSubject = "dlq.*.notification-service-notifications",
});

var m1 = consumer1.ConsumeAsync<JsonElement>();
var m2 = consumer2.ConsumeAsync<JsonElement>();

var e1 = m1.GetAsyncEnumerator();
var e2 = m2.GetAsyncEnumerator();

await e1.MoveNextAsync();
await e1.Current.AckAsync(); // will delete the message from WQ as expected

await nats1.DisposeAsync(); // Drop first client connection

await e2.MoveNextAsync(); // waits ~20sec and then starts pulling messages from WQ
await e2.Current.AckAsync(); // at this point stream info will show 0 active consumer. Won't delete the message from WQ !!!

This is the relevant piece for the second consumer:

await e2.MoveNextAsync(); 
await e2.Current.AckAsync();

Once NATS connection 1 is disposed, the stream reports 0 active consumers. The second consumer keeps pulling messages, but ACKing them does not delete them from the WQ.

It also looks like the second consumer pulls only some range of messages, not all that are available, and then gets stuck waiting for something (but this needs to be double-checked).

@mtmk
Member

mtmk commented Mar 25, 2025

@ealeykin could you also give me stream config please?

edit: also what's the purpose of .PublishAsync("$JS.EVENT.ADVISORY...?

@ealeykin
Author

ealeykin commented Mar 25, 2025

This is an emulation of DLQ messages.

             Description: Dead Letter Work Queue
                Subjects: $JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.*.*
                Replicas: 1
                 Storage: File

Options:

       Subject Transform: $JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.*.* to dlq.$1.$2
               Retention: WorkQueue
         Acknowledgments: true
          Discard Policy: New
        Duplicate Window: 2m0s
              Direct Get: true
       Allows Msg Delete: true
            Allows Purge: true
  Allows Per-Message TTL: false
          Allows Rollups: false

Limits:

        Maximum Messages: unlimited
     Maximum Per Subject: unlimited
           Maximum Bytes: unlimited
             Maximum Age: 30d0h0m0s
    Maximum Message Size: unlimited
       Maximum Consumers: unlimited

@mtmk Switching to durable consumer fixed the issue.

So during a service rollout there is a short period when two service instances are active, and the second ephemeral consumer ends up in some state that causes the issue. When the first service/consumer shuts down, stream info shows 0 active consumers (but the second one is still somewhat active and doesn't report any errors).
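
For reference, a sketch of what the durable variant of the repro's consumer could look like, assuming the same "dlq" stream and filter subject as in the snippet earlier in the thread. The consumer name "dlq-worker" and the server URL are hypothetical; the only intended difference from the ephemeral config is that DurableName is set instead of null:

```csharp
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;

await using var nats = new NatsConnection(new NatsOpts { Url = "nats://localhost:4222" });
var js = new NatsJSContext(nats);

// Same settings as the ephemeral repro, except DurableName is no longer null.
// "dlq-worker" is a hypothetical name chosen for illustration.
var consumer = await js.CreateOrUpdateConsumerAsync("dlq", new ConsumerConfig
{
    Name = "dlq-worker",
    DurableName = "dlq-worker",
    MaxWaiting = 10,
    FilterSubject = "dlq.*.notification-service-notifications",
});
```

During a rolling update, both service instances would then run this same call against the same named consumer.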

@mtmk
Member

mtmk commented Mar 25, 2025

ok that sounds good. I'm able to reproduce the issue now and having a look as well.

@ealeykin to be clear you're happy using durable consumer then?

@ealeykin
Author

Yes, I'm totally fine with a durable.

Even if this ephemeral consumer behavior is somewhat expected, it would be good to have some indication of it: logs, exceptions, etc.

@robertmircea

This is an emulation of DLQ messages.

Very interesting. I'm curious how it works end-to-end and what I need to do to emulate the same behaviour.

@ealeykin
Author

@robertmircea the code snippet in the issue description will reproduce the behaviour.

By emulation I mean manually publishing the NATS core system message that indicates max deliveries were reached on a consumer. That's it.

ealeykin changed the title from "JetStream consumer disconnected without any exception in IAsyncEnumerable" to "Ephemeral JetStream consumer disconnected without any exception in IAsyncEnumerable" on Mar 29, 2025