Skip to content

Commit 1309376

Browse files
committed
fix: handle duplicate "TooManyChanges" change insertion
1 parent a7358dd commit 1309376

File tree

2 files changed

+133
-42
lines changed

2 files changed

+133
-42
lines changed

db/changes.go

Lines changed: 114 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ import (
55
"time"
66

77
sw "github.com/RussellLuo/slidingwindow"
8-
"github.com/google/uuid"
98

9+
"github.com/flanksource/commons/collections"
1010
"github.com/flanksource/config-db/api"
1111
"github.com/flanksource/config-db/db/models"
1212
"github.com/flanksource/config-db/pkg/ratelimit"
@@ -15,6 +15,8 @@ import (
1515
const (
1616
rateLimitWindow = time.Hour * 4
1717
maxChangesInWindow = 100
18+
19+
ChangeTypeTooManyChanges = "TooManyChanges"
1820
)
1921

2022
func GetWorkflowRunCount(ctx api.ScrapeContext, workflowID string) (int64, error) {
@@ -29,52 +31,44 @@ func GetWorkflowRunCount(ctx api.ScrapeContext, workflowID string) (int64, error
2931
var (
3032
scraperLocks = sync.Map{}
3133
configRateLimiters = map[string]*sw.Limiter{}
34+
35+
// List of configs that have been rate limited.
36+
// It's used to avoid inserting mutliple "TooManyChanges" changes for the same config.
37+
rateLimitedConfigsPerScraper = sync.Map{}
3238
)
3339

34-
func rateLimitChanges(ctx api.ScrapeContext, newChanges []*models.ConfigChange) ([]*models.ConfigChange, error) {
40+
func rateLimitChanges(ctx api.ScrapeContext, newChanges []*models.ConfigChange) ([]*models.ConfigChange, []string, error) {
3541
if len(newChanges) == 0 {
36-
return nil, nil
42+
return nil, nil, nil
3743
}
3844

39-
lock, loaded := scraperLocks.LoadOrStore(ctx.ScrapeConfig().GetPersistedID(), &sync.Mutex{})
45+
window := ctx.Properties().Duration("changes.max.window", rateLimitWindow)
46+
max := ctx.Properties().Int("changes.max.count", maxChangesInWindow)
47+
scraperID := ctx.ScrapeConfig().GetPersistedID().String()
48+
49+
lock, loaded := scraperLocks.LoadOrStore(scraperID, &sync.Mutex{})
4050
lock.(*sync.Mutex).Lock()
4151
defer lock.(*sync.Mutex).Unlock()
4252

43-
window := ctx.Properties().Duration("changes.max.window", rateLimitWindow)
44-
max := ctx.Properties().Int("changes.max.count", maxChangesInWindow)
53+
_rateLimitedConfigs, _ := rateLimitedConfigsPerScraper.LoadOrStore(scraperID, map[string]struct{}{})
54+
rateLimitedConfigs := _rateLimitedConfigs.(map[string]struct{})
4555

4656
if !loaded {
47-
// populate the rate limit window for the scraper
48-
query := `SELECT config_id, COUNT(*), min(created_at) FROM config_changes
49-
WHERE change_type != 'TooManyChanges'
50-
AND NOW() - created_at <= ? GROUP BY config_id`
51-
rows, err := ctx.DB().Raw(query, window).Rows()
52-
if err != nil {
53-
return nil, err
57+
// This is the initial sync of the rate limiter with the database.
58+
// Happens only once for each scrape config.
59+
if err := syncWindow(ctx, max, window); err != nil {
60+
return nil, nil, err
5461
}
5562

56-
for rows.Next() {
57-
var configID string
58-
var count int
59-
var earliest time.Time
60-
if err := rows.Scan(&configID, &count, &earliest); err != nil {
61-
return nil, err
62-
}
63-
64-
rateLimiter, _ := sw.NewLimiter(window, int64(max), func() (sw.Window, sw.StopFunc) {
65-
win, stopper := ratelimit.NewLocalWindow()
66-
if count > 0 {
67-
win.SetStart(earliest)
68-
win.AddCount(int64(count))
69-
}
70-
return win, stopper
71-
})
72-
configRateLimiters[configID] = rateLimiter
63+
if rlConfigs, err := syncCurrentlyRateLimitedConfigs(ctx, window); err != nil {
64+
return nil, nil, err
65+
} else {
66+
rateLimitedConfigs = rlConfigs
7367
}
7468
}
7569

70+
rateLimitedThisRun := map[string]struct{}{}
7671
passingNewChanges := make([]*models.ConfigChange, 0, len(newChanges))
77-
rateLimited := map[string]struct{}{}
7872
for _, change := range newChanges {
7973
rateLimiter, ok := configRateLimiters[change.ConfigID]
8074
if !ok {
@@ -86,23 +80,102 @@ func rateLimitChanges(ctx api.ScrapeContext, newChanges []*models.ConfigChange)
8680
}
8781

8882
if !rateLimiter.Allow() {
89-
ctx.Logger.V(2).Infof("change rate limited (config=%s)", change.ConfigID)
90-
rateLimited[change.ConfigID] = struct{}{}
83+
ctx.Logger.V(1).Infof("change rate limited (config=%s, external_id=%s, type=%s)", change.ConfigID, change.ExternalChangeId, change.ChangeType)
84+
rateLimitedThisRun[change.ConfigID] = struct{}{}
9185
continue
86+
} else {
87+
delete(rateLimitedConfigs, change.ConfigID)
9288
}
9389

9490
passingNewChanges = append(passingNewChanges, change)
9591
}
9692

97-
// For all the rate limited configs, we add a new "TooManyChanges" change
98-
for configID := range rateLimited {
99-
passingNewChanges = append(passingNewChanges, &models.ConfigChange{
100-
ConfigID: configID,
101-
Summary: "Changes on this config has been rate limited",
102-
ChangeType: "TooManyChanges",
103-
ExternalChangeId: uuid.New().String(),
93+
var newlyRateLimited []string
94+
for configID := range rateLimitedThisRun {
95+
if _, ok := rateLimitedConfigs[configID]; !ok {
96+
newlyRateLimited = append(newlyRateLimited, configID)
97+
}
98+
}
99+
100+
rateLimitedConfigs = collections.MergeMap(rateLimitedConfigs, rateLimitedThisRun)
101+
rateLimitedConfigsPerScraper.Store(scraperID, rateLimitedConfigs)
102+
103+
return passingNewChanges, newlyRateLimited, nil
104+
}
105+
106+
func syncCurrentlyRateLimitedConfigs(ctx api.ScrapeContext, window time.Duration) (map[string]struct{}, error) {
107+
query := `WITH latest_changes AS (
108+
SELECT
109+
DISTINCT ON (config_id) config_id,
110+
change_type
111+
FROM
112+
config_changes
113+
LEFT JOIN config_items ON config_items.id = config_changes.config_id
114+
WHERE
115+
scraper_id = ?
116+
AND NOW() - config_changes.created_at <= ?
117+
ORDER BY
118+
config_id,
119+
config_changes.created_at DESC
120+
) SELECT config_id FROM latest_changes WHERE change_type = ?`
121+
rows, err := ctx.DB().Raw(query, ctx.ScrapeConfig().GetPersistedID(), window, ChangeTypeTooManyChanges).Rows()
122+
if err != nil {
123+
return nil, err
124+
}
125+
126+
output := make(map[string]struct{})
127+
for rows.Next() {
128+
var id string
129+
if err := rows.Scan(&id); err != nil {
130+
return nil, err
131+
}
132+
133+
ctx.Logger.V(1).Infof("config %s is currently found to be rate limited", id)
134+
output[id] = struct{}{}
135+
}
136+
137+
return output, rows.Err()
138+
}
139+
140+
// syncWindow syncs the rate limit window for the scraper with the changes in the db.
141+
func syncWindow(ctx api.ScrapeContext, max int, window time.Duration) error {
142+
query := `SELECT
143+
config_id,
144+
COUNT(*),
145+
MIN(config_changes.created_at) AS min_created_at
146+
FROM
147+
config_changes
148+
LEFT JOIN config_items ON config_items.id = config_changes.config_id
149+
WHERE
150+
scraper_id = ?
151+
AND change_type != ?
152+
AND NOW() - config_changes.created_at <= ?
153+
GROUP BY
154+
config_id`
155+
rows, err := ctx.DB().Raw(query, ctx.ScrapeConfig().GetPersistedID(), ChangeTypeTooManyChanges, window).Rows()
156+
if err != nil {
157+
return err
158+
}
159+
160+
for rows.Next() {
161+
var configID string
162+
var count int
163+
var earliest time.Time
164+
if err := rows.Scan(&configID, &count, &earliest); err != nil {
165+
return err
166+
}
167+
ctx.Logger.V(3).Infof("config %s has %d changes in the last %s", configID, count, window)
168+
169+
rateLimiter, _ := sw.NewLimiter(window, int64(max), func() (sw.Window, sw.StopFunc) {
170+
win, stopper := ratelimit.NewLocalWindow()
171+
if count > 0 {
172+
win.SetStart(earliest)
173+
win.AddCount(int64(count))
174+
}
175+
return win, stopper
104176
})
177+
configRateLimiters[configID] = rateLimiter
105178
}
106179

107-
return passingNewChanges, nil
180+
return rows.Err()
108181
}

db/update.go

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -352,7 +352,7 @@ func SaveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) error {
352352
}
353353
}
354354

355-
newChanges, err = rateLimitChanges(ctx, newChanges)
355+
newChanges, rateLimitedThisRun, err := rateLimitChanges(ctx, newChanges)
356356
if err != nil {
357357
return fmt.Errorf("failed to rate limit changes: %w", err)
358358
}
@@ -361,6 +361,24 @@ func SaveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) error {
361361
return fmt.Errorf("failed to create config changes: %w", err)
362362
}
363363

364+
// For all the rate limited configs, we add a new "TooManyChanges" change.
365+
// This is intentionally inserted in a different batch from the new changes
366+
// as "ChangeTypeTooManyChanges" will have the same created_at timestamp.
367+
// We want these changes to be newer than the actual changes.
368+
var rateLimitedChanges []*models.ConfigChange
369+
for _, configID := range rateLimitedThisRun {
370+
rateLimitedChanges = append(rateLimitedChanges, &models.ConfigChange{
371+
ConfigID: configID,
372+
Summary: "Changes on this config has been rate limited",
373+
ChangeType: ChangeTypeTooManyChanges,
374+
ExternalChangeId: uuid.New().String(),
375+
})
376+
}
377+
378+
if err := ctx.DB().CreateInBatches(rateLimitedChanges, configItemsBulkInsertSize).Error; err != nil {
379+
return fmt.Errorf("failed to create rate limited config changes: %w", err)
380+
}
381+
364382
if len(changesToUpdate) != 0 {
365383
if err := ctx.DB().Save(changesToUpdate).Error; err != nil {
366384
return fmt.Errorf("failed to update config changes: %w", err)

0 commit comments

Comments
 (0)