Skip to content

Commit d5787d1

Browse files
authored
Merge pull request #2289 from ClickHouse/add-column-names
Add support for partial insert
2 parents f35261e + 6b3a229 commit d5787d1

File tree

3 files changed

+93
-466
lines changed

3 files changed

+93
-466
lines changed

client-v2/src/main/java/com/clickhouse/client/api/Client.java

+64-5
Original file line numberDiff line numberDiff line change
@@ -1425,9 +1425,23 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
14251425
* @return {@code CompletableFuture<InsertResponse>} - a promise to insert response
14261426
*/
14271427
public CompletableFuture<InsertResponse> insert(String tableName, InputStream data, ClickHouseFormat format) {
1428-
return insert(tableName, data, format, new InsertSettings());
1428+
return insert(tableName, Collections.emptyList(), data, format, new InsertSettings());
14291429
}
14301430

1431+
/**
1432+
* <p>Sends write request to database. Input data is read from the input stream.</p>
1433+
*
1434+
* @param tableName - destination table name
1435+
* @param data - data stream to insert
1436+
* @param format - format of the data in the stream
1437+
* @return {@code CompletableFuture<InsertResponse>} - a promise to insert response
1438+
*/
1439+
public CompletableFuture<InsertResponse> insert(String tableName, List<String> columnNames, InputStream data, ClickHouseFormat format) {
1440+
return insert(tableName, columnNames, data, format, new InsertSettings());
1441+
}
1442+
1443+
1444+
14311445
/**
14321446
* Sends write request to database. Input data is read from the input stream.
14331447
*
@@ -1441,6 +1455,23 @@ public CompletableFuture<InsertResponse> insert(String tableName,
14411455
InputStream data,
14421456
ClickHouseFormat format,
14431457
InsertSettings settings) {
1458+
return insert(tableName, Collections.emptyList(), data, format, settings);
1459+
}
1460+
/**
1461+
* Sends write request to database. Input data is read from the input stream.
1462+
*
1463+
* @param tableName - destination table name
1464+
* @param columnNames - list of column names to insert data into. If null or empty, all columns will be used.
1465+
* @param data - data stream to insert
1466+
* @param format - format of the data in the stream
1467+
* @param settings - insert operation settings
1468+
* @return {@code CompletableFuture<InsertResponse>} - a promise to insert response
1469+
*/
1470+
public CompletableFuture<InsertResponse> insert(String tableName,
1471+
List<String> columnNames,
1472+
InputStream data,
1473+
ClickHouseFormat format,
1474+
InsertSettings settings) {
14441475

14451476
final int writeBufferSize = settings.getInputStreamCopyBufferSize() <= 0 ?
14461477
Integer.parseInt(configuration.getOrDefault(ClientConfigProperties.CLIENT_NETWORK_BUFFER_SIZE.getKey(),
@@ -1451,7 +1482,7 @@ public CompletableFuture<InsertResponse> insert(String tableName,
14511482
throw new IllegalArgumentException("Buffer size must be greater than 0");
14521483
}
14531484

1454-
return insert(tableName, new DataStreamWriter() {
1485+
return insert(tableName, columnNames, new DataStreamWriter() {
14551486
@Override
14561487
public void onOutput(OutputStream out) throws IOException {
14571488
byte[] buffer = new byte[writeBufferSize];
@@ -1480,11 +1511,29 @@ public void onRetry() throws IOException {
14801511
* @return {@code CompletableFuture<InsertResponse>} - a promise to insert response
14811512
*/
14821513
public CompletableFuture<InsertResponse> insert(String tableName,
1514+
DataStreamWriter writer,
1515+
ClickHouseFormat format,
1516+
InsertSettings settings) {
1517+
return insert(tableName, Collections.emptyList(), writer, format, settings);
1518+
}
1519+
1520+
/**
1521+
* Does an insert request to a server. Data is pushed when a {@link DataStreamWriter#onOutput(OutputStream)} is called.
1522+
*
1523+
* @param tableName - target table name
1524+
* @param columnNames - list of column names to insert data into. If null or empty, all columns will be used.
1525+
* @param writer - {@link DataStreamWriter} implementation
1526+
* @param format - source format
1527+
* @param settings - operation settings
1528+
* @return {@code CompletableFuture<InsertResponse>} - a promise to insert response
1529+
*/
1530+
public CompletableFuture<InsertResponse> insert(String tableName,
1531+
List<String> columnNames,
14831532
DataStreamWriter writer,
14841533
ClickHouseFormat format,
14851534
InsertSettings settings) {
14861535

1487-
String operationId = (String) settings.getOperationId();
1536+
String operationId = settings.getOperationId();
14881537
ClientStatisticsHolder clientStats = null;
14891538
if (operationId != null) {
14901539
clientStats = globalClientStats.remove(operationId);
@@ -1509,8 +1558,18 @@ public CompletableFuture<InsertResponse> insert(String tableName,
15091558

15101559
settings.setOption(ClientConfigProperties.INPUT_OUTPUT_FORMAT.getKey(), format.name());
15111560
final InsertSettings finalSettings = settings;
1512-
final String sqlStmt = "INSERT INTO " + tableName + " FORMAT " + format.name();
1513-
finalSettings.serverSetting(ClickHouseHttpProto.QPARAM_QUERY_STMT, sqlStmt);
1561+
1562+
StringBuilder sqlStmt = new StringBuilder("INSERT INTO ").append(tableName);
1563+
if (columnNames != null && !columnNames.isEmpty()) {
1564+
sqlStmt.append(" (");
1565+
for (String columnName : columnNames) {
1566+
sqlStmt.append(columnName).append(", ");
1567+
}
1568+
sqlStmt.deleteCharAt(sqlStmt.length() - 2);
1569+
sqlStmt.append(")");
1570+
}
1571+
sqlStmt.append(" FORMAT ").append(format.name());
1572+
finalSettings.serverSetting(ClickHouseHttpProto.QPARAM_QUERY_STMT, sqlStmt.toString());
15141573
responseSupplier = () -> {
15151574
long startTime = System.nanoTime();
15161575
// Selecting some node

client-v2/src/test/java/com/clickhouse/client/insert/InsertTests.java

+25
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,31 @@ public void insertRawDataSimple(String tableName) throws Exception {
294294
assertEquals((int)response.getWrittenRows(), numberOfRecords );
295295
}
296296

297+
@Test(groups = { "integration" })
298+
public void insertRawDataFewerColumns() throws Exception {
299+
final String tableName = "raw_data_select_columns_table";
300+
final String createSQL = "CREATE TABLE " + tableName +
301+
" (Id UInt32, event_ts Timestamp, name String, p1 Int64, p2 String, p3 String, p4 Int8) ENGINE = MergeTree() ORDER BY ()";
302+
List<String> columnNames = Arrays.asList("Id", "event_ts", "name", "p1", "p2");
303+
304+
initTable(tableName, createSQL);
305+
306+
settings.setInputStreamCopyBufferSize(8198 * 2);
307+
ByteArrayOutputStream data = new ByteArrayOutputStream();
308+
PrintWriter writer = new PrintWriter(data);
309+
for (int i = 0; i < 1000; i++) {
310+
writer.printf("%d\t%s\t%s\t%d\t%s\n", i, "2021-01-01 00:00:00", "name" + i, i, "p2");
311+
}
312+
writer.flush();
313+
InsertResponse response = client.insert(tableName, columnNames, new ByteArrayInputStream(data.toByteArray()),
314+
ClickHouseFormat.TSV, settings).get(30, TimeUnit.SECONDS);
315+
OperationMetrics metrics = response.getMetrics();
316+
assertEquals((int)response.getWrittenRows(), 1000 );
317+
318+
List<GenericRecord> records = client.queryAll("SELECT * FROM " + tableName);
319+
assertEquals(records.size(), 1000);
320+
}
321+
297322
@DataProvider(name = "insertRawDataSimpleDataProvider")
298323
public static Object[][] insertRawDataSimpleDataProvider() {
299324
return new Object[][] {

0 commit comments

Comments
 (0)