diff --git a/pom.xml b/pom.xml index 2d1a97ce2..85bdce3e8 100644 --- a/pom.xml +++ b/pom.xml @@ -40,6 +40,7 @@ providers/configcat providers/statsig providers/multiprovider + tools/flagd-http-connector diff --git a/providers/flagd/src/main/resources/simplelogger.properties b/providers/flagd/src/main/resources/simplelogger.properties index d9d489e82..80c478930 100644 --- a/providers/flagd/src/main/resources/simplelogger.properties +++ b/providers/flagd/src/main/resources/simplelogger.properties @@ -1,3 +1,3 @@ -org.org.slf4j.simpleLogger.defaultLogLevel=debug +org.slf4j.simpleLogger.defaultLogLevel=debug io.grpc.level=trace diff --git a/providers/flagd/src/test/resources/simplelogger.properties b/providers/flagd/src/test/resources/simplelogger.properties index d2ca1bbdc..769e4e8bf 100644 --- a/providers/flagd/src/test/resources/simplelogger.properties +++ b/providers/flagd/src/test/resources/simplelogger.properties @@ -1,4 +1,4 @@ -org.org.slf4j.simpleLogger.defaultLogLevel=debug +org.slf4j.simpleLogger.defaultLogLevel=debug org.slf4j.simpleLogger.showDateTime= io.grpc.level=trace diff --git a/tools/flagd-http-connector/CHANGELOG.md b/tools/flagd-http-connector/CHANGELOG.md new file mode 100644 index 000000000..069e8ebdc --- /dev/null +++ b/tools/flagd-http-connector/CHANGELOG.md @@ -0,0 +1,35 @@ +# Changelog + +## [0.1.2](https://github.com/open-feature/java-sdk-contrib/compare/dev.openfeature.contrib.tools.junitopenfeature-v0.1.1...dev.openfeature.contrib.tools.junitopenfeature-v0.1.2) (2024-12-03) + + +### ✨ New Features + +* added interception of parameterized tests to Junit OpenFeature Extension ([#1093](https://github.com/open-feature/java-sdk-contrib/issues/1093)) ([a78c906](https://github.com/open-feature/java-sdk-contrib/commit/a78c906b24b53f7d25eb01aad85ed614eb30ca05)) + +## [0.1.1](https://github.com/open-feature/java-sdk-contrib/compare/dev.openfeature.contrib.tools.junitopenfeature-v0.1.0...dev.openfeature.contrib.tools.junitopenfeature-v0.1.1) (2024-09-27) + + +### 🐛 Bug Fixes + +* race condition causing default when multiple flags are used ([#983](https://github.com/open-feature/java-sdk-contrib/issues/983)) ([356a973](https://github.com/open-feature/java-sdk-contrib/commit/356a973cf2b6ddf82b8311ea200fa30df4f1d048)) + +## [0.1.0](https://github.com/open-feature/java-sdk-contrib/compare/dev.openfeature.contrib.tools.junitopenfeature-v0.0.3...dev.openfeature.contrib.tools.junitopenfeature-v0.1.0) (2024-09-25) + + +### 🐛 Bug Fixes + +* **deps:** update dependency org.apache.commons:commons-lang3 to v3.17.0 ([#932](https://github.com/open-feature/java-sdk-contrib/issues/932)) ([c598d9f](https://github.com/open-feature/java-sdk-contrib/commit/c598d9f0a61f2324fb85d72fdfea34811283c575)) + + +### 🐛 Bug Fixes + +* added missing dependency and installation instruction ([#895](https://github.com/open-feature/java-sdk-contrib/issues/895)) ([6748d02](https://github.com/open-feature/java-sdk-contrib/commit/6748d02403f0ceecb6cb9ecdfb2fecf98423a7db)) +* **deps:** update dependency org.apache.commons:commons-lang3 to v3.16.0 ([#908](https://github.com/open-feature/java-sdk-contrib/issues/908)) ([d21cfe3](https://github.com/open-feature/java-sdk-contrib/commit/d21cfe3ac7da1ff6e1a4dc2ee4b0db5c24ed4847)) + +## [0.0.2](https://github.com/open-feature/java-sdk-contrib/compare/dev.openfeature.contrib.tools.junitopenfeature-v0.0.1...dev.openfeature.contrib.tools.junitopenfeature-v0.0.2) (2024-07-29) + + +### ✨ New Features + +* Add JUnit5 extension for OpenFeature ([#888](https://github.com/open-feature/java-sdk-contrib/issues/888)) ([9fff9db](https://github.com/open-feature/java-sdk-contrib/commit/9fff9db4bcee3c3ae8128a1b2fb040f53df1d5ed)) diff --git a/tools/flagd-http-connector/README.md b/tools/flagd-http-connector/README.md new file mode 100644 index 000000000..39a3f3336 --- /dev/null +++ b/tools/flagd-http-connector/README.md @@ -0,0 +1,103 @@ +# Http Connector + +## Introduction +Http Connector is a tool for [flagd](https://github.com/open-feature/flagd) in-process resolver. + +This mode performs flag evaluations locally (in-process). +Flag configurations for evaluation are obtained via Http. + +## Http Connector functionality + +HttpConnector is responsible for polling data from a specified URL at regular intervals. +It is leveraging Http cache mechanism with 'ETag' header, then when receiving 304 Not Modified response, +reducing traffic, reducing rate limits effects and changes updates. Can be enabled via useHttpCache option. +The implementation is using Java HttpClient. + +## Use cases and benefits +* Reduce infrastructure/devops work, without additional containers needed. +* Use as an additional provider for fallback / internal backup service via multi-provider. + +### What happens if the Http source is down when application is starting ? + +It supports optional fail-safe initialization via cache, such that on initial fetch error following by +source downtime window, initial payload is taken from cache to avoid starting with default values until +the source is back up. Therefore, the cache ttl expected to be higher than the expected source +down-time to recover from during initialization. + +### Sample flow +Sample flow can use: +- Github as the flags payload source. +- Redis cache as a fail-safe initialization cache. + +Sample flow of initialization during Github down-time window, showing that application can still use flags +values as fetched from cache. +```mermaid +sequenceDiagram + participant Provider + participant Github + participant Redis + + break source downtime + Provider->>Github: initialize + Github->>Provider: failure + end + Provider->>Redis: fetch + Redis->>Provider: last payload + +``` + +## Usage + +### Installation + +```xml + + dev.openfeature.contrib.tools + flagd-http-connector + 0.0.1 + +``` + + +### Usage example + +```java + +HttpConnectorOptions httpConnectorOptions = HttpConnectorOptions.builder() + .url("http://example.com/flags") + .build(); +HttpConnector connector = HttpConnector.builder() + .httpConnectorOptions(httpConnectorOptions) + .build(); + +FlagdOptions options = + FlagdOptions.builder() + .resolverType(Config.Resolver.IN_PROCESS) + .customConnector(connector) + .build(); + +FlagdProvider flagdProvider = new FlagdProvider(options); +``` + +### Configuration +The Http Connector can be configured using the following properties in the `HttpConnectorOptions` class.: + +| Property Name | Type | Description | +|-------------------------------------------|---------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| url | String | The URL to poll for updates. This is a required field. | +| pollIntervalSeconds | Integer | The interval in seconds at which the connector will poll the URL for updates. Default is 60 seconds. | +| connectTimeoutSeconds | Integer | The timeout in seconds for establishing a connection to the URL. Default is 10 seconds. | +| requestTimeoutSeconds | Integer | The timeout in seconds for the request to complete. Default is 10 seconds. | +| linkedBlockingQueueCapacity | Integer | The capacity of the linked blocking queue used for processing requests. Default is 100. | +| scheduledThreadPoolSize | Integer | The size of the scheduled thread pool used for processing requests. Default is 2. | +| headers | Map | A map of headers to be included in the request. Default is an empty map. | +| httpClientExecutor | ExecutorService | The executor service used for making HTTP requests. Default is a fixed thread pool with 1 thread. | +| proxyHost | String | The host of the proxy server to use for requests. Default is null. | +| proxyPort | Integer | The port of the proxy server to use for requests. Default is null. | +| payloadCacheOptions | PayloadCacheOptions | Options for configuring the payload cache. Default is null. | +| payloadCache | PayloadCache | The payload cache to use for caching responses. Default is null. | +| useHttpCache | Boolean | Whether to use HTTP caching for the requests. Default is false. | +| PayloadCacheOptions.updateIntervalSeconds | Integer | The interval, in seconds, at which the cache is updated. By default, this is set to 30 minutes. The goal is to avoid overloading fallback cache writes, since the cache serves only as a fallback mechanism. Typically, this value can be tuned to be shorter than the cache's TTL, balancing the need to minimize unnecessary updates while still handling edge cases effectively. | + + + diff --git a/tools/flagd-http-connector/pom.xml b/tools/flagd-http-connector/pom.xml new file mode 100644 index 000000000..8945c7e97 --- /dev/null +++ b/tools/flagd-http-connector/pom.xml @@ -0,0 +1,54 @@ + + + 4.0.0 + + dev.openfeature.contrib + parent + 0.1.0 + ../../pom.xml + + dev.openfeature.contrib.tools + flagd-http-connector + 0.0.1 + + flagd-http-connector + Flagd Http Connector + https://openfeature.dev + + + + liran2000 + Liran Mendelovich + OpenFeature + https://openfeature.dev/ + + + + + + dev.openfeature.contrib.providers + flagd + 0.11.8 + + + + org.apache.commons + commons-lang3 + 3.17.0 + + + + org.junit.jupiter + junit-jupiter-api + test + + + org.junit.jupiter + junit-jupiter-api + test + + + + + diff --git a/tools/flagd-http-connector/src/main/java/dev/openfeature/contrib/tools/flagd/resolver/process/storage/connector/sync/http/HttpCacheFetcher.java b/tools/flagd-http-connector/src/main/java/dev/openfeature/contrib/tools/flagd/resolver/process/storage/connector/sync/http/HttpCacheFetcher.java new file mode 100644 index 000000000..0cc069032 --- /dev/null +++ b/tools/flagd-http-connector/src/main/java/dev/openfeature/contrib/tools/flagd/resolver/process/storage/connector/sync/http/HttpCacheFetcher.java @@ -0,0 +1,49 @@ +package dev.openfeature.contrib.tools.flagd.resolver.process.storage.connector.sync.http; + +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; + +/** + * Fetches content from a given HTTP endpoint using caching headers to optimize network usage. + * If cached ETag or Last-Modified values are available, they are included in the request headers + * to potentially receive a 304 Not Modified response, reducing data transfer. + * Updates the cached ETag and Last-Modified values upon receiving a 200 OK response. + * It does not store the cached response, assuming not needed after first successful fetching. + * Non thread-safe. + * + * @param httpClient the HTTP client used to send the request + * @param httpRequestBuilder the builder for constructing the HTTP request + * @return the HTTP response received from the server + */ +@Slf4j +public class HttpCacheFetcher { + private String cachedETag = null; + private String cachedLastModified = null; + + @SneakyThrows + public HttpResponse fetchContent(HttpClient httpClient, HttpRequest.Builder httpRequestBuilder) { + if (cachedETag != null) { + httpRequestBuilder.header("If-None-Match", cachedETag); + } + if (cachedLastModified != null) { + httpRequestBuilder.header("If-Modified-Since", cachedLastModified); + } + + HttpRequest request = httpRequestBuilder.build(); + HttpResponse httpResponse = httpClient.send(request, HttpResponse.BodyHandlers.ofString()); + + if (httpResponse.statusCode() == 200) { + if (httpResponse.headers() != null) { + cachedETag = httpResponse.headers().firstValue("ETag").orElse(null); + cachedLastModified = httpResponse.headers().firstValue("Last-Modified").orElse(null); + } + log.debug("fetched new content"); + } else if (httpResponse.statusCode() == 304) { + log.debug("got 304 Not Modified"); + } + return httpResponse; + } +} diff --git a/tools/flagd-http-connector/src/main/java/dev/openfeature/contrib/tools/flagd/resolver/process/storage/connector/sync/http/HttpConnector.java b/tools/flagd-http-connector/src/main/java/dev/openfeature/contrib/tools/flagd/resolver/process/storage/connector/sync/http/HttpConnector.java new file mode 100644 index 000000000..3eb8a116d --- /dev/null +++ b/tools/flagd-http-connector/src/main/java/dev/openfeature/contrib/tools/flagd/resolver/process/storage/connector/sync/http/HttpConnector.java @@ -0,0 +1,183 @@ +package dev.openfeature.contrib.tools.flagd.resolver.process.storage.connector.sync.http; + +import static java.net.http.HttpClient.Builder.NO_PROXY; + +import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload; +import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayloadType; +import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueueSource; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.ProxySelector; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import dev.openfeature.contrib.tools.flagd.resolver.process.storage.connector.sync.http.util.ConcurrentUtils; +import lombok.Builder; +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; + +/** + * HttpConnector is responsible for polling data from a specified URL at regular intervals. + * Notice rate limits for polling http sources like Github. + * It implements the QueueSource interface to enqueue and dequeue change messages. + * The class supports configurable parameters such as poll interval, request timeout, and proxy settings. + * It uses a ScheduledExecutorService to schedule polling tasks and an ExecutorService for HTTP client execution. + * The class also provides methods to initialize, retrieve the stream queue, and shutdown the connector gracefully. + * It supports optional fail-safe initialization via cache. + * + * See readme - Http Connector section. + */ +@Slf4j +public class HttpConnector implements QueueSource { + + private Integer pollIntervalSeconds; + private Integer requestTimeoutSeconds; + private BlockingQueue queue; + private HttpClient client; + private ExecutorService httpClientExecutor; + private ScheduledExecutorService scheduler; + private Map headers; + private PayloadCacheWrapper payloadCacheWrapper; + private PayloadCache payloadCache; + private HttpCacheFetcher httpCacheFetcher; + + @NonNull + private String url; + + @Builder + public HttpConnector(HttpConnectorOptions httpConnectorOptions) { + this.pollIntervalSeconds = httpConnectorOptions.getPollIntervalSeconds(); + this.requestTimeoutSeconds = httpConnectorOptions.getRequestTimeoutSeconds(); + ProxySelector proxySelector = NO_PROXY; + if (httpConnectorOptions.getProxyHost() != null && httpConnectorOptions.getProxyPort() != null) { + proxySelector = ProxySelector.of(new InetSocketAddress(httpConnectorOptions.getProxyHost(), + httpConnectorOptions.getProxyPort())); + } + this.url = httpConnectorOptions.getUrl(); + this.headers = httpConnectorOptions.getHeaders(); + this.httpClientExecutor = httpConnectorOptions.getHttpClientExecutor(); + scheduler = Executors.newScheduledThreadPool(httpConnectorOptions.getScheduledThreadPoolSize()); + this.client = HttpClient.newBuilder() + .connectTimeout(Duration.ofSeconds(httpConnectorOptions.getConnectTimeoutSeconds())) + .proxy(proxySelector) + .executor(this.httpClientExecutor) + .build(); + this.queue = new LinkedBlockingQueue<>(httpConnectorOptions.getLinkedBlockingQueueCapacity()); + this.payloadCache = httpConnectorOptions.getPayloadCache(); + if (payloadCache != null) { + this.payloadCacheWrapper = PayloadCacheWrapper.builder() + .payloadCache(payloadCache) + .payloadCacheOptions(httpConnectorOptions.getPayloadCacheOptions()) + .build(); + } + if (Boolean.TRUE.equals(httpConnectorOptions.getUseHttpCache())) { + httpCacheFetcher = new HttpCacheFetcher(); + } + } + + @Override + public void init() throws Exception { + log.info("init Http Connector"); + } + + @Override + public BlockingQueue getStreamQueue() { + boolean success = fetchAndUpdate(); + if (!success) { + log.info("failed initial fetch"); + if (payloadCache != null) { + updateFromCache(); + } + } + Runnable pollTask = buildPollTask(); + scheduler.scheduleWithFixedDelay(pollTask, pollIntervalSeconds, pollIntervalSeconds, TimeUnit.SECONDS); + return queue; + } + + private void updateFromCache() { + log.info("taking initial payload from cache to avoid starting with default values"); + String flagData = payloadCache.get(); + if (flagData == null) { + log.debug("got null from cache"); + return; + } + if (!this.queue.offer(new QueuePayload(QueuePayloadType.DATA, flagData))) { + log.warn("init: Unable to offer file content to queue: queue is full"); + } + } + + protected Runnable buildPollTask() { + return this::fetchAndUpdate; + } + + private boolean fetchAndUpdate() { + HttpRequest.Builder requestBuilder = HttpRequest.newBuilder() + .uri(URI.create(url)) + .timeout(Duration.ofSeconds(requestTimeoutSeconds)) + .GET(); + headers.forEach(requestBuilder::header); + + HttpResponse response; + try { + log.debug("fetching response"); + response = execute(requestBuilder); + } catch (IOException e) { + log.info("could not fetch", e); + return false; + } catch (Exception e) { + log.debug("exception", e); + return false; + } + log.debug("fetched response"); + String payload = response.body(); + if (!isSuccessful(response)) { + log.info("received non-successful status code: {} {}", response.statusCode(), payload); + return false; + } else if (response.statusCode() == 304) { + log.debug("got 304 Not Modified, skipping update"); + return false; + } + if (payload == null) { + log.debug("payload is null"); + return false; + } + log.debug("adding payload to queue"); + if (!this.queue.offer(new QueuePayload(QueuePayloadType.DATA, payload))) { + log.warn("Unable to offer file content to queue: queue is full"); + return false; + } + if (payloadCacheWrapper != null) { + log.debug("scheduling cache update if needed"); + scheduler.execute(() -> + payloadCacheWrapper.updatePayloadIfNeeded(payload) + ); + } + return payload != null; + } + + private static boolean isSuccessful(HttpResponse response) { + return response.statusCode() == 200 || response.statusCode() == 304; + } + + protected HttpResponse execute(HttpRequest.Builder requestBuilder) throws IOException, InterruptedException { + if (httpCacheFetcher != null) { + return httpCacheFetcher.fetchContent(client, requestBuilder); + } + return client.send(requestBuilder.build(), HttpResponse.BodyHandlers.ofString()); + } + + @Override + public void shutdown() throws InterruptedException { + ConcurrentUtils.shutdownAndAwaitTermination(scheduler, 10); + ConcurrentUtils.shutdownAndAwaitTermination(httpClientExecutor, 10); + } +} diff --git a/tools/flagd-http-connector/src/main/java/dev/openfeature/contrib/tools/flagd/resolver/process/storage/connector/sync/http/HttpConnectorOptions.java b/tools/flagd-http-connector/src/main/java/dev/openfeature/contrib/tools/flagd/resolver/process/storage/connector/sync/http/HttpConnectorOptions.java new file mode 100644 index 000000000..740d54ddf --- /dev/null +++ b/tools/flagd-http-connector/src/main/java/dev/openfeature/contrib/tools/flagd/resolver/process/storage/connector/sync/http/HttpConnectorOptions.java @@ -0,0 +1,125 @@ +package dev.openfeature.contrib.tools.flagd.resolver.process.storage.connector.sync.http; + +import java.net.URL; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import lombok.Builder; +import lombok.Getter; +import lombok.NonNull; +import lombok.SneakyThrows; + +@Getter +public class HttpConnectorOptions { + + @Builder.Default + private Integer pollIntervalSeconds = 60; + @Builder.Default + private Integer connectTimeoutSeconds = 10; + @Builder.Default + private Integer requestTimeoutSeconds = 10; + @Builder.Default + private Integer linkedBlockingQueueCapacity = 100; + @Builder.Default + private Integer scheduledThreadPoolSize = 2; + @Builder.Default + private Map headers = new HashMap<>(); + @Builder.Default + private ExecutorService httpClientExecutor = Executors.newFixedThreadPool(1); + @Builder.Default + private String proxyHost; + @Builder.Default + private Integer proxyPort; + @Builder.Default + private PayloadCacheOptions payloadCacheOptions; + @Builder.Default + private PayloadCache payloadCache; + @Builder.Default + private Boolean useHttpCache; + @NonNull + private String url; + + @Builder + public HttpConnectorOptions(Integer pollIntervalSeconds, Integer linkedBlockingQueueCapacity, + Integer scheduledThreadPoolSize, Integer requestTimeoutSeconds, Integer connectTimeoutSeconds, String url, + Map headers, ExecutorService httpClientExecutor, String proxyHost, Integer proxyPort, + PayloadCacheOptions payloadCacheOptions, PayloadCache payloadCache, Boolean useHttpCache) { + validate(url, pollIntervalSeconds, linkedBlockingQueueCapacity, scheduledThreadPoolSize, requestTimeoutSeconds, + connectTimeoutSeconds, proxyHost, proxyPort, payloadCacheOptions, payloadCache); + if (pollIntervalSeconds != null) { + this.pollIntervalSeconds = pollIntervalSeconds; + } + if (linkedBlockingQueueCapacity != null) { + this.linkedBlockingQueueCapacity = linkedBlockingQueueCapacity; + } + if (scheduledThreadPoolSize != null) { + this.scheduledThreadPoolSize = scheduledThreadPoolSize; + } + if (requestTimeoutSeconds != null) { + this.requestTimeoutSeconds = requestTimeoutSeconds; + } + if (connectTimeoutSeconds != null) { + this.connectTimeoutSeconds = connectTimeoutSeconds; + } + this.url = url; + if (headers != null) { + this.headers = headers; + } + if (httpClientExecutor != null) { + this.httpClientExecutor = httpClientExecutor; + } + if (proxyHost != null) { + this.proxyHost = proxyHost; + } + if (proxyPort != null) { + this.proxyPort = proxyPort; + } + if (payloadCache != null) { + this.payloadCache = payloadCache; + } + if (payloadCacheOptions != null) { + this.payloadCacheOptions = payloadCacheOptions; + } + if (useHttpCache != null) { + this.useHttpCache = useHttpCache; + } + } + + @SneakyThrows + private void validate(String url, Integer pollIntervalSeconds, Integer linkedBlockingQueueCapacity, + Integer scheduledThreadPoolSize, Integer requestTimeoutSeconds, Integer connectTimeoutSeconds, + String proxyHost, Integer proxyPort, PayloadCacheOptions payloadCacheOptions, + PayloadCache payloadCache) { + new URL(url).toURI(); + if (linkedBlockingQueueCapacity != null && (linkedBlockingQueueCapacity < 1 || linkedBlockingQueueCapacity > 1000)) { + throw new IllegalArgumentException("linkedBlockingQueueCapacity must be between 1 and 1000"); + } + if (scheduledThreadPoolSize != null && (scheduledThreadPoolSize < 1 || scheduledThreadPoolSize > 10)) { + throw new IllegalArgumentException("scheduledThreadPoolSize must be between 1 and 10"); + } + if (requestTimeoutSeconds != null && (requestTimeoutSeconds < 1 || requestTimeoutSeconds > 60)) { + throw new IllegalArgumentException("requestTimeoutSeconds must be between 1 and 60"); + } + if (connectTimeoutSeconds != null && (connectTimeoutSeconds < 1 || connectTimeoutSeconds > 60)) { + throw new IllegalArgumentException("connectTimeoutSeconds must be between 1 and 60"); + } + if (pollIntervalSeconds != null && (pollIntervalSeconds < 1 || pollIntervalSeconds > 600)) { + throw new IllegalArgumentException("pollIntervalSeconds must be between 1 and 600"); + } + if (proxyPort != null && (proxyPort < 1 || proxyPort > 65535)) { + throw new IllegalArgumentException("proxyPort must be between 1 and 65535"); + } + if (proxyHost != null && proxyPort == null ) { + throw new IllegalArgumentException("proxyPort must be set if proxyHost is set"); + } else if (proxyHost == null && proxyPort != null) { + throw new IllegalArgumentException("proxyHost must be set if proxyPort is set"); + } + if (payloadCacheOptions != null && payloadCache == null) { + throw new IllegalArgumentException("payloadCache must be set if payloadCacheOptions is set"); + } + if (payloadCache != null && payloadCacheOptions == null) { + throw new IllegalArgumentException("payloadCacheOptions must be set if payloadCache is set"); + } + } +} diff --git a/tools/flagd-http-connector/src/main/java/dev/openfeature/contrib/tools/flagd/resolver/process/storage/connector/sync/http/PayloadCache.java b/tools/flagd-http-connector/src/main/java/dev/openfeature/contrib/tools/flagd/resolver/process/storage/connector/sync/http/PayloadCache.java new file mode 100644 index 000000000..4af5f5f1d --- /dev/null +++ b/tools/flagd-http-connector/src/main/java/dev/openfeature/contrib/tools/flagd/resolver/process/storage/connector/sync/http/PayloadCache.java @@ -0,0 +1,6 @@ +package dev.openfeature.contrib.tools.flagd.resolver.process.storage.connector.sync.http; + +public interface PayloadCache { + public void put(String payload); + public String get(); +} diff --git a/tools/flagd-http-connector/src/main/java/dev/openfeature/contrib/tools/flagd/resolver/process/storage/connector/sync/http/PayloadCacheOptions.java b/tools/flagd-http-connector/src/main/java/dev/openfeature/contrib/tools/flagd/resolver/process/storage/connector/sync/http/PayloadCacheOptions.java new file mode 100644 index 000000000..9ca0dabcc --- /dev/null +++ b/tools/flagd-http-connector/src/main/java/dev/openfeature/contrib/tools/flagd/resolver/process/storage/connector/sync/http/PayloadCacheOptions.java @@ -0,0 +1,24 @@ +package dev.openfeature.contrib.tools.flagd.resolver.process.storage.connector.sync.http; + +import lombok.Builder; +import lombok.Getter; + +/** + * Represents configuration options for caching payloads. + *

