Skip to content

Commit b4c07b6

Browse files
committed
routing: Improve first route load signaling
Dataclients may load routes with arbitrary delays. When multiple dataclients are used, it's possible that one dataclient loads routes multiple times while another doesn't load any routes at all. This could incorrectly signal that the first route has been loaded, even though the routing table only contains routes from a single dataclient. This change fixes route update logic to signal first load only after routes from all configured dataclients are received at least once. An alternative to zalando#3447 Signed-off-by: Aleksandr Ponimaskin <[email protected]>
1 parent 7e2ca3b commit b4c07b6

File tree

2 files changed

+21
-21
lines changed

2 files changed

+21
-21
lines changed

Diff for: routing/datasource.go

+15-5
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,8 @@ func (d *incomingData) log(l logging.Logger, suppress bool) {
6565
// communication error occurs, it re-requests the whole valid set, and continues polling.
6666
// Currently, the routes with the same id coming from different sources are merged in an
6767
// undeterministic way, but this may change in the future.
68-
func receiveFromClient(c DataClient, o Options, out chan<- *incomingData, quit <-chan struct{}) {
68+
func receiveFromClient(c DataClient, o Options, out chan<- *incomingData, quit <-chan struct{}, firstLoad *sync.WaitGroup) {
69+
var once sync.Once
6970
initial := true
7071
var ticker *time.Ticker
7172
if o.PollTimeout != 0 {
@@ -96,6 +97,7 @@ func receiveFromClient(c DataClient, o Options, out chan<- *incomingData, quit <
9697
continue
9798
case initial || len(routes) > 0 || len(deletedIDs) > 0:
9899
var incoming *incomingData
100+
once.Do(firstLoad.Done)
99101
if initial {
100102
incoming = &incomingData{incomingReset, c, routes, nil}
101103
} else {
@@ -162,13 +164,21 @@ func mergeDefs(defsByClient map[DataClient]routeDefs) []*eskip.Route {
162164
//
163165
// The active set of routes from last successful update are used until the
164166
// next successful update.
165-
func receiveRouteDefs(o Options, quit <-chan struct{}) <-chan []*eskip.Route {
167+
func receiveRouteDefs(o Options, quit <-chan struct{}, firstLoad chan<- struct{}) <-chan []*eskip.Route {
166168
in := make(chan *incomingData)
167169
out := make(chan []*eskip.Route)
168170
defsByClient := make(map[DataClient]routeDefs)
169171

172+
wg := &sync.WaitGroup{}
173+
wg.Add(len(o.DataClients))
174+
170175
for _, c := range o.DataClients {
171-
go receiveFromClient(c, o, in, quit)
176+
go receiveFromClient(c, o, in, quit, wg)
177+
}
178+
179+
if o.SignalFirstLoad {
180+
wg.Wait()
181+
close(firstLoad)
172182
}
173183

174184
go func() {
@@ -543,8 +553,8 @@ func (rt *routeTable) close() {
543553

544554
// receives the next version of the routing table on the output channel,
545555
// when an update is received on one of the data clients.
546-
func receiveRouteMatcher(o Options, out chan<- *routeTable, quit <-chan struct{}) {
547-
updates := receiveRouteDefs(o, quit)
556+
func receiveRouteMatcher(o Options, out chan<- *routeTable, quit <-chan struct{}, firstLoad chan<- struct{}) {
557+
updates := receiveRouteDefs(o, quit, firstLoad)
548558
var (
549559
rt *routeTable
550560
outRelay chan<- *routeTable

Diff for: routing/routing.go

+6-16
Original file line numberDiff line numberDiff line change
@@ -244,12 +244,11 @@ type PreProcessor interface {
244244
// Routing ('router') instance providing live
245245
// updatable request matching.
246246
type Routing struct {
247-
routeTable atomic.Value // of struct routeTable
248-
log logging.Logger
249-
firstLoad chan struct{}
250-
firstLoadSignaled bool
251-
quit chan struct{}
252-
metrics metrics.Metrics
247+
routeTable atomic.Value // of struct routeTable
248+
log logging.Logger
249+
firstLoad chan struct{}
250+
quit chan struct{}
251+
metrics metrics.Metrics
253252
}
254253

255254
// New initializes a routing instance, and starts listening for route
@@ -263,7 +262,6 @@ func New(o Options) *Routing {
263262
r.metrics = o.Metrics
264263
if !o.SignalFirstLoad {
265264
close(r.firstLoad)
266-
r.firstLoadSignaled = true
267265
}
268266

269267
initialMatcher, _ := newMatcher(nil, MatchingOptionsNone)
@@ -338,21 +336,13 @@ func (r *Routing) ServeHTTP(w http.ResponseWriter, req *http.Request) {
338336
}
339337

340338
func (r *Routing) startReceivingUpdates(o Options) {
341-
dc := len(o.DataClients)
342339
c := make(chan *routeTable)
343-
go receiveRouteMatcher(o, c, r.quit)
340+
go receiveRouteMatcher(o, c, r.quit, r.firstLoad)
344341
go func() {
345342
for {
346343
select {
347344
case rt := <-c:
348345
r.routeTable.Store(rt)
349-
if !r.firstLoadSignaled {
350-
dc--
351-
if dc == 0 {
352-
close(r.firstLoad)
353-
r.firstLoadSignaled = true
354-
}
355-
}
356346
r.log.Infof("route settings applied, id: %d", rt.id)
357347
if r.metrics != nil { // existing codebases might not supply metrics instance
358348
r.metrics.UpdateGauge("routes.total", float64(len(rt.validRoutes)))

0 commit comments

Comments
 (0)