Skip to content

Commit 2028a41

Browse files
authored
Merge pull request #88 from Mrunal98/lightweightsub
Client support for lightweight consumer
2 parents 0a5e80f + 70532ec commit 2028a41

File tree

3 files changed

+342
-31
lines changed

3 files changed

+342
-31
lines changed

clients/src/main/java/org/oracle/okafka/clients/CommonClientConfigs.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ public class CommonClientConfigs extends org.apache.kafka.clients.CommonClientCo
5858

5959
public static final String ORACLE_TRANSACTIONAL_PRODUCER ="oracle.transactional.producer";
6060

61+
public static final String ORACLE_CONSUMER_LIGHTWEIGHT = "oracle.consumer.lightweight";
62+
public static final String ORACLE_CONSUMER_LIGHTWEIGHT_DOC = "Creates a light weight subscriber";
63+
6164
/*
6265
public static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms";
6366
public static final String METADATA_MAX_AGE_DOC = "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.";

clients/src/main/java/org/oracle/okafka/clients/consumer/ConsumerConfig.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
package org.oracle.okafka.clients.consumer;
3131

3232
import org.apache.kafka.clients.ClientDnsLookup;
33-
import org.apache.kafka.clients.CommonClientConfigs;
3433
import org.apache.kafka.common.IsolationLevel;
3534
import org.apache.kafka.common.config.AbstractConfig;
3635
import org.apache.kafka.common.config.ConfigDef;
@@ -42,6 +41,7 @@
4241
import org.apache.kafka.common.requests.JoinGroupRequest;
4342
import org.apache.kafka.common.serialization.Deserializer;
4443
import org.oracle.okafka.common.config.SslConfigs;
44+
import org.oracle.okafka.clients.CommonClientConfigs;
4545

4646
import java.util.Collections;
4747
import java.util.HashMap;
@@ -338,15 +338,20 @@ public class ConsumerConfig extends AbstractConfig {
338338
" broker allows for it using `auto.create.topics.enable` broker configuration. This configuration must" +
339339
" be set to `false` when using brokers older than 0.11.0";
340340
public static final boolean DEFAULT_ALLOW_AUTO_CREATE_TOPICS = true;
341-
341+
342342
/**
343343
* <code>security.providers</code>
344344
*/
345345
public static final String SECURITY_PROVIDERS_CONFIG = SecurityConfig.SECURITY_PROVIDERS_CONFIG;
346346
private static final String SECURITY_PROVIDERS_DOC = SecurityConfig.SECURITY_PROVIDERS_DOC;
347347

348348
private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
349-
349+
350+
/** <code>oracle.consumer.lightweight</code> */
351+
public static final String ORACLE_CONSUMER_LIGHTWEIGHT = "oracle.consumer.lightweight";
352+
public static final String ORACLE_CONSUMER_LIGHTWEIGHT_DOC = "Creates a light weight subscriber";
353+
354+
350355
static {
351356
CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
352357
Type.LIST,
@@ -606,7 +611,13 @@ public class ConsumerConfig extends AbstractConfig {
606611
.define(org.oracle.okafka.clients.CommonClientConfigs.ORACLE_NET_TNS_ADMIN,
607612
ConfigDef.Type.STRING,
608613
Importance.MEDIUM,
609-
org.oracle.okafka.clients.CommonClientConfigs.ORACLE_NET_TNS_ADMIN_DOC);
614+
org.oracle.okafka.clients.CommonClientConfigs.ORACLE_NET_TNS_ADMIN_DOC)
615+
.define(CommonClientConfigs.ORACLE_CONSUMER_LIGHTWEIGHT,
616+
ConfigDef.Type.BOOLEAN,
617+
false,
618+
Importance.LOW,
619+
CommonClientConfigs.ORACLE_CONSUMER_LIGHTWEIGHT_DOC)
620+
;
610621

611622
}
612623

0 commit comments

Comments
 (0)