# Add OKafka Transactional Examples and Update Kafka Client #390


Merged · 9 commits · Sep 5, 2024
1 change: 0 additions & 1 deletion txeventq/okafka/.gitignore
@@ -24,4 +24,3 @@ doc/
.settings/

ojdbc.properties
-TxProducer/
68 changes: 62 additions & 6 deletions txeventq/okafka/Quickstart/README.md
@@ -105,7 +105,7 @@ END;

## Step 4: Investigate and Try Simple Producer and Consumer

The repository contains two common OKafka application examples in the `Simple` folder.

1. The Producer `ProducerOKafka.java`

@@ -197,7 +197,7 @@ You should see some output that looks very similar to this:
13:33:31.862 [main] INFO org.oracle.okafka.clients.producer.KafkaProducer -- [Producer clientId=] Overriding the default retries config to the recommended value of 2147483647 since the idempotent producer is enabled.
13:33:31.865 [kafka-producer-network-thread | ] DEBUG org.oracle.okafka.clients.producer.internals.SenderThread -- [Producer clientId=] Starting Kafka producer I/O thread.
13:33:31.866 [kafka-producer-network-thread | ] DEBUG org.oracle.okafka.clients.producer.internals.SenderThread -- [Producer clientId=] Sender waiting for 100
13:33:31.866 [main] INFO org.apache.kafka.common.utils.AppInfoParser -- Kafka version: 3.7.1
13:33:31.867 [main] INFO org.apache.kafka.common.utils.AppInfoParser -- Kafka commitId: 839b886f9b732b15
13:33:31.867 [main] INFO org.apache.kafka.common.utils.AppInfoParser -- Kafka startTimeMs: 1724258011865
13:33:31.867 [main] DEBUG org.oracle.okafka.clients.producer.KafkaProducer -- [Producer clientId=] Kafka producer started
@@ -229,8 +229,6 @@ Initiating close
13:33:48.738 [main] INFO org.apache.kafka.common.utils.AppInfoParser -- App info kafka.producer for unregistered
13:33:48.738 [main] DEBUG org.oracle.okafka.clients.producer.KafkaProducer -- [Producer clientId=] Kafka producer has been closed

-BUILD SUCCESSFUL in 17s
-3 actionable tasks: 3 executed
```
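
For reference, the producer example reduces to a send loop along these lines (a sketch, not the exact `ProducerOKafka.java` source; the connection values are the quickstart defaults shown elsewhere in this PR):

```java
// Sketch of a minimal OKafka producer: sends ten records to TOPIC_1.
import java.util.Properties;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.oracle.okafka.clients.producer.KafkaProducer;

public class ProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("security.protocol", "PLAINTEXT");
        props.put("bootstrap.servers", "localhost:1521");   // database listener host:port
        props.put("oracle.service.name", "freepdb1");       // service name of the pluggable database
        props.put("oracle.net.tns_admin", ".");             // directory holding ojdbc.properties
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Producer implements Closeable; close() flushes pending sends
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<>("TOPIC_1",
                        "Just some key for OKafka" + i,
                        i + "This is test with 128 characters Payload used to test Oracle Kafka."));
            }
        }
    }
}
```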

And, querying the topic `TOPIC_1` in the database, you should see output very similar to this:
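
The check can be a plain query against the queue table, which for a TxEventQ topic carries the topic name, as the `all_queues` listing in Step 5 shows (a sketch; the exact columns depend on the queue table layout):

```sql
-- Hypothetical check: count the records enqueued on TOPIC_1
select count(*) from TOPIC_1;
```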
@@ -290,7 +288,7 @@ gradle :Simple:Consumer:run
.....
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.1
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 839b886f9b732b15
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1724268189943
[main] INFO org.oracle.okafka.clients.NetworkClient - [Consumer clientId=consumer-consumer_grp_1-1, groupId=consumer_grp_1] Available Nodes 1
@@ -301,10 +299,68 @@ gradle :Simple:Consumer:run
[main] INFO org.oracle.okafka.clients.NetworkClient - [Consumer clientId=consumer-consumer_grp_1-1, groupId=consumer_grp_1] Reconnect successful to node 1:localhost:1521:FREEPDB1:FREE:OKAFKA_USER
[main] INFO org.oracle.okafka.clients.Metadata - Cluster ID: FREE
[main] INFO org.oracle.okafka.clients.NetworkClient - [Consumer clientId=consumer-consumer_grp_1-1, groupId=consumer_grp_1] Available Nodes 1
No Record Fetched. Retrying in 1 second
partition = 0, offset = 0, key = Just some key for OKafka0, value =0This is test with 128 characters Payload used to test Oracle Kafka. Read https://github.com/oracle/okafka/blob/master/README.md
partition = 0, offset = 1, key = Just some key for OKafka1, value =1This is test with 128 characters Payload used to test Oracle Kafka. Read https://github.com/oracle/okafka/blob/master/README.md
partition = 0, offset = 2, key = Just some key for OKafka2, value =2This is test with 128 characters Payload used to test Oracle Kafka. Read https://github.com/oracle/okafka/blob/master/README.md
partition = 0, offset = 3, key = Just some key for OKafka3, value =3This is test with 128 characters Payload used to test Oracle Kafka. Read https://github.com/oracle/okafka/blob/master/README.md
partition = 0, offset = 4, key = Just some key for OKafka4, value =4This is test with 128 characters Payload used to test Oracle Kafka. Read https://github.com/oracle/okafka/blob/master/README.md
partition = 0, offset = 5, key = Just some key for OKafka5, value =5This is test with 128 characters Payload used to test Oracle Kafka. Read https://github.com/oracle/okafka/blob/master/README.md
partition = 0, offset = 6, key = Just some key for OKafka6, value =6This is test with 128 characters Payload used to test Oracle Kafka. Read https://github.com/oracle/okafka/blob/master/README.md
partition = 0, offset = 7, key = Just some key for OKafka7, value =7This is test with 128 characters Payload used to test Oracle Kafka. Read https://github.com/oracle/okafka/blob/master/README.md
partition = 0, offset = 8, key = Just some key for OKafka8, value =8This is test with 128 characters Payload used to test Oracle Kafka. Read https://github.com/oracle/okafka/blob/master/README.md
partition = 0, offset = 9, key = Just some key for OKafka9, value =9This is test with 128 characters Payload used to test Oracle Kafka. Read https://github.com/oracle/okafka/blob/master/README.md
Committing records10
No Record Fetched. Retrying in 1 second
```
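
That output comes from a poll-and-commit loop along these lines (a sketch, not the exact `ConsumerOKafka.java` source; in the example the connection, group, and deserializer settings come from `consumer.properties`):

```java
// Sketch of a minimal OKafka consumer: polls TOPIC_1 and commits each batch.
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.oracle.okafka.clients.consumer.KafkaConsumer;

public class ConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("security.protocol", "PLAINTEXT");
        props.put("bootstrap.servers", "localhost:1521");
        props.put("oracle.service.name", "freepdb1");
        props.put("oracle.net.tns_admin", ".");
        props.put("group.id", "consumer_grp_1");        // TxEventQ subscriber name
        props.put("auto.offset.reset", "earliest");     // start consuming from the beginning
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("TOPIC_1"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                if (records.isEmpty()) {
                    System.out.println("No Record Fetched. Retrying in 1 second");
                    continue;
                }
                for (ConsumerRecord<String, String> rec : records)
                    System.out.println("partition = " + rec.partition() + ", offset = " + rec.offset()
                            + ", key = " + rec.key() + ", value =" + rec.value());
                System.out.println("Committing records" + records.count());
                consumer.commitSync();
            }
        }
    }
}
```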

## Step 5: Investigate and Try Administration API

With the Administration API it is possible to create and delete topics.

### Task 1: Try the Admin API

Let’s build and run the Admin example class. Use your IDE, or open a command line (or terminal) and navigate to the folder
where you have the project files `<Quickstart Directory>/`. We can build and run the application by issuing the following command:

```cmd
gradle Simple:Admin:run
Usage: java OKafkaAdminTopic [CREATE|DELETE] topic1 ... topicN
```

This command requires at least two parameters. The first specifies whether to `CREATE` or `DELETE` the topics named by the
remaining parameters, in sequence. For example:

```shell
gradle Simple:Admin:run --args="CREATE TOPIC_ADMIN_2 TOPIC_ADMIN_3"
```

As a result, you will see the two new topics created:

```sql
SQL> select name, queue_table, dequeue_enabled,enqueue_enabled, sharded, queue_category, recipients
2 from all_queues
3 where OWNER='OKAFKA_USER'
4* and QUEUE_TYPE<>'EXCEPTION_QUEUE';

NAME QUEUE_TABLE DEQUEUE_ENABLED ENQUEUE_ENABLED SHARDED QUEUE_CATEGORY RECIPIENTS
________________ ________________ __________________ __________________ __________ ____________________________ _____________
......
TOPIC_ADMIN_2 TOPIC_ADMIN_2 YES YES TRUE Sharded Queue MULTIPLE
TOPIC_ADMIN_3 TOPIC_ADMIN_3 YES YES TRUE Sharded Queue MULTIPLE
```
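
Under the hood, the example drives the standard Kafka Admin API against the database. A minimal sketch (the class name and topic settings here are illustrative, not the exact `OKafkaAdminTopic` source):

```java
// Sketch: create a TxEventQ topic through the OKafka Admin API.
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.oracle.okafka.clients.admin.AdminClient;

public class AdminSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("security.protocol", "PLAINTEXT");
        props.put("bootstrap.servers", "localhost:1521");
        props.put("oracle.service.name", "freepdb1");
        props.put("oracle.net.tns_admin", ".");

        try (Admin admin = AdminClient.create(props)) {
            // One partition; the replication factor is managed by the database itself
            admin.createTopics(Collections.singletonList(new NewTopic("TOPIC_ADMIN_2", 1, (short) 1)))
                 .all().get();   // block until the topic (sharded queue) exists
            // admin.deleteTopics(Collections.singletonList("TOPIC_ADMIN_2")) works symmetrically
        }
    }
}
```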


## Transactions in OKafka

The Kafka Client for Oracle Transactional Event Queues allows developers to use the transactions API effectively.

Transactions allow for atomic writes across multiple TxEventQ topics and partitions, ensuring that either all messages
within the transaction are successfully written, or none are. For instance, if an error occurs during processing, the
transaction may be aborted, preventing any of the messages from being committed to the topic or accessed by consumers.
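
In OKafka this is exposed through the standard Kafka transactions API. The commit/abort flow looks roughly like this (a sketch assuming the usual `transactional.id` setting applies; all values are illustrative, not taken from the examples):

```java
// Sketch: atomic all-or-nothing writes with a transactional OKafka producer.
import java.util.Properties;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.oracle.okafka.clients.producer.KafkaProducer;

public class TransactionalProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();   // connection settings as in the earlier examples
        props.put("security.protocol", "PLAINTEXT");
        props.put("bootstrap.servers", "localhost:1521");
        props.put("oracle.service.name", "freepdb1");
        props.put("oracle.net.tns_admin", ".");
        props.put("transactional.id", "txn-producer-1");   // illustrative id
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();
        try {
            producer.beginTransaction();
            for (int i = 0; i < 10; i++)
                producer.send(new ProducerRecord<>("TXEQ", "key-" + i, "value-" + i));
            producer.commitTransaction();   // all ten records become visible atomically
        } catch (Exception e) {
            producer.abortTransaction();    // none of the records reach consumers
        } finally {
            producer.close();
        }
    }
}
```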

You can now build and run the [Transactional Examples](./Transactional/TRANSACTIONAL_EXAMPLES.MD).

## Want to Learn More?

- [Kafka APIs for Oracle Transactional Event Queues](https://docs.oracle.com/en/database/oracle/oracle-database/19/adque/)

This file was deleted.

@@ -6,7 +6,6 @@ bootstrap.servers=<server address:server port>
oracle.service.name=<oracle database service>
oracle.net.tns_admin=<location of ojdbc.properties file>


#Option 2: Connect to Oracle Database deployed in Oracle Autonomous Cloud using Wallet
security.protocol=PLAINTEXT
#oracle.net.tns_admin=<location of Oracle Wallet, tnsnames.ora and ojdbc.properties file>
@@ -12,14 +12,16 @@ oracle.net.tns_admin=<location of ojdbc.properties file>
#tns.alias=<tns alias>

# Application specific OKafka consumer properties
topic.name=<Oracle Database TxEventQ Topic [TOPIC_1]>
group.id=<Oracle Database TxEventQ Subscriber [consumer_grp_1]>


enable.auto.commit=true
max.poll.records=1000
default.api.timeout.ms=180000

# Start consuming from the beginning (Default = latest);
auto.offset.reset=earliest

key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

@@ -12,7 +12,8 @@ oracle.net.tns_admin=<location of ojdbc.properties file>
#tns.alias=<tns alias>

#Application specific OKafka Producer properties
allow.auto.create.topics=FALSE
topic.name=<Oracle Database TxEventQ Topic [TOPIC_1]>

batch.size=200
linger.ms=100
@@ -7,6 +7,9 @@

package org.oracle.okafka.examples;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.sql.Connection;
import java.time.Duration;
@@ -24,68 +27,32 @@

import org.oracle.okafka.clients.consumer.KafkaConsumer;


public class TransactionalConsumerOKafka {

    public static void main(String[] args) {
        System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "DEBUG");

        // Get application properties
        Properties appProperties = null;
        try {
            appProperties = getProperties();
            if (appProperties == null) {
                System.out.println("Application properties not found!");
                System.exit(-1);
            }
        } catch (Exception e) {
            System.out.println("Application properties not found!");
            System.out.println("Exception: " + e);
            System.exit(-1);
        }

        String topicName = appProperties.getProperty("topic.name", "TXEQ");
        appProperties.remove("topic.name"); // Pass the remaining props to build the OKafka consumer

        Consumer<String, String> consumer = new KafkaConsumer<String, String>(appProperties);
        ConsumerRebalanceListener rebalanceListener = new ConsumerRebalance();

        consumer.subscribe(Arrays.asList(topicName), rebalanceListener);

        int expectedMsgCnt = 100;
        int msgCnt = 0;
@@ -152,4 +119,55 @@ private static void processRecord(Connection conn, ConsumerRecord<String, String
{
//Application specific logic to process the message
}


    private static java.util.Properties getProperties() throws IOException {
        InputStream inputStream = null;
        Properties appProperties;

        try {
            Properties prop = new Properties();
            String propFileName = "config.properties";
            inputStream = TransactionalConsumerOKafka.class.getClassLoader().getResourceAsStream(propFileName);
            if (inputStream != null) {
                prop.load(inputStream);
            } else {
                throw new FileNotFoundException("property file '" + propFileName + "' not found.");
            }
            appProperties = prop;

        } catch (Exception e) {
            System.out.println("Exception: " + e);
            throw e;
        } finally {
            if (inputStream != null)
                inputStream.close();
        }
        return appProperties;
    }

    // Dummy implementation of the ConsumerRebalanceListener interface.
    // It only maintains the list of assigned partitions in assignedPartitions.
    static class ConsumerRebalance implements ConsumerRebalanceListener {

        public List<TopicPartition> assignedPartitions = new ArrayList<>();

        @Override
        public synchronized void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            System.out.println("Newly Assigned Partitions:");
            for (TopicPartition tp : partitions) {
                System.out.println(tp);
                assignedPartitions.add(tp);
            }
        }

        @Override
        public synchronized void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            System.out.println("Revoked previously assigned partitions.");
            for (TopicPartition tp : assignedPartitions) {
                System.out.println(tp);
            }
            assignedPartitions.clear();
        }
    }
}
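
The hunks above elide the example's consume loop. In outline, it ties record processing and offset commit to one database transaction through the consumer's JDBC connection. A sketch of that shape (not the exact source; `getDBConnection()` is the OKafka-specific hook implied by the `processRecord(Connection, ...)` signature):

```java
// Sketch of the elided transactional consume loop (assumed shape, not the exact source)
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
Connection conn = ((KafkaConsumer<String, String>) consumer).getDBConnection();
try {
    for (ConsumerRecord<String, String> record : records) {
        processRecord(conn, record);  // application DML runs on the consumer's own connection...
        msgCnt++;
    }
    consumer.commitSync();            // ...so the offsets and the DML commit in the same transaction
} catch (Exception e) {
    try {
        conn.rollback();              // aborts the DML and the offset movement together
    } catch (java.sql.SQLException ignored) { }
}
```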
@@ -0,0 +1,34 @@
# OKafka Consumer example properties

#Properties to connect to Oracle Database
#Option 1: Connect to Oracle database using plaintext

bootstrap.servers=<server address:server port>
oracle.service.name=<oracle database service>
oracle.net.tns_admin=<location of ojdbc.properties file>


#Option 2: Connect to Oracle Database deployed in Oracle Autonomous Cloud using Wallet
#security.protocol=SSL
security.protocol=PLAINTEXT

#oracle.net.tns_admin=<location of Oracle Wallet, tnsnames.ora and ojdbc.properties file>
#tns.alias=<tns alias>

# Application specific OKafka consumer properties
topic.name=<Oracle Database TxEventQ Topic [TOPIC_1]>
group.id=<Oracle Database TxEventQ Subscriber [consumer_grp_1]>

enable.auto.commit=false

# Start consuming from the beginning (Default = latest);
auto.offset.reset=earliest

# Maximum number of records fetched in single poll call
max.poll.records=10

default.api.timeout.ms=180000

key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
