Skip to content

Commit 671dea7

Browse files
authored
Merge pull request #432 from ClickHouse/update-version
Updating the version and removing old imports
2 parents 3ce73a9 + 9d34ed1 commit 671dea7

File tree

9 files changed

+29
-39
lines changed

9 files changed

+29
-39
lines changed

Diff for: CHANGELOG.md

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1+
# 1.2.0
2+
* Adding a KeyToValue transformation to allow for key to be stored in a separate column in ClickHouse
3+
14
# 1.1.4
25
* Bugfix to address field value to column name mapping for Tuples
3-
* Adding a KeyToValue transformation to allow for key to be stored in a separate column in ClickHouse
46

57
# 1.1.3
68
* Update to java-client 0.6.3

Diff for: VERSION

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
v1.1.4
1+
v1.2.0

Diff for: build.gradle.kts

-4
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@
77
*/
88

99
import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar
10-
import java.io.ByteArrayOutputStream
11-
import java.net.URI
1210
import java.time.LocalDateTime
1311
import java.time.format.DateTimeFormatter
1412
import org.gradle.api.tasks.testing.logging.TestExceptionFormat
@@ -34,7 +32,6 @@ plugins {
3432
signing
3533
// checkstyle
3634
id("com.github.gmazzo.buildconfig") version "5.3.5"
37-
//id("com.github.spotbugs") version "4.7.9"
3835
id("com.diffplug.spotless") version "6.25.0"
3936
id("com.github.johnrengelman.shadow") version "7.1.2"
4037
}
@@ -50,7 +47,6 @@ repositories {
5047
}
5148

5249
extra.apply {
53-
5450
set("clickHouseDriverVersion", "0.6.3")
5551
set("kafkaVersion", "2.7.0")
5652
set("avroVersion", "1.9.2")

Diff for: src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkConfig.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ public ClickHouseSinkConfig(Map<String, String> props) {
262262
}
263263
}
264264
}
265-
this.clientVersion = props.getOrDefault(CLIENT_VERSION, "V2");
265+
this.clientVersion = props.getOrDefault(CLIENT_VERSION, "V1");
266266

267267
LOGGER.debug("ClickHouseSinkConfig: hostname: {}, port: {}, database: {}, username: {}, sslEnabled: {}, timeout: {}, retry: {}, exactlyOnce: {}",
268268
hostname, port, database, username, sslEnabled, timeout, retry, exactlyOnce);

Diff for: src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseBase.java

+5-6
Original file line numberDiff line numberDiff line change
@@ -91,20 +91,19 @@ protected ClickHouseHelperClient createClient(Map<String,String> props, boolean
9191
.setRetry(csc.getRetry())
9292
.build();
9393

94-
9594
if (withDatabase) {
96-
createDatabase(this.database, tmpChc);
97-
props.put(ClickHouseSinkConnector.DATABASE, this.database);
98-
ClickHouseHelperClient chc = new ClickHouseHelperClient.ClickHouseClientBuilder(hostname, port, csc.getProxyType(), csc.getProxyHost(), csc.getProxyPort())
99-
.setDatabase(this.database)
95+
createDatabase(ClickHouseBase.database, tmpChc);
96+
props.put(ClickHouseSinkConnector.DATABASE, ClickHouseBase.database);
97+
tmpChc = new ClickHouseHelperClient.ClickHouseClientBuilder(hostname, port, csc.getProxyType(), csc.getProxyHost(), csc.getProxyPort())
98+
.setDatabase(ClickHouseBase.database)
10099
.setUsername(username)
101100
.setPassword(password)
102101
.sslEnable(sslEnabled)
103102
.setTimeout(timeout)
104103
.setRetry(csc.getRetry())
105104
.build();
106-
return chc;
107105
}
106+
108107
chc = tmpChc;
109108
return chc;
110109
}

Diff for: src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskSchemalessProxyTest.java

