Skip to content

Commit 5b67b54

Browse files
authored
Add LimitStore (#2175) (apache#2242)
* Add LimitStore (#2175) * Review feedback * Fix test
1 parent 70691b2 commit 5b67b54

File tree

3 files changed

+265
-0
lines changed

3 files changed

+265
-0
lines changed

src/aws.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -684,6 +684,7 @@ impl AmazonS3Builder {
684684

685685
/// Sets the maximum number of concurrent outstanding
686686
/// connectons. Default is `16`.
687+
#[deprecated(note = "use LimitStore instead")]
687688
pub fn with_max_connections(mut self, max_connections: NonZeroUsize) -> Self {
688689
self.max_connections = max_connections;
689690
self

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ pub mod aws;
4545
pub mod azure;
4646
#[cfg(feature = "gcp")]
4747
pub mod gcp;
48+
pub mod limit;
4849
pub mod local;
4950
pub mod memory;
5051
pub mod path;

src/limit.rs

Lines changed: 263 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,263 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! An object store that limits the maximum concurrency of the wrapped implementation
19+
20+
use crate::{
21+
BoxStream, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Path, Result,
22+
StreamExt,
23+
};
24+
use async_trait::async_trait;
25+
use bytes::Bytes;
26+
use futures::Stream;
27+
use std::io::{Error, IoSlice};
28+
use std::ops::Range;
29+
use std::pin::Pin;
30+
use std::sync::Arc;
31+
use std::task::{Context, Poll};
32+
use tokio::io::AsyncWrite;
33+
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
34+
35+
/// Store wrapper that wraps an inner store and limits the maximum number of concurrent
36+
/// object store operations. Where each call to an [`ObjectStore`] member function is
37+
/// considered a single operation, even if it may result in more than one network call
38+
///
39+
/// ```
40+
/// # use object_store::memory::InMemory;
41+
/// # use object_store::limit::LimitStore;
42+
///
43+
/// // Create an in-memory `ObjectStore` limited to 20 concurrent requests
44+
/// let store = LimitStore::new(InMemory::new(), 20);
45+
/// ```
46+
///
47+
#[derive(Debug)]
48+
pub struct LimitStore<T: ObjectStore> {
49+
inner: T,
50+
max_requests: usize,
51+
semaphore: Arc<Semaphore>,
52+
}
53+
54+
impl<T: ObjectStore> LimitStore<T> {
55+
/// Create new limit store that will limit the maximum
56+
/// number of outstanding concurrent requests to
57+
/// `max_requests`
58+
pub fn new(inner: T, max_requests: usize) -> Self {
59+
Self {
60+
inner,
61+
max_requests,
62+
semaphore: Arc::new(Semaphore::new(max_requests)),
63+
}
64+
}
65+
}
66+
67+
impl<T: ObjectStore> std::fmt::Display for LimitStore<T> {
68+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69+
write!(f, "LimitStore({}, {})", self.max_requests, self.inner)
70+
}
71+
}
72+
73+
#[async_trait]
74+
impl<T: ObjectStore> ObjectStore for LimitStore<T> {
75+
async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
76+
let _permit = self.semaphore.acquire().await.unwrap();
77+
self.inner.put(location, bytes).await
78+
}
79+
80+
async fn put_multipart(
81+
&self,
82+
location: &Path,
83+
) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
84+
let permit = Arc::clone(&self.semaphore).acquire_owned().await.unwrap();
85+
let (id, write) = self.inner.put_multipart(location).await?;
86+
Ok((id, Box::new(PermitWrapper::new(write, permit))))
87+
}
88+
89+
async fn abort_multipart(
90+
&self,
91+
location: &Path,
92+
multipart_id: &MultipartId,
93+
) -> Result<()> {
94+
let _permit = self.semaphore.acquire().await.unwrap();
95+
self.inner.abort_multipart(location, multipart_id).await
96+
}
97+
98+
async fn get(&self, location: &Path) -> Result<GetResult> {
99+
let permit = Arc::clone(&self.semaphore).acquire_owned().await.unwrap();
100+
match self.inner.get(location).await? {
101+
r @ GetResult::File(_, _) => Ok(r),
102+
GetResult::Stream(s) => {
103+
Ok(GetResult::Stream(PermitWrapper::new(s, permit).boxed()))
104+
}
105+
}
106+
}
107+
108+
async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
109+
let _permit = self.semaphore.acquire().await.unwrap();
110+
self.inner.get_range(location, range).await
111+
}
112+
113+
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
114+
let _permit = self.semaphore.acquire().await.unwrap();
115+
self.inner.head(location).await
116+
}
117+
118+
async fn delete(&self, location: &Path) -> Result<()> {
119+
let _permit = self.semaphore.acquire().await.unwrap();
120+
self.inner.delete(location).await
121+
}
122+
123+
async fn list(
124+
&self,
125+
prefix: Option<&Path>,
126+
) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
127+
let permit = Arc::clone(&self.semaphore).acquire_owned().await.unwrap();
128+
let s = self.inner.list(prefix).await?;
129+
Ok(PermitWrapper::new(s, permit).boxed())
130+
}
131+
132+
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
133+
let _permit = self.semaphore.acquire().await.unwrap();
134+
self.inner.list_with_delimiter(prefix).await
135+
}
136+
137+
async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
138+
let _permit = self.semaphore.acquire().await.unwrap();
139+
self.inner.copy(from, to).await
140+
}
141+
142+
async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
143+
let _permit = self.semaphore.acquire().await.unwrap();
144+
self.inner.rename(from, to).await
145+
}
146+
147+
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
148+
let _permit = self.semaphore.acquire().await.unwrap();
149+
self.inner.copy_if_not_exists(from, to).await
150+
}
151+
152+
async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
153+
let _permit = self.semaphore.acquire().await.unwrap();
154+
self.inner.rename_if_not_exists(from, to).await
155+
}
156+
}
157+
158+
/// Combines an [`OwnedSemaphorePermit`] with some other type
159+
struct PermitWrapper<T> {
160+
inner: T,
161+
#[allow(dead_code)]
162+
permit: OwnedSemaphorePermit,
163+
}
164+
165+
impl<T> PermitWrapper<T> {
166+
fn new(inner: T, permit: OwnedSemaphorePermit) -> Self {
167+
Self { inner, permit }
168+
}
169+
}
170+
171+
impl<T: Stream + Unpin> Stream for PermitWrapper<T> {
172+
type Item = T::Item;
173+
174+
fn poll_next(
175+
mut self: Pin<&mut Self>,
176+
cx: &mut Context<'_>,
177+
) -> Poll<Option<Self::Item>> {
178+
Pin::new(&mut self.inner).poll_next(cx)
179+
}
180+
181+
fn size_hint(&self) -> (usize, Option<usize>) {
182+
self.inner.size_hint()
183+
}
184+
}
185+
186+
impl<T: AsyncWrite + Unpin> AsyncWrite for PermitWrapper<T> {
187+
fn poll_write(
188+
mut self: Pin<&mut Self>,
189+
cx: &mut Context<'_>,
190+
buf: &[u8],
191+
) -> Poll<std::result::Result<usize, Error>> {
192+
Pin::new(&mut self.inner).poll_write(cx, buf)
193+
}
194+
195+
fn poll_flush(
196+
mut self: Pin<&mut Self>,
197+
cx: &mut Context<'_>,
198+
) -> Poll<std::result::Result<(), Error>> {
199+
Pin::new(&mut self.inner).poll_flush(cx)
200+
}
201+
202+
fn poll_shutdown(
203+
mut self: Pin<&mut Self>,
204+
cx: &mut Context<'_>,
205+
) -> Poll<std::result::Result<(), Error>> {
206+
Pin::new(&mut self.inner).poll_shutdown(cx)
207+
}
208+
209+
fn poll_write_vectored(
210+
mut self: Pin<&mut Self>,
211+
cx: &mut Context<'_>,
212+
bufs: &[IoSlice<'_>],
213+
) -> Poll<std::result::Result<usize, Error>> {
214+
Pin::new(&mut self.inner).poll_write_vectored(cx, bufs)
215+
}
216+
217+
fn is_write_vectored(&self) -> bool {
218+
self.inner.is_write_vectored()
219+
}
220+
}
221+
222+
#[cfg(test)]
223+
mod tests {
224+
use crate::limit::LimitStore;
225+
use crate::memory::InMemory;
226+
use crate::tests::{
227+
list_uses_directories_correctly, list_with_delimiter, put_get_delete_list,
228+
rename_and_copy, stream_get,
229+
};
230+
use crate::ObjectStore;
231+
use std::time::Duration;
232+
use tokio::time::timeout;
233+
234+
#[tokio::test]
235+
async fn limit_test() {
236+
let max_requests = 10;
237+
let memory = InMemory::new();
238+
let integration = LimitStore::new(memory, max_requests);
239+
240+
put_get_delete_list(&integration).await.unwrap();
241+
list_uses_directories_correctly(&integration).await.unwrap();
242+
list_with_delimiter(&integration).await.unwrap();
243+
rename_and_copy(&integration).await.unwrap();
244+
stream_get(&integration).await.unwrap();
245+
246+
let mut streams = Vec::with_capacity(max_requests);
247+
for _ in 0..max_requests {
248+
let stream = integration.list(None).await.unwrap();
249+
streams.push(stream);
250+
}
251+
252+
let t = Duration::from_millis(20);
253+
254+
// Expect to not be able to make another request
255+
assert!(timeout(t, integration.list(None)).await.is_err());
256+
257+
// Drop one of the streams
258+
streams.pop();
259+
260+
// Can now make another request
261+
integration.list(None).await.unwrap();
262+
}
263+
}

0 commit comments

Comments
 (0)