Skip to content

Modification in build.gradle and examples #51

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 37 commits into
base: okafka-23.4.0.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
583c8af
changes in build.gradle and examples
s-ourabh Jul 23, 2024
fb2aaff
merged oracle/okafka-23.4.0.0 to s-ourabh/okafka-23.4.0.0
s-ourabh Jul 23, 2024
b428fd6
syncing the branch okafka-23.4.0.0 with the okafkaWithKafka3.7
s-ourabh Jul 23, 2024
4dfa81b
some changes in example and build.gradle
s-ourabh Jul 23, 2024
b790666
slight change in example
s-ourabh Jul 25, 2024
c27b6d7
Merge remote-tracking branch 'upstream/okafka-23.4.0.0' into okafka-2…
s-ourabh Aug 2, 2024
9d5b723
pull from okafka-23.4.0.0
s-ourabh Aug 2, 2024
b47a75f
Merge pull request #53 from oracle/okafka-23.4.0.0
pasimoes Aug 15, 2024
af16144
syncing with the master
s-ourabh Aug 21, 2024
b645ad1
Update information in Readme file.
linny0608 Aug 21, 2024
3bafb4f
owner instance and enqueue instance issue
s-ourabh Aug 26, 2024
bd94c78
only compile tests while build
s-ourabh Aug 26, 2024
3c89c5a
removed comment
s-ourabh Aug 26, 2024
d1f9ac0
Merge pull request #56 from s-ourabh/ownerInstanceIssue
ichokshi2109 Aug 26, 2024
a5dfae5
Cleanup stale entries from user_queue_partition_assignment_table.
Aug 26, 2024
b032597
Changes to use JDK 11
Aug 26, 2024
f1368f9
Merge remote-tracking branch 'origin/master'
Aug 26, 2024
b99fd2c
Add topic name format requirement.
pasimoes Aug 27, 2024
9605df5
Fix format of code block
pasimoes Aug 27, 2024
b267192
text format
pasimoes Aug 27, 2024
e02b11c
Changes to make TopicName case insensitive.
Aug 27, 2024
6fd7271
TopicName case insensitive for kafka admin
Aug 27, 2024
4a289af
Make topicname case-insensitive for KafkaAdmin
Aug 27, 2024
30d0215
Merge pull request #59 from oracle/CaseInsensitive
ichokshi2109 Aug 28, 2024
0ef791d
Merge pull request #54 from linny0608/modifyConnectorReadme
ichokshi2109 Aug 28, 2024
ab7a264
Update build.gradle
ichokshi2109 Sep 4, 2024
3d517aa
Merge pull request #60 from oracle/ichokshi2109-patch-1
ichokshi2109 Sep 4, 2024
ed15ea2
transactional producer serialized key and value null pointer exceptio…
s-ourabh Oct 3, 2024
b8f67a0
Merge pull request #62 from s-ourabh/transactional_producer_Nullpoint…
ichokshi2109 Oct 3, 2024
d73eac9
Merge branch 'master' into 57-docs-topic-name-uppercase
pasimoes Oct 4, 2024
421c478
sync with upstream
s-ourabh Oct 10, 2024
31b6956
Merge pull request #58 from oracle/57-docs-topic-name-uppercase
ichokshi2109 Oct 15, 2024
f5d3d03
Update README.md
ichokshi2109 Oct 16, 2024
624cb2d
Merge pull request #64 from oracle/ichokshi2109-patch-2
ichokshi2109 Oct 17, 2024
33033a4
conflict resolve
s-ourabh Oct 17, 2024
31ccea3
Merge remote-tracking branch 'upstream/master'
s-ourabh Oct 17, 2024
32716dd
Merge branch 'master' into okafkaWithKafka3.7
s-ourabh Oct 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,9 @@ examples/producer/.classpath
examples/producer/.project
examples/producer/.settings/
okafka-github/
clients/config.properties
clients/ojdbc.properties
clientsafterConsumingOkafka.csv
clientsafterProducingOkafka.csv
examples/.externalToolBuilders/
.gitignore
99 changes: 67 additions & 32 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

# Kafka Java Client for Oracle Transactional Event Queues
# Kafka Java Client for Oracle Transactional Event Queues