-3
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,14 @@
88
import com.clickhouse.kafka.connect.sink.helper.SchemalessTestData;
99
import eu.rekawek.toxiproxy.Proxy;
1010
import eu.rekawek.toxiproxy.ToxiproxyClient;
11-
import org.apache.kafka.common.record.TimestampType;
1211
import org.apache.kafka.connect.sink.SinkRecord;
1312
import org.junit.jupiter.api.*;
1413
import org.testcontainers.clickhouse.ClickHouseContainer;
1514
import org.testcontainers.containers.Network;
1615
import org.testcontainers.containers.ToxiproxyContainer;
1716

1817
import java.io.IOException;
19-
import java.math.BigDecimal;
2018
import java.util.*;
21-
import java.util.stream.LongStream;
2219

2320
import static org.junit.jupiter.api.Assertions.*;
2421

Diff for: src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaProxyTest.java

+14-14
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ private Map<String, String> getTestProperties() {
6262

6363
@Test
6464
public void proxyPingTest() throws IOException {
65-
ClickHouseHelperClient chc = createClient(getTestProperties(), false);
65+
ClickHouseHelperClient chc = createClient(getTestProperties());
6666
assertTrue(chc.ping());
6767
proxy.disable();
6868
assertFalse(chc.ping());
@@ -72,7 +72,7 @@ public void proxyPingTest() throws IOException {
7272
@Test
7373
public void arrayTypesTest() {
7474
Map<String, String> props = getTestProperties();
75-
ClickHouseHelperClient chc = createClient(props, false);
75+
ClickHouseHelperClient chc = createClient(props);
7676

7777
String topic = "array_string_table_test";
7878
ClickHouseTestHelpers.dropTable(chc, topic);
@@ -92,7 +92,7 @@ public void arrayTypesTest() {
9292
@Test
9393
public void mapTypesTest() {
9494
Map<String, String> props = getTestProperties();
95-
ClickHouseHelperClient chc = createClient(props, false);
95+
ClickHouseHelperClient chc = createClient(props);
9696

9797
String topic = "map_table_test";
9898
ClickHouseTestHelpers.dropTable(chc, topic);
@@ -114,7 +114,7 @@ public void mapTypesTest() {
114114
// https://github.com/ClickHouse/clickhouse-kafka-connect/issues/33
115115
public void materializedViewsBug() {
116116
Map<String, String> props = getTestProperties();
117-
ClickHouseHelperClient chc = createClient(props, false);
117+
ClickHouseHelperClient chc = createClient(props);
118118

119119
String topic = "m_array_string_table_test";
120120
ClickHouseTestHelpers.dropTable(chc, topic);
@@ -135,7 +135,7 @@ public void materializedViewsBug() {
135135
// https://github.com/ClickHouse/clickhouse-kafka-connect/issues/38
136136
public void specialCharTableNameTest() {
137137
Map<String, String> props = getTestProperties();
138-
ClickHouseHelperClient chc = createClient(props, false);
138+
ClickHouseHelperClient chc = createClient(props);
139139

140140
String topic = "special-char-table-test";
141141
ClickHouseTestHelpers.dropTable(chc, topic);
@@ -157,7 +157,7 @@ public void specialCharTableNameTest() {
157157
// https://github.com/ClickHouse/clickhouse-kafka-connect/issues/62
158158
public void nullValueDataTest() {
159159
Map<String, String> props = getTestProperties();
160-
ClickHouseHelperClient chc = createClient(props, false);
160+
ClickHouseHelperClient chc = createClient(props);
161161

162162
String topic = "null-value-table-test";
163163
ClickHouseTestHelpers.dropTable(chc, topic);
@@ -178,7 +178,7 @@ public void nullValueDataTest() {
178178
// https://github.com/ClickHouse/clickhouse-kafka-connect/issues/57
179179
public void supportDatesTest() {
180180
Map<String, String> props = getTestProperties();
181-
ClickHouseHelperClient chc = createClient(props, false);
181+
ClickHouseHelperClient chc = createClient(props);
182182

183183
String topic = "support-dates-table-test";
184184
ClickHouseTestHelpers.dropTable(chc, topic);
@@ -197,7 +197,7 @@ public void supportDatesTest() {
197197
@Test
198198
public void detectUnsupportedDataConversions() {
199199
Map<String, String> props = getTestProperties();
200-
ClickHouseHelperClient chc = createClient(props, false);
200+
ClickHouseHelperClient chc = createClient(props);
201201

202202
String topic = "support-unsupported-dates-table-test";
203203
ClickHouseTestHelpers.dropTable(chc, topic);
@@ -218,7 +218,7 @@ public void detectUnsupportedDataConversions() {
218218
@Test
219219
public void withEmptyDataRecordsTest() {
220220
Map<String, String> props = getTestProperties();
221-
ClickHouseHelperClient chc = createClient(props, false);
221+
ClickHouseHelperClient chc = createClient(props);
222222

223223
String topic = "schema_empty_records_table_test";
224224
ClickHouseTestHelpers.dropTable(chc, topic);
@@ -235,7 +235,7 @@ public void withEmptyDataRecordsTest() {
235235
@Test
236236
public void withLowCardinalityTest() {
237237
Map<String, String> props = getTestProperties();
238-
ClickHouseHelperClient chc = createClient(props, false);
238+
ClickHouseHelperClient chc = createClient(props, true);
239239

240240
String topic = "schema_empty_records_lc_table_test";
241241
ClickHouseTestHelpers.dropTable(chc, topic);
@@ -252,7 +252,7 @@ public void withLowCardinalityTest() {
252252
@Test
253253
public void withUUIDTest() {
254254
Map<String, String> props = getTestProperties();
255-
ClickHouseHelperClient chc = createClient(props, false);
255+
ClickHouseHelperClient chc = createClient(props, true);
256256

257257
String topic = "schema_empty_records_lc_table_test";
258258
ClickHouseTestHelpers.dropTable(chc, topic);
@@ -269,7 +269,7 @@ public void withUUIDTest() {
269269
@Test
270270
public void schemaWithDefaultsTest() {
271271
Map<String, String> props = getTestProperties();
272-
ClickHouseHelperClient chc = createClient(props, false);
272+
ClickHouseHelperClient chc = createClient(props, true);
273273

274274
String topic = "default-value-table-test";
275275
ClickHouseTestHelpers.dropTable(chc, topic);
@@ -288,7 +288,7 @@ public void schemaWithDefaultsTest() {
288288
@Test
289289
public void schemaWithDecimalTest() {
290290
Map<String, String> props = getTestProperties();
291-
ClickHouseHelperClient chc = createClient(props, false);
291+
ClickHouseHelperClient chc = createClient(props, true);
292292

293293
String topic = "decimal-value-table-test";
294294
ClickHouseTestHelpers.dropTable(chc, topic);
@@ -307,7 +307,7 @@ public void schemaWithDecimalTest() {
307307
@Test
308308
public void schemaWithBytesTest() {
309309
Map<String, String> props = getTestProperties();
310-
ClickHouseHelperClient chc = createClient(props, false);
310+
ClickHouseHelperClient chc = createClient(props, true);
311311
String topic = "bytes-value-table-test";
312312
ClickHouseTestHelpers.dropTable(chc, topic);
313313
ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE `%s` (`string` String) Engine = MergeTree ORDER BY `string`");

Diff for: src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaTest.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,8 @@ public void materializedViewsBug() {
134134
ClickHouseTestHelpers.dropTable(chc, topic + "mate");
135135
ClickHouseTestHelpers.dropTable(chc, topic);
136136
ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE %s ( `off16` Int16, `arr` Array(String), `arr_empty` Array(String), `arr_int8` Array(Int8), `arr_int16` Array(Int16), `arr_int32` Array(Int32), `arr_int64` Array(Int64), `arr_float32` Array(Float32), `arr_float64` Array(Float64), `arr_bool` Array(Bool) ) Engine = MergeTree ORDER BY off16");
137-
ClickHouseTestHelpers.createTable(chc, topic + "mate", "CREATE MATERIALIZED VIEW %s ( `off16` Int16 ) Engine = MergeTree ORDER BY `off16` POPULATE AS SELECT off16 FROM " + topic);
137+
ClickHouseTestHelpers.createTable(chc, topic, "CREATE MATERIALIZED VIEW %s_mv TO " + topic + "_mate AS SELECT off16 FROM " + topic);
138+
ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE %s_mate ( `off16` Int16 ) Engine = Null");
138139
Collection<SinkRecord> sr = SchemaTestData.createArrayType(topic, 1);
139140

140141
ClickHouseSinkTask chst = new ClickHouseSinkTask();

Diff for: src/test/java/com/clickhouse/kafka/connect/sink/helper/ClickHouseTestHelpers.java

+3-8
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,7 @@
55
import com.clickhouse.client.api.query.QueryResponse;
66
import com.clickhouse.client.api.query.QuerySettings;
77
import com.clickhouse.client.api.query.Records;
8-
import com.clickhouse.data.ClickHouseColumn;
98
import com.clickhouse.data.ClickHouseFormat;
10-
import com.clickhouse.data.ClickHouseRecord;
11-
import com.clickhouse.data.ClickHouseValue;
129
import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseFieldDescriptor;
1310
import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient;
1411
import com.clickhouse.kafka.connect.sink.db.mapping.Column;
@@ -26,12 +23,9 @@
2623
import java.io.InputStreamReader;
2724
import java.io.Serializable;
2825
import java.util.*;
29-
import java.util.concurrent.CompletableFuture;
3026
import java.util.concurrent.ExecutionException;
3127
import java.util.concurrent.TimeUnit;
3228
import java.util.concurrent.TimeoutException;
33-
import java.util.stream.Collectors;
34-
import java.util.stream.StreamSupport;
3529

3630
public class ClickHouseTestHelpers {
3731
private static final Logger LOGGER = LoggerFactory.getLogger(ClickHouseTestHelpers.class);
@@ -74,6 +68,7 @@ public static void dropTable(ClickHouseHelperClient chc, String tableName) {
7468
}
7569
}
7670
public static OperationMetrics createTable(ClickHouseHelperClient chc, String tableName, String createTableQuery) {
71+
LOGGER.info("Creating table: {}, Query: {}", tableName, createTableQuery);
7772
OperationMetrics operationMetrics = createTable(chc, tableName, createTableQuery, new HashMap<>());
7873
if (isCloud()) {
7974
try {
@@ -92,7 +87,7 @@ public static OperationMetrics createTable(ClickHouseHelperClient chc, String ta
9287
settings.setOption(entry.getKey(), entry.getValue());
9388
}
9489
try {
95-
Records records = chc.getClient().queryRecords(createTableQueryTmp, settings).get(10, java.util.concurrent.TimeUnit.SECONDS);
90+
Records records = chc.getClient().queryRecords(createTableQueryTmp, settings).get(120, java.util.concurrent.TimeUnit.SECONDS);
9691
return records.getMetrics();
9792
} catch (Exception e) {
9893
throw new RuntimeException(e);
@@ -141,7 +136,7 @@ public static int countRows(ClickHouseHelperClient chc, String tableName) {
141136
String queryCount = String.format("SELECT COUNT(*) FROM `%s`", tableName);
142137

143138
try {
144-
Records records = chc.getClient().queryRecords(queryCount).get(10, TimeUnit.SECONDS);
139+
Records records = chc.getClient().queryRecords(queryCount).get(120, TimeUnit.SECONDS);
145140
// Note we probrbly need asInteger() here
146141
String value = records.iterator().next().getString(1);
147142
return Integer.parseInt(value);

0 commit comments

Comments
 (0)