Skip to content

Commit 8562081

Browse files
authored
Merge pull request #480 from ClickHouse/add-bypass-flag
2 parents ccad169 + 6dc6699 commit 8562081

File tree

4 files changed

+33
-2
lines changed

4 files changed

+33
-2
lines changed

Diff for: CHANGELOG.md

+4-1
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
1+
# 1.2.6
2+
* Detect if table schema has changed and refresh the schema
3+
* Allow bypassing field cleanup
4+
15
# 1.2.5
26
* Remove redis state provide since we are using KeeperMap for state storage
37
* Remove unused avro property from `build.gradle.kts`
48
* Trim schemaless data to only pass the fields that are in the table
59
* Allow bypassing the schema validation
6-
* Detect if table schema has changed and refresh the schema
710

811
# 1.2.4
912
* Adjusting underlying client version to 0.7.0

Diff for: VERSION

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
v1.2.5
1+
v1.2.6

Diff for: src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkConfig.java

+24
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public class ClickHouseSinkConfig {
4949
public static final String DATE_TIME_FORMAT = "dateTimeFormats";
5050
public static final String TOLERATE_STATE_MISMATCH = "tolerateStateMismatch";
5151
public static final String BYPASS_SCHEMA_VALIDATION = "bypassSchemaValidation";
52+
public static final String BYPASS_FIELD_CLEANUP = "bypassFieldCleanup";
5253

5354
public static final int MILLI_IN_A_SEC = 1000;
5455
private static final String databaseDefault = "default";
@@ -90,6 +91,7 @@ public class ClickHouseSinkConfig {
9091
private final String clientVersion;
9192
private final boolean tolerateStateMismatch;
9293
private final boolean bypassSchemaValidation;
94+
private final boolean bypassFieldCleanup;
9395

9496
public enum InsertFormats {
9597
NONE,
@@ -263,6 +265,7 @@ public ClickHouseSinkConfig(Map<String, String> props) {
263265
this.clientVersion = props.getOrDefault(CLIENT_VERSION, "V1");
264266
this.tolerateStateMismatch = Boolean.parseBoolean(props.getOrDefault(TOLERATE_STATE_MISMATCH, "false"));
265267
this.bypassSchemaValidation = Boolean.parseBoolean(props.getOrDefault(BYPASS_SCHEMA_VALIDATION, "false"));
268+
this.bypassFieldCleanup = Boolean.parseBoolean(props.getOrDefault(BYPASS_FIELD_CLEANUP, "false"));
266269

267270
LOGGER.debug("ClickHouseSinkConfig: hostname: {}, port: {}, database: {}, username: {}, sslEnabled: {}, timeout: {}, retry: {}, exactlyOnce: {}",
268271
hostname, port, database, username, sslEnabled, timeout, retry, exactlyOnce);
@@ -285,6 +288,7 @@ public void addClickHouseSetting(String key, String value, boolean override) {
285288
private static ConfigDef createConfigDef() {
286289
ConfigDef configDef = new ConfigDef();
287290

291+
//TODO: At some point we should group these more clearly
288292
String group = "Connection";
289293
int orderInGroup = 0;
290294
configDef.define(HOSTNAME,
@@ -568,6 +572,26 @@ private static ConfigDef createConfigDef() {
568572
ConfigDef.Width.SHORT,
569573
"Tolerate state mismatch."
570574
);
575+
configDef.define(BYPASS_SCHEMA_VALIDATION,
576+
ConfigDef.Type.BOOLEAN,
577+
false,
578+
ConfigDef.Importance.LOW,
579+
"Bypass schema validation. default: false",
580+
group,
581+
++orderInGroup,
582+
ConfigDef.Width.SHORT,
583+
"Bypass schema validation."
584+
);
585+
configDef.define(BYPASS_FIELD_CLEANUP,
586+
ConfigDef.Type.BOOLEAN,
587+
false,
588+
ConfigDef.Importance.LOW,
589+
"Bypass field cleanup. default: false",
590+
group,
591+
++orderInGroup,
592+
ConfigDef.Width.SHORT,
593+
"Bypass field cleanup."
594+
);
571595
return configDef;
572596
}
573597
}

Diff for: src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java

+4
Original file line numberDiff line numberDiff line change
@@ -967,6 +967,10 @@ protected void doInsertJsonV2(List<Record> records, Table table, QueryIdentifier
967967
}
968968

969969
protected Map<String, Object> cleanupExtraFields(Map<String, Object> m, Table t) {
970+
if (csc.isBypassFieldCleanup()) {
971+
return m;
972+
}
973+
970974
Map<String, Object> cleaned = new HashMap<>();
971975
for (Column c : t.getRootColumnsList()) {
972976
if (m.containsKey(c.getName())) {

0 commit comments

Comments
 (0)