Skip to content

Commit 4b29e49

Browse files
authored
Add total size bytes gauge for search after cache (#5742)
* Add total size bytes gauge for search after cache * Rename cache size const * Clarify terminology * Also clarify docs * Clarify ClusterClient.put_kv contract * Document MiniKV limitations
1 parent 50c7535 commit 4b29e49

File tree

5 files changed

+57
-18
lines changed

5 files changed

+57
-18
lines changed

docs/reference/es_compatible_api.md

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ If a parameter appears both as a query string parameter and in the JSON payload,
134134
| `q` | `String` | The search query. | (Optional) |
135135
| `size` | `Integer` | Number of hits to return. | 10 |
136136
| `sort` | `String` | Describes how documents should be ranked. See [Sort order](#sort-order) | (Optional) |
137-
| `scroll` | `Duration` | Creates a scroll context for "time to live". See [Scroll](#_scroll--scroll-api). | (Optional) |
137+
| `scroll` | `Duration` | Creates a scroll context for "time to live". See [Scroll](#_searchscroll--scroll-api). | (Optional) |
138138
| `allow_partial_search_results` | `Boolean` | Returns a partial response if some (but not all) of the split searches were unsuccessful. | `true` |
139139

140140
#### Supported Request Body parameters
@@ -279,6 +279,11 @@ First, the client needs to call the `search api` with a `scroll` query parameter
279279

280280
Each subsequent call to the `_search/scroll` endpoint will return a new `scroll_id` pointing to the next page.
281281

282+
:::tip
283+
284+
Using `_search` and then `_search/scroll` is somewhat similar to using `_search` with the `search_after` parameter, except that it creates a lightweight snapshot view of the dataset during the initial call to `_search`. Further calls to `_search/scroll` only return results from that view, thus ensuring more consistent results.
285+
286+
:::
282287

283288
### `_cat`   Cat API
284289

quickwit/quickwit-search/src/cluster_client.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -191,9 +191,12 @@ impl ClusterClient {
191191
client.leaf_list_terms(request.clone()).await
192192
}
193193

194-
/// Attempts to store a given search context within the cluster.
194+
/// Attempts to store a given key value pair within the cluster.
195195
///
196-
/// This function may fail silently, if no clients was available.
196+
/// Tries to replicate the pair to [`TARGET_NUM_REPLICATION`] nodes, but this function may fail
197+
/// silently (e.g if no client was available). Even in case of success, this storage is not
198+
/// persistent. For instance during a rolling upgrade, all replicas will be lost as there is no
199+
/// mechanism to maintain the replication count.
197200
pub async fn put_kv(&self, key: &[u8], payload: &[u8], ttl: Duration) {
198201
let clients: Vec<SearchServiceClient> = self
199202
.search_job_placer
@@ -216,8 +219,8 @@ impl ClusterClient {
216219
// course, this may still result in the replication over more nodes, but this is not
217220
// a problem.
218221
//
219-
// The requests are made in a concurrent manner, up to two at a time. As soon as 2 requests
220-
// are successful, we stop.
222+
// The requests are made in a concurrent manner, up to TARGET_NUM_REPLICATION at a time. As
223+
// soon as TARGET_NUM_REPLICATION requests are successful, we stop.
221224
let put_kv_futs = clients
222225
.into_iter()
223226
.map(|client| replicate_kv_to_one_server(client, key, payload, ttl));

quickwit/quickwit-search/src/metrics.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
use bytesize::ByteSize;
1818
use once_cell::sync::Lazy;
1919
use quickwit_common::metrics::{
20-
exponential_buckets, linear_buckets, new_counter, new_counter_vec, new_gauge_vec,
20+
exponential_buckets, linear_buckets, new_counter, new_counter_vec, new_gauge, new_gauge_vec,
2121
new_histogram, new_histogram_vec, Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge,
2222
};
2323

@@ -34,6 +34,7 @@ pub struct SearchMetrics {
3434
pub leaf_search_single_split_tasks_pending: IntGauge,
3535
pub leaf_search_single_split_tasks_ongoing: IntGauge,
3636
pub leaf_search_single_split_warmup_num_bytes: Histogram,
37+
pub searcher_local_kv_store_size_bytes: IntGauge,
3738
}
3839

3940
impl Default for SearchMetrics {
@@ -146,6 +147,13 @@ impl Default for SearchMetrics {
146147
&[],
147148
["affinity"],
148149
),
150+
searcher_local_kv_store_size_bytes: new_gauge(
151+
"searcher_local_kv_store_size_bytes",
152+
"Size of the searcher kv store in bytes. This store is used to cache scroll \
153+
contexts.",
154+
"search",
155+
&[],
156+
),
149157
}
150158
}
151159
}

quickwit/quickwit-search/src/scroll_context.rs

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ use std::time::Duration;
2222
use anyhow::Context;
2323
use base64::prelude::BASE64_STANDARD;
2424
use base64::Engine;
25+
use quickwit_common::metrics::GaugeGuard;
26+
use quickwit_common::shared_consts::SCROLL_BATCH_LEN;
2527
use quickwit_metastore::SplitMetadata;
2628
use quickwit_proto::search::{LeafSearchResponse, PartialHit, SearchRequest, SplitSearchError};
2729
use quickwit_proto::types::IndexUid;
@@ -34,14 +36,13 @@ use crate::root::IndexMetasForLeafSearch;
3436
use crate::service::SearcherContext;
3537
use crate::ClusterClient;
3638

37-
/// Maximum capacity of the search after cache.
39+
/// Maximum number of values in the local search KV store.
3840
///
39-
/// For the moment this value is hardcoded.
4041
/// TODO make configurable.
4142
///
4243
/// Assuming a search context of 1MB, this can
4344
/// amount to up to 1GB.
44-
const SCROLL_BATCH_LEN: usize = 1_000;
45+
const LOCAL_KV_CACHE_SIZE: usize = 1_000;
4546

4647
#[derive(Serialize, Deserialize)]
4748
pub(crate) struct ScrollContext {
@@ -120,29 +121,51 @@ impl ScrollContext {
120121
}
121122
}
122123

124+
struct TrackedValue {
125+
content: Vec<u8>,
126+
_total_size_metric_guard: GaugeGuard<'static>,
127+
}
128+
129+
/// In memory key value store with TTL and limited size.
130+
///
131+
/// Once the capacity [LOCAL_KV_CACHE_SIZE] is reached, the oldest entries are
132+
/// removed.
133+
///
134+
/// Currently this store is only used for caching scroll contexts. Using it for
135+
/// other purposes is risky as use cases would compete for its capacity.
123136
#[derive(Clone)]
124137
pub(crate) struct MiniKV {
125-
ttl_with_cache: Arc<RwLock<TtlCache<Vec<u8>, Vec<u8>>>>,
138+
ttl_with_cache: Arc<RwLock<TtlCache<Vec<u8>, TrackedValue>>>,
126139
}
127140

128141
impl Default for MiniKV {
129142
fn default() -> MiniKV {
130143
MiniKV {
131-
ttl_with_cache: Arc::new(RwLock::new(TtlCache::new(SCROLL_BATCH_LEN))),
144+
ttl_with_cache: Arc::new(RwLock::new(TtlCache::new(LOCAL_KV_CACHE_SIZE))),
132145
}
133146
}
134147
}
135148

136149
impl MiniKV {
137150
pub async fn put(&self, key: Vec<u8>, payload: Vec<u8>, ttl: Duration) {
151+
let mut metric_guard =
152+
GaugeGuard::from_gauge(&crate::SEARCH_METRICS.searcher_local_kv_store_size_bytes);
153+
metric_guard.add(payload.len() as i64);
138154
let mut cache_lock = self.ttl_with_cache.write().await;
139-
cache_lock.insert(key, payload, ttl);
155+
cache_lock.insert(
156+
key,
157+
TrackedValue {
158+
content: payload,
159+
_total_size_metric_guard: metric_guard,
160+
},
161+
ttl,
162+
);
140163
}
141164

142165
pub async fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
143166
let cache_lock = self.ttl_with_cache.read().await;
144-
let search_after_context_bytes = cache_lock.get(key)?;
145-
Some(search_after_context_bytes.clone())
167+
let tracked_value = cache_lock.get(key)?;
168+
Some(tracked_value.content.clone())
146169
}
147170
}
148171

quickwit/quickwit-search/src/service.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ pub struct SearchServiceImpl {
5757
storage_resolver: StorageResolver,
5858
cluster_client: ClusterClient,
5959
searcher_context: Arc<SearcherContext>,
60-
search_after_cache: MiniKV,
60+
local_kv_store: MiniKV,
6161
}
6262

6363
/// Trait representing a search service.
@@ -165,7 +165,7 @@ impl SearchServiceImpl {
165165
storage_resolver,
166166
cluster_client,
167167
searcher_context,
168-
search_after_cache: MiniKV::default(),
168+
local_kv_store: MiniKV::default(),
169169
}
170170
}
171171
}
@@ -322,13 +322,13 @@ impl SearchService for SearchServiceImpl {
322322

323323
async fn put_kv(&self, put_request: PutKvRequest) {
324324
let ttl = Duration::from_secs(put_request.ttl_secs as u64);
325-
self.search_after_cache
325+
self.local_kv_store
326326
.put(put_request.key, put_request.payload, ttl)
327327
.await;
328328
}
329329

330330
async fn get_kv(&self, get_request: GetKvRequest) -> Option<Vec<u8>> {
331-
let payload: Vec<u8> = self.search_after_cache.get(&get_request.key).await?;
331+
let payload: Vec<u8> = self.local_kv_store.get(&get_request.key).await?;
332332
Some(payload)
333333
}
334334

0 commit comments

Comments
 (0)