Skip to content

Commit baa0abf

Browse files
committed
Update ClickHouseTestHelpers.java
1 parent 7e6aa9c commit baa0abf

File tree

1 file changed

+48
-4
lines changed

1 file changed

+48
-4
lines changed

src/test/java/com/clickhouse/kafka/connect/sink/helper/ClickHouseTestHelpers.java

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,14 +61,38 @@ public static void query(ClickHouseHelperClient chc, String query) {
6161
chc.queryV1(query);
6262
}
6363
}
64-
public static void dropTable(ClickHouseHelperClient chc, String tableName) {
64+
65+
public static OperationMetrics dropTable(ClickHouseHelperClient chc, String tableName) {
66+
for (int i = 0; i < 5; i++) {
67+
try {
68+
OperationMetrics operationMetrics = dropTableLoop(chc, tableName);
69+
if (operationMetrics != null) {
70+
return operationMetrics;
71+
}
72+
} catch (Exception e) {
73+
LOGGER.error("Error while sleeping", e);
74+
}
75+
76+
try {
77+
Thread.sleep(30000);//Sleep for 30 seconds
78+
} catch (InterruptedException e) {
79+
LOGGER.error("Error while sleeping", e);
80+
}
81+
}
82+
83+
return null;
84+
}
85+
private static OperationMetrics dropTableLoop(ClickHouseHelperClient chc, String tableName) {
6586
String dropTable = String.format("DROP TABLE IF EXISTS `%s`", tableName);
6687
try {
67-
chc.getClient().queryRecords(dropTable).get(CLOUD_TIMEOUT_VALUE, CLOUD_TIMEOUT_UNIT);
88+
return chc.getClient().queryRecords(dropTable).get(CLOUD_TIMEOUT_VALUE, CLOUD_TIMEOUT_UNIT).getMetrics();
6889
} catch (Exception e) {
6990
throw new RuntimeException(e);
7091
}
7192
}
93+
94+
95+
7296
public static OperationMetrics createTable(ClickHouseHelperClient chc, String tableName, String createTableQuery) {
7397
LOGGER.info("Creating table: {}, Query: {}", tableName, createTableQuery);
7498
OperationMetrics operationMetrics = createTable(chc, tableName, createTableQuery, new HashMap<>());
@@ -83,18 +107,38 @@ public static OperationMetrics createTable(ClickHouseHelperClient chc, String ta
83107
}
84108

85109
public static OperationMetrics createTable(ClickHouseHelperClient chc, String tableName, String createTableQuery, Map<String, Serializable> clientSettings) {
110+
for (int i = 0; i < 5; i++) {
111+
try {
112+
OperationMetrics operationMetrics = createTableLoop(chc, tableName, createTableQuery, clientSettings);
113+
if (operationMetrics != null) {
114+
return operationMetrics;
115+
}
116+
} catch (Exception e) {
117+
LOGGER.error("Error while sleeping", e);
118+
}
119+
120+
try {
121+
Thread.sleep(30000);//Sleep for 30 seconds
122+
} catch (InterruptedException e) {
123+
LOGGER.error("Error while sleeping", e);
124+
}
125+
}
126+
127+
return null;
128+
}
129+
private static OperationMetrics createTableLoop(ClickHouseHelperClient chc, String tableName, String createTableQuery, Map<String, Serializable> clientSettings) {
86130
final String createTableQueryTmp = String.format(createTableQuery, tableName);
87131
QuerySettings settings = new QuerySettings();
88132
for (Map.Entry<String, Serializable> entry : clientSettings.entrySet()) {
89133
settings.setOption(entry.getKey(), entry.getValue());
90134
}
91135
try {
92-
Records records = chc.getClient().queryRecords(createTableQueryTmp, settings).get(CLOUD_TIMEOUT_VALUE, CLOUD_TIMEOUT_UNIT);
93-
return records.getMetrics();
136+
return chc.getClient().queryRecords(createTableQueryTmp, settings).get(CLOUD_TIMEOUT_VALUE, CLOUD_TIMEOUT_UNIT).getMetrics();
94137
} catch (Exception e) {
95138
throw new RuntimeException(e);
96139
}
97140
}
141+
98142
public static List<JSONObject> getAllRowsAsJson(ClickHouseHelperClient chc, String tableName) {
99143
String query = String.format("SELECT * FROM `%s`", tableName);
100144
QuerySettings querySettings = new QuerySettings();

0 commit comments

Comments
 (0)