-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathfusion.go
109 lines (92 loc) · 2.27 KB
/
fusion.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package fusion
import (
"context"
"errors"
"fmt"
"io"
"time"
)
// Runner represents a fusion streaming pipeline. A fusion instance has a
// stream and a proc for processing it. If the proc is not set, a no-op
// proc will be used.
type Runner struct {
// Proc to use for processing the messages.
Proc Proc
// Stream to read messages from.
Stream Stream
// DrainTime is the timeout for draining the messages from the stream
// channel when the Proc exits pre-maturely. If not set, channel will
// not be drained.
DrainTime time.Duration
// Log to be used by the Runner. If not set, a no-op value will be
// used.
Log Log
}
// Run spawns all the worker goroutines and blocks until all of them exit.
// Worker threads exit when context is cancelled or when source closes. It
// returns any error that was returned from the source.
func (fu Runner) Run(ctx context.Context) error {
if err := fu.init(); err != nil {
return err
}
ctx, cancel := context.WithCancel(withLog(ctx, fu.Log))
defer cancel()
streamCh, err := fu.Stream.Out(ctx)
if err != nil {
return err
} else if streamCh == nil {
return io.EOF
}
if err := fu.Proc.Run(ctx, streamCh); err != nil {
fu.Log(map[string]interface{}{
"level": "warn",
"message": fmt.Sprintf("proc exited with error: %v", err),
})
fu.drainAll(streamCh)
return err
}
return nil
}
func (fu *Runner) drainAll(ch <-chan Msg) {
if fu.DrainTime == 0 {
fu.Log(map[string]interface{}{
"level": "warn",
"message": "drain time is not set, not draining stream",
})
return
}
fu.Log(map[string]interface{}{
"level": "info",
"message": fmt.Sprintf("drain time is set, waiting for %s", fu.DrainTime),
})
for {
select {
case <-time.After(fu.DrainTime):
return
case msg, open := <-ch:
if !open {
return
}
msg.Ack(Retry)
}
}
}
func (fu *Runner) init() error {
if fu.Log == nil {
fu.Log = func(_ map[string]interface{}) {}
}
if fu.Stream == nil {
return errors.New("stream must not be nil")
}
if fu.Proc == nil {
fu.Log(map[string]interface{}{
"level": "warn",
"message": "proc is not set, using no-op",
})
fu.Proc = &Fn{}
}
return nil
}
// Log implementation provides structured logging facilities for fusion
// components.
type Log func(_ map[string]interface{})