Skip to content

Commit cbd340c

Browse files
authored
Add a maybe_async method using serde_pool (#299)
1 parent a6f9305 commit cbd340c

File tree

5 files changed

+47
-81
lines changed

5 files changed

+47
-81
lines changed

src/elastic/src/client/async.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
use std::error::Error as StdError;
22
use uuid::Uuid;
33
use futures::{Future, Poll};
4-
use futures_cpupool::CpuPool;
4+
use futures::future::{FutureResult, IntoFuture, Either};
5+
use futures_cpupool::{CpuPool, CpuFuture};
56
use tokio_core::reactor::Handle;
67
use elastic_reqwest::{AsyncBody, AsyncElasticClient};
78
use reqwest::Error as ReqwestError;
@@ -96,6 +97,21 @@ impl Sender for AsyncSender {
9697
}
9798
}
9899

100+
impl AsyncSender {
101+
pub(crate) fn maybe_async<TFn, TResult>(&self, f: TFn)
102+
-> Either<CpuFuture<TResult, Error>, FutureResult<TResult, Error>>
103+
where
104+
TFn: FnOnce() -> Result<TResult, Error> + Send + 'static,
105+
TResult: Send + 'static,
106+
{
107+
if let Some(ref ser_pool) = self.serde_pool {
108+
Either::A(ser_pool.spawn_fn(f))
109+
} else {
110+
Either::B(f().into_future())
111+
}
112+
}
113+
}
114+
99115
/** A future returned by calling `send`. */
100116
pub struct Pending {
101117
inner: Box<Future<Item = AsyncResponseBuilder, Error = Error>>,

src/elastic/src/client/requests/document_index.rs

Lines changed: 8 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,7 @@ Builders for [index requests][docs-index].
55
*/
66

77
use serde_json;
8-
use futures::{Future, IntoFuture, Poll};
9-
use futures_cpupool::CpuPool;
8+
use futures::{Future, Poll};
109
use serde::Serialize;
1110

1211
use error::{self, Error, Result};
@@ -120,7 +119,7 @@ impl<TDocument> IndexRequestInner<TDocument>
120119
where
121120
TDocument: Serialize,
122121
{
123-
fn into_sync_request(self) -> Result<IndexRequest<'static, Vec<u8>>> {
122+
fn into_request(self) -> Result<IndexRequest<'static, Vec<u8>>> {
124123
let body = serde_json::to_vec(&self.doc).map_err(error::request)?;
125124

126125
Ok(IndexRequest::for_index_ty_id(
@@ -132,21 +131,6 @@ where
132131
}
133132
}
134133

135-
impl<TDocument> IndexRequestInner<TDocument>
136-
where
137-
TDocument: Serialize + Send + 'static,
138-
{
139-
fn into_async_request(self, ser_pool: Option<CpuPool>) -> Box<Future<Item = IndexRequest<'static, Vec<u8>>, Error = Error>> {
140-
if let Some(ser_pool) = ser_pool {
141-
let request_future = ser_pool.spawn_fn(|| self.into_sync_request());
142-
143-
Box::new(request_future)
144-
} else {
145-
Box::new(self.into_sync_request().into_future())
146-
}
147-
}
148-
}
149-
150134
/**
151135
# Builder methods
152136
@@ -214,7 +198,7 @@ where
214198
[SyncClient]: ../../type.SyncClient.html
215199
*/
216200
pub fn send(self) -> Result<IndexResponse> {
217-
let req = self.inner.into_sync_request()?;
201+
let req = self.inner.into_request()?;
218202

219203
RequestBuilder::new(self.client, self.params, RawRequestInner::new(req))
220204
.send()?
@@ -278,10 +262,9 @@ where
278262
[AsyncClient]: ../../type.AsyncClient.html
279263
*/
280264
pub fn send(self) -> Pending {
281-
let (client, params) = (self.client, self.params);
265+
let (client, params, inner) = (self.client, self.params, self.inner);
282266

283-
let ser_pool = client.sender.serde_pool.clone();
284-
let req_future = self.inner.into_async_request(ser_pool);
267+
let req_future = client.sender.maybe_async(move || inner.into_request());
285268

286269
let res_future = req_future.and_then(move |req| {
287270
RequestBuilder::new(client, params, RawRequestInner::new(req))
@@ -330,7 +313,7 @@ mod tests {
330313
let req = client
331314
.document_index(index("test-idx"), id("1"), Value::Null)
332315
.inner
333-
.into_sync_request()
316+
.into_request()
334317
.unwrap();
335318

336319
assert_eq!("/test-idx/value/1", req.url.as_ref());
@@ -345,7 +328,7 @@ mod tests {
345328
.document_index(index("test-idx"), id("1"), Value::Null)
346329
.ty("new-ty")
347330
.inner
348-
.into_sync_request()
331+
.into_request()
349332
.unwrap();
350333

351334
assert_eq!("/test-idx/new-ty/1", req.url.as_ref());
@@ -359,7 +342,7 @@ mod tests {
359342
let req = client
360343
.document_index(index("test-idx"), id("1"), &doc)
361344
.inner
362-
.into_sync_request()
345+
.into_request()
363346
.unwrap();
364347

365348
assert_eq!("/test-idx/value/1", req.url.as_ref());

src/elastic/src/client/requests/document_put_mapping.rs

Lines changed: 7 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,7 @@ Builders for [put mapping requests][docs-mapping].
66

77
use std::marker::PhantomData;
88
use serde_json;
9-
use futures::{Future, IntoFuture, Poll};
10-
use futures_cpupool::CpuPool;
9+
use futures::{Future, Poll};
1110
use serde::Serialize;
1211

1312
use error::{self, Error, Result};
@@ -109,7 +108,7 @@ impl<TDocument> PutMappingRequestInner<TDocument>
109108
where
110109
TDocument: DocumentType,
111110
{
112-
fn into_sync_request(self) -> Result<IndicesPutMappingRequest<'static, Vec<u8>>> {
111+
fn into_request(self) -> Result<IndicesPutMappingRequest<'static, Vec<u8>>> {
113112
let body = serde_json::to_vec(&TDocument::index_mapping()).map_err(error::request)?;
114113

115114
Ok(IndicesPutMappingRequest::for_index_ty(
@@ -120,21 +119,6 @@ where
120119
}
121120
}
122121

123-
impl<TDocument> PutMappingRequestInner<TDocument>
124-
where
125-
TDocument: DocumentType + Send + 'static,
126-
{
127-
fn into_async_request(self, ser_pool: Option<CpuPool>) -> Box<Future<Item = IndicesPutMappingRequest<'static, Vec<u8>>, Error = Error>> {
128-
if let Some(ser_pool) = ser_pool {
129-
let request_future = ser_pool.spawn_fn(|| self.into_sync_request());
130-
131-
Box::new(request_future)
132-
} else {
133-
Box::new(self.into_sync_request().into_future())
134-
}
135-
}
136-
}
137-
138122
/**
139123
# Builder methods
140124
@@ -192,7 +176,7 @@ where
192176
[SyncClient]: ../../type.SyncClient.html
193177
*/
194178
pub fn send(self) -> Result<CommandResponse> {
195-
let req = self.inner.into_sync_request()?;
179+
let req = self.inner.into_request()?;
196180

197181
RequestBuilder::new(self.client, self.params, RawRequestInner::new(req))
198182
.send()?
@@ -246,10 +230,9 @@ where
246230
[AsyncClient]: ../../type.AsyncClient.html
247231
*/
248232
pub fn send(self) -> Pending {
249-
let (client, params) = (self.client, self.params);
233+
let (client, params, inner) = (self.client, self.params, self.inner);
250234

251-
let ser_pool = client.sender.serde_pool.clone();
252-
let req_future = self.inner.into_async_request(ser_pool);
235+
let req_future = client.sender.maybe_async(move || inner.into_request());
253236

254237
let res_future = req_future.and_then(move |req| {
255238
RequestBuilder::new(client, params, RawRequestInner::new(req))
@@ -298,7 +281,7 @@ mod tests {
298281
let req = client
299282
.document_put_mapping::<Value>(index("test-idx"))
300283
.inner
301-
.into_sync_request()
284+
.into_request()
302285
.unwrap();
303286

304287
let expected_body = json!({
@@ -321,7 +304,7 @@ mod tests {
321304
.document_put_mapping::<Value>(index("test-idx"))
322305
.ty("new-ty")
323306
.inner
324-
.into_sync_request()
307+
.into_request()
325308
.unwrap();
326309

327310
assert_eq!("/test-idx/_mappings/new-ty", req.url.as_ref());

src/elastic/src/client/requests/document_update.rs

Lines changed: 12 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,7 @@ Builders for [update document requests][docs-update].
55
*/
66

77
use std::marker::PhantomData;
8-
use futures::{Future, IntoFuture, Poll};
9-
use futures_cpupool::CpuPool;
8+
use futures::{Future, Poll};
109
use serde_json::{self, Map, Value};
1110
use serde::ser::{Serialize, Serializer};
1211

@@ -210,7 +209,7 @@ impl<TBody> UpdateRequestInner<TBody>
210209
where
211210
TBody: Serialize,
212211
{
213-
fn into_sync_request(self) -> Result<UpdateRequest<'static, Vec<u8>>, Error> {
212+
fn into_request(self) -> Result<UpdateRequest<'static, Vec<u8>>, Error> {
214213
let body = serde_json::to_vec(&self.body).map_err(error::request)?;
215214

216215
Ok(UpdateRequest::for_index_ty_id(
@@ -222,21 +221,6 @@ where
222221
}
223222
}
224223

225-
impl<TBody> UpdateRequestInner<TBody>
226-
where
227-
TBody: Serialize + Send + 'static,
228-
{
229-
fn into_async_request(self, ser_pool: Option<CpuPool>) -> Box<Future<Item = UpdateRequest<'static, Vec<u8>>, Error = Error>> {
230-
if let Some(ser_pool) = ser_pool {
231-
let request_future = ser_pool.spawn_fn(|| self.into_sync_request());
232-
233-
Box::new(request_future)
234-
} else {
235-
Box::new(self.into_sync_request().into_future())
236-
}
237-
}
238-
}
239-
240224
/**
241225
# Builder methods
242226
@@ -556,7 +540,7 @@ where
556540
[SyncClient]: ../../type.SyncClient.html
557541
*/
558542
pub fn send(self) -> Result<UpdateResponse, Error> {
559-
let req = self.inner.into_sync_request()?;
543+
let req = self.inner.into_request()?;
560544

561545
RequestBuilder::new(self.client, self.params, RawRequestInner::new(req))
562546
.send()?
@@ -618,10 +602,9 @@ where
618602
[AsyncClient]: ../../type.AsyncClient.html
619603
*/
620604
pub fn send(self) -> Pending {
621-
let (client, params) = (self.client, self.params);
605+
let (client, params, inner) = (self.client, self.params, self.inner);
622606

623-
let ser_pool = client.sender.serde_pool.clone();
624-
let req_future = self.inner.into_async_request(ser_pool);
607+
let req_future = client.sender.maybe_async(move || inner.into_request());
625608

626609
let res_future = req_future.and_then(move |req| {
627610
RequestBuilder::new(client, params, RawRequestInner::new(req))
@@ -801,7 +784,7 @@ mod tests {
801784
let req = client
802785
.document_update::<Value>(index("test-idx"), id("1"))
803786
.inner
804-
.into_sync_request()
787+
.into_request()
805788
.unwrap();
806789

807790
assert_eq!("/test-idx/value/1/_update", req.url.as_ref());
@@ -823,7 +806,7 @@ mod tests {
823806
.document_update::<Value>(index("test-idx"), id("1"))
824807
.ty("new-ty")
825808
.inner
826-
.into_sync_request()
809+
.into_request()
827810
.unwrap();
828811

829812
assert_eq!("/test-idx/new-ty/1/_update", req.url.as_ref());
@@ -844,7 +827,7 @@ mod tests {
844827
.document_update::<Value>(index("test-idx"), id("1"))
845828
.doc(doc)
846829
.inner
847-
.into_sync_request()
830+
.into_request()
848831
.unwrap();
849832

850833
let actual_body: Value = serde_json::from_slice(&req.body).unwrap();
@@ -860,7 +843,7 @@ mod tests {
860843
.document_update::<Value>(index("test-idx"), id("1"))
861844
.script("ctx._source.a = params.str")
862845
.inner
863-
.into_sync_request()
846+
.into_request()
864847
.unwrap();
865848

866849
let expected_body = json!({
@@ -882,7 +865,7 @@ mod tests {
882865
.document_update::<Value>(index("test-idx"), id("1"))
883866
.script(ScriptBuilder::new("ctx._source.a = params.str"))
884867
.inner
885-
.into_sync_request()
868+
.into_request()
886869
.unwrap();
887870

888871
let expected_body = json!({
@@ -909,7 +892,7 @@ mod tests {
909892
.param("other", "some other value")
910893
})
911894
.inner
912-
.into_sync_request()
895+
.into_request()
913896
.unwrap();
914897

915898
let expected_body = json!({
@@ -947,7 +930,7 @@ mod tests {
947930
})
948931
})
949932
.inner
950-
.into_sync_request()
933+
.into_request()
951934
.unwrap();
952935

953936
let expected_body = json!({

src/types/benches/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
1-
#![feature(plugin, test)]
2-
#![plugin(json_str)]
1+
#![feature(test)]
32

43
extern crate geo as georust;
54
extern crate geojson;
65
extern crate serde;
76
extern crate serde_json;
87
extern crate test;
8+
#[macro_use]
9+
extern crate json_str;
910
pub extern crate chrono;
1011

1112
extern crate elastic_types;

0 commit comments

Comments
 (0)