## Building the Kafka Java Client for Oracle TxEventQ distribution

Expand All @@ -9,14 +9,13 @@ You need to have [Gradle 7.3 or above](http://www.gradle.org/installation) and [

This distribution contains version 23.4.0.0 of the `Kafka Java Client for Oracle Transactional Event Queues` project. It will be referred as OKafka-23.4.0.0 henceforth. This is tested with JDK 11.0.22 but we recommend using the latest version.

The Kafka Java Client works with Oracle Database 23ai Free version as well as Oracle Database 23ai available on Oracle Autonomous Cloud platform.
The Kafka Java Client works with Oracle Database 23ai Free version as well as Oracle Database 23ai available on Oracle Autonomous Cloud platform.

To test this distribution in free Oracle Cloud environment create [Oracle Cloud account](https://docs.cloud.oracle.com/en-us/iaas/Content/FreeTier/freetier.htm) then create [Oracle Autonomous Transaction Processing Database instance](https://docs.oracle.com/en/cloud/paas/autonomous-data-warehouse-cloud/tutorial-getting-started-autonomous-db/index.html) in cloud.
To test this distribution in free Oracle Cloud environment create [Oracle Cloud account](https://docs.cloud.oracle.com/en-us/iaas/Content/FreeTier/freetier.htm) then create [Oracle Autonomous Transaction Processing Database instance](https://docs.oracle.com/en/cloud/paas/autonomous-data-warehouse-cloud/tutorial-getting-started-autonomous-db/index.html) in cloud.

A database user should be created and should be granted the privileges mentioned in Database user configuration section. Then create a Transactional Event Queue to produce and consume messages.


### Database user configuration ###
### Database user configuration

To run `OKafka application` against Oracle Database, a database user must be created and must be granted below privileges.

Expand All @@ -38,6 +37,24 @@ GRANT SELECT on USER_QUEUE_PARTITION_ASSIGNMENT_TABLE to user;
GRANT SELECT on SYS.DBA_RSRC_PLAN_DIRECTIVES to user;
EXEC DBMS_AQADM.GRANT_PRIV_FOR_RM_PLAN('user');
```
For Oracle database user created on Oracle Autonomous Cloud, below privileges needs to be granted.

```roomsql
GRANT AQ_USER_ROLE to user;
GRANT CONNECT, RESOURCE, unlimited tablespace to user;
GRANT EXECUTE on DBMS_AQ to user;
GRANT EXECUTE on DBMS_AQADM to user;
GRANT EXECUTE on DBMS_AQIN to user;
GRANT EXECUTE on DBMS_TEQK to user;
GRANT SELECT on GV$SESSION to user;
GRANT SELECT on V$SESSION to user;
GRANT SELECT on GV$INSTANCE to user;
GRANT SELECT on GV$LISTENER_NETWORK to user;
GRANT SELECT on GV$PDBS to user;
GRANT SELECT on SYS.DBA_RSRC_PLAN_DIRECTIVES to user;
EXEC DBMS_AQADM.GRANT_PRIV_FOR_RM_PLAN('user');
```


Note:
It is preferred in general to assign or grant a specific quota on a tablespace to a database user instead of granting unlimited quota in default tablespace. One can create a table space and use the following command to grant quota on a specific tablespace to a database user.
Expand All @@ -46,7 +63,7 @@ It is preferred in general to assign or grant a specific quota on a tablespace t
ALTER USER user QUOTA UNLIMITED /* or size-clause */ on tablespace_name;
```

Once user is created and above privileges are granted, connect to Oracle Database as this user and create a Transactional Event Queue using below PL/SQL script. One can also use `KafkaAdmin` interface as shown in `CreateTopic.java` in `examples` directory to create a Transactional Event Queue.
Once user is created and above privileges are granted, connect to Oracle Database as this user and create a Topic using below PL/SQL script.

```roomsql
-- Create an OKafka topic named 'TXEQ' with 5 partition and retention time of 7 days.
Expand All @@ -55,50 +72,69 @@ begin
end;
```

#### Connection configuration ####
> Note: A Topic can also be created using OKAFKA Administration methods. Or, through the Producer interface which creates a new topic if it was not previously created.

#### Connection configuration

`OKafka` uses JDBC(thin driver) connection to connect to Oracle Database instance using any one of two security protocols.

1. PLAINTEXT
2. SSL

1. PLAINTEXT
2. SSL

1.PLAINTEXT: In this protocol a JDBC connection is setup by providing username and password in plain text in ojdbc.prperties file. To use PLAINTEXT protocol user must provide following properties through application.

security.protocol = "PLAINTEXT"
bootstrap.servers = "host:port"
oracle.service.name = "name of the service running on the instance"
oracle.net.tns_admin = "location of ojdbc.properties file"

```text
security.protocol = "PLAINTEXT"
bootstrap.servers = "host:port"
oracle.service.name = "name of the service running on the instance"
oracle.net.tns_admin = "location of ojdbc.properties file"
```

`ojdbc.properties` file must have below properties

user(in lowercase)=DatabaseUserName
password(in lowercase)=Password

```text
user(in lowercase)=DatabaseUserName
password(in lowercase)=Password
```

2.SSL: This protocol requires that, while connecting to Oracle Database, the JDBC driver authenticates database user using Oracle Wallet or Java KeyStore(JKS) files. This protocol is typically used to o connect to Oracle database 23ai instance in Oracle Autonomous cloud. To use this protocol `Okafka` application must specify following properties.

security.protocol = "SSL"
oracle.net.tns_admin = "location containing Oracle Wallet, tnsname.ora and ojdbc.properties file"
tns.alias = "alias of connection string in tnsnames.ora"
```text
security.protocol = "SSL"
oracle.net.tns_admin = "location containing Oracle Wallet, tnsname.ora and ojdbc.properties file"
tns.alias = "alias of connection string in tnsnames.ora"
```

Directory location provided in `oracle.net.tns_admin` property should have:

Directory location provided in `oracle.net.tns_admin` property should have
1. Oracle Wallet
2. tnsnames.ora file
3. ojdbc.properties file (optional)
This depends on how the Oracle Wallet is configured.

Learn more about [JDBC Thin Connections with a Wallet (mTLS)](https://docs.oracle.com/en/cloud/paas/atp-cloud/atpug/connect-jdbc-thin-wallet.html#GUID-5ED3C08C-1A84-4E5A-B07A-A5114951AA9E) to establish secured JDBC connections.

Note: tnsnames.ora file in wallet downloaded from Oracle Autonomous Database contains JDBC connection string which is used for establishing JDBC connection.


> Note: tnsnames.ora file in wallet downloaded from Oracle Autonomous Database contains JDBC connection string which is used for establishing JDBC connection.

#### APIs configuration

You can get a detailed description of the Producer, Consumer and Administration APIs in the [Kafka APIs for Oracle Transactional Event Queues Documentation](https://docs.oracle.com/en/database/oracle/oracle-database/23/adque/Kafka_cient_interface_TEQ.html#GUID-5549915E-6509-4065-B05E-E96338F4742C).

> Note: Topic name property should be provided in UPPERCASE.
>
>> ```text
>> topic.name=<Oracle Database TxEventQ Topic, use uppercase>
>> ```

### Building okafka.jar

Simplest way to build the `okafka.jar` file is by using Gradle build tool.
This distribution contains gradle build files which will work for Gradle 7.3 or higher.

```
```shell
./gradle jar
```

This generates `okafka-23.4.0.0.jar` in `okafka_source_dir/clients/build/libs`.

**Project Dependency:**
Expand All @@ -119,18 +155,17 @@ All these jars are downloaded from Maven Repository during gradle build.

To build the `okafka.jar` file which includes all the dependent jar files in itself.

```
```shell
./gradle fullJar
```
This generates `okafka-full-23.4.0.0.jar` in `okafka_source_dir/clients/build/libs`.


## Build javadoc
This genddsferates `okafka-full-23.4.0.0.jar` in `okafka_source_dir/clients/build/libs`.

## Build javadoc

This command generates javadoc in `okafka_source_dir/clients/build/docs/javadoc`

```
```shell
gradle javadoc
```

Expand All @@ -142,7 +177,7 @@ Repository contains 2 common OKafka application examples in `examples` folder.
Produces 10 messages into TxEQ topic.

`2. ConsumerOKafka.java`
Consumes 10 messages from TxEQ topic.
Consumes 10 messages from TxEQ topic.

## Kafka Java Client APIs supported

Expand Down
30 changes: 15 additions & 15 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ allprojects {
}

group = 'org.oracle.okafka'
version = '23.4.0.0'
version = '23.5.0.0'

tasks.withType(Javadoc) {
// disable the crazy super-strict doclint tool in Java 8
// noinspection SpellCheckingInspection
title ="Oracle Kafka 23.4.0.0 API"
title ="Oracle Kafka 23.5.0.0 API"
options.addStringOption('Xdoclint:none', '-quiet')
options.windowTitle = "Oracle Database Transactional Event Queues Java API Reference"
options.header = """<b>Oracle&reg; Database Transactional Event Queues Java API Reference<br>23ai</b><br>FF46992-04<br>"""
Expand All @@ -33,7 +33,7 @@ allprojects {

ext {
gradleVersion = '8.8'
minJavaVersion = JavaVersion.VERSION_17
minJavaVersion = JavaVersion.VERSION_11

mavenUrl = project.hasProperty('mavenUrl') ? project.mavenUrl : ''
mavenUsername = project.hasProperty('mavenUsername') ? project.mavenUsername : ''
Expand All @@ -50,13 +50,11 @@ project(':clients') {
main {
java {
srcDir 'src/main/java'
exclude 'tests/**'
exclude 'test/**'
}
}
}

println 'Building okafka 23.4.0.0 Java API jar'
println 'Building okafka 23.5.0.0 Java API jar'

dependencies {

Expand All @@ -67,16 +65,14 @@ project(':clients') {
implementation group: 'javax.jms', name: 'javax.jms-api', version: '2.0'
implementation group: 'com.oracle.database.security', name: 'oraclepki', version: '23.4.0.24.05'
implementation group: 'org.slf4j', name: 'slf4j-api', version: '2.0.0-alpha0'
implementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.7.1'
// Use JUnit test framework
implementation group: 'junit', name: 'junit', version: '4.12'

implementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.7.1'

// Test dependencies
testImplementation group: 'org.easymock', name: 'easymock', version: '3.6'
testImplementation group: 'org.powermock', name: 'powermock-module-junit4', version: '2.0.0-beta.5'
testImplementation group: 'org.powermock', name: 'powermock-api-support', version: '2.0.5'
testImplementation group: 'org.powermock', name: 'powermock-api-easymock', version: '2.0.0-beta.5'

testImplementation group: 'junit', name: 'junit', version: '4.12'
}

javadoc {
Expand All @@ -87,9 +83,9 @@ project(':clients') {
}

tasks.named('jar') {
description('Generates okafka 23.4.0.0 API jar ')
description('Generates okafka 23.5.0.0 API jar ')
archiveBaseName = 'okafka'
archiveVersion = '23.4.0.0'
archiveVersion = '23.5.0.0'

from "${rootProject.projectDir}/LICENSE.txt"
from "${rootProject.projectDir}/NOTICE"
Expand All @@ -98,7 +94,7 @@ project(':clients') {
attributes (
'Implementation-Title' : 'okafka',
'Implementation-Version': project.version,
'Version': '23.4.0.0',
'Version': '23.5.0.0',
'Build-Time-ISO-8601':new Date().format("yyyy-MM-dd HH:mm:ss")
)
}
Expand All @@ -125,6 +121,10 @@ project(':clients') {

duplicatesStrategy 'exclude'
}

tasks.withType(Test) {
onlyIf { false }
}
}


Expand Down Expand Up @@ -216,4 +216,4 @@ dependencies {
task multiProjectJar (type: Jar, dependsOn: configurations.childJar) {
description 'Generates a jar containing okafka client, all its dependencies and examples for okafka demo'
from { configurations.childJar.collect { zipTree(it) } }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1598,14 +1598,15 @@ private static boolean groupIdIsUnrepresentable(String groupId) {
public CreateTopicsResult createTopics(final Collection<NewTopic> newTopics, final CreateTopicsOptions options) {
final Map<String, KafkaFutureImpl<TopicMetadataAndConfig>> topicFutures = new HashMap<>(newTopics.size());
final Map<String, CreateTopicsRequest.TopicDetails> topicsMap = new HashMap<>(newTopics.size());

for (NewTopic newTopic : newTopics) {
if (topicNameIsUnrepresentable(newTopic.name())) {
KafkaFutureImpl<TopicMetadataAndConfig> future = new KafkaFutureImpl<>();
future.completeExceptionally(new InvalidTopicException(
"The given topic name '" + newTopic.name() + "' cannot be represented in a request."));
topicFutures.put(newTopic.name(), future);
} else if (!topicFutures.containsKey(newTopic.name())) {
topicFutures.put(newTopic.name(), new KafkaFutureImpl<TopicMetadataAndConfig>());
topicFutures.put(newTopic.name().toUpperCase(), future);
} else if (!topicFutures.containsKey(newTopic.name().toUpperCase())) {
topicFutures.put(newTopic.name().toUpperCase(), new KafkaFutureImpl<TopicMetadataAndConfig>());
TopicDetails topicDetails = null;

if (newTopic.replicasAssignments() != null) {
Expand All @@ -1622,7 +1623,7 @@ public CreateTopicsResult createTopics(final Collection<NewTopic> newTopics, fin
topicDetails = new TopicDetails(newTopic.numPartitions(), newTopic.replicationFactor());
}
}
topicsMap.put(newTopic.name(), topicDetails);
topicsMap.put(newTopic.name().toUpperCase(), topicDetails);
}
}
final long now = time.milliseconds();
Expand Down Expand Up @@ -1693,17 +1694,21 @@ public DeleteTopicsResult deleteTopics(Collection<String> topicNames, DeleteTopi

public org.oracle.okafka.clients.admin.DeleteTopicsResult deleteTopics(Collection<String> topicNames,
org.oracle.okafka.clients.admin.DeleteTopicsOptions options) {

final Map<String, KafkaFutureImpl<Void>> topicFutures = new HashMap<>(topicNames.size());
final List<String> validTopicNames = new ArrayList<>(topicNames.size());

for (String topicName : topicNames) {
if (topicNameIsUnrepresentable(topicName)) {

KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
future.completeExceptionally(new InvalidTopicException(
"The given topic name '" + topicName + "' cannot be represented in a request."));
topicFutures.put(topicName, future);
} else if (!topicFutures.containsKey(topicName)) {
topicFutures.put(topicName, new KafkaFutureImpl<Void>());
validTopicNames.add(topicName);
topicFutures.put(topicName.toUpperCase(), future);

} else if (!topicFutures.containsKey(topicName.toUpperCase())) {
topicFutures.put(topicName.toUpperCase(), new KafkaFutureImpl<Void>());
validTopicNames.add(topicName.toUpperCase());
}
}
final long now = time.milliseconds();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -739,19 +739,25 @@ public void subscribe(Collection<String> topics, ConsumerRebalanceListener liste
try {
if (topics == null) {
throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");

} else if (topics.isEmpty()) {
// treat subscribing to empty topic list as the same as unsubscribing
this.unsubscribe();
} else {
if (topics.size() > 1)
throw new IllegalArgumentException("Only one topic can be subscribed");


Collection<String> topicsUp = new ArrayList<String>(topics.size());

for (String topic : topics) {
if (topic == null || topic.trim().isEmpty())
throw new IllegalArgumentException(
"Topic collection to subscribe to cannot contain null or empty topic");

topicsUp.add(topic.toUpperCase());
}

topics = topicsUp;

// Only one topic can be subscribed, unsubcribe to previous topics before
// subscribing to new topic
Set<String> Alltopics = subscriptions.metadataTopics();
Expand All @@ -765,7 +771,6 @@ public void subscribe(Collection<String> topics, ConsumerRebalanceListener liste
// metadata.setTopics(subscriptions.groupSubscription());
// Change for 2.8.1 groupSubscription() is not present any more
metadata.setTopics(subscribedTopicSet);

}
} finally {
release();
Expand Down
Loading