-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathstream_test.go
103 lines (88 loc) · 2.2 KB
/
stream_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package fusion_test
import (
"context"
"io"
"strings"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/spy16/fusion"
)
// TestStreamFn_Out checks that the channel returned by StreamFn.Out delivers
// exactly the two messages the stream function produces before it starts
// returning io.EOF.
func TestStreamFn_Out(t *testing.T) {
	// remaining is the number of messages the stream function may still emit.
	// Every access goes through sync/atomic so the variable is never touched
	// by a mix of atomic and plain operations.
	remaining := int64(2)
	sf := fusion.StreamFn(func(ctx context.Context) (*fusion.Msg, error) {
		// Decrement and test in a single atomic step: the first two calls
		// observe 1 and 0 and emit a message; every later call observes a
		// negative value and reports end-of-stream.
		if atomic.AddInt64(&remaining, -1) < 0 {
			return nil, io.EOF
		}
		return &fusion.Msg{}, nil
	})

	messages, err := sf.Out(context.Background())
	require.NoError(t, err)

	count := countStream(messages)
	assert.Equal(t, 2, count)
}
// TestLineStream_Out exercises LineStream.Out against a missing source, an
// already-cancelled context, and several Offset/Size combinations over a
// three-line input.
func TestLineStream_Out(t *testing.T) {
	t.Parallel()

	// Three newline-terminated lines shared by every subtest that reads input.
	const input = "msg1\nmsg2\nmsg3\n"

	t.Run("FromNotSet", func(t *testing.T) {
		// Without a From reader, Out is expected to fail and hand back no channel.
		stream := &fusion.LineStream{}

		out, err := stream.Out(context.Background())

		require.Error(t, err)
		assert.Nil(t, out)
	})

	t.Run("ClosesOnContext", func(t *testing.T) {
		// The context is cancelled before Out is ever called.
		ctx, cancel := context.WithCancel(context.Background())
		cancel()

		stream := &fusion.LineStream{From: strings.NewReader(input)}

		out, err := stream.Out(ctx)
		require.NoError(t, err)

		// At most one message is tolerated on an already-cancelled context.
		got := countStream(out)
		assert.True(t, got == 0 || got == 1)
	})

	// The remaining scenarios differ only in stream configuration and in the
	// number of messages expected on the output channel.
	cases := []struct {
		name   string
		stream *fusion.LineStream
		want   int
	}{
		{
			name:   "BeginningToEOF",
			stream: &fusion.LineStream{From: strings.NewReader(input)},
			want:   3,
		},
		{
			name: "FromOffset",
			stream: &fusion.LineStream{
				From:   strings.NewReader(input),
				Offset: 1,
			},
			want: 2,
		},
		{
			name: "FromOffsetWithSize",
			stream: &fusion.LineStream{
				From:   strings.NewReader(input),
				Offset: 1,
				Size:   1,
			},
			want: 1,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			out, err := tc.stream.Out(context.Background())
			require.NoError(t, err)

			got := countStream(out)
			assert.Equal(t, tc.want, got)
		})
	}
}
// countStream drains s and returns how many messages were received. It stops
// when s is closed or when one second has elapsed since the call began,
// whichever happens first; on timeout it returns the count gathered so far.
func countStream(s <-chan fusion.Msg) int {
	deadline := time.NewTimer(time.Second)
	defer deadline.Stop()

	// received is bumped by the loop's post statement, i.e. only after an
	// iteration that actually took a message off the channel.
	for received := 0; ; received++ {
		select {
		case <-deadline.C:
			return received
		case _, ok := <-s:
			if !ok {
				return received
			}
		}
	}
}