[HELP] - Kafka Producer IAM Role Authentication Fails After Idle Period with MSK #1960

Open
Sivakajan-Galaxy opened this issue Apr 4, 2025 · 0 comments

I need to create a Kafka producer using the confluent_kafka.Producer that works with AWS MSK using IAM role-based authentication. To achieve this, I have developed a working solution as outlined below.

import json
import logging
from confluent_kafka import Producer
from config.settings import KAFKA_BOOTSTRAP_SERVERS, REGION
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
import socket

logger = logging.getLogger(__name__)


class KafkaProducerService:
    def __init__(self):
        self.producer = Producer(
            {
                "bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS,
                "acks": "all",
                "security.protocol": "SASL_SSL",
                "sasl.mechanism": "OAUTHBEARER",
                "oauth_cb": self.oauth_cb,
                "client.id": socket.gethostname()
            }
        )

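    def oauth_cb(self, oauth_config):
        # generate_auth_token returns the expiry in milliseconds since the epoch;
        # confluent_kafka expects the expiry in seconds, hence the division.
        auth_token, exp_ms = MSKAuthTokenProvider.generate_auth_token(REGION)
        return auth_token, exp_ms / 1000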

    def produce_message(self, topic, message):
        try:
            self.producer.produce(topic=topic, value=json.dumps(message).encode("utf-8"))
            self.producer.flush()
            self.producer.poll(10)
            logger.info(f"Produced message to {topic}: {message}")
        except Exception as e:
            logger.error(f"Failed to send message to {topic}: {str(e)}")
            raise


kafka_producer_service = KafkaProducerService()

The producer establishes a healthy connection and its messages reach the consumer successfully. However, once the connection has been idle for a few minutes, the client starts logging MSK-related SASL authentication errors repeatedly:

|1743698633.362|FAIL|api-service-29457134-mr5tm#producer-2| [thrd:sasl_ssl://b-1.*******devmsk.dc43-1948-12.kafka.us-east-1.a]: sasl_ssl://b-1.********devmsk.dc256562362.kafka.us-east-1.amazonaws.com:9098/1: SASL authentication error: [20234d-243-49fb-92e8-4]2324234: Access denied (after 235ms in state AUTH_REQ)

These errors keep appearing for as long as the connection is idle. As soon as a new message is sent, it reaches the consumer successfully and the errors stop. Once the connection becomes idle again, the same errors return after a short timeout.

Can anyone help me solve this and suggest the best approach for this case?
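
One possible explanation, stated as an assumption rather than a confirmed diagnosis: confluent_kafka serves the oauth_cb callback only from poll() or flush(), so a producer that is never polled while idle may not refresh its token, and reconnect attempts would then be rejected until the next produce_message call. A minimal sketch of a workaround under that assumption is to poll the producer from a background thread. BackgroundPoller and interval_s are hypothetical names introduced here for illustration; they are not part of confluent_kafka.

```python
import threading


class BackgroundPoller:
    """Calls poll(0) on a producer-like object at a fixed interval.

    Assumption: `producer` is any object exposing poll(timeout), such as
    confluent_kafka.Producer. This helper is hypothetical and is not part
    of confluent_kafka.
    """

    def __init__(self, producer, interval_s=5.0):
        self._producer = producer
        self._interval_s = interval_s
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self):
        self._thread.start()

    def stop(self):
        # Signal the loop to exit, then wait for the thread to finish.
        self._stop.set()
        self._thread.join()

    def _run(self):
        # Event.wait returns False on timeout and True once stop() is called.
        while not self._stop.wait(self._interval_s):
            # poll(0) serves any queued callbacks without blocking.
            self._producer.poll(0)
```

With the service above, this could be wired in at the end of KafkaProducerService.__init__ with BackgroundPoller(self.producer).start(). Whether this removes the idle-time errors on MSK has not been verified here.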

Sivakajan-Galaxy changed the title from "Producer get timeout and throws error repeatedly." to "[HELP] - Kafka Producer IAM Role Authentication Fails After Idle Period with MSK" on Apr 4, 2025