Skip to content

routing: fix first route load signaling #3447

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 19 additions & 6 deletions routing/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,13 @@ func applyIncoming(defs routeDefs, d *incomingData) routeDefs {
return defs
}

type mergedDefs struct {
routes []*eskip.Route
clients map[DataClient]struct{}
}

// merges the route definitions from multiple data clients by route id
func mergeDefs(defsByClient map[DataClient]routeDefs) []*eskip.Route {
func mergeDefs(defsByClient map[DataClient]routeDefs) mergedDefs {
mergeByID := make(routeDefs)
for _, defs := range defsByClient {
for id, def := range defs {
Expand All @@ -153,7 +158,12 @@ func mergeDefs(defsByClient map[DataClient]routeDefs) []*eskip.Route {
for _, def := range mergeByID {
all = append(all, def)
}
return all

clients := make(map[DataClient]struct{}, len(defsByClient))
for c := range defsByClient {
clients[c] = struct{}{}
}
return mergedDefs{routes: all, clients: clients}
}

// receives the initial set of the route definitiosn and their
Expand All @@ -162,9 +172,9 @@ func mergeDefs(defsByClient map[DataClient]routeDefs) []*eskip.Route {
//
// The active set of routes from last successful update are used until the
// next successful update.
func receiveRouteDefs(o Options, quit <-chan struct{}) <-chan []*eskip.Route {
func receiveRouteDefs(o Options, quit <-chan struct{}) <-chan mergedDefs {
in := make(chan *incomingData)
out := make(chan []*eskip.Route)
out := make(chan mergedDefs)
defsByClient := make(map[DataClient]routeDefs)

for _, c := range o.DataClients {
Expand Down Expand Up @@ -524,6 +534,7 @@ type routeTable struct {
routes []*Route // only used for closing
validRoutes []*eskip.Route
invalidRoutes []*eskip.Route
clients map[DataClient]struct{}
created time.Time
}

Expand All @@ -548,18 +559,19 @@ func receiveRouteMatcher(o Options, out chan<- *routeTable, quit <-chan struct{}
var (
rt *routeTable
outRelay chan<- *routeTable
updatesRelay <-chan []*eskip.Route
updatesRelay <-chan mergedDefs
updateId int
)
updatesRelay = updates
for {
select {
case defs := <-updatesRelay:
case mdefs := <-updatesRelay:
updateId++
start := time.Now()

o.Log.Infof("route settings received, id: %d", updateId)

defs := mdefs.routes
for i := range o.PreProcessors {
defs = o.PreProcessors[i].Do(defs)
}
Expand Down Expand Up @@ -599,6 +611,7 @@ func receiveRouteMatcher(o Options, out chan<- *routeTable, quit <-chan struct{}
routes: routes,
validRoutes: validRoutes,
invalidRoutes: invalidRoutes,
clients: mdefs.clients,
created: start,
}
updatesRelay = nil
Expand Down
10 changes: 9 additions & 1 deletion routing/export_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package routing

import "time"
import (
"time"

"github.com/zalando/skipper/eskip"
)

var (
ExportProcessRouteDef = processRouteDef
Expand All @@ -14,3 +18,7 @@ var (
func SetNow(r *EndpointRegistry, now func() time.Time) {
r.now = now
}

func (rl *RouteLookup) ValidRoutes() []*eskip.Route {
return rl.rt.validRoutes
}
10 changes: 4 additions & 6 deletions routing/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,6 @@ func (r *Routing) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}

func (r *Routing) startReceivingUpdates(o Options) {
dc := len(o.DataClients)
c := make(chan *routeTable)
go receiveRouteMatcher(o, c, r.quit)
go func() {
Expand All @@ -347,8 +346,7 @@ func (r *Routing) startReceivingUpdates(o Options) {
case rt := <-c:
r.routeTable.Store(rt)
if !r.firstLoadSignaled {
dc--
if dc == 0 {
if len(rt.clients) == len(o.DataClients) {
close(r.firstLoad)
r.firstLoadSignaled = true
}
Expand Down Expand Up @@ -398,20 +396,20 @@ func (r *Routing) FirstLoad() <-chan struct{} {
// against is found, the feature is experimental and its exported interface may
// change.
type RouteLookup struct {
matcher *matcher
rt *routeTable
}

// Do executes the lookup against the captured routing table. Equivalent to
// Routing.Route().
func (rl *RouteLookup) Do(req *http.Request) (*Route, map[string]string) {
return rl.matcher.match(req)
return rl.rt.m.match(req)
}

// Get returns a captured generation of the lookup table. This feature is
// experimental. See the description of the RouteLookup type.
func (r *Routing) Get() *RouteLookup {
rt := r.routeTable.Load().(*routeTable)
return &RouteLookup{matcher: rt.m}
return &RouteLookup{rt: rt}
}

// Close closes routing, routeTable and stops statemachine for receiving routes.
Expand Down
30 changes: 14 additions & 16 deletions routing/routing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -955,38 +955,36 @@ func TestSignalFirstLoad(t *testing.T) {
})

t.Run("multiple data clients", func(t *testing.T) {
dc1 := testdataclient.New([]*eskip.Route{{}})
const pollTimeout = 12 * time.Millisecond

dc1 := testdataclient.New([]*eskip.Route{{Id: "r1", Backend: "https://foo.example.org"}})
defer dc1.Close()

dc2 := testdataclient.New([]*eskip.Route{{}})
dc2 := testdataclient.New([]*eskip.Route{{Id: "r2", Backend: "https://bar.example.org"}})
defer dc2.Close()

// Schedule r1 update right away and delay r2 update
go func() {
dc1.Update([]*eskip.Route{{Id: "r1", Backend: "https://baz.example.org"}}, nil)
}()
dc2.WithLoadAllDelay(10 * pollTimeout)

l := loggingtest.New()
defer l.Close()

rt := routing.New(routing.Options{
SignalFirstLoad: true,
FilterRegistry: builtin.MakeRegistry(),
DataClients: []routing.DataClient{dc1, dc2},
PollTimeout: 12 * time.Millisecond,
PollTimeout: pollTimeout,
Log: l,
})
defer rt.Close()

select {
case <-rt.FirstLoad():
t.Error("the first load signal was not blocking")
default:
}
<-rt.FirstLoad()

if err := l.WaitForN("route settings applied", 2, 12*time.Millisecond); err != nil {
t.Error("failed to receive route settings", err)
}

select {
case <-rt.FirstLoad():
default:
t.Error("the first load signal was blocking")
if validRoutes := rt.Get().ValidRoutes(); len(validRoutes) != 2 {
t.Errorf("expected 2 valid routes, got: %v", validRoutes)
}
})
}
8 changes: 8 additions & 0 deletions routing/testdataclient/dataclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package testdataclient

import (
"errors"
"time"

"github.com/zalando/skipper/eskip"
)
Expand All @@ -24,6 +25,7 @@ type Client struct {
upsert []*eskip.Route
deletedIDs []string
failNext int
loadAllDelay time.Duration
signalUpdate chan incomingUpdate
quit chan struct{}
}
Expand Down Expand Up @@ -61,6 +63,8 @@ func (c *Client) LoadAll() ([]*eskip.Route, error) {
return nil, errors.New("failed to get routes")
}

time.Sleep(c.loadAllDelay)

routes := make([]*eskip.Route, 0, len(c.routes))
for _, r := range c.routes {
routes = append(routes, r)
Expand Down Expand Up @@ -127,6 +131,10 @@ func (c *Client) FailNext() {
c.failNext++
}

func (c *Client) WithLoadAllDelay(d time.Duration) {
c.loadAllDelay = d
}

func (c *Client) Close() {
close(c.quit)
}