Skip to content

V2 track dependencies #4210

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 6 commits into
base: v2
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions cmd/flipt/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,14 @@
return err
}

var opts []containers.Option[fs.SnapshotOption]
var opts []containers.Option[fs.SnapshotBuilderOption]
if v.extraPath != "" {
schema, err := os.ReadFile(v.extraPath)
if err != nil {
return err
}

opts = append(opts, fs.WithValidatorOption(

Check failure on line 76 in cmd/flipt/validate.go

View workflow job for this annotation

GitHub Actions / Lint Go

ineffectual assignment to opts (ineffassign)
validation.WithSchemaExtension(schema),
))
}
Expand All @@ -82,6 +82,7 @@
return errors.New("non-empty working directory expected")
}

builder := fs.NewSnapshotBuilder(logger)
ofs := os.DirFS(v.workDirectory)
if len(args) == 0 {
var config *fs.Config
Expand All @@ -90,9 +91,9 @@
return err
}

_, err = fs.SnapshotFromFS(logger, config, ofs, opts...)
_, err = builder.SnapshotFromFS(config, ofs)
} else {
_, err = fs.SnapshotFromPaths(logger, ofs, args, opts...)
_, err = builder.SnapshotFromPaths(ofs, args)
}

errs, ok := validation.Unwrap(err)
Expand Down
10 changes: 10 additions & 0 deletions internal/server/environments/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,16 @@ import (
"go.uber.org/zap"
)

var (
FlagResourceType = NewResourceType("flipt.core", "Flag")
SegmentResourceType = NewResourceType("flipt.core", "Segment")
)

type TypedResource struct {
ResourceType ResourceType
*environments.Resource
}

