Skip to content

Commit 3b488b4

Browse files
committed
Merge branch 'main' into jmh_app
2 parents 3d02816 + 935e491 commit 3b488b4

File tree

15 files changed

+76
-23
lines changed

15 files changed

+76
-23
lines changed

clickhouse-jdbc/pom.xml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848

4949
<dependency>
5050
<groupId>org.lz4</groupId>
51-
<artifactId>lz4-pure-java</artifactId>
51+
<artifactId>lz4-java</artifactId>
5252
</dependency>
5353
<dependency>
5454
<groupId>com.google.code.gson</groupId>
@@ -425,7 +425,7 @@
425425
<include>org.apache.httpcomponents.core5:httpcore5</include>
426426
<include>org.apache.httpcomponents.core5:httpcore5-h2</include>
427427
<include>com.clickhouse:jdbc-v2</include>
428-
<include>org.lz4:lz4-pure-java</include>
428+
<include>org.lz4:lz4-java</include>
429429
</includes>
430430
</artifactSet>
431431
<relocations>
@@ -495,7 +495,7 @@
495495
<include>org.apache.httpcomponents.client5:httpclient5</include>
496496
<include>org.apache.httpcomponents.core5:httpcore5</include>
497497
<include>org.apache.httpcomponents.core5:httpcore5-h2</include>
498-
<include>org.lz4:lz4-pure-java</include>
498+
<include>org.lz4:lz4-java</include>
499499
<include>com.clickhouse:jdbc-v2</include>
500500
<include>com.clickhouse:client-v2</include>
501501
</includes>
@@ -573,7 +573,7 @@
573573
<include>com.clickhouse:clickhouse-data</include>
574574
<include>com.clickhouse:clickhouse-client</include>
575575
<include>com.clickhouse:clickhouse-http-client</include>
576-
<include>org.lz4:lz4-pure-java</include>
576+
<include>org.lz4:lz4-java</include>
577577
</includes>
578578
</artifactSet>
579579
<relocations>

client-v2/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@
5353

5454
<dependency>
5555
<groupId>org.lz4</groupId>
56-
<artifactId>lz4-pure-java</artifactId>
56+
<artifactId>lz4-java</artifactId>
5757
<version>${lz4.version}</version>
5858
</dependency>
5959

