Skip to content

Commit 020210d

Browse files
Handle scale up and down; add provider worker
Signed-off-by: Gabriel Adrian Samfira <[email protected]>
1 parent 7376a5f commit 020210d

File tree

14 files changed

+372
-39
lines changed

14 files changed

+372
-39
lines changed

cmd/garm/main.go

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import (
4646
"github.com/cloudbase/garm/params"
4747
"github.com/cloudbase/garm/runner" //nolint:typecheck
4848
runnerMetrics "github.com/cloudbase/garm/runner/metrics"
49+
"github.com/cloudbase/garm/runner/providers"
4950
garmUtil "github.com/cloudbase/garm/util"
5051
"github.com/cloudbase/garm/util/appdefaults"
5152
"github.com/cloudbase/garm/websocket"
@@ -62,16 +63,17 @@ var signals = []os.Signal{
6263
syscall.SIGTERM,
6364
}
6465

65-
func maybeInitController(db common.Store) error {
66-
if _, err := db.ControllerInfo(); err == nil {
67-
return nil
66+
func maybeInitController(db common.Store) (params.ControllerInfo, error) {
67+
if info, err := db.ControllerInfo(); err == nil {
68+
return info, nil
6869
}
6970

70-
if _, err := db.InitController(); err != nil {
71-
return errors.Wrap(err, "initializing controller")
71+
info, err := db.InitController()
72+
if err != nil {
73+
return params.ControllerInfo{}, errors.Wrap(err, "initializing controller")
7274
}
7375

74-
return nil
76+
return info, nil
7577
}
7678

7779
func setupLogging(ctx context.Context, logCfg config.Logging, hub *websocket.Hub) {
@@ -212,7 +214,8 @@ func main() {
212214
log.Fatal(err)
213215
}
214216

215-
if err := maybeInitController(db); err != nil {
217+
controllerInfo, err := maybeInitController(db)
218+
if err != nil {
216219
log.Fatal(err)
217220
}
218221

@@ -231,7 +234,12 @@ func main() {
231234
log.Fatal(err)
232235
}
233236

234-
entityController, err := entity.NewController(ctx, db, *cfg)
237+
providers, err := providers.LoadProvidersFromConfig(ctx, *cfg, controllerInfo.ControllerID.String())
238+
if err != nil {
239+
log.Fatalf("loading providers: %+v", err)
240+
}
241+
242+
entityController, err := entity.NewController(ctx, db, providers)
235243
if err != nil {
236244
log.Fatalf("failed to create entity controller: %+v", err)
237245
}

database/common/store.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ type UserStore interface {
9292
type InstanceStore interface {
9393
CreateInstance(ctx context.Context, poolID string, param params.CreateInstanceParams) (params.Instance, error)
9494
DeleteInstance(ctx context.Context, poolID string, instanceName string) error
95+
DeleteInstanceByName(ctx context.Context, instanceName string) error
9596
UpdateInstance(ctx context.Context, instanceName string, param params.UpdateInstanceParams) (params.Instance, error)
9697

9798
// Probably a bad idea without some kind of filter or at least pagination

database/sql/instances.go

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,39 @@ func (s *sqlDatabase) DeleteInstance(_ context.Context, poolID string, instanceN
177177
return nil
178178
}
179179

180+
func (s *sqlDatabase) DeleteInstanceByName(ctx context.Context, instanceName string) error {
181+
instance, err := s.getInstanceByName(ctx, instanceName)
182+
if err != nil {
183+
return errors.Wrap(err, "deleting instance")
184+
}
185+
186+
defer func() {
187+
if err == nil {
188+
var providerID string
189+
if instance.ProviderID != nil {
190+
providerID = *instance.ProviderID
191+
}
192+
if notifyErr := s.sendNotify(common.InstanceEntityType, common.DeleteOperation, params.Instance{
193+
ID: instance.ID.String(),
194+
Name: instance.Name,
195+
ProviderID: providerID,
196+
AgentID: instance.AgentID,
197+
PoolID: instance.PoolID.String(),
198+
}); notifyErr != nil {
199+
slog.With(slog.Any("error", notifyErr)).Error("failed to send notify")
200+
}
201+
}
202+
}()
203+
204+
if q := s.conn.Unscoped().Delete(&instance); q.Error != nil {
205+
if errors.Is(q.Error, gorm.ErrRecordNotFound) {
206+
return nil
207+
}
208+
return errors.Wrap(q.Error, "deleting instance")
209+
}
210+
return nil
211+
}
212+
180213
func (s *sqlDatabase) AddInstanceEvent(ctx context.Context, instanceName string, event params.EventType, eventLevel params.EventLevel, statusMessage string) error {
181214
instance, err := s.getInstanceByName(ctx, instanceName)
182215
if err != nil {
@@ -293,7 +326,7 @@ func (s *sqlDatabase) ListPoolInstances(_ context.Context, poolID string) ([]par
293326
func (s *sqlDatabase) ListAllInstances(_ context.Context) ([]params.Instance, error) {
294327
var instances []Instance
295328

296-
q := s.conn.Model(&Instance{}).Preload("Job", "Pool", "ScaleSet").Find(&instances)
329+
q := s.conn.Model(&Instance{}).Preload("Job").Find(&instances)
297330
if q.Error != nil {
298331
return nil, errors.Wrap(q.Error, "fetching instances")
299332
}

database/sql/models.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ type Instance struct {
277277
GitHubRunnerGroup string
278278
AditionalLabels datatypes.JSON
279279

280-
PoolID uuid.UUID
280+
PoolID *uuid.UUID
281281
Pool Pool `gorm:"foreignKey:PoolID"`
282282

283283
ScaleSetFkID *uint

database/sql/scaleset_instances.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func (s *sqlDatabase) CreateScaleSetInstance(_ context.Context, scaleSetID uint,
5151

5252
func (s *sqlDatabase) ListScaleSetInstances(_ context.Context, scalesetID uint) ([]params.Instance, error) {
5353
var instances []Instance
54-
query := s.conn.Model(&Instance{}).Preload("Job", "ScaleSet").Where("scale_set_fk_id = ?", scalesetID)
54+
query := s.conn.Model(&Instance{}).Preload("Job").Where("scale_set_fk_id = ?", scalesetID)
5555

5656
if err := query.Find(&instances); err.Error != nil {
5757
return nil, errors.Wrap(err.Error, "fetching instances")

database/sql/util.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ func (s *sqlDatabase) sqlToParamsInstance(instance Instance) (params.Instance, e
7979
ret.RunnerBootstrapTimeout = instance.ScaleSet.RunnerBootstrapTimeout
8080
}
8181

82-
if instance.PoolID != uuid.Nil {
82+
if instance.PoolID != nil {
8383
ret.PoolID = instance.PoolID.String()
8484
ret.ProviderName = instance.Pool.ProviderName
8585
ret.RunnerBootstrapTimeout = instance.Pool.RunnerBootstrapTimeout

database/watcher/filters.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package watcher
22

33
import (
4+
commonParams "github.com/cloudbase/garm-provider-common/params"
5+
46
dbCommon "github.com/cloudbase/garm/database/common"
57
"github.com/cloudbase/garm/params"
68
)
@@ -281,3 +283,24 @@ func WithEntityTypeAndCallbackFilter(entityType dbCommon.DatabaseEntityType, cal
281283
return ok
282284
}
283285
}
286+
287+
func WithInstanceStatusFilter(statuses ...commonParams.InstanceStatus) dbCommon.PayloadFilterFunc {
288+
return func(payload dbCommon.ChangePayload) bool {
289+
if payload.EntityType != dbCommon.InstanceEntityType {
290+
return false
291+
}
292+
instance, ok := payload.Payload.(params.Instance)
293+
if !ok {
294+
return false
295+
}
296+
if len(statuses) == 0 {
297+
return false
298+
}
299+
for _, status := range statuses {
300+
if instance.Status == status {
301+
return true
302+
}
303+
}
304+
return false
305+
}
306+
}

params/github.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -419,18 +419,18 @@ func (r RunnerScaleSetMessage) GetJobsFromBody() ([]ScaleSetJobMessage, error) {
419419
}
420420

421421
type RunnerReference struct {
422-
ID int `json:"id"`
423-
Name string `json:"name"`
424-
RunnerScaleSetID int `json:"runnerScaleSetId"`
425-
CreatedOn time.Time `json:"createdOn"`
426-
RunnerGroupID uint64 `json:"runnerGroupId"`
427-
RunnerGroupName string `json:"runnerGroupName"`
428-
Version string `json:"version"`
429-
Enabled bool `json:"enabled"`
430-
Ephemeral bool `json:"ephemeral"`
431-
Status RunnerStatus `json:"status"`
432-
DisableUpdate bool `json:"disableUpdate"`
433-
ProvisioningState string `json:"provisioningState"`
422+
ID int64 `json:"id"`
423+
Name string `json:"name"`
424+
RunnerScaleSetID int `json:"runnerScaleSetId"`
425+
CreatedOn interface{} `json:"createdOn"`
426+
RunnerGroupID uint64 `json:"runnerGroupId"`
427+
RunnerGroupName string `json:"runnerGroupName"`
428+
Version string `json:"version"`
429+
Enabled bool `json:"enabled"`
430+
Ephemeral bool `json:"ephemeral"`
431+
Status interface{} `json:"status"`
432+
DisableUpdate bool `json:"disableUpdate"`
433+
ProvisioningState string `json:"provisioningState"`
434434
}
435435

436436
type RunnerScaleSetJitRunnerConfig struct {

util/github/scalesets/runners.go

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ type scaleSetJitRunnerConfig struct {
3030
WorkFolder string `json:"workFolder"`
3131
}
3232

33-
func (s *ScaleSetClient) GenerateJitRunnerConfig(ctx context.Context, runnerName string, scaleSet params.RunnerScaleSet) (params.RunnerScaleSetJitRunnerConfig, error) {
33+
func (s *ScaleSetClient) GenerateJitRunnerConfig(ctx context.Context, runnerName string, scaleSetID int) (params.RunnerScaleSetJitRunnerConfig, error) {
3434
runnerSettings := scaleSetJitRunnerConfig{
3535
Name: runnerName,
3636
WorkFolder: "_work",
@@ -41,7 +41,14 @@ func (s *ScaleSetClient) GenerateJitRunnerConfig(ctx context.Context, runnerName
4141
return params.RunnerScaleSetJitRunnerConfig{}, err
4242
}
4343

44-
req, err := s.newActionsRequest(ctx, http.MethodPost, scaleSet.RunnerJitConfigURL, bytes.NewBuffer(body))
44+
serviceUrl, err := s.actionsServiceInfo.GetURL()
45+
if err != nil {
46+
return params.RunnerScaleSetJitRunnerConfig{}, fmt.Errorf("failed to get pipeline URL: %w", err)
47+
}
48+
jitConfigPath := fmt.Sprintf("/%s/%d/generatejitconfig", scaleSetEndpoint, scaleSetID)
49+
jitConfigURL := serviceUrl.JoinPath(jitConfigPath)
50+
51+
req, err := s.newActionsRequest(ctx, http.MethodPost, jitConfigURL.String(), bytes.NewBuffer(body))
4552
if err != nil {
4653
return params.RunnerScaleSetJitRunnerConfig{}, fmt.Errorf("failed to create request: %w", err)
4754
}
@@ -81,6 +88,26 @@ func (s *ScaleSetClient) GetRunner(ctx context.Context, runnerID int64) (params.
8188
return runnerReference, nil
8289
}
8390

91+
func (s *ScaleSetClient) ListAllRunners(ctx context.Context) (params.RunnerReferenceList, error) {
92+
req, err := s.newActionsRequest(ctx, http.MethodGet, runnerEndpoint, nil)
93+
if err != nil {
94+
return params.RunnerReferenceList{}, fmt.Errorf("failed to construct request: %w", err)
95+
}
96+
97+
resp, err := s.Do(req)
98+
if err != nil {
99+
return params.RunnerReferenceList{}, fmt.Errorf("request failed for %s: %w", req.URL.String(), err)
100+
}
101+
defer resp.Body.Close()
102+
103+
var runnerList params.RunnerReferenceList
104+
if err := json.NewDecoder(resp.Body).Decode(&runnerList); err != nil {
105+
return params.RunnerReferenceList{}, fmt.Errorf("failed to decode response: %w", err)
106+
}
107+
108+
return runnerList, nil
109+
}
110+
84111
func (s *ScaleSetClient) GetRunnerByName(ctx context.Context, runnerName string) (params.RunnerReference, error) {
85112
path := fmt.Sprintf("%s?agentName=%s", runnerEndpoint, runnerName)
86113

util/github/scalesets/util.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"context"
1919
"fmt"
2020
"io"
21+
"log/slog"
2122
"net/http"
2223
)
2324

@@ -50,5 +51,7 @@ func (s *ScaleSetClient) newActionsRequest(ctx context.Context, method, path str
5051
req.Header.Set("Content-Type", "application/json")
5152
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", s.actionsServiceInfo.Token))
5253

54+
slog.DebugContext(ctx, "newActionsRequest", "method", method, "url", uri.String(), "body", body, "headers", req.Header)
55+
5356
return req, nil
5457
}

workers/entity/controller.go

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,31 +7,19 @@ import (
77
"sync"
88

99
"github.com/cloudbase/garm/auth"
10-
"github.com/cloudbase/garm/config"
1110
dbCommon "github.com/cloudbase/garm/database/common"
1211
"github.com/cloudbase/garm/database/watcher"
1312
"github.com/cloudbase/garm/runner/common"
14-
"github.com/cloudbase/garm/runner/providers"
1513
garmUtil "github.com/cloudbase/garm/util"
1614
)
1715

18-
func NewController(ctx context.Context, store dbCommon.Store, cfg config.Config) (*Controller, error) {
16+
func NewController(ctx context.Context, store dbCommon.Store, providers map[string]common.Provider) (*Controller, error) {
1917
consumerID := "entity-controller"
20-
ctrlID, err := store.ControllerInfo()
21-
if err != nil {
22-
return nil, fmt.Errorf("getting controller info: %w", err)
23-
}
24-
2518
ctx = garmUtil.WithSlogContext(
2619
ctx,
2720
slog.Any("worker", consumerID))
2821
ctx = auth.GetAdminContext(ctx)
2922

30-
providers, err := providers.LoadProvidersFromConfig(ctx, cfg, ctrlID.ControllerID.String())
31-
if err != nil {
32-
return nil, fmt.Errorf("loading providers: %w", err)
33-
}
34-
3523
return &Controller{
3624
consumerID: consumerID,
3725
ctx: ctx,

workers/provider/provider.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package provider
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync"
7+
8+
dbCommon "github.com/cloudbase/garm/database/common"
9+
"github.com/cloudbase/garm/database/watcher"
10+
"github.com/cloudbase/garm/runner/common"
11+
)
12+
13+
func NewWorker(ctx context.Context, store dbCommon.Store, providers map[string]common.Provider) (*provider, error) {
14+
consumerID := "provider-worker"
15+
return &provider{
16+
ctx: context.Background(),
17+
store: store,
18+
consumerID: consumerID,
19+
providers: providers,
20+
}, nil
21+
}
22+
23+
type provider struct {
24+
ctx context.Context
25+
consumerID string
26+
27+
consumer dbCommon.Consumer
28+
// TODO: not all workers should have access to the store.
29+
// We need to implement way to RPC from workers to controllers
30+
// and abstract that into something we can use to eventually
31+
// scale out.
32+
store dbCommon.Store
33+
34+
providers map[string]common.Provider
35+
36+
mux sync.Mutex
37+
running bool
38+
quit chan struct{}
39+
}
40+
41+
func (p *provider) Start() error {
42+
p.mux.Lock()
43+
defer p.mux.Unlock()
44+
45+
if p.running {
46+
return nil
47+
}
48+
49+
consumer, err := watcher.RegisterConsumer(
50+
p.ctx, p.consumerID, composeProviderWatcher())
51+
if err != nil {
52+
return fmt.Errorf("registering consumer: %w", err)
53+
}
54+
p.consumer = consumer
55+
56+
p.quit = make(chan struct{})
57+
p.running = true
58+
return nil
59+
}
60+
61+
func (p *provider) Stop() error {
62+
p.mux.Lock()
63+
defer p.mux.Unlock()
64+
65+
if !p.running {
66+
return nil
67+
}
68+
69+
p.consumer.Close()
70+
close(p.quit)
71+
p.running = false
72+
return nil
73+
}

workers/provider/util.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package provider
2+
3+
import (
4+
commonParams "github.com/cloudbase/garm-provider-common/params"
5+
6+
dbCommon "github.com/cloudbase/garm/database/common"
7+
"github.com/cloudbase/garm/database/watcher"
8+
)
9+
10+
func composeProviderWatcher() dbCommon.PayloadFilterFunc {
11+
return watcher.WithAny(
12+
watcher.WithInstanceStatusFilter(
13+
commonParams.InstancePendingCreate,
14+
commonParams.InstancePendingDelete,
15+
commonParams.InstancePendingForceDelete,
16+
),
17+
)
18+
}

0 commit comments

Comments
 (0)