type ResourceType struct {
Package string
Name string
Expand Down
8 changes: 6 additions & 2 deletions internal/storage/environments/environments.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"go.flipt.io/flipt/internal/storage/environments/fs"
configcoreflipt "go.flipt.io/flipt/internal/storage/environments/fs/flipt"
environmentsgit "go.flipt.io/flipt/internal/storage/environments/git"
"go.flipt.io/flipt/internal/storage/environments/graph"
storagefs "go.flipt.io/flipt/internal/storage/fs"
storagegit "go.flipt.io/flipt/internal/storage/git"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -85,10 +87,11 @@ func (s *sourceBuilder) forEnvironment(
return nil, fmt.Errorf("missing storage for name %q", envConf.Storage)
}

dependencyGraph := graph.NewResourceGraph()
fileStorage := fs.NewStorage(
logger,
configcoreflipt.NewFlagStorage(logger),
configcoreflipt.NewSegmentStorage(logger),
configcoreflipt.NewFlagStorage(logger, dependencyGraph),
configcoreflipt.NewSegmentStorage(logger, dependencyGraph),
)

repo, err := s.getOrCreateGitRepo(ctx, envConf, storage, credentials)
Expand All @@ -103,6 +106,7 @@ func (s *sourceBuilder) forEnvironment(
envConf,
repo,
fileStorage,
storagefs.NewSnapshotBuilder(logger, storagefs.WithDependencyGraph(dependencyGraph)),
evaluation.NewSnapshotPublisher(logger),
)
if err != nil {
Expand Down
34 changes: 29 additions & 5 deletions internal/storage/environments/fs/flipt/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ import (
"io"
"os"
"path"
"slices"

"go.flipt.io/flipt/core/validation"
"go.flipt.io/flipt/errors"
"go.flipt.io/flipt/internal/ext"
serverenvironments "go.flipt.io/flipt/internal/server/environments"
environmentsfs "go.flipt.io/flipt/internal/storage/environments/fs"
"go.flipt.io/flipt/internal/storage/environments/graph"
"go.flipt.io/flipt/rpc/flipt/core"
rpcenvironments "go.flipt.io/flipt/rpc/v2/environments"
"go.uber.org/zap"
Expand All @@ -27,17 +29,18 @@ var _ environmentsfs.ResourceStorage = (*FlagStorage)(nil)
// and handles retrieving and storing Flag types from Flipt features.yaml
// declarative format through an opinionated convention for flag state layout
type FlagStorage struct {
logger *zap.Logger
logger *zap.Logger
dependencyGraph *graph.ResourceGraph
}

// NewFlagStorage constructs and configures a new flag storage implementation
func NewFlagStorage(logger *zap.Logger) *FlagStorage {
return &FlagStorage{logger: logger}
func NewFlagStorage(logger *zap.Logger, dependencyGraph *graph.ResourceGraph) *FlagStorage {
return &FlagStorage{logger: logger, dependencyGraph: dependencyGraph}
}

// ResourceType returns the Flag specific resource type
func (FlagStorage) ResourceType() serverenvironments.ResourceType {
return serverenvironments.NewResourceType("flipt.core", "Flag")
return serverenvironments.FlagResourceType
}

// GetResource fetches the requested flag from the namespaces features.yaml file
Expand Down Expand Up @@ -168,6 +171,24 @@ func (f *FlagStorage) DeleteResource(ctx context.Context, fs environmentsfs.File
}
}()

flag := serverenvironments.TypedResource{
ResourceType: serverenvironments.FlagResourceType,
Resource: &rpcenvironments.Resource{
NamespaceKey: namespace,
Key: key,
},
}

// check for any dependents of the flag
// this should not be possible yet as flags are not a dependency of anything but we'll add this for completeness
dependents := f.dependencyGraph.GetDependents(flag)

if len(dependents) > 0 {
// just get the first one for now
dependent := dependents[0]
return errors.ErrConflictf("flag cannot be deleted as it is a dependency of %s", dependent)
}

docs, err := parseNamespace(ctx, fs, namespace)
if err != nil {
return err
Expand All @@ -187,7 +208,7 @@ func (f *FlagStorage) DeleteResource(ctx context.Context, fs environmentsfs.File
if f.Key == key {
found = true
// remove entry from list
doc.Flags = append(doc.Flags[:i], doc.Flags[i+1:]...)
doc.Flags = slices.Delete(doc.Flags, i, i+1)
}
}
}
Expand All @@ -210,6 +231,7 @@ func (f *FlagStorage) DeleteResource(ctx context.Context, fs environmentsfs.File
}
}

f.dependencyGraph.RemoveResource(flag)
return enc.Close()
}

Expand Down Expand Up @@ -332,6 +354,7 @@ func payloadFromFlag(flag *ext.Flag) (_ *anypb.Any, err error) {

func resourceToFlag(r *rpcenvironments.Resource) (*ext.Flag, error) {
var f core.Flag

if err := r.Payload.UnmarshalTo(&f); err != nil {
return nil, err
}
Expand Down Expand Up @@ -390,6 +413,7 @@ func resourceToFlag(r *rpcenvironments.Resource) (*ext.Flag, error) {
Operator: rollout.GetSegment().SegmentOperator.String(),
Value: rollout.GetSegment().Value,
}

case core.RolloutType_THRESHOLD_ROLLOUT_TYPE:
r.Threshold = &ext.ThresholdRule{
Percentage: rollout.GetThreshold().Percentage,
Expand Down
37 changes: 25 additions & 12 deletions internal/storage/environments/fs/flipt/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
fstesting "go.flipt.io/flipt/internal/storage/environments/fs/testing"
"go.flipt.io/flipt/internal/storage/environments/graph"
"go.flipt.io/flipt/rpc/flipt/core"
rpcenvironments "go.flipt.io/flipt/rpc/v2/environments"
"go.uber.org/zap/zaptest"
Expand Down Expand Up @@ -82,9 +83,12 @@ flags:
)

func TestFlagStorage_GetResource(t *testing.T) {
ctx := context.TODO()
logger := zaptest.NewLogger(t)
storage := NewFlagStorage(logger)
var (
ctx = context.TODO()
logger = zaptest.NewLogger(t)
dependencyGraph = graph.NewResourceGraph()
storage = NewFlagStorage(logger, dependencyGraph)
)

fs := fstesting.NewFilesystem(
t,
Expand Down Expand Up @@ -149,9 +153,12 @@ func TestFlagStorage_GetResource(t *testing.T) {
}

func TestFlagStorage_ListResources(t *testing.T) {
ctx := context.TODO()
logger := zaptest.NewLogger(t)
storage := NewFlagStorage(logger)
var (
ctx = context.TODO()
logger = zaptest.NewLogger(t)
dependencyGraph = graph.NewResourceGraph()
storage = NewFlagStorage(logger, dependencyGraph)
)

fs := fstesting.NewFilesystem(
t,
Expand Down Expand Up @@ -210,9 +217,12 @@ func TestFlagStorage_ListResources(t *testing.T) {
}

func TestFlagStorage_PutResource(t *testing.T) {
ctx := context.TODO()
logger := zaptest.NewLogger(t)
storage := NewFlagStorage(logger)
var (
ctx = context.TODO()
logger = zaptest.NewLogger(t)
dependencyGraph = graph.NewResourceGraph()
storage = NewFlagStorage(logger, dependencyGraph)
)

t.Run("create new flag", func(t *testing.T) {
fs := fstesting.NewFilesystem(
Expand Down Expand Up @@ -304,9 +314,12 @@ func TestFlagStorage_PutResource(t *testing.T) {
}

func TestFlagStorage_DeleteResource(t *testing.T) {
ctx := context.TODO()
logger := zaptest.NewLogger(t)
storage := NewFlagStorage(logger)
var (
ctx = context.TODO()
logger = zaptest.NewLogger(t)
dependencyGraph = graph.NewResourceGraph()
storage = NewFlagStorage(logger, dependencyGraph)
)

tests := []struct {
name string
Expand Down
32 changes: 27 additions & 5 deletions internal/storage/environments/fs/flipt/segments.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@ import (
"os"
"path"

"slices"

"go.flipt.io/flipt/errors"
"go.flipt.io/flipt/internal/ext"
serverenvironments "go.flipt.io/flipt/internal/server/environments"
environmentsfs "go.flipt.io/flipt/internal/storage/environments/fs"
"go.flipt.io/flipt/internal/storage/environments/graph"
"go.flipt.io/flipt/rpc/flipt/core"
rpcenvironments "go.flipt.io/flipt/rpc/v2/environments"
"go.uber.org/zap"
Expand All @@ -22,17 +25,18 @@ var _ environmentsfs.ResourceStorage = (*SegmentStorage)(nil)
// and handles retrieving and storing Segment types from Flipt features.yaml
// declarative format through an opinionated convention for flag state layout
type SegmentStorage struct {
logger *zap.Logger
logger *zap.Logger
dependencyGraph *graph.ResourceGraph
}

// NewSegmentStorage constructs and configures a new segment storage implementation
func NewSegmentStorage(logger *zap.Logger) *SegmentStorage {
return &SegmentStorage{logger: logger}
func NewSegmentStorage(logger *zap.Logger, dependencyGraph *graph.ResourceGraph) *SegmentStorage {
return &SegmentStorage{logger: logger, dependencyGraph: dependencyGraph}
}

// ResourceType returns the Flag specific resource type
func (SegmentStorage) ResourceType() serverenvironments.ResourceType {
return serverenvironments.NewResourceType("flipt.core", "Segment")
return serverenvironments.SegmentResourceType
}

// GetResource fetches the requested segment from the namespaces features.yaml file
Expand Down Expand Up @@ -161,6 +165,23 @@ func (f *SegmentStorage) DeleteResource(ctx context.Context, fs environmentsfs.F
}
}()

segment := serverenvironments.TypedResource{
ResourceType: serverenvironments.SegmentResourceType,
Resource: &rpcenvironments.Resource{
NamespaceKey: namespace,
Key: key,
},
}

// check for any dependents of the segment
dependents := f.dependencyGraph.GetDependents(segment)

if len(dependents) > 0 {
// just get the first one for now
dependent := dependents[0]
return errors.ErrConflictf("segment cannot be deleted as it is a dependency of %s", dependent)
}

docs, err := parseNamespace(ctx, fs, namespace)
if err != nil {
return err
Expand All @@ -180,7 +201,7 @@ func (f *SegmentStorage) DeleteResource(ctx context.Context, fs environmentsfs.F
if s.Key == key {
found = true
// remove entry from list
doc.Segments = append(doc.Segments[:i], doc.Segments[i+1:]...)
doc.Segments = slices.Delete(doc.Segments, i, i+1)
}
}
}
Expand All @@ -203,6 +224,7 @@ func (f *SegmentStorage) DeleteResource(ctx context.Context, fs environmentsfs.F
}
}

f.dependencyGraph.RemoveResource(segment)
return enc.Close()
}

Expand Down
Loading
Loading