From 4beede3cc4c87305e2dd74ec6afec133d5bfcd8c Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Thu, 16 May 2024 10:44:01 +0545 Subject: [PATCH 1/5] refactor: add temp cache for config changes We don't want to create new config changes with an ON CONFLICT DO NOTHING clause because now we don't want the same config change to take up multiple quotas from the rate limiter. e.g. if an aws scraper is run @every 5m, we'll be trying to insert the same config changes generated by the cloudtrail scraper again & again on every run. The same change will take up one more quota from the rate limiter on every run. By knowing that the change already exists, we can avoid inserting that change in the first place and the rate limiter will be happy about it. --- api/cache.go | 34 ++++++++++++++++++++++++++++++++++ db/models/config_change.go | 2 -- db/update.go | 6 +++++- 3 files changed, 39 insertions(+), 3 deletions(-) diff --git a/api/cache.go b/api/cache.go index 311f89ed..a9204c9e 100644 --- a/api/cache.go +++ b/api/cache.go @@ -16,6 +16,8 @@ type TempCache struct { ctx context.Context items map[string]models.ConfigItem aliases map[string]string + + changes map[string]struct{} } func (t TempCache) FindExternal(ext v1.ExternalID) (*models.ConfigItem, error) { @@ -76,6 +78,38 @@ func (t TempCache) Insert(item models.ConfigItem) { t.items[strings.ToLower(item.ID)] = item } +func (t TempCache) IsChangePersisted(configID, externalChangeID string) (bool, error) { + if configID == "" || externalChangeID == "" { + return false, nil + } + + configID = strings.ToLower(configID) + externalChangeID = strings.ToLower(externalChangeID) + + if t.changes == nil { + t.changes = make(map[string]struct{}) + } + + if _, ok := t.changes[configID+externalChangeID]; ok { + return true, nil + } + + var result models.ConfigChange + if err := t.ctx.DB().Select("id").Where("config_id = ?", configID). + Where("external_change_id = ?", externalChangeID). + Limit(1). 
+ Find(&result).Error; err != nil { + return false, err + } + + if result.ID != "" { + t.changes[configID+externalChangeID] = struct{}{} + return true, nil + } + + return false, nil +} + func (t TempCache) Get(id string) (*models.ConfigItem, error) { id = strings.ToLower(id) if id == "" { diff --git a/db/models/config_change.go b/db/models/config_change.go index 87aae48b..ae2534f6 100644 --- a/db/models/config_change.go +++ b/db/models/config_change.go @@ -7,7 +7,6 @@ import ( v1 "github.com/flanksource/config-db/api/v1" "github.com/google/uuid" "gorm.io/gorm" - "gorm.io/gorm/clause" ) // ConfigChange represents the config change database table @@ -66,6 +65,5 @@ func (c *ConfigChange) BeforeCreate(tx *gorm.DB) (err error) { c.ID = uuid.New().String() } - tx.Statement.AddClause(clause.OnConflict{DoNothing: true}) return } diff --git a/db/update.go b/db/update.go index ee8ca64e..e7f9a243 100644 --- a/db/update.go +++ b/db/update.go @@ -269,7 +269,11 @@ func extractChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.C if changeResult.UpdateExisting { updates = append(updates, change) } else { - newOnes = append(newOnes, change) + if ok, err := ctx.TempCache().IsChangePersisted(change.ConfigID, change.ExternalChangeId); err != nil { + return nil, nil, fmt.Errorf("failed to check if change is persisted: %w", err) + } else if !ok { + newOnes = append(newOnes, change) + } } } From 1a2be66369e09d7e83a59f14a03a8d46719d5ebb Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Fri, 24 May 2024 11:28:11 +0545 Subject: [PATCH 2/5] feat: rate limits on changes per config --- db/changes.go | 98 +++++++++++++++++++++++++++++++++++++- db/update.go | 5 ++ go.mod | 2 + go.sum | 4 ++ pkg/ratelimit/ratelimit.go | 48 +++++++++++++++++++ 5 files changed, 156 insertions(+), 1 deletion(-) create mode 100644 pkg/ratelimit/ratelimit.go diff --git a/db/changes.go b/db/changes.go index 209aaea3..68a2a0f1 100644 --- a/db/changes.go +++ b/db/changes.go @@ -1,6 +1,21 @@ package db 
-import "github.com/flanksource/config-db/api" +import ( + "sync" + "time" + + sw "github.com/RussellLuo/slidingwindow" + "github.com/google/uuid" + + "github.com/flanksource/config-db/api" + "github.com/flanksource/config-db/db/models" + "github.com/flanksource/config-db/pkg/ratelimit" +) + +const ( + rateLimitWindow = time.Hour * 4 + maxChangesInWindow = 100 +) func GetWorkflowRunCount(ctx api.ScrapeContext, workflowID string) (int64, error) { var count int64 @@ -10,3 +25,84 @@ func GetWorkflowRunCount(ctx api.ScrapeContext, workflowID string) (int64, error Error return count, err } + +var ( + scraperLocks = sync.Map{} + configRateLimiters = map[string]*sw.Limiter{} +) + +func rateLimitChanges(ctx api.ScrapeContext, newChanges []*models.ConfigChange) ([]*models.ConfigChange, error) { + if len(newChanges) == 0 { + return nil, nil + } + + lock, loaded := scraperLocks.LoadOrStore(ctx.ScrapeConfig().GetPersistedID(), &sync.Mutex{}) + lock.(*sync.Mutex).Lock() + defer lock.(*sync.Mutex).Unlock() + + window := ctx.Properties().Duration("changes.max.window", rateLimitWindow) + max := ctx.Properties().Int("changes.max.count", maxChangesInWindow) + + if !loaded { + // populate the rate limit window for the scraper + query := `SELECT config_id, COUNT(*), min(created_at) FROM config_changes + WHERE change_type != 'TooManyChanges' + AND NOW() - created_at <= ? 
GROUP BY config_id` + rows, err := ctx.DB().Raw(query, window).Rows() + if err != nil { + return nil, err + } + + for rows.Next() { + var configID string + var count int + var earliest time.Time + if err := rows.Scan(&configID, &count, &earliest); err != nil { + return nil, err + } + + rateLimiter, _ := sw.NewLimiter(window, int64(max), func() (sw.Window, sw.StopFunc) { + win, stopper := ratelimit.NewLocalWindow() + if count > 0 { + win.SetStart(earliest) + win.AddCount(int64(count)) + } + return win, stopper + }) + configRateLimiters[configID] = rateLimiter + } + } + + passingNewChanges := make([]*models.ConfigChange, 0, len(newChanges)) + rateLimited := map[string]struct{}{} + for _, change := range newChanges { + rateLimiter, ok := configRateLimiters[change.ConfigID] + if !ok { + rl, _ := sw.NewLimiter(window, int64(max), func() (sw.Window, sw.StopFunc) { + return sw.NewLocalWindow() + }) + configRateLimiters[change.ConfigID] = rl + rateLimiter = rl + } + + if !rateLimiter.Allow() { + ctx.Logger.V(2).Infof("change rate limited (config=%s)", change.ConfigID) + rateLimited[change.ConfigID] = struct{}{} + continue + } + + passingNewChanges = append(passingNewChanges, change) + } + + // For all the rate limited configs, we add a new "TooManyChanges" change + for configID := range rateLimited { + passingNewChanges = append(passingNewChanges, &models.ConfigChange{ + ConfigID: configID, + Summary: "Changes on this config has been rate limited", + ChangeType: "TooManyChanges", + ExternalChangeId: uuid.New().String(), + }) + } + + return passingNewChanges, nil +} diff --git a/db/update.go b/db/update.go index e7f9a243..4493db83 100644 --- a/db/update.go +++ b/db/update.go @@ -360,6 +360,11 @@ func SaveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) error { return fmt.Errorf("failed to update last scraped time: %w", err) } + newChanges, err = rateLimitChanges(ctx, newChanges) + if err != nil { + return fmt.Errorf("failed to rate limit changes: %w", err) + } + if 
err := ctx.DB().CreateInBatches(newChanges, configItemsBulkInsertSize).Error; err != nil { return fmt.Errorf("failed to create config changes: %w", err) } diff --git a/go.mod b/go.mod index 0471ced3..8403a33e 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,7 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/subscription/armsubscription v1.1.0 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/trafficmanager/armtrafficmanager v1.0.0 github.com/Jeffail/gabs/v2 v2.7.0 + github.com/RussellLuo/slidingwindow v0.0.0-20200528002341-535bb99d338b github.com/aws/aws-sdk-go-v2 v1.18.0 github.com/aws/aws-sdk-go-v2/config v1.18.25 github.com/aws/aws-sdk-go-v2/credentials v1.13.24 @@ -117,6 +118,7 @@ require ( github.com/ghodss/yaml v1.0.0 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-openapi/inflect v0.19.0 // indirect + github.com/go-redis/redis v6.15.9+incompatible // indirect github.com/go-task/slim-sprig/v3 v3.0.0 // indirect github.com/golang-jwt/jwt/v5 v5.0.0 // indirect github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect diff --git a/go.sum b/go.sum index c8ae9bc9..2452d2e3 100644 --- a/go.sum +++ b/go.sum @@ -672,6 +672,8 @@ github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdko github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk= github.com/RaveNoX/go-jsonmerge v1.0.0 h1:2e0nqnadoGUP8rAvcA0hkQelZreVO5X3BHomT2XMrAk= github.com/RaveNoX/go-jsonmerge v1.0.0/go.mod h1:qYM/NA77LhO4h51JJM7Z+xBU3ovqrNIACZe+SkSNVFo= +github.com/RussellLuo/slidingwindow v0.0.0-20200528002341-535bb99d338b h1:5/++qT1/z812ZqBvqQt6ToRswSuPZ/B33m6xVHRzADU= +github.com/RussellLuo/slidingwindow v0.0.0-20200528002341-535bb99d338b/go.mod h1:4+EPqMRApwwE/6yo6CxiHoSnBzjRr3jsqer7frxP8y4= github.com/TomOnTime/utfutil v0.0.0-20210710122150-437f72b26edf h1:+GdVyvpzTy3UFAS1+hbTqm9Mk0U1Xrocm28s/E2GWz0= github.com/TomOnTime/utfutil v0.0.0-20210710122150-437f72b26edf/go.mod 
h1:FiuynIwe98RFhWI8nZ0dnsldPVsBy9rHH1hn2WYwme4= github.com/WinterYukky/gorm-extra-clause-plugin v0.2.0 h1:s1jobT8PlSyG/FXczfoGSt4r46iPiT4ZShe35k5/2y4= @@ -922,6 +924,8 @@ github.com/go-openapi/swag v0.22.4 h1:QLMzNJnMGPRNDCbySlcj1x01tzU8/9LTTL9hZZZogB github.com/go-openapi/swag v0.22.4/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14= github.com/go-pdf/fpdf v0.5.0/go.mod h1:HzcnA+A23uwogo0tp9yU+l3V+KXhiESpt1PMayhOh5M= github.com/go-pdf/fpdf v0.6.0/go.mod h1:HzcnA+A23uwogo0tp9yU+l3V+KXhiESpt1PMayhOh5M= +github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg= +github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= github.com/go-resty/resty/v2 v2.7.0 h1:me+K9p3uhSmXtrBZ4k9jcEAfJmuC8IivWHwaLZwPrFY= github.com/go-resty/resty/v2 v2.7.0/go.mod h1:9PWDzw47qPphMRFfhsyk0NnSgvluHcljSMVIq3w7q0I= github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI= diff --git a/pkg/ratelimit/ratelimit.go b/pkg/ratelimit/ratelimit.go new file mode 100644 index 00000000..fa106bdf --- /dev/null +++ b/pkg/ratelimit/ratelimit.go @@ -0,0 +1,48 @@ +package ratelimit + +import ( + "time" + + sw "github.com/RussellLuo/slidingwindow" +) + +// LocalWindow represents a window that ignores sync behavior entirely +// and only stores counters in memory. +// +// NOTE: It's an exact copy of the LocalWindow provided by RussellLuo/slidingwindow +// with an added capability of setting a custom start time. +type LocalWindow struct { + // The start boundary (timestamp in nanoseconds) of the window. + // [start, start + size) + start int64 + + // The total count of events happened in the window. 
+ count int64 +} + +func NewLocalWindow() (*LocalWindow, sw.StopFunc) { + return &LocalWindow{}, func() {} +} + +func (w *LocalWindow) SetStart(s time.Time) { + w.start = s.UnixNano() +} + +func (w *LocalWindow) Start() time.Time { + return time.Unix(0, w.start) +} + +func (w *LocalWindow) Count() int64 { + return w.count +} + +func (w *LocalWindow) AddCount(n int64) { + w.count += n +} + +func (w *LocalWindow) Reset(s time.Time, c int64) { + w.start = s.UnixNano() + w.count = c +} + +func (w *LocalWindow) Sync(now time.Time) {} From d9614923914169676cb267a3276fa0f5bffa2d75 Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Fri, 24 May 2024 11:28:52 +0545 Subject: [PATCH 3/5] fix: handle duplicate "TooManyChanges" change insertion --- db/changes.go | 157 ++++++++++++++++++++++++++++++++++++-------------- db/update.go | 20 ++++++- 2 files changed, 134 insertions(+), 43 deletions(-) diff --git a/db/changes.go b/db/changes.go index 68a2a0f1..d097d9d2 100644 --- a/db/changes.go +++ b/db/changes.go @@ -5,8 +5,8 @@ import ( "time" sw "github.com/RussellLuo/slidingwindow" - "github.com/google/uuid" + "github.com/flanksource/commons/collections" "github.com/flanksource/config-db/api" "github.com/flanksource/config-db/db/models" "github.com/flanksource/config-db/pkg/ratelimit" @@ -15,6 +15,8 @@ import ( const ( rateLimitWindow = time.Hour * 4 maxChangesInWindow = 100 + + ChangeTypeTooManyChanges = "TooManyChanges" ) func GetWorkflowRunCount(ctx api.ScrapeContext, workflowID string) (int64, error) { @@ -29,52 +31,44 @@ func GetWorkflowRunCount(ctx api.ScrapeContext, workflowID string) (int64, error var ( scraperLocks = sync.Map{} configRateLimiters = map[string]*sw.Limiter{} + + // List of configs that have been rate limited. + // It's used to avoid inserting mutliple "TooManyChanges" changes for the same config. 
+ rateLimitedConfigsPerScraper = sync.Map{} ) -func rateLimitChanges(ctx api.ScrapeContext, newChanges []*models.ConfigChange) ([]*models.ConfigChange, error) { - if len(newChanges) == 0 { - return nil, nil +func rateLimitChanges(ctx api.ScrapeContext, newChanges []*models.ConfigChange) ([]*models.ConfigChange, []string, error) { + if len(newChanges) == 0 || ctx.ScrapeConfig().GetPersistedID() == nil { + return newChanges, nil, nil } - lock, loaded := scraperLocks.LoadOrStore(ctx.ScrapeConfig().GetPersistedID(), &sync.Mutex{}) + window := ctx.Properties().Duration("changes.max.window", rateLimitWindow) + max := ctx.Properties().Int("changes.max.count", maxChangesInWindow) + scraperID := ctx.ScrapeConfig().GetPersistedID().String() + + lock, loaded := scraperLocks.LoadOrStore(scraperID, &sync.Mutex{}) lock.(*sync.Mutex).Lock() defer lock.(*sync.Mutex).Unlock() - window := ctx.Properties().Duration("changes.max.window", rateLimitWindow) - max := ctx.Properties().Int("changes.max.count", maxChangesInWindow) + _rateLimitedConfigs, _ := rateLimitedConfigsPerScraper.LoadOrStore(scraperID, map[string]struct{}{}) + rateLimitedConfigs := _rateLimitedConfigs.(map[string]struct{}) if !loaded { - // populate the rate limit window for the scraper - query := `SELECT config_id, COUNT(*), min(created_at) FROM config_changes - WHERE change_type != 'TooManyChanges' - AND NOW() - created_at <= ? GROUP BY config_id` - rows, err := ctx.DB().Raw(query, window).Rows() - if err != nil { - return nil, err + // This is the initial sync of the rate limiter with the database. + // Happens only once for each scrape config. 
+ if err := syncWindow(ctx, max, window); err != nil { + return nil, nil, err } - for rows.Next() { - var configID string - var count int - var earliest time.Time - if err := rows.Scan(&configID, &count, &earliest); err != nil { - return nil, err - } - - rateLimiter, _ := sw.NewLimiter(window, int64(max), func() (sw.Window, sw.StopFunc) { - win, stopper := ratelimit.NewLocalWindow() - if count > 0 { - win.SetStart(earliest) - win.AddCount(int64(count)) - } - return win, stopper - }) - configRateLimiters[configID] = rateLimiter + if rlConfigs, err := syncCurrentlyRateLimitedConfigs(ctx, window); err != nil { + return nil, nil, err + } else { + rateLimitedConfigs = rlConfigs } } + rateLimitedThisRun := map[string]struct{}{} passingNewChanges := make([]*models.ConfigChange, 0, len(newChanges)) - rateLimited := map[string]struct{}{} for _, change := range newChanges { rateLimiter, ok := configRateLimiters[change.ConfigID] if !ok { @@ -86,23 +80,102 @@ func rateLimitChanges(ctx api.ScrapeContext, newChanges []*models.ConfigChange) } if !rateLimiter.Allow() { - ctx.Logger.V(2).Infof("change rate limited (config=%s)", change.ConfigID) - rateLimited[change.ConfigID] = struct{}{} + ctx.Logger.V(1).Infof("change rate limited (config=%s, external_id=%s, type=%s)", change.ConfigID, change.ExternalChangeId, change.ChangeType) + rateLimitedThisRun[change.ConfigID] = struct{}{} continue + } else { + delete(rateLimitedConfigs, change.ConfigID) } passingNewChanges = append(passingNewChanges, change) } - // For all the rate limited configs, we add a new "TooManyChanges" change - for configID := range rateLimited { - passingNewChanges = append(passingNewChanges, &models.ConfigChange{ - ConfigID: configID, - Summary: "Changes on this config has been rate limited", - ChangeType: "TooManyChanges", - ExternalChangeId: uuid.New().String(), + var newlyRateLimited []string + for configID := range rateLimitedThisRun { + if _, ok := rateLimitedConfigs[configID]; !ok { + newlyRateLimited = 
append(newlyRateLimited, configID) + } + } + + rateLimitedConfigs = collections.MergeMap(rateLimitedConfigs, rateLimitedThisRun) + rateLimitedConfigsPerScraper.Store(scraperID, rateLimitedConfigs) + + return passingNewChanges, newlyRateLimited, nil +} + +func syncCurrentlyRateLimitedConfigs(ctx api.ScrapeContext, window time.Duration) (map[string]struct{}, error) { + query := `WITH latest_changes AS ( + SELECT + DISTINCT ON (config_id) config_id, + change_type + FROM + config_changes + LEFT JOIN config_items ON config_items.id = config_changes.config_id + WHERE + scraper_id = ? + AND NOW() - config_changes.created_at <= ? + ORDER BY + config_id, + config_changes.created_at DESC + ) SELECT config_id FROM latest_changes WHERE change_type = ?` + rows, err := ctx.DB().Raw(query, ctx.ScrapeConfig().GetPersistedID(), window, ChangeTypeTooManyChanges).Rows() + if err != nil { + return nil, err + } + + output := make(map[string]struct{}) + for rows.Next() { + var id string + if err := rows.Scan(&id); err != nil { + return nil, err + } + + ctx.Logger.V(1).Infof("config %s is currently found to be rate limited", id) + output[id] = struct{}{} + } + + return output, rows.Err() +} + +// syncWindow syncs the rate limit window for the scraper with the changes in the db. +func syncWindow(ctx api.ScrapeContext, max int, window time.Duration) error { + query := `SELECT + config_id, + COUNT(*), + MIN(config_changes.created_at) AS min_created_at + FROM + config_changes + LEFT JOIN config_items ON config_items.id = config_changes.config_id + WHERE + scraper_id = ? + AND change_type != ? + AND NOW() - config_changes.created_at <= ? 
+ GROUP BY + config_id` + rows, err := ctx.DB().Raw(query, ctx.ScrapeConfig().GetPersistedID(), ChangeTypeTooManyChanges, window).Rows() + if err != nil { + return err + } + + for rows.Next() { + var configID string + var count int + var earliest time.Time + if err := rows.Scan(&configID, &count, &earliest); err != nil { + return err + } + ctx.Logger.V(3).Infof("config %s has %d changes in the last %s", configID, count, window) + + rateLimiter, _ := sw.NewLimiter(window, int64(max), func() (sw.Window, sw.StopFunc) { + win, stopper := ratelimit.NewLocalWindow() + if count > 0 { + win.SetStart(earliest) + win.AddCount(int64(count)) + } + return win, stopper }) + configRateLimiters[configID] = rateLimiter } - return passingNewChanges, nil + return rows.Err() } diff --git a/db/update.go b/db/update.go index 4493db83..bb9c6565 100644 --- a/db/update.go +++ b/db/update.go @@ -360,7 +360,7 @@ func SaveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) error { return fmt.Errorf("failed to update last scraped time: %w", err) } - newChanges, err = rateLimitChanges(ctx, newChanges) + newChanges, rateLimitedThisRun, err := rateLimitChanges(ctx, newChanges) if err != nil { return fmt.Errorf("failed to rate limit changes: %w", err) } @@ -369,6 +369,24 @@ func SaveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) error { return fmt.Errorf("failed to create config changes: %w", err) } + // For all the rate limited configs, we add a new "TooManyChanges" change. + // This is intentionally inserted in a different batch from the new changes + // as "ChangeTypeTooManyChanges" will have the same created_at timestamp. + // We want these changes to be newer than the actual changes. 
+ var rateLimitedChanges []*models.ConfigChange + for _, configID := range rateLimitedThisRun { + rateLimitedChanges = append(rateLimitedChanges, &models.ConfigChange{ + ConfigID: configID, + Summary: "Changes on this config has been rate limited", + ChangeType: ChangeTypeTooManyChanges, + ExternalChangeId: uuid.New().String(), + }) + } + + if err := ctx.DB().CreateInBatches(rateLimitedChanges, configItemsBulkInsertSize).Error; err != nil { + return fmt.Errorf("failed to create rate limited config changes: %w", err) + } + if len(changesToUpdate) != 0 { if err := ctx.DB().Save(changesToUpdate).Error; err != nil { return fmt.Errorf("failed to update config changes: %w", err) From c9c63f5172f268d0700991d4e93844f933af72b2 Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Fri, 17 May 2024 11:18:53 +0545 Subject: [PATCH 4/5] improve config_changes lookup for existing ones --- api/cache.go | 34 ------------------------------ db/changes.go | 57 +++++++++++++++++++++++++++++++++++++++++++++++++++ db/update.go | 20 ++++++++++-------- 3 files changed, 68 insertions(+), 43 deletions(-) diff --git a/api/cache.go b/api/cache.go index a9204c9e..311f89ed 100644 --- a/api/cache.go +++ b/api/cache.go @@ -16,8 +16,6 @@ type TempCache struct { ctx context.Context items map[string]models.ConfigItem aliases map[string]string - - changes map[string]struct{} } func (t TempCache) FindExternal(ext v1.ExternalID) (*models.ConfigItem, error) { @@ -78,38 +76,6 @@ func (t TempCache) Insert(item models.ConfigItem) { t.items[strings.ToLower(item.ID)] = item } -func (t TempCache) IsChangePersisted(configID, externalChangeID string) (bool, error) { - if configID == "" || externalChangeID == "" { - return false, nil - } - - configID = strings.ToLower(configID) - externalChangeID = strings.ToLower(externalChangeID) - - if t.changes == nil { - t.changes = make(map[string]struct{}) - } - - if _, ok := t.changes[configID+externalChangeID]; ok { - return true, nil - } - - var result models.ConfigChange - 
if err := t.ctx.DB().Select("id").Where("config_id = ?", configID). - Where("external_change_id = ?", externalChangeID). - Limit(1). - Find(&result).Error; err != nil { - return false, err - } - - if result.ID != "" { - t.changes[configID+externalChangeID] = struct{}{} - return true, nil - } - - return false, nil -} - func (t TempCache) Get(id string) (*models.ConfigItem, error) { id = strings.ToLower(id) if id == "" { diff --git a/db/changes.go b/db/changes.go index d097d9d2..c4a954cf 100644 --- a/db/changes.go +++ b/db/changes.go @@ -5,6 +5,8 @@ import ( "time" sw "github.com/RussellLuo/slidingwindow" + "github.com/patrickmn/go-cache" + "github.com/samber/lo" "github.com/flanksource/commons/collections" "github.com/flanksource/config-db/api" @@ -19,6 +21,8 @@ const ( ChangeTypeTooManyChanges = "TooManyChanges" ) +var configChangesCache = cache.New(time.Hour*24, time.Hour*24) + func GetWorkflowRunCount(ctx api.ScrapeContext, workflowID string) (int64, error) { var count int64 err := ctx.DB().Table("config_changes"). @@ -122,6 +126,7 @@ func syncCurrentlyRateLimitedConfigs(ctx api.ScrapeContext, window time.Duration if err != nil { return nil, err } + defer rows.Close() output := make(map[string]struct{}) for rows.Next() { @@ -156,6 +161,7 @@ func syncWindow(ctx api.ScrapeContext, max int, window time.Duration) error { if err != nil { return err } + defer rows.Close() for rows.Next() { var configID string @@ -179,3 +185,54 @@ func syncWindow(ctx api.ScrapeContext, max int, window time.Duration) error { return rows.Err() } + +// filterOutPersistedChanges returns only those changes that weren't seen in the db. 
+func filterOutPersistedChanges(ctx api.ScrapeContext, changes []*models.ConfigChange) ([]*models.ConfigChange, error) { + // use cache to filter out ones that we've already seen before + changes = lo.Filter(changes, func(c *models.ConfigChange, _ int) bool { + _, found := configChangesCache.Get(c.ConfigID + c.ExternalChangeId) + if found { + _ = found + } + return !found + }) + + if len(changes) == 0 { + return nil, nil + } + + query := `SELECT config_id, external_change_id + FROM config_changes + WHERE (config_id, external_change_id) IN ?` + args := lo.Map(changes, func(c *models.ConfigChange, _ int) []string { + return []string{c.ConfigID, c.ExternalChangeId} + }) + + rows, err := ctx.DB().Raw(query, args).Rows() + if err != nil { + return nil, err + } + defer rows.Close() + + existing := make(map[string]struct{}) + for rows.Next() { + var configID, externalChangeID string + if err := rows.Scan(&configID, &externalChangeID); err != nil { + return nil, err + } + + configChangesCache.SetDefault(configID+externalChangeID, struct{}{}) + existing[configID+externalChangeID] = struct{}{} + } + + newOnes := lo.Filter(changes, func(c *models.ConfigChange, _ int) bool { + _, found := existing[c.ConfigID+c.ExternalChangeId] + return !found + }) + + if len(newOnes) > 0 { + _ = query + } + + return newOnes, nil +} diff --git a/db/update.go b/db/update.go index bb9c6565..d3de2695 100644 --- a/db/update.go +++ b/db/update.go @@ -209,8 +209,8 @@ func shouldExcludeChange(result *v1.ScrapeResult, changeResult v1.ChangeResult) func extractChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.ConfigItem) ([]*models.ConfigChange, []*models.ConfigChange, error) { var ( - newOnes = []*models.ConfigChange{} - updates = []*models.ConfigChange{} + toInsert = []*models.ConfigChange{} + toUpdate = []*models.ConfigChange{} ) changes.ProcessRules(result, result.BaseScraper.Transform.Change.Mapping...) 
@@ -267,17 +267,19 @@ func extractChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.C } if changeResult.UpdateExisting { - updates = append(updates, change) + toUpdate = append(toUpdate, change) } else { - if ok, err := ctx.TempCache().IsChangePersisted(change.ConfigID, change.ExternalChangeId); err != nil { - return nil, nil, fmt.Errorf("failed to check if change is persisted: %w", err) - } else if !ok { - newOnes = append(newOnes, change) - } + toInsert = append(toInsert, change) } } - return newOnes, updates, nil + // Remove the changes that have already been inserted. + newOnes, err := filterOutPersistedChanges(ctx, toInsert) + if err != nil { + return nil, nil, err + } + + return newOnes, toUpdate, nil } func upsertAnalysis(ctx api.ScrapeContext, result *v1.ScrapeResult) error { From 5f43ea748591173dccdde6e848594881bc953bf6 Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Sun, 26 May 2024 22:54:08 +0545 Subject: [PATCH 5/5] feat: post insertion rate limitation --- db/changes.go | 228 +------------------------------------ db/changes_rate_limit.go | 162 ++++++++++++++++++++++++++ db/models/config_change.go | 11 +- db/update.go | 87 +++++++++++--- go.mod | 3 +- go.sum | 6 +- pkg/ratelimit/ratelimit.go | 48 -------- 7 files changed, 242 insertions(+), 303 deletions(-) create mode 100644 db/changes_rate_limit.go delete mode 100644 pkg/ratelimit/ratelimit.go diff --git a/db/changes.go b/db/changes.go index c4a954cf..209aaea3 100644 --- a/db/changes.go +++ b/db/changes.go @@ -1,27 +1,6 @@ package db -import ( - "sync" - "time" - - sw "github.com/RussellLuo/slidingwindow" - "github.com/patrickmn/go-cache" - "github.com/samber/lo" - - "github.com/flanksource/commons/collections" - "github.com/flanksource/config-db/api" - "github.com/flanksource/config-db/db/models" - "github.com/flanksource/config-db/pkg/ratelimit" -) - -const ( - rateLimitWindow = time.Hour * 4 - maxChangesInWindow = 100 - - ChangeTypeTooManyChanges = "TooManyChanges" -) - -var 
configChangesCache = cache.New(time.Hour*24, time.Hour*24) +import "github.com/flanksource/config-db/api" func GetWorkflowRunCount(ctx api.ScrapeContext, workflowID string) (int64, error) { var count int64 @@ -31,208 +10,3 @@ func GetWorkflowRunCount(ctx api.ScrapeContext, workflowID string) (int64, error Error return count, err } - -var ( - scraperLocks = sync.Map{} - configRateLimiters = map[string]*sw.Limiter{} - - // List of configs that have been rate limited. - // It's used to avoid inserting mutliple "TooManyChanges" changes for the same config. - rateLimitedConfigsPerScraper = sync.Map{} -) - -func rateLimitChanges(ctx api.ScrapeContext, newChanges []*models.ConfigChange) ([]*models.ConfigChange, []string, error) { - if len(newChanges) == 0 || ctx.ScrapeConfig().GetPersistedID() == nil { - return newChanges, nil, nil - } - - window := ctx.Properties().Duration("changes.max.window", rateLimitWindow) - max := ctx.Properties().Int("changes.max.count", maxChangesInWindow) - scraperID := ctx.ScrapeConfig().GetPersistedID().String() - - lock, loaded := scraperLocks.LoadOrStore(scraperID, &sync.Mutex{}) - lock.(*sync.Mutex).Lock() - defer lock.(*sync.Mutex).Unlock() - - _rateLimitedConfigs, _ := rateLimitedConfigsPerScraper.LoadOrStore(scraperID, map[string]struct{}{}) - rateLimitedConfigs := _rateLimitedConfigs.(map[string]struct{}) - - if !loaded { - // This is the initial sync of the rate limiter with the database. - // Happens only once for each scrape config. 
- if err := syncWindow(ctx, max, window); err != nil { - return nil, nil, err - } - - if rlConfigs, err := syncCurrentlyRateLimitedConfigs(ctx, window); err != nil { - return nil, nil, err - } else { - rateLimitedConfigs = rlConfigs - } - } - - rateLimitedThisRun := map[string]struct{}{} - passingNewChanges := make([]*models.ConfigChange, 0, len(newChanges)) - for _, change := range newChanges { - rateLimiter, ok := configRateLimiters[change.ConfigID] - if !ok { - rl, _ := sw.NewLimiter(window, int64(max), func() (sw.Window, sw.StopFunc) { - return sw.NewLocalWindow() - }) - configRateLimiters[change.ConfigID] = rl - rateLimiter = rl - } - - if !rateLimiter.Allow() { - ctx.Logger.V(1).Infof("change rate limited (config=%s, external_id=%s, type=%s)", change.ConfigID, change.ExternalChangeId, change.ChangeType) - rateLimitedThisRun[change.ConfigID] = struct{}{} - continue - } else { - delete(rateLimitedConfigs, change.ConfigID) - } - - passingNewChanges = append(passingNewChanges, change) - } - - var newlyRateLimited []string - for configID := range rateLimitedThisRun { - if _, ok := rateLimitedConfigs[configID]; !ok { - newlyRateLimited = append(newlyRateLimited, configID) - } - } - - rateLimitedConfigs = collections.MergeMap(rateLimitedConfigs, rateLimitedThisRun) - rateLimitedConfigsPerScraper.Store(scraperID, rateLimitedConfigs) - - return passingNewChanges, newlyRateLimited, nil -} - -func syncCurrentlyRateLimitedConfigs(ctx api.ScrapeContext, window time.Duration) (map[string]struct{}, error) { - query := `WITH latest_changes AS ( - SELECT - DISTINCT ON (config_id) config_id, - change_type - FROM - config_changes - LEFT JOIN config_items ON config_items.id = config_changes.config_id - WHERE - scraper_id = ? - AND NOW() - config_changes.created_at <= ? 
- ORDER BY - config_id, - config_changes.created_at DESC - ) SELECT config_id FROM latest_changes WHERE change_type = ?` - rows, err := ctx.DB().Raw(query, ctx.ScrapeConfig().GetPersistedID(), window, ChangeTypeTooManyChanges).Rows() - if err != nil { - return nil, err - } - defer rows.Close() - - output := make(map[string]struct{}) - for rows.Next() { - var id string - if err := rows.Scan(&id); err != nil { - return nil, err - } - - ctx.Logger.V(1).Infof("config %s is currently found to be rate limited", id) - output[id] = struct{}{} - } - - return output, rows.Err() -} - -// syncWindow syncs the rate limit window for the scraper with the changes in the db. -func syncWindow(ctx api.ScrapeContext, max int, window time.Duration) error { - query := `SELECT - config_id, - COUNT(*), - MIN(config_changes.created_at) AS min_created_at - FROM - config_changes - LEFT JOIN config_items ON config_items.id = config_changes.config_id - WHERE - scraper_id = ? - AND change_type != ? - AND NOW() - config_changes.created_at <= ? - GROUP BY - config_id` - rows, err := ctx.DB().Raw(query, ctx.ScrapeConfig().GetPersistedID(), ChangeTypeTooManyChanges, window).Rows() - if err != nil { - return err - } - defer rows.Close() - - for rows.Next() { - var configID string - var count int - var earliest time.Time - if err := rows.Scan(&configID, &count, &earliest); err != nil { - return err - } - ctx.Logger.V(3).Infof("config %s has %d changes in the last %s", configID, count, window) - - rateLimiter, _ := sw.NewLimiter(window, int64(max), func() (sw.Window, sw.StopFunc) { - win, stopper := ratelimit.NewLocalWindow() - if count > 0 { - win.SetStart(earliest) - win.AddCount(int64(count)) - } - return win, stopper - }) - configRateLimiters[configID] = rateLimiter - } - - return rows.Err() -} - -// filterOutPersistedChanges returns only those changes that weren't seen in the db. 
-func filterOutPersistedChanges(ctx api.ScrapeContext, changes []*models.ConfigChange) ([]*models.ConfigChange, error) { - // use cache to filter out ones that we've already seen before - changes = lo.Filter(changes, func(c *models.ConfigChange, _ int) bool { - _, found := configChangesCache.Get(c.ConfigID + c.ExternalChangeId) - if found { - _ = found - } - return !found - }) - - if len(changes) == 0 { - return nil, nil - } - - query := `SELECT config_id, external_change_id - FROM config_changes - WHERE (config_id, external_change_id) IN ?` - args := lo.Map(changes, func(c *models.ConfigChange, _ int) []string { - return []string{c.ConfigID, c.ExternalChangeId} - }) - - rows, err := ctx.DB().Raw(query, args).Rows() - if err != nil { - return nil, err - } - defer rows.Close() - - existing := make(map[string]struct{}) - for rows.Next() { - var configID, externalChangeID string - if err := rows.Scan(&configID, &externalChangeID); err != nil { - return nil, err - } - - configChangesCache.SetDefault(configID+externalChangeID, struct{}{}) - existing[configID+externalChangeID] = struct{}{} - } - - newOnes := lo.Filter(changes, func(c *models.ConfigChange, _ int) bool { - _, found := existing[c.ConfigID+c.ExternalChangeId] - return !found - }) - - if len(newOnes) > 0 { - _ = query - } - - return newOnes, nil -} diff --git a/db/changes_rate_limit.go b/db/changes_rate_limit.go new file mode 100644 index 00000000..15c41faf --- /dev/null +++ b/db/changes_rate_limit.go @@ -0,0 +1,162 @@ +package db + +import ( + "sync" + "time" + + "github.com/flanksource/commons/collections" + sw "github.com/flanksource/slidingwindow" + + "github.com/flanksource/config-db/api" + "github.com/flanksource/config-db/db/models" +) + +const ( + rateLimitWindow = time.Hour * 4 + maxChangesInWindow = 100 + + ChangeTypeTooManyChanges = "TooManyChanges" +) + +var ( + scraperLocks = sync.Map{} + configRateLimiters = map[string]*sw.Limiter{} + + // List of configs that are currently in being rate 
limited. + // It's used to avoid inserting multiple "TooManyChanges" changes for the same config. + currentlyRateLimitedConfigsPerScraper = sync.Map{} +) + +func rateLimitChanges(ctx api.ScrapeContext, newChanges []*models.ConfigChange) ([]*models.ConfigChange, []string, error) { + if len(newChanges) == 0 || ctx.ScrapeConfig().GetPersistedID() == nil { + return newChanges, nil, nil + } + + window := ctx.Properties().Duration("changes.max.window", rateLimitWindow) + max := ctx.Properties().Int("changes.max.count", maxChangesInWindow) + scraperID := ctx.ScrapeConfig().GetPersistedID().String() + + lock, loaded := scraperLocks.LoadOrStore(scraperID, &sync.Mutex{}) + lock.(*sync.Mutex).Lock() + defer lock.(*sync.Mutex).Unlock() + + _rateLimitedConfigs, _ := currentlyRateLimitedConfigsPerScraper.LoadOrStore(scraperID, map[string]struct{}{}) + rateLimitedConfigs := _rateLimitedConfigs.(map[string]struct{}) + + if !loaded { + // This is the initial sync of the rate limiter with the database. + // Happens only once for each scrape config. + if err := syncWindow(ctx, max, window); err != nil { + return nil, nil, err + } + + if rlConfigs, err := syncCurrentlyRateLimitedConfigs(ctx, window); err != nil { + return nil, nil, err + } else { + rateLimitedConfigs = rlConfigs + } + } + + var rateLimitedConfigsThisRun = make(map[string]struct{}) + var passingNewChanges = make([]*models.ConfigChange, 0, len(newChanges)) + for _, change := range newChanges { + if _, ok := rateLimitedConfigs[change.ConfigID]; ok { + rateLimitedConfigsThisRun[change.ConfigID] = struct{}{} + } else { + passingNewChanges = append(passingNewChanges, change) + } + } + + // Find those changes that were rate limited only this run but + // weren't previously in the rate limited state. 
+ var newlyRateLimited []string + for configID := range rateLimitedConfigsThisRun { + if _, ok := rateLimitedConfigs[configID]; !ok { + newlyRateLimited = append(newlyRateLimited, configID) + } + } + + rateLimitedConfigs = collections.MergeMap(rateLimitedConfigs, rateLimitedConfigsThisRun) + currentlyRateLimitedConfigsPerScraper.Store(scraperID, rateLimitedConfigs) + + return passingNewChanges, newlyRateLimited, nil +} + +func syncCurrentlyRateLimitedConfigs(ctx api.ScrapeContext, window time.Duration) (map[string]struct{}, error) { + query := `WITH latest_changes AS ( + SELECT + DISTINCT ON (config_id) config_id, + change_type + FROM + config_changes + LEFT JOIN config_items ON config_items.id = config_changes.config_id + WHERE + scraper_id = ? + AND NOW() - config_changes.created_at <= ? + ORDER BY + config_id, + config_changes.created_at DESC + ) SELECT config_id FROM latest_changes WHERE change_type = ?` + rows, err := ctx.DB().Raw(query, ctx.ScrapeConfig().GetPersistedID(), window, ChangeTypeTooManyChanges).Rows() + if err != nil { + return nil, err + } + defer rows.Close() + + output := make(map[string]struct{}) + for rows.Next() { + var id string + if err := rows.Scan(&id); err != nil { + return nil, err + } + + ctx.Logger.V(3).Infof("config %s is currently found to be rate limited", id) + output[id] = struct{}{} + } + + return output, rows.Err() +} + +// syncWindow syncs the rate limit window for the scraper with the changes in the db. +func syncWindow(ctx api.ScrapeContext, max int, window time.Duration) error { + query := `SELECT + config_id, + COUNT(*), + MIN(config_changes.created_at) AS min_created_at + FROM + config_changes + LEFT JOIN config_items ON config_items.id = config_changes.config_id + WHERE + scraper_id = ? + AND change_type != ? + AND NOW() - config_changes.created_at <= ? 
+ GROUP BY + config_id` + rows, err := ctx.DB().Raw(query, ctx.ScrapeConfig().GetPersistedID(), ChangeTypeTooManyChanges, window).Rows() + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + var configID string + var count int + var earliest time.Time + if err := rows.Scan(&configID, &count, &earliest); err != nil { + return err + } + ctx.Logger.V(3).Infof("config %s has %d changes in the last %s", configID, count, window) + + rateLimiter, _ := sw.NewLimiter(window, int64(max), func() (sw.Window, sw.StopFunc) { + win, stopper := sw.NewLocalWindow() + if count > 0 { + win.SetStart(earliest) + win.AddCount(int64(count)) + } + return win, stopper + }) + configRateLimiters[configID] = rateLimiter + } + + return rows.Err() +} diff --git a/db/models/config_change.go b/db/models/config_change.go index ae2534f6..8545db14 100644 --- a/db/models/config_change.go +++ b/db/models/config_change.go @@ -5,8 +5,8 @@ import ( "time" v1 "github.com/flanksource/config-db/api/v1" - "github.com/google/uuid" "gorm.io/gorm" + "gorm.io/gorm/clause" ) // ConfigChange represents the config change database table @@ -14,8 +14,8 @@ type ConfigChange struct { ExternalID string `gorm:"-"` ConfigType string `gorm:"-"` ExternalChangeId string `gorm:"column:external_change_id" json:"external_change_id"` - ID string `gorm:"primaryKey;unique_index;not null;column:id" json:"id"` - ConfigID string `gorm:"column:config_id;default:''" json:"config_id"` + ID string `gorm:"primaryKey;unique_index;not null;column:id;default:generate_ulid()" json:"id"` + ConfigID string `gorm:"column:config_id" json:"config_id"` ChangeType string `gorm:"column:change_type" json:"change_type"` Diff *string `gorm:"column:diff" json:"diff,omitempty"` Severity string `gorm:"column:severity" json:"severity"` @@ -61,9 +61,6 @@ func NewConfigChangeFromV1(result v1.ScrapeResult, change v1.ChangeResult) *Conf } func (c *ConfigChange) BeforeCreate(tx *gorm.DB) (err error) { - if c.ID == "" { - c.ID = 
uuid.New().String() - } - + tx.Statement.AddClause(clause.OnConflict{DoNothing: true}) return } diff --git a/db/update.go b/db/update.go index d3de2695..e07fb9c7 100644 --- a/db/update.go +++ b/db/update.go @@ -6,6 +6,7 @@ import ( "slices" "strconv" "strings" + "sync" "time" "github.com/aws/smithy-go/ptr" @@ -21,6 +22,7 @@ import ( dutyContext "github.com/flanksource/duty/context" dutyModels "github.com/flanksource/duty/models" "github.com/flanksource/gomplate/v3" + sw "github.com/flanksource/slidingwindow" "github.com/google/uuid" "github.com/hexops/gotextdiff" "github.com/hexops/gotextdiff/myers" @@ -31,7 +33,11 @@ import ( "gorm.io/gorm/clause" ) -const configItemsBulkInsertSize = 200 +const ( + configItemsBulkInsertSize = 200 + + configChangesBulkInsertSize = 200 +) func deleteChangeHandler(ctx api.ScrapeContext, change v1.ChangeResult) error { var deletedAt interface{} @@ -209,8 +215,8 @@ func shouldExcludeChange(result *v1.ScrapeResult, changeResult v1.ChangeResult) func extractChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.ConfigItem) ([]*models.ConfigChange, []*models.ConfigChange, error) { var ( - toInsert = []*models.ConfigChange{} - toUpdate = []*models.ConfigChange{} + newOnes = []*models.ConfigChange{} + updates = []*models.ConfigChange{} ) changes.ProcessRules(result, result.BaseScraper.Transform.Change.Mapping...) @@ -267,19 +273,13 @@ func extractChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.C } if changeResult.UpdateExisting { - toUpdate = append(toUpdate, change) + updates = append(updates, change) } else { - toInsert = append(toInsert, change) + newOnes = append(newOnes, change) } } - // Remove the changes that have already been inserted. 
- newOnes, err := filterOutPersistedChanges(ctx, toInsert) - if err != nil { - return nil, nil, err - } - - return newOnes, toUpdate, nil + return newOnes, updates, nil } func upsertAnalysis(ctx api.ScrapeContext, result *v1.ScrapeResult) error { @@ -367,10 +367,46 @@ func SaveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) error { return fmt.Errorf("failed to rate limit changes: %w", err) } - if err := ctx.DB().CreateInBatches(newChanges, configItemsBulkInsertSize).Error; err != nil { - return fmt.Errorf("failed to create config changes: %w", err) + // NOTE: Returning just the needed columns didn't work in gorm for some reason. + // So, returning * for now. + // returnClause := clause.Returning{Columns: []clause.Column{{Name: "id"}, {Name: "config_id"}, {Name: "external_change_id"}}} + returnClause := clause.Returning{} + + // NOTE: Ran into a weird gorm behavior where CreateInBatches combined with a return clause + // panics. So, handling batching manually. + var rateLimitedAfterInsertion = map[string]struct{}{} + for _, batch := range lo.Chunk(newChanges, configChangesBulkInsertSize) { + if err := ctx.DB().Clauses(returnClause).Create(&batch).Error; err != nil { + return fmt.Errorf("failed to create config changes: %w", err) + } + + if len(batch) != 0 { + // the `batch` slice now only contains those changes that were actually inserted. + // we increase the usage count on the rate limiter for those changes. 
+ _l, _ := scraperLocks.LoadOrStore(ctx.ScrapeConfig().GetPersistedID().String(), &sync.Mutex{}) + lock := _l.(*sync.Mutex) + lock.Lock() + for _, b := range batch { + rateLimiter, ok := configRateLimiters[b.ConfigID] + if !ok { + window := ctx.Properties().Duration("changes.max.window", rateLimitWindow) + max := ctx.Properties().Int("changes.max.count", maxChangesInWindow) + rateLimiter, _ = sw.NewLimiter(window, int64(max), func() (sw.Window, sw.StopFunc) { + return sw.NewLocalWindow() + }) + configRateLimiters[b.ConfigID] = rateLimiter + } + + if rateLimiter.Allow() { + rateLimitedAfterInsertion[b.ConfigID] = struct{}{} + } + } + lock.Unlock() + } } + syncCurrentlyRatelimitedConfigMap(ctx, newChanges, rateLimitedAfterInsertion) + // For all the rate limited configs, we add a new "TooManyChanges" change. // This is intentionally inserted in a different batch from the new changes // as "ChangeTypeTooManyChanges" will have the same created_at timestamp. @@ -385,7 +421,7 @@ func SaveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) error { }) } - if err := ctx.DB().CreateInBatches(rateLimitedChanges, configItemsBulkInsertSize).Error; err != nil { + if err := ctx.DB().CreateInBatches(rateLimitedChanges, configChangesBulkInsertSize).Error; err != nil { return fmt.Errorf("failed to create rate limited config changes: %w", err) } @@ -440,6 +476,27 @@ func SaveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) error { return nil } +func syncCurrentlyRatelimitedConfigMap(ctx api.ScrapeContext, insertedChanged []*models.ConfigChange, rateLimitedAfterInsertion map[string]struct{}) { + _l, _ := scraperLocks.LoadOrStore(ctx.ScrapeConfig().GetPersistedID().String(), &sync.Mutex{}) + lock := _l.(*sync.Mutex) + lock.Lock() + + m, _ := currentlyRateLimitedConfigsPerScraper.LoadOrStore(ctx.ScrapeConfig().GetPersistedID().String(), map[string]struct{}{}) + mm := m.(map[string]struct{}) + for _, c := range insertedChanged { + if _, ok := 
rateLimitedAfterInsertion[c.ConfigID]; !ok { + // All the configs that weren't rate limited, will need to be removed from the + // "currently rate limited" map + delete(mm, c.ConfigID) + } else { + mm[c.ConfigID] = struct{}{} + } + } + + currentlyRateLimitedConfigsPerScraper.Store(ctx.ScrapeConfig().GetPersistedID().String(), mm) + lock.Unlock() +} + func updateLastScrapedTime(ctx api.ScrapeContext, ids []string) error { if len(ids) == 0 { return nil diff --git a/go.mod b/go.mod index 8403a33e..d79179cc 100644 --- a/go.mod +++ b/go.mod @@ -19,7 +19,6 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/subscription/armsubscription v1.1.0 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/trafficmanager/armtrafficmanager v1.0.0 github.com/Jeffail/gabs/v2 v2.7.0 - github.com/RussellLuo/slidingwindow v0.0.0-20200528002341-535bb99d338b github.com/aws/aws-sdk-go-v2 v1.18.0 github.com/aws/aws-sdk-go-v2/config v1.18.25 github.com/aws/aws-sdk-go-v2/credentials v1.13.24 @@ -47,6 +46,7 @@ require ( github.com/flanksource/is-healthy v1.0.7 github.com/flanksource/ketall v1.1.6 github.com/flanksource/mapstructure v1.6.0 + github.com/flanksource/slidingwindow v0.0.0-20240526171711-1e13c04e057b github.com/go-logr/zapr v1.2.4 github.com/gobwas/glob v0.2.3 github.com/gomarkdown/markdown v0.0.0-20230322041520-c84983bdbf2a @@ -118,7 +118,6 @@ require ( github.com/ghodss/yaml v1.0.0 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-openapi/inflect v0.19.0 // indirect - github.com/go-redis/redis v6.15.9+incompatible // indirect github.com/go-task/slim-sprig/v3 v3.0.0 // indirect github.com/golang-jwt/jwt/v5 v5.0.0 // indirect github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect diff --git a/go.sum b/go.sum index 2452d2e3..f080de42 100644 --- a/go.sum +++ b/go.sum @@ -672,8 +672,6 @@ github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdko github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod 
h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk= github.com/RaveNoX/go-jsonmerge v1.0.0 h1:2e0nqnadoGUP8rAvcA0hkQelZreVO5X3BHomT2XMrAk= github.com/RaveNoX/go-jsonmerge v1.0.0/go.mod h1:qYM/NA77LhO4h51JJM7Z+xBU3ovqrNIACZe+SkSNVFo= -github.com/RussellLuo/slidingwindow v0.0.0-20200528002341-535bb99d338b h1:5/++qT1/z812ZqBvqQt6ToRswSuPZ/B33m6xVHRzADU= -github.com/RussellLuo/slidingwindow v0.0.0-20200528002341-535bb99d338b/go.mod h1:4+EPqMRApwwE/6yo6CxiHoSnBzjRr3jsqer7frxP8y4= github.com/TomOnTime/utfutil v0.0.0-20210710122150-437f72b26edf h1:+GdVyvpzTy3UFAS1+hbTqm9Mk0U1Xrocm28s/E2GWz0= github.com/TomOnTime/utfutil v0.0.0-20210710122150-437f72b26edf/go.mod h1:FiuynIwe98RFhWI8nZ0dnsldPVsBy9rHH1hn2WYwme4= github.com/WinterYukky/gorm-extra-clause-plugin v0.2.0 h1:s1jobT8PlSyG/FXczfoGSt4r46iPiT4ZShe35k5/2y4= @@ -874,6 +872,8 @@ github.com/flanksource/mapstructure v1.6.0 h1:+1kJ+QsO1SxjAgktfLlpZXetsVSJ0uCLhG github.com/flanksource/mapstructure v1.6.0/go.mod h1:dttg5+FFE2sp4D/CrcPCVqufNDrBggDaM+08nk5S8Ps= github.com/flanksource/postq v0.1.3 h1:eTslG04hwxAvntZv8gIUsXKQPLGeLiRPNkZC+kQdL7c= github.com/flanksource/postq v0.1.3/go.mod h1:AAuaPRhpqxvyF7JPs8X1NMsJVenh80ldpJPDVgWvFf8= +github.com/flanksource/slidingwindow v0.0.0-20240526171711-1e13c04e057b h1:zB2nVrRAUgrZQb2eutgKzWd6ld7syPacrY/XQmz7Wks= +github.com/flanksource/slidingwindow v0.0.0-20240526171711-1e13c04e057b/go.mod h1:+hHPT8Yx+8I6i4SF6WwvwRso532uHlPJ1T029dtHFak= github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= @@ -924,8 +924,6 @@ github.com/go-openapi/swag v0.22.4 h1:QLMzNJnMGPRNDCbySlcj1x01tzU8/9LTTL9hZZZogB github.com/go-openapi/swag v0.22.4/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14= github.com/go-pdf/fpdf v0.5.0/go.mod h1:HzcnA+A23uwogo0tp9yU+l3V+KXhiESpt1PMayhOh5M= 
github.com/go-pdf/fpdf v0.6.0/go.mod h1:HzcnA+A23uwogo0tp9yU+l3V+KXhiESpt1PMayhOh5M= -github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg= -github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= github.com/go-resty/resty/v2 v2.7.0 h1:me+K9p3uhSmXtrBZ4k9jcEAfJmuC8IivWHwaLZwPrFY= github.com/go-resty/resty/v2 v2.7.0/go.mod h1:9PWDzw47qPphMRFfhsyk0NnSgvluHcljSMVIq3w7q0I= github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI= diff --git a/pkg/ratelimit/ratelimit.go b/pkg/ratelimit/ratelimit.go deleted file mode 100644 index fa106bdf..00000000 --- a/pkg/ratelimit/ratelimit.go +++ /dev/null @@ -1,48 +0,0 @@ -package ratelimit - -import ( - "time" - - sw "github.com/RussellLuo/slidingwindow" -) - -// LocalWindow represents a window that ignores sync behavior entirely -// and only stores counters in memory. -// -// NOTE: It's an exact copy of the LocalWindow provided by RussellLuo/slidingwindow -// with an added capability of setting a custom start time. -type LocalWindow struct { - // The start boundary (timestamp in nanoseconds) of the window. - // [start, start + size) - start int64 - - // The total count of events happened in the window. - count int64 -} - -func NewLocalWindow() (*LocalWindow, sw.StopFunc) { - return &LocalWindow{}, func() {} -} - -func (w *LocalWindow) SetStart(s time.Time) { - w.start = s.UnixNano() -} - -func (w *LocalWindow) Start() time.Time { - return time.Unix(0, w.start) -} - -func (w *LocalWindow) Count() int64 { - return w.count -} - -func (w *LocalWindow) AddCount(n int64) { - w.count += n -} - -func (w *LocalWindow) Reset(s time.Time, c int64) { - w.start = s.UnixNano() - w.count = c -} - -func (w *LocalWindow) Sync(now time.Time) {}