Skip to content

Commit 34b58f8

Browse files
committed
routing: now supporting multiple divergent route/proxies
Fixes #10 Using a frontender file of the format ```shell [/<route1>,<route2>,...] <proxy1> <proxy2> ... [/<otherRoute1>,/<otherRoute2>,...] <proxy3> <proxy4> ... ``` can now route to divergent proxies e.g ```shell [/foo/,/bar/] https://orijtech.com [/] http://localhost:8889 http://localhost:8899 ``` where routes are matched by longest prefix first so visiting http://localhost:8877/foo/favicon.ico will redirect to https://orijtech.com/favicon.ico whereas http://localhost:8877/other/mail will visit https://google.com/mail and /food will redirect to either of: http://localhost:8889/food or http://localhost:8899/food
1 parent 8d2d3f3 commit 34b58f8

File tree

2 files changed

+156
-62
lines changed

2 files changed

+156
-62
lines changed

cmd/frontender/main.go

+25-1
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,12 @@ package main
33
import (
44
"flag"
55
"log"
6+
"os"
67
"strings"
78
"time"
89

910
"github.com/orijtech/frontender"
11+
"github.com/orijtech/namespace"
1012
)
1113

1214
func main() {
@@ -17,6 +19,7 @@ func main() {
1719
var csvDomains string
1820
var noAutoWWW bool
1921
var nonHTTPSRedirectURL string
22+
var routeFile string
2023

2124
flag.StringVar(&csvBackendAddresses, "csv-backends", "", "the comma separated addresses of the backend servers")
2225
flag.StringVar(&csvDomains, "domains", "", "the comma separated domains that the frontend will be representing")
@@ -25,13 +28,33 @@ func main() {
2528
flag.StringVar(&nonHTTPSRedirectURL, "non-https-redirect", "", "the URL to which all non-HTTPS traffic will be redirected")
2629
flag.BoolVar(&noAutoWWW, "no-auto-www", false, "if set, explicits tells the frontend service NOT to make equivalent www CNAMEs of domains, if the www CNAMEs haven't yet been set")
2730
flag.StringVar(&backendPingPeriodStr, "backend-ping-period", "3m", `the period for which the frontend should ping the backend servers. Please enter this value with the form <DIGIT><UNIT> where <UNIT> could be "ns", "us" (or "µs"), "ms", "s", "m", "h"`)
31+
flag.StringVar(&routeFile, "route-file", "", "the file containing the routing")
2832
flag.Parse()
33+
f, err := os.Open(routeFile)
34+
if err != nil && false {
35+
log.Fatalf("route-file: %v\n", err)
36+
}
37+
if f != nil {
38+
defer f.Close()
39+
}
40+
41+
ns, err := namespace.ParseWithHeaderDelimiter(f, ",")
42+
if err != nil {
43+
log.Fatalf("namespace: %v", err)
44+
}
2945

3046
var pingPeriod time.Duration
3147
if t, err := time.ParseDuration(backendPingPeriodStr); err == nil {
3248
pingPeriod = t
3349
}
3450

51+
proxyAddresses := splitAndTrimAddresses(csvBackendAddresses)
52+
if len(ns) == 0 {
53+
for _, addr := range proxyAddresses {
54+
ns[namespace.GlobalNamespaceKey] = append(ns[namespace.GlobalNamespaceKey], addr)
55+
}
56+
}
57+
3558
fReq := &frontender.Request{
3659
HTTP1: http1,
3760
Domains: splitAndTrimAddresses(csvDomains),
@@ -40,8 +63,9 @@ func main() {
4063
NonHTTPSAddr: nonHTTPSAddr,
4164
NonHTTPSRedirectURL: nonHTTPSRedirectURL,
4265

43-
ProxyAddresses: splitAndTrimAddresses(csvBackendAddresses),
4466
BackendPingPeriod: pingPeriod,
67+
PrefixRouter: (map[string][]string)(ns),
68+
ProxyAddresses: proxyAddresses,
4569
}
4670

4771
confirmation, err := frontender.Listen(fReq)

frontender.go

+131-61
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"net/http"
2323
"net/http/httputil"
2424
"net/url"
25+
"sort"
2526
"strings"
2627
"sync"
2728
"time"
@@ -60,6 +61,16 @@ type Request struct {
6061
// between which the frontend service will check
6162
// for the liveliness of the backends.
6263
BackendPingPeriod time.Duration
64+
65+
// PrefixRouter if set helps route traffic depending on
66+
// the route prefix e.g
67+
// {
68+
// "/bar": ["http://localhost:7997", "http://localhost:8888"],
69+
// "/foo": ["http://localhost:8999", "http://localhost:8877"]
70+
// }
71+
// if it gets traffic with a URL prefix "/foo" will distribute traffic
72+
// between "http://localhost:8999" and "http://localhost:8877".
73+
PrefixRouter map[string][]string `json:"routing"`
6374
}
6475

6576
var (
@@ -73,7 +84,15 @@ func (req *Request) hasAtLeastOneProxy() bool {
7384
if req == nil {
7485
return false
7586
}
76-
return otils.FirstNonEmptyString(req.ProxyAddresses...) != ""
87+
if len(req.PrefixRouter) == 0 {
88+
return otils.FirstNonEmptyString(req.ProxyAddresses...) != ""
89+
}
90+
for _, proxyAddresses := range req.PrefixRouter {
91+
if otils.FirstNonEmptyString(proxyAddresses...) != "" {
92+
return true
93+
}
94+
}
95+
return false
7796
}
7897

7998
func (req *Request) Validate() error {
@@ -201,14 +220,16 @@ func Listen(req *Request) (*ListenConfirmation, error) {
201220
type livelyProxy struct {
202221
mu sync.Mutex
203222

204-
next int
223+
next map[string]int
205224

206225
cycleFreq time.Duration
207226

208-
primary *lively.Peer
209-
peersMap map[string]*lively.Peer
227+
primariesMap map[string]*lively.Peer
228+
secondariesMap map[string]map[string]*lively.Peer
229+
230+
longestPrefixFirst []string
210231

211-
liveAddresses []string
232+
liveAddresses map[string][]string
212233
}
213234

214235
const defaultCycleFrequence = time.Minute * 3
@@ -220,7 +241,7 @@ type cycleFeedback struct {
220241
livePeers, nonLivePeers []*lively.Liveliness
221242
}
222243

223-
func (lp *livelyProxy) run() chan *cycleFeedback {
244+
func (lp *livelyProxy) run() map[string]chan *cycleFeedback {
224245
lp.mu.Lock()
225246
freq := lp.cycleFreq
226247
lp.mu.Unlock()
@@ -229,103 +250,148 @@ func (lp *livelyProxy) run() chan *cycleFeedback {
229250
freq = defaultCycleFrequence
230251
}
231252

232-
feedbackChan := make(chan *cycleFeedback)
233-
go func() {
234-
defer close(feedbackChan)
235-
cycleNumber := uint64(0)
236-
237-
for {
238-
cycleNumber += 1
239-
livePeers, nonLivePeers, err := lp.cycle()
240-
feedbackChan <- &cycleFeedback{
241-
err: err,
242-
cycleNumber: cycleNumber,
243-
livePeers: livePeers,
244-
nonLivePeers: nonLivePeers,
253+
feedbackChanMap := make(map[string]chan *cycleFeedback)
254+
for route, primary := range lp.primariesMap {
255+
feedbackChan := make(chan *cycleFeedback)
256+
go func(route string, primary *lively.Peer, feedbackChan chan *cycleFeedback) {
257+
defer close(feedbackChan)
258+
cycleNumber := uint64(0)
259+
260+
for {
261+
cycleNumber += 1
262+
livePeers, nonLivePeers, err := lp.cycle(route, primary)
263+
feedbackChan <- &cycleFeedback{
264+
err: err,
265+
cycleNumber: cycleNumber,
266+
livePeers: livePeers,
267+
nonLivePeers: nonLivePeers,
268+
}
269+
<-time.After(freq)
245270
}
246-
<-time.After(freq)
247-
}
248-
}()
271+
}(route, primary, feedbackChan)
272+
}
249273

250-
return feedbackChan
274+
return feedbackChanMap
251275
}
252276

253277
func (lp *livelyProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
254-
proxyAddr := lp.roundRobinedAddress()
278+
// Firstly we need to find a primary match
279+
var matchedRoute string
280+
// We need to match by longest prefix first
281+
// so that cases like
282+
// * "/"
283+
// * "/foo"
284+
// * "/fo"
285+
// given * "/foo"
286+
// will always match "/foo" instead of "/" or "/fo"
287+
// however in the absence of "/foo", "/fo" will match before "/"
288+
longestPrefixFirst := lp.longestPrefixFirst
289+
for _, routePrefix := range longestPrefixFirst {
290+
if strings.HasPrefix(r.URL.Path, routePrefix) {
291+
matchedRoute = routePrefix
292+
break
293+
}
294+
}
295+
296+
proxyAddr := lp.roundRobinedAddress(matchedRoute)
255297
// Now proxy the traffic to that request
256298
parsedURL, err := url.Parse(proxyAddr)
257299
if err != nil {
258300
http.Error(w, err.Error(), http.StatusInternalServerError)
259301
return
260302
}
261303

304+
r.URL.Path = strings.TrimPrefix(r.URL.Path, matchedRoute)
305+
if !strings.HasPrefix(r.URL.Path, "/") {
306+
r.URL.Path = "/" + r.URL.Path
307+
}
262308
rproxy := httputil.NewSingleHostReverseProxy(parsedURL)
263309
rproxy.ServeHTTP(w, r)
264310
}
265311

266-
func (lp *livelyProxy) roundRobinedAddress() string {
312+
func (lp *livelyProxy) roundRobinedAddress(route string) string {
267313
lp.mu.Lock()
268-
if len(lp.liveAddresses) == 0 {
314+
defer lp.mu.Unlock()
315+
316+
liveAddresses := lp.liveAddresses[route]
317+
if len(liveAddresses) == 0 {
269318
return ""
270319
}
271-
if lp.next >= len(lp.liveAddresses) {
272-
lp.next = 0
320+
if lp.next[route] >= len(liveAddresses) {
321+
lp.next[route] = 0
273322
}
274-
addr := lp.liveAddresses[lp.next]
323+
addr := liveAddresses[lp.next[route]]
275324
// Now increment it
276-
lp.next += 1
277-
lp.mu.Unlock()
325+
lp.next[route] += 1
278326

279327
return addr
280328
}
281329

282-
func (lp *livelyProxy) cycle() (livePeers, nonLivePeers []*lively.Liveliness, err error) {
283-
lp.mu.Lock()
284-
primary := lp.primary
285-
lp.mu.Unlock()
286-
330+
func (lp *livelyProxy) cycle(route string, primary *lively.Peer) (livePeers, nonLivePeers []*lively.Liveliness, err error) {
287331
livePeers, nonLivePeers, err = primary.Liveliness(&lively.LivelyRequest{})
288332

289333
lp.mu.Lock()
334+
defer lp.mu.Unlock()
335+
290336
var liveAddresses []string
291337
for _, peer := range livePeers {
292338
liveAddresses = append(liveAddresses, peer.Addr)
293339
}
294340

295-
// Now reset the next index and shuffle the liveAddresses.
296-
lp.next = 0
341+
// Now reset the next index.
342+
lp.next[route] = 0
343+
344+
// Shuffle the liveAddresses.
297345
perm := rand.Perm(len(liveAddresses))
298346
var shuffledAddresses []string
299347
for _, i := range perm {
300348
shuffledAddresses = append(shuffledAddresses, liveAddresses[i])
301349
}
302-
lp.liveAddresses = shuffledAddresses
303-
lp.mu.Unlock()
350+
lp.liveAddresses[route] = shuffledAddresses
304351

305352
return livePeers, nonLivePeers, err
306353
}
307354

308-
func makeLivelyProxy(cycleFreq time.Duration, addresses []string) *livelyProxy {
309-
primary := &lively.Peer{
310-
ID: uuid.NewRandom().String(),
311-
Primary: true,
312-
}
355+
func makeLivelyProxy(cycleFreq time.Duration, pr map[string][]string) *livelyProxy {
356+
secondariesMap := make(map[string]map[string]*lively.Peer)
357+
primariesMap := make(map[string]*lively.Peer)
358+
for prefix, addresses := range pr {
359+
primary := &lively.Peer{
360+
ID: uuid.NewRandom().String(),
361+
Primary: true,
362+
}
313363

314-
peersMap := make(map[string]*lively.Peer)
315-
for _, addr := range addresses {
316-
secondary := &lively.Peer{
317-
Addr: addr,
318-
ID: uuid.NewRandom().String(),
364+
peersMap := make(map[string]*lively.Peer)
365+
for _, addr := range addresses {
366+
secondary := &lively.Peer{
367+
Addr: addr,
368+
ID: uuid.NewRandom().String(),
369+
}
370+
_ = primary.AddPeer(secondary)
371+
peersMap[secondary.ID] = secondary
319372
}
320-
_ = primary.AddPeer(secondary)
321-
peersMap[secondary.ID] = secondary
373+
secondariesMap[prefix] = peersMap
374+
primariesMap[prefix] = primary
322375
}
323376

377+
routePrefixes := make([]string, 0, len(pr))
378+
for routePrefix := range pr {
379+
routePrefixes = append(routePrefixes, routePrefix)
380+
}
381+
382+
sort.Slice(routePrefixes, func(i, j int) bool {
383+
// Sort in reverse by length
384+
si, sj := routePrefixes[i], routePrefixes[j]
385+
return len(si) >= len(sj)
386+
})
324387
return &livelyProxy{
325-
primary: primary,
326-
peersMap: peersMap,
388+
longestPrefixFirst: routePrefixes,
389+
primariesMap: primariesMap,
390+
secondariesMap: secondariesMap,
391+
cycleFreq: cycleFreq,
327392

328-
cycleFreq: cycleFreq,
393+
next: make(map[string]int),
394+
liveAddresses: make(map[string][]string),
329395
}
330396
}
331397

@@ -351,13 +417,17 @@ func (req *Request) runAndCreateListener(listener net.Listener) (*ListenConfirma
351417

352418
// Per cycle of liveliness, figure out what is lively
353419
// what isn't
354-
lproxy := makeLivelyProxy(req.BackendPingPeriod, req.ProxyAddresses)
420+
lproxy := makeLivelyProxy(req.BackendPingPeriod, req.PrefixRouter)
355421
go func() {
356-
feedbackChan := lproxy.run()
357-
for feedback := range feedbackChan {
358-
if err := feedback.err; err != nil {
359-
errsChan <- err
360-
}
422+
feedbackChanMap := lproxy.run()
423+
for route, feedbackChan := range feedbackChanMap {
424+
go func(route string, feedbackChan chan *cycleFeedback) {
425+
for feedback := range feedbackChan {
426+
if err := feedback.err; err != nil {
427+
errsChan <- err
428+
}
429+
}
430+
}(route, feedbackChan)
361431
}
362432
}()
363433
errsChan <- http.Serve(listener, lproxy)

0 commit comments

Comments
 (0)