-
Notifications
You must be signed in to change notification settings - Fork 329
/
Copy pathmanager.go
187 lines (171 loc) · 6.03 KB
/
manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
package processor
import (
"context"
"sync"
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/enterprise/trackedusers"
"github.com/rudderlabs/rudder-server/internal/enricher"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/processor/transformer"
destinationdebugger "github.com/rudderlabs/rudder-server/services/debugger/destination"
transformationdebugger "github.com/rudderlabs/rudder-server/services/debugger/transformation"
"github.com/rudderlabs/rudder-server/services/fileuploader"
"github.com/rudderlabs/rudder-server/services/rmetrics"
"github.com/rudderlabs/rudder-server/services/rsources"
transformerFeaturesService "github.com/rudderlabs/rudder-server/services/transformer"
"github.com/rudderlabs/rudder-server/services/transientsource"
"github.com/rudderlabs/rudder-server/utils/types"
)
// LifecycleManager owns the lifecycle (setup, start, stop) of a processor
// Handle together with every dependency the Handle needs: the jobsdbs it
// reads/writes, debuggers, enrichers, reporting, and background-goroutine
// bookkeeping (cancel func + wait group) created by Start.
type LifecycleManager struct {
	Handle *Handle // the processor implementation being managed

	mainCtx       context.Context    // parent context supplied to New; NOTE(review): Start currently derives from context.Background(), not from this — confirm intent
	currentCancel context.CancelFunc // cancels the context created by the most recent Start
	waitGroup     interface{ Wait() } // waits for the goroutines spawned by Start

	// jobsdb handles wired into Handle.Setup.
	gatewayDB     *jobsdb.Handle
	routerDB      *jobsdb.Handle
	batchRouterDB *jobsdb.Handle
	readErrDB     *jobsdb.Handle
	writeErrDB    *jobsdb.Handle
	esDB          *jobsdb.Handle
	arcDB         *jobsdb.Handle

	clearDB *bool // set by New; not read anywhere in this file

	ReportingI         types.Reporting // need not initialize again
	BackendConfig      backendconfig.BackendConfig
	TransformerClients *transformer.Clients // optional override; if non-nil, Start installs it on the Handle

	transientSources           transientsource.Service
	fileuploader               fileuploader.Provider
	rsourcesService            rsources.JobService
	transformerFeaturesService transformerFeaturesService.FeaturesService
	destDebugger               destinationdebugger.DestinationDebugger
	transDebugger              transformationdebugger.TransformationDebugger
	enrichers                  []enricher.PipelineEnricher
	trackedUsersReporter       trackedusers.UsersReporter
	pendingEventsRegistry      rmetrics.PendingEventsRegistry
}
// Start starts a processor, this is not a blocking call.
// If the processor is not completely started and the data started coming then also it will not be problematic as we
// are assuming that the DBs will be up.
func (proc *LifecycleManager) Start() error {
	// An externally supplied transformer client set takes precedence over
	// whatever NewHandle wired in.
	if proc.TransformerClients != nil {
		proc.Handle.transformerClients = proc.TransformerClients
	}

	err := proc.Handle.Setup(
		proc.BackendConfig,
		proc.gatewayDB,
		proc.routerDB,
		proc.batchRouterDB,
		proc.readErrDB,
		proc.writeErrDB,
		proc.esDB,
		proc.arcDB,
		proc.ReportingI,
		proc.transientSources,
		proc.fileuploader,
		proc.rsourcesService,
		proc.transformerFeaturesService,
		proc.destDebugger,
		proc.transDebugger,
		proc.enrichers,
		proc.trackedUsersReporter,
		proc.pendingEventsRegistry,
	)
	if err != nil {
		return err
	}

	// Context/waitgroup pair for this run; Stop cancels and waits on them.
	ctx, cancel := context.WithCancel(context.Background())
	proc.currentCancel = cancel
	wg := &sync.WaitGroup{}
	proc.waitGroup = wg

	wg.Add(2)

	// Background task 1: pending-events accounting.
	go func() {
		defer wg.Done()
		if err := proc.Handle.countPendingEvents(ctx); err != nil {
			proc.Handle.logger.Errorf("Error counting pending events: %v", err)
		}
	}()

	// Background task 2: the processor main loop.
	go func() {
		defer wg.Done()
		if err := proc.Handle.Start(ctx); err != nil {
			proc.Handle.logger.Errorf("Error starting processor: %v", err)
		}
	}()

	return nil
}
// Stop stops the processor, this is a blocking call.
// The order is significant: cancel the run context first so the goroutines
// spawned by Start can exit, wait for them to drain, then shut the Handle down.
func (proc *LifecycleManager) Stop() {
	proc.currentCancel()   // signal the countPendingEvents and Handle.Start goroutines to stop
	proc.waitGroup.Wait()  // block until both background goroutines have returned
	proc.Handle.Shutdown() // release the Handle's resources
}
// New creates a new Processor instance.
//
// It wires a freshly constructed Handle (with default config, a child logger
// and default stats) together with all supplied dependencies, then applies
// any functional options before returning the manager.
func New(
	ctx context.Context,
	clearDb *bool,
	gwDb, rtDb, brtDb, errDbForRead, errDBForWrite, esDB, arcDB *jobsdb.Handle,
	reporting types.Reporting,
	transientSources transientsource.Service,
	fileuploader fileuploader.Provider,
	rsourcesService rsources.JobService,
	transformerFeaturesService transformerFeaturesService.FeaturesService,
	destDebugger destinationdebugger.DestinationDebugger,
	transDebugger transformationdebugger.TransformationDebugger,
	enrichers []enricher.PipelineEnricher,
	trackedUsersReporter trackedusers.UsersReporter,
	pendingEventsRegistry rmetrics.PendingEventsRegistry,
	opts ...Opts,
) *LifecycleManager {
	// Transformer clients built against the default config/stats; options may
	// replace them on the Handle later.
	clients := transformer.NewClients(
		config.Default,
		logger.NewLogger().Child("processor"),
		stats.Default,
		transformer.WithFeatureService(transformerFeaturesService),
	)

	lm := &LifecycleManager{
		Handle:                     NewHandle(config.Default, clients),
		mainCtx:                    ctx,
		clearDB:                    clearDb,
		gatewayDB:                  gwDb,
		routerDB:                   rtDb,
		batchRouterDB:              brtDb,
		readErrDB:                  errDbForRead,
		writeErrDB:                 errDBForWrite,
		esDB:                       esDB,
		arcDB:                      arcDB,
		BackendConfig:              backendconfig.DefaultBackendConfig,
		ReportingI:                 reporting,
		transientSources:           transientSources,
		fileuploader:               fileuploader,
		rsourcesService:            rsourcesService,
		transformerFeaturesService: transformerFeaturesService,
		destDebugger:               destDebugger,
		transDebugger:              transDebugger,
		enrichers:                  enrichers,
		trackedUsersReporter:       trackedUsersReporter,
		pendingEventsRegistry:      pendingEventsRegistry,
	}

	// Apply functional options last so they can override the defaults above.
	for _, apply := range opts {
		apply(lm)
	}
	return lm
}
type Opts func(l *LifecycleManager)
// WithAdaptiveLimit installs a custom adaptive-limit function on the Handle.
func WithAdaptiveLimit(limitFn func(int64) int64) Opts {
	return func(lm *LifecycleManager) {
		lm.Handle.adaptiveLimit = limitFn
	}
}
// WithStats overrides the stats factory used by the Handle.
//
// The parameter is named statsFactory (not stats) so it does not shadow the
// imported stats package inside the closure.
func WithStats(statsFactory stats.Stats) Opts {
	return func(l *LifecycleManager) {
		l.Handle.statsFactory = statsFactory
	}
}
// WithTransformerClients overrides the transformer clients used by the Handle.
func WithTransformerClients(clients transformer.TransformerClients) Opts {
	return func(lm *LifecycleManager) {
		lm.Handle.transformerClients = clients
	}
}