+ * This class provides options to configure the caching behavior, + * specifically the interval at which the cache should be updated. + *

+ *

+ * The default update interval is set to 30 minutes. + * Change it typically to a value according to cache ttl and tradeoff with not updating it too much for + * corner cases. + *

+ */ +@Builder +@Getter +public class PayloadCacheOptions { + + @Builder.Default + private int updateIntervalSeconds = 60 * 30; // 30 minutes +} diff --git a/tools/flagd-http-connector/src/main/java/dev/openfeature/contrib/tools/flagd/resolver/process/storage/connector/sync/http/PayloadCacheWrapper.java b/tools/flagd-http-connector/src/main/java/dev/openfeature/contrib/tools/flagd/resolver/process/storage/connector/sync/http/PayloadCacheWrapper.java new file mode 100644 index 000000000..be213403e --- /dev/null +++ b/tools/flagd-http-connector/src/main/java/dev/openfeature/contrib/tools/flagd/resolver/process/storage/connector/sync/http/PayloadCacheWrapper.java @@ -0,0 +1,59 @@ +package dev.openfeature.contrib.tools.flagd.resolver.process.storage.connector.sync.http; + +import lombok.Builder; +import lombok.extern.slf4j.Slf4j; + +/** + * A wrapper class for managing a payload cache with a specified update interval. + * This class ensures that the cache is only updated if the specified time interval + * has passed since the last update. It logs debug messages when updates are skipped + * and error messages if the update process fails. + * Not thread-safe. + * + *

Usage involves creating an instance with {@link PayloadCacheOptions} to set + * the update interval, and then using {@link #updatePayloadIfNeeded(String)} to + * conditionally update the cache and {@link #get()} to retrieve the cached payload.

+ */ +@Slf4j +public class PayloadCacheWrapper { + private long lastUpdateTimeMs; + private long updateIntervalMs; + private PayloadCache payloadCache; + + @Builder + public PayloadCacheWrapper(PayloadCache payloadCache, PayloadCacheOptions payloadCacheOptions) { + if (payloadCacheOptions.getUpdateIntervalSeconds() < 1) { + throw new IllegalArgumentException("pollIntervalSeconds must be larger than 0"); + } + this.updateIntervalMs = payloadCacheOptions.getUpdateIntervalSeconds() * 1000L; + this.payloadCache = payloadCache; + } + + public void updatePayloadIfNeeded(String payload) { + if ((getCurrentTimeMillis() - lastUpdateTimeMs) < updateIntervalMs) { + log.debug("not updating payload, updateIntervalMs not reached"); + return; + } + + try { + log.debug("updating payload"); + payloadCache.put(payload); + lastUpdateTimeMs = getCurrentTimeMillis(); + } catch (Exception e) { + log.error("failed updating cache", e); + } + } + + protected long getCurrentTimeMillis() { + return System.currentTimeMillis(); + } + + public String get() { + try { + return payloadCache.get(); + } catch (Exception e) { + log.error("failed getting from cache", e); + return null; + } + } +} diff --git a/tools/flagd-http-connector/src/main/java/dev/openfeature/contrib/tools/flagd/resolver/process/storage/connector/sync/http/util/ConcurrentUtils.java b/tools/flagd-http-connector/src/main/java/dev/openfeature/contrib/tools/flagd/resolver/process/storage/connector/sync/http/util/ConcurrentUtils.java new file mode 100644 index 000000000..e7ccbc3b9 --- /dev/null +++ b/tools/flagd-http-connector/src/main/java/dev/openfeature/contrib/tools/flagd/resolver/process/storage/connector/sync/http/util/ConcurrentUtils.java @@ -0,0 +1,61 @@ +package dev.openfeature.contrib.tools.flagd.resolver.process.storage.connector.sync.http.util; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * Concurrent / Concurrency utilities. + * + * @author Liran Mendelovich + */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) +@Slf4j +public class ConcurrentUtils { + + /** + * Graceful shutdown a thread pool.
+ * See + * https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html + * + * @param pool thread pool + * @param timeoutSeconds grace period timeout in seconds - timeout can be twice than this value, + * as first it waits for existing tasks to terminate, then waits for cancelled tasks to + * terminate. + */ + public static void shutdownAndAwaitTermination(ExecutorService pool, int timeoutSeconds) { + if (pool == null) { + return; + } + + // Disable new tasks from being submitted + pool.shutdown(); + try { + + // Wait a while for existing tasks to terminate + if (!pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) { + + // Cancel currently executing tasks - best effort, based on interrupt handling + // implementation. + pool.shutdownNow(); + + // Wait a while for tasks to respond to being cancelled + if (!pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) { + log.error("Thread pool did not shutdown all tasks after the timeout: {} seconds.", timeoutSeconds); + } + } + } catch (InterruptedException e) { + + log.info("Current thread interrupted during shutdownAndAwaitTermination, calling shutdownNow."); + + // (Re-)Cancel if current thread also interrupted + pool.shutdownNow(); + + // Preserve interrupt status + Thread.currentThread().interrupt(); + } + } +} diff --git a/tools/flagd-http-connector/src/test/java/dev/openfeature/contrib/tools/flagd/resolver/process/storage/connector/sync/http/HttpCacheFetcherTest.java b/tools/flagd-http-connector/src/test/java/dev/openfeature/contrib/tools/flagd/resolver/process/storage/connector/sync/http/HttpCacheFetcherTest.java new file mode 100644 index 000000000..fe49219ec --- /dev/null +++ b/tools/flagd-http-connector/src/test/java/dev/openfeature/contrib/tools/flagd/resolver/process/storage/connector/sync/http/HttpCacheFetcherTest.java @@ -0,0 +1,308 @@ +package dev.openfeature.contrib.tools.flagd.resolver.process.storage.connector.sync.http; + +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.net.http.HttpClient; +import java.net.http.HttpHeaders; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import lombok.SneakyThrows; +import org.junit.jupiter.api.Test; +public class HttpCacheFetcherTest { + + @Test + public void testFirstRequestSendsNoCacheHeaders() throws Exception { + HttpClient httpClientMock = mock(HttpClient.class); + HttpRequest.Builder requestBuilderMock = mock(HttpRequest.Builder.class); + HttpRequest requestMock = mock(HttpRequest.class); + HttpResponse responseMock = spy(HttpResponse.class); + doReturn(HttpHeaders.of(new HashMap<>(), (a, b) -> true)).when(responseMock).headers(); + + when(requestBuilderMock.build()).thenReturn(requestMock); + when(httpClientMock.send(eq(requestMock), any(HttpResponse.BodyHandler.class))).thenReturn(responseMock); + when(responseMock.statusCode()).thenReturn(200); + + HttpCacheFetcher fetcher = new HttpCacheFetcher(); + fetcher.fetchContent(httpClientMock, requestBuilderMock); + + verify(requestBuilderMock, never()).header(eq("If-None-Match"), anyString()); + verify(requestBuilderMock, never()).header(eq("If-Modified-Since"), anyString()); + } + + @Test + public void testResponseWith200ButNoCacheHeaders() throws Exception { + HttpClient httpClientMock = mock(HttpClient.class); + HttpRequest.Builder requestBuilderMock = mock(HttpRequest.Builder.class); + HttpRequest requestMock = mock(HttpRequest.class); + HttpResponse responseMock = mock(HttpResponse.class); + HttpHeaders headersMock = mock(HttpHeaders.class); + + when(requestBuilderMock.build()).thenReturn(requestMock); + when(httpClientMock.send(eq(requestMock), any(HttpResponse.BodyHandler.class))).thenReturn(responseMock); + when(responseMock.statusCode()).thenReturn(200); + when(responseMock.headers()).thenReturn(headersMock); + when(headersMock.firstValue("ETag")).thenReturn(Optional.empty()); + when(headersMock.firstValue("Last-Modified")).thenReturn(Optional.empty()); + + HttpCacheFetcher fetcher = new HttpCacheFetcher(); + HttpResponse response = fetcher.fetchContent(httpClientMock, requestBuilderMock); + + assertEquals(200, response.statusCode()); + + HttpRequest.Builder secondRequestBuilderMock = mock(HttpRequest.Builder.class); + when(secondRequestBuilderMock.build()).thenReturn(requestMock); + + fetcher.fetchContent(httpClientMock, secondRequestBuilderMock); + + verify(secondRequestBuilderMock, never()).header(eq("If-None-Match"), anyString()); + verify(secondRequestBuilderMock, never()).header(eq("If-Modified-Since"), anyString()); + } + + @Test + public void testFetchContentReturnsHttpResponse() throws Exception { + HttpClient httpClientMock = mock(HttpClient.class); + HttpRequest.Builder requestBuilderMock = mock(HttpRequest.Builder.class); + HttpRequest requestMock = mock(HttpRequest.class); + HttpResponse responseMock = spy(HttpResponse.class); + doReturn(HttpHeaders.of(new HashMap<>(), (a, b) -> true)).when(responseMock).headers(); + + when(requestBuilderMock.build()).thenReturn(requestMock); + when(httpClientMock.send(eq(requestMock), any(HttpResponse.BodyHandler.class))).thenReturn(responseMock); + when(responseMock.statusCode()).thenReturn(404); + + HttpCacheFetcher fetcher = new HttpCacheFetcher(); + HttpResponse result = fetcher.fetchContent(httpClientMock, requestBuilderMock); + + assertEquals(responseMock, result); + } + + @Test + public void test200ResponseNoEtagOrLastModified() throws Exception { + HttpClient httpClientMock = mock(HttpClient.class); + HttpRequest.Builder requestBuilderMock = mock(HttpRequest.Builder.class); + HttpRequest requestMock = mock(HttpRequest.class); + HttpResponse responseMock = spy(HttpResponse.class); + doReturn(HttpHeaders.of(new HashMap<>(), (a, b) -> true)).when(responseMock).headers(); + + when(requestBuilderMock.build()).thenReturn(requestMock); + when(httpClientMock.send(eq(requestMock), any(HttpResponse.BodyHandler.class))).thenReturn(responseMock); + when(responseMock.statusCode()).thenReturn(200); + + HttpCacheFetcher fetcher = new HttpCacheFetcher(); + fetcher.fetchContent(httpClientMock, requestBuilderMock); + + Field cachedETagField = HttpCacheFetcher.class.getDeclaredField("cachedETag"); + cachedETagField.setAccessible(true); + assertNull(cachedETagField.get(fetcher)); + Field cachedLastModifiedField = HttpCacheFetcher.class.getDeclaredField("cachedLastModified"); + cachedLastModifiedField.setAccessible(true); + assertNull(cachedLastModifiedField.get(fetcher)); + } + + @Test + public void testUpdateCacheOn200Response() throws Exception { + HttpClient httpClientMock = mock(HttpClient.class); + HttpRequest.Builder requestBuilderMock = mock(HttpRequest.Builder.class); + HttpRequest requestMock = mock(HttpRequest.class); + HttpResponse responseMock = spy(HttpResponse.class); + doReturn(HttpHeaders.of( + Map.of("Last-Modified", Arrays.asList("Wed, 21 Oct 2015 07:28:00 GMT"), + "ETag", Arrays.asList("etag-value")), + (a, b) -> true)).when(responseMock).headers(); + + when(requestBuilderMock.build()).thenReturn(requestMock); + when(httpClientMock.send(eq(requestMock), any(HttpResponse.BodyHandler.class))).thenReturn(responseMock); + when(responseMock.statusCode()).thenReturn(200); + HttpCacheFetcher fetcher = new HttpCacheFetcher(); + fetcher.fetchContent(httpClientMock, requestBuilderMock); + + Field cachedETagField = HttpCacheFetcher.class.getDeclaredField("cachedETag"); + cachedETagField.setAccessible(true); + assertEquals("etag-value", cachedETagField.get(fetcher)); + Field cachedLastModifiedField = HttpCacheFetcher.class.getDeclaredField("cachedLastModified"); + cachedLastModifiedField.setAccessible(true); + assertEquals("Wed, 21 Oct 2015 07:28:00 GMT", cachedLastModifiedField.get(fetcher)); + } + + @Test + public void testRequestWithCachedEtagIncludesIfNoneMatchHeader() throws Exception { + HttpClient httpClientMock = mock(HttpClient.class); + HttpRequest.Builder requestBuilderMock = mock(HttpRequest.Builder.class); + HttpRequest requestMock = mock(HttpRequest.class); + HttpResponse responseMock = spy(HttpResponse.class); + doReturn(HttpHeaders.of( + Map.of("ETag", Arrays.asList("12345")), + (a, b) -> true)).when(responseMock).headers(); + + when(requestBuilderMock.build()).thenReturn(requestMock); + when(httpClientMock.send(eq(requestMock), any(HttpResponse.BodyHandler.class))).thenReturn(responseMock); + when(responseMock.statusCode()).thenReturn(200); + + HttpCacheFetcher fetcher = new HttpCacheFetcher(); + fetcher.fetchContent(httpClientMock, requestBuilderMock); + fetcher.fetchContent(httpClientMock, requestBuilderMock); + + verify(requestBuilderMock, times(1)).header("If-None-Match", "12345"); + } + + @Test + public void testNullHttpClientOrRequestBuilder() { + HttpCacheFetcher fetcher = new HttpCacheFetcher(); + HttpRequest.Builder requestBuilderMock = mock(HttpRequest.Builder.class); + + assertThrows(NullPointerException.class, () -> { + fetcher.fetchContent(null, requestBuilderMock); + }); + + assertThrows(NullPointerException.class, () -> { + fetcher.fetchContent(mock(HttpClient.class), null); + }); + } + + @Test + public void testResponseWithUnexpectedStatusCode() throws Exception { + HttpClient httpClientMock = mock(HttpClient.class); + HttpRequest.Builder requestBuilderMock = mock(HttpRequest.Builder.class); + HttpRequest requestMock = mock(HttpRequest.class); + HttpResponse responseMock = spy(HttpResponse.class); + doReturn(HttpHeaders.of(new HashMap<>(), (a, b) -> true)).when(responseMock).headers(); + + when(requestBuilderMock.build()).thenReturn(requestMock); + when(httpClientMock.send(eq(requestMock), any(HttpResponse.BodyHandler.class))).thenReturn(responseMock); + when(responseMock.statusCode()).thenReturn(500); + + HttpCacheFetcher fetcher = new HttpCacheFetcher(); + HttpResponse response = fetcher.fetchContent(httpClientMock, requestBuilderMock); + + assertEquals(500, response.statusCode()); + verify(requestBuilderMock, never()).header(eq("If-None-Match"), anyString()); + verify(requestBuilderMock, never()).header(eq("If-Modified-Since"), anyString()); + } + + @Test + public void testRequestIncludesIfModifiedSinceHeaderWhenLastModifiedCached() throws Exception { + HttpClient httpClientMock = mock(HttpClient.class); + HttpRequest.Builder requestBuilderMock = mock(HttpRequest.Builder.class); + HttpRequest requestMock = mock(HttpRequest.class); + HttpResponse responseMock = spy(HttpResponse.class); + doReturn(HttpHeaders.of( + Map.of("Last-Modified", Arrays.asList("Wed, 21 Oct 2015 07:28:00 GMT")), + (a, b) -> true)).when(responseMock).headers(); + + when(requestBuilderMock.build()).thenReturn(requestMock); + when(httpClientMock.send(eq(requestMock), any(HttpResponse.BodyHandler.class))).thenReturn(responseMock); + when(responseMock.statusCode()).thenReturn(200); + + HttpCacheFetcher fetcher = new HttpCacheFetcher(); + fetcher.fetchContent(httpClientMock, requestBuilderMock); + fetcher.fetchContent(httpClientMock, requestBuilderMock); + + verify(requestBuilderMock).header(eq("If-Modified-Since"), eq("Wed, 21 Oct 2015 07:28:00 GMT")); + } + + @Test + public void testCalls200And304Responses() throws Exception { + HttpClient httpClientMock = mock(HttpClient.class); + HttpRequest.Builder requestBuilderMock = mock(HttpRequest.Builder.class); + HttpRequest requestMock = mock(HttpRequest.class); + HttpResponse responseMock200 = mock(HttpResponse.class); + HttpResponse responseMock304 = mock(HttpResponse.class); + + when(requestBuilderMock.build()).thenReturn(requestMock); + when(httpClientMock.send(eq(requestMock), any(HttpResponse.BodyHandler.class))) + .thenReturn(responseMock200) + .thenReturn(responseMock304); + when(responseMock200.statusCode()).thenReturn(200); + when(responseMock304.statusCode()).thenReturn(304); + + HttpCacheFetcher fetcher = new HttpCacheFetcher(); + fetcher.fetchContent(httpClientMock, requestBuilderMock); + fetcher.fetchContent(httpClientMock, requestBuilderMock); + + verify(responseMock200, times(1)).statusCode(); + verify(responseMock304, times(2)).statusCode(); + } + + @Test + public void testRequestIncludesBothEtagAndLastModifiedHeaders() throws Exception { + HttpClient httpClientMock = mock(HttpClient.class); + HttpRequest.Builder requestBuilderMock = mock(HttpRequest.Builder.class); + HttpRequest requestMock = mock(HttpRequest.class); + HttpResponse responseMock = spy(HttpResponse.class); + doReturn(HttpHeaders.of(new HashMap<>(), (a, b) -> true)).when(responseMock).headers(); + + when(requestBuilderMock.build()).thenReturn(requestMock); + when(httpClientMock.send(eq(requestMock), any(HttpResponse.BodyHandler.class))).thenReturn(responseMock); + when(responseMock.statusCode()).thenReturn(200); + + HttpCacheFetcher fetcher = new HttpCacheFetcher(); + Field cachedETagField = HttpCacheFetcher.class.getDeclaredField("cachedETag"); + cachedETagField.setAccessible(true); + cachedETagField.set(fetcher, "test-etag"); + Field cachedLastModifiedField = HttpCacheFetcher.class.getDeclaredField("cachedLastModified"); + cachedLastModifiedField.setAccessible(true); + cachedLastModifiedField.set(fetcher, "test-last-modified"); + + fetcher.fetchContent(httpClientMock, requestBuilderMock); + + verify(requestBuilderMock).header("If-None-Match", "test-etag"); + verify(requestBuilderMock).header("If-Modified-Since", "test-last-modified"); + } + + @SneakyThrows + @Test + public void testHttpClientSendExceptionPropagation() { + HttpClient httpClientMock = mock(HttpClient.class); + HttpRequest.Builder requestBuilderMock = mock(HttpRequest.Builder.class); + HttpRequest requestMock = mock(HttpRequest.class); + + when(requestBuilderMock.build()).thenReturn(requestMock); + when(httpClientMock.send(eq(requestMock), any(HttpResponse.BodyHandler.class))) + .thenThrow(new IOException("Network error")); + + HttpCacheFetcher fetcher = new HttpCacheFetcher(); + assertThrows(IOException.class, () -> { + fetcher.fetchContent(httpClientMock, requestBuilderMock); + }); + } + + @Test + public void testOnlyEtagAndLastModifiedHeadersCached() throws Exception { + HttpClient httpClientMock = mock(HttpClient.class); + HttpRequest.Builder requestBuilderMock = mock(HttpRequest.Builder.class); + HttpRequest requestMock = mock(HttpRequest.class); + HttpResponse responseMock = spy(HttpResponse.class); + doReturn(HttpHeaders.of( + Map.of("Last-Modified", Arrays.asList("last-modified-value"), + "ETag", Arrays.asList("etag-value")), + (a, b) -> true)).when(responseMock).headers(); + + when(requestBuilderMock.build()).thenReturn(requestMock); + when(httpClientMock.send(eq(requestMock), any(HttpResponse.BodyHandler.class))).thenReturn(responseMock); + when(responseMock.statusCode()).thenReturn(200); + + HttpCacheFetcher fetcher = new HttpCacheFetcher(); + fetcher.fetchContent(httpClientMock, requestBuilderMock); + + verify(requestBuilderMock, never()).header(eq("Some-Other-Header"), anyString()); + } + +} diff --git a/tools/flagd-http-connector/src/test/java/dev/openfeature/contrib/tools/flagd/resolver/process/storage/connector/sync/http/HttpConnectorIntegrationTest.java b/tools/flagd-http-connector/src/test/java/dev/openfeature/contrib/tools/flagd/resolver/process/storage/connector/sync/http/HttpConnectorIntegrationTest.java new file mode 100644 index 000000000..bd198c1c4 --- /dev/null +++ b/tools/flagd-http-connector/src/test/java/dev/openfeature/contrib/tools/flagd/resolver/process/storage/connector/sync/http/HttpConnectorIntegrationTest.java @@ -0,0 +1,57 @@ +package dev.openfeature.contrib.tools.flagd.resolver.process.storage.connector.sync.http; + +import static dev.openfeature.contrib.tools.flagd.resolver.process.storage.connector.sync.http.HttpConnectorTest.delay; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assumptions.assumeTrue; + +import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload; +import java.util.concurrent.BlockingQueue; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Test; + +/** + * Integration test for the HttpConnector class, specifically testing the ability to fetch + * raw content from a GitHub URL. This test assumes that integration tests are enabled + * and verifies that the HttpConnector can successfully enqueue data from the specified URL. + * The test initializes the HttpConnector with specific configurations, waits for data + * to be enqueued, and asserts the expected queue size. The connector is shut down + * gracefully after the test execution. + * As this integration test using external request, it is disabled by default, and not part of the CI build. + */ +@Slf4j +class HttpConnectorIntegrationTest { + + @SneakyThrows + @Test + void testGithubRawContent() { + assumeTrue(parseBoolean("integrationTestsEnabled")); + HttpConnector connector = null; + try { + String testUrl = "https://raw.githubusercontent.com/open-feature/java-sdk-contrib/58fe5da7d4e2f6f4ae2c1caf3411a01e84a1dc1a/providers/flagd/version.txt"; + + HttpConnectorOptions httpConnectorOptions = HttpConnectorOptions.builder() + .url(testUrl) + .connectTimeoutSeconds(10) + .requestTimeoutSeconds(10) + .useHttpCache(true) + .pollIntervalSeconds(5) + .build(); + connector = HttpConnector.builder() + .httpConnectorOptions(httpConnectorOptions) + .build(); + BlockingQueue queue = connector.getStreamQueue(); + delay(20000); + assertEquals(1, queue.size()); + } finally { + if (connector != null) { + connector.shutdown(); + } + } + } + + public static boolean parseBoolean(String key) { + return Boolean.parseBoolean(System.getProperty(key, System.getenv(key))); + } + +} diff --git a/tools/flagd-http-connector/src/test/java/dev/openfeature/contrib/tools/flagd/resolver/process/storage/connector/sync/http/HttpConnectorOptionsTest.java b/tools/flagd-http-connector/src/test/java/dev/openfeature/contrib/tools/flagd/resolver/process/storage/connector/sync/http/HttpConnectorOptionsTest.java new file mode 100644 index 000000000..96bd81c4d --- /dev/null +++ b/tools/flagd-http-connector/src/test/java/dev/openfeature/contrib/tools/flagd/resolver/process/storage/connector/sync/http/HttpConnectorOptionsTest.java @@ -0,0 +1,405 @@ +package dev.openfeature.contrib.tools.flagd.resolver.process.storage.connector.sync.http; + +import org.junit.jupiter.api.Test; +import java.net.MalformedURLException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class HttpConnectorOptionsTest { + + @Test + public void testDefaultValuesInitialization() { + HttpConnectorOptions options = HttpConnectorOptions.builder() + .url("https://example.com") + .build(); + + assertEquals(60, options.getPollIntervalSeconds().intValue()); + assertEquals(10, options.getConnectTimeoutSeconds().intValue()); + assertEquals(10, options.getRequestTimeoutSeconds().intValue()); + assertEquals(100, options.getLinkedBlockingQueueCapacity().intValue()); + assertEquals(2, options.getScheduledThreadPoolSize().intValue()); + assertNotNull(options.getHeaders()); + assertTrue(options.getHeaders().isEmpty()); + assertNotNull(options.getHttpClientExecutor()); + assertNull(options.getProxyHost()); + assertNull(options.getProxyPort()); + assertNull(options.getPayloadCacheOptions()); + assertNull(options.getPayloadCache()); + assertNull(options.getUseHttpCache()); + assertEquals("https://example.com", options.getUrl()); + } + + @Test + public void testInvalidUrlFormat() { + MalformedURLException exception = assertThrows( + MalformedURLException.class, + () -> HttpConnectorOptions.builder() + .url("invalid-url") + .build() + ); + + assertNotNull(exception); + } + + @Test + public void testCustomValuesInitialization() { + HttpConnectorOptions options = HttpConnectorOptions.builder() + .pollIntervalSeconds(120) + .connectTimeoutSeconds(20) + .requestTimeoutSeconds(30) + .linkedBlockingQueueCapacity(200) + .scheduledThreadPoolSize(5) + .url("http://example.com") + .build(); + + assertEquals(120, options.getPollIntervalSeconds().intValue()); + assertEquals(20, options.getConnectTimeoutSeconds().intValue()); + assertEquals(30, options.getRequestTimeoutSeconds().intValue()); + assertEquals(200, options.getLinkedBlockingQueueCapacity().intValue()); + assertEquals(5, options.getScheduledThreadPoolSize().intValue()); + assertEquals("http://example.com", options.getUrl()); + } + + @Test + public void testCustomHeadersMap() { + Map customHeaders = new HashMap<>(); + customHeaders.put("Authorization", "Bearer token"); + customHeaders.put("Content-Type", "application/json"); + + HttpConnectorOptions options = HttpConnectorOptions.builder() + .url("http://example.com") + .headers(customHeaders) + .build(); + + assertEquals("Bearer token", options.getHeaders().get("Authorization")); + assertEquals("application/json", options.getHeaders().get("Content-Type")); + } + + @Test + public void testCustomExecutorService() { + ExecutorService customExecutor = Executors.newFixedThreadPool(5); + HttpConnectorOptions options = HttpConnectorOptions.builder() + .url("https://example.com") + .httpClientExecutor(customExecutor) + .build(); + + assertEquals(customExecutor, options.getHttpClientExecutor()); + } + + @Test + public void testSettingPayloadCacheWithValidOptions() { + PayloadCacheOptions cacheOptions = PayloadCacheOptions.builder() + .updateIntervalSeconds(1800) + .build(); + PayloadCache payloadCache = new PayloadCache() { + private String payload; + + @Override + public void put(String payload) { + this.payload = payload; + } + + @Override + public String get() { + return this.payload; + } + }; + + HttpConnectorOptions options = HttpConnectorOptions.builder() + .url("https://example.com") + .payloadCacheOptions(cacheOptions) + .payloadCache(payloadCache) + .build(); + + assertNotNull(options.getPayloadCacheOptions()); + assertNotNull(options.getPayloadCache()); + assertEquals(1800, options.getPayloadCacheOptions().getUpdateIntervalSeconds()); + } + + @Test + public void testProxyConfigurationWithValidHostAndPort() { + HttpConnectorOptions options = HttpConnectorOptions.builder() + .url("https://example.com") + .proxyHost("proxy.example.com") + .proxyPort(8080) + .build(); + + assertEquals("proxy.example.com", options.getProxyHost()); + assertEquals(8080, options.getProxyPort().intValue()); + } + + @Test + public void testLinkedBlockingQueueCapacityOutOfRange() { + IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> { + HttpConnectorOptions.builder() + .url("https://example.com") + .linkedBlockingQueueCapacity(0) + .build(); + }); + assertEquals("linkedBlockingQueueCapacity must be between 1 and 1000", exception.getMessage()); + + exception = assertThrows(IllegalArgumentException.class, () -> { + HttpConnectorOptions.builder() + .url("https://example.com") + .linkedBlockingQueueCapacity(1001) + .build(); + }); + assertEquals("linkedBlockingQueueCapacity must be between 1 and 1000", exception.getMessage()); + } + + @Test + public void testPollIntervalSecondsOutOfRange() { + IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> { + HttpConnectorOptions.builder() + .url("https://example.com") + .pollIntervalSeconds(700) + .build(); + }); + assertEquals("pollIntervalSeconds must be between 1 and 600", exception.getMessage()); + } + + @Test + public void testAdditionalCustomValuesInitialization() { + Map headers = new HashMap<>(); + headers.put("Authorization", "Bearer token"); + ExecutorService executorService = Executors.newFixedThreadPool(2); + PayloadCacheOptions cacheOptions = PayloadCacheOptions.builder().build(); + PayloadCache cache = new PayloadCache() { + @Override + public void put(String payload) { + // do nothing + } + @Override + public String get() { return null; } + }; + + HttpConnectorOptions options = HttpConnectorOptions.builder() + .url("https://example.com") + .pollIntervalSeconds(120) + .connectTimeoutSeconds(20) + .requestTimeoutSeconds(30) + .linkedBlockingQueueCapacity(200) + .scheduledThreadPoolSize(4) + .headers(headers) + .httpClientExecutor(executorService) + .proxyHost("proxy.example.com") + .proxyPort(8080) + .payloadCacheOptions(cacheOptions) + .payloadCache(cache) + .useHttpCache(true) + .build(); + + assertEquals(120, options.getPollIntervalSeconds().intValue()); + assertEquals(20, options.getConnectTimeoutSeconds().intValue()); + assertEquals(30, options.getRequestTimeoutSeconds().intValue()); + assertEquals(200, options.getLinkedBlockingQueueCapacity().intValue()); + assertEquals(4, options.getScheduledThreadPoolSize().intValue()); + assertNotNull(options.getHeaders()); + assertEquals("Bearer token", options.getHeaders().get("Authorization")); + assertNotNull(options.getHttpClientExecutor()); + assertEquals("proxy.example.com", options.getProxyHost()); + assertEquals(8080, options.getProxyPort().intValue()); + assertNotNull(options.getPayloadCacheOptions()); + assertNotNull(options.getPayloadCache()); + assertTrue(options.getUseHttpCache()); + assertEquals("https://example.com", options.getUrl()); + } + + @Test + public void testRequestTimeoutSecondsOutOfRange() { + IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> { + HttpConnectorOptions.builder() + .url("https://example.com") + .requestTimeoutSeconds(61) + .build(); + }); + assertEquals("requestTimeoutSeconds must be between 1 and 60", exception.getMessage()); + } + + @Test + public void testBuilderInitializesAllFields() { + Map headers = new HashMap<>(); + headers.put("Authorization", "Bearer token"); + ExecutorService executorService = Executors.newFixedThreadPool(2); + PayloadCacheOptions cacheOptions = PayloadCacheOptions.builder().build(); + PayloadCache cache = new PayloadCache() { + @Override + public void put(String payload) { + // do nothing + } + @Override + public String get() { return null; } + }; + + HttpConnectorOptions options = HttpConnectorOptions.builder() + .pollIntervalSeconds(120) + .connectTimeoutSeconds(20) + .requestTimeoutSeconds(30) + .linkedBlockingQueueCapacity(200) + .scheduledThreadPoolSize(4) + .headers(headers) + .httpClientExecutor(executorService) + .proxyHost("proxy.example.com") + .proxyPort(8080) + .payloadCacheOptions(cacheOptions) + .payloadCache(cache) + .useHttpCache(true) + .url("https://example.com") + .build(); + + assertEquals(120, options.getPollIntervalSeconds().intValue()); + assertEquals(20, options.getConnectTimeoutSeconds().intValue()); + assertEquals(30, options.getRequestTimeoutSeconds().intValue()); + assertEquals(200, options.getLinkedBlockingQueueCapacity().intValue()); + assertEquals(4, options.getScheduledThreadPoolSize().intValue()); + assertEquals(headers, options.getHeaders()); + assertEquals(executorService, options.getHttpClientExecutor()); + assertEquals("proxy.example.com", options.getProxyHost()); + assertEquals(8080, options.getProxyPort().intValue()); + assertEquals(cacheOptions, options.getPayloadCacheOptions()); + assertEquals(cache, options.getPayloadCache()); + assertTrue(options.getUseHttpCache()); + assertEquals("https://example.com", options.getUrl()); + } + + @Test + public void testScheduledThreadPoolSizeOutOfRange() { + IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> { + HttpConnectorOptions.builder() + .url("https://example.com") + .scheduledThreadPoolSize(11) + .build(); + }); + assertEquals("scheduledThreadPoolSize must be between 1 and 10", exception.getMessage()); + } + + @Test + public void testProxyPortOutOfRange() { + IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> { + HttpConnectorOptions.builder() + .url("https://example.com") + .proxyHost("proxy.example.com") + .proxyPort(70000) // Invalid port, out of range + .build(); + }); + assertEquals("proxyPort must be between 1 and 65535", exception.getMessage()); + } + + @Test + public void testConnectTimeoutSecondsOutOfRange() { + IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> { + HttpConnectorOptions.builder() + .url("https://example.com") + .connectTimeoutSeconds(0) + .build(); + }); + assertEquals("connectTimeoutSeconds must be between 1 and 60", exception.getMessage()); + + exception = assertThrows(IllegalArgumentException.class, () -> { + HttpConnectorOptions.builder() + .url("https://example.com") + .connectTimeoutSeconds(61) + .build(); + }); + assertEquals("connectTimeoutSeconds must be between 1 and 60", exception.getMessage()); + } + + @Test + public void testProxyPortWithoutProxyHost() { + IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> { + HttpConnectorOptions.builder() + .url("https://example.com") + .proxyPort(8080) + .build(); + }); + assertEquals("proxyHost must be set if proxyPort is set", exception.getMessage()); + } + + @Test + public void testDefaultValuesWhenNullParametersProvided() { + HttpConnectorOptions options = HttpConnectorOptions.builder() + .url("https://example.com") + .pollIntervalSeconds(null) + .linkedBlockingQueueCapacity(null) + .scheduledThreadPoolSize(null) + .requestTimeoutSeconds(null) + .connectTimeoutSeconds(null) + .headers(null) + .httpClientExecutor(null) + .proxyHost(null) + .proxyPort(null) + .payloadCacheOptions(null) + .payloadCache(null) + .useHttpCache(null) + .build(); + + assertEquals(60, options.getPollIntervalSeconds().intValue()); + assertEquals(10, options.getConnectTimeoutSeconds().intValue()); + assertEquals(10, options.getRequestTimeoutSeconds().intValue()); + assertEquals(100, options.getLinkedBlockingQueueCapacity().intValue()); + assertEquals(2, options.getScheduledThreadPoolSize().intValue()); + assertNotNull(options.getHeaders()); + assertTrue(options.getHeaders().isEmpty()); + assertNotNull(options.getHttpClientExecutor()); + assertNull(options.getProxyHost()); + assertNull(options.getProxyPort()); + assertNull(options.getPayloadCacheOptions()); + assertNull(options.getPayloadCache()); + assertNull(options.getUseHttpCache()); + assertEquals("https://example.com", options.getUrl()); + } + + @Test + public void testProxyHostWithoutProxyPort() { + IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> { + HttpConnectorOptions.builder() + .url("https://example.com") + .proxyHost("proxy.example.com") + .build(); + }); + assertEquals("proxyPort must be set if proxyHost is set", exception.getMessage()); + } + + @Test + public void testSettingPayloadCacheWithoutOptions() { + PayloadCache mockPayloadCache = new PayloadCache() { + @Override + public void put(String payload) { + // Mock implementation + } + + @Override + public String get() { + return "mockPayload"; + } + }; + + IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> { + HttpConnectorOptions.builder() + .url("https://example.com") + .payloadCache(mockPayloadCache) + .build(); + }); + + assertEquals("payloadCacheOptions must be set if payloadCache is set", exception.getMessage()); + } + + @Test + public void testPayloadCacheOptionsWithoutPayloadCache() { + IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> { + HttpConnectorOptions.builder() + .url("https://example.com") + .payloadCacheOptions(PayloadCacheOptions.builder().build()) + .build(); + }); + assertEquals("payloadCache must be set if payloadCacheOptions is set", exception.getMessage()); + } +} diff --git a/tools/flagd-http-connector/src/test/java/dev/openfeature/contrib/tools/flagd/resolver/process/storage/connector/sync/http/HttpConnectorTest.java b/tools/flagd-http-connector/src/test/java/dev/openfeature/contrib/tools/flagd/resolver/process/storage/connector/sync/http/HttpConnectorTest.java new file mode 100644 index 000000000..5298d98f6 --- /dev/null +++ b/tools/flagd-http-connector/src/test/java/dev/openfeature/contrib/tools/flagd/resolver/process/storage/connector/sync/http/HttpConnectorTest.java @@ -0,0 +1,438 @@ +package dev.openfeature.contrib.tools.flagd.resolver.process.storage.connector.sync.http; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import dev.openfeature.contrib.providers.flagd.Config; +import dev.openfeature.contrib.providers.flagd.FlagdOptions; +import dev.openfeature.contrib.providers.flagd.FlagdProvider; +import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload; +import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayloadType; +import java.io.IOException; +import java.lang.reflect.Field; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueueSource; +import dev.openfeature.sdk.EvaluationContext; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +@Slf4j +class HttpConnectorTest { + + @SneakyThrows + @Test + void testGetStreamQueueInitialAndScheduledPolls() { + String testUrl = "http://example.com"; + HttpClient mockClient = mock(HttpClient.class); + HttpResponse mockResponse = mock(HttpResponse.class); + when(mockResponse.statusCode()).thenReturn(200); + when(mockResponse.body()).thenReturn("test data"); + when(mockClient.send(any(HttpRequest.class), any(HttpResponse.BodyHandler.class))) + .thenReturn(mockResponse); + + HttpConnectorOptions httpConnectorOptions = HttpConnectorOptions.builder() + .url(testUrl) + .httpClientExecutor(Executors.newSingleThreadExecutor()) + .build(); + HttpConnector connector = HttpConnector.builder() + .httpConnectorOptions(httpConnectorOptions) + .build(); + + Field clientField = HttpConnector.class.getDeclaredField("client"); + clientField.setAccessible(true); + clientField.set(connector, mockClient); + + BlockingQueue queue = connector.getStreamQueue(); + + assertFalse(queue.isEmpty()); + QueuePayload payload = queue.poll(); + assertNotNull(payload); + assertEquals(QueuePayloadType.DATA, payload.getType()); + assertEquals("test data", payload.getFlagData()); + + connector.shutdown(); + } + + @SneakyThrows + @Test + void testBuildPollTaskFetchesDataAndAddsToQueue() { + String testUrl = "http://example.com"; + HttpClient mockClient = mock(HttpClient.class); + HttpResponse mockResponse = mock(HttpResponse.class); + when(mockResponse.statusCode()).thenReturn(200); + when(mockResponse.body()).thenReturn("test data"); + when(mockClient.send(any(HttpRequest.class), any(HttpResponse.BodyHandler.class))) + .thenReturn(mockResponse); + + PayloadCache payloadCache = new PayloadCache() { + private String payload; + @Override + public void put(String payload) { + this.payload = payload; + } + + @Override + public String get() { + return payload; + } + }; + HttpConnectorOptions httpConnectorOptions = HttpConnectorOptions.builder() + .url(testUrl) + .proxyHost("proxy-host") + .proxyPort(8080) + .useHttpCache(true) + .payloadCache(payloadCache) + .payloadCacheOptions(PayloadCacheOptions.builder().build()) + .build(); + HttpConnector connector = HttpConnector.builder() + .httpConnectorOptions(httpConnectorOptions) + .build(); + connector.init(); + + Field clientField = HttpConnector.class.getDeclaredField("client"); + clientField.setAccessible(true); + clientField.set(connector, mockClient); + + Runnable pollTask = connector.buildPollTask(); + pollTask.run(); + + Field queueField = HttpConnector.class.getDeclaredField("queue"); + queueField.setAccessible(true); + BlockingQueue queue = (BlockingQueue) queueField.get(connector); + assertFalse(queue.isEmpty()); + QueuePayload payload = queue.poll(); + assertNotNull(payload); + assertEquals(QueuePayloadType.DATA, payload.getType()); + assertEquals("test data", payload.getFlagData()); + } + + @SneakyThrows + @Test + void testHttpRequestIncludesHeaders() { + String testUrl = "http://example.com"; + Map testHeaders = new HashMap<>(); + testHeaders.put("Authorization", "Bearer token"); + testHeaders.put("Content-Type", "application/json"); + + HttpConnectorOptions httpConnectorOptions = HttpConnectorOptions.builder() + .url(testUrl) + .headers(testHeaders) + .build(); + HttpConnector connector = HttpConnector.builder() + .httpConnectorOptions(httpConnectorOptions) + .build(); + + Field headersField = HttpConnector.class.getDeclaredField("headers"); + headersField.setAccessible(true); + Map headers = (Map) headersField.get(connector); + assertNotNull(headers); + assertEquals(2, headers.size()); + assertEquals("Bearer token", headers.get("Authorization")); + assertEquals("application/json", headers.get("Content-Type")); + } + + @SneakyThrows + @Test + void testSuccessfulHttpResponseAddsDataToQueue() { + String testUrl = "http://example.com"; + HttpClient mockClient = mock(HttpClient.class); + HttpResponse mockResponse = mock(HttpResponse.class); + when(mockResponse.statusCode()).thenReturn(200); + when(mockResponse.body()).thenReturn("test data"); + when(mockClient.send(any(HttpRequest.class), any(HttpResponse.BodyHandler.class))) + .thenReturn(mockResponse); + + HttpConnectorOptions httpConnectorOptions = HttpConnectorOptions.builder() + .url(testUrl) + .build(); + HttpConnector connector = HttpConnector.builder() + .httpConnectorOptions(httpConnectorOptions) + .build(); + + Field clientField = HttpConnector.class.getDeclaredField("client"); + clientField.setAccessible(true); + clientField.set(connector, mockClient); + + BlockingQueue queue = connector.getStreamQueue(); + + assertFalse(queue.isEmpty()); + QueuePayload payload = queue.poll(); + assertNotNull(payload); + assertEquals(QueuePayloadType.DATA, payload.getType()); + assertEquals("test data", payload.getFlagData()); + } + + @SneakyThrows + @Test + void testInitFailureUsingCache() { + String testUrl = "http://example.com"; + HttpClient mockClient = mock(HttpClient.class); + HttpResponse mockResponse = mock(HttpResponse.class); + when(mockResponse.statusCode()).thenReturn(200); + when(mockClient.send(any(HttpRequest.class), any(HttpResponse.BodyHandler.class))) + .thenThrow(new IOException("Simulated IO Exception")); + + final String cachedData = "cached data"; + PayloadCache payloadCache = new PayloadCache() { + @Override + public void put(String payload) { + // do nothing + } + + @Override + public String get() { + return cachedData; + } + }; + + HttpConnectorOptions httpConnectorOptions = HttpConnectorOptions.builder() + .url(testUrl) + .payloadCache(payloadCache) + .payloadCacheOptions(PayloadCacheOptions.builder().build()) + .build(); + HttpConnector connector = HttpConnector.builder() + .httpConnectorOptions(httpConnectorOptions) + .build(); + + Field clientField = HttpConnector.class.getDeclaredField("client"); + clientField.setAccessible(true); + clientField.set(connector, mockClient); + + BlockingQueue queue = connector.getStreamQueue(); + + assertFalse(queue.isEmpty()); + QueuePayload payload = queue.poll(); + assertNotNull(payload); + assertEquals(QueuePayloadType.DATA, payload.getType()); + assertEquals(cachedData, payload.getFlagData()); + } + + @SneakyThrows + @Test + void testQueueBecomesFull() { + String testUrl = "http://example.com"; + int queueCapacity = 1; + HttpConnectorOptions httpConnectorOptions = HttpConnectorOptions.builder() + .url(testUrl) + .linkedBlockingQueueCapacity(queueCapacity) + .build(); + HttpConnector connector = HttpConnector.builder() + .httpConnectorOptions(httpConnectorOptions) + .build(); + + BlockingQueue queue = connector.getStreamQueue(); + + queue.offer(new QueuePayload(QueuePayloadType.DATA, "test data 1")); + + boolean wasOffered = queue.offer(new QueuePayload(QueuePayloadType.DATA, "test data 2")); + + assertFalse(wasOffered, "Queue should be full and not accept more items"); + } + + @SneakyThrows + @Test + void testShutdownProperlyTerminatesSchedulerAndHttpClientExecutor() throws InterruptedException { + ExecutorService mockHttpClientExecutor = mock(ExecutorService.class); + ScheduledExecutorService mockScheduler = mock(ScheduledExecutorService.class); + String testUrl = "http://example.com"; + + HttpConnectorOptions httpConnectorOptions = HttpConnectorOptions.builder() + .url(testUrl) + .httpClientExecutor(mockHttpClientExecutor) + .build(); + HttpConnector connector = HttpConnector.builder() + .httpConnectorOptions(httpConnectorOptions) + .build(); + + Field schedulerField = HttpConnector.class.getDeclaredField("scheduler"); + schedulerField.setAccessible(true); + schedulerField.set(connector, mockScheduler); + + connector.shutdown(); + + Mockito.verify(mockScheduler).shutdown(); + Mockito.verify(mockHttpClientExecutor).shutdown(); + } + + @SneakyThrows + @Test + void testHttpResponseNonSuccessStatusCode() { + String testUrl = "http://example.com"; + HttpClient mockClient = mock(HttpClient.class); + HttpResponse mockResponse = mock(HttpResponse.class); + when(mockResponse.statusCode()).thenReturn(404); + when(mockClient.send(any(HttpRequest.class), any(HttpResponse.BodyHandler.class))) + .thenReturn(mockResponse); + + HttpConnectorOptions httpConnectorOptions = HttpConnectorOptions.builder() + .url(testUrl) + .build(); + HttpConnector connector = HttpConnector.builder() + .httpConnectorOptions(httpConnectorOptions) + .build(); + + Field clientField = HttpConnector.class.getDeclaredField("client"); + clientField.setAccessible(true); + clientField.set(connector, mockClient); + + BlockingQueue queue = connector.getStreamQueue(); + + assertTrue(queue.isEmpty(), "Queue should be empty when response status is non-200"); + } + + @SneakyThrows + @Test + void testHttpRequestFailsWithException() { + String testUrl = "http://example.com"; + HttpClient mockClient = mock(HttpClient.class); + HttpConnectorOptions httpConnectorOptions = HttpConnectorOptions.builder() + .url(testUrl) + .build(); + HttpConnector connector = HttpConnector.builder() + .httpConnectorOptions(httpConnectorOptions) + .build(); + + Field clientField = HttpConnector.class.getDeclaredField("client"); + clientField.setAccessible(true); + clientField.set(connector, mockClient); + + when(mockClient.send(any(HttpRequest.class), any(HttpResponse.BodyHandler.class))) + .thenThrow(new RuntimeException("Test exception")); + + BlockingQueue queue = connector.getStreamQueue(); + + assertTrue(queue.isEmpty(), "Queue should be empty when request fails with exception"); + } + + @SneakyThrows + @Test + void testHttpRequestFailsWithIoexception() { + String testUrl = "http://example.com"; + HttpClient mockClient = mock(HttpClient.class); + HttpConnectorOptions httpConnectorOptions = HttpConnectorOptions.builder() + .url(testUrl) + .build(); + HttpConnector connector = HttpConnector.builder() + .httpConnectorOptions(httpConnectorOptions) + .build(); + + Field clientField = HttpConnector.class.getDeclaredField("client"); + clientField.setAccessible(true); + clientField.set(connector, mockClient); + + when(mockClient.send(any(HttpRequest.class), any(HttpResponse.BodyHandler.class))) + .thenThrow(new IOException("Simulated IO Exception")); + + connector.getStreamQueue(); + + Field queueField = HttpConnector.class.getDeclaredField("queue"); + queueField.setAccessible(true); + BlockingQueue queue = (BlockingQueue) queueField.get(connector); + assertTrue(queue.isEmpty(), "Queue should be empty due to IOException"); + } + + @SneakyThrows + @Test + void testScheduledPollingContinuesAtFixedIntervals() { + String testUrl = "http://exampOle.com"; + HttpResponse mockResponse = mock(HttpResponse.class); + when(mockResponse.statusCode()).thenReturn(200); + when(mockResponse.body()).thenReturn("test data"); + + HttpConnectorOptions httpConnectorOptions = HttpConnectorOptions.builder() + .url(testUrl) + .build(); + HttpConnector connector = spy(HttpConnector.builder() + .httpConnectorOptions(httpConnectorOptions) + .build()); + + doReturn(mockResponse).when(connector).execute(any()); + + BlockingQueue queue = connector.getStreamQueue(); + + delay(2000); + assertFalse(queue.isEmpty()); + QueuePayload payload = queue.poll(); + assertNotNull(payload); + assertEquals(QueuePayloadType.DATA, payload.getType()); + assertEquals("test data", payload.getFlagData()); + + connector.shutdown(); + } + + @SneakyThrows + @Test + void testQueuePayloadTypeSetToDataOnSuccess() { + String testUrl = "http://example.com"; + HttpClient mockClient = mock(HttpClient.class); + HttpResponse mockResponse = mock(HttpResponse.class); + + when(mockResponse.statusCode()).thenReturn(200); + when(mockResponse.body()).thenReturn("response body"); + when(mockClient.send(any(HttpRequest.class), any(HttpResponse.BodyHandler.class))) + .thenReturn(mockResponse); + + HttpConnectorOptions httpConnectorOptions = HttpConnectorOptions.builder() + .url(testUrl) + .build(); + HttpConnector connector = HttpConnector.builder() + .httpConnectorOptions(httpConnectorOptions) + .build(); + + Field clientField = HttpConnector.class.getDeclaredField("client"); + clientField.setAccessible(true); + clientField.set(connector, mockClient); + + BlockingQueue queue = connector.getStreamQueue(); + + QueuePayload payload = queue.poll(1, TimeUnit.SECONDS); + assertNotNull(payload); + assertEquals(QueuePayloadType.DATA, payload.getType()); + assertEquals("response body", payload.getFlagData()); + } + + @Test + public void providerTest() { + HttpConnectorOptions httpConnectorOptions = HttpConnectorOptions.builder() + .url("http://example.com") + .build(); + HttpConnector connector = HttpConnector.builder() + .httpConnectorOptions(httpConnectorOptions) + .build(); + + FlagdOptions options = + FlagdOptions.builder() + .resolverType(Config.Resolver.IN_PROCESS) + .customConnector(connector) + .build(); + + FlagdProvider flagdProvider = new FlagdProvider(options); + + assertDoesNotThrow(() -> flagdProvider.getMetadata()); + } + + @SneakyThrows + protected static void delay(long ms) { + Thread.sleep(ms); + } + +} diff --git a/tools/flagd-http-connector/src/test/java/dev/openfeature/contrib/tools/flagd/resolver/process/storage/connector/sync/http/PayloadCacheWrapperTest.java b/tools/flagd-http-connector/src/test/java/dev/openfeature/contrib/tools/flagd/resolver/process/storage/connector/sync/http/PayloadCacheWrapperTest.java new file mode 100644 index 000000000..3ff3ff679 --- /dev/null +++ b/tools/flagd-http-connector/src/test/java/dev/openfeature/contrib/tools/flagd/resolver/process/storage/connector/sync/http/PayloadCacheWrapperTest.java @@ -0,0 +1,267 @@ +package dev.openfeature.contrib.tools.flagd.resolver.process.storage.connector.sync.http; + +import static dev.openfeature.contrib.tools.flagd.resolver.process.storage.connector.sync.http.HttpConnectorTest.delay; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.lang.reflect.Field; +import lombok.SneakyThrows; +import org.junit.jupiter.api.Test; + + +public class PayloadCacheWrapperTest { + + @Test + public void testConstructorInitializesWithValidParameters() { + PayloadCache mockCache = mock(PayloadCache.class); + PayloadCacheOptions options = PayloadCacheOptions.builder() + .updateIntervalSeconds(600) + .build(); + + PayloadCacheWrapper wrapper = PayloadCacheWrapper.builder() + .payloadCache(mockCache) + .payloadCacheOptions(options) + .build(); + + assertNotNull(wrapper); + + String testPayload = "test-payload"; + wrapper.updatePayloadIfNeeded(testPayload); + wrapper.get(); + + verify(mockCache).put(testPayload); + verify(mockCache).get(); + } + + @Test + public void testConstructorThrowsExceptionForInvalidInterval() { + PayloadCache mockCache = mock(PayloadCache.class); + PayloadCacheOptions options = PayloadCacheOptions.builder() + .updateIntervalSeconds(0) + .build(); + + PayloadCacheWrapper.PayloadCacheWrapperBuilder payloadCacheWrapperBuilder = PayloadCacheWrapper.builder() + .payloadCache(mockCache) + .payloadCacheOptions(options); + IllegalArgumentException exception = assertThrows( + IllegalArgumentException.class, + payloadCacheWrapperBuilder::build + ); + + assertEquals("pollIntervalSeconds must be larger than 0", exception.getMessage()); + } + + @Test + public void testUpdateSkipsWhenIntervalNotPassed() { + PayloadCache mockCache = mock(PayloadCache.class); + PayloadCacheOptions options = PayloadCacheOptions.builder() + .updateIntervalSeconds(600) + .build(); + PayloadCacheWrapper wrapper = PayloadCacheWrapper.builder() + .payloadCache(mockCache) + .payloadCacheOptions(options) + .build(); + + String initialPayload = "initial-payload"; + wrapper.updatePayloadIfNeeded(initialPayload); + + String newPayload = "new-payload"; + wrapper.updatePayloadIfNeeded(newPayload); + + verify(mockCache, times(1)).put(initialPayload); + verify(mockCache, never()).put(newPayload); + } + + @Test + public void testUpdatePayloadIfNeededHandlesPutException() { + PayloadCache mockCache = mock(PayloadCache.class); + PayloadCacheOptions options = PayloadCacheOptions.builder() + .updateIntervalSeconds(600) + .build(); + PayloadCacheWrapper wrapper = PayloadCacheWrapper.builder() + .payloadCache(mockCache) + .payloadCacheOptions(options) + .build(); + String testPayload = "test-payload"; + + doThrow(new RuntimeException("put exception")).when(mockCache).put(testPayload); + + wrapper.updatePayloadIfNeeded(testPayload); + + verify(mockCache).put(testPayload); + } + + @Test + public void testUpdatePayloadIfNeededUpdatesCacheAfterInterval() { + PayloadCache mockCache = mock(PayloadCache.class); + PayloadCacheOptions options = PayloadCacheOptions.builder() + .updateIntervalSeconds(1) // 1 second interval for quick test + .build(); + PayloadCacheWrapper wrapper = PayloadCacheWrapper.builder() + .payloadCache(mockCache) + .payloadCacheOptions(options) + .build(); + + String initialPayload = "initial-payload"; + String newPayload = "new-payload"; + + wrapper.updatePayloadIfNeeded(initialPayload); + delay(1100); + wrapper.updatePayloadIfNeeded(newPayload); + + verify(mockCache).put(initialPayload); + verify(mockCache).put(newPayload); + } + + @Test + public void testGetReturnsNullWhenCacheGetThrowsException() { + PayloadCache mockCache = mock(PayloadCache.class); + PayloadCacheOptions options = PayloadCacheOptions.builder() + .updateIntervalSeconds(600) + .build(); + PayloadCacheWrapper wrapper = PayloadCacheWrapper.builder() + .payloadCache(mockCache) + .payloadCacheOptions(options) + .build(); + + when(mockCache.get()).thenThrow(new RuntimeException("Cache get failed")); + + String result = wrapper.get(); + + assertNull(result); + + verify(mockCache).get(); + } + + @Test + public void test_get_returns_cached_payload() { + PayloadCache mockCache = mock(PayloadCache.class); + PayloadCacheOptions options = PayloadCacheOptions.builder() + .updateIntervalSeconds(600) + .build(); + PayloadCacheWrapper wrapper = PayloadCacheWrapper.builder() + .payloadCache(mockCache) + .payloadCacheOptions(options) + .build(); + String expectedPayload = "cached-payload"; + when(mockCache.get()).thenReturn(expectedPayload); + + String actualPayload = wrapper.get(); + + assertEquals(expectedPayload, actualPayload); + + verify(mockCache).get(); + } + + @Test + public void test_first_call_updates_cache() { + PayloadCache mockCache = mock(PayloadCache.class); + PayloadCacheOptions options = PayloadCacheOptions.builder() + .updateIntervalSeconds(600) + .build(); + PayloadCacheWrapper wrapper = PayloadCacheWrapper.builder() + .payloadCache(mockCache) + .payloadCacheOptions(options) + .build(); + + String testPayload = "initial-payload"; + + wrapper.updatePayloadIfNeeded(testPayload); + + verify(mockCache).put(testPayload); + } + + @Test + public void test_update_payload_once_within_interval() { + PayloadCache mockCache = mock(PayloadCache.class); + PayloadCacheOptions options = PayloadCacheOptions.builder() + .updateIntervalSeconds(1) // 1 second interval + .build(); + PayloadCacheWrapper wrapper = PayloadCacheWrapper.builder() + .payloadCache(mockCache) + .payloadCacheOptions(options) + .build(); + + String testPayload = "test-payload"; + + wrapper.updatePayloadIfNeeded(testPayload); + wrapper.updatePayloadIfNeeded(testPayload); + + verify(mockCache, times(1)).put(testPayload); + } + + @SneakyThrows + @Test + public void test_last_update_time_ms_updated_after_successful_cache_update() { + PayloadCache mockCache = mock(PayloadCache.class); + PayloadCacheOptions options = PayloadCacheOptions.builder() + .updateIntervalSeconds(600) + .build(); + PayloadCacheWrapper wrapper = PayloadCacheWrapper.builder() + .payloadCache(mockCache) + .payloadCacheOptions(options) + .build(); + String testPayload = "test-payload"; + + wrapper.updatePayloadIfNeeded(testPayload); + + verify(mockCache).put(testPayload); + + Field lastUpdateTimeMsField = PayloadCacheWrapper.class.getDeclaredField("lastUpdateTimeMs"); + lastUpdateTimeMsField.setAccessible(true); + long lastUpdateTimeMs = (Long) lastUpdateTimeMsField.get(wrapper); + + assertTrue(System.currentTimeMillis() - lastUpdateTimeMs < 1000, + "lastUpdateTimeMs should be updated to current time"); + } + + @Test + public void test_update_payload_if_needed_respects_update_interval() { + PayloadCache mockCache = mock(PayloadCache.class); + PayloadCacheOptions options = PayloadCacheOptions.builder() + .updateIntervalSeconds(600) + .build(); + PayloadCacheWrapper wrapper = spy(PayloadCacheWrapper.builder() + .payloadCache(mockCache) + .payloadCacheOptions(options) + .build()); + + String testPayload = "test-payload"; + long initialTime = System.currentTimeMillis(); + long updateIntervalMs = options.getUpdateIntervalSeconds() * 1000L; + + doReturn(initialTime).when(wrapper).getCurrentTimeMillis(); + + // First update should succeed + wrapper.updatePayloadIfNeeded(testPayload); + + // Verify the payload was updated + verify(mockCache).put(testPayload); + + // Attempt to update before interval has passed + doReturn(initialTime + updateIntervalMs - 1).when(wrapper).getCurrentTimeMillis(); + wrapper.updatePayloadIfNeeded(testPayload); + + // Verify the payload was not updated again + verify(mockCache, times(1)).put(testPayload); + + // Update after interval has passed + doReturn(initialTime + updateIntervalMs + 1).when(wrapper).getCurrentTimeMillis(); + wrapper.updatePayloadIfNeeded(testPayload); + + // Verify the payload was updated again + verify(mockCache, times(2)).put(testPayload); + } + +} diff --git a/tools/flagd-http-connector/src/test/resources/simplelogger.properties b/tools/flagd-http-connector/src/test/resources/simplelogger.properties new file mode 100644 index 000000000..80c478930 --- /dev/null +++ b/tools/flagd-http-connector/src/test/resources/simplelogger.properties @@ -0,0 +1,3 @@ +org.slf4j.simpleLogger.defaultLogLevel=debug + +io.grpc.level=trace diff --git a/tools/flagd-http-connector/version.txt b/tools/flagd-http-connector/version.txt new file mode 100644 index 000000000..8acdd82b7 --- /dev/null +++ b/tools/flagd-http-connector/version.txt @@ -0,0 +1 @@ +0.0.1