Skip to content

Issue- Python kafka producer to send the data using the kerberos authentication. #1934

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
narasimha-reddy-kr opened this issue Mar 6, 2025 · 0 comments

Comments

@narasimha-reddy-kr
Copy link

narasimha-reddy-kr commented Mar 6, 2025

Below is the sample program, we have everything working with Java and tested the principal and keytab able to produce the message but through kafka with below pattern noticing the

%7|1741244952.688|CONTROLLERID|rdkafka#producer-1| [thrd:main]: sasl_plaintext://{server:port}/bootstrap: ControllerId update -1 -> 1
Message delivery failed: KafkaError{code=TOPIC_AUTHORIZATION_FAILED,val=29,str="Broker: Topic authorization failed"}

import os
import logging
from confluent_kafka import Producer

# Enable debug logging
logging.basicConfig(level=logging.DEBUG)

os.environ['KRB5_CONFIG'] = './config/krb5.conf'
KAFKA_BROKER = "server:port"
TOPIC = "TEST"
KEYTAB_PATH = "./certs/my.keytab"
PRINCIPAL = "[email protected]"

conf = {
    'bootstrap.servers': KAFKA_BROKER,
    'security.protocol': 'SASL_PLAINTEXT',
    'sasl.mechanisms': 'GSSAPI',
    'sasl.kerberos.service.name': 'kafka',
    'sasl.kerberos.principal': PRINCIPAL,
    'sasl.kerberos.keytab': KEYTAB_PATH,
    'debug': 'security,broker,protocol'
}



def delivery_report(err, msg):
    if err is not None:
        print(f"Message delivery failed: {err}")
    else:
        print(f"Message delivered to {msg.topic()} [{msg.partition()}]")

