diff --git a/cmd/flipt/validate.go b/cmd/flipt/validate.go index f03d71e7a6..1af45094de 100644 --- a/cmd/flipt/validate.go +++ b/cmd/flipt/validate.go @@ -66,7 +66,7 @@ func (v *validateCommand) run(cmd *cobra.Command, args []string) error { return err } - var opts []containers.Option[fs.SnapshotOption] + var opts []containers.Option[fs.SnapshotBuilderOption] if v.extraPath != "" { schema, err := os.ReadFile(v.extraPath) if err != nil { @@ -82,6 +82,7 @@ func (v *validateCommand) run(cmd *cobra.Command, args []string) error { return errors.New("non-empty working directory expected") } + builder := fs.NewSnapshotBuilder(logger) ofs := os.DirFS(v.workDirectory) if len(args) == 0 { var config *fs.Config @@ -90,9 +91,9 @@ func (v *validateCommand) run(cmd *cobra.Command, args []string) error { return err } - _, err = fs.SnapshotFromFS(logger, config, ofs, opts...) + _, err = builder.SnapshotFromFS(config, ofs) } else { - _, err = fs.SnapshotFromPaths(logger, ofs, args, opts...) + _, err = builder.SnapshotFromPaths(ofs, args) } errs, ok := validation.Unwrap(err) diff --git a/internal/server/environments/storage.go b/internal/server/environments/storage.go index fa4134bc68..f1b456cee8 100644 --- a/internal/server/environments/storage.go +++ b/internal/server/environments/storage.go @@ -16,6 +16,16 @@ import ( "go.uber.org/zap" ) +var ( + FlagResourceType = NewResourceType("flipt.core", "Flag") + SegmentResourceType = NewResourceType("flipt.core", "Segment") +) + +type TypedResource struct { + ResourceType ResourceType + *environments.Resource +} + type ResourceType struct { Package string Name string diff --git a/internal/storage/environments/environments.go b/internal/storage/environments/environments.go index 743f7744b8..34475ec7af 100644 --- a/internal/storage/environments/environments.go +++ b/internal/storage/environments/environments.go @@ -17,6 +17,8 @@ import ( "go.flipt.io/flipt/internal/storage/environments/fs" configcoreflipt "go.flipt.io/flipt/internal/storage/environments/fs/flipt" environmentsgit "go.flipt.io/flipt/internal/storage/environments/git" + "go.flipt.io/flipt/internal/storage/environments/graph" + storagefs "go.flipt.io/flipt/internal/storage/fs" storagegit "go.flipt.io/flipt/internal/storage/git" "go.uber.org/zap" ) @@ -85,10 +87,11 @@ func (s *sourceBuilder) forEnvironment( return nil, fmt.Errorf("missing storage for name %q", envConf.Storage) } + dependencyGraph := graph.NewResourceGraph() fileStorage := fs.NewStorage( logger, - configcoreflipt.NewFlagStorage(logger), - configcoreflipt.NewSegmentStorage(logger), + configcoreflipt.NewFlagStorage(logger, dependencyGraph), + configcoreflipt.NewSegmentStorage(logger, dependencyGraph), ) repo, err := s.getOrCreateGitRepo(ctx, envConf, storage, credentials) @@ -103,6 +106,7 @@ func (s *sourceBuilder) forEnvironment( envConf, repo, fileStorage, + storagefs.NewSnapshotBuilder(logger, storagefs.WithDependencyGraph(dependencyGraph)), evaluation.NewSnapshotPublisher(logger), ) if err != nil { diff --git a/internal/storage/environments/fs/flipt/flags.go b/internal/storage/environments/fs/flipt/flags.go index 59f774aa17..8a19c4fc48 100644 --- a/internal/storage/environments/fs/flipt/flags.go +++ b/internal/storage/environments/fs/flipt/flags.go @@ -7,12 +7,14 @@ import ( "io" "os" "path" + "slices" "go.flipt.io/flipt/core/validation" "go.flipt.io/flipt/errors" "go.flipt.io/flipt/internal/ext" serverenvironments "go.flipt.io/flipt/internal/server/environments" environmentsfs "go.flipt.io/flipt/internal/storage/environments/fs" + "go.flipt.io/flipt/internal/storage/environments/graph" "go.flipt.io/flipt/rpc/flipt/core" rpcenvironments "go.flipt.io/flipt/rpc/v2/environments" "go.uber.org/zap" @@ -27,17 +29,18 @@ var _ environmentsfs.ResourceStorage = (*FlagStorage)(nil) // and handles retrieving and storing Flag types from Flipt features.yaml // declarative format through an opinionated convention for flag state layout type FlagStorage struct { - logger *zap.Logger + logger *zap.Logger + dependencyGraph *graph.ResourceGraph } // NewFlagStorage constructs and configures a new flag storage implementation -func NewFlagStorage(logger *zap.Logger) *FlagStorage { - return &FlagStorage{logger: logger} +func NewFlagStorage(logger *zap.Logger, dependencyGraph *graph.ResourceGraph) *FlagStorage { + return &FlagStorage{logger: logger, dependencyGraph: dependencyGraph} } // ResourceType returns the Flag specific resource type func (FlagStorage) ResourceType() serverenvironments.ResourceType { - return serverenvironments.NewResourceType("flipt.core", "Flag") + return serverenvironments.FlagResourceType } // GetResource fetches the requested flag from the namespaces features.yaml file @@ -168,6 +171,24 @@ func (f *FlagStorage) DeleteResource(ctx context.Context, fs environmentsfs.File } }() + flag := serverenvironments.TypedResource{ + ResourceType: serverenvironments.FlagResourceType, + Resource: &rpcenvironments.Resource{ + NamespaceKey: namespace, + Key: key, + }, + } + + // check for any dependents of the flag + // this should not be possible yet as flags are not a dependency of anything but we'll add this for completeness + dependents := f.dependencyGraph.GetDependents(flag) + + if len(dependents) > 0 { + // just get the first one for now + dependent := dependents[0] + return errors.ErrConflictf("flag cannot be deleted as it is a dependency of %s", dependent) + } + docs, err := parseNamespace(ctx, fs, namespace) if err != nil { return err @@ -187,7 +208,7 @@ func (f *FlagStorage) DeleteResource(ctx context.Context, fs environmentsfs.File if f.Key == key { found = true // remove entry from list - doc.Flags = append(doc.Flags[:i], doc.Flags[i+1:]...) + doc.Flags = slices.Delete(doc.Flags, i, i+1) } } } @@ -210,6 +231,7 @@ func (f *FlagStorage) DeleteResource(ctx context.Context, fs environmentsfs.File } } + f.dependencyGraph.RemoveResource(flag) return enc.Close() } @@ -332,6 +354,7 @@ func payloadFromFlag(flag *ext.Flag) (_ *anypb.Any, err error) { func resourceToFlag(r *rpcenvironments.Resource) (*ext.Flag, error) { var f core.Flag + if err := r.Payload.UnmarshalTo(&f); err != nil { return nil, err } @@ -390,6 +413,7 @@ func resourceToFlag(r *rpcenvironments.Resource) (*ext.Flag, error) { Operator: rollout.GetSegment().SegmentOperator.String(), Value: rollout.GetSegment().Value, } + case core.RolloutType_THRESHOLD_ROLLOUT_TYPE: r.Threshold = &ext.ThresholdRule{ Percentage: rollout.GetThreshold().Percentage, diff --git a/internal/storage/environments/fs/flipt/flags_test.go b/internal/storage/environments/fs/flipt/flags_test.go index 35cc6bc2dd..84b8d2e5e0 100644 --- a/internal/storage/environments/fs/flipt/flags_test.go +++ b/internal/storage/environments/fs/flipt/flags_test.go @@ -7,6 +7,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" fstesting "go.flipt.io/flipt/internal/storage/environments/fs/testing" + "go.flipt.io/flipt/internal/storage/environments/graph" "go.flipt.io/flipt/rpc/flipt/core" rpcenvironments "go.flipt.io/flipt/rpc/v2/environments" "go.uber.org/zap/zaptest" @@ -82,9 +83,12 @@ flags: ) func TestFlagStorage_GetResource(t *testing.T) { - ctx := context.TODO() - logger := zaptest.NewLogger(t) - storage := NewFlagStorage(logger) + var ( + ctx = context.TODO() + logger = zaptest.NewLogger(t) + dependencyGraph = graph.NewResourceGraph() + storage = NewFlagStorage(logger, dependencyGraph) + ) fs := fstesting.NewFilesystem( t, @@ -149,9 +153,12 @@ func TestFlagStorage_GetResource(t *testing.T) { } func TestFlagStorage_ListResources(t *testing.T) { - ctx := context.TODO() - logger := zaptest.NewLogger(t) - storage := NewFlagStorage(logger) + var ( + ctx = context.TODO() + logger = zaptest.NewLogger(t) + dependencyGraph = graph.NewResourceGraph() + storage = NewFlagStorage(logger, dependencyGraph) + ) fs := fstesting.NewFilesystem( t, @@ -210,9 +217,12 @@ func TestFlagStorage_ListResources(t *testing.T) { } func TestFlagStorage_PutResource(t *testing.T) { - ctx := context.TODO() - logger := zaptest.NewLogger(t) - storage := NewFlagStorage(logger) + var ( + ctx = context.TODO() + logger = zaptest.NewLogger(t) + dependencyGraph = graph.NewResourceGraph() + storage = NewFlagStorage(logger, dependencyGraph) + ) t.Run("create new flag", func(t *testing.T) { fs := fstesting.NewFilesystem( @@ -304,9 +314,12 @@ func TestFlagStorage_PutResource(t *testing.T) { } func TestFlagStorage_DeleteResource(t *testing.T) { - ctx := context.TODO() - logger := zaptest.NewLogger(t) - storage := NewFlagStorage(logger) + var ( + ctx = context.TODO() + logger = zaptest.NewLogger(t) + dependencyGraph = graph.NewResourceGraph() + storage = NewFlagStorage(logger, dependencyGraph) + ) tests := []struct { name string diff --git a/internal/storage/environments/fs/flipt/segments.go b/internal/storage/environments/fs/flipt/segments.go index 3b3eddcbad..651c19debf 100644 --- a/internal/storage/environments/fs/flipt/segments.go +++ b/internal/storage/environments/fs/flipt/segments.go @@ -6,10 +6,13 @@ import ( "os" "path" + "slices" + "go.flipt.io/flipt/errors" "go.flipt.io/flipt/internal/ext" serverenvironments "go.flipt.io/flipt/internal/server/environments" environmentsfs "go.flipt.io/flipt/internal/storage/environments/fs" + "go.flipt.io/flipt/internal/storage/environments/graph" "go.flipt.io/flipt/rpc/flipt/core" rpcenvironments "go.flipt.io/flipt/rpc/v2/environments" "go.uber.org/zap" @@ -22,17 +25,18 @@ var _ environmentsfs.ResourceStorage = (*SegmentStorage)(nil) // and handles retrieving and storing Segment types from Flipt features.yaml // declarative format through an opinionated convention for flag state layout type SegmentStorage struct { - logger *zap.Logger + logger *zap.Logger + dependencyGraph *graph.ResourceGraph } // NewSegmentStorage constructs and configures a new segment storage implementation -func NewSegmentStorage(logger *zap.Logger) *SegmentStorage { - return &SegmentStorage{logger: logger} +func NewSegmentStorage(logger *zap.Logger, dependencyGraph *graph.ResourceGraph) *SegmentStorage { + return &SegmentStorage{logger: logger, dependencyGraph: dependencyGraph} } // ResourceType returns the Flag specific resource type func (SegmentStorage) ResourceType() serverenvironments.ResourceType { - return serverenvironments.NewResourceType("flipt.core", "Segment") + return serverenvironments.SegmentResourceType } // GetResource fetches the requested segment from the namespaces features.yaml file @@ -161,6 +165,23 @@ func (f *SegmentStorage) DeleteResource(ctx context.Context, fs environmentsfs.F } }() + segment := serverenvironments.TypedResource{ + ResourceType: serverenvironments.SegmentResourceType, + Resource: &rpcenvironments.Resource{ + NamespaceKey: namespace, + Key: key, + }, + } + + // check for any dependents of the segment + dependents := f.dependencyGraph.GetDependents(segment) + + if len(dependents) > 0 { + // just get the first one for now + dependent := dependents[0] + return errors.ErrConflictf("segment cannot be deleted as it is a dependency of %s", dependent) + } + docs, err := parseNamespace(ctx, fs, namespace) if err != nil { return err @@ -180,7 +201,7 @@ func (f *SegmentStorage) DeleteResource(ctx context.Context, fs environmentsfs.F if s.Key == key { found = true // remove entry from list - doc.Segments = append(doc.Segments[:i], doc.Segments[i+1:]...) + doc.Segments = slices.Delete(doc.Segments, i, i+1) } } } @@ -203,6 +224,7 @@ func (f *SegmentStorage) DeleteResource(ctx context.Context, fs environmentsfs.F } } + f.dependencyGraph.RemoveResource(segment) return enc.Close() } diff --git a/internal/storage/environments/fs/flipt/segments_test.go b/internal/storage/environments/fs/flipt/segments_test.go index 751f4888c9..6114d6b159 100644 --- a/internal/storage/environments/fs/flipt/segments_test.go +++ b/internal/storage/environments/fs/flipt/segments_test.go @@ -6,7 +6,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.flipt.io/flipt/internal/server/environments" fstesting "go.flipt.io/flipt/internal/storage/environments/fs/testing" + "go.flipt.io/flipt/internal/storage/environments/graph" "go.flipt.io/flipt/rpc/flipt/core" rpcenvironments "go.flipt.io/flipt/rpc/v2/environments" "go.uber.org/zap/zaptest" @@ -51,12 +53,54 @@ segments: name: Segment 2 match_type: ANY ` + + dependentSegmentContents = `version: "1.5" +namespace: + key: default + name: Default + description: The default namespace +flags: + - key: flag1 + name: Flag 1 + type: BOOLEAN_FLAG_TYPE + description: A test flag + enabled: true + metadata: + team: backend + variants: + - key: variant1 + name: Variant 1 + description: A test variant + default: true + attachment: + color: blue +rules: + - segment: + keys: [segment1] + operator: AND + distributions: + - rollout: 100 + variant_key: variant1 +rollouts: + - description: A test rollout + segment: + keys: [segment1] + operator: AND + value: true +segments: + - key: segment1 + name: Segment 1 + match_type: ALL +` ) func TestSegmentStorage_GetResource(t *testing.T) { - ctx := context.TODO() - logger := zaptest.NewLogger(t) - storage := NewSegmentStorage(logger) + var ( + ctx = context.TODO() + logger = zaptest.NewLogger(t) + dependencyGraph = graph.NewResourceGraph() + storage = NewSegmentStorage(logger, dependencyGraph) + ) fs := fstesting.NewFilesystem( t, @@ -121,9 +165,12 @@ func TestSegmentStorage_GetResource(t *testing.T) { } func TestSegmentStorage_ListResources(t *testing.T) { - ctx := context.TODO() - logger := zaptest.NewLogger(t) - storage := NewSegmentStorage(logger) + var ( + ctx = context.TODO() + logger = zaptest.NewLogger(t) + dependencyGraph = graph.NewResourceGraph() + storage = NewSegmentStorage(logger, dependencyGraph) + ) fs := fstesting.NewFilesystem( t, @@ -182,9 +229,12 @@ func TestSegmentStorage_ListResources(t *testing.T) { } func TestSegmentStorage_PutResource(t *testing.T) { - ctx := context.TODO() - logger := zaptest.NewLogger(t) - storage := NewSegmentStorage(logger) + var ( + ctx = context.TODO() + logger = zaptest.NewLogger(t) + dependencyGraph = graph.NewResourceGraph() + storage = NewSegmentStorage(logger, dependencyGraph) + ) t.Run("create new segment", func(t *testing.T) { fs := fstesting.NewFilesystem( @@ -267,9 +317,12 @@ func TestSegmentStorage_PutResource(t *testing.T) { } func TestSegmentStorage_DeleteResource(t *testing.T) { - ctx := context.TODO() - logger := zaptest.NewLogger(t) - storage := NewSegmentStorage(logger) + var ( + ctx = context.TODO() + logger = zaptest.NewLogger(t) + dependencyGraph = graph.NewResourceGraph() + storage = NewSegmentStorage(logger, dependencyGraph) + ) tests := []struct { name string @@ -318,3 +371,40 @@ func TestSegmentStorage_DeleteResource(t *testing.T) { }) } } + +func TestSegmentStorage_DeleteResource_Dependent(t *testing.T) { + var ( + ctx = context.TODO() + logger = zaptest.NewLogger(t) + dependencyGraph = graph.NewResourceGraph() + + storage = NewSegmentStorage(logger, dependencyGraph) + ) + + // manually add a dependency between a flag and a segment for the test + dependencyGraph.AddDependency(environments.TypedResource{ + ResourceType: environments.FlagResourceType, + Resource: &rpcenvironments.Resource{ + NamespaceKey: "default", + Key: "flag1", + }, + }, environments.TypedResource{ + ResourceType: environments.SegmentResourceType, + Resource: &rpcenvironments.Resource{ + NamespaceKey: "default", + Key: "segment1", + }, + }) + + fs := fstesting.NewFilesystem( + t, + fstesting.WithDirectory( + "default", + fstesting.WithFile("features.yaml", dependentSegmentContents), + ), + ) + + err := storage.DeleteResource(ctx, fs, "default", "segment1") + require.Error(t, err) + assert.EqualError(t, err, "deleting segment default/segment1: segment cannot be deleted as it is a dependency of /default/flag1") // In reality this will have a actual type url +} diff --git a/internal/storage/environments/git/store.go b/internal/storage/environments/git/store.go index 69ea17d2d1..c6f08f971f 100644 --- a/internal/storage/environments/git/store.go +++ b/internal/storage/environments/git/store.go @@ -40,6 +40,7 @@ type Environment struct { head plumbing.Hash snap *storagefs.Snapshot + builder *storagefs.SnapshotBuilder publisher *evaluation.SnapshotPublisher } @@ -52,6 +53,7 @@ func NewEnvironmentFromRepo( cfg *config.EnvironmentConfig, repo *storagegit.Repository, storage environmentsfs.Storage, + builder *storagefs.SnapshotBuilder, publisher *evaluation.SnapshotPublisher, ) (_ *Environment, err error) { return &Environment{ @@ -61,6 +63,7 @@ func NewEnvironmentFromRepo( storage: storage, refs: map[string]string{}, snap: storagefs.EmptySnapshot(), + builder: builder, publisher: publisher, }, nil } @@ -376,7 +379,7 @@ func (e *Environment) buildSnapshot(ctx context.Context, hash plumbing.Hash) (sn return err } - snap, err = storagefs.SnapshotFromFS(e.logger, conf, iofs) + snap, err = e.builder.SnapshotFromFS(conf, iofs) return err }, storagegit.ViewWithHash(hash)) } diff --git a/internal/storage/environments/graph/dependency.go b/internal/storage/environments/graph/dependency.go new file mode 100644 index 0000000000..1437b21034 --- /dev/null +++ b/internal/storage/environments/graph/dependency.go @@ -0,0 +1,111 @@ +package graph + +import ( + "fmt" + "sync" + + "go.flipt.io/flipt/internal/server/environments" +) + +// ResourceGraph is an in-memory dependency graph for any environments.Resource node type. +// +// The graph is thread-safe and can be used concurrently by multiple goroutines. +type ResourceGraph struct { + // Map from a resource to the set of resources that depend on it + dependents map[string]map[string]struct{} + // Map from a resource to the set of resources it depends on + dependencies map[string]map[string]struct{} + mu sync.RWMutex +} + +// NewResourceGraph creates a new, empty dependency graph for environments.Resource. +func NewResourceGraph() *ResourceGraph { + return &ResourceGraph{ + dependents: make(map[string]map[string]struct{}), + dependencies: make(map[string]map[string]struct{}), + } +} + +// SetDependencies sets the dependencies for a resource, updating the graph accordingly. +// It removes all previous dependencies for the resource and sets the new ones. +func (g *ResourceGraph) SetDependencies(resource environments.TypedResource, dependencies []environments.TypedResource) { + g.mu.Lock() + defer g.mu.Unlock() + + resourceKey := fmt.Sprintf("%s/%s/%s", resource.ResourceType.String(), resource.NamespaceKey, resource.Key) + + // Remove old dependencies for this resource + for dep := range g.dependencies[resourceKey] { + delete(g.dependents[dep], resourceKey) + if len(g.dependents[dep]) == 0 { + delete(g.dependents, dep) + } + } + delete(g.dependencies, resourceKey) + + // Set new dependencies + g.dependencies[resourceKey] = make(map[string]struct{}) + for _, dep := range dependencies { + depKey := fmt.Sprintf("%s/%s/%s", dep.ResourceType.String(), dep.NamespaceKey, dep.Key) + g.dependencies[resourceKey][depKey] = struct{}{} + if g.dependents[depKey] == nil { + g.dependents[depKey] = make(map[string]struct{}) + } + g.dependents[depKey][resourceKey] = struct{}{} + } +} + +// AddDependency adds a dependency between two resources. +func (g *ResourceGraph) AddDependency(resource environments.TypedResource, dependency environments.TypedResource) { + g.mu.Lock() + defer g.mu.Unlock() + + resourceKey := fmt.Sprintf("%s/%s/%s", resource.ResourceType.String(), resource.NamespaceKey, resource.Key) + dependencyKey := fmt.Sprintf("%s/%s/%s", dependency.ResourceType.String(), dependency.NamespaceKey, dependency.Key) + + if g.dependencies[resourceKey] == nil { + g.dependencies[resourceKey] = make(map[string]struct{}) + } + if g.dependents[dependencyKey] == nil { + g.dependents[dependencyKey] = make(map[string]struct{}) + } + g.dependencies[resourceKey][dependencyKey] = struct{}{} + g.dependents[dependencyKey][resourceKey] = struct{}{} +} + +// RemoveResource removes a resource and all its dependency links from the graph. +func (g *ResourceGraph) RemoveResource(resource environments.TypedResource) { + g.mu.Lock() + defer g.mu.Unlock() + + resourceKey := fmt.Sprintf("%s/%s/%s", resource.ResourceType.String(), resource.NamespaceKey, resource.Key) + + for dep := range g.dependencies[resourceKey] { + delete(g.dependents[dep], resourceKey) + if len(g.dependents[dep]) == 0 { + delete(g.dependents, dep) + } + } + delete(g.dependencies, resourceKey) + for dep := range g.dependents[resourceKey] { + delete(g.dependencies[dep], resourceKey) + if len(g.dependencies[dep]) == 0 { + delete(g.dependencies, dep) + } + } + delete(g.dependents, resourceKey) +} + +// GetDependents returns all resources that depend on the given resource. +func (g *ResourceGraph) GetDependents(resource environments.TypedResource) []string { + g.mu.RLock() + defer g.mu.RUnlock() + + resourceKey := fmt.Sprintf("%s/%s/%s", resource.ResourceType.String(), resource.NamespaceKey, resource.Key) + + var out []string + for dep := range g.dependents[resourceKey] { + out = append(out, dep) + } + return out +} diff --git a/internal/storage/environments/graph/dependency_test.go b/internal/storage/environments/graph/dependency_test.go new file mode 100644 index 0000000000..5fd09ebb9c --- /dev/null +++ b/internal/storage/environments/graph/dependency_test.go @@ -0,0 +1,122 @@ +package graph + +import ( + "sync" + "testing" + + "go.flipt.io/flipt/internal/server/environments" + rpcenvironments "go.flipt.io/flipt/rpc/v2/environments" +) + +type resource struct { + NamespaceKey string + Key string +} + +func (r resource) GetNamespaceKey() string { + return r.NamespaceKey +} + +func (r resource) GetKey() string { + return r.Key +} + +func makeResource(typ environments.ResourceType, ns, key string) environments.TypedResource { + return environments.TypedResource{ + ResourceType: typ, + Resource: &rpcenvironments.Resource{ + NamespaceKey: ns, + Key: key, + }, + } +} + +func TestDependencyGraph_SetAndGetDependents(t *testing.T) { + g := NewResourceGraph() + + segA := makeResource(environments.SegmentResourceType, "ns1", "A") + segB := makeResource(environments.SegmentResourceType, "ns1", "B") + flag1 := makeResource(environments.FlagResourceType, "ns1", "flag1") + flag2 := makeResource(environments.FlagResourceType, "ns1", "flag2") + + // Set flag1 depends on segA + g.SetDependencies(flag1, []environments.TypedResource{segA}) + // Set flag2 depends on segA and segB + g.SetDependencies(flag2, []environments.TypedResource{segA, segB}) + + // segA should have flag1 and flag2 as dependents + depsA := g.GetDependents(segA) + if len(depsA) != 2 { + t.Errorf("expected 2 dependents for segA, got %d", len(depsA)) + } + // segB should have only flag2 as dependent + depsB := g.GetDependents(segB) + if len(depsB) != 1 || depsB[0] != "flipt.core.Flag/ns1/flag2" { + t.Errorf("expected flag2 as only dependent for segB") + } +} + +func TestDependencyGraph_RemoveResource(t *testing.T) { + g := NewResourceGraph() + segA := makeResource(environments.SegmentResourceType, "ns1", "A") + flag1 := makeResource(environments.FlagResourceType, "ns1", "flag1") + g.SetDependencies(flag1, []environments.TypedResource{segA}) + + // Remove flag1, segA should have no dependents + g.RemoveResource(flag1) + depsA := g.GetDependents(segA) + if len(depsA) != 0 { + t.Errorf("expected 0 dependents for segA after flag1 removed, got %d", len(depsA)) + } + + // Remove segA, should not panic + g.RemoveResource(segA) +} + +func TestDependencyGraph_SetDependencies_Overwrite(t *testing.T) { + g := NewResourceGraph() + segA := makeResource(environments.SegmentResourceType, "ns1", "A") + segB := makeResource(environments.SegmentResourceType, "ns1", "B") + flag1 := makeResource(environments.FlagResourceType, "ns1", "flag1") + + g.SetDependencies(flag1, []environments.TypedResource{segA}) + g.SetDependencies(flag1, []environments.TypedResource{segB}) + // should remove segA dependency + + depsA := g.GetDependents(segA) + if len(depsA) != 0 { + t.Errorf("expected 0 dependents for segA after overwrite, got %d", len(depsA)) + } + depsB := g.GetDependents(segB) + if len(depsB) != 1 || depsB[0] != "flipt.core.Flag/ns1/flag1" { + t.Errorf("expected flag1 as only dependent for segB after overwrite") + } +} + +func TestDependencyGraph_AddDependency(t *testing.T) { + g := NewResourceGraph() + segA := makeResource(environments.SegmentResourceType, "ns1", "A") + flag1 := makeResource(environments.FlagResourceType, "ns1", "flag1") + g.AddDependency(flag1, segA) + depsA := g.GetDependents(segA) + if len(depsA) != 1 || depsA[0] != "flipt.core.Flag/ns1/flag1" { + t.Errorf("expected flag1 as dependent for segA after AddDependency") + } +} + +func TestDependencyGraph_ThreadSafety(t *testing.T) { + g := NewResourceGraph() + segA := makeResource(environments.SegmentResourceType, "ns1", "A") + flag1 := makeResource(environments.FlagResourceType, "ns1", "flag1") + wg := sync.WaitGroup{} + for i := 0; i < 100; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + g.SetDependencies(flag1, []environments.TypedResource{segA}) + _ = g.GetDependents(segA) + g.RemoveResource(flag1) + }(i) + } + wg.Wait() +} diff --git a/internal/storage/fs/snapshot.go b/internal/storage/fs/snapshot.go index d85dfb0cc4..f80c44f88c 100644 --- a/internal/storage/fs/snapshot.go +++ b/internal/storage/fs/snapshot.go @@ -19,9 +19,12 @@ import ( errs "go.flipt.io/flipt/errors" "go.flipt.io/flipt/internal/containers" "go.flipt.io/flipt/internal/ext" + "go.flipt.io/flipt/internal/server/environments" "go.flipt.io/flipt/internal/storage" + "go.flipt.io/flipt/internal/storage/environments/graph" "go.flipt.io/flipt/rpc/flipt" "go.flipt.io/flipt/rpc/flipt/core" + rpcenvironments "go.flipt.io/flipt/rpc/v2/environments" "go.flipt.io/flipt/rpc/v2/evaluation" "go.uber.org/zap" "google.golang.org/protobuf/types/known/structpb" @@ -57,38 +60,57 @@ func newNamespace() *namespace { } } -type SnapshotOption struct { +type SnapshotBuilderOption struct { validatorOption []validation.FeaturesValidatorOption + dependencyGraph *graph.ResourceGraph } -func newSnapshotOption(opts ...containers.Option[SnapshotOption]) SnapshotOption { - so := SnapshotOption{} +func newSnapshotOption(opts ...containers.Option[SnapshotBuilderOption]) SnapshotBuilderOption { + so := SnapshotBuilderOption{} containers.ApplyAll(&so, opts...) return so } -func WithValidatorOption(opts ...validation.FeaturesValidatorOption) containers.Option[SnapshotOption] { - return func(so *SnapshotOption) { +func WithValidatorOption(opts ...validation.FeaturesValidatorOption) containers.Option[SnapshotBuilderOption] { + return func(so *SnapshotBuilderOption) { so.validatorOption = opts } } +func WithDependencyGraph(dependencyGraph *graph.ResourceGraph) containers.Option[SnapshotBuilderOption] { + return func(so *SnapshotBuilderOption) { + so.dependencyGraph = dependencyGraph + } +} + +type SnapshotBuilder struct { + logger *zap.Logger + opts []containers.Option[SnapshotBuilderOption] +} + +func NewSnapshotBuilder(logger *zap.Logger, opts ...containers.Option[SnapshotBuilderOption]) *SnapshotBuilder { + return &SnapshotBuilder{ + logger: logger, + opts: opts, + } +} + // SnapshotFromFS is a convenience function for building a snapshot // directly from an implementation of fs.FS using the list state files // function to source the relevant Flipt configuration files. -func SnapshotFromFS(logger *zap.Logger, conf *Config, src fs.FS, opts ...containers.Option[SnapshotOption]) (*Snapshot, error) { +func (b *SnapshotBuilder) SnapshotFromFS(conf *Config, src fs.FS) (*Snapshot, error) { paths, err := conf.List(src) if err != nil { return nil, err } - return SnapshotFromPaths(logger, src, paths, opts...) + return b.SnapshotFromPaths(src, paths) } // SnapshotFromPaths constructs a StoreSnapshot from the provided // slice of paths resolved against the provided fs.FS. -func SnapshotFromPaths(logger *zap.Logger, ffs fs.FS, paths []string, opts ...containers.Option[SnapshotOption]) (*Snapshot, error) { - logger.Debug("opening state files", zap.Strings("paths", paths)) +func (b *SnapshotBuilder) SnapshotFromPaths(ffs fs.FS, paths []string) (*Snapshot, error) { + b.logger.Debug("opening state files", zap.Strings("paths", paths)) var files []fs.File for _, file := range paths { @@ -100,14 +122,14 @@ func SnapshotFromPaths(logger *zap.Logger, ffs fs.FS, paths []string, opts ...co files = append(files, fi) } - return SnapshotFromFiles(logger, files, opts...) + return b.SnapshotFromFiles(files) } // SnapshotFromFiles constructs a StoreSnapshot from the provided slice // of fs.File implementations. -func SnapshotFromFiles(logger *zap.Logger, files []fs.File, opts ...containers.Option[SnapshotOption]) (*Snapshot, error) { +func (b *SnapshotBuilder) SnapshotFromFiles(files []fs.File) (*Snapshot, error) { var ( - so = newSnapshotOption(opts...) + so = newSnapshotOption(b.opts...) s = EmptySnapshot() ) @@ -120,7 +142,7 @@ func SnapshotFromFiles(logger *zap.Logger, files []fs.File, opts ...containers.O } for _, doc := range docs { - if err := s.addDoc(doc); err != nil { + if err := s.addDoc(doc, so); err != nil { return nil, err } } @@ -176,7 +198,7 @@ func EmptySnapshot() *Snapshot { } // documentsFromFile parses and validates a document from a single fs.File instance -func documentsFromFile(fi fs.File, opts SnapshotOption) ([]*ext.Document, error) { +func documentsFromFile(fi fs.File, opts SnapshotBuilderOption) ([]*ext.Document, error) { validator, err := validation.NewFeaturesValidator(opts.validatorOption...) if err != nil { return nil, err @@ -248,7 +270,7 @@ func documentsFromFile(fi fs.File, opts SnapshotOption) ([]*ext.Document, error) // codepaths (v1 types / eval). // The snapshot generated contains all the necessary state to serve server-side // evaluation as well as returning entire snapshot state for client-side evaluation. -func (s *Snapshot) addDoc(doc *ext.Document) error { +func (s *Snapshot) addDoc(doc *ext.Document, opts SnapshotBuilderOption) error { var ( namespaceKey = doc.Namespace.GetKey() ns = s.ns[namespaceKey] @@ -317,6 +339,8 @@ func (s *Snapshot) addDoc(doc *ext.Document) error { for _, f := range doc.Flags { var ( + dependencies = []environments.TypedResource{} + flagType = core.FlagType_value[f.Type] flag = &core.Flag{ Key: f.Key, @@ -416,6 +440,15 @@ func (s *Snapshot) addDoc(doc *ext.Document) error { return errs.ErrInvalidf("flag %s/%s rule %d references unknown segment %q", doc.Namespace, flag.Key, rank, segmentKey) } + // track dependency between flag and segment + dependencies = append(dependencies, environments.TypedResource{ + ResourceType: environments.SegmentResourceType, + Resource: &rpcenvironments.Resource{ + NamespaceKey: doc.Namespace.GetKey(), + Key: segmentKey, + }, + }) + evc := make([]storage.EvaluationConstraint, 0, len(segment.Constraints)) for _, constraint := range segment.Constraints { evc = append(evc, storage.EvaluationConstraint{ @@ -541,6 +574,15 @@ func (s *Snapshot) addDoc(doc *ext.Document) error { return errs.ErrInvalidf("flag %s/%s rule %d references unknown segment %q", doc.Namespace, flag.Key, rank, segmentKey) } + // track dependency between flag and segment + dependencies = append(dependencies, environments.TypedResource{ + ResourceType: environments.SegmentResourceType, + Resource: &rpcenvironments.Resource{ + NamespaceKey: doc.Namespace.GetKey(), + Key: segmentKey, + }, + }) + constraints := make([]storage.EvaluationConstraint, 0, len(segment.Constraints)) for _, c := range segment.Constraints { constraints = append(constraints, storage.EvaluationConstraint{ @@ -594,6 +636,15 @@ func (s *Snapshot) addDoc(doc *ext.Document) error { } ns.evalRollouts[f.Key] = evalRollouts + if opts.dependencyGraph != nil { + opts.dependencyGraph.SetDependencies(environments.TypedResource{ + ResourceType: environments.FlagResourceType, + Resource: &rpcenvironments.Resource{ + NamespaceKey: doc.Namespace.GetKey(), + Key: f.Key, + }, + }, dependencies) + } } ns.etag = doc.Etag diff --git a/internal/storage/fs/snapshot_test.go b/internal/storage/fs/snapshot_test.go index 82e1d38f07..6a97818ff2 100644 --- a/internal/storage/fs/snapshot_test.go +++ b/internal/storage/fs/snapshot_test.go @@ -27,7 +27,8 @@ func ptr[P any](p P) *P { func TestSnapshot_GetFlag(t *testing.T) { conf := DefaultFliptConfig() - snap, err := SnapshotFromFS(zaptest.NewLogger(t), conf, testdata) + builder := NewSnapshotBuilder(zaptest.NewLogger(t)) + snap, err := builder.SnapshotFromFS(conf, testdata) require.NoError(t, err) tests := []struct { @@ -158,7 +159,9 @@ func TestSnapshot_GetFlag(t *testing.T) { } func TestSnapshot_ListFlags(t *testing.T) { - snap, err := SnapshotFromFS(zaptest.NewLogger(t), DefaultFliptConfig(), testdata) + conf := DefaultFliptConfig() + builder := NewSnapshotBuilder(zaptest.NewLogger(t)) + snap, err := builder.SnapshotFromFS(conf, testdata) require.NoError(t, err) tests := []struct { @@ -289,7 +292,9 @@ func TestSnapshot_ListFlags(t *testing.T) { } func TestSnapshot_CountFlags(t *testing.T) { - snap, err := SnapshotFromFS(zaptest.NewLogger(t), DefaultFliptConfig(), testdata) + conf := DefaultFliptConfig() + builder := NewSnapshotBuilder(zaptest.NewLogger(t)) + snap, err := builder.SnapshotFromFS(conf, testdata) require.NoError(t, err) tests := []struct { @@ -331,7 +336,9 @@ func TestSnapshot_CountFlags(t *testing.T) { } func TestSnapshot_GetEvaluationRules(t *testing.T) { - snap, err := SnapshotFromFS(zaptest.NewLogger(t), DefaultFliptConfig(), testdata) + conf := DefaultFliptConfig() + builder := NewSnapshotBuilder(zaptest.NewLogger(t)) + snap, err := builder.SnapshotFromFS(conf, testdata) require.NoError(t, err) tests := []struct { @@ -458,7 +465,9 @@ func storageEvaluationDistTransformer() cmp.Option { } func TestSnapshot_GetEvaluationDistributions(t *testing.T) { - snap, err := SnapshotFromFS(zaptest.NewLogger(t), DefaultFliptConfig(), testdata) + conf := DefaultFliptConfig() + builder := NewSnapshotBuilder(zaptest.NewLogger(t)) + snap, err := builder.SnapshotFromFS(conf, testdata) require.NoError(t, err) // First get the rules to get valid rule IDs @@ -551,7 +560,9 @@ func TestSnapshot_GetEvaluationDistributions(t *testing.T) { } func TestSnapshot_GetEvaluationRollouts(t *testing.T) { - snap, err := SnapshotFromFS(zaptest.NewLogger(t), DefaultFliptConfig(), testdata) + conf := DefaultFliptConfig() + builder := NewSnapshotBuilder(zaptest.NewLogger(t)) + snap, err := builder.SnapshotFromFS(conf, testdata) require.NoError(t, err) tests := []struct { @@ -652,7 +663,9 @@ func TestSnapshot_GetEvaluationRollouts(t *testing.T) { } func TestSnapshot_EvaluationNamespaceSnapshot(t *testing.T) { - snap, err := SnapshotFromFS(zaptest.NewLogger(t), DefaultFliptConfig(), testdata) + conf := DefaultFliptConfig() + builder := NewSnapshotBuilder(zaptest.NewLogger(t)) + snap, err := builder.SnapshotFromFS(conf, testdata) require.NoError(t, err) tests := []struct {