Jetstream / MessageConsumer not working in Kubernetes environment #6772

Open
fritzfs opened this issue Apr 8, 2025 · 13 comments
Labels
defect Suspected defect such as a bug or regression

Comments

@fritzfs

fritzfs commented Apr 8, 2025

Observed behavior

We're using the simplified API with MessageConsumer, following the example on GitHub. However, the consumer app does not work correctly when deployed in Kubernetes. The app connects to NATS and reads the stream and consumer information, but the message handler is never called, even though the consumer sequence of the last delivered message keeps increasing while its stream sequence stays the same. Outside Kubernetes, I can restart the consumer application multiple times and it keeps consuming successfully. We're not sure how this can be related to Kubernetes, but that's the only difference in setup.

The consumer is durable, with the following settings:

    ConsumerConfiguration.Builder consumerConfigBuilder = ConsumerConfiguration.builder()
            .durable(NAME)
            .filterSubject(SUBJECTS)
            .ackPolicy(AckPolicy.Explicit);

Consumer report when consumer is deployed on Kubernetes:

Information for Consumer MARKET_DATA > 01591b40-1a79-4073-9a3f-3e84f017f31e created 2025-04-08T17:03:43+02:00
Configuration:
                Name: 01591b40-1a79-4073-9a3f-3e84f017f31e
           Pull Mode: true
      Filter Subject: echo.inbound.01591b40-1a79-4073-9a3f-3e84f017f31e
      Deliver Policy: All
          Ack Policy: Explicit
            Ack Wait: 30.00s
       Replay Policy: Instant
     Max Ack Pending: 1,000
   Max Waiting Pulls: 512
Cluster Information:
                Name: nats-exp
              Leader: nats-exp-1
State:
  Last Delivered Message: Consumer sequence: 4,000 Stream sequence: 1,000 Last delivery: 14.26s ago
  Acknowledgment Floor: Consumer sequence: 0 Stream sequence: 0
  Outstanding Acks: 1,000 out of maximum 1,000
  Redelivered Messages: 1,000
  Unprocessed Messages: 2,648
  Waiting Pulls: 6 of maximum 512

Consumer report after consumer has been stopped on Kubernetes and started outside:

Information for Consumer MARKET_DATA > 01591b40-1a79-4073-9a3f-3e84f017f31e created 2025-04-08T17:07:22+02:00
Configuration:
                Name: 01591b40-1a79-4073-9a3f-3e84f017f31e
           Pull Mode: true
      Filter Subject: echo.inbound.01591b40-1a79-4073-9a3f-3e84f017f31e
      Deliver Policy: All
          Ack Policy: Explicit
            Ack Wait: 30.00s
       Replay Policy: Instant
     Max Ack Pending: 1,000
   Max Waiting Pulls: 512
Cluster Information:
                Name: nats-exp
              Leader: nats-exp-1
State:
  Last Delivered Message: Consumer sequence: 4,104 Stream sequence: 4,104 Last delivery: 5.38s ago
  Acknowledgment Floor: Consumer sequence: 4,104 Stream sequence: 4,104 Last Ack: 3.55s ago
  Outstanding Acks: 0 out of maximum 1,000
  Redelivered Messages: 0
  Unprocessed Messages: 0
  Waiting Pulls: 0 of maximum 512

Expected behavior

MessageConsumer handler should receive messages as it does outside the Kubernetes env.

Server and client version

nats-server 2.10.26
nats.java 2.20.1

Host environment

No response

Steps to reproduce

No response

fritzfs added the defect label Apr 8, 2025

@MauriceVanVeen
Member

It seems messages are not being acknowledged properly. Looking at:

  Acknowledgment Floor: Consumer sequence: 0 Stream sequence: 0
  Outstanding Acks: 1,000 out of maximum 1,000
  Redelivered Messages: 1,000
  Unprocessed Messages: 2,648

You're getting redeliveries of messages and messages are not being acknowledged. Here the messages are being acknowledged:

  Acknowledgment Floor: Consumer sequence: 4,104 Stream sequence: 4,104 Last Ack: 3.55s ago
  Outstanding Acks: 0 out of maximum 1,000
  Redelivered Messages: 0
  Unprocessed Messages: 0

Can you share any steps to reproduce? What does your message handling code look like, do you ack messages?
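
The reasoning applied to the two reports above can be written down as a small check. This is only a sketch: the class, method, and parameter names are hypothetical and simply mirror the numbers in the State section of the consumer report. It does not call any NATS API.

```java
/** Illustrative classification of the State numbers from a consumer report. */
class ConsumerStateCheck {

    /**
     * @param outstandingAcks messages delivered but not yet acknowledged
     * @param maxAckPending   the consumer's Max Ack Pending setting
     * @param redelivered     number of messages that have been redelivered
     */
    static String diagnose(long outstandingAcks, long maxAckPending, long redelivered) {
        if (outstandingAcks >= maxAckPending && redelivered > 0) {
            // The cap is reached, so no new messages are sent; pending ones are redelivered.
            return "stalled: max ack pending reached, only redeliveries until the client acks";
        }
        if (outstandingAcks == 0 && redelivered == 0) {
            return "healthy: everything delivered has been acknowledged";
        }
        return "in progress: some deliveries are still awaiting acknowledgment";
    }

    public static void main(String[] args) {
        // Numbers from the Kubernetes report, then from the report taken outside Kubernetes.
        System.out.println(diagnose(1000, 1000, 1000));
        System.out.println(diagnose(0, 1000, 0));
    }
}
```

With the values from the thread, the first call reports a stalled consumer and the second a healthy one.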

@fritzfs
Author

fritzfs commented Apr 8, 2025

Thanks for responding!

Yes. As you can see in the information above, they are acked when I run it outside Kubernetes.

A couple more things to mention:

  • We use NATS core (note that it's core, not jetstream) to publish messages to subject echo.inbound.01591b40-1a79-4073-9a3f-3e84f017f31e from another app.
  • NATS cluster has 3 nodes.
  • The stream is created with file storage; the consumer is durable and its filter subject is echo.inbound.01591b40-1a79-4073-9a3f-3e84f017f31e.
  • Consumer uses NATS Jetstream to read the messages ...

Code looks like this:

    CLASS_CONSTRUCTOR(Connection natsConnection) {
            try {
                    this.jetStreamManagement = natsConnection.jetStreamManagement();
                    if (!NatsJsUtils.streamExists(jetStreamManagement, STREAM)) {
                            System.out.println("Creating stream: " + STREAM);
                            NatsJsUtils.createStream(jetStreamManagement, STREAM, StorageType.File, SUBJECTS);
                    } 
            } catch (Exception e) {
                    ...
            }
    }


    @EventListener
    public void subscribe(LoggedOnEvent event) {
            subscribe(msg -> onMessage(msg));
    }


    public Optional<MessageConsumer> subscribe(MessageHandler messageHandler) {
            ConsumerConfiguration.Builder consumerConfigBuilder = ConsumerConfiguration.builder()
                    .durable(CONSUMER)
                    .filterSubject(SUBJECTS)
                    .ackPolicy(AckPolicy.Explicit);
            try {
                    StreamContext streamContext = natsConnection.getStreamContext(STREAM);
                    if (NatsJsUtils.consumerExists(jetStreamManagement, STREAM, CONSUMER)) {
                            long streamLastSequence = streamContext.getStreamInfo().getStreamState().getLastSequence();
                            ConsumerInfo consumerInfo = streamContext.getConsumerInfo(CONSUMER);
                            long consumerLastSequence = consumerInfo.getDelivered().getConsumerSequence();
                            /*
                            * Do we have misalignment in consumer and stream? If so, delete consumer to reset back to stream seq.
                            */
                            if (consumerLastSequence > streamLastSequence) {
                                    System.out.println("Deleting consumer: " + CONSUMER);
                                    jetStreamManagement.deleteConsumer(STREAM, CONSUMER);
                            }
                    }
                    ConsumerContext consumerContext = streamContext.createOrUpdateConsumer(consumerConfigBuilder.build());
                    return Optional.of(consumerContext.consume(messageHandler));
            }
            ... 
    }


    private void onMessage(Message natsMessage) {
            try {
                    // Decode once with an explicit charset and reuse the result.
                    String receivedData = new String(natsMessage.getData(), StandardCharsets.UTF_8);
                    System.out.println("On Message raw " + receivedData);
                    TCSender.send(receivedData);
                    natsMessage.ack();
                    ...
            } catch (Exception e) {
                    natsMessage.nak();
                    ...
            }
    }

@MauriceVanVeen
Member

consumerLastSequence > streamLastSequence

The consumer and stream sequences must NOT be compared; they are entirely different things. The stream sequence is a message's position in the stream and increases with every published message. The consumer sequence increases with every delivery, including redeliveries. It is normal for them to differ and not to increase together. You should remove the code that deletes the consumer based on that comparison.

I'm wondering whether TCSender.send(..) blocks in Kubernetes for some reason, which could explain messages not being acked. Or perhaps an exception is thrown and the message is nacked, but the exception is not logged?
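
One way to rule out both possibilities (a blocking call, or a swallowed exception) is to run the handler's work under a time limit and log every failure before nacking. The sketch below is an assumption-laden illustration, not code from the thread: `GuardedHandler` and `Ackable` are hypothetical names, and `Ackable` only stands in for the `ack()` and `nak()` methods of a JetStream message so the example runs without a NATS connection.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class GuardedHandler {

    /** Hypothetical stand-in for the ack/nak part of a JetStream message. */
    interface Ackable {
        void ack();
        void nak();
    }

    // Daemon worker thread so a stuck task cannot keep the JVM alive.
    private final ExecutorService worker = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "guarded-handler");
        t.setDaemon(true);
        return t;
    });

    /** Runs the work with a time limit; acks on success, logs and naks otherwise. */
    boolean handle(Ackable msg, Runnable work, long timeoutMillis) {
        Future<?> result = worker.submit(work);
        try {
            result.get(timeoutMillis, TimeUnit.MILLISECONDS);
            msg.ack();
            return true;
        } catch (TimeoutException e) {
            result.cancel(true); // interrupts the worker if it is blocked
            System.err.println("handler timed out after " + timeoutMillis + " ms");
        } catch (ExecutionException e) {
            System.err.println("handler failed: " + e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.err.println("handler interrupted");
        }
        msg.nak();
        return false;
    }
}
```

A single worker thread keeps messages in order, but work that ignores interruption would still hold up later messages, so the time limit here is a diagnostic aid rather than a fix.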

@fritzfs
Author

fritzfs commented Apr 8, 2025

Thank you, I'll remove that part. I was under the impression that consumer sequence must not be greater than stream sequence.

Good point about TCSender.send(...), but that's why there's a System.out.println(...) at the beginning of the method, and it isn't called (nothing appears in the logs in the Kubernetes deployment).

Just to remind you... if I look at NATS consumer info (report), I'll see "Last Delivered Message Consumer Sequence" increasing, but "Stream Sequence" stays the same ...

@fritzfs
Author

fritzfs commented Apr 8, 2025

Does the stream need to be created with "replicas=3" if I have a 3-node cluster? I see that it's 1 by default.

One additional piece of information: if I change the consumer side to use NATS core, everything works flawlessly (same subject, same NATS cluster) in Kubernetes. Dunno if that helps.

@MauriceVanVeen
Member

Just to remind you... if I look at NATS consumer info (report), I'll see "Last Delivered Message Consumer Sequence" increasing, but "Stream Sequence" stays the same ...

That is expected. The consumer sequence increases for every attempted delivery for your client. You are only getting redeliveries, looking at the max ack pending being reached and the redelivery count being increased as well. No new messages are delivered, only redelivered messages, so the highest delivered stream sequence remains the same.
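
The numbers in the Kubernetes report can be reproduced with a toy model of the two counters. This is a sketch under stated assumptions, not the server's implementation: `RedeliverySim` and its members are hypothetical names, the stream is assumed to hold 3,648 messages (1,000 delivered plus 2,648 unprocessed), and the client is assumed never to ack.

```java
import java.util.TreeSet;

/** Toy model of a pull consumer's delivery counters; not the server's actual logic. */
class RedeliverySim {
    final int maxAckPending;       // cap on unacknowledged messages
    final long streamLastSeq;      // last sequence stored in the stream
    long consumerSeq = 0;          // grows with every delivery attempt, including redeliveries
    long deliveredStreamSeq = 0;   // highest stream sequence handed to the client so far
    final TreeSet<Long> pending = new TreeSet<>(); // delivered but not yet acked

    RedeliverySim(int maxAckPending, long streamLastSeq) {
        this.maxAckPending = maxAckPending;
        this.streamLastSeq = streamLastSeq;
    }

    /** Delivers new messages until the ack-pending cap is hit or the stream is exhausted. */
    void deliverNew() {
        while (pending.size() < maxAckPending && deliveredStreamSeq < streamLastSeq) {
            deliveredStreamSeq++;
            consumerSeq++;
            pending.add(deliveredStreamSeq);
        }
    }

    /** Ack wait expired: every unacknowledged message is delivered again. */
    void ackWaitExpired() {
        consumerSeq += pending.size();
    }

    void ack(long streamSeq) {
        pending.remove(streamSeq);
    }

    public static void main(String[] args) {
        RedeliverySim sim = new RedeliverySim(1000, 3648);
        sim.deliverNew();                 // first 1,000 messages go out, none are acked
        for (int round = 0; round < 3; round++) {
            sim.ackWaitExpired();         // three rounds of redelivery
            sim.deliverNew();             // no room for new messages: cap is still full
        }
        System.out.println("consumer sequence: " + sim.consumerSeq);        // 4000
        System.out.println("stream sequence:   " + sim.deliveredStreamSeq); // 1000
    }
}
```

After the initial delivery and three redelivery rounds the model shows consumer sequence 4,000 against stream sequence 1,000, matching the report. It also shows why the consumer sequence can legitimately exceed the stream's last sequence once enough redeliveries accumulate.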

@MauriceVanVeen
Member

One additional piece of information: if I change the consumer side to use NATS core, everything works flawlessly (same subject, same NATS cluster) in Kubernetes. Dunno if that helps.

In that case you are not persisting data, though.

@MauriceVanVeen
Member

What is different between your local setup and your Kubernetes setup? If you don't see any logs when deployed in Kubernetes, you might have some configuration differences, or different permissions?

@fritzfs
Author

fritzfs commented Apr 8, 2025

There should be no difference.

  • Stdout is visible when looking at the Kubernetes pod, that's checked.
  • The publisher instance is the same. It's a Micronaut app deployed in Kubernetes. It writes to the NATS server.
  • The NATS server is the same. It's deployed in Kubernetes. I access literally that NATS from a local setup as well.
  • TCSender.send() uses the same TCP/IP destination and that is successfully connected from local and Kubernetes env, I see it in the logs. There is some communication using this TCSender.send() before the NATS part so that's why I know ...

@fritzfs
Author

fritzfs commented Apr 8, 2025

Also, note that I see the publisher app logs and it's sending messages to this subject echo.inbound.01591b40-1a79-4073-9a3f-3e84f017f31e using NATS core publish (not JetStream publish).
When I explore the stream using the NUI web app, I see the messages in the stream ... it's just not triggering a call to onMessage (consume not working).

@Tauebenuss

Hi everyone,

I'm experiencing the same issue when running under Docker.

The NATS Streamer crashes with the following exception:

[SUB-90007] No matching streams for subject.
java.lang.IllegalStateException: [SUB-90007] No matching streams for subject.
	at io.nats.client.support.NatsJetStreamClientError._instance(NatsJetStreamClientError.java:109)
	at io.nats.client.support.NatsJetStreamClientError.instance(NatsJetStreamClientError.java:98)
	at io.nats.client.impl.NatsJetStream.createSubscription(NatsJetStream.java:331)
	at io.nats.client.impl.NatsJetStream.subscribe(NatsJetStream.java:618)

As a test, I installed the same Java version on the host machine. When I run the application directly on Ubuntu, everything works fine — but inside the Docker container, it fails with the error above.

Any ideas what might be causing this?

Thanks in advance!

@MauriceVanVeen
Member

@Tauebenuss, could you perhaps set up a small reproducible example? Something like a small app that publishes into a stream and subscribes, which works in one environment but not in the other?

@ripienaar
Contributor

Can you show the output of nats server report jetstream and nats stream info STREAMNAME?

Regarding publishing with nats core to a stream, you should not do that generally speaking.
