@@ -13,10 +13,10 @@ use cmp;
13
13
use collections:: Collection ;
14
14
use comm:: { Sender , Receiver } ;
15
15
use io;
16
- use option:: { None , Option , Some } ;
16
+ use option:: { None , Some } ;
17
17
use result:: { Ok , Err } ;
18
18
use slice:: { bytes, CloneableVector } ;
19
- use super :: { Reader , Writer , IoResult } ;
19
+ use super :: { Buffer , Reader , Writer , IoResult } ;
20
20
use vec:: Vec ;
21
21
22
22
/// Allows reading from a rx.
@@ -37,7 +37,7 @@ use vec::Vec;
37
37
/// }
38
38
/// ```
39
39
pub struct ChanReader {
40
- buf : Option < Vec < u8 > > , // A buffer of bytes received but not consumed.
40
+ buf : Vec < u8 > , // A buffer of bytes received but not consumed.
41
41
pos : uint , // How many of the buffered bytes have already be consumed.
42
42
rx : Receiver < Vec < u8 > > , // The Receiver to pull data from.
43
43
closed : bool , // Whether the channel this Receiver connects to has been closed.
@@ -47,35 +47,59 @@ impl ChanReader {
47
47
/// Wraps a `Port` in a `ChanReader` structure
48
48
pub fn new ( rx : Receiver < Vec < u8 > > ) -> ChanReader {
49
49
ChanReader {
50
- buf : None ,
50
+ buf : Vec :: new ( ) ,
51
51
pos : 0 ,
52
52
rx : rx,
53
53
closed : false ,
54
54
}
55
55
}
56
56
}
57
57
58
+ impl Buffer for ChanReader {
59
+ fn fill_buf < ' a > ( & ' a mut self ) -> IoResult < & ' a [ u8 ] > {
60
+ if self . pos >= self . buf . len ( ) {
61
+ self . pos = 0 ;
62
+ match self . rx . recv_opt ( ) {
63
+ Ok ( bytes) => {
64
+ self . buf = bytes;
65
+ } ,
66
+ Err ( ( ) ) => {
67
+ self . closed = true ;
68
+ self . buf = Vec :: new ( ) ;
69
+ }
70
+ }
71
+ }
72
+ if self . closed {
73
+ Err ( io:: standard_error ( io:: EndOfFile ) )
74
+ } else {
75
+ Ok ( self . buf . slice_from ( self . pos ) )
76
+ }
77
+ }
78
+
79
+ fn consume ( & mut self , amt : uint ) {
80
+ self . pos += amt;
81
+ assert ! ( self . pos <= self . buf. len( ) ) ;
82
+ }
83
+ }
84
+
58
85
impl Reader for ChanReader {
59
86
fn read ( & mut self , buf : & mut [ u8 ] ) -> IoResult < uint > {
60
87
let mut num_read = 0 ;
61
88
loop {
62
- match self . buf {
63
- Some ( ref prev ) => {
89
+ let count = match self . fill_buf ( ) . ok ( ) {
90
+ Some ( src ) => {
64
91
let dst = buf[ mut num_read..] ;
65
- let src = prev[ self . pos ..] ;
66
- let count = cmp:: min ( dst. len ( ) , src. len ( ) ) ;
92
+ let count = cmp:: min ( src. len ( ) , dst. len ( ) ) ;
67
93
bytes:: copy_memory ( dst, src[ ..count] ) ;
68
- num_read += count;
69
- self . pos += count;
94
+ count
70
95
} ,
71
- None => ( ) ,
96
+ None => 0 ,
72
97
} ;
98
+ self . consume ( count) ;
99
+ num_read += count;
73
100
if num_read == buf. len ( ) || self . closed {
74
101
break ;
75
102
}
76
- self . pos = 0 ;
77
- self . buf = self . rx . recv_opt ( ) . ok ( ) ;
78
- self . closed = self . buf . is_none ( ) ;
79
103
}
80
104
if self . closed && num_read == 0 {
81
105
Err ( io:: standard_error ( io:: EndOfFile ) )
@@ -149,7 +173,6 @@ mod test {
149
173
let mut reader = ChanReader :: new ( rx) ;
150
174
let mut buf = [ 0u8 , ..3 ] ;
151
175
152
-
153
176
assert_eq ! ( Ok ( 0 ) , reader. read( [ ] ) ) ;
154
177
155
178
assert_eq ! ( Ok ( 3 ) , reader. read( buf) ) ;
@@ -178,6 +201,28 @@ mod test {
178
201
assert_eq ! ( a, buf. as_slice( ) ) ;
179
202
}
180
203
204
+ #[ test]
205
+ fn test_rx_buffer ( ) {
206
+ let ( tx, rx) = channel ( ) ;
207
+ task:: spawn ( proc ( ) {
208
+ tx. send ( b"he" . to_vec ( ) ) ;
209
+ tx. send ( b"llo wo" . to_vec ( ) ) ;
210
+ tx. send ( b"" . to_vec ( ) ) ;
211
+ tx. send ( b"rld\n how " . to_vec ( ) ) ;
212
+ tx. send ( b"are you?" . to_vec ( ) ) ;
213
+ tx. send ( b"" . to_vec ( ) ) ;
214
+ } ) ;
215
+
216
+ let mut reader = ChanReader :: new ( rx) ;
217
+
218
+ assert_eq ! ( Ok ( "hello world\n " . to_string( ) ) , reader. read_line( ) ) ;
219
+ assert_eq ! ( Ok ( "how are you?" . to_string( ) ) , reader. read_line( ) ) ;
220
+ match reader. read_line ( ) {
221
+ Ok ( ..) => fail ! ( ) ,
222
+ Err ( e) => assert_eq ! ( e. kind, io:: EndOfFile ) ,
223
+ }
224
+ }
225
+
181
226
#[ test]
182
227
fn test_chan_writer ( ) {
183
228
let ( tx, rx) = channel ( ) ;
0 commit comments