
Commit b97c692

Merge pull request #461 from ClickHouse/bugfixes

Updating for incident

2 parents 2e0c17e + 1566e27

File tree

7 files changed (+76, -18 lines)

.github/workflows/pre-release.yaml

Lines changed: 51 additions & 0 deletions
@@ -0,0 +1,51 @@
+name: Release
+
+on:
+  workflow_dispatch:
+
+permissions:
+  contents: write
+
+jobs:
+  build_release:
+    name: build_release
+    runs-on: ubuntu-latest
+    steps:
+      - name: Checkout code
+        uses: actions/checkout@v3
+      - name: Set env
+        run: echo "RELEASE_VERSION=$(cat VERSION)" >> $GITHUB_ENV
+      - name: Test
+        run: |
+          echo $RELEASE_VERSION
+          echo ${{ env.RELEASE_VERSION }}
+      - name: Set up JDK 17
+        uses: actions/setup-java@v3
+        with:
+          java-version: '17'
+          distribution: 'adopt'
+          architecture: x64
+      - name: Setup and execute Gradle 'createConfluentArchive' task
+        uses: gradle/gradle-build-action@v2
+        with:
+          arguments: createConfluentArchive
+          gradle-version: '7.4.2'
+      - name: release
+        uses: actions/create-release@v1
+        id: create_release
+        with:
+          draft: false
+          prerelease: true
+          release_name: ${{ env.RELEASE_VERSION }}
+          tag_name: ${{ env.RELEASE_VERSION }}
+        env:
+          GITHUB_TOKEN: ${{ github.token }}
+      - name: upload release artifact
+        uses: actions/upload-release-asset@v1
+        env:
+          GITHUB_TOKEN: ${{ github.token }}
+        with:
+          upload_url: ${{ steps.create_release.outputs.upload_url }}
+          asset_path: ./build/confluent/clickhouse-kafka-connect-${{ env.RELEASE_VERSION }}.zip
+          asset_name: clickhouse-kafka-connect-${{ env.RELEASE_VERSION }}.zip
+          asset_content_type: application/zip
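With workflow_dispatch as its only trigger, this release workflow runs only when started manually. The release name, tag, and artifact name all derive from the VERSION file read into $GITHUB_ENV in the early steps, so bumping VERSION is the only input a run needs.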

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
@@ -1,3 +1,7 @@
+# 1.2.4
+* Adjusting underlying client version to 0.7.0
+* Bugfix for UINT handling
+
 # 1.2.3
 * Tweaking schema validation to allow for UINT

VERSION

Lines changed: 1 addition & 1 deletion
@@ -1 +1 @@
-v1.2.3
+v1.2.4

build.gradle.kts

Lines changed: 3 additions & 1 deletion
@@ -22,6 +22,7 @@ java {
 buildscript {
     repositories {
         mavenCentral()
+        maven{ url = uri("https://s01.oss.sonatype.org/content/repositories/snapshots/") }
     }
 }

@@ -44,10 +45,11 @@ repositories {
     mavenCentral()
     maven("https://packages.confluent.io/maven/")
     maven("https://jitpack.io")
+    maven{ url = uri("https://s01.oss.sonatype.org/content/repositories/snapshots/") }
 }

 extra.apply {
-    set("clickHouseDriverVersion", "0.6.3")
+    set("clickHouseDriverVersion", "0.7.0-SNAPSHOT")
     set("kafkaVersion", "2.7.0")
     set("avroVersion", "1.9.2")
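Snapshot artifacts are not published to Maven Central, which is why the switch to 0.7.0-SNAPSHOT arrives together with the Sonatype snapshots repository in both the buildscript and project repositories blocks.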

src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java

Lines changed: 9 additions & 14 deletions
@@ -29,6 +29,7 @@
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.time.LocalDateTime;
 import java.time.ZoneOffset;
 import java.time.ZonedDateTime;
@@ -244,16 +245,10 @@ protected boolean validateDataSchema(Table table, Record record, boolean onlyFie
             if (colTypeName.equals("TUPLE") && dataTypeName.equals("STRUCT"))
                 continue;

-            if (colTypeName.equalsIgnoreCase("UINT8") && dataTypeName.equals("INT8"))
-                continue;
-
-            if (colTypeName.equalsIgnoreCase("UINT16") && dataTypeName.equals("INT16"))
-                continue;
-
-            if (colTypeName.equalsIgnoreCase("UINT32") && dataTypeName.equals("INT32"))
-                continue;
-
-            if (colTypeName.equalsIgnoreCase("UINT64") && dataTypeName.equals("INT64"))
+            if (colTypeName.equalsIgnoreCase("UINT8")
+                    || colTypeName.equalsIgnoreCase("UINT16")
+                    || colTypeName.equalsIgnoreCase("UINT32")
+                    || colTypeName.equalsIgnoreCase("UINT64"))
                 continue;

             if (("DECIMAL".equalsIgnoreCase(colTypeName) && objSchema.name().equals("org.apache.kafka.connect.data.Decimal")))
@@ -713,15 +708,15 @@ protected void doInsertRawBinaryV2(List<Record> records, Table table, QueryIdent
         insertSettings.setQueryId(queryId.getQueryId());

         for (String clickhouseSetting : csc.getClickhouseSettings().keySet()) {//THIS ASSUMES YOU DON'T ADD insert_deduplication_token
-            insertSettings.setOption(clickhouseSetting, csc.getClickhouseSettings().get(clickhouseSetting));
+            insertSettings.serverSetting(clickhouseSetting, csc.getClickhouseSettings().get(clickhouseSetting));
         }
         // insertSettings.setOption(ClickHouseClientOption.WRITE_BUFFER_SIZE.name(), 8192);

         ByteArrayOutputStream stream = new ByteArrayOutputStream();
         for (Record record : records) {
             if (record.getSinkRecord().value() != null) {
                 for (Column col : table.getRootColumnsList()) {
-                    System.out.println("Writing column: " + col.getName());
+                    LOGGER.debug("Writing column: {}", col.getName());
                     long beforePushStream = System.currentTimeMillis();
                     doWriteCol(record, col, stream, supportDefaults);
                     pushStreamTime += System.currentTimeMillis() - beforePushStream;
@@ -897,7 +892,7 @@ protected void doInsertJsonV2(List<Record> records, Table table, QueryIdentifier
         insertSettings.setQueryId(queryId.getQueryId());

         for (String clickhouseSetting : csc.getClickhouseSettings().keySet()) {//THIS ASSUMES YOU DON'T ADD insert_deduplication_token
-            insertSettings.setOption(clickhouseSetting, csc.getClickhouseSettings().get(clickhouseSetting));
+            insertSettings.serverSetting(clickhouseSetting, csc.getClickhouseSettings().get(clickhouseSetting));
         }
         //insertSettings.setOption(ClickHouseClientOption.WRITE_BUFFER_SIZE.name(), 8192);
@@ -1032,7 +1027,7 @@ protected void doInsertStringV2(List<Record> records, Table table, QueryIdentifi
         insertSettings.setQueryId(queryId.getQueryId());

         for (String clickhouseSetting : csc.getClickhouseSettings().keySet()) {//THIS ASSUMES YOU DON'T ADD insert_deduplication_token
-            insertSettings.setOption(clickhouseSetting, csc.getClickhouseSettings().get(clickhouseSetting));
+            insertSettings.serverSetting(clickhouseSetting, csc.getClickhouseSettings().get(clickhouseSetting));
         }
         // insertSettings.setOption(ClickHouseClientOption.WRITE_BUFFER_SIZE.name(), 8192);
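All three insert paths (raw binary, JSON, string) move the user-supplied clickhouseSettings from setOption to serverSetting. In the client-v2 API, setOption holds client-side operation options, while serverSetting forwards the key/value pair to ClickHouse with the statement, which is what these settings are meant to be. A short sketch under that reading; the setting name and value are illustrative, not from the commit:

import com.clickhouse.client.api.insert.InsertSettings;

final class InsertSettingsSketch {
    // Builds InsertSettings that forward a ClickHouse server setting with the
    // insert; "insert_quorum" and "2" are placeholder examples, not from the commit.
    static InsertSettings withForwardedSetting(String queryId) {
        InsertSettings settings = new InsertSettings();
        settings.setQueryId(queryId);
        settings.serverSetting("insert_quorum", "2");
        return settings;
    }
}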

src/main/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseHelperClient.java

Lines changed: 1 addition & 1 deletion
@@ -388,7 +388,7 @@ public Table describeTableV2(String database, String tableName) {
         Table table = new Table(database, tableName);
         try {
             QuerySettings settings = new QuerySettings().setFormat(ClickHouseFormat.JSONEachRow);
-            settings.setOption("describe_include_subcolumns", true);
+            settings.serverSetting("describe_include_subcolumns", "1");
             settings.setDatabase(database);
             QueryResponse queryResponse = client.query(describeQuery, settings).get();
             try (BufferedReader br = new BufferedReader(new InputStreamReader(queryResponse.getInputStream()))) {
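The same migration appears here: a boolean client option becomes the string-valued server setting describe_include_subcolumns=1, so DESCRIBE also returns a row per subcolumn of composite types. A minimal sketch of the call, assuming the client-v2 classes already used in this file; the database and table names are placeholders:

import com.clickhouse.client.api.Client;
import com.clickhouse.client.api.query.QueryResponse;
import com.clickhouse.client.api.query.QuerySettings;
import com.clickhouse.data.ClickHouseFormat;

final class DescribeSketch {
    // DESCRIBE TABLE with subcolumn rows included in the result; "default" and
    // "my_table" are placeholder names.
    static QueryResponse describeWithSubcolumns(Client client) throws Exception {
        QuerySettings settings = new QuerySettings().setFormat(ClickHouseFormat.JSONEachRow);
        settings.serverSetting("describe_include_subcolumns", "1");
        settings.setDatabase("default");
        return client.query("DESCRIBE TABLE my_table", settings).get();
    }
}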

src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaTest.java

Lines changed: 7 additions & 1 deletion
@@ -10,6 +10,7 @@
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.json.JSONObject;
 import org.junit.jupiter.api.Assumptions;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.slf4j.Logger;
@@ -528,6 +529,7 @@ public void supportEnumTest() {
         assertEquals(sr.size(), ClickHouseTestHelpers.countRows(chc, topic));
     }

+    @Disabled("Disabled because it requires a flag on the instance.")
     @Test
     @SinceClickHouseVersion("24.1")
     public void schemaWithTupleOfMapsWithVariantTest() {
@@ -586,10 +588,14 @@ public void schemaWithTupleOfMapsWithVariantTest() {
         }
     }

+    @Disabled("Disabled because it requires a flag on the instance.")
     @Test
     @SinceClickHouseVersion("24.1")
     public void schemaWithNestedTupleMapArrayAndVariant() {
-        Assumptions.assumeFalse(isCloud, "Skip test since experimental is not available in cloud");
+        if (isCloud) {
+            LOGGER.warn("Skip test since experimental is not available in cloud");
+            return;
+        }
         Map<String, String> props = createProps();
         ClickHouseHelperClient chc = createClient(props);
         String topic = "nested-tuple-map-array-and-variant-table-test";
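Note the semantics of the cloud-guard change: Assumptions.assumeFalse aborts the test and reports it as skipped, whereas the warn-and-return version lets it finish as passed on Cloud, leaving the log line as the only trace. Both variant tests are additionally parked behind @Disabled until the required instance flag is available.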
