diff --git a/clients/src/main/java/org/oracle/okafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/oracle/okafka/clients/CommonClientConfigs.java
index 8bf80b9..ade23ca 100644
--- a/clients/src/main/java/org/oracle/okafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/oracle/okafka/clients/CommonClientConfigs.java
@@ -58,6 +58,9 @@ public class CommonClientConfigs extends org.apache.kafka.clients.CommonClientCo
public static final String ORACLE_TRANSACTIONAL_PRODUCER ="oracle.transactional.producer";
+ public static final String ORACLE_CONSUMER_LIGHTWEIGHT = "oracle.consumer.lightweight";
+ public static final String ORACLE_CONSUMER_LIGHTWEIGHT_DOC = "Creates a light weight subscriber";
+
/*
public static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms";
public static final String METADATA_MAX_AGE_DOC = "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.";
diff --git a/clients/src/main/java/org/oracle/okafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/oracle/okafka/clients/consumer/ConsumerConfig.java
index 7845239..bc8a880 100644
--- a/clients/src/main/java/org/oracle/okafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/oracle/okafka/clients/consumer/ConsumerConfig.java
@@ -30,7 +30,6 @@
package org.oracle.okafka.clients.consumer;
import org.apache.kafka.clients.ClientDnsLookup;
-import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
@@ -42,6 +41,7 @@
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.serialization.Deserializer;
import org.oracle.okafka.common.config.SslConfigs;
+import org.oracle.okafka.clients.CommonClientConfigs;
import java.util.Collections;
import java.util.HashMap;
@@ -338,7 +338,7 @@ public class ConsumerConfig extends AbstractConfig {
" broker allows for it using `auto.create.topics.enable` broker configuration. This configuration must" +
" be set to `false` when using brokers older than 0.11.0";
public static final boolean DEFAULT_ALLOW_AUTO_CREATE_TOPICS = true;
-
+
/**
* security.providers
*/
@@ -346,7 +346,12 @@ public class ConsumerConfig extends AbstractConfig {
private static final String SECURITY_PROVIDERS_DOC = SecurityConfig.SECURITY_PROVIDERS_DOC;
private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
-
+
+ /** oracle.consumer.lightweight
*/
+ public static final String ORACLE_CONSUMER_LIGHTWEIGHT = "oracle.consumer.lightweight";
+ public static final String ORACLE_CONSUMER_LIGHTWEIGHT_DOC = "Creates a light weight subscriber";
+
+
static {
CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
Type.LIST,
@@ -606,7 +611,13 @@ public class ConsumerConfig extends AbstractConfig {
.define(org.oracle.okafka.clients.CommonClientConfigs.ORACLE_NET_TNS_ADMIN,
ConfigDef.Type.STRING,
Importance.MEDIUM,
- org.oracle.okafka.clients.CommonClientConfigs.ORACLE_NET_TNS_ADMIN_DOC);
+ org.oracle.okafka.clients.CommonClientConfigs.ORACLE_NET_TNS_ADMIN_DOC)
+ .define(CommonClientConfigs.ORACLE_CONSUMER_LIGHTWEIGHT,
+ ConfigDef.Type.BOOLEAN,
+ false,
+ Importance.LOW,
+ CommonClientConfigs.ORACLE_CONSUMER_LIGHTWEIGHT_DOC)
+ ;
}
diff --git a/clients/src/main/java/org/oracle/okafka/clients/consumer/internals/AQKafkaConsumer.java b/clients/src/main/java/org/oracle/okafka/clients/consumer/internals/AQKafkaConsumer.java
index a9aced7..1a1801e 100644
--- a/clients/src/main/java/org/oracle/okafka/clients/consumer/internals/AQKafkaConsumer.java
+++ b/clients/src/main/java/org/oracle/okafka/clients/consumer/internals/AQKafkaConsumer.java
@@ -17,11 +17,13 @@
import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -33,6 +35,7 @@
import oracle.jdbc.OracleData;
import oracle.jdbc.OracleTypes;
+import oracle.jdbc.OracleArray;
import oracle.jms.AQjmsBytesMessage;
import oracle.jms.AQjmsConnection;
import oracle.jms.AQjmsConsumer;
@@ -88,6 +91,7 @@
import org.oracle.okafka.common.utils.MessageIdConverter;
import org.oracle.okafka.common.utils.MessageIdConverter.OKafkaOffset;
import org.apache.kafka.common.utils.Time;
+import oracle.jdbc.OracleConnection;
/**
* This class consumes messages from AQ
@@ -105,6 +109,15 @@ public final class AQKafkaConsumer extends AQClient{
private boolean skipConnectMe = false;
private boolean externalConn = false;
+
+ private static final String LTWT_COMMIT_SYNC = "{call dbms_teqk.AQ$_COMMITSYNC(?, ?, ?, ?, ?, ?, ?)}";
+ private static final String LTWT_COMMIT_SYNC_ALL = "{call dbms_teqk.AQ$_COMMITSYNC_ALL(?, ?, ?, ?, ?, ?, ?)}";
+ private static final String LTWT_SEEK = "{call dbms_teqk.AQ$_SEEK(?, ?, ?, ?, ?, ?, ?)}";
+ private static final String LTWT_SEEK_TO_BEGINNING = "{call dbms_teqk.AQ$_SEEKTOBEGINNING(?, ?, ?, ?, ?)}";
+ private static final String LTWT_SEEK_TO_END = "{call dbms_teqk.AQ$_SEEKTOEND(?, ?, ?, ?, ?)}";
+ private static final String LTWT_SUB = "{call sys.dbms_aqadm.add_ltwt_subscriber(?, sys.aq$_agent(?,null,null))}";
+
+ private final Map> callableCacheMap = new ConcurrentHashMap<>();
public AQKafkaConsumer(LogContext logContext, ConsumerConfig configs, Time time, Metadata metadata,Metrics metrics)
@@ -124,6 +137,40 @@ public void setAssignors(List _assignores )
{
assignors = _assignores;
}
+
+ private CallableStatement getOrCreateCallable(Node node, String key, String sql) {
+ Map nodeMap = callableCacheMap.computeIfAbsent(node, n -> new ConcurrentHashMap<>());
+
+ CallableStatement stmt = nodeMap.get(key);
+ try {
+ if (stmt == null || stmt.isClosed()) {
+ Connection con = getConnection(node);
+ stmt = con.prepareCall(sql);
+ nodeMap.put(key, stmt);
+ }
+ return stmt;
+ } catch (SQLException | JMSException e) {
+ throw new RuntimeException("Failed to prepare statement for " + key, e);
+ }
+ }
+
+ public void closeCallableStmt(Node node) {
+ Map stmts = callableCacheMap.remove(node);
+ if (stmts != null) {
+ for (CallableStatement stmt : stmts.values()) {
+ try { stmt.close(); } catch (Exception e) {}
+ }
+ }
+ }
+
+ private String getCurrentUser(Node node) throws SQLException, JMSException {
+ Connection con = getConnection(node);
+ return con.getMetaData().getUserName();
+ }
+
+ private Connection getConnection(Node node) throws SQLException, JMSException {
+ return ((AQjmsSession) topicConsumersMap.get(node).getSession()).getDBConnection();
+ }
public ClientResponse send(ClientRequest request) {
this.selectorMetrics.requestCompletedSend(request.destination());
@@ -133,7 +180,7 @@ public ClientResponse send(ClientRequest request) {
}
return cr;
}
-
+
/**
* Determines the type of request and calls appropriate method for handling request
* @param request request to be sent
@@ -276,32 +323,37 @@ public ClientResponse commit(ClientRequest request) {
log.debug("Commit Nodes. " + nodes.size());
for(Map.Entry> node : nodes.entrySet()) {
if(node.getValue().size() > 0) {
+ String topic = node.getValue().get(0).topic();
TopicConsumers consumers = topicConsumersMap.get(node.getKey());
TopicSession jmsSession = null;
try {
- log.debug("Committing now for node " + node.toString());
- jmsSession =consumers.getSession();
- if(jmsSession != null)
- {
- log.debug("Committing now for node " + node.toString());
- jmsSession.commit();
- log.debug("Commit done");
- }else {
- log.info("No valid session to commit for node " + node);
+ log.debug("Committing now for node " + node.toString());
+ Boolean ltwtSub = configs.getBoolean(ConsumerConfig.ORACLE_CONSUMER_LIGHTWEIGHT);
+ if(!ltwtSub.equals(true)) {
+ TopicSession jmsSession =consumers.getSession();
+ if(jmsSession != null)
+ {
+ log.debug("Committing now for node " + node.toString());
+ jmsSession.commit();
+ log.debug("Commit done");
+ }else {
+ log.info("No valid session to commit for node " + node);
+ }
+ }
+ else{
+ log.debug("Performing lightweight commit for node " + node);
+ commitOffsetsLightWeightSub(node.getKey(), topic, offsets);
}
result.put(node.getKey(), null);
- } catch(JMSException exception) {
+ } catch(Exception exception) {
+ log.error("Exception from commit " + exception, exception);
error = true;
if(ConnectionUtils.isSessionClosed((AQjmsSession)jmsSession))
result.put(node.getKey(), new DisconnectException(exception.getMessage(),exception));
else
result.put(node.getKey(), exception);
- }
- catch(Exception e)
- {
- log.error("Exception from commit " + e, e);
- }
+ }
}
else {
log.info("Not Committing on Node " + node);
@@ -311,7 +363,216 @@ public ClientResponse commit(ClientRequest request) {
return createCommitResponse(request, nodes, offsets, result, error);
}
- private ClientResponse createCommitResponse(ClientRequest request, Map> nodes,
+ private void commitOffsetsLightWeightSub(Node node, String topic, Map offsets) {
+ int size = offsets.size();
+ int[] partitions = new int[size];
+ int[] priorities = new int[size];
+ long[] subshards = new long[size];
+ long[] sequences = new long[size];
+
+ int index = 0;
+ for (Map.Entry offsetEntry : offsets.entrySet()) {
+ TopicPartition tp = offsetEntry.getKey();
+ OffsetAndMetadata metadata = offsetEntry.getValue();
+ partitions[index] = tp.partition() * 2;
+ priorities[index] = 0;
+ subshards[index] = metadata.offset() / MessageIdConverter.DEFAULT_SUBPARTITION_SIZE;
+ sequences[index] = metadata.offset() % MessageIdConverter.DEFAULT_SUBPARTITION_SIZE;
+ index++;
+ }
+
+ commitSyncAll(node, topic, partitions, priorities, subshards, sequences);
+ }
+
+ public void CommitSync(Node node, String topic, int partition_id, int priority,
+ long subshard_id, long seq_num ) {
+
+ try {
+ String user = getCurrentUser(node);
+ CallableStatement cStmt = getOrCreateCallable(node, "COMMIT_SYNC", LTWT_COMMIT_SYNC);
+ cStmt.setString(1, user);
+ cStmt.setString(2, topic);
+ cStmt.setString(3, configs.getString(ConsumerConfig.GROUP_ID_CONFIG));
+ cStmt.setInt(4, partition_id);
+ cStmt.setInt(5, priority);
+ cStmt.setLong(6, subshard_id);
+ cStmt.setLong(7, seq_num);
+ cStmt.execute();
+ log.debug("Light weight CommitSync executed successfully for topic: {}, partition: {}, subshard: {}, seq: {}",
+ topic, partition_id, subshard_id, seq_num);
+ } catch(Exception ex) {
+ log.error("Error during light weight CommitSync for node: " + node + ", topic: " + topic, ex);
+ }
+ }
+
+ public void commitSyncAll(Node node, String topic, int[] partition_id, int[] priority,
+ long[] subshard_id, long[] seq_num ) {
+
+ try {
+ OracleConnection oracleCon = (OracleConnection) getConnection(node);
+ String user = getCurrentUser(node);
+
+ Array partitionArray = oracleCon.createOracleArray("DBMS_TEQK.INPUT_ARRAY_T", partition_id);
+ Array priorityArray = oracleCon.createOracleArray("DBMS_TEQK.INPUT_ARRAY_T", priority);
+ Array subshardArray = oracleCon.createOracleArray("DBMS_TEQK.INPUT_ARRAY_T", subshard_id);
+ Array sequenceArray = oracleCon.createOracleArray("DBMS_TEQK.INPUT_ARRAY_T", seq_num);
+
+ CallableStatement cStmt = getOrCreateCallable(node, "COMMIT_SYNC_ALL", LTWT_COMMIT_SYNC_ALL);
+ cStmt.setString(1, user);
+ cStmt.setString(2, topic);
+ cStmt.setString(3, configs.getString(ConsumerConfig.GROUP_ID_CONFIG));
+ cStmt.setArray(4, partitionArray);
+ cStmt.setArray(5, priorityArray);
+ cStmt.setArray(6, subshardArray);
+ cStmt.setArray(7, sequenceArray);
+ cStmt.execute();
+ log.debug("Light weight CommitSyncAll executed for topic: {}, partitions: {}", topic, partition_id.length);
+ } catch(Exception ex) {
+ log.error("Error in light weight commitSyncAll for topic: " + topic + ", node: " + node, ex);
+ }
+ }
+
+ public void lightWeightSeek(Node node, String topic, long partition_id, long priority,
+ long subshard_id, long seq_num ) throws Exception {
+ try {
+ String user = getCurrentUser(node);
+ CallableStatement cStmt = getOrCreateCallable(node, "SEEK", LTWT_SEEK);
+ cStmt.setString(1, user);
+ cStmt.setString(2, topic);
+ cStmt.setString(3, configs.getString(ConsumerConfig.GROUP_ID_CONFIG));
+ cStmt.setLong(4, partition_id);
+ cStmt.setLong(5, priority);
+ cStmt.setLong(6, subshard_id);
+ cStmt.setLong(7, seq_num);
+ cStmt.execute();
+ log.debug("Light weight seek executed successfully for topic: {}, partition: {}, subshard: {}, seq: {}",
+ topic, partition_id, subshard_id, seq_num);
+ } catch(Exception ex) {
+ log.error("Error in lightWeightseek for topic: " + topic + ", node: " + node, ex);
+ throw ex;
+ }
+ }
+
+ public void lightWeightSeektoBeginning(Node node, String topic, Long[] partition_id, Long[] priority) throws Exception {
+ try {
+ OracleConnection oracleCon = (OracleConnection) getConnection(node);
+ String user = getCurrentUser(node);
+ Array partitionArray = oracleCon.createOracleArray("DBMS_TEQK.SEEK_INPUT_ARRAY_T", partition_id);
+ Array priorityArray = oracleCon.createOracleArray("DBMS_TEQK.SEEK_INPUT_ARRAY_T", priority);
+ CallableStatement cStmt = getOrCreateCallable(node, "SEEK_TO_BEGINNING", LTWT_SEEK_TO_BEGINNING);
+ cStmt.setString(1, user);
+ cStmt.setString(2, topic);
+ cStmt.setString(3, configs.getString(ConsumerConfig.GROUP_ID_CONFIG));
+ cStmt.setArray(4, partitionArray);
+ cStmt.setArray(5, priorityArray);
+ log.debug("lightWeightSeektoBeginning: User: {}, Topic: {}, GroupId: {}, Partition IDs: {}, Priority: {}",
+ user, topic, configs.getString(ConsumerConfig.GROUP_ID_CONFIG),
+ Arrays.toString(partition_id), Arrays.toString(priority));
+
+ cStmt.execute();
+ log.debug("lightWeightSeektoBeginning executed for topic: {}, partitions: {}", topic, partition_id.length);
+ } catch(Exception ex) {
+ log.error("Error in lightWeightSeektoBeginning for topic: " + topic + ", node: " + node, ex);
+ throw ex;
+ }
+ }
+
+
+ public void lightWeightSeektoEnd(Node node, String topic, Long[] partition_id, Long[] priority) throws Exception {
+ try {
+ OracleConnection oracleCon = (OracleConnection) getConnection(node);
+ String user = getCurrentUser(node);
+ Array partitionArray = oracleCon.createOracleArray("DBMS_TEQK.SEEK_INPUT_ARRAY_T", partition_id);
+ Array priorityArray = oracleCon.createOracleArray("DBMS_TEQK.SEEK_INPUT_ARRAY_T", priority);
+ CallableStatement cStmt = getOrCreateCallable(node, "SEEK_TO_END", LTWT_SEEK_TO_END);
+ cStmt.setString(1, user);
+ cStmt.setString(2, topic);
+ cStmt.setString(3, configs.getString(ConsumerConfig.GROUP_ID_CONFIG));
+ cStmt.setArray(4, partitionArray);
+ cStmt.setArray(5, priorityArray);
+ log.debug("lightWeightSeektoEnd: User: {}, Topic: {}, GroupId: {}, Partition IDs: {}, Priority: {}",
+ user, topic, configs.getString(ConsumerConfig.GROUP_ID_CONFIG),
+ Arrays.toString(partition_id), Arrays.toString(priority));
+
+ cStmt.execute();
+ log.debug("lightWeightSeektoEnd executed for topic: {}, partitions: {}", topic, partition_id.length);
+ } catch (Exception ex) {
+ log.error("Error in lightWeightSeektoEnd for topic: " + topic + ", node: " + node, ex);
+ throw ex;
+ }
+ }
+
+
+ private void lightweightSubscriberSeek(Node node, String topic, Map offsets, Map responses) {
+ List seekbeginPartitions = new ArrayList<>();
+ List seekEndPartitions = new ArrayList<>();
+ List seekbeginPriorities = new ArrayList<>();
+ List seekEndPriorities = new ArrayList<>();
+
+ List seekBeginoffs = new ArrayList<>();
+ List seekEndoffs = new ArrayList<>();
+
+ for (Map.Entry entry : offsets.entrySet()) {
+ TopicPartition tp = entry.getKey();
+ long offset = entry.getValue();
+ long partition = tp.partition();
+ long priority = 0;
+
+ try {
+ if (offset == -2L) { // Seek to beginning
+ seekbeginPartitions.add(2L * partition);
+ seekbeginPriorities.add((long) priority);
+ seekBeginoffs.add(tp);
+ continue;
+ }
+ else if (offset == -1L) { // Seek to end
+ seekEndPartitions.add(2L * partition);
+ seekEndPriorities.add((long) priority);
+ seekEndoffs.add(tp);
+ continue;
+ }
+ else {
+ long subshard = offset / MessageIdConverter.DEFAULT_SUBPARTITION_SIZE;
+ long sequence = offset % MessageIdConverter.DEFAULT_SUBPARTITION_SIZE;
+ lightWeightSeek(node, topic, 2*partition, priority, subshard, sequence);
+ responses.put(tp, null);
+ }
+ } catch (Exception ex) {
+ responses.put(tp, ex);
+ }
+ }
+ try {
+ if (!seekbeginPartitions.isEmpty()) {
+ lightWeightSeektoBeginning(node, topic,
+ seekbeginPartitions.toArray(new Long[0]),
+ seekbeginPriorities.toArray(new Long[0]));
+ for (TopicPartition tp : seekBeginoffs) {
+ responses.put(tp, null);
+ }
+ }
+
+ if (!seekEndPartitions.isEmpty()) {
+ lightWeightSeektoEnd(node, topic,
+ seekEndPartitions.toArray(new Long[0]),
+ seekEndPriorities.toArray(new Long[0]));
+ for (TopicPartition tp : seekEndoffs) {
+ responses.put(tp, null);
+ }
+ }
+ }
+ catch (Exception e) {
+ log.error("Error in lightweightSubscriberSeek for topic: " + topic + ", node: " + node, e);
+ for (TopicPartition tp : seekBeginoffs) {
+ responses.put(tp, e);
+ }
+ for (TopicPartition tp : seekEndoffs) {
+ responses.put(tp, e);
+ }
+ }
+ }
+
+
+ private ClientResponse createCommitResponse(ClientRequest request, Map> nodes,
Map offsets, Map result, boolean error) {
return new ClientResponse(request.makeHeader((short)1), request.callback(), request.destination(),
request.createdTimeMs(), time.milliseconds(), false, null,null,
@@ -360,7 +621,6 @@ public SeekInput() {
}
}
-
private static void validateMsgId(String msgId) throws IllegalArgumentException {
if(msgId == null || msgId.length() !=32)
@@ -393,7 +653,7 @@ public ClientResponse seek(ClientRequest request) {
OffsetResetRequest offsetResetRequest = builder.build();
Node node = metadata.getNodeById(Integer.parseInt(request.destination()));
log.debug("Destination Node: " + node);
-
+
Map offsetResetTimestamps = offsetResetRequest.offsetResetTimestamps();
Map> offsetResetTimeStampByTopic = new HashMap>() ;
for(Map.Entry offsetResetTimestamp : offsetResetTimestamps.entrySet()) {
@@ -405,22 +665,29 @@ public ClientResponse seek(ClientRequest request) {
}
TopicConsumers consumers = topicConsumersMap.get(node);
Connection con = ((AQjmsSession)consumers.getSession()).getDBConnection();
-
+
SeekInput[] seekInputs = null;
String[] inArgs = new String[5];
int indx =0;
for(Map.Entry> offsetResetTimestampOfTopic : offsetResetTimeStampByTopic.entrySet()) {
String topic = offsetResetTimestampOfTopic.getKey();
inArgs[0] = "Topic: " + topic + " ";
+ Map partitionOffsets = offsetResetTimestampOfTopic.getValue();
+
+ if(consumers.lightWeightSub) {
+ lightweightSubscriberSeek(node, topic, partitionOffsets, responses);
+ continue;
+ }
+
try {
if(msgIdFormat.equals("00") ) {
msgIdFormat = getMsgIdFormat(con, topic);
}
- int inputSize = offsetResetTimestampOfTopic.getValue().entrySet().size();
+ int inputSize = partitionOffsets.entrySet().size();
seekInputs = new SeekInput[inputSize];
- for(Map.Entry offsets : offsetResetTimestampOfTopic.getValue().entrySet()) {
+ for(Map.Entry offsets : partitionOffsets.entrySet()) {
seekInputs[indx] = new SeekInput();
try {
TopicPartition tp = offsets.getKey();
@@ -529,7 +796,8 @@ else if( offsets.getValue() == -1L) {
request.createdTimeMs(), time.milliseconds(), false, null,null, new OffsetResetResponse(responses, null));
}
- private ClientResponse unsubscribe(ClientRequest request) {
+
+ private ClientResponse unsubscribe(ClientRequest request) {
HashMap response = new HashMap<>();
for(Map.Entry topicConsumersByNode: topicConsumersMap.entrySet())
{
@@ -1398,6 +1666,7 @@ private void close(Node node, TopicConsumers consumers) {
try {
if (consumers.getConnection() != null) {
((AQjmsConnection)consumers.getConnection()).close();
+ closeCallableStmt(node);
}
this.selectorMetrics.connectionClosed.record();
} catch(JMSException jms) {
@@ -1459,8 +1728,15 @@ public ClientResponse subscribe(ClientRequest request) {
topicConsumersMap.put(node, new TopicConsumers(node));
}
TopicConsumers consumers = topicConsumersMap.get(node);
- consumers.getTopicSubscriber(topic);
- metadata.setDBVersion(consumers.getDBVersion());
+ metadata.setDBVersion(consumers.getDBVersion());
+
+ if(consumers.getlightWeightSub() && metadata.getDBMajorVersion() >= 26) {
+ consumers.createLightWeightSub(topic, node);
+ }
+ else {
+ consumers.getTopicSubscriber(topic);
+ }
+
} catch(JMSException exception) {
log.error("Exception during Subscribe request " + exception, exception);
log.info("Exception during Subscribe request. " + exception);
@@ -1470,7 +1746,7 @@ public ClientResponse subscribe(ClientRequest request) {
}
return createSubscribeResponse(request, topic, null, false);
}
-
+
private ClientResponse createSubscribeResponse(ClientRequest request, String topic, JMSException exception, boolean disconnected) {
return new ClientResponse(request.makeHeader((short)1), request.callback(), request.destination(),
request.createdTimeMs(), time.milliseconds(), disconnected, null,null,
@@ -1573,10 +1849,12 @@ private final class TopicConsumers {
private Map topicSubscribers = null;
private final Node node;
private String dbVersion;
+ private Boolean lightWeightSub;
public TopicConsumers(Node node) throws JMSException {
this(node, TopicSession.AUTO_ACKNOWLEDGE);
}
public TopicConsumers(Node node,int mode) throws JMSException {
+
this.node = node;
conn = createTopicConnection(node);
@@ -1596,9 +1874,11 @@ public TopicConsumers(Node node,int mode) throws JMSException {
try {
this.dbVersion = ConnectionUtils.getDBVersion(oConn);
+ this.lightWeightSub = configs.getBoolean(ConsumerConfig.ORACLE_CONSUMER_LIGHTWEIGHT);
+
}catch(Exception e)
{
- log.error("Exception whle fetching DB Version " + e);
+ log.error("Exception whle fetching DB Version and lightweight consumer config" + e);
}
}catch(Exception e)
@@ -1662,6 +1942,19 @@ private TopicSubscriber createTopicSubscriber(String topic) throws JMSException
topicSubscribers.put(topic, subscriber);
return subscriber;
}
+
+ private void createLightWeightSub(String topic, Node node) {
+ try {
+ CallableStatement cStmt = getOrCreateCallable(node, "CREATE_LTWT_SUB", LTWT_SUB);
+ cStmt.setString(1, ConnectionUtils.enquote(topic));
+ cStmt.setString(2, configs.getString(ConsumerConfig.GROUP_ID_CONFIG));
+ cStmt.execute();
+ log.debug("Lightweight subscriber created for topic: " + topic + ", node: " + node);
+ }
+ catch(Exception ex) {
+ log.error("Error creating lightweight subscriber for topic: " + topic + ", node: " + node, ex);
+ }
+ }
private void refresh(Node node) throws JMSException {
conn = createTopicConnection(node);
@@ -1703,6 +1996,10 @@ public String getDBVersion()
{
return dbVersion;
}
+
+ public boolean getlightWeightSub() {
+ return lightWeightSub;
+ }
}