Skip to content

Commit 689ac0a

Browse files
authored
RUST-576 Fix count_documents fails when writeConcern set on Collection (#265)
1 parent 71140e8 commit 689ac0a

File tree

6 files changed

+285
-64
lines changed

6 files changed

+285
-64
lines changed

src/coll/mod.rs

Lines changed: 4 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use crate::{
2020
operation::{
2121
Aggregate,
2222
Count,
23+
CountDocuments,
2324
Delete,
2425
Distinct,
2526
DropCollection,
@@ -232,70 +233,9 @@ where
232233
options: impl Into<Option<CountOptions>>,
233234
) -> Result<i64> {
234235
let options = options.into();
235-
236-
let mut pipeline = vec![doc! {
237-
"$match": filter.into().unwrap_or_default(),
238-
}];
239-
240-
if let Some(skip) = options.as_ref().and_then(|opts| opts.skip) {
241-
pipeline.push(doc! {
242-
"$skip": skip
243-
});
244-
}
245-
246-
if let Some(limit) = options.as_ref().and_then(|opts| opts.limit) {
247-
pipeline.push(doc! {
248-
"$limit": limit
249-
});
250-
}
251-
252-
pipeline.push(doc! {
253-
"$group": {
254-
"_id": 1,
255-
"n": { "$sum": 1 },
256-
}
257-
});
258-
259-
let aggregate_options = options.map(|opts| {
260-
AggregateOptions::builder()
261-
.hint(opts.hint)
262-
.max_time(opts.max_time)
263-
.collation(opts.collation)
264-
.build()
265-
});
266-
267-
let result = match self
268-
.aggregate(pipeline, aggregate_options)
269-
.await?
270-
.next()
271-
.await
272-
{
273-
Some(doc) => doc?,
274-
None => return Ok(0),
275-
};
276-
277-
let n = match result.get("n") {
278-
Some(n) => n,
279-
None => {
280-
return Err(ErrorKind::ResponseError {
281-
message: "server response to count_documents aggregate did not contain the \
282-
'n' field"
283-
.into(),
284-
}
285-
.into())
286-
}
287-
};
288-
289-
bson_util::get_int(n).ok_or_else(|| {
290-
ErrorKind::ResponseError {
291-
message: format!(
292-
"server response to count_documents aggregate should have contained integer \
293-
'n', but instead had {:?}",
294-
n
295-
),
296-
}
297-
.into()
298-
})
236+
let filter = filter.into();
237+
let op = CountDocuments::new(self.namespace(), filter, options);
238+
self.client().execute_operation(op).await
299239
}
300240

301241
/// Deletes all documents stored in the collection matching `query`.

src/coll/options.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -550,6 +550,17 @@ pub struct CountOptions {
550550
/// information on how to use this option.
551551
#[builder(default)]
552552
pub collation: Option<Collation>,
553+
554+
/// The criteria used to select a server for this operation.
555+
///
556+
/// If none specified, the default set on the collection will be used.
557+
#[builder(default)]
558+
#[serde(skip_serializing)]
559+
pub selection_criteria: Option<SelectionCriteria>,
560+
561+
/// The level of the read concern.
562+
#[builder(default)]
563+
pub read_concern: Option<ReadConcern>,
553564
}
554565

555566
// rustfmt tries to split the link up when it's all on one line, which breaks the link, so we wrap

src/operation/count_documents/mod.rs

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
#[cfg(test)]
2+
mod test;
3+
4+
use bson::{doc, Document};
5+
6+
use super::{Operation, Retryability};
7+
use crate::{
8+
bson_util,
9+
cmap::{Command, CommandResponse, StreamDescription},
10+
error::{ErrorKind, Result},
11+
operation::aggregate::Aggregate,
12+
options::{AggregateOptions, CountOptions},
13+
selection_criteria::SelectionCriteria,
14+
Namespace,
15+
};
16+
17+
pub(crate) struct CountDocuments {
18+
aggregate: Aggregate,
19+
}
20+
21+
impl CountDocuments {
22+
pub(crate) fn new(
23+
namespace: Namespace,
24+
filter: Option<Document>,
25+
options: Option<CountOptions>,
26+
) -> Self {
27+
let mut pipeline = vec![doc! {
28+
"$match": filter.unwrap_or_default(),
29+
}];
30+
31+
if let Some(skip) = options.as_ref().and_then(|opts| opts.skip) {
32+
pipeline.push(doc! {
33+
"$skip": skip
34+
});
35+
}
36+
37+
if let Some(limit) = options.as_ref().and_then(|opts| opts.limit) {
38+
pipeline.push(doc! {
39+
"$limit": limit
40+
});
41+
}
42+
43+
pipeline.push(doc! {
44+
"$group": {
45+
"_id": 1,
46+
"n": { "$sum": 1 },
47+
}
48+
});
49+
50+
let aggregate_options = options.map(|opts| {
51+
AggregateOptions::builder()
52+
.hint(opts.hint)
53+
.max_time(opts.max_time)
54+
.collation(opts.collation)
55+
.selection_criteria(opts.selection_criteria)
56+
.read_concern(opts.read_concern)
57+
.build()
58+
});
59+
60+
Self {
61+
aggregate: Aggregate::new(namespace, pipeline, aggregate_options),
62+
}
63+
}
64+
}
65+
66+
impl Operation for CountDocuments {
67+
type O = i64;
68+
const NAME: &'static str = Aggregate::NAME;
69+
70+
fn build(&self, description: &StreamDescription) -> Result<Command> {
71+
self.aggregate.build(description)
72+
}
73+
74+
fn handle_response(&self, response: CommandResponse) -> Result<Self::O> {
75+
let result = self
76+
.aggregate
77+
.handle_response(response)
78+
.map(|mut spec| spec.initial_buffer.pop_front())?;
79+
80+
let result_doc = match result {
81+
Some(doc) => doc,
82+
None => return Ok(0),
83+
};
84+
85+
let n = match result_doc.get("n") {
86+
Some(n) => n,
87+
None => {
88+
return Err(ErrorKind::ResponseError {
89+
message: "server response to count_documents aggregate did not contain the \
90+
'n' field"
91+
.into(),
92+
}
93+
.into())
94+
}
95+
};
96+
97+
bson_util::get_int(n).ok_or_else(|| {
98+
ErrorKind::ResponseError {
99+
message: format!(
100+
"server response to count_documents aggregate should have contained integer \
101+
'n', but instead had {:?}",
102+
n
103+
),
104+
}
105+
.into()
106+
})
107+
}
108+
109+
fn selection_criteria(&self) -> Option<&SelectionCriteria> {
110+
self.aggregate.selection_criteria()
111+
}
112+
113+
fn retryability(&self) -> Retryability {
114+
Retryability::Read
115+
}
116+
}

src/operation/count_documents/test.rs

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
use crate::{
2+
bson::doc,
3+
bson_util,
4+
cmap::{CommandResponse, StreamDescription},
5+
coll::Namespace,
6+
concern::ReadConcern,
7+
operation::{test, Operation},
8+
options::{CountOptions, Hint},
9+
};
10+
11+
use super::CountDocuments;
12+
13+
#[cfg_attr(feature = "tokio-runtime", tokio::test)]
14+
#[cfg_attr(feature = "async-std-runtime", async_std::test)]
15+
async fn build() {
16+
let ns = Namespace {
17+
db: "test_db".to_string(),
18+
coll: "test_coll".to_string(),
19+
};
20+
let count_op = CountDocuments::new(ns, Some(doc! { "x": 1 }), None);
21+
let mut count_command = count_op
22+
.build(&StreamDescription::new_testing())
23+
.expect("error on build");
24+
25+
let mut expected_body = doc! {
26+
"aggregate": "test_coll",
27+
"pipeline": [
28+
{ "$match": { "x": 1 } },
29+
{ "$group": { "_id": 1, "n": { "$sum": 1 } } },
30+
],
31+
"cursor": { }
32+
};
33+
34+
bson_util::sort_document(&mut expected_body);
35+
bson_util::sort_document(&mut count_command.body);
36+
37+
assert_eq!(count_command.body, expected_body);
38+
assert_eq!(count_command.target_db, "test_db");
39+
assert_eq!(count_command.read_pref, None);
40+
}
41+
42+
#[cfg_attr(feature = "tokio-runtime", tokio::test)]
43+
#[cfg_attr(feature = "async-std-runtime", async_std::test)]
44+
async fn build_with_options() {
45+
let skip = 2;
46+
let limit = 5;
47+
let options = CountOptions::builder()
48+
.skip(skip)
49+
.limit(limit)
50+
.hint(Hint::Name("_id_1".to_string()))
51+
.read_concern(ReadConcern::available())
52+
.build();
53+
let ns = Namespace {
54+
db: "test_db".to_string(),
55+
coll: "test_coll".to_string(),
56+
};
57+
let count_op = CountDocuments::new(ns, None, Some(options));
58+
let mut count_command = count_op
59+
.build(&StreamDescription::new_testing())
60+
.expect("error on build");
61+
62+
let mut expected_body = doc! {
63+
"aggregate": "test_coll",
64+
"pipeline": [
65+
{ "$match": {} },
66+
{ "$skip": skip },
67+
{ "$limit": limit },
68+
{ "$group": { "_id": 1, "n": { "$sum": 1 } } },
69+
],
70+
"hint": "_id_1",
71+
"cursor": { },
72+
"readConcern": { "level": "available" },
73+
};
74+
75+
bson_util::sort_document(&mut expected_body);
76+
bson_util::sort_document(&mut count_command.body);
77+
78+
assert_eq!(count_command.body, expected_body);
79+
assert_eq!(count_command.target_db, "test_db");
80+
assert_eq!(count_command.read_pref, None);
81+
}
82+
83+
#[cfg_attr(feature = "tokio-runtime", tokio::test)]
84+
#[cfg_attr(feature = "async-std-runtime", async_std::test)]
85+
async fn op_selection_criteria() {
86+
test::op_selection_criteria(|selection_criteria| {
87+
let options = CountOptions {
88+
selection_criteria,
89+
..Default::default()
90+
};
91+
CountDocuments::new(Namespace::empty(), None, Some(options))
92+
});
93+
}
94+
95+
#[cfg_attr(feature = "tokio-runtime", tokio::test)]
96+
#[cfg_attr(feature = "async-std-runtime", async_std::test)]
97+
async fn handle_success() {
98+
let ns = Namespace {
99+
db: "test_db".to_string(),
100+
coll: "test_coll".to_string(),
101+
};
102+
let count_op = CountDocuments::new(ns, None, None);
103+
104+
let n = 26;
105+
let response = CommandResponse::with_document(doc! {
106+
"cursor" : {
107+
"firstBatch" : [
108+
{
109+
"_id" : 1,
110+
"n" : n
111+
}
112+
],
113+
"id" : 0,
114+
"ns" : "test_db.test_coll"
115+
},
116+
"ok" : 1
117+
});
118+
119+
let actual_values = count_op
120+
.handle_response(response)
121+
.expect("supposed to succeed");
122+
123+
assert_eq!(actual_values, n);
124+
}

src/operation/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
mod aggregate;
22
mod count;
3+
mod count_documents;
34
mod create;
45
mod delete;
56
mod distinct;
@@ -37,6 +38,7 @@ use crate::{
3738

3839
pub(crate) use aggregate::Aggregate;
3940
pub(crate) use count::Count;
41+
pub(crate) use count_documents::CountDocuments;
4042
pub(crate) use create::Create;
4143
pub(crate) use delete::Delete;
4244
pub(crate) use distinct::Distinct;

0 commit comments

Comments
 (0)