Skip to content

Commit e3950fe

Browse files
authored
Runtime update of total blob size using content_range header (#253)
* Runtime update of total blob size using content_range header * typos * simplified code
1 parent 5bd06be commit e3950fe

File tree

9 files changed

+183
-50
lines changed

9 files changed

+183
-50
lines changed

sdk/core/src/errors.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@ pub enum ParsingError {
1818

1919
#[derive(Debug, Clone, PartialEq, thiserror::Error)]
2020
pub enum ParseError {
21-
#[error("Split not found")]
22-
SplitNotFound,
21+
#[error("Expected token \"{}\" not found", 0)]
22+
TokenNotFound(String),
23+
#[error("Expected split char \'{}\' not found", 0)]
24+
SplitNotFound(char),
2325
#[error("Parse int error {}", 0)]
2426
ParseIntError(ParseIntError),
2527
}
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
use crate::errors::ParseError;
2+
use std::fmt;
3+
use std::str::FromStr;
4+
5+
const PREFIX: &str = "bytes ";
6+
7+
/// The three numeric components of a `bytes <start>-<end>/<total_length>` value.
#[derive(Debug, Copy, Clone, PartialEq)]
pub struct ContentRange {
    start: u64,
    end: u64,
    total_length: u64,
}

impl ContentRange {
    /// Builds a value from its components; the inputs are stored as given,
    /// without any validation.
    pub fn new(start: u64, end: u64, total_length: u64) -> ContentRange {
        Self { start, end, total_length }
    }

    /// First component of the range.
    pub fn start(&self) -> u64 {
        let Self { start, .. } = *self;
        start
    }

    /// Second component of the range.
    pub fn end(&self) -> u64 {
        let Self { end, .. } = *self;
        end
    }

    /// Component that follows the `/` separator.
    pub fn total_length(&self) -> u64 {
        let Self { total_length, .. } = *self;
        total_length
    }

    /// Returns `true` exactly when `start` and `end` hold the same value.
    // NOTE(review): if `end` is an inclusive offset, `start == end` still
    // covers one byte — confirm the intended semantics with callers.
    pub fn is_empty(&self) -> bool {
        self.start == self.end
    }
}
39+
40+
impl FromStr for ContentRange {
41+
type Err = ParseError;
42+
fn from_str(s: &str) -> Result<ContentRange, ParseError> {
43+
let remaining = s
44+
.strip_prefix(PREFIX)
45+
.ok_or(ParseError::TokenNotFound(PREFIX.to_owned()))?;
46+
47+
let mut split_at_dash = remaining.split('-');
48+
let start = split_at_dash.next().unwrap().parse()?;
49+
50+
let mut split_at_slash = split_at_dash
51+
.next()
52+
.ok_or(ParseError::SplitNotFound('-'))?
53+
.split('/');
54+
55+
let end = split_at_slash.next().unwrap().parse()?;
56+
let total_length = split_at_slash
57+
.next()
58+
.ok_or(ParseError::SplitNotFound('/'))?
59+
.parse()?;
60+
61+
Ok(ContentRange {
62+
start,
63+
end,
64+
total_length,
65+
})
66+
}
67+
}
68+
69+
impl fmt::Display for ContentRange {
70+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
71+
write!(
72+
f,
73+
"{}{}-{}/{}",
74+
PREFIX,
75+
self.start(),
76+
self.end(),
77+
self.total_length()
78+
)
79+
}
80+
}
81+
82+
#[cfg(test)]
mod test {
    use super::*;

    /// A well-formed value yields all three components.
    #[test]
    fn test_parse() {
        let parsed: ContentRange = "bytes 172032-172489/172490".parse().unwrap();

        assert_eq!(parsed.start(), 172032);
        assert_eq!(parsed.end(), 172489);
        assert_eq!(parsed.total_length(), 172490);
    }

    /// Input without the leading `bytes ` token is rejected.
    #[test]
    #[should_panic(expected = "TokenNotFound(\"bytes \")")]
    fn test_parse_no_starting_token() {
        let _: ContentRange = "something else".parse().unwrap();
    }

    /// Input lacking the `-` separator is rejected.
    #[test]
    #[should_panic(expected = "SplitNotFound('-')")]
    fn test_parse_no_dash() {
        let _: ContentRange = "bytes 100".parse().unwrap();
    }

    /// Input lacking the `/` separator is rejected.
    #[test]
    #[should_panic(expected = "SplitNotFound('/')")]
    fn test_parse_no_slash() {
        let _: ContentRange = "bytes 100-500".parse().unwrap();
    }

    /// `Display` renders the prefix followed by the three components.
    #[test]
    fn test_display() {
        let rendered = ContentRange {
            start: 100,
            end: 501,
            total_length: 5000,
        }
        .to_string();

        assert_eq!(rendered, "bytes 100-501/5000");
    }
}

sdk/core/src/request_options/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ mod client_request_id;
44
mod content_disposition;
55
mod content_encoding;
66
mod content_language;
7+
mod content_range;
78
mod content_type;
89
mod continuation;
910
mod delimiter;
@@ -33,6 +34,7 @@ pub use client_request_id::ClientRequestId;
3334
pub use content_disposition::ContentDisposition;
3435
pub use content_encoding::ContentEncoding;
3536
pub use content_language::ContentLanguage;
37+
pub use content_range::ContentRange;
3638
pub use content_type::ContentType;
3739
pub use continuation::Continuation;
3840
pub use delimiter::Delimiter;

sdk/core/src/request_options/range.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ impl FromStr for Range {
7474
fn from_str(s: &str) -> Result<Range, ParseError> {
7575
let v = s.split('/').collect::<Vec<&str>>();
7676
if v.len() != 2 {
77-
return Err(ParseError::SplitNotFound);
77+
return Err(ParseError::SplitNotFound('/'));
7878
}
7979

8080
let cp_start = v[0].parse::<u64>()?;

sdk/storage/examples/blob_00.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,21 +38,28 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
3838

3939
trace!("Requesting blob");
4040

41+
// this is a single call that retrieves the first 1KB of the blob (or less if the blob is
42+
// smaller). The range(...) call is optional.
4143
let response = blob_client
4244
.get()
43-
.range(Range::new(0, 128000))
45+
.range(Range::new(0, 1024))
4446
.execute()
4547
.await?;
4648

4749
println!("{:#?}", response);
4850

49-
let mut stream = Box::pin(blob_client.get().stream(128));
51+
let mut complete_response = Vec::new();
52+
53+
// this is how you stream a blob. You can specify the range(...) value as above if necessary.
54+
// In this case we are retrieving the whole blob in 8KB chunks.
55+
let mut stream = Box::pin(blob_client.get().stream(1024 * 8));
5056
while let Some(value) = stream.next().await {
51-
println!("received {:?} bytes", value?.data.len());
57+
let data = value?.data;
58+
println!("received {:?} bytes", data.len());
59+
complete_response.extend(&data as &[u8]);
5260
}
5361

54-
let s_content = String::from_utf8(response.data.to_vec())?;
55-
println!("blob == {:?}", blob);
62+
let s_content = String::from_utf8(complete_response)?;
5663
println!("s_content == {}", s_content);
5764

5865
Ok(())

sdk/storage/src/blob/blob/requests/get_blob_builder.rs

Lines changed: 20 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ impl<'a> GetBlobBuilder<'a> {
6565
None,
6666
)?;
6767

68+
debug!("request == {:#?}", request);
69+
6870
let expected_status_code = if self.range.is_some() {
6971
http::StatusCode::PARTIAL_CONTENT
7072
} else {
@@ -91,15 +93,21 @@ impl<'a> GetBlobBuilder<'a> {
9193
End,
9294
}
9395

94-
let complete_range = Range::new(0, u64::MAX);
96+
// this can either be the range requested by the caller or the complete file.
97+
let requested_range = self.range.unwrap_or(Range::new(0, u64::MAX));
9598

9699
futures::stream::unfold(States::Init, move |state| async move {
97-
let remaining = match state {
98-
States::Init => self.range.unwrap_or(complete_range),
100+
let mut remaining = match state {
101+
States::Init => requested_range,
99102
States::Progress(range) => range,
100103
States::End => return None,
101104
};
102105

106+
debug!(
107+
"remaining.start == {}, chunk_size == {}, remaining.end == {}",
108+
remaining.start, chunk_size, remaining.end
109+
);
110+
103111
let range = if remaining.start + chunk_size > remaining.end {
104112
Range::new(remaining.start, remaining.end)
105113
} else {
@@ -113,41 +121,16 @@ impl<'a> GetBlobBuilder<'a> {
113121
Err(err) => return Some((Err(err), States::End)),
114122
};
115123

124+
debug!("response.content_range == {:?}", response.content_range);
125+
126+
// now that we know what the remote blob size is, let's update the
127+
// boundary. We do this only if it's smaller than the requested size because the caller could
128+
// have specified a smaller range.
129+
remaining.end =
130+
std::cmp::min(requested_range.end, response.content_range.total_length());
131+
116132
let next_state = if remaining.end > range.end {
117-
if self.range.is_some() {
118-
States::Progress(Range::new(range.end, remaining.end))
119-
} else {
120-
// if we are here it means the user have not specified a
121-
// range and we didn't get the whole blob in one passing.
122-
// We specified u64::MAX as the first range but now
123-
// we need to find the correct size to avoid requesting data
124-
// outside the valid range.
125-
debug!("content-range == {:?}", response.content_range);
126-
// this unwrap should always be safe since we did not
127-
// get the whole blob in the previous call.
128-
let content_range = response.content_range.clone().unwrap();
129-
let ridx = match content_range.find('/') {
130-
Some(ridx) => ridx,
131-
None => {
132-
return Some((
133-
Err("The returned content-range is invalid: / is not present"
134-
.into()),
135-
States::End,
136-
))
137-
}
138-
};
139-
let total =
140-
match str::parse(&content_range[ridx + 1..]) {
141-
Ok(total) => total,
142-
Err(_err) => return Some((
143-
Err("The returned content-range is invalid: after / there is a non valid number"
144-
.into()),
145-
States::End,
146-
)),
147-
};
148-
149-
States::Progress(Range::new(range.end, total))
150-
}
133+
States::Progress(Range::new(range.end, remaining.end))
151134
} else {
152135
States::End
153136
};

sdk/storage/src/blob/blob/responses/get_blob_response.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,21 @@
11
use crate::{blob::blob::Blob, AzureStorageError};
2+
use azure_core::errors::AzureError;
23
use azure_core::headers::{date_from_headers, request_id_from_headers};
4+
use azure_core::prelude::ContentRange;
35
use azure_core::RequestId;
46
use bytes::Bytes;
57
use chrono::{DateTime, Utc};
68
use http::Response;
79
use std::convert::TryFrom;
10+
use std::str::FromStr;
811

912
#[derive(Debug, Clone)]
1013
pub struct GetBlobResponse {
1114
pub request_id: RequestId,
1215
pub blob: Blob,
1316
pub data: Bytes,
1417
pub date: DateTime<Utc>,
15-
pub content_range: Option<String>,
18+
pub content_range: ContentRange,
1619
}
1720

1821
impl TryFrom<(&str, Response<Bytes>)> for GetBlobResponse {
@@ -23,10 +26,13 @@ impl TryFrom<(&str, Response<Bytes>)> for GetBlobResponse {
2326
let request_id = request_id_from_headers(response.headers())?;
2427
let date = date_from_headers(response.headers())?;
2528

26-
let content_range = response
27-
.headers()
28-
.get(http::header::CONTENT_RANGE)
29-
.map(|h| h.to_str().unwrap().to_owned());
29+
let content_range = ContentRange::from_str(
30+
response
31+
.headers()
32+
.get(http::header::CONTENT_RANGE)
33+
.ok_or_else(|| AzureError::HeaderNotFound(http::header::CONTENT_RANGE.to_string()))?
34+
.to_str()?,
35+
)?;
3036

3137
Ok(GetBlobResponse {
3238
request_id,

sdk/storage/src/core/errors.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
pub enum AzureStorageError {
33
#[error(transparent)]
44
CoreError(#[from] azure_core::errors::AzureError),
5+
#[error("Parse error: {}", 0)]
6+
ParseError(#[from] azure_core::errors::ParseError),
57
#[error("Parsing error: {}", 0)]
68
ParsingError(azure_core::errors::ParsingError),
79
#[error("Permission error: {}", 0)]

sdk/storage/tests/stream_blob00.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,10 @@ async fn code() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
9292
assert_eq!(expected_string, returned_string);
9393
}
9494

95+
// test streaming a blob smaller than the chunk size (issue 239).
96+
let mut stream = Box::pin(blob.get().stream(1024 * 8));
97+
while let Some(_value) = stream.next().await {}
98+
9599
blob.delete()
96100
.delete_snapshots_method(DeleteSnapshotsMethod::Include)
97101
.execute()

0 commit comments

Comments
 (0)