Skip to content

Commit ca6dc99

Browse files
mingmxuasereda-gs
mingmxu
authored andcommitted
[CALCITE-2913] Adapter for Apache Kafka (Mingmin Xu)
Expose an Apache Kafka topic as a stream table.
1 parent 1310295 commit ca6dc99

17 files changed

+1046
-1
lines changed

kafka/pom.xml

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
Licensed to the Apache Software Foundation (ASF) under one or more
4+
contributor license agreements. See the NOTICE file distributed with
5+
this work for additional information regarding copyright ownership.
6+
The ASF licenses this file to you under the Apache License, Version 2.0
7+
(the "License"); you may not use this file except in compliance with
8+
the License. You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
-->
18+
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
19+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
20+
<modelVersion>4.0.0</modelVersion>
21+
<parent>
22+
<groupId>org.apache.calcite</groupId>
23+
<artifactId>calcite</artifactId>
24+
<version>1.20.0-SNAPSHOT</version>
25+
</parent>
26+
27+
<!-- The basics. -->
28+
<artifactId>calcite-kafka</artifactId>
29+
<packaging>jar</packaging>
30+
<name>calcite kafka</name>
31+
<description>Kafka Adapter. Exposes kafka topic(s) as stream table(s).</description>
32+
33+
<properties>
34+
<top.dir>${project.basedir}/..</top.dir>
35+
<build.timestamp>${maven.build.timestamp}</build.timestamp>
36+
</properties>
37+
38+
<dependencies>
39+
<dependency>
40+
<groupId>org.apache.calcite</groupId>
41+
<artifactId>calcite-core</artifactId>
42+
</dependency>
43+
<dependency>
44+
<groupId>org.apache.calcite</groupId>
45+
<artifactId>calcite-core</artifactId>
46+
<type>test-jar</type>
47+
<scope>test</scope>
48+
<exclusions>
49+
<exclusion>
50+
<artifactId>commons-compiler</artifactId>
51+
<groupId>org.codehaus.janino</groupId>
52+
</exclusion>
53+
</exclusions>
54+
</dependency>
55+
<dependency>
56+
<groupId>org.apache.calcite</groupId>
57+
<artifactId>calcite-linq4j</artifactId>
58+
</dependency>
59+
<dependency>
60+
<groupId>com.google.guava</groupId>
61+
<artifactId>guava</artifactId>
62+
</dependency>
63+
<dependency>
64+
<groupId>org.apache.kafka</groupId>
65+
<artifactId>kafka-clients</artifactId>
66+
</dependency>
67+
<dependency>
68+
<groupId>junit</groupId>
69+
<artifactId>junit</artifactId>
70+
<scope>test</scope>
71+
</dependency>
72+
</dependencies>
73+
74+
<build>
75+
<plugins>
76+
<plugin>
77+
<groupId>org.apache.maven.plugins</groupId>
78+
<artifactId>maven-dependency-plugin</artifactId>
79+
<version>${maven-dependency-plugin.version}</version>
80+
<executions>
81+
<execution>
82+
<id>copy-dependencies</id>
83+
<phase>package</phase>
84+
<goals>
85+
<goal>copy-dependencies</goal>
86+
</goals>
87+
<configuration>
88+
<outputDirectory>${project.build.directory}/dependencies/</outputDirectory>
89+
<overWriteReleases>false</overWriteReleases>
90+
<overWriteSnapshots>false</overWriteSnapshots>
91+
<overWriteIfNewer>true</overWriteIfNewer>
92+
</configuration>
93+
</execution>
94+
</executions>
95+
</plugin>
96+
</plugins>
97+
</build>
98+
99+
</project>
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to you under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.calcite.adapter.kafka;
18+
19+
import org.apache.calcite.linq4j.Enumerator;
20+
21+
import org.apache.kafka.clients.consumer.Consumer;
22+
import org.apache.kafka.clients.consumer.ConsumerConfig;
23+
import org.apache.kafka.clients.consumer.ConsumerRecord;
24+
import org.apache.kafka.clients.consumer.ConsumerRecords;
25+
26+
import java.time.Duration;
27+
import java.util.LinkedList;
28+
import java.util.concurrent.atomic.AtomicBoolean;
29+
30+
/**
31+
* Enumerator to read data from {@link Consumer},
32+
* and converted into SQL rows with {@link KafkaRowConverter}.
33+
* @param <K>: type for Kafka message key,
34+
* refer to {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG};
35+
* @param <V>: type for Kafka message value,
36+
* refer to {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG};
37+
*/
38+
public class KafkaMessageEnumerator<K, V> implements Enumerator<Object[]> {
39+
final Consumer consumer;
40+
final KafkaRowConverter<K, V> rowConverter;
41+
private final AtomicBoolean cancelFlag;
42+
43+
//runtime
44+
private final LinkedList<ConsumerRecord<K, V>> bufferedRecords = new LinkedList<>();
45+
private ConsumerRecord<K, V> curRecord;
46+
47+
KafkaMessageEnumerator(final Consumer consumer,
48+
final KafkaRowConverter<K, V> rowConverter,
49+
final AtomicBoolean cancelFlag) {
50+
this.consumer = consumer;
51+
this.rowConverter = rowConverter;
52+
this.cancelFlag = cancelFlag;
53+
}
54+
55+
/**
56+
* It returns an Array of Object, with each element represents a field of row.
57+
*/
58+
@Override public Object[] current() {
59+
return rowConverter.toRow(curRecord);
60+
}
61+
62+
@Override public boolean moveNext() {
63+
if (cancelFlag.get()) {
64+
return false;
65+
}
66+
67+
while (bufferedRecords.isEmpty()) {
68+
pullRecords();
69+
}
70+
71+
curRecord = bufferedRecords.removeFirst();
72+
return true;
73+
}
74+
75+
private void pullRecords() {
76+
ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(100));
77+
for (ConsumerRecord record : records) {
78+
bufferedRecords.add(record);
79+
}
80+
}
81+
82+
@Override public void reset() {
83+
this.bufferedRecords.clear();
84+
pullRecords();
85+
}
86+
87+
@Override public void close() {
88+
consumer.close();
89+
}
90+
}
91+
// End KafkaMessageEnumerator.java
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to you under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.calcite.adapter.kafka;
18+
19+
import org.apache.calcite.rel.type.RelDataType;
20+
21+
import org.apache.kafka.clients.consumer.ConsumerConfig;
22+
import org.apache.kafka.clients.consumer.ConsumerRecord;
23+
24+
/**
25+
* Interface to handle formatting between Kafka message and Calcite row.
26+
*
27+
* @param <K>: type for Kafka message key,
28+
* refer to {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG};
29+
* @param <V>: type for Kafka message value,
30+
* refer to {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG};
31+
*
32+
*/
33+
public interface KafkaRowConverter<K, V> {
34+
35+
/**
36+
* Generate row type for a given Kafka topic.
37+
*
38+
* @param topicName, Kafka topic name;
39+
* @return row type
40+
*/
41+
RelDataType rowDataType(String topicName);
42+
43+
/**
44+
* Parse and reformat Kafka message from consumer,
45+
* to align with row type defined as {@link #rowDataType(String)}.
46+
* @param message, the raw Kafka message record;
47+
* @return fields in the row
48+
*/
49+
Object[] toRow(ConsumerRecord<K, V> message);
50+
}
51+
// End KafkaRowConverter.java
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to you under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.calcite.adapter.kafka;
18+
19+
import org.apache.calcite.rel.type.RelDataType;
20+
import org.apache.calcite.rel.type.RelDataTypeFactory;
21+
import org.apache.calcite.rel.type.RelDataTypeSystem;
22+
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
23+
import org.apache.calcite.sql.type.SqlTypeName;
24+
25+
import org.apache.kafka.clients.consumer.ConsumerRecord;
26+
27+
/**
28+
* Default implementation of {@link KafkaRowConverter}, both key and value are byte[].
29+
*/
30+
public class KafkaRowConverterImpl implements KafkaRowConverter<byte[], byte[]> {
31+
/**
32+
* Generate row schema for a given Kafka topic.
33+
*
34+
* @param topicName, Kafka topic name;
35+
* @return row type
36+
*/
37+
@Override public RelDataType rowDataType(final String topicName) {
38+
final RelDataTypeFactory typeFactory =
39+
new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
40+
final RelDataTypeFactory.Builder fieldInfo = typeFactory.builder();
41+
fieldInfo.add("MSG_PARTITION", typeFactory.createSqlType(SqlTypeName.INTEGER)).nullable(false);
42+
fieldInfo.add("MSG_TIMESTAMP", typeFactory.createSqlType(SqlTypeName.BIGINT)).nullable(false);
43+
fieldInfo.add("MSG_OFFSET", typeFactory.createSqlType(SqlTypeName.BIGINT)).nullable(false);
44+
fieldInfo.add("MSG_KEY_BYTES", typeFactory.createSqlType(SqlTypeName.VARBINARY)).nullable(true);
45+
fieldInfo.add("MSG_VALUE_BYTES", typeFactory.createSqlType(SqlTypeName.VARBINARY))
46+
.nullable(false);
47+
48+
return fieldInfo.build();
49+
}
50+
51+
/**
52+
* Parse and reformat Kafka message from consumer, to align with row schema
53+
* defined as {@link #rowDataType(String)}.
54+
* @param message, the raw Kafka message record;
55+
* @return fields in the row
56+
*/
57+
@Override public Object[] toRow(final ConsumerRecord<byte[], byte[]> message) {
58+
Object[] fields = new Object[5];
59+
fields[0] = message.partition();
60+
fields[1] = message.timestamp();
61+
fields[2] = message.offset();
62+
fields[3] = message.key();
63+
fields[4] = message.value();
64+
65+
return fields;
66+
}
67+
}
68+
// End KafkaRowConverterImpl.java

0 commit comments

Comments
 (0)