Skip to content

Commit 8802ec5

Browse files
authored
Merge pull request #19032 from ianic/add_buffered_tee
propose adding BufferedTee to the std.io
2 parents e60d667 + a5326c5 commit 8802ec5

File tree

3 files changed

+406
-143
lines changed

3 files changed

+406
-143
lines changed

lib/std/io.zig

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -411,6 +411,9 @@ pub const BufferedAtomicFile = @import("io/buffered_atomic_file.zig").BufferedAt
411411

412412
pub const StreamSource = @import("io/stream_source.zig").StreamSource;
413413

414+
pub const BufferedTee = @import("io/buffered_tee.zig").BufferedTee;
415+
pub const bufferedTee = @import("io/buffered_tee.zig").bufferedTee;
416+
414417
pub const tty = @import("io/tty.zig");
415418

416419
/// A Writer that doesn't write to anything.
@@ -692,4 +695,5 @@ test {
692695
_ = @import("io/seekable_stream.zig");
693696
_ = @import("io/stream_source.zig");
694697
_ = @import("io/test.zig");
698+
_ = @import("io/buffered_tee.zig");
695699
}

lib/std/io/buffered_tee.zig

Lines changed: 379 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,379 @@
1+
const std = @import("std");
2+
const io = std.io;
3+
const assert = std.debug.assert;
4+
const testing = std.testing;
5+
6+
/// BufferedTee provides reader interface to the consumer. Data read by consumer
7+
/// is also written to the output. Output is hold lookahead_size bytes behind
8+
/// consumer. Allowing consumer to put back some bytes to be read again. On flush
9+
/// all consumed bytes are flushed to the output.
10+
///
11+
/// input -> tee -> consumer
12+
/// |
13+
/// output
14+
///
15+
/// input - underlying unbuffered reader
16+
/// output - writer, receives data read by consumer
17+
/// consumer - uses provided reader interface
18+
///
19+
/// If lookahead_size is zero output always has same bytes as consumer.
20+
///
21+
pub fn BufferedTee(
22+
comptime buffer_size: usize, // internal buffer size in bytes
23+
comptime lookahead_size: usize, // lookahead, number of bytes to hold output behind consumer
24+
comptime InputReaderType: type,
25+
comptime OutputWriterType: type,
26+
) type {
27+
comptime assert(buffer_size > lookahead_size);
28+
29+
return struct {
30+
input: InputReaderType,
31+
output: OutputWriterType,
32+
33+
buf: [buffer_size]u8 = undefined, // internal buffer
34+
tail: usize = 0, // buffer is filled up to this position with bytes from input
35+
rp: usize = 0, // reader pointer; consumer has read up to this position
36+
wp: usize = 0, // writer pointer; data is sent to the output up to this position
37+
38+
pub const Error = InputReaderType.Error || OutputWriterType.Error;
39+
pub const Reader = io.Reader(*Self, Error, read);
40+
41+
const Self = @This();
42+
43+
pub fn read(self: *Self, dest: []u8) Error!usize {
44+
var dest_index: usize = 0;
45+
46+
while (dest_index < dest.len) {
47+
const written = @min(dest.len - dest_index, self.tail - self.rp);
48+
if (written == 0) {
49+
try self.preserveLookahead();
50+
// fill upper part of the buf
51+
const n = try self.input.read(self.buf[self.tail..]);
52+
if (n == 0) {
53+
// reading from the unbuffered stream returned nothing
54+
// so we have nothing left to read.
55+
return dest_index;
56+
}
57+
self.tail += n;
58+
} else {
59+
@memcpy(dest[dest_index..][0..written], self.buf[self.rp..][0..written]);
60+
self.rp += written;
61+
dest_index += written;
62+
try self.flush_(lookahead_size);
63+
}
64+
}
65+
return dest.len;
66+
}
67+
68+
/// Move lookahead_size bytes to the buffer start.
69+
fn preserveLookahead(self: *Self) !void {
70+
assert(self.tail == self.rp);
71+
if (lookahead_size == 0) {
72+
// Flush is called on each read so wp must follow rp when lookahead_size == 0.
73+
assert(self.wp == self.rp);
74+
// Nothing to preserve rewind pointer to the buffer start
75+
self.rp = 0;
76+
self.wp = 0;
77+
self.tail = 0;
78+
return;
79+
}
80+
if (self.tail <= lookahead_size) {
81+
// There is still palce in the buffer, append to buffer from tail position.
82+
return;
83+
}
84+
try self.flush_(lookahead_size);
85+
const head = self.tail - lookahead_size;
86+
// Preserve head..tail at the start of the buffer.
87+
std.mem.copyForwards(u8, self.buf[0..lookahead_size], self.buf[head..self.tail]);
88+
self.wp -= head;
89+
assert(self.wp <= lookahead_size);
90+
self.rp = lookahead_size;
91+
self.tail = lookahead_size;
92+
}
93+
94+
/// Flush to the output all but lookahead size bytes.
95+
fn flush_(self: *Self, lookahead: usize) !void {
96+
if (self.rp <= self.wp + lookahead) return;
97+
const new_wp = self.rp - lookahead;
98+
try self.output.writeAll(self.buf[self.wp..new_wp]);
99+
self.wp = new_wp;
100+
}
101+
102+
/// Flush to the output all consumed bytes.
103+
pub fn flush(self: *Self) !void {
104+
try self.flush_(0);
105+
}
106+
107+
/// Put back some bytes to be consumed again. Usefull when we overshoot
108+
/// reading and want to return that overshoot bytes. Can return maximum
109+
/// of lookahead_size number of bytes.
110+
pub fn putBack(self: *Self, n: usize) void {
111+
assert(n <= lookahead_size and n <= self.rp);
112+
self.rp -= n;
113+
}
114+
115+
pub fn reader(self: *Self) Reader {
116+
return .{ .context = self };
117+
}
118+
};
119+
}
120+
121+
pub fn bufferedTee(
122+
comptime buffer_size: usize,
123+
comptime lookahead_size: usize,
124+
input: anytype,
125+
output: anytype,
126+
) BufferedTee(
127+
buffer_size,
128+
lookahead_size,
129+
@TypeOf(input),
130+
@TypeOf(output),
131+
) {
132+
return .{ .input = input, .output = output };
133+
}
134+
135+
// Running test from std.io.BufferedReader on BufferedTee
136+
// It should act as BufferedReader for consumer.
137+
138+
fn BufferedReader(comptime buffer_size: usize, comptime ReaderType: type) type {
139+
return BufferedTee(buffer_size, 0, ReaderType, @TypeOf(io.null_writer));
140+
}
141+
142+
fn bufferedReader(reader: anytype) BufferedReader(4096, @TypeOf(reader)) {
143+
return .{
144+
.input = reader,
145+
.output = io.null_writer,
146+
};
147+
}
148+
149+
test "io.BufferedTee io.BufferedReader OneByte" {
150+
const OneByteReadReader = struct {
151+
str: []const u8,
152+
curr: usize,
153+
154+
const Error = error{NoError};
155+
const Self = @This();
156+
const Reader = io.Reader(*Self, Error, read);
157+
158+
fn init(str: []const u8) Self {
159+
return Self{
160+
.str = str,
161+
.curr = 0,
162+
};
163+
}
164+
165+
fn read(self: *Self, dest: []u8) Error!usize {
166+
if (self.str.len <= self.curr or dest.len == 0)
167+
return 0;
168+
169+
dest[0] = self.str[self.curr];
170+
self.curr += 1;
171+
return 1;
172+
}
173+
174+
fn reader(self: *Self) Reader {
175+
return .{ .context = self };
176+
}
177+
};
178+
179+
const str = "This is a test";
180+
var one_byte_stream = OneByteReadReader.init(str);
181+
var buf_reader = bufferedReader(one_byte_stream.reader());
182+
const stream = buf_reader.reader();
183+
184+
const res = try stream.readAllAlloc(testing.allocator, str.len + 1);
185+
defer testing.allocator.free(res);
186+
try testing.expectEqualSlices(u8, str, res);
187+
}
188+
189+
test "io.BufferedTee io.BufferedReader Block" {
190+
const BlockReader = struct {
191+
block: []const u8,
192+
reads_allowed: usize,
193+
curr_read: usize,
194+
195+
const Error = error{NoError};
196+
const Self = @This();
197+
const Reader = io.Reader(*Self, Error, read);
198+
199+
fn init(block: []const u8, reads_allowed: usize) Self {
200+
return Self{
201+
.block = block,
202+
.reads_allowed = reads_allowed,
203+
.curr_read = 0,
204+
};
205+
}
206+
207+
fn read(self: *Self, dest: []u8) Error!usize {
208+
if (self.curr_read >= self.reads_allowed) return 0;
209+
@memcpy(dest[0..self.block.len], self.block);
210+
211+
self.curr_read += 1;
212+
return self.block.len;
213+
}
214+
215+
fn reader(self: *Self) Reader {
216+
return .{ .context = self };
217+
}
218+
};
219+
220+
const block = "0123";
221+
222+
// len out == block
223+
{
224+
var test_buf_reader: BufferedReader(4, BlockReader) = .{
225+
.input = BlockReader.init(block, 2),
226+
.output = io.null_writer,
227+
};
228+
var out_buf: [4]u8 = undefined;
229+
_ = try test_buf_reader.read(&out_buf);
230+
try testing.expectEqualSlices(u8, &out_buf, block);
231+
_ = try test_buf_reader.read(&out_buf);
232+
try testing.expectEqualSlices(u8, &out_buf, block);
233+
try testing.expectEqual(try test_buf_reader.read(&out_buf), 0);
234+
}
235+
236+
// len out < block
237+
{
238+
var test_buf_reader: BufferedReader(4, BlockReader) = .{
239+
.input = BlockReader.init(block, 2),
240+
.output = io.null_writer,
241+
};
242+
var out_buf: [3]u8 = undefined;
243+
_ = try test_buf_reader.read(&out_buf);
244+
try testing.expectEqualSlices(u8, &out_buf, "012");
245+
_ = try test_buf_reader.read(&out_buf);
246+
try testing.expectEqualSlices(u8, &out_buf, "301");
247+
const n = try test_buf_reader.read(&out_buf);
248+
try testing.expectEqualSlices(u8, out_buf[0..n], "23");
249+
try testing.expectEqual(try test_buf_reader.read(&out_buf), 0);
250+
}
251+
252+
// len out > block
253+
{
254+
var test_buf_reader: BufferedReader(4, BlockReader) = .{
255+
.input = BlockReader.init(block, 2),
256+
.output = io.null_writer,
257+
};
258+
var out_buf: [5]u8 = undefined;
259+
_ = try test_buf_reader.read(&out_buf);
260+
try testing.expectEqualSlices(u8, &out_buf, "01230");
261+
const n = try test_buf_reader.read(&out_buf);
262+
try testing.expectEqualSlices(u8, out_buf[0..n], "123");
263+
try testing.expectEqual(try test_buf_reader.read(&out_buf), 0);
264+
}
265+
266+
// len out == 0
267+
{
268+
var test_buf_reader: BufferedReader(4, BlockReader) = .{
269+
.input = BlockReader.init(block, 2),
270+
.output = io.null_writer,
271+
};
272+
var out_buf: [0]u8 = undefined;
273+
_ = try test_buf_reader.read(&out_buf);
274+
try testing.expectEqualSlices(u8, &out_buf, "");
275+
}
276+
277+
// len bufreader buf > block
278+
{
279+
var test_buf_reader: BufferedReader(5, BlockReader) = .{
280+
.input = BlockReader.init(block, 2),
281+
.output = io.null_writer,
282+
};
283+
var out_buf: [4]u8 = undefined;
284+
_ = try test_buf_reader.read(&out_buf);
285+
try testing.expectEqualSlices(u8, &out_buf, block);
286+
_ = try test_buf_reader.read(&out_buf);
287+
try testing.expectEqualSlices(u8, &out_buf, block);
288+
try testing.expectEqual(try test_buf_reader.read(&out_buf), 0);
289+
}
290+
}
291+
292+
test "io.BufferedTee with zero lookahead" {
293+
// output has same bytes as consumer
294+
const data = [_]u8{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 } ** 12;
295+
var in = io.fixedBufferStream(&data);
296+
var out = std.ArrayList(u8).init(testing.allocator);
297+
defer out.deinit();
298+
299+
var bt = bufferedTee(8, 0, in.reader(), out.writer());
300+
301+
var buf: [16]u8 = undefined;
302+
var read_len: usize = 0;
303+
for (0..buf.len) |i| {
304+
const n = try bt.read(buf[0..i]);
305+
try testing.expectEqual(i, n);
306+
read_len += i;
307+
try testing.expectEqual(read_len, out.items.len);
308+
}
309+
}
310+
311+
test "io.BufferedTee with lookahead" {
312+
// output is lookahead bytes behind consumer
313+
inline for (1..8) |lookahead| {
314+
const data = [_]u8{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 } ** 12;
315+
var in = io.fixedBufferStream(&data);
316+
var out = std.ArrayList(u8).init(testing.allocator);
317+
defer out.deinit();
318+
319+
var bt = bufferedTee(8, lookahead, in.reader(), out.writer());
320+
var buf: [16]u8 = undefined;
321+
322+
var read_len: usize = 0;
323+
for (1..buf.len) |i| {
324+
const n = try bt.read(buf[0..i]);
325+
try testing.expectEqual(i, n);
326+
read_len += i;
327+
const out_len = if (read_len < lookahead) 0 else read_len - lookahead;
328+
try testing.expectEqual(out_len, out.items.len);
329+
}
330+
try testing.expectEqual(read_len, out.items.len + lookahead);
331+
try bt.flush();
332+
try testing.expectEqual(read_len, out.items.len);
333+
}
334+
}
335+
336+
test "io.BufferedTee internal state" {
337+
const data = [_]u8{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 } ** 2;
338+
var in = io.fixedBufferStream(&data);
339+
var out = std.ArrayList(u8).init(testing.allocator);
340+
defer out.deinit();
341+
342+
var bt = bufferedTee(8, 4, in.reader(), out.writer());
343+
344+
var buf: [16]u8 = undefined;
345+
var n = try bt.read(buf[0..3]);
346+
try testing.expectEqual(3, n);
347+
try testing.expectEqualSlices(u8, data[0..3], buf[0..n]);
348+
try testing.expectEqual(8, bt.tail);
349+
try testing.expectEqual(3, bt.rp);
350+
try testing.expectEqual(0, out.items.len);
351+
352+
n = try bt.read(buf[0..6]);
353+
try testing.expectEqual(6, n);
354+
try testing.expectEqualSlices(u8, data[3..9], buf[0..n]);
355+
try testing.expectEqual(8, bt.tail);
356+
try testing.expectEqual(5, bt.rp);
357+
try testing.expectEqualSlices(u8, data[4..12], &bt.buf);
358+
try testing.expectEqual(5, out.items.len);
359+
360+
n = try bt.read(buf[0..9]);
361+
try testing.expectEqual(9, n);
362+
try testing.expectEqualSlices(u8, data[9..18], buf[0..n]);
363+
try testing.expectEqual(8, bt.tail);
364+
try testing.expectEqual(6, bt.rp);
365+
try testing.expectEqualSlices(u8, data[12..20], &bt.buf);
366+
try testing.expectEqual(14, out.items.len);
367+
368+
try bt.flush();
369+
try testing.expectEqual(18, out.items.len);
370+
371+
bt.putBack(4);
372+
n = try bt.read(buf[0..4]);
373+
try testing.expectEqual(4, n);
374+
try testing.expectEqualSlices(u8, data[14..18], buf[0..n]);
375+
376+
try testing.expectEqual(18, out.items.len);
377+
try bt.flush();
378+
try testing.expectEqual(18, out.items.len);
379+
}

0 commit comments

Comments
 (0)