Skip to content

Commit 39d7591

Browse files
authoredNov 4, 2024
Merge pull request #466 from ClickHouse/updated-java-client-version
Update build.gradle.kts
2 parents b97c692 + baa0abf commit 39d7591

File tree

2 files changed

+74
-40
lines changed

2 files changed

+74
-40
lines changed
 

‎build.gradle.kts

+1-2
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,10 @@ repositories {
4545
mavenCentral()
4646
maven("https://packages.confluent.io/maven/")
4747
maven("https://jitpack.io")
48-
maven{ url = uri("https://s01.oss.sonatype.org/content/repositories/snapshots/") }
4948
}
5049

5150
extra.apply {
52-
set("clickHouseDriverVersion", "0.7.0-SNAPSHOT")
51+
set("clickHouseDriverVersion", "0.7.1")
5352
set("kafkaVersion", "2.7.0")
5453
set("avroVersion", "1.9.2")
5554

‎src/test/java/com/clickhouse/kafka/connect/sink/helper/ClickHouseTestHelpers.java

+73-38
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ public class ClickHouseTestHelpers {
3737
public static final String HTTPS_PORT = "8443";
3838
public static final String DATABASE_DEFAULT = "default";
3939
public static final String USERNAME_DEFAULT = "default";
40+
41+
private static final int CLOUD_TIMEOUT_VALUE = 900;
42+
private static final TimeUnit CLOUD_TIMEOUT_UNIT = TimeUnit.SECONDS;
43+
4044
public static String getClickhouseVersion() {
4145
String clickHouseVersion = System.getenv("CLICKHOUSE_VERSION");
4246
if (clickHouseVersion == null) {
@@ -57,16 +61,38 @@ public static void query(ClickHouseHelperClient chc, String query) {
5761
chc.queryV1(query);
5862
}
5963
}
60-
public static void dropTable(ClickHouseHelperClient chc, String tableName) {
64+
65+
public static OperationMetrics dropTable(ClickHouseHelperClient chc, String tableName) {
66+
for (int i = 0; i < 5; i++) {
67+
try {
68+
OperationMetrics operationMetrics = dropTableLoop(chc, tableName);
69+
if (operationMetrics != null) {
70+
return operationMetrics;
71+
}
72+
} catch (Exception e) {
73+
LOGGER.error("Error while sleeping", e);
74+
}
75+
76+
try {
77+
Thread.sleep(30000);//Sleep for 30 seconds
78+
} catch (InterruptedException e) {
79+
LOGGER.error("Error while sleeping", e);
80+
}
81+
}
82+
83+
return null;
84+
}
85+
private static OperationMetrics dropTableLoop(ClickHouseHelperClient chc, String tableName) {
6186
String dropTable = String.format("DROP TABLE IF EXISTS `%s`", tableName);
6287
try {
63-
chc.getClient().queryRecords(dropTable).get(10, TimeUnit.SECONDS);
64-
} catch (InterruptedException e) {
65-
throw new RuntimeException(e);
66-
} catch (ExecutionException | TimeoutException e) {
88+
return chc.getClient().queryRecords(dropTable).get(CLOUD_TIMEOUT_VALUE, CLOUD_TIMEOUT_UNIT).getMetrics();
89+
} catch (Exception e) {
6790
throw new RuntimeException(e);
6891
}
6992
}
93+
94+
95+
7096
public static OperationMetrics createTable(ClickHouseHelperClient chc, String tableName, String createTableQuery) {
7197
LOGGER.info("Creating table: {}, Query: {}", tableName, createTableQuery);
7298
OperationMetrics operationMetrics = createTable(chc, tableName, createTableQuery, new HashMap<>());
@@ -81,18 +107,38 @@ public static OperationMetrics createTable(ClickHouseHelperClient chc, String ta
81107
}
82108

83109
public static OperationMetrics createTable(ClickHouseHelperClient chc, String tableName, String createTableQuery, Map<String, Serializable> clientSettings) {
110+
for (int i = 0; i < 5; i++) {
111+
try {
112+
OperationMetrics operationMetrics = createTableLoop(chc, tableName, createTableQuery, clientSettings);
113+
if (operationMetrics != null) {
114+
return operationMetrics;
115+
}
116+
} catch (Exception e) {
117+
LOGGER.error("Error while sleeping", e);
118+
}
119+
120+
try {
121+
Thread.sleep(30000);//Sleep for 30 seconds
122+
} catch (InterruptedException e) {
123+
LOGGER.error("Error while sleeping", e);
124+
}
125+
}
126+
127+
return null;
128+
}
129+
private static OperationMetrics createTableLoop(ClickHouseHelperClient chc, String tableName, String createTableQuery, Map<String, Serializable> clientSettings) {
84130
final String createTableQueryTmp = String.format(createTableQuery, tableName);
85131
QuerySettings settings = new QuerySettings();
86132
for (Map.Entry<String, Serializable> entry : clientSettings.entrySet()) {
87133
settings.setOption(entry.getKey(), entry.getValue());
88134
}
89135
try {
90-
Records records = chc.getClient().queryRecords(createTableQueryTmp, settings).get(120, java.util.concurrent.TimeUnit.SECONDS);
91-
return records.getMetrics();
136+
return chc.getClient().queryRecords(createTableQueryTmp, settings).get(CLOUD_TIMEOUT_VALUE, CLOUD_TIMEOUT_UNIT).getMetrics();
92137
} catch (Exception e) {
93138
throw new RuntimeException(e);
94139
}
95140
}
141+
96142
public static List<JSONObject> getAllRowsAsJson(ClickHouseHelperClient chc, String tableName) {
97143
String query = String.format("SELECT * FROM `%s`", tableName);
98144
QuerySettings querySettings = new QuerySettings();
@@ -114,29 +160,26 @@ public static List<JSONObject> getAllRowsAsJson(ClickHouseHelperClient chc, Stri
114160
} catch (IOException e) {
115161
throw new RuntimeException(e);
116162
}
117-
// try (ClickHouseClient client = ClickHouseClient.builder()
118-
// .options(chc.getDefaultClientOptions())
119-
// .nodeSelector(ClickHouseNodeSelector.of(ClickHouseProtocol.HTTP))
120-
// .build();
121-
// ClickHouseResponse response = client.read(chc.getServer())
122-
// .query(query)
123-
// .format(ClickHouseFormat.JSONEachRow)
124-
// .executeAndWait()) {
125-
//
126-
// return StreamSupport.stream(response.records().spliterator(), false)
127-
// .map(record -> record.getValue(0).asString())
128-
// .map(JSONObject::new)
129-
// .collect(Collectors.toList());
130-
// } catch (ClickHouseException e) {
131-
// throw new RuntimeException(e);
132-
// }
163+
}
164+
165+
166+
public static OperationMetrics optimizeTable(ClickHouseHelperClient chc, String tableName) {
167+
String queryCount = String.format("OPTIMIZE TABLE `%s`", tableName);
168+
169+
try {
170+
Records records = chc.getClient().queryRecords(queryCount).get(CLOUD_TIMEOUT_VALUE, CLOUD_TIMEOUT_UNIT);
171+
return records.getMetrics();
172+
} catch (Exception e) {
173+
return null;
174+
}
133175
}
134176

135177
public static int countRows(ClickHouseHelperClient chc, String tableName) {
136178
String queryCount = String.format("SELECT COUNT(*) FROM `%s`", tableName);
137179

138180
try {
139-
Records records = chc.getClient().queryRecords(queryCount).get(120, TimeUnit.SECONDS);
181+
optimizeTable(chc, tableName);
182+
Records records = chc.getClient().queryRecords(queryCount).get(CLOUD_TIMEOUT_VALUE, CLOUD_TIMEOUT_UNIT);
140183
// Note we probrbly need asInteger() here
141184
String value = records.iterator().next().getString(1);
142185
return Integer.parseInt(value);
@@ -152,25 +195,21 @@ public static int countRows(ClickHouseHelperClient chc, String tableName) {
152195
public static int sumRows(ClickHouseHelperClient chc, String tableName, String column) {
153196
String queryCount = String.format("SELECT SUM(`%s`) FROM `%s`", column, tableName);
154197
try {
155-
Records records = chc.getClient().queryRecords(queryCount).get();
198+
Records records = chc.getClient().queryRecords(queryCount).get(CLOUD_TIMEOUT_VALUE, CLOUD_TIMEOUT_UNIT);
156199
String value = records.iterator().next().getString(1);
157200
return (int)(Float.parseFloat(value));
158-
} catch (InterruptedException e) {
159-
throw new RuntimeException(e);
160-
} catch (ExecutionException e) {
201+
} catch (Exception e) {
161202
throw new RuntimeException(e);
162203
}
163204
}
164205

165206
public static int countRowsWithEmojis(ClickHouseHelperClient chc, String tableName) {
166207
String queryCount = "SELECT COUNT(*) FROM `" + tableName + "` WHERE str LIKE '%\uD83D\uDE00%'";
167208
try {
168-
Records records = chc.getClient().queryRecords(queryCount).get();
209+
Records records = chc.getClient().queryRecords(queryCount).get(CLOUD_TIMEOUT_VALUE, CLOUD_TIMEOUT_UNIT);
169210
String value = records.iterator().next().getString(1);
170211
return (int)(Float.parseFloat(value));
171-
} catch (InterruptedException e) {
172-
throw new RuntimeException(e);
173-
} catch (ExecutionException e) {
212+
} catch (Exception e) {
174213
throw new RuntimeException(e);
175214
}
176215
}
@@ -180,7 +219,7 @@ public static boolean validateRows(ClickHouseHelperClient chc, String topic, Col
180219
try {
181220
QuerySettings querySettings = new QuerySettings();
182221
querySettings.setFormat(ClickHouseFormat.JSONStringsEachRow);
183-
QueryResponse queryResponse = chc.getClient().query(String.format("SELECT * FROM `%s`", topic), querySettings).get();
222+
QueryResponse queryResponse = chc.getClient().query(String.format("SELECT * FROM `%s`", topic), querySettings).get(CLOUD_TIMEOUT_VALUE, CLOUD_TIMEOUT_UNIT);
184223
Gson gson = new Gson();
185224

186225
List<String> records = new ArrayList<>();
@@ -218,11 +257,7 @@ public static boolean validateRows(ClickHouseHelperClient chc, String topic, Col
218257
}
219258

220259
LOGGER.info("Match? {}", match);
221-
} catch (ExecutionException e) {
222-
throw new RuntimeException(e);
223-
} catch (InterruptedException e) {
224-
throw new RuntimeException(e);
225-
} catch (IOException e) {
260+
} catch (Exception e) {
226261
throw new RuntimeException(e);
227262
}
228263

0 commit comments

Comments
 (0)
Please sign in to comment.