Skip to content

Commit 436fd77

Browse files
WiP
Signed-off-by: Gabriel Adrian Samfira <[email protected]>
1 parent 020210d commit 436fd77

File tree

7 files changed

+300
-6
lines changed

7 files changed

+300
-6
lines changed

apiserver/controllers/instances.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ func (a *APIController) ListScaleSetInstancesHandler(w http.ResponseWriter, r *h
9797
}
9898
return
9999
}
100-
id, err := strconv.ParseUint(scalesetID, 10, 64)
100+
id, err := strconv.ParseUint(scalesetID, 10, 32)
101101
if err != nil {
102102
slog.With(slog.Any("error", err)).ErrorContext(ctx, "failed to parse id")
103103
handleError(ctx, w, gErrors.ErrBadRequest)

apiserver/controllers/scalesets.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ func (a *APIController) GetScaleSetByIDHandler(w http.ResponseWriter, r *http.Re
7979
}
8080
return
8181
}
82-
id, err := strconv.ParseUint(scaleSetID, 10, 64)
82+
id, err := strconv.ParseUint(scaleSetID, 10, 32)
8383
if err != nil {
8484
slog.With(slog.Any("error", err)).ErrorContext(ctx, "failed to parse id")
8585
handleError(ctx, w, gErrors.ErrBadRequest)
@@ -130,7 +130,7 @@ func (a *APIController) DeleteScaleSetByIDHandler(w http.ResponseWriter, r *http
130130
return
131131
}
132132

133-
id, err := strconv.ParseUint(scalesetID, 10, 64)
133+
id, err := strconv.ParseUint(scalesetID, 10, 32)
134134
if err != nil {
135135
slog.With(slog.Any("error", err)).ErrorContext(ctx, "failed to parse id")
136136
handleError(ctx, w, gErrors.ErrBadRequest)
@@ -183,7 +183,7 @@ func (a *APIController) UpdateScaleSetByIDHandler(w http.ResponseWriter, r *http
183183
return
184184
}
185185

186-
id, err := strconv.ParseUint(scalesetID, 10, 64)
186+
id, err := strconv.ParseUint(scalesetID, 10, 32)
187187
if err != nil {
188188
slog.With(slog.Any("error", err)).ErrorContext(ctx, "failed to parse id")
189189
handleError(ctx, w, gErrors.ErrBadRequest)

database/sql/models.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,17 @@ type Pool struct {
8686
Priority uint `gorm:"index:idx_pool_priority"`
8787
}
8888

89+
type ScaleSetEvent struct {
90+
gorm.Model
91+
92+
EventType params.EventType
93+
EventLevel params.EventLevel
94+
Message string `gorm:"type:text"`
95+
96+
ScaleSetID uint `gorm:"index:idx_scale_set_event"`
97+
ScaleSet ScaleSet `gorm:"foreignKey:ScaleSetID"`
98+
}
99+
89100
// ScaleSet represents a github scale set. Scale sets are almost identical to pools with a few
90101
// notable exceptions:
91102
// - Labels are no longer relevant
@@ -135,7 +146,11 @@ type ScaleSet struct {
135146
EnterpriseID *uuid.UUID `gorm:"index"`
136147
Enterprise Enterprise `gorm:"foreignKey:EnterpriseID"`
137148

138-
Instances []Instance `gorm:"foreignKey:ScaleSetFkID"`
149+
Status string
150+
StatusReason string `gorm:"type:text"`
151+
152+
Instances []Instance `gorm:"foreignKey:ScaleSetFkID"`
153+
Events []ScaleSetEvent `gorm:"foreignKey:ScaleSetID;constraint:OnDelete:CASCADE,OnUpdate:CASCADE;"`
139154
}
140155

141156
type RepositoryEvent struct {

database/sql/sql.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -432,6 +432,7 @@ func (s *sqlDatabase) migrateDB() error {
432432
&ControllerInfo{},
433433
&WorkflowJob{},
434434
&ScaleSet{},
435+
&ScaleSetEvent{},
435436
); err != nil {
436437
return errors.Wrap(err, "running auto migrate")
437438
}

database/sql/util.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -634,6 +634,40 @@ func (s *sqlDatabase) GetGithubEntity(_ context.Context, entityType params.Githu
634634
return entity, nil
635635
}
636636

637+
func (s *sqlDatabase) AddScaleSetEvent(ctx context.Context, scaleSetID uint, event params.EventType, eventLevel params.EventLevel, statusMessage string, maxEvents int) error {
638+
scaleSet, err := s.GetScaleSetByID(ctx, scaleSetID)
639+
if err != nil {
640+
return errors.Wrap(err, "updating instance")
641+
}
642+
643+
msg := InstanceStatusUpdate{
644+
Message: statusMessage,
645+
EventType: event,
646+
EventLevel: eventLevel,
647+
}
648+
649+
if err := s.conn.Model(&scaleSet).Association("Events").Append(&msg); err != nil {
650+
return errors.Wrap(err, "adding status message")
651+
}
652+
653+
if maxEvents > 0 {
654+
var latestEvents []ScaleSetEvent
655+
q := s.conn.Model(&ScaleSetEvent{}).
656+
Limit(maxEvents).Order("id desc").
657+
Where("scale_set_id = ?", scaleSetID).Find(&latestEvents)
658+
if q.Error != nil {
659+
return errors.Wrap(q.Error, "fetching latest events")
660+
}
661+
if len(latestEvents) == maxEvents {
662+
lastInList := latestEvents[len(latestEvents)-1]
663+
if err := s.conn.Where("scale_set_id = ? and id < ?", scaleSetID, lastInList.ID).Unscoped().Delete(&ScaleSetEvent{}).Error; err != nil {
664+
return errors.Wrap(err, "deleting old events")
665+
}
666+
}
667+
}
668+
return nil
669+
}
670+
637671
func (s *sqlDatabase) addRepositoryEvent(ctx context.Context, repoID string, event params.EventType, eventLevel params.EventLevel, statusMessage string, maxEvents int) error {
638672
repo, err := s.GetRepositoryByID(ctx, repoID)
639673
if err != nil {

workers/provider/provider.go

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,12 @@ package provider
33
import (
44
"context"
55
"fmt"
6+
"log/slog"
67
"sync"
78

89
dbCommon "github.com/cloudbase/garm/database/common"
910
"github.com/cloudbase/garm/database/watcher"
11+
"github.com/cloudbase/garm/params"
1012
"github.com/cloudbase/garm/runner/common"
1113
)
1214

@@ -32,12 +34,51 @@ type provider struct {
3234
store dbCommon.Store
3335

3436
providers map[string]common.Provider
37+
// A cache of all scale sets kept updated by the watcher.
38+
// This helps us avoid a bunch of queries to the database.
39+
scaleSets map[uint]params.ScaleSet
40+
runners map[string]params.Instance
3541

3642
mux sync.Mutex
3743
running bool
3844
quit chan struct{}
3945
}
4046

47+
func (p *provider) loadAllScaleSets() error {
48+
p.mux.Lock()
49+
defer p.mux.Unlock()
50+
51+
scaleSets, err := p.store.ListAllScaleSets(p.ctx)
52+
if err != nil {
53+
return fmt.Errorf("fetching scale sets: %w", err)
54+
}
55+
56+
for _, scaleSet := range scaleSets {
57+
p.scaleSets[scaleSet.ID] = scaleSet
58+
}
59+
60+
return nil
61+
}
62+
63+
// loadAllRunners loads all runners from the database. At this stage we only
64+
// care about runners created by scale sets, but in the future, we will migrate
65+
// the pool manager to the same model.
66+
func (p *provider) loadAllRunners() error {
67+
p.mux.Lock()
68+
defer p.mux.Unlock()
69+
70+
runners, err := p.store.ListAllInstances(p.ctx)
71+
if err != nil {
72+
return fmt.Errorf("fetching runners: %w", err)
73+
}
74+
75+
for _, runner := range runners {
76+
p.runners[runner.Name] = runner
77+
}
78+
79+
return nil
80+
}
81+
4182
func (p *provider) Start() error {
4283
p.mux.Lock()
4384
defer p.mux.Unlock()
@@ -46,6 +87,14 @@ func (p *provider) Start() error {
4687
return nil
4788
}
4889

90+
if err := p.loadAllScaleSets(); err != nil {
91+
return fmt.Errorf("loading all scale sets: %w", err)
92+
}
93+
94+
if err := p.loadAllRunners(); err != nil {
95+
return fmt.Errorf("loading all runners: %w", err)
96+
}
97+
4998
consumer, err := watcher.RegisterConsumer(
5099
p.ctx, p.consumerID, composeProviderWatcher())
51100
if err != nil {
@@ -55,6 +104,8 @@ func (p *provider) Start() error {
55104

56105
p.quit = make(chan struct{})
57106
p.running = true
107+
go p.loop()
108+
58109
return nil
59110
}
60111

@@ -71,3 +122,75 @@ func (p *provider) Stop() error {
71122
p.running = false
72123
return nil
73124
}
125+
126+
func (p *provider) loop() {
127+
defer p.Stop()
128+
for {
129+
select {
130+
case payload := <-p.consumer.Watch():
131+
slog.InfoContext(p.ctx, "received payload", slog.Any("payload", payload))
132+
go p.handleWatcherEvent(payload)
133+
case <-p.ctx.Done():
134+
return
135+
case <-p.quit:
136+
return
137+
}
138+
}
139+
}
140+
141+
func (p *provider) handleWatcherEvent(payload dbCommon.ChangePayload) {
142+
switch payload.EntityType {
143+
case dbCommon.ScaleSetEntityType:
144+
p.handleScaleSetEvent(payload)
145+
case dbCommon.InstanceEntityType:
146+
p.handleInstanceEvent(payload)
147+
default:
148+
slog.ErrorContext(p.ctx, "invalid entity type", "entity_type", payload.EntityType)
149+
}
150+
}
151+
152+
func (p *provider) handleScaleSetEvent(event dbCommon.ChangePayload) {
153+
p.mux.Lock()
154+
defer p.mux.Unlock()
155+
156+
scaleSet, ok := event.Payload.(params.ScaleSet)
157+
if !ok {
158+
slog.ErrorContext(p.ctx, "invalid payload type", "payload_type", fmt.Sprintf("%T", event.Payload))
159+
return
160+
}
161+
162+
switch event.Operation {
163+
case dbCommon.CreateOperation, dbCommon.UpdateOperation:
164+
slog.DebugContext(p.ctx, "got create/update operation")
165+
p.scaleSets[scaleSet.ID] = scaleSet
166+
case dbCommon.DeleteOperation:
167+
slog.DebugContext(p.ctx, "got delete operation")
168+
delete(p.scaleSets, scaleSet.ID)
169+
default:
170+
slog.ErrorContext(p.ctx, "invalid operation type", "operation_type", event.Operation)
171+
return
172+
}
173+
}
174+
175+
func (p *provider) handleInstanceEvent(event dbCommon.ChangePayload) {
176+
p.mux.Lock()
177+
defer p.mux.Unlock()
178+
179+
instance, ok := event.Payload.(params.Instance)
180+
if !ok {
181+
slog.ErrorContext(p.ctx, "invalid payload type", "payload_type", fmt.Sprintf("%T", event.Payload))
182+
return
183+
}
184+
185+
switch event.Operation {
186+
case dbCommon.CreateOperation, dbCommon.UpdateOperation:
187+
slog.DebugContext(p.ctx, "got create/update operation")
188+
p.runners[instance.Name] = instance
189+
case dbCommon.DeleteOperation:
190+
slog.DebugContext(p.ctx, "got delete operation")
191+
delete(p.runners, instance.Name)
192+
default:
193+
slog.ErrorContext(p.ctx, "invalid operation type", "operation_type", event.Operation)
194+
return
195+
}
196+
}

0 commit comments

Comments
 (0)