client-v2/src/main/java/com/clickhouse/client/api/Client.java

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939
import com.clickhouse.client.config.ClickHouseClientOption;
4040
import com.clickhouse.data.ClickHouseColumn;
4141
import com.clickhouse.data.ClickHouseFormat;
42+
import net.jpountz.lz4.LZ4Compressor;
43+
import net.jpountz.lz4.LZ4Factory;
4244
import org.apache.hc.core5.concurrent.DefaultThreadFactory;
4345
import org.apache.hc.core5.http.ClassicHttpResponse;
4446
import org.apache.hc.core5.http.Header;
@@ -144,6 +146,7 @@ public class Client implements AutoCloseable {
144146
private String serverVersion;
145147
private Object metricsRegistry;
146148
private int retries;
149+
private LZ4Factory lz4Factory = null;
147150

148151
private Client(Set<String> endpoints, Map<String,String> configuration, boolean useNewImplementation,
149152
ExecutorService sharedOperationExecutor, ColumnToMethodMatchingStrategy columnToMethodMatchingStrategy) {
@@ -176,6 +179,12 @@ private Client(Set<String> endpoints, Map<String,String> configuration, boolean
176179

177180
String retry = configuration.get(ClientConfigProperties.RETRY_ON_FAILURE.getKey());
178181
this.retries = retry == null ? 0 : Integer.parseInt(retry);
182+
boolean useNativeCompression = !MapUtils.getFlag(configuration, ClientConfigProperties.DISABLE_NATIVE_COMPRESSION.getKey(), false);
183+
if (useNativeCompression) {
184+
this.lz4Factory = LZ4Factory.fastestInstance();
185+
} else {
186+
this.lz4Factory = LZ4Factory.fastestJavaInstance();
187+
}
179188
}
180189

181190
/**
@@ -589,6 +598,17 @@ public Builder setLZ4UncompressedBufferSize(int size) {
589598
return this;
590599
}
591600

601+
/**
602+
* Disable native compression. If set to true then native compression will be disabled.
603+
* If from some reason the native compressor is not working then it can be disabled.
604+
* @param disable
605+
* @return
606+
*/
607+
public Builder disableNativeCompression(boolean disable) {
608+
this.configuration.put(ClientConfigProperties.DISABLE_NATIVE_COMPRESSION.getKey(), String.valueOf(disable));
609+
return this;
610+
}
611+
592612
/**
593613
* Sets the default database name that will be used by operations if not specified.
594614
* @param database - actual default database name.
@@ -1062,6 +1082,10 @@ private void setDefaults() {
10621082
setLZ4UncompressedBufferSize(ClickHouseLZ4OutputStream.UNCOMPRESSED_BUFF_SIZE);
10631083
}
10641084

1085+
if (!configuration.containsKey(ClientConfigProperties.DISABLE_NATIVE_COMPRESSION.getKey())) {
1086+
disableNativeCompression(false);
1087+
}
1088+
10651089
if (!configuration.containsKey(ClientConfigProperties.USE_SERVER_TIMEZONE.getKey())) {
10661090
useServerTimeZone(true);
10671091
}
@@ -1338,7 +1362,7 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
13381362
for (int i = 0; i <= maxRetries; i++) {
13391363
// Execute request
13401364
try (ClassicHttpResponse httpResponse =
1341-
httpClientHelper.executeRequest(selectedNode, finalSettings.getAllSettings(),
1365+
httpClientHelper.executeRequest(selectedNode, finalSettings.getAllSettings(), lz4Factory,
13421366
out -> {
13431367
out.write("INSERT INTO ".getBytes());
13441368
out.write(tableName.getBytes());
@@ -1496,7 +1520,7 @@ public CompletableFuture<InsertResponse> insert(String tableName,
14961520
for (int i = 0; i <= retries; i++) {
14971521
// Execute request
14981522
try (ClassicHttpResponse httpResponse =
1499-
httpClientHelper.executeRequest(selectedNode, finalSettings.getAllSettings(),
1523+
httpClientHelper.executeRequest(selectedNode, finalSettings.getAllSettings(), lz4Factory,
15001524
out -> {
15011525
writer.onOutput(out);
15021526
out.close();
@@ -1619,7 +1643,7 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
16191643
for (int i = 0; i <= retries; i++) {
16201644
try {
16211645
ClassicHttpResponse httpResponse =
1622-
httpClientHelper.executeRequest(selectedNode, finalSettings.getAllSettings(), output -> {
1646+
httpClientHelper.executeRequest(selectedNode, finalSettings.getAllSettings(), lz4Factory, output -> {
16231647
output.write(sqlQuery.getBytes(StandardCharsets.UTF_8));
16241648
output.close();
16251649
});
@@ -2055,6 +2079,7 @@ private <T> CompletableFuture<T> runAsyncOperation(Supplier<T> resultSupplier, M
20552079
public String toString() {
20562080
return "Client{" +
20572081
"endpoints=" + endpoints +
2082+
",lz4factory" + lz4Factory +
20582083
'}';
20592084
}
20602085

client-v2/src/main/java/com/clickhouse/client/api/ClientConfigProperties.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ public enum ClientConfigProperties {
7070

7171
COMPRESSION_LZ4_UNCOMPRESSED_BUF_SIZE("compression.lz4.uncompressed_buffer_size"),
7272

73+
DISABLE_NATIVE_COMPRESSION("disable_native_compression", "false"),
74+
7375
PROXY_TYPE("proxy_type"), // "http"
7476

7577
PROXY_HOST("proxy_host"),

client-v2/src/main/java/com/clickhouse/client/api/internal/ClickHouseLZ4InputStream.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ public class ClickHouseLZ4InputStream extends InputStream {
2727

2828
public ClickHouseLZ4InputStream(InputStream in, LZ4FastDecompressor decompressor, int bufferSize) {
2929
super();
30+
LOG.debug("Using decompressor {}", decompressor);
3031
this.decompressor = decompressor;
3132
this.in = in;
3233
this.buffer = ByteBuffer.allocate(bufferSize);

client-v2/src/main/java/com/clickhouse/client/api/internal/ClickHouseLZ4OutputStream.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,16 @@
22

33
import com.clickhouse.data.ClickHouseCityHash;
44
import net.jpountz.lz4.LZ4Compressor;
5+
import org.slf4j.Logger;
6+
import org.slf4j.LoggerFactory;
57

68
import java.io.IOException;
79
import java.io.OutputStream;
810
import java.nio.ByteBuffer;
911

1012
public class ClickHouseLZ4OutputStream extends OutputStream {
1113

14+
private static Logger LOG = LoggerFactory.getLogger(ClickHouseLZ4OutputStream.class);
1215
public static final int UNCOMPRESSED_BUFF_SIZE = 64 * 1024; // 64K is most optimal for LZ4 compression
1316

1417
private final ByteBuffer inBuffer;
@@ -26,6 +29,7 @@ public class ClickHouseLZ4OutputStream extends OutputStream {
2629

2730
public ClickHouseLZ4OutputStream(OutputStream out, LZ4Compressor compressor, int bufferSize) {
2831
super();
32+
LOG.debug("Using compressor {}", compressor);
2933
this.inBuffer = ByteBuffer.allocate(bufferSize);
3034
this.out = out;
3135
this.compressor = compressor;

client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import com.clickhouse.client.api.data_formats.internal.SerializerUtils;
1414
import com.clickhouse.client.api.enums.ProxyType;
1515
import com.clickhouse.client.api.http.ClickHouseHttpProto;
16+
import net.jpountz.lz4.LZ4Factory;
1617
import org.apache.hc.client5.http.ConnectTimeoutException;
1718
import org.apache.hc.client5.http.classic.methods.HttpPost;
1819
import org.apache.hc.client5.http.config.ConnectionConfig;
@@ -377,7 +378,7 @@ public Exception readError(ClassicHttpResponse httpResponse) {
377378
private static final long POOL_VENT_TIMEOUT = 10000L;
378379
private AtomicLong timeToPoolVent = new AtomicLong(0);
379380

380-
public ClassicHttpResponse executeRequest(ClickHouseNode server, Map<String, Object> requestConfig,
381+
public ClassicHttpResponse executeRequest(ClickHouseNode server, Map<String, Object> requestConfig, LZ4Factory lz4Factory,
381382
IOCallback<OutputStream> writeCallback) throws IOException {
382383
if (timeToPoolVent.get() < System.currentTimeMillis()) {
383384
timeToPoolVent.set(System.currentTimeMillis() + POOL_VENT_TIMEOUT);
@@ -406,14 +407,14 @@ public ClassicHttpResponse executeRequest(ClickHouseNode server, Map<String, Obj
406407
req.setConfig(baseRequestConfig);
407408
// setting entity. wrapping if compression is enabled
408409
req.setEntity(wrapRequestEntity(new EntityTemplate(-1, CONTENT_TYPE, null, writeCallback),
409-
clientCompression, useHttpCompression, appCompressedData));
410+
clientCompression, useHttpCompression, appCompressedData, lz4Factory));
410411

411412
HttpClientContext context = HttpClientContext.create();
412413

413414
try {
414415
ClassicHttpResponse httpResponse = httpClient.executeOpen(null, req, context);
415416
boolean serverCompression = MapUtils.getFlag(requestConfig, chConfiguration, ClientConfigProperties.COMPRESS_SERVER_RESPONSE.getKey());
416-
httpResponse.setEntity(wrapResponseEntity(httpResponse.getEntity(), httpResponse.getCode(), serverCompression, useHttpCompression));
417+
httpResponse.setEntity(wrapResponseEntity(httpResponse.getEntity(), httpResponse.getCode(), serverCompression, useHttpCompression, lz4Factory));
417418

418419
if (httpResponse.getCode() == HttpStatus.SC_PROXY_AUTHENTICATION_REQUIRED) {
419420
throw new ClientMisconfigurationException("Proxy authentication required. Please check your proxy settings.");
@@ -570,18 +571,18 @@ private void addQueryParams(URIBuilder req, Map<String, String> chConfig, Map<St
570571
}
571572

572573
private HttpEntity wrapRequestEntity(HttpEntity httpEntity, boolean clientCompression, boolean useHttpCompression,
573-
boolean appControlledCompression) {
574+
boolean appControlledCompression, LZ4Factory lz4Factory) {
574575
LOG.debug("client compression: {}, http compression: {}", clientCompression, useHttpCompression);
575576

576577
if (clientCompression && !appControlledCompression) {
577578
return new LZ4Entity(httpEntity, useHttpCompression, false, true,
578-
MapUtils.getInt(chConfiguration, "compression.lz4.uncompressed_buffer_size"), false);
579+
MapUtils.getInt(chConfiguration, "compression.lz4.uncompressed_buffer_size"), false, lz4Factory);
579580
} else {
580581
return httpEntity;
581582
}
582583
}
583584

584-
private HttpEntity wrapResponseEntity(HttpEntity httpEntity, int httpStatus, boolean serverCompression, boolean useHttpCompression) {
585+
private HttpEntity wrapResponseEntity(HttpEntity httpEntity, int httpStatus, boolean serverCompression, boolean useHttpCompression, LZ4Factory lz4Factory) {
585586
LOG.debug("server compression: {}, http compression: {}", serverCompression, useHttpCompression);
586587

587588
if (serverCompression) {
@@ -598,7 +599,7 @@ private HttpEntity wrapResponseEntity(HttpEntity httpEntity, int httpStatus, boo
598599
case HttpStatus.SC_INTERNAL_SERVER_ERROR:
599600
case HttpStatus.SC_NOT_FOUND:
600601
return new LZ4Entity(httpEntity, useHttpCompression, true, false,
601-
MapUtils.getInt(chConfiguration, "compression.lz4.uncompressed_buffer_size"), true);
602+
MapUtils.getInt(chConfiguration, "compression.lz4.uncompressed_buffer_size"), true, lz4Factory);
602603
}
603604
}
604605

client-v2/src/main/java/com/clickhouse/client/api/internal/LZ4Entity.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,17 @@ class LZ4Entity implements HttpEntity {
2727

2828
private boolean clientCompression;
2929

30+
private LZ4Factory lz4Factory = null;
31+
3032
LZ4Entity(HttpEntity httpEntity, boolean useHttpCompression, boolean serverCompression, boolean clientCompression,
31-
int bufferSize, boolean isResponse) {
33+
int bufferSize, boolean isResponse, LZ4Factory lz4Factory) {
3234
this.httpEntity = httpEntity;
3335
this.useHttpCompression = useHttpCompression;
3436
this.bufferSize = bufferSize;
3537
this.serverCompression = serverCompression;
3638
this.clientCompression = clientCompression;
3739
this.isResponse = isResponse;
40+
this.lz4Factory = lz4Factory;
3841
}
3942

4043
@Override
@@ -59,7 +62,7 @@ public InputStream getContent() throws IOException, UnsupportedOperationExceptio
5962
return content;
6063
}
6164
} else {
62-
return new ClickHouseLZ4InputStream(httpEntity.getContent(), LZ4Factory.fastestInstance().fastDecompressor(),
65+
return new ClickHouseLZ4InputStream(httpEntity.getContent(), lz4Factory.fastDecompressor(),
6366
bufferSize);
6467
}
6568
} else {
@@ -77,7 +80,8 @@ public void writeTo(OutputStream outStream) throws IOException {
7780
if (useHttpCompression) {
7881
httpEntity.writeTo(new FramedLZ4CompressorOutputStream(outStream));
7982
} else {
80-
httpEntity.writeTo(new ClickHouseLZ4OutputStream(outStream, LZ4Factory.fastestInstance().fastCompressor(),
83+
84+
httpEntity.writeTo(new ClickHouseLZ4OutputStream(outStream, lz4Factory.fastCompressor(),
8185
bufferSize));
8286
}
8387
} else {

client-v2/src/test/java/com/clickhouse/client/ClientTests.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,13 @@ public void testLoadingServerContext() throws Exception {
163163
}
164164
}
165165

166+
@Test
167+
public void testDisableNative() {
168+
try (Client client = newClient().disableNativeCompression(true).build()) {
169+
Assert.assertTrue(client.toString().indexOf("JavaUnsafe") != -1);
170+
}
171+
}
172+
166173
protected Client.Builder newClient() {
167174
ClickHouseNode node = getServer(ClickHouseProtocol.HTTP);
168175
boolean isSecure = isCloud();

jdbc-v2/pom.xml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@
5252
</dependency>
5353
<dependency>
5454
<groupId>org.lz4</groupId>
55-
<artifactId>lz4-pure-java</artifactId>
55+
<artifactId>lz4-java</artifactId>
5656
<optional>true</optional>
5757
</dependency>
5858
<dependency>
@@ -173,7 +173,7 @@
173173
<include>org.apache.httpcomponents.client5:httpclient5</include>
174174
<include>org.apache.httpcomponents.core5:httpcore5</include>
175175
<include>org.apache.httpcomponents.core5:httpcore5-h2</include>
176-
<include>org.lz4:lz4-pure-java</include>
176+
<include>org.lz4:lz4-java</include>
177177
</includes>
178178
</artifactSet>
179179
<relocations>
@@ -311,7 +311,7 @@
311311
<include>com.clickhouse:clickhouse-data</include>
312312
<include>com.clickhouse:client-v2</include>
313313
<include>com.clickhouse:clickhouse-http-client</include>
314-
<include>org.lz4:lz4-pure-java</include>
314+
<include>org.lz4:lz4-java</include>
315315
</includes>
316316
</artifactSet>
317317
<relocations>

performance/src/main/java/com/clickhouse/benchmark/clients/BenchmarkBase.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,8 @@ public static class DataState {
8383

8484
@Param({"file://dataset_500k.csv"})
8585
String datasetSourceName;
86-
@Param({"300000", "220000", "100000", "10000"})
86+
// @Param({"300000", "220000", "100000", "10000"})
87+
@Param({"300000"})
8788
int limit;
8889
@Param({"data_filled"})
8990
String tableNameFilled;

performance/src/main/java/com/clickhouse/benchmark/clients/Compression.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
import com.clickhouse.data.ClickHouseOutputStream;
77
import com.clickhouse.data.stream.Lz4OutputStream;
88
import org.openjdk.jmh.annotations.Benchmark;
9+
import org.openjdk.jmh.annotations.Level;
10+
import org.openjdk.jmh.annotations.Setup;
911
import org.slf4j.Logger;
1012
import org.slf4j.LoggerFactory;
1113

@@ -15,6 +17,11 @@ public class Compression extends BenchmarkBase {
1517
private static final Logger LOGGER = LoggerFactory.getLogger(Compression.class);
1618

1719
static final int COMPRESS_BUFFER_SIZE = 64 * 1024; // 64K
20+
private static final LZ4Factory factory = LZ4Factory.fastestInstance();
21+
@Setup(Level.Invocation)
22+
public void setup() {
23+
LOGGER.info("Compressor type {}", factory.fastCompressor());
24+
}
1825

1926
@Benchmark
2027
public void CompressingOutputStreamV1(DataState dataState) {
@@ -29,7 +36,7 @@ public void CompressingOutputStreamV1(DataState dataState) {
2936
}
3037
}
3138

32-
private static final LZ4Factory factory = LZ4Factory.fastestInstance();
39+
3340

3441
@Benchmark
3542
public void CompressingOutputStreamV2(DataState dataState) {

performance/src/main/java/com/clickhouse/benchmark/clients/CompressorCompare.java

Whitespace-only changes.

performance/src/test/com/clickhouse/benchmark/BenchmarkRunner.java

Whitespace-only changes.

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,7 @@
290290
<artifactId>lz4-pure-java</artifactId>
291291
<version>${lz4.version}</version>
292292
</dependency>
293+
293294
<dependency>
294295
<groupId>org.msgpack</groupId>
295296
<artifactId>msgpack-core</artifactId>

0 commit comments

Comments
 (0)