From eebe3597bdab74d9bdbaf35aa6bd6e3d5fe8c731 Mon Sep 17 00:00:00 2001 From: Mrunal98 Date: Tue, 6 May 2025 11:26:59 +0530 Subject: [PATCH 1/5] Client support for lightweight consumer --- .../clients/consumer/ConsumerConfig.java | 5 +- .../consumer/internals/AQKafkaConsumer.java | 257 +++++++++++++++++- 2 files changed, 253 insertions(+), 9 deletions(-) diff --git a/clients/src/main/java/org/oracle/okafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/oracle/okafka/clients/consumer/ConsumerConfig.java index 7845239..d44adf7 100644 --- a/clients/src/main/java/org/oracle/okafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/oracle/okafka/clients/consumer/ConsumerConfig.java @@ -346,7 +346,10 @@ public class ConsumerConfig extends AbstractConfig { private static final String SECURITY_PROVIDERS_DOC = SecurityConfig.SECURITY_PROVIDERS_DOC; private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1); - + + /** oracle.consumer.lightweight */ + public static final String ORACLE_CONSUMER_LIGHTWEIGHT_CONFIG = "oracle.consumer.lightweight"; + static { CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, diff --git a/clients/src/main/java/org/oracle/okafka/clients/consumer/internals/AQKafkaConsumer.java b/clients/src/main/java/org/oracle/okafka/clients/consumer/internals/AQKafkaConsumer.java index 53218f0..dba6e4a 100644 --- a/clients/src/main/java/org/oracle/okafka/clients/consumer/internals/AQKafkaConsumer.java +++ b/clients/src/main/java/org/oracle/okafka/clients/consumer/internals/AQKafkaConsumer.java @@ -33,6 +33,7 @@ import oracle.jdbc.OracleData; import oracle.jdbc.OracleTypes; +import oracle.jdbc.OracleArray; import oracle.jms.AQjmsBytesMessage; import oracle.jms.AQjmsConnection; import oracle.jms.AQjmsConsumer; @@ -87,6 +88,7 @@ import org.oracle.okafka.common.utils.MessageIdConverter; import org.oracle.okafka.common.utils.MessageIdConverter.OKafkaOffset; import org.apache.kafka.common.utils.Time; +import oracle.jdbc.OracleConnection; /** * This class consumes messages from AQ @@ -275,17 +277,26 @@ public ClientResponse commit(ClientRequest request) { log.debug("Commit Nodes. " + nodes.size()); for(Map.Entry> node : nodes.entrySet()) { if(node.getValue().size() > 0) { + String topic = node.getValue().get(0).topic(); TopicConsumers consumers = topicConsumersMap.get(node.getKey()); try { - log.debug("Committing now for node " + node.toString()); - TopicSession jmsSession =consumers.getSession(); - if(jmsSession != null) - { + String ltwtSub = configs.getString(ConsumerConfig.ORACLE_CONSUMER_LIGHTWEIGHT_CONFIG); + + if(!ltwtSub.equals("true")) { log.debug("Committing now for node " + node.toString()); - jmsSession.commit(); - log.debug("Commit done"); - }else { - log.info("No valid session to commit for node " + node); + TopicSession jmsSession =consumers.getSession(); + if(jmsSession != null) + { + log.debug("Committing now for node " + node.toString()); + jmsSession.commit(); + log.debug("Commit done"); + }else { + log.info("No valid session to commit for node " + node); + } + } + else{ + log.debug("Performing lightweight commit for node " + node); + commitOffsetsLightWeightSub(node.getKey(), topic, offsets); } result.put(node.getKey(), null); @@ -307,6 +318,236 @@ public ClientResponse commit(ClientRequest request) { return createCommitResponse(request, nodes, offsets, result, error); } + private void commitOffsetsLightWeightSub(Node node, String topic, Map offsets) { + + final int OFFSET_DIVISOR = 20000; + int size = offsets.size(); + int[] partitions = new int[size]; + int[] priorities = new int[size]; + long[] subshards = new long[size]; + long[] sequences = new long[size]; + + int index = 0; + for (Map.Entry offsetEntry : offsets.entrySet()) { + TopicPartition tp = offsetEntry.getKey(); + OffsetAndMetadata metadata = offsetEntry.getValue(); + partitions[index] = tp.partition() * 2; + priorities[index] = 0; + subshards[index] = metadata.offset() / OFFSET_DIVISOR; + sequences[index] = metadata.offset() % OFFSET_DIVISOR; + index++; + } + + commitSyncAll(node, topic, partitions, priorities, subshards, sequences); + } + + public void createLightWeightSub(String topic, Node node) throws SQLException { + + CallableStatement cStmt = null; + try { + Connection con = ((AQjmsSession)topicConsumersMap.get(node).getSession()).getDBConnection(); + cStmt = con.prepareCall("{call sys.dbms_aqadm.add_ltwt_subscriber(?, sys.aq$_agent(?,null,null))}"); + cStmt.setString(1, ConnectionUtils.enquote(topic)); + cStmt.setString(2, configs.getString(ConsumerConfig.GROUP_ID_CONFIG)); + cStmt.execute(); + log.debug("Lightweight subscriber created for topic: " + topic + ", node: " + node); + } + + catch(Exception ex) { + log.error("Error creating lightweight subscriber for topic: " + topic + ", node: " + node, ex); + throw new SQLException("Failed to create lightweight subscriber", ex); + } + finally { + try { + if(cStmt != null) + cStmt.close(); + } catch(Exception e) { + //do nothing + } + } + } + + public void CommitSync(Node node, String topic, int partition_id, int priority, + long subshard_id, long seq_num ) { + + CallableStatement cStmt = null; + try { + + Connection con = ((AQjmsSession)topicConsumersMap.get(node).getSession()).getDBConnection(); + String user = con.getMetaData().getUserName(); + cStmt = con.prepareCall("{call dbms_teqk.AQ$_COMMITSYNC(?, ?, ?, ?, ?, ?, ?)}"); + cStmt.setString(1, user); + cStmt.setString(2, topic); + cStmt.setString(3, configs.getString(ConsumerConfig.GROUP_ID_CONFIG)); + cStmt.setInt(4, partition_id); + cStmt.setInt(5, priority); + cStmt.setLong(6, subshard_id); + cStmt.setLong(7, seq_num); + cStmt.execute(); + log.debug("CommitSync executed successfully for topic: {}, partition: {}, subshard: {}, seq: {}", + topic, partition_id, subshard_id, seq_num); + } + + catch(Exception ex) { + log.error("Error during CommitSync for node: " + node + ", topic: " + topic, ex); + } + finally { + try { + if(cStmt != null) + cStmt.close(); + } catch(Exception e) { + //do nothing + } + } + + } + + public void commitSyncAll(Node node, String topic, int[] partition_id, int[] priority, + long[] subshard_id, long[] seq_num ) { + + CallableStatement cStmt = null; + try { + + Connection con = ((AQjmsSession)topicConsumersMap.get(node).getSession()).getDBConnection(); + + OracleConnection oracleCon = (OracleConnection) con; + String user = con.getMetaData().getUserName(); + + Array partitionArray = oracleCon.createOracleArray("DBMS_TEQK.INPUT_ARRAY_T", partition_id); + Array priorityArray = oracleCon.createOracleArray("DBMS_TEQK.INPUT_ARRAY_T", priority); + Array subshardArray = oracleCon.createOracleArray("DBMS_TEQK.INPUT_ARRAY_T", subshard_id); + Array sequenceArray = oracleCon.createOracleArray("DBMS_TEQK.INPUT_ARRAY_T", seq_num); + + cStmt = con.prepareCall("{call dbms_teqk.AQ$_COMMITSYNC_ALL(?, ?, ?, ?, ?, ?, ?)}"); + cStmt.setString(1, user); + cStmt.setString(2, topic); + cStmt.setString(3, configs.getString(ConsumerConfig.GROUP_ID_CONFIG)); + cStmt.setArray(4, partitionArray); + cStmt.setArray(5, priorityArray); + cStmt.setArray(6, subshardArray); + cStmt.setArray(7, sequenceArray); + cStmt.execute(); + log.debug("CommitSyncAll executed for topic: {}, partitions: {}", topic, partition_id.length); + } + + catch(Exception ex) { + log.error("Error in commitSyncAll for topic: " + topic + ", node: " + node, ex); + } + finally { + try { + if(cStmt != null) + cStmt.close(); + } catch(Exception e) { + //do nothing + } + } + + } + + public void lightWeightSeek(Node node, String topic, int partition_id, int priority, + long subshard_id, long seq_num ) { + + CallableStatement cStmt = null; + + try { + Connection con = ((AQjmsSession)topicConsumersMap.get(node).getSession()).getDBConnection(); + String user = con.getMetaData().getUserName(); + cStmt = con.prepareCall("{call dbms_teqk.AQ$_SEEK(?, ?, ?, ?, ?, ?, ?)}"); + cStmt.setString(1, user); + cStmt.setString(2, topic); + cStmt.setString(3, configs.getString(ConsumerConfig.GROUP_ID_CONFIG)); + cStmt.setInt(4, partition_id); + cStmt.setInt(5, priority); + cStmt.setLong(6, subshard_id); + cStmt.setLong(7, seq_num); + cStmt.execute(); + log.debug("Light weight seek executed successfully for topic: {}, partition: {}, subshard: {}, seq: {}", + topic, partition_id, subshard_id, seq_num); + } + + catch(Exception ex) { + log.error("Error in lightWeightseek for topic: " + topic + ", node: " + node, ex); + } + finally { + try { + if(cStmt != null) + cStmt.close(); + } catch(Exception e) { + //do nothing + } + } + + } + + public void lightWeightSeektoBeginning(Node node, String topic, int[] partition_id, int[] priority) { + + CallableStatement cStmt = null; + + try { + Connection con = ((AQjmsSession)topicConsumersMap.get(node).getSession()).getDBConnection(); + OracleConnection oracleCon = (OracleConnection) con; + String user = con.getMetaData().getUserName(); + Array partitionArray = oracleCon.createOracleArray("DBMS_TEQK.SEEK_INPUT_ARRAY_T", partition_id); + Array priorityArray = oracleCon.createOracleArray("DBMS_TEQK.SEEK_INPUT_ARRAY_T", priority); + cStmt = con.prepareCall("{call dbms_teqk.AQ$_SEEKTOBEGINNING(?, ?, ?, ?, ?)}"); + cStmt.setString(1, user); + cStmt.setString(2, topic); + cStmt.setString(3, configs.getString(ConsumerConfig.GROUP_ID_CONFIG)); + cStmt.setArray(4, partitionArray); + cStmt.setArray(5, priorityArray); + cStmt.execute(); + log.debug("lightWeightSeektoBeginning executed for topic: {}, partitions: {}", topic, partition_id.length); + } + + catch(Exception ex) { + log.error("Error in lightWeightSeektoBeginning for topic: " + topic + ", node: " + node, ex); + } + finally { + try { + if(cStmt != null) + cStmt.close(); + } catch(Exception e) { + //do nothing + } + } + + } + + public void lightWeightSeektoEnd(Node node, String topic, int[] partition_id, int[] priority) { + + CallableStatement cStmt = null; + + try { + Connection con = ((AQjmsSession)topicConsumersMap.get(node).getSession()).getDBConnection(); + OracleConnection oracleCon = (OracleConnection) con; + String user = con.getMetaData().getUserName(); + Array partitionArray = oracleCon.createOracleArray("DBMS_TEQK.SEEK_INPUT_ARRAY_T", partition_id); + Array priorityArray = oracleCon.createOracleArray("DBMS_TEQK.SEEK_INPUT_ARRAY_T", priority); + + cStmt = con.prepareCall("{call dbms_teqk.AQ$_SEEKTOEND(?, ?, ?, ?, ?)}"); + cStmt.setString(1, user); + cStmt.setString(2, topic); + cStmt.setString(3, configs.getString(ConsumerConfig.GROUP_ID_CONFIG)); + cStmt.setArray(4, partitionArray); + cStmt.setArray(5, priorityArray); + cStmt.execute(); + log.debug("lightWeightSeektoEnd executed for topic: {}, partitions: {}", topic, partition_id.length); + } + + catch(Exception ex) { + log.error("Error in lightWeightSeektoEnd for topic: " + topic + ", node: " + node, ex); + } + finally { + try { + if(cStmt != null) + cStmt.close(); + } catch(Exception e) { + //do nothing + } + } + + } + private ClientResponse createCommitResponse(ClientRequest request, Map> nodes, Map offsets, Map result, boolean error) { return new ClientResponse(request.makeHeader((short)1), request.callback(), request.destination(), From a61899615ca55ac2491deeb49f0c431074f2fc1a Mon Sep 17 00:00:00 2001 From: Mrunal98 Date: Mon, 12 May 2025 14:50:11 +0530 Subject: [PATCH 2/5] addressed the review comments. --- .../okafka/clients/CommonClientConfigs.java | 3 + .../clients/consumer/ConsumerConfig.java | 16 +- .../consumer/internals/AQKafkaConsumer.java | 229 ++++++++---------- 3 files changed, 115 insertions(+), 133 deletions(-) diff --git a/clients/src/main/java/org/oracle/okafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/oracle/okafka/clients/CommonClientConfigs.java index 8bf80b9..ade23ca 100644 --- a/clients/src/main/java/org/oracle/okafka/clients/CommonClientConfigs.java +++ b/clients/src/main/java/org/oracle/okafka/clients/CommonClientConfigs.java @@ -58,6 +58,9 @@ public class CommonClientConfigs extends org.apache.kafka.clients.CommonClientCo public static final String ORACLE_TRANSACTIONAL_PRODUCER ="oracle.transactional.producer"; + public static final String ORACLE_CONSUMER_LIGHTWEIGHT = "oracle.consumer.lightweight"; + public static final String ORACLE_CONSUMER_LIGHTWEIGHT_DOC = "Creates a light weight subscriber"; + /* public static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms"; public static final String METADATA_MAX_AGE_DOC = "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions."; diff --git a/clients/src/main/java/org/oracle/okafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/oracle/okafka/clients/consumer/ConsumerConfig.java index d44adf7..bc8a880 100644 --- a/clients/src/main/java/org/oracle/okafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/oracle/okafka/clients/consumer/ConsumerConfig.java @@ -30,7 +30,6 @@ package org.oracle.okafka.clients.consumer; import org.apache.kafka.clients.ClientDnsLookup; -import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; @@ -42,6 +41,7 @@ import org.apache.kafka.common.requests.JoinGroupRequest; import org.apache.kafka.common.serialization.Deserializer; import org.oracle.okafka.common.config.SslConfigs; +import org.oracle.okafka.clients.CommonClientConfigs; import java.util.Collections; import java.util.HashMap; @@ -338,7 +338,7 @@ public class ConsumerConfig extends AbstractConfig { " broker allows for it using `auto.create.topics.enable` broker configuration. This configuration must" + " be set to `false` when using brokers older than 0.11.0"; public static final boolean DEFAULT_ALLOW_AUTO_CREATE_TOPICS = true; - + /** * security.providers */ @@ -348,7 +348,9 @@ public class ConsumerConfig extends AbstractConfig { private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1); /** oracle.consumer.lightweight */ - public static final String ORACLE_CONSUMER_LIGHTWEIGHT_CONFIG = "oracle.consumer.lightweight"; + public static final String ORACLE_CONSUMER_LIGHTWEIGHT = "oracle.consumer.lightweight"; + public static final String ORACLE_CONSUMER_LIGHTWEIGHT_DOC = "Creates a light weight subscriber"; + static { CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, @@ -609,7 +611,13 @@ public class ConsumerConfig extends AbstractConfig { .define(org.oracle.okafka.clients.CommonClientConfigs.ORACLE_NET_TNS_ADMIN, ConfigDef.Type.STRING, Importance.MEDIUM, - org.oracle.okafka.clients.CommonClientConfigs.ORACLE_NET_TNS_ADMIN_DOC); + org.oracle.okafka.clients.CommonClientConfigs.ORACLE_NET_TNS_ADMIN_DOC) + .define(CommonClientConfigs.ORACLE_CONSUMER_LIGHTWEIGHT, + ConfigDef.Type.BOOLEAN, + false, + Importance.LOW, + CommonClientConfigs.ORACLE_CONSUMER_LIGHTWEIGHT_DOC) + ; } diff --git a/clients/src/main/java/org/oracle/okafka/clients/consumer/internals/AQKafkaConsumer.java b/clients/src/main/java/org/oracle/okafka/clients/consumer/internals/AQKafkaConsumer.java index dba6e4a..853d956 100644 --- a/clients/src/main/java/org/oracle/okafka/clients/consumer/internals/AQKafkaConsumer.java +++ b/clients/src/main/java/org/oracle/okafka/clients/consumer/internals/AQKafkaConsumer.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import javax.jms.JMSException; import javax.jms.Message; @@ -106,6 +107,15 @@ public final class AQKafkaConsumer extends AQClient{ private boolean skipConnectMe = false; private boolean externalConn = false; + + private static final String LTWT_COMMIT_SYNC = "{call dbms_teqk.AQ$_COMMITSYNC(?, ?, ?, ?, ?, ?, ?)}"; + private static final String LTWT_COMMIT_SYNC_ALL = "{call dbms_teqk.AQ$_COMMITSYNC_ALL(?, ?, ?, ?, ?, ?, ?)}"; + private static final String LTWT_SEEK = "{call dbms_teqk.AQ$_SEEK(?, ?, ?, ?, ?, ?, ?)}"; + private static final String LTWT_SEEK_TO_BEGINNING = "{call dbms_teqk.AQ$_SEEKTOBEGINNING(?, ?, ?, ?, ?)}"; + private static final String LTWT_SEEK_TO_END = "{call dbms_teqk.AQ$_SEEKTOEND(?, ?, ?, ?, ?)}"; + private static final String LTWT_SUB = "{call sys.dbms_aqadm.add_ltwt_subscriber(?, sys.aq$_agent(?,null,null))}"; + + private final Map> callableCacheMap = new ConcurrentHashMap<>(); public AQKafkaConsumer(LogContext logContext, ConsumerConfig configs, Time time, Metadata metadata,Metrics metrics) @@ -125,6 +135,36 @@ public void setAssignors(List _assignores ) { assignors = _assignores; } + + private CallableStatement getOrCreateCallable(Node node, String key, String sql) { + Map nodeMap = callableCacheMap.computeIfAbsent(node, n -> new ConcurrentHashMap<>()); + return nodeMap.computeIfAbsent(key, k -> { + try { + Connection con = getConnection(node); + return con.prepareCall(sql); + } catch (SQLException | JMSException e) { + throw new RuntimeException("Failed to prepare statement for " + key, e); + } + }); + } + + public void closeCallableStmt(Node node) { + Map stmts = callableCacheMap.remove(node); + if (stmts != null) { + for (CallableStatement stmt : stmts.values()) { + try { stmt.close(); } catch (Exception e) {} + } + } + } + + private String getCurrentUser(Node node) throws SQLException, JMSException { + Connection con = ((AQjmsSession) topicConsumersMap.get(node).getSession()).getDBConnection(); + return con.getMetaData().getUserName(); + } + + private Connection getConnection(Node node) throws SQLException, JMSException { + return ((AQjmsSession) topicConsumersMap.get(node).getSession()).getDBConnection(); + } public ClientResponse send(ClientRequest request) { this.selectorMetrics.requestCompletedSend(request.destination()); @@ -134,7 +174,7 @@ public ClientResponse send(ClientRequest request) { } return cr; } - + /** * Determines the type of request and calls appropriate method for handling request * @param request request to be sent @@ -280,9 +320,9 @@ public ClientResponse commit(ClientRequest request) { String topic = node.getValue().get(0).topic(); TopicConsumers consumers = topicConsumersMap.get(node.getKey()); try { - String ltwtSub = configs.getString(ConsumerConfig.ORACLE_CONSUMER_LIGHTWEIGHT_CONFIG); + Boolean ltwtSub = configs.getBoolean(ConsumerConfig.ORACLE_CONSUMER_LIGHTWEIGHT); - if(!ltwtSub.equals("true")) { + if(!ltwtSub.equals(true)) { log.debug("Committing now for node " + node.toString()); TopicSession jmsSession =consumers.getSession(); if(jmsSession != null) @@ -341,41 +381,12 @@ private void commitOffsetsLightWeightSub(Node node, String topic, Map> nodes, Map offsets, Map result, boolean error) { return new ClientResponse(request.makeHeader((short)1), request.callback(), request.destination(), @@ -1690,7 +1634,13 @@ public ClientResponse subscribe(ClientRequest request) { topicConsumersMap.put(node, new TopicConsumers(node)); } TopicConsumers consumers = topicConsumersMap.get(node); - consumers.getTopicSubscriber(topic); + + if(consumers.getlightWeightSub()) { + consumers.createLightWeightSub(topic, node); + } + else { + consumers.getTopicSubscriber(topic); + } metadata.setDBVersion(consumers.getDBVersion()); } catch(JMSException exception) { log.error("Exception during Subscribe request " + exception, exception); @@ -1804,10 +1754,12 @@ private final class TopicConsumers { private Map topicSubscribers = null; private final Node node; private String dbVersion; + private Boolean lightWeightSub; public TopicConsumers(Node node) throws JMSException { this(node, TopicSession.AUTO_ACKNOWLEDGE); } public TopicConsumers(Node node,int mode) throws JMSException { + this.node = node; conn = createTopicConnection(node); @@ -1827,9 +1779,11 @@ public TopicConsumers(Node node,int mode) throws JMSException { try { this.dbVersion = ConnectionUtils.getDBVersion(oConn); + this.lightWeightSub = configs.getBoolean(ConsumerConfig.ORACLE_CONSUMER_LIGHTWEIGHT); + }catch(Exception e) { - log.error("Exception whle fetching DB Version " + e); + log.error("Exception whle fetching DB Version and lightweight consumer config" + e); } }catch(Exception e) @@ -1893,6 +1847,19 @@ private TopicSubscriber createTopicSubscriber(String topic) throws JMSException topicSubscribers.put(topic, subscriber); return subscriber; } + + private void createLightWeightSub(String topic, Node node) { + try { + CallableStatement cStmt = getOrCreateCallable(node, "CREATE_LTWT_SUB", LTWT_SUB); + cStmt.setString(1, ConnectionUtils.enquote(topic)); + cStmt.setString(2, configs.getString(ConsumerConfig.GROUP_ID_CONFIG)); + cStmt.execute(); + log.debug("Lightweight subscriber created for topic: " + topic + ", node: " + node); + } + catch(Exception ex) { + log.error("Error creating lightweight subscriber for topic: " + topic + ", node: " + node, ex); + } + } private void refresh(Node node) throws JMSException { conn = createTopicConnection(node); @@ -1934,6 +1901,10 @@ public String getDBVersion() { return dbVersion; } + + public boolean getlightWeightSub() { + return lightWeightSub; + } } From 082b8becbf257322bc40ba435c1f5a231b3755ea Mon Sep 17 00:00:00 2001 From: Mrunal98 Date: Fri, 23 May 2025 16:00:39 +0530 Subject: [PATCH 3/5] modified seek() for lightweight subscriber. --- .../consumer/internals/AQKafkaConsumer.java | 119 ++++++++++++++---- 1 file changed, 93 insertions(+), 26 deletions(-) diff --git a/clients/src/main/java/org/oracle/okafka/clients/consumer/internals/AQKafkaConsumer.java b/clients/src/main/java/org/oracle/okafka/clients/consumer/internals/AQKafkaConsumer.java index 853d956..bc1b897 100644 --- a/clients/src/main/java/org/oracle/okafka/clients/consumer/internals/AQKafkaConsumer.java +++ b/clients/src/main/java/org/oracle/okafka/clients/consumer/internals/AQKafkaConsumer.java @@ -17,6 +17,7 @@ import java.sql.Statement; import java.sql.Types; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -136,16 +137,20 @@ public void setAssignors(List _assignores ) assignors = _assignores; } - private CallableStatement getOrCreateCallable(Node node, String key, String sql) { - Map nodeMap = callableCacheMap.computeIfAbsent(node, n -> new ConcurrentHashMap<>()); - return nodeMap.computeIfAbsent(key, k -> { - try { - Connection con = getConnection(node); - return con.prepareCall(sql); - } catch (SQLException | JMSException e) { - throw new RuntimeException("Failed to prepare statement for " + key, e); - } - }); + private CallableStatement getOrCreateCallable(Node node, String key, String sql) { + Map nodeMap = callableCacheMap.computeIfAbsent(node, n -> new ConcurrentHashMap<>()); + + CallableStatement stmt = nodeMap.get(key); + try { + if (stmt == null || stmt.isClosed()) { + Connection con = getConnection(node); + stmt = con.prepareCall(sql); + nodeMap.put(key, stmt); + } + return stmt; + } catch (SQLException | JMSException e) { + throw new RuntimeException("Failed to prepare statement for " + key, e); + } } public void closeCallableStmt(Node node) { @@ -158,7 +163,7 @@ public void closeCallableStmt(Node node) { } private String getCurrentUser(Node node) throws SQLException, JMSException { - Connection con = ((AQjmsSession) topicConsumersMap.get(node).getSession()).getDBConnection(); + Connection con = getConnection(node); return con.getMetaData().getUserName(); } @@ -429,17 +434,16 @@ public void commitSyncAll(Node node, String topic, int[] partition_id, int[] pri } } - public void lightWeightSeek(Node node, String topic, int partition_id, int priority, + public void lightWeightSeek(Node node, String topic, long partition_id, long priority, long subshard_id, long seq_num ) { - try { String user = getCurrentUser(node); CallableStatement cStmt = getOrCreateCallable(node, "SEEK", LTWT_SEEK); cStmt.setString(1, user); cStmt.setString(2, topic); cStmt.setString(3, configs.getString(ConsumerConfig.GROUP_ID_CONFIG)); - cStmt.setInt(4, partition_id); - cStmt.setInt(5, priority); + cStmt.setLong(4, partition_id); + cStmt.setLong(5, priority); cStmt.setLong(6, subshard_id); cStmt.setLong(7, seq_num); cStmt.execute(); @@ -450,8 +454,7 @@ public void lightWeightSeek(Node node, String topic, int partition_id, int prior } } - public void lightWeightSeektoBeginning(Node node, String topic, int[] partition_id, int[] priority) { - + public void lightWeightSeektoBeginning(Node node, String topic, Long[] partition_id, Long[] priority) { try { OracleConnection oracleCon = (OracleConnection) getConnection(node); String user = getCurrentUser(node); @@ -463,6 +466,10 @@ public void lightWeightSeektoBeginning(Node node, String topic, int[] partition_ cStmt.setString(3, configs.getString(ConsumerConfig.GROUP_ID_CONFIG)); cStmt.setArray(4, partitionArray); cStmt.setArray(5, priorityArray); + log.debug("lightWeightSeektoBeginning: User: {}, Topic: {}, GroupId: {}, Partition IDs: {}, Priority: {}", + user, topic, configs.getString(ConsumerConfig.GROUP_ID_CONFIG), + Arrays.toString(partition_id), Arrays.toString(priority)); + cStmt.execute(); log.debug("lightWeightSeektoBeginning executed for topic: {}, partitions: {}", topic, partition_id.length); } catch(Exception ex) { @@ -470,28 +477,81 @@ public void lightWeightSeektoBeginning(Node node, String topic, int[] partition_ } } - public void lightWeightSeektoEnd(Node node, String topic, int[] partition_id, int[] priority) { + public void lightWeightSeektoEnd(Node node, String topic, Long[] partition_id, Long[] priority) { try { OracleConnection oracleCon = (OracleConnection) getConnection(node); String user = getCurrentUser(node); Array partitionArray = oracleCon.createOracleArray("DBMS_TEQK.SEEK_INPUT_ARRAY_T", partition_id); Array priorityArray = oracleCon.createOracleArray("DBMS_TEQK.SEEK_INPUT_ARRAY_T", priority); - CallableStatement cStmt = getOrCreateCallable(node, "SEEK_TO_END", LTWT_SEEK_TO_END); cStmt.setString(1, user); cStmt.setString(2, topic); cStmt.setString(3, configs.getString(ConsumerConfig.GROUP_ID_CONFIG)); cStmt.setArray(4, partitionArray); cStmt.setArray(5, priorityArray); + log.debug("lightWeightSeektoEnd: User: {}, Topic: {}, GroupId: {}, Partition IDs: {}, Priority: {}", + user, topic, configs.getString(ConsumerConfig.GROUP_ID_CONFIG), + Arrays.toString(partition_id), Arrays.toString(priority)); + cStmt.execute(); log.debug("lightWeightSeektoEnd executed for topic: {}, partitions: {}", topic, partition_id.length); - } catch(Exception ex) { + } catch (Exception ex) { log.error("Error in lightWeightSeektoEnd for topic: " + topic + ", node: " + node, ex); } } + private void lightweightSubscriberSeek(Node node, String topic, Map offsets, Map responses) { + + List seekbeginPartitions = new ArrayList<>(); + List seekEndPartitions = new ArrayList<>(); + List seekbeginPriorities = new ArrayList<>(); + List seekEndPriorities = new ArrayList<>(); + + for (Map.Entry entry : offsets.entrySet()) { + TopicPartition tp = entry.getKey(); + long offset = entry.getValue(); + long partition = tp.partition(); + long priority = 0; + + try { + if (offset == -2L) { // Seek to beginning + seekbeginPartitions.add(2L * partition); + seekbeginPriorities.add((long) priority); + responses.put(tp, null); + continue; + } + else if (offset == -1L) { // Seek to end + seekEndPartitions.add(2L * partition); + seekEndPriorities.add((long) priority); + responses.put(tp, null); + continue; + } + else { + long subshard = offset / 20000; + long sequence = offset % 20000; + lightWeightSeek(node, topic, partition, priority, subshard, sequence); + responses.put(tp, null); + } + } catch (Exception ex) { + responses.put(tp, ex); + } + } + + if (!seekbeginPartitions.isEmpty()) { + lightWeightSeektoBeginning(node, topic, + seekbeginPartitions.toArray(new Long[0]), + seekbeginPriorities.toArray(new Long[0])); + } + + if (!seekEndPartitions.isEmpty()) { + lightWeightSeektoEnd(node, topic, + seekEndPartitions.toArray(new Long[0]), + seekEndPriorities.toArray(new Long[0])); + } + } + private ClientResponse createCommitResponse(ClientRequest request, Map> nodes, Map offsets, Map result, boolean error) { return new ClientResponse(request.makeHeader((short)1), request.callback(), request.destination(), @@ -541,7 +601,6 @@ public SeekInput() { } } - private static void validateMsgId(String msgId) throws IllegalArgumentException { if(msgId == null || msgId.length() !=32) @@ -574,7 +633,7 @@ public ClientResponse seek(ClientRequest request) { OffsetResetRequest offsetResetRequest = builder.build(); Node node = metadata.getNodeById(Integer.parseInt(request.destination())); log.debug("Destination Node: " + node); - + Map offsetResetTimestamps = offsetResetRequest.offsetResetTimestamps(); Map> offsetResetTimeStampByTopic = new HashMap>() ; for(Map.Entry offsetResetTimestamp : offsetResetTimestamps.entrySet()) { @@ -586,22 +645,29 @@ public ClientResponse seek(ClientRequest request) { } TopicConsumers consumers = topicConsumersMap.get(node); Connection con = ((AQjmsSession)consumers.getSession()).getDBConnection(); - + SeekInput[] seekInputs = null; String[] inArgs = new String[5]; int indx =0; for(Map.Entry> offsetResetTimestampOfTopic : offsetResetTimeStampByTopic.entrySet()) { String topic = offsetResetTimestampOfTopic.getKey(); inArgs[0] = "Topic: " + topic + " "; + Map partitionOffsets = offsetResetTimestampOfTopic.getValue(); + + if(consumers.lightWeightSub) { + lightweightSubscriberSeek(node, topic, partitionOffsets, responses); + continue; + } + try { if(msgIdFormat.equals("00") ) { msgIdFormat = getMsgIdFormat(con, topic); } - int inputSize = offsetResetTimestampOfTopic.getValue().entrySet().size(); + int inputSize = partitionOffsets.entrySet().size(); seekInputs = new SeekInput[inputSize]; - for(Map.Entry offsets : offsetResetTimestampOfTopic.getValue().entrySet()) { + for(Map.Entry offsets : partitionOffsets.entrySet()) { seekInputs[indx] = new SeekInput(); try { TopicPartition tp = offsets.getKey(); @@ -704,7 +770,8 @@ else if( offsets.getValue() == -1L) { request.createdTimeMs(), time.milliseconds(), false, null,null, new OffsetResetResponse(responses, null)); } - private ClientResponse unsubscribe(ClientRequest request) { + + private ClientResponse unsubscribe(ClientRequest request) { HashMap response = new HashMap<>(); for(Map.Entry topicConsumersByNode: topicConsumersMap.entrySet()) { From fd472348bd9f6409d5639f859356eede5c582ba4 Mon Sep 17 00:00:00 2001 From: Mrunal98 Date: Wed, 18 Jun 2025 14:20:22 +0530 Subject: [PATCH 4/5] addressed review comments. --- .../consumer/internals/AQKafkaConsumer.java | 76 ++++++++++++------- 1 file changed, 49 insertions(+), 27 deletions(-) diff --git a/clients/src/main/java/org/oracle/okafka/clients/consumer/internals/AQKafkaConsumer.java b/clients/src/main/java/org/oracle/okafka/clients/consumer/internals/AQKafkaConsumer.java index bc1b897..dbb8652 100644 --- a/clients/src/main/java/org/oracle/okafka/clients/consumer/internals/AQKafkaConsumer.java +++ b/clients/src/main/java/org/oracle/okafka/clients/consumer/internals/AQKafkaConsumer.java @@ -364,9 +364,7 @@ public ClientResponse commit(ClientRequest request) { } private void commitOffsetsLightWeightSub(Node node, String topic, Map offsets) { - - final int OFFSET_DIVISOR = 20000; - int size = offsets.size(); + int size = offsets.size(); int[] partitions = new int[size]; int[] priorities = new int[size]; long[] subshards = new long[size]; @@ -378,8 +376,8 @@ private void commitOffsetsLightWeightSub(Node node, String topic, Map offsets, Map responses) { - List seekbeginPartitions = new ArrayList<>(); List seekEndPartitions = new ArrayList<>(); List seekbeginPriorities = new ArrayList<>(); List seekEndPriorities = new ArrayList<>(); + List seekBeginoffs = new ArrayList<>(); + List seekEndoffs = new ArrayList<>(); + for (Map.Entry entry : offsets.entrySet()) { TopicPartition tp = entry.getKey(); long offset = entry.getValue(); @@ -519,40 +522,57 @@ private void lightweightSubscriberSeek(Node node, String topic, Map> nodes, + + private ClientResponse createCommitResponse(ClientRequest request, Map> nodes, Map offsets, Map result, boolean error) { return new ClientResponse(request.makeHeader((short)1), request.callback(), request.destination(), request.createdTimeMs(), time.milliseconds(), false, null,null, @@ -1640,6 +1660,7 @@ private void close(Node node, TopicConsumers consumers) { try { if (consumers.getConnection() != null) { ((AQjmsConnection)consumers.getConnection()).close(); + closeCallableStmt(node); } this.selectorMetrics.connectionClosed.record(); } catch(JMSException jms) { @@ -1701,14 +1722,15 @@ public ClientResponse subscribe(ClientRequest request) { topicConsumersMap.put(node, new TopicConsumers(node)); } TopicConsumers consumers = topicConsumersMap.get(node); - - if(consumers.getlightWeightSub()) { + metadata.setDBVersion(consumers.getDBVersion()); + + if(consumers.getlightWeightSub() && metadata.getDBMajorVersion() > 26) { consumers.createLightWeightSub(topic, node); } else { consumers.getTopicSubscriber(topic); } - metadata.setDBVersion(consumers.getDBVersion()); + } catch(JMSException exception) { log.error("Exception during Subscribe request " + exception, exception); log.info("Exception during Subscribe request. " + exception); From 243b13ac299f1f794aeb9c2475cd9e8a5d87d2db Mon Sep 17 00:00:00 2001 From: Mrunal98 Date: Mon, 23 Jun 2025 19:46:21 +0530 Subject: [PATCH 5/5] only allow light weight sub if db version is 26 or greater. --- .../clients/consumer/internals/AQKafkaConsumer.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/oracle/okafka/clients/consumer/internals/AQKafkaConsumer.java b/clients/src/main/java/org/oracle/okafka/clients/consumer/internals/AQKafkaConsumer.java index dbb8652..ca2c13d 100644 --- a/clients/src/main/java/org/oracle/okafka/clients/consumer/internals/AQKafkaConsumer.java +++ b/clients/src/main/java/org/oracle/okafka/clients/consumer/internals/AQKafkaConsumer.java @@ -1723,14 +1723,14 @@ public ClientResponse subscribe(ClientRequest request) { } TopicConsumers consumers = topicConsumersMap.get(node); metadata.setDBVersion(consumers.getDBVersion()); - - if(consumers.getlightWeightSub() && metadata.getDBMajorVersion() > 26) { + + if(consumers.getlightWeightSub() && metadata.getDBMajorVersion() >= 26) { consumers.createLightWeightSub(topic, node); } else { consumers.getTopicSubscriber(topic); } - + } catch(JMSException exception) { log.error("Exception during Subscribe request " + exception, exception); log.info("Exception during Subscribe request. " + exception); @@ -1740,7 +1740,7 @@ public ClientResponse subscribe(ClientRequest request) { } return createSubscribeResponse(request, topic, null, false); } - + private ClientResponse createSubscribeResponse(ClientRequest request, String topic, JMSException exception, boolean disconnected) { return new ClientResponse(request.makeHeader((short)1), request.callback(), request.destination(), request.createdTimeMs(), time.milliseconds(), disconnected, null,null,