def processKafka():

    print("started the processTwoTierKafka")
    producer = Producer(conf)
    print("Procducer", producer)
    message = 'Hello, Kafka validation with kerberos authentication!'
    try:
        producer.produce(TOPIC, value=message.encode('utf-8'), callback=delivery_report)
        producer.flush()
    except Exception as e:
        print(f'An error occurred: {e}')`

Here is the trace log.

> 
started the processKafka
%7|1741244944.818|SASL|rdkafka#producer-1| [thrd:app]: Selected provider Cyrus for SASL mechanism GSSAPI
%7|1741244944.819|SASLREFRESH|rdkafka#producer-1| [thrd:main]: Refreshing Kerberos ticket with command: kinit -R -t "./certs/my.keytab" -k [email protected] || kinit -t "./certs/dev.keytab" -k [email protected]
%7|1741244944.819|BRKMAIN|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Enter main broker thread
%7|1741244944.819|BROKER|rdkafka#producer-1| [thrd:app]: sasl_plaintext://{server:port}/bootstrap: Added new broker with NodeId -1
%7|1741244944.819|CONNECT|rdkafka#producer-1| [thrd:app]: sasl_plaintext://{server:port}/bootstrap: Selected for cluster connection: bootstrap servers added (broker has 0 connection attempt(s))
%7|1741244944.819|BRKMAIN|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Enter main broker thread
%7|1741244944.819|INIT|rdkafka#producer-1| [thrd:app]: librdkafka v2.8.0 (0x20800ff) rdkafka#producer-1 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer,http,oidc, STRIP STATIC_LINKING GCC GXX PKGCONFIG OSXLD LIBDL PLUGINS ZLIB SSL SASL_CYRUS ZSTD CURL HDRHISTOGRAM SYSLOG SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER OAUTHBEARER_OIDC CRC32C_HW, debug 0x282)
%7|1741244944.819|CONNECT|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Received CONNECT op
%7|1741244944.820|STATE|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Broker changed state INIT -> TRY_CONNECT
Procducer <cimpl.Producer object at 0x106e7c5c0>
kinit: krb5_get_kdc_cred: KDC can't fulfill requested option
%7|1741244948.880|SASLREFRESH|rdkafka#producer-1| [thrd:main]: First kinit command finished: waking up broker threads
%7|1741244948.880|WAKEUP|rdkafka#producer-1| [thrd:main]: Wake-up sent to 1 broker thread in state >= INIT: Kerberos ticket refresh
%7|1741244948.880|SASLREFRESH|rdkafka#producer-1| [thrd:main]: Kerberos ticket refreshed in 4061ms
%7|1741244948.880|CONNECT|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: broker in state TRY_CONNECT connecting
%7|1741244948.880|CONNECT|rdkafka#producer-1| [thrd:main]: Cluster connection already in progress: refresh unavailable topics
%7|1741244948.880|STATE|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Broker changed state TRY_CONNECT -> CONNECT
%7|1741244948.880|CONNECT|rdkafka#producer-1| [thrd:main]: Not selecting any broker for cluster connection: still suppressed for 49ms: no cluster connection
%7|1741244948.963|CONNECT|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Connecting to ipv4#{ipaddress:port} (sasl_plaintext) with socket 9
%7|1741244948.996|CONNECT|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Connected to ipv4#{ipaddress:port}
%7|1741244948.996|CONNECTED|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Connected (#1)
%7|1741244948.996|FEATURE|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Updated enabled protocol features +ApiVersion to ApiVersion
%7|1741244948.996|STATE|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Broker changed state CONNECT -> APIVERSION_QUERY
%7|1741244948.996|SEND|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Sent ApiVersionRequest (v3, 66 bytes @ 0, CorrId 1)
%7|1741244949.210|RECV|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Received ApiVersionResponse (v3, 599 bytes, CorrId 1, rtt 213.79ms)
%7|1741244949.210|FEATURE|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Updated enabled protocol features to MsgVer1,ApiVersion,BrokerBalancedConsumer,ThrottleTime,Sasl,SaslHandshake,BrokerGroupCoordinator,LZ4,OffsetTime,MsgVer2,IdempotentProducer,ZSTD,SaslAuthReq
%7|1741244949.210|AUTH|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Auth in state APIVERSION_QUERY (handshake supported)
%7|1741244949.210|STATE|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Broker changed state APIVERSION_QUERY -> AUTH_HANDSHAKE
%7|1741244949.211|SEND|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Sent SaslHandshakeRequest (v1, 29 bytes @ 0, CorrId 2)
%7|1741244949.331|RECV|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Received SaslHandshakeResponse (v1, 14 bytes, CorrId 2, rtt 120.38ms)
%7|1741244949.331|SASLMECHS|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Broker supported SASL mechanisms: GSSAPI
%7|1741244949.331|AUTH|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Auth in state AUTH_HANDSHAKE (handshake supported)
%7|1741244949.331|STATE|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Broker changed state AUTH_HANDSHAKE -> AUTH_REQ
%7|1741244949.331|SASL|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Initializing SASL client: service name kafka, hostname server, mechanisms GSSAPI, provider Cyrus
%7|1741244949.884|CONNECT|rdkafka#producer-1| [thrd:main]: Cluster connection already in progress: refresh unavailable topics
%7|1741244949.895|CONNECT|rdkafka#producer-1| [thrd:main]: Not selecting any broker for cluster connection: still suppressed for 39ms: no cluster connection
%7|1741244950.850|SASL|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: My supported SASL mechanisms: SRP GSSAPI GSSAPI DIGEST-MD5 WEBDAV-DIGEST EXTERNAL SMB-NTLMv2 DHX MS-CHAPv2 NTLM CRAM-MD5 APOP PLAIN-CLIENTTOKEN ATOKEN OAUTHBEARER PLAIN LOGIN XOAUTH2 ANONYMOUS
%7|1741244950.897|CONNECT|rdkafka#producer-1| [thrd:main]: Cluster connection already in progress: refresh unavailable topics
%7|1741244950.897|CONNECT|rdkafka#producer-1| [thrd:main]: Not selecting any broker for cluster connection: still suppressed for 49ms: no cluster connection
%7|1741244951.899|CONNECT|rdkafka#producer-1| [thrd:main]: Cluster connection already in progress: refresh unavailable topics
%7|1741244951.899|CONNECT|rdkafka#producer-1| [thrd:main]: Not selecting any broker for cluster connection: still suppressed for 49ms: no cluster connection
%7|1741244952.187|SASL|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Send SASL Kafka frame to broker (3011 bytes)
%7|1741244952.187|SEND|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Sent SaslAuthenticateRequest (v1, 3036 bytes @ 0, CorrId 3)
%7|1741244952.434|RECV|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Received SaslAuthenticateResponse (v1, 124 bytes, CorrId 3, rtt 247.20ms)
%7|1741244952.434|SASL|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Received SASL frame from broker (108 bytes)
%7|1741244952.435|LIBSASL|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: CB_CANON: flags 0x3, "[email protected]" @ "(null)": returning "[email protected]"
%7|1741244952.435|SASL|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Send SASL Kafka frame to broker (0 bytes)
%7|1741244952.435|SEND|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Sent SaslAuthenticateRequest (v1, 25 bytes @ 0, CorrId 4)
%7|1741244952.568|RECV|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Received SaslAuthenticateResponse (v1, 48 bytes, CorrId 4, rtt 132.93ms)
%7|1741244952.568|SASL|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Received SASL frame from broker (32 bytes)
%7|1741244952.569|SASL|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Send SASL Kafka frame to broker (32 bytes)
%7|1741244952.569|SASL|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: GSSAPI authentication complete but awaiting final response from broker
%7|1741244952.569|SEND|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Sent SaslAuthenticateRequest (v1, 57 bytes @ 0, CorrId 5)
%7|1741244952.603|RECV|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Received SaslAuthenticateResponse (v1, 16 bytes, CorrId 5, rtt 34.15ms)
%7|1741244952.603|SASL|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Received SASL frame from broker (0 bytes)
%7|1741244952.603|SASL|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Authenticated as [email protected] using GSSAPI (gssapiv2)
%7|1741244952.603|STATE|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Broker changed state AUTH_REQ -> UP
%7|1741244952.604|SEND|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Sent MetadataRequest (v12, 65 bytes @ 0, CorrId 6)
%7|1741244952.687|RECV|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Received MetadataResponse (v12, 115 bytes, CorrId 6, rtt 83.23ms)
%7|1741244952.688|BROKER|rdkafka#producer-1| [thrd:main]: sasl_plaintext://{server:port}/1: Added new broker with NodeId 1
%7|1741244952.688|BRKMAIN|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/1]: sasl_plaintext://{server:port}/1: Enter main broker thread
%7|1741244952.688|CLUSTERID|rdkafka#producer-1| [thrd:main]: sasl_plaintext://{server:port}/bootstrap: ClusterId update "" -> "1rVMr9jnSkaob2dlVhMzhQ"
%7|1741244952.688|CONTROLLERID|rdkafka#producer-1| [thrd:main]: sasl_plaintext://{server:port}/bootstrap: ControllerId update -1 -> 1
Message delivery failed: KafkaError{code=TOPIC_AUTHORIZATION_FAILED,val=29,str="Broker: Topic authorization failed"}
%7|1741244952.689|DESTROY|rdkafka#producer-1| [thrd:app]: Terminating instance (destroy flags none (0x0))
%7|1741244952.702|DESTROY|rdkafka#producer-1| [thrd:main]: Destroy internal
%7|1741244952.702|DESTROY|rdkafka#producer-1| [thrd:main]: Removing all topics
%7|1741244952.702|DESTROY|rdkafka#producer-1| [thrd:main]: Sending TERMINATE to sasl_plaintext://{server:port}/1
%7|1741244952.702|DESTROY|rdkafka#producer-1| [thrd:main]: Sending TERMINATE to sasl_plaintext://{server:port}/bootstrap
%7|1741244952.702|TERM|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/1]: sasl_plaintext://{server:port}/1: Received TERMINATE op in state INIT: 2 refcnts, 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs
%7|1741244952.702|FAIL|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/1]: sasl_plaintext://{server:port}/1: Client is terminating (after 14ms in state INIT) (_DESTROY)
%7|1741244952.702|STATE|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/1]: sasl_plaintext://{server:port}/1: Broker changed state INIT -> DOWN
%7|1741244952.704|BRKTERM|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/1]: sasl_plaintext://{server:port}/1: terminating: broker still has 2 refcnt(s), 0 buffer(s), 0 partition(s)
%7|1741244952.704|TERMINATE|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/1]: sasl_plaintext://{server:port}/1: Handle is terminating in state DOWN: 1 refcnts (0x7f928380be40), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
%7|1741244952.704|FAIL|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/1]: sasl_plaintext://{server:port}/1: Broker handle is terminating (after 0ms in state DOWN) (_DESTROY)
%7|1741244952.702|TERM|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Received TERMINATE op in state UP: 2 refcnts, 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs
%7|1741244952.703|TERM|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Received TERMINATE op in state INIT: 2 refcnts, 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs
%7|1741244952.705|FAIL|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Client is terminating (after 101ms in state UP) (_DESTROY)
%7|1741244952.705|STATE|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Broker changed state UP -> DOWN
%7|1741244952.705|BRKTERM|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: terminating: broker still has 2 refcnt(s), 0 buffer(s), 0 partition(s)
%7|1741244952.705|TERMINATE|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Handle is terminating in state DOWN: 1 refcnts (0x7f928107a440), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
%7|1741244952.705|FAIL|rdkafka#producer-1| [thrd:sasl_plaintext://{server:port}/bootstrap]: sasl_plaintext://{server:port}/bootstrap: Broker handle is terminating (after 0ms in state DOWN) (_DESTROY)
%7|1741244952.705|FAIL|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Client is terminating (after 7886ms in state INIT) (_DESTROY)
%7|1741244952.706|STATE|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Broker changed state INIT -> DOWN
%7|1741244952.706|BRKTERM|rdkafka#producer-1| [thrd::0/internal]: :0/internal: terminating: broker still has 2 refcnt(s), 0 buffer(s), 0 partition(s)
%7|1741244952.706|TERMINATE|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Handle is terminating in state DOWN: 1 refcnts (0x7f927e87c240), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
%7|1741244952.706|FAIL|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Broker handle is terminating (after 0ms in state DOWN) (_DESTROY)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant