From 7ac07a774ce81a39cdb8e86661dbdc622f30c1ea Mon Sep 17 00:00:00 2001 From: Sagie Maoz Date: Wed, 9 Apr 2025 13:21:49 -0400 Subject: [PATCH 01/37] viam datapipelines list --- cli/app.go | 22 ++++++++++++++++++++++ cli/auth.go | 2 ++ cli/client.go | 24 +++++++++++++----------- cli/datapipelines.go | 38 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 75 insertions(+), 11 deletions(-) create mode 100644 cli/datapipelines.go diff --git a/cli/app.go b/cli/app.go index 1946536ef65..2cc77202f17 100644 --- a/cli/app.go +++ b/cli/app.go @@ -1508,6 +1508,28 @@ var app = &cli.App{ }, }, }, + { + Name: "datapipelines", + Usage: "manage and track data pipelines", + UsageText: createUsageText("datapipelines", nil, false, true), + HideHelpCommand: true, + Subcommands: []*cli.Command{ + { + Name: "list", + Usage: "list data pipelines for an org ID", + UsageText: createUsageText("datapipelines list", + []string{generalFlagOrgID}, true, false), + Description: "In order to list data pipelines, an org ID is required", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: generalFlagOrgID, + Usage: fmt.Sprintf("organization ID for which data pipelines will be listed"), + }, + }, + Action: createCommandWithT[datapipelineListArgs](DatapipelineListAction), + }, + }, + }, { Name: "train", Usage: "train on data", diff --git a/cli/auth.go b/cli/auth.go index 8bc31f38e33..01dfa282798 100644 --- a/cli/auth.go +++ b/cli/auth.go @@ -20,6 +20,7 @@ import ( "go.uber.org/multierr" buildpb "go.viam.com/api/app/build/v1" datapb "go.viam.com/api/app/data/v1" + datapipelinespb "go.viam.com/api/app/datapipelines/v1" datasetpb "go.viam.com/api/app/dataset/v1" mlinferencepb "go.viam.com/api/app/mlinference/v1" mltrainingpb "go.viam.com/api/app/mltraining/v1" @@ -528,6 +529,7 @@ func (c *viamClient) ensureLoggedInInner() error { c.dataClient = datapb.NewDataServiceClient(conn) c.packageClient = packagepb.NewPackageServiceClient(conn) c.datasetClient = datasetpb.NewDatasetServiceClient(conn) + c.datapipelinesClient = datapipelinespb.NewDataPipelinesServiceClient(conn) c.mlTrainingClient = mltrainingpb.NewMLTrainingServiceClient(conn) c.mlInferenceClient = mlinferencepb.NewMLInferenceServiceClient(conn) c.buildClient = buildpb.NewBuildServiceClient(conn) diff --git a/cli/client.go b/cli/client.go index 5777538c5ef..2853aec18bd 100644 --- a/cli/client.go +++ b/cli/client.go @@ -31,6 +31,7 @@ import ( "go.uber.org/zap" buildpb "go.viam.com/api/app/build/v1" datapb "go.viam.com/api/app/data/v1" + datapipelinespb "go.viam.com/api/app/datapipelines/v1" datasetpb "go.viam.com/api/app/dataset/v1" mlinferencepb "go.viam.com/api/app/mlinference/v1" mltrainingpb "go.viam.com/api/app/mltraining/v1" @@ -79,17 +80,18 @@ var errNoShellService = errors.New("shell service is not enabled on this machine // viamClient wraps a cli.Context and provides all the CLI command functionality // needed to talk to the app and data services but not directly to robot parts. type viamClient struct { - c *cli.Context - conf *Config - client apppb.AppServiceClient - dataClient datapb.DataServiceClient - packageClient packagepb.PackageServiceClient - datasetClient datasetpb.DatasetServiceClient - mlTrainingClient mltrainingpb.MLTrainingServiceClient - mlInferenceClient mlinferencepb.MLInferenceServiceClient - buildClient buildpb.BuildServiceClient - baseURL *url.URL - authFlow *authFlow + c *cli.Context + conf *Config + client apppb.AppServiceClient + dataClient datapb.DataServiceClient + packageClient packagepb.PackageServiceClient + datasetClient datasetpb.DatasetServiceClient + datapipelinesClient datapipelinespb.DataPipelinesServiceClient + mlTrainingClient mltrainingpb.MLTrainingServiceClient + mlInferenceClient mlinferencepb.MLInferenceServiceClient + buildClient buildpb.BuildServiceClient + baseURL *url.URL + authFlow *authFlow selectedOrg *apppb.Organization selectedLoc *apppb.Location diff --git a/cli/datapipelines.go b/cli/datapipelines.go new file mode 100644 index 00000000000..8a5ab1238ab --- /dev/null +++ b/cli/datapipelines.go @@ -0,0 +1,38 @@ +package cli + +import ( + "context" + "errors" + + "github.com/urfave/cli/v2" + datapipelinespb "go.viam.com/api/app/datapipelines/v1" +) + +type datapipelineListArgs struct { + OrgID string +} + +func DatapipelineListAction(c *cli.Context, args datapipelineListArgs) error { + client, err := newViamClient(c) + if err != nil { + return err + } + + orgID := args.OrgID + if orgID == "" { + return errors.New("organization ID is required") + } + + resp, err := client.datapipelinesClient.ListDataPipelines(context.Background(), &datapipelinespb.ListDataPipelinesRequest{ + OrganizationId: orgID, + }) + if err != nil { + return err + } + + for _, pipeline := range resp.GetDataPipelines() { + printf(c.App.Writer, "\t%s (ID: %s)", pipeline.Name, pipeline.Id) + } + + return nil +} From dd56a19e84c41df4319081cd27d79e5f93475dae Mon Sep 17 00:00:00 2001 From: Sagie Maoz Date: Wed, 9 Apr 2025 15:42:43 -0400 Subject: [PATCH 02/37] viam datapipelines create --- cli/app.go | 26 +++++++++++++++ cli/datapipelines.go | 76 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 102 insertions(+) diff --git a/cli/app.go b/cli/app.go index 2cc77202f17..245161e68d4 100644 --- a/cli/app.go +++ b/cli/app.go @@ -1528,6 +1528,32 @@ var app = &cli.App{ }, Action: createCommandWithT[datapipelineListArgs](DatapipelineListAction), }, + { + Name: "create", + Usage: "create a new datapipeline", + UsageText: createUsageText("datapipelines create", + []string{generalFlagOrgID, datapipelineFlagName, datapipelineFlagSchedule, datapipelineFlagMQL}, false, false), + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: generalFlagOrgID, + Usage: "organization ID for which data pipeline will be created", + }, + &cli.StringFlag{ + Name: datapipelineFlagName, + Usage: "name of the new data pipeline", + }, + &cli.StringFlag{ + Name: datapipelineFlagSchedule, + Usage: "schedule of the new data pipeline (cron expression)", + }, + &cli.StringFlag{ + Name: datapipelineFlagMQL, + Usage: "MQL query for the new data pipeline", + }, + // TODO: mql argument (path to file) + }, + Action: createCommandWithT[datapipelineCreateArgs](DatapipelineCreateAction), + }, }, }, { diff --git a/cli/datapipelines.go b/cli/datapipelines.go index 8a5ab1238ab..eb84d559df3 100644 --- a/cli/datapipelines.go +++ b/cli/datapipelines.go @@ -3,11 +3,19 @@ package cli import ( "context" "errors" + "fmt" "github.com/urfave/cli/v2" + "go.mongodb.org/mongo-driver/bson" datapipelinespb "go.viam.com/api/app/datapipelines/v1" ) +const ( + datapipelineFlagName = "name" + datapipelineFlagSchedule = "schedule" + datapipelineFlagMQL = "mql" +) + type datapipelineListArgs struct { OrgID string } @@ -36,3 +44,71 @@ func DatapipelineListAction(c *cli.Context, args datapipelineListArgs) error { return nil } + +type datapipelineCreateArgs struct { + OrgID string + Name string + Schedule string + MQL string +} + +func DatapipelineCreateAction(c *cli.Context, args datapipelineCreateArgs) error { + client, err := newViamClient(c) + if err != nil { + return err + } + + orgID := args.OrgID + if orgID == "" { + return errors.New("organization ID is required") + } + + name := args.Name + if name == "" { + return errors.New("data pipeline name is required") + } + + schedule := args.Schedule + if schedule == "" { + return errors.New("data pipeline schedule is required") + } + + // TODO: validate cron expression + + mql := args.MQL + if mql == "" { + return errors.New("data pipeline MQL is required") + } + + // Parse the MQL stages directly into BSON + // TODO: look into more leniant JSON parser + var mqlArray []bson.M + if err := bson.UnmarshalExtJSON([]byte(mql), false, &mqlArray); err != nil { + return fmt.Errorf("invalid MQL: %w", err) + } + + var mqlBinary [][]byte + for _, stage := range mqlArray { + bytes, err := bson.Marshal(stage) + if err != nil { + return fmt.Errorf("error converting MQL stage to BSON: %w", err) + } + mqlBinary = append(mqlBinary, bytes) + } + + // TODO: support MQL file path + + resp, err := client.datapipelinesClient.CreateDataPipeline(context.Background(), &datapipelinespb.CreateDataPipelineRequest{ + OrganizationId: orgID, + Name: name, + Schedule: schedule, + MqlBinary: mqlBinary, + }) + if err != nil { + return err + } + + printf(c.App.Writer, "%s (ID: %s) created", name, resp.GetId()) + + return nil +} From 8de0e929423010ecd61689e2ff8188ebe0828bef Mon Sep 17 00:00:00 2001 From: Sagie Maoz Date: Wed, 9 Apr 2025 17:04:57 -0400 Subject: [PATCH 03/37] mql-file --- cli/app.go | 18 +++++++++++++++--- cli/datapipelines.go | 28 ++++++++++++++++++---------- mql.json | 1 + 3 files changed, 34 insertions(+), 13 deletions(-) create mode 100644 mql.json diff --git a/cli/app.go b/cli/app.go index 245161e68d4..66ed0bfe681 100644 --- a/cli/app.go +++ b/cli/app.go @@ -117,6 +117,11 @@ const ( dataFlagFilterTags = "filter-tags" dataFlagTimeout = "timeout" + datapipelineFlagName = "name" + datapipelineFlagSchedule = "schedule" + datapipelineFlagMQL = "mql" + datapipelineFlagMQLFile = "mql-file" + packageFlagFramework = "model-framework" oauthAppFlagClientID = "client-id" @@ -1530,9 +1535,13 @@ var app = &cli.App{ }, { Name: "create", - Usage: "create a new datapipeline", + Usage: "create a new data pipeline", UsageText: createUsageText("datapipelines create", - []string{generalFlagOrgID, datapipelineFlagName, datapipelineFlagSchedule, datapipelineFlagMQL}, false, false), + []string{generalFlagOrgID, datapipelineFlagName, datapipelineFlagSchedule}, false, false, + fmt.Sprintf("[--%s=<%s> | --%s=<%s>]", + datapipelineFlagMQL, datapipelineFlagMQL, + datapipelineFlagMQLFile, datapipelineFlagMQLFile), + ), Flags: []cli.Flag{ &cli.StringFlag{ Name: generalFlagOrgID, @@ -1550,7 +1559,10 @@ var app = &cli.App{ Name: datapipelineFlagMQL, Usage: "MQL query for the new data pipeline", }, - // TODO: mql argument (path to file) + &cli.StringFlag{ + Name: datapipelineFlagMQLFile, + Usage: "path to JSON file containing MQL query for the new data pipeline", + }, }, Action: createCommandWithT[datapipelineCreateArgs](DatapipelineCreateAction), }, diff --git a/cli/datapipelines.go b/cli/datapipelines.go index eb84d559df3..5538ce436a6 100644 --- a/cli/datapipelines.go +++ b/cli/datapipelines.go @@ -4,18 +4,13 @@ import ( "context" "errors" "fmt" + "os" "github.com/urfave/cli/v2" "go.mongodb.org/mongo-driver/bson" datapipelinespb "go.viam.com/api/app/datapipelines/v1" ) -const ( - datapipelineFlagName = "name" - datapipelineFlagSchedule = "schedule" - datapipelineFlagMQL = "mql" -) - type datapipelineListArgs struct { OrgID string } @@ -50,6 +45,7 @@ type datapipelineCreateArgs struct { Name string Schedule string MQL string + MqlFile string } func DatapipelineCreateAction(c *cli.Context, args datapipelineCreateArgs) error { @@ -76,8 +72,22 @@ func DatapipelineCreateAction(c *cli.Context, args datapipelineCreateArgs) error // TODO: validate cron expression mql := args.MQL + mqlFile := args.MqlFile + + if mqlFile != "" { + if mql != "" { + return errors.New("data pipeline MQL and MQL file cannot both be provided") + } + + content, err := os.ReadFile(mqlFile) + if err != nil { + return fmt.Errorf("error reading MQL file: %w", err) + } + mql = string(content) + } + if mql == "" { - return errors.New("data pipeline MQL is required") + return errors.New("missing data pipeline MQL") } // Parse the MQL stages directly into BSON @@ -96,8 +106,6 @@ func DatapipelineCreateAction(c *cli.Context, args datapipelineCreateArgs) error mqlBinary = append(mqlBinary, bytes) } - // TODO: support MQL file path - resp, err := client.datapipelinesClient.CreateDataPipeline(context.Background(), &datapipelinespb.CreateDataPipelineRequest{ OrganizationId: orgID, Name: name, @@ -105,7 +113,7 @@ func DatapipelineCreateAction(c *cli.Context, args datapipelineCreateArgs) error MqlBinary: mqlBinary, }) if err != nil { - return err + return fmt.Errorf("error creating data pipeline: %w", err) } printf(c.App.Writer, "%s (ID: %s) created", name, resp.GetId()) diff --git a/mql.json b/mql.json new file mode 100644 index 00000000000..7a46c8ce1a7 --- /dev/null +++ b/mql.json @@ -0,0 +1 @@ +[{"$match": { "component_name": "dragino" }}, {"$group": { "_id": "$part_id", "count": { "$sum": 1 }, "avgTemp": { "$avg": "$data.readings.TempC_SHT" }, "avgHum": { "$avg": "$data.readings.Hum_SHT" }}}] From 804ad715729683b485573c53242962cd25f5cb90 Mon Sep 17 00:00:00 2001 From: Sagie Maoz Date: Wed, 9 Apr 2025 17:10:02 -0400 Subject: [PATCH 04/37] viam datapipelines update --- cli/app.go | 34 ++++++++++++++++++ cli/datapipelines.go | 82 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 116 insertions(+) diff --git a/cli/app.go b/cli/app.go index 66ed0bfe681..7f1feec1535 100644 --- a/cli/app.go +++ b/cli/app.go @@ -117,6 +117,7 @@ const ( dataFlagFilterTags = "filter-tags" dataFlagTimeout = "timeout" + datapipelineFlagID = "id" datapipelineFlagName = "name" datapipelineFlagSchedule = "schedule" datapipelineFlagMQL = "mql" @@ -1566,6 +1567,39 @@ var app = &cli.App{ }, Action: createCommandWithT[datapipelineCreateArgs](DatapipelineCreateAction), }, + { + Name: "update", + Usage: "update a data pipeline", + UsageText: createUsageText("datapipelines update", + []string{datapipelineFlagID, datapipelineFlagName, datapipelineFlagSchedule}, false, false, + fmt.Sprintf("[--%s=<%s> | --%s=<%s>]", + datapipelineFlagMQL, datapipelineFlagMQL, + datapipelineFlagMQLFile, datapipelineFlagMQLFile), + ), + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: datapipelineFlagID, + Usage: "ID of the data pipeline to update", + }, + &cli.StringFlag{ + Name: datapipelineFlagName, + Usage: "name of the data pipeline to update", + }, + &cli.StringFlag{ + Name: datapipelineFlagSchedule, + Usage: "schedule of the data pipeline to update (cron expression)", + }, + &cli.StringFlag{ + Name: datapipelineFlagMQL, + Usage: "MQL query for the data pipeline to update", + }, + &cli.StringFlag{ + Name: datapipelineFlagMQLFile, + Usage: "path to JSON file containing MQL query for the data pipeline to update", + }, + }, + Action: createCommandWithT[datapipelineUpdateArgs](DatapipelineUpdateAction), + }, }, }, { diff --git a/cli/datapipelines.go b/cli/datapipelines.go index 5538ce436a6..c80d7952f8b 100644 --- a/cli/datapipelines.go +++ b/cli/datapipelines.go @@ -120,3 +120,85 @@ func DatapipelineCreateAction(c *cli.Context, args datapipelineCreateArgs) error return nil } + +type datapipelineUpdateArgs struct { + ID string + Name string + Schedule string + MQL string + MqlFile string +} + +func DatapipelineUpdateAction(c *cli.Context, args datapipelineUpdateArgs) error { + client, err := newViamClient(c) + if err != nil { + return err + } + + id := args.ID + if id == "" { + return errors.New("data pipeline ID is required") + } + + // TODO: maybe load existing pipeline and update fields? + + name := args.Name + if name == "" { + return errors.New("data pipeline name is required") + } + + schedule := args.Schedule + if schedule == "" { + return errors.New("data pipeline schedule is required") + } + + // TODO: validate cron expression + + mql := args.MQL + mqlFile := args.MqlFile + + if mqlFile != "" { + if mql != "" { + return errors.New("data pipeline MQL and MQL file cannot both be provided") + } + + content, err := os.ReadFile(mqlFile) + if err != nil { + return fmt.Errorf("error reading MQL file: %w", err) + } + mql = string(content) + } + + if mql == "" { + return errors.New("missing data pipeline MQL") + } + + // Parse the MQL stages directly into BSON + // TODO: look into more leniant JSON parser + var mqlArray []bson.M + if err := bson.UnmarshalExtJSON([]byte(mql), false, &mqlArray); err != nil { + return fmt.Errorf("invalid MQL: %w", err) + } + + var mqlBinary [][]byte + for _, stage := range mqlArray { + bytes, err := bson.Marshal(stage) + if err != nil { + return fmt.Errorf("error converting MQL stage to BSON: %w", err) + } + mqlBinary = append(mqlBinary, bytes) + } + + _, err = client.datapipelinesClient.UpdateDataPipeline(context.Background(), &datapipelinespb.UpdateDataPipelineRequest{ + Id: id, + Name: name, + Schedule: schedule, + MqlBinary: mqlBinary, + }) + if err != nil { + return fmt.Errorf("error updating data pipeline: %w", err) + } + + printf(c.App.Writer, "%s (id: %s) updated", name, id) + return nil +} From dd00e5c405f23e797b87916dc702fcd5265513ce Mon Sep 17 00:00:00 2001 From: Sagie Maoz Date: Thu, 10 Apr 2025 10:37:13 -0400 Subject: [PATCH 05/37] required=true for some flags --- cli/app.go | 25 +++++++++++++++---------- cli/datapipelines.go | 43 +++++++------------------------------------ 2 files changed, 22 insertions(+), 46 deletions(-) diff --git a/cli/app.go b/cli/app.go index 7f1feec1535..f8c8212313f 100644 --- a/cli/app.go +++ b/cli/app.go @@ -1528,8 +1528,9 @@ var app = &cli.App{ Description: "In order to list data pipelines, an org ID is required", Flags: []cli.Flag{ &cli.StringFlag{ - Name: generalFlagOrgID, - Usage: fmt.Sprintf("organization ID for which data pipelines will be listed"), + Name: generalFlagOrgID, + Usage: "organization ID for which data pipelines will be listed", + Required: true, }, }, Action: createCommandWithT[datapipelineListArgs](DatapipelineListAction), @@ -1545,16 +1546,19 @@ var app = &cli.App{ ), Flags: []cli.Flag{ &cli.StringFlag{ - Name: generalFlagOrgID, - Usage: "organization ID for which data pipeline will be created", + Name: generalFlagOrgID, + Usage: "organization ID for which data pipeline will be created", + Required: true, }, &cli.StringFlag{ - Name: datapipelineFlagName, - Usage: "name of the new data pipeline", + Name: datapipelineFlagName, + Usage: "name of the new data pipeline", + Required: true, }, &cli.StringFlag{ - Name: datapipelineFlagSchedule, - Usage: "schedule of the new data pipeline (cron expression)", + Name: datapipelineFlagSchedule, + Usage: "schedule of the new data pipeline (cron expression)", + Required: true, }, &cli.StringFlag{ Name: datapipelineFlagMQL, @@ -1578,8 +1582,9 @@ var app = &cli.App{ ), Flags: []cli.Flag{ &cli.StringFlag{ - Name: datapipelineFlagID, - Usage: "ID of the data pipeline to update", + Name: datapipelineFlagID, + Usage: "ID of the data pipeline to update", + Required: true, }, &cli.StringFlag{ Name: datapipelineFlagName, diff --git a/cli/datapipelines.go b/cli/datapipelines.go index c80d7952f8b..40b93f61023 100644 --- a/cli/datapipelines.go +++ b/cli/datapipelines.go @@ -21,13 +21,8 @@ func DatapipelineListAction(c *cli.Context, args datapipelineListArgs) error { return err } - orgID := args.OrgID - if orgID == "" { - return errors.New("organization ID is required") - } - resp, err := client.datapipelinesClient.ListDataPipelines(context.Background(), &datapipelinespb.ListDataPipelinesRequest{ - OrganizationId: orgID, + OrganizationId: args.OrgID, }) if err != nil { return err @@ -54,23 +49,6 @@ func DatapipelineCreateAction(c *cli.Context, args datapipelineCreateArgs) error return err } - orgID := args.OrgID - if orgID == "" { - return errors.New("organization ID is required") - } - - name := args.Name - if name == "" { - return errors.New("data pipeline name is required") - } - - schedule := args.Schedule - if schedule == "" { - return errors.New("data pipeline schedule is required") - } - - // TODO: validate cron expression - mql := args.MQL mqlFile := args.MqlFile @@ -107,16 +85,16 @@ func DatapipelineCreateAction(c *cli.Context, args datapipelineCreateArgs) error } resp, err := client.datapipelinesClient.CreateDataPipeline(context.Background(), &datapipelinespb.CreateDataPipelineRequest{ - OrganizationId: orgID, - Name: name, - Schedule: schedule, + OrganizationId: args.OrgID, + Name: args.Name, + Schedule: args.Schedule, MqlBinary: mqlBinary, }) if err != nil { return fmt.Errorf("error creating data pipeline: %w", err) } - printf(c.App.Writer, "%s (ID: %s) created", name, resp.GetId()) + printf(c.App.Writer, "%s (ID: %s) created", args.Name, resp.GetId()) return nil } @@ -135,11 +113,6 @@ func DatapipelineUpdateAction(c *cli.Context, args datapipelineUpdateArgs) error return err } - id := args.ID - if id == "" { - return errors.New("data pipeline ID is required") - } - // TODO: maybe load existing pipeline and update fields? name := args.Name @@ -152,8 +125,6 @@ func DatapipelineUpdateAction(c *cli.Context, args datapipelineUpdateArgs) error return errors.New("data pipeline schedule is required") } - // TODO: validate cron expression - mql := args.MQL mqlFile := args.MqlFile @@ -190,7 +161,7 @@ func DatapipelineUpdateAction(c *cli.Context, args datapipelineUpdateArgs) error } _, err = client.datapipelinesClient.UpdateDataPipeline(context.Background(), &datapipelinespb.UpdateDataPipelineRequest{ - Id: id, + Id: args.ID, Name: name, Schedule: schedule, MqlBinary: mqlBinary, @@ -199,6 +170,6 @@ func DatapipelineUpdateAction(c *cli.Context, args datapipelineUpdateArgs) error return fmt.Errorf("error updating data pipeline: %w", err) } - printf(c.App.Writer, "%s (id: %s) updated", name, id) + printf(c.App.Writer, "%s (id: %s) updated", name, args.ID) return nil } From 4292f711de762cb45cd57419de853928d1b1886d Mon Sep 17 00:00:00 2001 From: Sagie Maoz Date: Thu, 10 Apr 2025 10:44:15 -0400 Subject: [PATCH 06/37] update fields are optional --- cli/datapipelines.go | 42 +++++++++++++++++++++++------------------- 1 file changed, 23 insertions(+), 19 deletions(-) diff --git a/cli/datapipelines.go b/cli/datapipelines.go index 40b93f61023..a33a92c7e61 100644 --- a/cli/datapipelines.go +++ b/cli/datapipelines.go @@ -113,16 +113,22 @@ func DatapipelineUpdateAction(c *cli.Context, args datapipelineUpdateArgs) error return err } - // TODO: maybe load existing pipeline and update fields? + resp, err := client.datapipelinesClient.GetDataPipeline(context.Background(), &datapipelinespb.GetDataPipelineRequest{ + Id: args.ID, + }) + if err != nil { + return fmt.Errorf("error getting data pipeline: %w", err) + } + current := resp.GetDataPipeline() name := args.Name if name == "" { - return errors.New("data pipeline name is required") + name = current.GetName() } schedule := args.Schedule if schedule == "" { - return errors.New("data pipeline schedule is required") + schedule = current.GetSchedule() } mql := args.MQL @@ -140,24 +146,22 @@ func DatapipelineUpdateAction(c *cli.Context, args datapipelineUpdateArgs) error mql = string(content) } - if mql == "" { - return errors.New("missing data pipeline MQL") - } - - // Parse the MQL stages directly into BSON - // TODO: look into more leniant JSON parser - var mqlArray []bson.M - if err := bson.UnmarshalExtJSON([]byte(mql), false, &mqlArray); err != nil { - return fmt.Errorf("invalid MQL: %w", err) - } + mqlBinary := current.GetMqlBinary() + if mql != "" { + // Parse the MQL stages directly into BSON + // TODO: look into more leniant JSON parser + var mqlArray []bson.M + if err := bson.UnmarshalExtJSON([]byte(mql), false, &mqlArray); err != nil { + return fmt.Errorf("invalid MQL: %w", err) + } - var mqlBinary [][]byte - for _, stage := range mqlArray { - bytes, err := bson.Marshal(stage) - if err != nil { - return fmt.Errorf("error converting MQL stage to BSON: %w", err) + for _, stage := range mqlArray { + bytes, err := bson.Marshal(stage) + if err != nil { + return fmt.Errorf("error converting MQL stage to BSON: %w", err) + } + mqlBinary = append(mqlBinary, bytes) } - mqlBinary = append(mqlBinary, bytes) } _, err = client.datapipelinesClient.UpdateDataPipeline(context.Background(), &datapipelinespb.UpdateDataPipelineRequest{ From c18be02f9fd31f9de2e32a6dbca47acf9f33da40 Mon Sep 17 00:00:00 2001 From: Sagie Maoz Date: Thu, 10 Apr 2025 10:47:18 -0400 Subject: [PATCH 07/37] delete --- cli/app.go | 13 +++++++++++++ cli/datapipelines.go | 21 +++++++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/cli/app.go b/cli/app.go index f8c8212313f..aa9ec83db36 100644 --- a/cli/app.go +++ b/cli/app.go @@ -1605,6 +1605,19 @@ var app = &cli.App{ }, Action: createCommandWithT[datapipelineUpdateArgs](DatapipelineUpdateAction), }, + { + Name: "delete", + Usage: "delete a data pipeline", + UsageText: createUsageText("datapipelines delete", []string{datapipelineFlagID}, true, false), + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: datapipelineFlagID, + Usage: "ID of the data pipeline to delete", + Required: true, + }, + }, + Action: createCommandWithT[datapipelineDeleteArgs](DatapipelineDeleteAction), + }, }, }, { diff --git a/cli/datapipelines.go b/cli/datapipelines.go index a33a92c7e61..772853b8566 100644 --- a/cli/datapipelines.go +++ b/cli/datapipelines.go @@ -177,3 +177,24 @@ func DatapipelineUpdateAction(c *cli.Context, args datapipelineUpdateArgs) error printf(c.App.Writer, "%s (id: %s) updated", name, args.ID) return nil } + +type datapipelineDeleteArgs struct { + ID string +} + +func DatapipelineDeleteAction(c *cli.Context, args datapipelineDeleteArgs) error { + client, err := newViamClient(c) + if err != nil { + return err + } + + _, err = client.datapipelinesClient.DeleteDataPipeline(context.Background(), &datapipelinespb.DeleteDataPipelineRequest{ + Id: args.ID, + }) + if err != nil { + return fmt.Errorf("error deleting data pipeline: %w", err) + } + + printf(c.App.Writer, "data pipeline (id: %s) deleted", args.ID) + return nil +} From 021a6c1e726296c5ccc7f96996a2890493714d6d Mon Sep 17 00:00:00 2001 From: Sagie Maoz Date: Thu, 10 Apr 2025 10:47:50 -0400 Subject: [PATCH 08/37] . --- cli/datapipelines.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cli/datapipelines.go b/cli/datapipelines.go index 772853b8566..982e5a5291c 100644 --- a/cli/datapipelines.go +++ b/cli/datapipelines.go @@ -94,7 +94,7 @@ func DatapipelineCreateAction(c *cli.Context, args datapipelineCreateArgs) error return fmt.Errorf("error creating data pipeline: %w", err) } - printf(c.App.Writer, "%s (ID: %s) created", args.Name, resp.GetId()) + printf(c.App.Writer, "%s (ID: %s) created.", args.Name, resp.GetId()) return nil } @@ -174,7 +174,7 @@ func DatapipelineUpdateAction(c *cli.Context, args datapipelineUpdateArgs) error return fmt.Errorf("error updating data pipeline: %w", err) } - printf(c.App.Writer, "%s (id: %s) updated", name, args.ID) + printf(c.App.Writer, "%s (id: %s) updated.", name, args.ID) return nil } @@ -195,6 +195,6 @@ func DatapipelineDeleteAction(c *cli.Context, args datapipelineDeleteArgs) error return fmt.Errorf("error deleting data pipeline: %w", err) } - printf(c.App.Writer, "data pipeline (id: %s) deleted", args.ID) + printf(c.App.Writer, "data pipeline (id: %s) deleted.", args.ID) return nil } From 488684ec7b16ea91c5abe3b5459d099338846ccf Mon Sep 17 00:00:00 2001 From: Sagie Maoz Date: Thu, 10 Apr 2025 10:49:31 -0400 Subject: [PATCH 09/37] MQL parsing helper --- cli/datapipelines.go | 106 ++++++++++++++++++------------------------- 1 file changed, 44 insertions(+), 62 deletions(-) diff --git a/cli/datapipelines.go b/cli/datapipelines.go index 982e5a5291c..008c1e981a6 100644 --- a/cli/datapipelines.go +++ b/cli/datapipelines.go @@ -49,39 +49,9 @@ func DatapipelineCreateAction(c *cli.Context, args datapipelineCreateArgs) error return err } - mql := args.MQL - mqlFile := args.MqlFile - - if mqlFile != "" { - if mql != "" { - return errors.New("data pipeline MQL and MQL file cannot both be provided") - } - - content, err := os.ReadFile(mqlFile) - if err != nil { - return fmt.Errorf("error reading MQL file: %w", err) - } - mql = string(content) - } - - if mql == "" { - return errors.New("missing data pipeline MQL") - } - - // Parse the MQL stages directly into BSON - // TODO: look into more leniant JSON parser - var mqlArray []bson.M - if err := bson.UnmarshalExtJSON([]byte(mql), false, &mqlArray); err != nil { - return fmt.Errorf("invalid MQL: %w", err) - } - - var mqlBinary [][]byte - for _, stage := range mqlArray { - bytes, err := bson.Marshal(stage) - if err != nil { - return fmt.Errorf("error converting MQL stage to BSON: %w", err) - } - mqlBinary = append(mqlBinary, bytes) + mqlBinary, err := parseMQL(args.MQL, args.MqlFile) + if err != nil { + return err } resp, err := client.datapipelinesClient.CreateDataPipeline(context.Background(), &datapipelinespb.CreateDataPipelineRequest{ @@ -131,37 +101,13 @@ func DatapipelineUpdateAction(c *cli.Context, args datapipelineUpdateArgs) error schedule = current.GetSchedule() } - mql := args.MQL - mqlFile := args.MqlFile - - if mqlFile != "" { - if mql != "" { - return errors.New("data pipeline MQL and MQL file cannot both be provided") - } - - content, err := os.ReadFile(mqlFile) - if err != nil { - return fmt.Errorf("error reading MQL file: %w", err) - } - mql = string(content) - } - mqlBinary := current.GetMqlBinary() - if mql != "" { - // Parse the MQL stages directly into BSON - // TODO: look into more leniant JSON parser - var mqlArray []bson.M - if err := bson.UnmarshalExtJSON([]byte(mql), false, &mqlArray); err != nil { - return fmt.Errorf("invalid MQL: %w", err) - } - - for _, stage := range mqlArray { - bytes, err := bson.Marshal(stage) - if err != nil { - return fmt.Errorf("error converting MQL stage to BSON: %w", err) - } - mqlBinary = append(mqlBinary, bytes) + if args.MQL != "" || args.MqlFile != "" { + newMqlBinary, err := parseMQL(args.MQL, args.MqlFile) + if err != nil { + return err } + mqlBinary = newMqlBinary } _, err = client.datapipelinesClient.UpdateDataPipeline(context.Background(), &datapipelinespb.UpdateDataPipelineRequest{ @@ -178,6 +124,42 @@ func DatapipelineUpdateAction(c *cli.Context, args datapipelineUpdateArgs) error return nil } +func parseMQL(mql, mqlFile string) ([][]byte, error) { + if mqlFile != "" { + if mql != "" { + return nil, errors.New("data pipeline MQL and MQL file cannot both be provided") + } + + content, err := os.ReadFile(mqlFile) + if err != nil { + return nil, fmt.Errorf("error reading MQL file: %w", err) + } + mql = string(content) + } + + if mql == "" { + return nil, errors.New("missing data pipeline MQL") + } + + // Parse the MQL stages directly into BSON + // TODO: look into more leniant JSON parser + var mqlArray []bson.M + if err := bson.UnmarshalExtJSON([]byte(mql), false, &mqlArray); err != nil { + return nil, fmt.Errorf("invalid MQL: %w", err) + } + + var mqlBinary [][]byte + for _, stage := range mqlArray { + bytes, err := bson.Marshal(stage) + if err != nil { + return nil, fmt.Errorf("error converting MQL stage to BSON: %w", err) + } + mqlBinary = append(mqlBinary, bytes) + } + + return mqlBinary, nil +} + type datapipelineDeleteArgs struct { ID string } From 4f663280fa776ffbc59ca6ca2eff11cf3ddb8616 Mon Sep 17 00:00:00 2001 From: Sagie Maoz Date: Thu, 10 Apr 2025 10:59:04 -0400 Subject: [PATCH 10/37] describe --- cli/app.go | 13 +++++++ cli/datapipelines.go | 82 +++++++++++++++++++++++++++++++++++--------- 2 files changed, 79 insertions(+), 16 deletions(-) diff --git a/cli/app.go b/cli/app.go index aa9ec83db36..be66ee15494 100644 --- a/cli/app.go +++ b/cli/app.go @@ -1618,6 +1618,19 @@ var app = &cli.App{ }, Action: createCommandWithT[datapipelineDeleteArgs](DatapipelineDeleteAction), }, + { + Name: "describe", + Usage: "describe a data pipeline and its status", + UsageText: createUsageText("datapipelines describe", []string{datapipelineFlagID}, true, false), + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: datapipelineFlagID, + Usage: "ID of the data pipeline to describe", + Required: true, + }, + }, + Action: createCommandWithT[datapipelineDescribeArgs](DatapipelineDescribeAction), + }, }, }, { diff --git a/cli/datapipelines.go b/cli/datapipelines.go index 008c1e981a6..76f0fea6975 100644 --- a/cli/datapipelines.go +++ b/cli/datapipelines.go @@ -2,6 +2,7 @@ package cli import ( "context" + "encoding/json" "errors" "fmt" "os" @@ -103,11 +104,10 @@ func DatapipelineUpdateAction(c *cli.Context, args datapipelineUpdateArgs) error mqlBinary := current.GetMqlBinary() if args.MQL != "" || args.MqlFile != "" { - newMqlBinary, err := parseMQL(args.MQL, args.MqlFile) + mqlBinary, err = parseMQL(args.MQL, args.MqlFile) if err != nil { return err } - mqlBinary = newMqlBinary } _, err = client.datapipelinesClient.UpdateDataPipeline(context.Background(), &datapipelinespb.UpdateDataPipelineRequest{ @@ -124,6 +124,59 @@ func DatapipelineUpdateAction(c *cli.Context, args datapipelineUpdateArgs) error return nil } +type datapipelineDeleteArgs struct { + ID string +} + +func DatapipelineDeleteAction(c *cli.Context, args datapipelineDeleteArgs) error { + client, err := newViamClient(c) + if err != nil { + return err + } + + _, err = client.datapipelinesClient.DeleteDataPipeline(context.Background(), &datapipelinespb.DeleteDataPipelineRequest{ + Id: args.ID, + }) + if err != nil { + return fmt.Errorf("error deleting data pipeline: %w", err) + } + + printf(c.App.Writer, "data pipeline (id: %s) deleted.", args.ID) + return nil +} + +type datapipelineDescribeArgs struct { + ID string +} + +func DatapipelineDescribeAction(c *cli.Context, args datapipelineDescribeArgs) error { + client, err := newViamClient(c) + if err != nil { + return err + } + + resp, err := client.datapipelinesClient.GetDataPipeline(context.Background(), &datapipelinespb.GetDataPipelineRequest{ + Id: args.ID, + }) + if err != nil { + return fmt.Errorf("error getting data pipeline: %w", err) + } + pipeline := resp.GetDataPipeline() + + printf(c.App.Writer, "ID: %s", pipeline.GetId()) + printf(c.App.Writer, "Name: %s", pipeline.GetName()) + printf(c.App.Writer, "Time window: %s", pipeline.GetSchedule()) + mql, err := mqlJSON(pipeline.GetMqlBinary()) + if err != nil { + return fmt.Errorf("error getting MQL query: %w", err) + } + printf(c.App.Writer, "MQL query: %s", mql) + // TODO: pending implementation of PipelineRuns API + // printf(c.App.Writer, "Last execution: %s", pipeline.GetLastExecution()) + + return nil +} + func parseMQL(mql, mqlFile string) ([][]byte, error) { if mqlFile != "" { if mql != "" { @@ -160,23 +213,20 @@ func parseMQL(mql, mqlFile string) ([][]byte, error) { return mqlBinary, nil } -type datapipelineDeleteArgs struct { - ID string -} - -func DatapipelineDeleteAction(c *cli.Context, args datapipelineDeleteArgs) error { - client, err := newViamClient(c) - if err != nil { - return err +func mqlJSON(mql [][]byte) (string, error) { + var stages []bson.M + for _, bsonBytes := range mql { + var stage bson.M + if err := bson.Unmarshal(bsonBytes, &stage); err != nil { + return "", fmt.Errorf("error unmarshaling BSON stage: %w", err) + } + stages = append(stages, stage) } - _, err = client.datapipelinesClient.DeleteDataPipeline(context.Background(), &datapipelinespb.DeleteDataPipelineRequest{ - Id: args.ID, - }) + jsonBytes, err := json.MarshalIndent(stages, "", " ") if err != nil { - return fmt.Errorf("error deleting data pipeline: %w", err) + return "", fmt.Errorf("error marshaling stages to JSON: %w", err) } - printf(c.App.Writer, "data pipeline (id: %s) deleted.", args.ID) - return nil + return string(jsonBytes), nil } From bb326cde81fee702c07be9576601ebcbdb3dfeea Mon Sep 17 00:00:00 2001 From: Sagie Maoz Date: Thu, 10 Apr 2025 11:42:59 -0400 Subject: [PATCH 11/37] oops didn't mean to commit mql.json --- mql.json | 1 - 1 file changed, 1 deletion(-) delete mode 100644 mql.json diff --git a/mql.json b/mql.json deleted file mode 100644 index 7a46c8ce1a7..00000000000 --- a/mql.json +++ /dev/null @@ -1 +0,0 @@ -[{"$match": { "component_name": "dragino" }}, {"$group": { "_id": "$part_id", "count": { "$sum": 1 }, "avgTemp": { "$avg": "$data.readings.TempC_SHT" }, "avgHum": { "$avg": "$data.readings.Hum_SHT" }}}] From 6d5dab3e4bf751ef6828726435470da7b1785b04 Mon Sep 17 00:00:00 2001 From: Sagie Maoz Date: Thu, 10 Apr 2025 11:46:18 -0400 Subject: [PATCH 12/37] mql parsing warning --- cli/datapipelines.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/cli/datapipelines.go b/cli/datapipelines.go index 76f0fea6975..48d8843a7df 100644 --- a/cli/datapipelines.go +++ b/cli/datapipelines.go @@ -163,13 +163,15 @@ func DatapipelineDescribeAction(c *cli.Context, args datapipelineDescribeArgs) e } pipeline := resp.GetDataPipeline() - printf(c.App.Writer, "ID: %s", pipeline.GetId()) - printf(c.App.Writer, "Name: %s", pipeline.GetName()) - printf(c.App.Writer, "Time window: %s", pipeline.GetSchedule()) mql, err := mqlJSON(pipeline.GetMqlBinary()) if err != nil { - return fmt.Errorf("error getting MQL query: %w", err) + warningf(c.App.Writer, "error parsing MQL query: %s", err) + mql = "(error parsing MQL query)" } + + printf(c.App.Writer, "ID: %s", pipeline.GetId()) + printf(c.App.Writer, "Name: %s", pipeline.GetName()) + printf(c.App.Writer, "Time window: %s", pipeline.GetSchedule()) printf(c.App.Writer, "MQL query: %s", mql) // TODO: pending implementation of PipelineRuns API // printf(c.App.Writer, "Last execution: %s", pipeline.GetLastExecution()) From 95d4913aa958121a5219709e1c7ea05e87be30e2 Mon Sep 17 00:00:00 2001 From: Sagie Maoz Date: Thu, 10 Apr 2025 13:00:29 -0400 Subject: [PATCH 13/37] test ParseMQL --- cli/datapipelines_test.go | 99 +++++++++++++++++++++++++++++++++++++++ mql.json | 1 + 2 files changed, 100 insertions(+) create mode 100644 cli/datapipelines_test.go create mode 100644 mql.json diff --git a/cli/datapipelines_test.go b/cli/datapipelines_test.go new file mode 100644 index 00000000000..a4318e70721 --- /dev/null +++ b/cli/datapipelines_test.go @@ -0,0 +1,99 @@ +package cli + +import ( + "os" + "testing" + + "go.mongodb.org/mongo-driver/bson" + "go.viam.com/test" +) + +var ( + mqlString = `[{"$match": {"component_name": "dragino"}}, {"$group": {"_id": "$part_id", "count": {"$sum": 1}}}]` + mqlBSON = []bson.M{ + {"$match": bson.M{"component_name": "dragino"}}, + {"$group": bson.M{"_id": "$part_id", "count": bson.M{"$sum": 1}}}, + } +) + +func TestParseMQL(t *testing.T) { + expectedBytes := expectedBSONBytes(t) + testCases := map[string]struct { + mqlString string + mqlFile string + expectedError bool + expectedBytes [][]byte + }{ + "valid MQL string": { + mqlString: mqlString, + mqlFile: "", + expectedError: false, + expectedBytes: expectedBytes, + }, + "valid MQL file": { + mqlString: "", + mqlFile: createTempMQLFile(t), + expectedError: false, + expectedBytes: expectedBytes, + }, + "empty string and file": { + mqlString: "", + mqlFile: "", + expectedError: true, + }, + "both string and file provided": { + mqlString: mqlString, + mqlFile: createTempMQLFile(t), + expectedError: true, + }, + "invalid MQL JSON string": { + mqlString: `[{"$match": {"component_name": "dragino"}}`, // missing closing bracket + mqlFile: "", + expectedError: true, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + if tc.mqlFile != "" { + defer os.Remove(tc.mqlFile) + } + + mqlBytes, err := parseMQL(tc.mqlString, tc.mqlFile) + if tc.expectedError { + test.That(t, err, test.ShouldNotBeNil) + return + } + test.That(t, err, test.ShouldBeNil) + test.That(t, mqlBytes, test.ShouldResemble, tc.expectedBytes) + }) + } +} + +func createTempMQLFile(t *testing.T) string { + t.Helper() + + f, err := os.CreateTemp("", "mql.json") + test.That(t, err, test.ShouldBeNil) + + _, err = f.WriteString(mqlString) + test.That(t, err, test.ShouldBeNil) + err = f.Close() + test.That(t, err, test.ShouldBeNil) + + return f.Name() +} + +func expectedBSONBytes(t *testing.T) [][]byte { + t.Helper() + + expectedBSONBytes := make([][]byte, len(mqlBSON)) + var err error + for i, bsonDoc := range mqlBSON { + expectedBSONBytes[i], err = bson.Marshal(bsonDoc) + if err != nil { + break + } + } + return expectedBSONBytes +} diff --git a/mql.json b/mql.json new file mode 100644 index 00000000000..7a46c8ce1a7 --- /dev/null +++ b/mql.json @@ -0,0 +1 @@ +[{"$match": { "component_name": "dragino" }}, {"$group": { "_id": "$part_id", "count": { "$sum": 1 }, "avgTemp": { "$avg": "$data.readings.TempC_SHT" }, "avgHum": { "$avg": "$data.readings.Hum_SHT" }}}] From a35177d4fc47fb5aed0cffb7d34dc2f2fc746466 Mon Sep 17 00:00:00 2001 From: Sagie Maoz Date: Thu, 10 Apr 2025 14:20:41 -0400 Subject: [PATCH 14/37] test mql->json --- cli/datapipelines_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/cli/datapipelines_test.go b/cli/datapipelines_test.go index a4318e70721..eb9cc6be7c0 100644 --- a/cli/datapipelines_test.go +++ b/cli/datapipelines_test.go @@ -97,3 +97,9 @@ func expectedBSONBytes(t *testing.T) [][]byte { } return expectedBSONBytes } + +func TestMQLJSON(t *testing.T) { + json, err := mqlJSON(expectedBSONBytes(t)) + test.That(t, err, test.ShouldBeNil) + test.That(t, json, test.ShouldEqualJSON, mqlString) +} From c1cecfbff1cdec58c2afd0f66b3c70dbd9bdd3ea Mon Sep 17 00:00:00 2001 From: Sagie Maoz Date: Thu, 10 Apr 2025 17:03:38 -0400 Subject: [PATCH 15/37] schedule not time window --- cli/datapipelines.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cli/datapipelines.go b/cli/datapipelines.go index 48d8843a7df..c16944105bd 100644 --- a/cli/datapipelines.go +++ b/cli/datapipelines.go @@ -171,7 +171,7 @@ func DatapipelineDescribeAction(c *cli.Context, args datapipelineDescribeArgs) e printf(c.App.Writer, "ID: %s", pipeline.GetId()) printf(c.App.Writer, "Name: %s", pipeline.GetName()) - printf(c.App.Writer, "Time window: %s", pipeline.GetSchedule()) + printf(c.App.Writer, "Schedule: %s", pipeline.GetSchedule()) printf(c.App.Writer, "MQL query: %s", mql) // TODO: pending implementation of PipelineRuns API // printf(c.App.Writer, "Last execution: %s", pipeline.GetLastExecution()) From ee42e0af9d5a1222c5d5639bba77e5be296e1739 Mon Sep 17 00:00:00 2001 From: Sagie Maoz Date: Fri, 11 Apr 2025 12:43:44 -0400 Subject: [PATCH 16/37] print last execution status --- cli/datapipelines.go | 29 +++++++++++++++++++++++++++-- go.mod | 2 +- go.sum | 2 ++ 3 files changed, 30 insertions(+), 3 deletions(-) diff --git a/cli/datapipelines.go b/cli/datapipelines.go index c16944105bd..4b5b9f0a142 100644 --- a/cli/datapipelines.go +++ b/cli/datapipelines.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "os" + "time" "github.com/urfave/cli/v2" "go.mongodb.org/mongo-driver/bson" @@ -163,6 +164,15 @@ func DatapipelineDescribeAction(c *cli.Context, args datapipelineDescribeArgs) e } pipeline := resp.GetDataPipeline() + runsResp, err := client.datapipelinesClient.ListPipelineRuns(context.Background(), &datapipelinespb.ListPipelineRunsRequest{ + Id: args.ID, + PageSize: 1, + }) + if err != nil { + return fmt.Errorf("error listing pipeline runs: %w", err) + } + runs := runsResp.Runs + mql, err := mqlJSON(pipeline.GetMqlBinary()) if err != nil { warningf(c.App.Writer, "error parsing MQL query: %s", err) @@ -171,10 +181,25 @@ func DatapipelineDescribeAction(c *cli.Context, args datapipelineDescribeArgs) e printf(c.App.Writer, "ID: %s", pipeline.GetId()) printf(c.App.Writer, "Name: %s", pipeline.GetName()) + printf(c.App.Writer, "Enabled: %t", pipeline.GetEnabled()) printf(c.App.Writer, "Schedule: %s", pipeline.GetSchedule()) printf(c.App.Writer, "MQL query: %s", mql) - // TODO: pending implementation of PipelineRuns API - // printf(c.App.Writer, "Last execution: %s", pipeline.GetLastExecution()) + + var pipelineRunStatusMap = map[datapipelinespb.PipelineRunStatus]string{ + datapipelinespb.PipelineRunStatus_PIPELINE_RUN_STATUS_UNSPECIFIED: "Unknown", + datapipelinespb.PipelineRunStatus_PIPELINE_RUN_STATUS_SCHEDULED: "Scheduled", + datapipelinespb.PipelineRunStatus_PIPELINE_RUN_STATUS_STARTED: "Running", + datapipelinespb.PipelineRunStatus_PIPELINE_RUN_STATUS_COMPLETED: "Success", + datapipelinespb.PipelineRunStatus_PIPELINE_RUN_STATUS_FAILED: "Failed", + } + + if len(runs) > 0 { + printf(c.App.Writer, "Last run: %s, %s.", + runs[0].GetStartTime().AsTime().Format(time.RFC3339), + pipelineRunStatusMap[runs[0].GetStatus()]) + } else { + printf(c.App.Writer, "Has not run yet.") + } return nil } diff --git a/go.mod b/go.mod index 1d865a02a30..8fea15bf76b 100644 --- a/go.mod +++ b/go.mod @@ -77,7 +77,7 @@ require ( go.uber.org/atomic v1.11.0 go.uber.org/multierr v1.11.0 go.uber.org/zap v1.27.0 - go.viam.com/api v0.1.418 + go.viam.com/api v0.1.428 go.viam.com/test v1.2.4 go.viam.com/utils v0.1.138 goji.io v2.0.2+incompatible diff --git a/go.sum b/go.sum index 593f97d64c6..02b6b8f550d 100644 --- a/go.sum +++ b/go.sum @@ -1532,6 +1532,8 @@ go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= go.viam.com/api v0.1.418 h1:XKDdwmG/4oZWU2RLuHj+mjfbw90umMKYw80Chw5fliI= go.viam.com/api v0.1.418/go.mod h1:drvlBWaiHFxPziz5jayHvibez1qG7lylcNCC1LF8onU= +go.viam.com/api v0.1.428 h1:L8wEmzV60WXV/iIhFNKGvy1Cc4E2SL8fYff1ulyl+Ts= +go.viam.com/api v0.1.428/go.mod h1:drvlBWaiHFxPziz5jayHvibez1qG7lylcNCC1LF8onU= go.viam.com/test v1.2.4 h1:JYgZhsuGAQ8sL9jWkziAXN9VJJiKbjoi9BsO33TW3ug= go.viam.com/test v1.2.4/go.mod h1:zI2xzosHdqXAJ/kFqcN+OIF78kQuTV2nIhGZ8EzvaJI= go.viam.com/utils v0.1.138 h1:dD8Nb2XwYDh5mbUg2lK/kBjo3yqIdp01K8vclpGczko= From adc0120c2a2e26807e7ab103bbc326ffa8dec198 Mon Sep 17 00:00:00 2001 From: Sagie Maoz Date: Fri, 11 Apr 2025 15:00:26 -0400 Subject: [PATCH 17/37] lenient JSON keys --- cli/datapipelines.go | 6 +++--- go.mod | 1 + go.sum | 2 ++ 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/cli/datapipelines.go b/cli/datapipelines.go index 4b5b9f0a142..bd82c6aaadc 100644 --- a/cli/datapipelines.go +++ b/cli/datapipelines.go @@ -9,6 +9,7 @@ import ( "time" "github.com/urfave/cli/v2" + "github.com/yosuke-furukawa/json5/encoding/json5" "go.mongodb.org/mongo-driver/bson" datapipelinespb "go.viam.com/api/app/datapipelines/v1" ) @@ -221,10 +222,9 @@ func parseMQL(mql, mqlFile string) ([][]byte, error) { return nil, errors.New("missing data pipeline MQL") } - // Parse the MQL stages directly into BSON - // TODO: look into more leniant JSON parser + // Parse the MQL stages JSON (using JSON5 for unquoted keys + comments). var mqlArray []bson.M - if err := bson.UnmarshalExtJSON([]byte(mql), false, &mqlArray); err != nil { + if err := json5.Unmarshal([]byte(mql), &mqlArray); err != nil { return nil, fmt.Errorf("invalid MQL: %w", err) } diff --git a/go.mod b/go.mod index 8fea15bf76b..6f3e93df6ad 100644 --- a/go.mod +++ b/go.mod @@ -433,6 +433,7 @@ require ( github.com/erikstmartin/go-testdb v0.0.0-20160219214506-8d10e4a1bae5 // indirect github.com/kylelemons/go-gypsy v1.0.0 // indirect github.com/pkg/errors v0.9.1 + github.com/yosuke-furukawa/json5 v0.1.1 github.com/ziutek/mymysql v1.5.4 // indirect golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e ) diff --git a/go.sum b/go.sum index 02b6b8f550d..bdcb5b3fc39 100644 --- a/go.sum +++ b/go.sum @@ -1443,6 +1443,8 @@ github.com/yeya24/promlinter v0.3.0 h1:JVDbMp08lVCP7Y6NP3qHroGAO6z2yGKQtS5Jsjqto github.com/yeya24/promlinter v0.3.0/go.mod h1:cDfJQQYv9uYciW60QT0eeHlFodotkYZlL+YcPQN+mW4= github.com/ykadowak/zerologlint v0.1.5 h1:Gy/fMz1dFQN9JZTPjv1hxEk+sRWm05row04Yoolgdiw= github.com/ykadowak/zerologlint v0.1.5/go.mod h1:KaUskqF3e/v59oPmdq1U1DnKcuHokl2/K1U4pmIELKg= +github.com/yosuke-furukawa/json5 v0.1.1 h1:0F9mNwTvOuDNH243hoPqvf+dxa5QsKnZzU20uNsh3ZI= +github.com/yosuke-furukawa/json5 v0.1.1/go.mod h1:sw49aWDqNdRJ6DYUtIQiaA3xyj2IL9tjeNYmX2ixwcU= github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 h1:ilQV1hzziu+LLM3zUTJ0trRztfwgjqKnBWNtSRkbmwM= github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfSfmXjznFBSZNN13rSJjlIOI1fUNAtF7rmI= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= From 976921399391a0aeb4c552abcead2c3777e8ed45 Mon Sep 17 00:00:00 2001 From: Sagie Maoz Date: Fri, 11 Apr 2025 15:44:55 -0400 Subject: [PATCH 18/37] better bson comparison tests --- cli/datapipelines_test.go | 55 +++++++++++++++++++++++++++------------ 1 file changed, 38 insertions(+), 17 deletions(-) diff --git a/cli/datapipelines_test.go b/cli/datapipelines_test.go index eb9cc6be7c0..fdf0e849658 100644 --- a/cli/datapipelines_test.go +++ b/cli/datapipelines_test.go @@ -1,6 +1,7 @@ package cli import ( + "encoding/json" "os" "testing" @@ -9,32 +10,39 @@ import ( ) var ( - mqlString = `[{"$match": {"component_name": "dragino"}}, {"$group": {"_id": "$part_id", "count": {"$sum": 1}}}]` - mqlBSON = []bson.M{ + mqlString = `[ + {"$match": { "component_name": "dragino" }}, + {"$group": { + "_id": "$part_id", + "count": { "$sum": 1 }, + "avgTemp": { "$avg": "$data.readings.TempC_SHT" }, + "avgHum": { "$avg": "$data.readings.Hum_SHT" } + }} + ]` + mqlBSON = []bson.M{ {"$match": bson.M{"component_name": "dragino"}}, - {"$group": bson.M{"_id": "$part_id", "count": bson.M{"$sum": 1}}}, + {"$group": bson.M{"_id": "$part_id", "count": bson.M{"$sum": 1}, "avgTemp": bson.M{"$avg": "$data.readings.TempC_SHT"}, "avgHum": bson.M{"$avg": "$data.readings.Hum_SHT"}}}, } ) func TestParseMQL(t *testing.T) { - expectedBytes := expectedBSONBytes(t) testCases := map[string]struct { mqlString string mqlFile string expectedError bool - expectedBytes [][]byte + expectedBSON []bson.M }{ "valid MQL string": { mqlString: mqlString, mqlFile: "", expectedError: false, - expectedBytes: expectedBytes, + expectedBSON: mqlBSON, }, "valid MQL file": { mqlString: "", mqlFile: createTempMQLFile(t), expectedError: false, - expectedBytes: expectedBytes, + expectedBSON: mqlBSON, }, "empty string and file": { mqlString: "", @@ -47,7 +55,7 @@ func TestParseMQL(t *testing.T) { expectedError: true, }, "invalid MQL JSON string": { - mqlString: `[{"$match": {"component_name": "dragino"}}`, // missing closing bracket + mqlString: `[{"$match": {"component_name": "dragino"`, // missing closing brackets mqlFile: "", expectedError: true, }, @@ -65,11 +73,30 @@ func TestParseMQL(t *testing.T) { return } test.That(t, err, test.ShouldBeNil) - test.That(t, mqlBytes, test.ShouldResemble, tc.expectedBytes) + + for i, bsonBytes := range mqlBytes { + var bsonM bson.M + err = bson.Unmarshal(bsonBytes, &bsonM) + test.That(t, err, test.ShouldBeNil) + testBSONResemble(t, bsonM, tc.expectedBSON[i]) + } }) } } +// testBSONResemble compares two bson.M objects and asserts that they are equal. +func testBSONResemble(t *testing.T, actual, expected bson.M) { + t.Helper() + + actualJSON, err := json.Marshal(actual) + test.That(t, err, test.ShouldBeNil) + + expectedJSON, err := json.Marshal(expected) + test.That(t, err, test.ShouldBeNil) + + test.That(t, string(actualJSON), test.ShouldEqualJSON, string(expectedJSON)) +} + func createTempMQLFile(t *testing.T) string { t.Helper() @@ -84,9 +111,7 @@ func createTempMQLFile(t *testing.T) string { return f.Name() } -func expectedBSONBytes(t *testing.T) [][]byte { - t.Helper() - +func TestMQLJSON(t *testing.T) { expectedBSONBytes := make([][]byte, len(mqlBSON)) var err error for i, bsonDoc := range mqlBSON { @@ -95,11 +120,7 @@ func expectedBSONBytes(t *testing.T) [][]byte { break } } - return expectedBSONBytes -} - -func TestMQLJSON(t *testing.T) { - json, err := mqlJSON(expectedBSONBytes(t)) + json, err := mqlJSON(expectedBSONBytes) test.That(t, err, test.ShouldBeNil) test.That(t, json, test.ShouldEqualJSON, mqlString) } From 2f8daf7bcff439cbcf41661b6bfa746cfa14cc3d Mon Sep 17 00:00:00 2001 From: Sagie Maoz Date: Fri, 11 Apr 2025 15:52:00 -0400 Subject: [PATCH 19/37] enable, disable --- cli/app.go | 26 ++++++++++++++++++++++++++ cli/datapipelines.go | 42 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 68 insertions(+) diff --git a/cli/app.go b/cli/app.go index be66ee15494..0c24a7e8f4d 100644 --- a/cli/app.go +++ b/cli/app.go @@ -1631,6 +1631,32 @@ var app = &cli.App{ }, Action: createCommandWithT[datapipelineDescribeArgs](DatapipelineDescribeAction), }, + { + Name: "enable", + Usage: "enable a data pipeline", + UsageText: createUsageText("datapipelines enable", []string{datapipelineFlagID}, true, false), + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: datapipelineFlagID, + Usage: "ID of the data pipeline to enable", + Required: true, + }, + }, + Action: createCommandWithT[datapipelineEnableArgs](DatapipelineEnableAction), + }, + { + Name: "disable", + Usage: "disable a data pipeline", + UsageText: createUsageText("datapipelines disable", []string{datapipelineFlagID}, true, false), + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: datapipelineFlagID, + Usage: "ID of the data pipeline to disable", + Required: true, + }, + }, + Action: createCommandWithT[datapipelineDisableArgs](DatapipelineDisableAction), + }, }, }, { diff --git a/cli/datapipelines.go b/cli/datapipelines.go index bd82c6aaadc..0e313aed45d 100644 --- a/cli/datapipelines.go +++ b/cli/datapipelines.go @@ -205,6 +205,48 @@ func DatapipelineDescribeAction(c *cli.Context, args datapipelineDescribeArgs) e return nil } +type datapipelineEnableArgs struct { + ID string +} + +func DatapipelineEnableAction(c *cli.Context, args datapipelineEnableArgs) error { + client, err := newViamClient(c) + if err != nil { + return err + } + + _, err = client.datapipelinesClient.EnableDataPipeline(context.Background(), &datapipelinespb.EnableDataPipelineRequest{ + Id: args.ID, + }) + if err != nil { + return fmt.Errorf("error enabling data pipeline: %w", err) + } + + printf(c.App.Writer, "data pipeline (id: %s) enabled.", args.ID) + return nil +} + +type datapipelineDisableArgs struct { + ID string +} + +func DatapipelineDisableAction(c *cli.Context, args datapipelineDisableArgs) error { + client, err := newViamClient(c) + if err != nil { + return err + } + + _, err = client.datapipelinesClient.DisableDataPipeline(context.Background(), &datapipelinespb.DisableDataPipelineRequest{ + Id: args.ID, + }) + if err != nil { + return fmt.Errorf("error disabling data pipeline: %w", err) + } + + printf(c.App.Writer, "data pipeline (id: %s) disabled.", args.ID) + return nil +} + func parseMQL(mql, mqlFile string) ([][]byte, error) { if mqlFile != "" { if mql != "" { From e672947815f1e03bdb2bfb748ae2a6b60de0cf27 Mon Sep 17 00:00:00 2001 From: Sagie Maoz Date: Fri, 11 Apr 2025 15:54:22 -0400 Subject: [PATCH 20/37] oops didn't mean to commit mql.json --- mql.json | 1 - 1 file changed, 1 deletion(-) delete mode 100644 mql.json diff --git a/mql.json b/mql.json deleted file mode 100644 index 7a46c8ce1a7..00000000000 --- a/mql.json +++ /dev/null @@ -1 +0,0 @@ -[{"$match": { "component_name": "dragino" }}, {"$group": { "_id": "$part_id", "count": { "$sum": 1 }, "avgTemp": { "$avg": "$data.readings.TempC_SHT" }, "avgHum": { "$avg": "$data.readings.Hum_SHT" }}}] From 4f8f82fdcbfd41d979e35399e42807182ea8d906 Mon Sep 17 00:00:00 2001 From: Sagie Maoz Date: Fri, 11 Apr 2025 16:07:52 -0400 Subject: [PATCH 21/37] yosuke-furukawa/json5 is bsd --- doc/dependency_decisions.yml | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/doc/dependency_decisions.yml b/doc/dependency_decisions.yml index 2dfb3940093..495bf1e2918 100644 --- a/doc/dependency_decisions.yml +++ b/doc/dependency_decisions.yml @@ -217,3 +217,11 @@ :why: :versions: [] :when: 2023-01-30 21:31:27.476042000 Z +- - :license + - github.com/yosuke-furukawa/json5 + - Simplified BSD + - :who: sagie + :why: + :license_links: https://github.com/yosuke-furukawa/json5/blob/cf7bb3f354ffe5d5ad4c9b714895eab7e0498b5f/LICENSE.md + :versions: [] + :when: 2025-04-11 20:04:09.564107000 Z From 760e80c55a58234b8b3113a054c346b13ce39d48 Mon Sep 17 00:00:00 2001 From: Sagie Maoz Date: Fri, 11 Apr 2025 18:03:43 -0400 Subject: [PATCH 22/37] reorder describe + exported func comments --- cli/app.go | 26 +++++++++++++------------- cli/datapipelines.go | 7 +++++++ 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/cli/app.go b/cli/app.go index 0c24a7e8f4d..28c24380af5 100644 --- a/cli/app.go +++ b/cli/app.go @@ -1535,6 +1535,19 @@ var app = &cli.App{ }, Action: createCommandWithT[datapipelineListArgs](DatapipelineListAction), }, + { + Name: "describe", + Usage: "describe a data pipeline and its status", + UsageText: createUsageText("datapipelines describe", []string{datapipelineFlagID}, true, false), + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: datapipelineFlagID, + Usage: "ID of the data pipeline to describe", + Required: true, + }, + }, + Action: createCommandWithT[datapipelineDescribeArgs](DatapipelineDescribeAction), + }, { Name: "create", Usage: "create a new data pipeline", @@ -1618,19 +1631,6 @@ var app = &cli.App{ }, Action: createCommandWithT[datapipelineDeleteArgs](DatapipelineDeleteAction), }, - { - Name: "describe", - Usage: "describe a data pipeline and its status", - UsageText: createUsageText("datapipelines describe", []string{datapipelineFlagID}, true, false), - Flags: []cli.Flag{ - &cli.StringFlag{ - Name: datapipelineFlagID, - Usage: "ID of the data pipeline to describe", - Required: true, - }, - }, - Action: createCommandWithT[datapipelineDescribeArgs](DatapipelineDescribeAction), - }, { Name: "enable", Usage: "enable a data pipeline", diff --git a/cli/datapipelines.go b/cli/datapipelines.go index 0e313aed45d..a2b1b04f061 100644 --- a/cli/datapipelines.go +++ b/cli/datapipelines.go @@ -18,6 +18,7 @@ type datapipelineListArgs struct { OrgID string } +// DatapipelineListAction lists all data pipelines for an organization. func DatapipelineListAction(c *cli.Context, args datapipelineListArgs) error { client, err := newViamClient(c) if err != nil { @@ -46,6 +47,7 @@ type datapipelineCreateArgs struct { MqlFile string } +// DatapipelineCreateAction creates a new data pipeline. func DatapipelineCreateAction(c *cli.Context, args datapipelineCreateArgs) error { client, err := newViamClient(c) if err != nil { @@ -80,6 +82,7 @@ type datapipelineUpdateArgs struct { MqlFile string } +// DatapipelineUpdateAction updates an existing data pipeline. func DatapipelineUpdateAction(c *cli.Context, args datapipelineUpdateArgs) error { client, err := newViamClient(c) if err != nil { @@ -130,6 +133,7 @@ type datapipelineDeleteArgs struct { ID string } +// DatapipelineDeleteAction deletes a data pipeline. func DatapipelineDeleteAction(c *cli.Context, args datapipelineDeleteArgs) error { client, err := newViamClient(c) if err != nil { @@ -151,6 +155,7 @@ type datapipelineDescribeArgs struct { ID string } +// DatapipelineDescribeAction describes a data pipeline and its status. func DatapipelineDescribeAction(c *cli.Context, args datapipelineDescribeArgs) error { client, err := newViamClient(c) if err != nil { @@ -209,6 +214,7 @@ type datapipelineEnableArgs struct { ID string } +// DatapipelineEnableAction enables a data pipeline. func DatapipelineEnableAction(c *cli.Context, args datapipelineEnableArgs) error { client, err := newViamClient(c) if err != nil { @@ -230,6 +236,7 @@ type datapipelineDisableArgs struct { ID string } +// DatapipelineDisableAction disables a data pipeline. func DatapipelineDisableAction(c *cli.Context, args datapipelineDisableArgs) error { client, err := newViamClient(c) if err != nil { From dcdf14a8dcbc3163009c3184c2d3e80064ed7070 Mon Sep 17 00:00:00 2001 From: Sagie Maoz Date: Fri, 11 Apr 2025 18:12:37 -0400 Subject: [PATCH 23/37] nolint os.Readfile --- cli/datapipelines.go | 1 + go.sum | 2 -- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/cli/datapipelines.go b/cli/datapipelines.go index a2b1b04f061..2c54595babb 100644 --- a/cli/datapipelines.go +++ b/cli/datapipelines.go @@ -260,6 +260,7 @@ func parseMQL(mql, mqlFile string) ([][]byte, error) { return nil, errors.New("data pipeline MQL and MQL file cannot both be provided") } + //nolint:gosec // mqlFile is a user-provided path for reading MQL query files content, err := os.ReadFile(mqlFile) if err != nil { return nil, fmt.Errorf("error reading MQL file: %w", err) diff --git a/go.sum b/go.sum index bdcb5b3fc39..3c43b0d9183 100644 --- a/go.sum +++ b/go.sum @@ -1532,8 +1532,6 @@ go.uber.org/zap v1.18.1/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI= go.uber.org/zap v1.23.0/go.mod h1:D+nX8jyLsMHMYrln8A0rJjFt/T/9/bGgIhAqxv5URuY= go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= -go.viam.com/api v0.1.418 h1:XKDdwmG/4oZWU2RLuHj+mjfbw90umMKYw80Chw5fliI= -go.viam.com/api v0.1.418/go.mod h1:drvlBWaiHFxPziz5jayHvibez1qG7lylcNCC1LF8onU= go.viam.com/api v0.1.428 h1:L8wEmzV60WXV/iIhFNKGvy1Cc4E2SL8fYff1ulyl+Ts= go.viam.com/api v0.1.428/go.mod h1:drvlBWaiHFxPziz5jayHvibez1qG7lylcNCC1LF8onU= go.viam.com/test v1.2.4 h1:JYgZhsuGAQ8sL9jWkziAXN9VJJiKbjoi9BsO33TW3ug= From bc07687248440e84254adc8104ad660624bd331c Mon Sep 17 00:00:00 2001 From: Sagie Maoz Date: Fri, 11 Apr 2025 18:13:16 -0400 Subject: [PATCH 24/37] lint long line --- cli/datapipelines_test.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/cli/datapipelines_test.go b/cli/datapipelines_test.go index fdf0e849658..2dfc9f817c5 100644 --- a/cli/datapipelines_test.go +++ b/cli/datapipelines_test.go @@ -21,7 +21,12 @@ var ( ]` mqlBSON = []bson.M{ {"$match": bson.M{"component_name": "dragino"}}, - {"$group": bson.M{"_id": "$part_id", "count": bson.M{"$sum": 1}, "avgTemp": bson.M{"$avg": "$data.readings.TempC_SHT"}, "avgHum": bson.M{"$avg": "$data.readings.Hum_SHT"}}}, + {"$group": bson.M{ + "_id": "$part_id", + "count": bson.M{"$sum": 1}, + "avgTemp": bson.M{"$avg": "$data.readings.TempC_SHT"}, + "avgHum": bson.M{"$avg": "$data.readings.Hum_SHT"}, + }}, } ) From a27534e29745a82b310fe23216b72cb46c81a191 Mon Sep 17 00:00:00 2001 From: Sagie Maoz Date: Tue, 15 Apr 2025 13:27:31 -0400 Subject: [PATCH 25/37] generalFlagID, generalFlagName --- cli/app.go | 43 ++++++++++++++++++++----------------------- cli/module_build.go | 4 ++-- 2 files changed, 22 insertions(+), 25 deletions(-) diff --git a/cli/app.go b/cli/app.go index 28c24380af5..96dc65915e1 100644 --- a/cli/app.go +++ b/cli/app.go @@ -53,6 +53,7 @@ const ( generalFlagPart = "part" generalFlagPartName = "part-name" generalFlagPartID = "part-id" + generalFlagID = "id" generalFlagName = "name" generalFlagMethod = "method" generalFlagDestination = "destination" @@ -74,7 +75,6 @@ const ( moduleFlagLocal = "local" moduleFlagHomeDir = "home" moduleCreateLocalOnly = "local-only" - moduleFlagID = "id" moduleFlagIsPublic = "public" moduleFlagResourceType = "resource-type" moduleFlagModelName = "model-name" @@ -117,8 +117,6 @@ const ( dataFlagFilterTags = "filter-tags" dataFlagTimeout = "timeout" - datapipelineFlagID = "id" - datapipelineFlagName = "name" datapipelineFlagSchedule = "schedule" datapipelineFlagMQL = "mql" datapipelineFlagMQLFile = "mql-file" @@ -1522,10 +1520,9 @@ var app = &cli.App{ Subcommands: []*cli.Command{ { Name: "list", - Usage: "list data pipelines for an org ID", + Usage: "list data pipelines for an organization ID", UsageText: createUsageText("datapipelines list", []string{generalFlagOrgID}, true, false), - Description: "In order to list data pipelines, an org ID is required", Flags: []cli.Flag{ &cli.StringFlag{ Name: generalFlagOrgID, @@ -1538,10 +1535,10 @@ var app = &cli.App{ { Name: "describe", Usage: "describe a data pipeline and its status", - UsageText: createUsageText("datapipelines describe", []string{datapipelineFlagID}, true, false), + UsageText: createUsageText("datapipelines describe", []string{generalFlagID}, true, false), Flags: []cli.Flag{ &cli.StringFlag{ - Name: datapipelineFlagID, + Name: generalFlagID, Usage: "ID of the data pipeline to describe", Required: true, }, @@ -1552,7 +1549,7 @@ var app = &cli.App{ Name: "create", Usage: "create a new data pipeline", UsageText: createUsageText("datapipelines create", - []string{generalFlagOrgID, datapipelineFlagName, datapipelineFlagSchedule}, false, false, + []string{generalFlagOrgID, generalFlagName, datapipelineFlagSchedule}, false, false, fmt.Sprintf("[--%s=<%s> | --%s=<%s>]", datapipelineFlagMQL, datapipelineFlagMQL, datapipelineFlagMQLFile, datapipelineFlagMQLFile), @@ -1564,7 +1561,7 @@ var app = &cli.App{ Required: true, }, &cli.StringFlag{ - Name: datapipelineFlagName, + Name: generalFlagName, Usage: "name of the new data pipeline", Required: true, }, @@ -1588,19 +1585,19 @@ var app = &cli.App{ Name: "update", Usage: "update a data pipeline", UsageText: createUsageText("datapipelines update", - []string{datapipelineFlagID, datapipelineFlagName, datapipelineFlagSchedule}, false, false, + []string{generalFlagID, generalFlagName, datapipelineFlagSchedule}, false, false, fmt.Sprintf("[--%s=<%s> | --%s=<%s>]", datapipelineFlagMQL, datapipelineFlagMQL, datapipelineFlagMQLFile, datapipelineFlagMQLFile), ), Flags: []cli.Flag{ &cli.StringFlag{ - Name: datapipelineFlagID, + Name: generalFlagID, Usage: "ID of the data pipeline to update", Required: true, }, &cli.StringFlag{ - Name: datapipelineFlagName, + Name: generalFlagName, Usage: "name of the data pipeline to update", }, &cli.StringFlag{ @@ -1621,10 +1618,10 @@ var app = &cli.App{ { Name: "delete", Usage: "delete a data pipeline", - UsageText: createUsageText("datapipelines delete", []string{datapipelineFlagID}, true, false), + UsageText: createUsageText("datapipelines delete", []string{generalFlagID}, true, false), Flags: []cli.Flag{ &cli.StringFlag{ - Name: datapipelineFlagID, + Name: generalFlagID, Usage: "ID of the data pipeline to delete", Required: true, }, @@ -1634,10 +1631,10 @@ var app = &cli.App{ { Name: "enable", Usage: "enable a data pipeline", - UsageText: createUsageText("datapipelines enable", []string{datapipelineFlagID}, true, false), + UsageText: createUsageText("datapipelines enable", []string{generalFlagID}, true, false), Flags: []cli.Flag{ &cli.StringFlag{ - Name: datapipelineFlagID, + Name: generalFlagID, Usage: "ID of the data pipeline to enable", Required: true, }, @@ -1647,10 +1644,10 @@ var app = &cli.App{ { Name: "disable", Usage: "disable a data pipeline", - UsageText: createUsageText("datapipelines disable", []string{datapipelineFlagID}, true, false), + UsageText: createUsageText("datapipelines disable", []string{generalFlagID}, true, false), Flags: []cli.Flag{ &cli.StringFlag{ - Name: datapipelineFlagID, + Name: generalFlagID, Usage: "ID of the data pipeline to disable", Required: true, }, @@ -2701,7 +2698,7 @@ Example: DefaultText: "all", }, &cli.StringFlag{ - Name: moduleFlagID, + Name: generalFlagID, Usage: "restrict output to just return builds that match this id", }, }, @@ -2711,10 +2708,10 @@ Example: Name: "logs", Aliases: []string{"log"}, Usage: "get the logs from one of your cloud builds", - UsageText: createUsageText("module build logs", []string{moduleFlagID}, true, false), + UsageText: createUsageText("module build logs", []string{generalFlagID}, true, false), Flags: []cli.Flag{ &cli.StringFlag{ - Name: moduleFlagID, + Name: generalFlagID, Usage: "build that you want to get the logs for", Required: true, }, @@ -2796,7 +2793,7 @@ This won't work unless you have an existing installation of our GitHub app on yo Usage: "name of module to restart. pass at most one of --name, --id", }, &cli.StringFlag{ - Name: moduleFlagID, + Name: generalFlagID, Usage: "ID of module to restart, for example viam:wifi-sensor. pass at most one of --name, --id", }, &cli.BoolFlag{ @@ -2830,7 +2827,7 @@ This won't work unless you have an existing installation of our GitHub app on yo Value: ".", }, &cli.StringFlag{ - Name: moduleFlagID, + Name: generalFlagID, Usage: "module ID as org-id:name or namespace:name", DefaultText: "will try to read from meta.json", }, diff --git a/cli/module_build.go b/cli/module_build.go index 0638875d2ff..8ce189f6d16 100644 --- a/cli/module_build.go +++ b/cli/module_build.go @@ -654,7 +654,7 @@ func resolveTargetModule(c *cli.Context, manifest *moduleManifest) (*robot.Resta modID := args.ID // todo: use MutuallyExclusiveFlags for this when urfave/cli 3.x is stable if (len(modName) > 0) && (len(modID) > 0) { - return nil, fmt.Errorf("provide at most one of --%s and --%s", generalFlagName, moduleFlagID) + return nil, fmt.Errorf("provide at most one of --%s and --%s", generalFlagName, generalFlagID) } request := &robot.RestartModuleRequest{} //nolint:gocritic @@ -665,7 +665,7 @@ func resolveTargetModule(c *cli.Context, manifest *moduleManifest) (*robot.Resta } else if manifest != nil { request.ModuleID = manifest.ModuleID } else { - return nil, fmt.Errorf("if there is no meta.json, provide one of --%s or --%s", generalFlagName, moduleFlagID) + return nil, fmt.Errorf("if there is no meta.json, provide one of --%s or --%s", generalFlagName, generalFlagID) } return request, nil } From 42facd47689a355e39ce79838db7eb1924fab1c2 Mon Sep 17 00:00:00 2001 From: Sagie Maoz Date: Tue, 15 Apr 2025 13:27:49 -0400 Subject: [PATCH 26/37] nicer error msg --- cli/datapipelines.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cli/datapipelines.go b/cli/datapipelines.go index 2c54595babb..77e747a5818 100644 --- a/cli/datapipelines.go +++ b/cli/datapipelines.go @@ -175,7 +175,7 @@ func DatapipelineDescribeAction(c *cli.Context, args datapipelineDescribeArgs) e PageSize: 1, }) if err != nil { - return fmt.Errorf("error listing pipeline runs: %w", err) + return fmt.Errorf("error getting list of pipeline runs: %w", err) } runs := runsResp.Runs From 368d323e4f8cf3dfc2bf0dcf5cb4695aaa01ba79 Mon Sep 17 00:00:00 2001 From: Sagie Maoz Date: Tue, 15 Apr 2025 13:40:04 -0400 Subject: [PATCH 27/37] describe print data range and end time --- cli/datapipelines.go | 31 ++++++++++++++++++++----------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/cli/datapipelines.go b/cli/datapipelines.go index 77e747a5818..3fc0ad548cf 100644 --- a/cli/datapipelines.go +++ b/cli/datapipelines.go @@ -14,6 +14,15 @@ import ( datapipelinespb "go.viam.com/api/app/datapipelines/v1" ) +// pipelineRunStatusMap maps pipeline run statuses to human-readable strings. +var pipelineRunStatusMap = map[datapipelinespb.PipelineRunStatus]string{ + datapipelinespb.PipelineRunStatus_PIPELINE_RUN_STATUS_UNSPECIFIED: "Unknown", + datapipelinespb.PipelineRunStatus_PIPELINE_RUN_STATUS_SCHEDULED: "Scheduled", + datapipelinespb.PipelineRunStatus_PIPELINE_RUN_STATUS_STARTED: "Running", + datapipelinespb.PipelineRunStatus_PIPELINE_RUN_STATUS_COMPLETED: "Success", + datapipelinespb.PipelineRunStatus_PIPELINE_RUN_STATUS_FAILED: "Failed", +} + type datapipelineListArgs struct { OrgID string } @@ -191,18 +200,18 @@ func DatapipelineDescribeAction(c *cli.Context, args datapipelineDescribeArgs) e printf(c.App.Writer, "Schedule: %s", pipeline.GetSchedule()) printf(c.App.Writer, "MQL query: %s", mql) - var pipelineRunStatusMap = map[datapipelinespb.PipelineRunStatus]string{ - datapipelinespb.PipelineRunStatus_PIPELINE_RUN_STATUS_UNSPECIFIED: "Unknown", - datapipelinespb.PipelineRunStatus_PIPELINE_RUN_STATUS_SCHEDULED: "Scheduled", - datapipelinespb.PipelineRunStatus_PIPELINE_RUN_STATUS_STARTED: "Running", - datapipelinespb.PipelineRunStatus_PIPELINE_RUN_STATUS_COMPLETED: "Success", - datapipelinespb.PipelineRunStatus_PIPELINE_RUN_STATUS_FAILED: "Failed", - } - if len(runs) > 0 { - printf(c.App.Writer, "Last run: %s, %s.", - runs[0].GetStartTime().AsTime().Format(time.RFC3339), - pipelineRunStatusMap[runs[0].GetStatus()]) + r := runs[0] + + printf(c.App.Writer, "Last run:") + printf(c.App.Writer, " Status: %s", pipelineRunStatusMap[r.GetStatus()]) + printf(c.App.Writer, " Started: %s", r.GetStartTime().AsTime().Format(time.RFC3339)) + printf(c.App.Writer, " Data range: [%s, %s]", + r.GetDataStartTime().AsTime().Format(time.RFC3339), + r.GetDataEndTime().AsTime().Format(time.RFC3339)) + if r.GetEndTime() != nil { + printf(c.App.Writer, " Ended: %s", r.GetEndTime().AsTime().Format(time.RFC3339)) + } } else { printf(c.App.Writer, "Has not run yet.") } From 021c0cb3328ceaea4429ed548f3c6bbad2502aa8 Mon Sep 17 00:00:00 2001 From: Sagie Maoz Date: Tue, 15 Apr 2025 13:41:58 -0400 Subject: [PATCH 28/37] move mql+mqlFile check out --- cli/datapipelines.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cli/datapipelines.go b/cli/datapipelines.go index 3fc0ad548cf..ffa376fcb39 100644 --- a/cli/datapipelines.go +++ b/cli/datapipelines.go @@ -264,11 +264,11 @@ func DatapipelineDisableAction(c *cli.Context, args datapipelineDisableArgs) err } func parseMQL(mql, mqlFile string) ([][]byte, error) { - if mqlFile != "" { - if mql != "" { - return nil, errors.New("data pipeline MQL and MQL file cannot both be provided") - } + if mqlFile != "" && mql != "" { + return nil, errors.New("data pipeline MQL and MQL file cannot both be provided") + } + if mqlFile != "" { //nolint:gosec // mqlFile is a user-provided path for reading MQL query files content, err := os.ReadFile(mqlFile) if err != nil { From c4772e5b499a0c9d1976003f388f632e7c8492c8 Mon Sep 17 00:00:00 2001 From: Sagie Maoz Date: Tue, 15 Apr 2025 13:42:21 -0400 Subject: [PATCH 29/37] MQL parsing error msg --- cli/datapipelines.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cli/datapipelines.go b/cli/datapipelines.go index ffa376fcb39..a63ee499be0 100644 --- a/cli/datapipelines.go +++ b/cli/datapipelines.go @@ -284,7 +284,7 @@ func parseMQL(mql, mqlFile string) ([][]byte, error) { // Parse the MQL stages JSON (using JSON5 for unquoted keys + comments). var mqlArray []bson.M if err := json5.Unmarshal([]byte(mql), &mqlArray); err != nil { - return nil, fmt.Errorf("invalid MQL: %w", err) + return nil, fmt.Errorf("unable to parse MQL argument: %w", err) } var mqlBinary [][]byte From 3c0a0fd089accd1aa735d1be877511ff69c4fd87 Mon Sep 17 00:00:00 2001 From: Sagie Maoz Date: Tue, 15 Apr 2025 13:44:35 -0400 Subject: [PATCH 30/37] couple more mql parsing test cases --- cli/datapipelines_test.go | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/cli/datapipelines_test.go b/cli/datapipelines_test.go index 2dfc9f817c5..85bb264b57f 100644 --- a/cli/datapipelines_test.go +++ b/cli/datapipelines_test.go @@ -45,7 +45,7 @@ func TestParseMQL(t *testing.T) { }, "valid MQL file": { mqlString: "", - mqlFile: createTempMQLFile(t), + mqlFile: createTempMQLFile(t, mqlString), expectedError: false, expectedBSON: mqlBSON, }, @@ -56,7 +56,7 @@ func TestParseMQL(t *testing.T) { }, "both string and file provided": { mqlString: mqlString, - mqlFile: createTempMQLFile(t), + mqlFile: createTempMQLFile(t, mqlString), expectedError: true, }, "invalid MQL JSON string": { @@ -64,6 +64,16 @@ func TestParseMQL(t *testing.T) { mqlFile: "", expectedError: true, }, + "invalid MQL JSON file": { + mqlString: "", + mqlFile: createTempMQLFile(t, `[{"$match": {"component_name": "dragino"`), // missing closing brackets + expectedError: true, + }, + "invalid MQL file path": { + mqlString: "", + mqlFile: "invalid/path/to/mql.json", + expectedError: true, + }, } for name, tc := range testCases { @@ -102,13 +112,13 @@ func testBSONResemble(t *testing.T, actual, expected bson.M) { test.That(t, string(actualJSON), test.ShouldEqualJSON, string(expectedJSON)) } -func createTempMQLFile(t *testing.T) string { +func createTempMQLFile(t *testing.T, mql string) string { t.Helper() f, err := os.CreateTemp("", "mql.json") test.That(t, err, test.ShouldBeNil) - _, err = f.WriteString(mqlString) + _, err = f.WriteString(mql) test.That(t, err, test.ShouldBeNil) err = f.Close() test.That(t, err, test.ShouldBeNil) From 6d69ecd32fdb8d70fb5a816c569887ec0204fdb7 Mon Sep 17 00:00:00 2001 From: Sagie Maoz Date: Tue, 15 Apr 2025 13:49:24 -0400 Subject: [PATCH 31/37] s/mql-file/mql-path --- cli/app.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cli/app.go b/cli/app.go index 96dc65915e1..8302b348e54 100644 --- a/cli/app.go +++ b/cli/app.go @@ -119,7 +119,7 @@ const ( datapipelineFlagSchedule = "schedule" datapipelineFlagMQL = "mql" - datapipelineFlagMQLFile = "mql-file" + datapipelineFlagMQLFile = "mql-path" packageFlagFramework = "model-framework" From 2d6434baac4955df6acc71ee6ba069941b6c635b Mon Sep 17 00:00:00 2001 From: Sagie Maoz Date: Tue, 15 Apr 2025 13:54:42 -0400 Subject: [PATCH 32/37] mqlPath --- cli/datapipelines.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cli/datapipelines.go b/cli/datapipelines.go index a63ee499be0..ef32e65745b 100644 --- a/cli/datapipelines.go +++ b/cli/datapipelines.go @@ -53,7 +53,7 @@ type datapipelineCreateArgs struct { Name string Schedule string MQL string - MqlFile string + MqlPath string } // DatapipelineCreateAction creates a new data pipeline. @@ -63,7 +63,7 @@ func DatapipelineCreateAction(c *cli.Context, args datapipelineCreateArgs) error return err } - mqlBinary, err := parseMQL(args.MQL, args.MqlFile) + mqlBinary, err := parseMQL(args.MQL, args.MqlPath) if err != nil { return err } From eb29e79c0783e1b5f529db03d1bb77acf59cfe9e Mon Sep 17 00:00:00 2001 From: Sagie Maoz Date: Tue, 15 Apr 2025 16:59:55 -0400 Subject: [PATCH 33/37] list enabled state --- cli/datapipelines.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/cli/datapipelines.go b/cli/datapipelines.go index ef32e65745b..0e0b7221c77 100644 --- a/cli/datapipelines.go +++ b/cli/datapipelines.go @@ -42,7 +42,11 @@ func DatapipelineListAction(c *cli.Context, args datapipelineListArgs) error { } for _, pipeline := range resp.GetDataPipelines() { - printf(c.App.Writer, "\t%s (ID: %s)", pipeline.Name, pipeline.Id) + enabled := "Enabled" + if !pipeline.Enabled { + enabled = "Disabled" + } + printf(c.App.Writer, "\t%s (ID: %s) [%s]", pipeline.Name, pipeline.Id, enabled) } return nil From 332fc6b6f2c083d11fd33cc6c5ec567463a7acff Mon Sep 17 00:00:00 2001 From: Sagie Maoz Date: Thu, 17 Apr 2025 13:58:59 -0400 Subject: [PATCH 34/37] add some fancy json5 stuff to the example --- cli/datapipelines_test.go | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/cli/datapipelines_test.go b/cli/datapipelines_test.go index 85bb264b57f..2c9eb05f751 100644 --- a/cli/datapipelines_test.go +++ b/cli/datapipelines_test.go @@ -14,10 +14,10 @@ var ( {"$match": { "component_name": "dragino" }}, {"$group": { "_id": "$part_id", - "count": { "$sum": 1 }, + "count": { "$sum": 1 }, // a comment just for fun "avgTemp": { "$avg": "$data.readings.TempC_SHT" }, "avgHum": { "$avg": "$data.readings.Hum_SHT" } - }} + }}, ]` mqlBSON = []bson.M{ {"$match": bson.M{"component_name": "dragino"}}, @@ -127,15 +127,21 @@ func createTempMQLFile(t *testing.T, mql string) string { } func TestMQLJSON(t *testing.T) { - expectedBSONBytes := make([][]byte, len(mqlBSON)) + // expectedJSON is a vanilla JSON representation of the MQL string. + expectedJSON := `[{"$match":{"component_name":"dragino"}}, + {"$group":{"_id":"$part_id","count":{"$sum":1}, + "avgTemp":{"$avg":"$data.readings.TempC_SHT"}, + "avgHum":{"$avg":"$data.readings.Hum_SHT"}}}]` + + bsonBytes := make([][]byte, len(mqlBSON)) var err error for i, bsonDoc := range mqlBSON { - expectedBSONBytes[i], err = bson.Marshal(bsonDoc) + bsonBytes[i], err = bson.Marshal(bsonDoc) if err != nil { break } } - json, err := mqlJSON(expectedBSONBytes) + json, err := mqlJSON(bsonBytes) test.That(t, err, test.ShouldBeNil) - test.That(t, json, test.ShouldEqualJSON, mqlString) + test.That(t, json, test.ShouldEqualJSON, expectedJSON) } From 19f5cede84f531d093dbfe871920ebed99d076cf Mon Sep 17 00:00:00 2001 From: Sagie Maoz Date: Thu, 17 Apr 2025 14:55:39 -0400 Subject: [PATCH 35/37] mql path arg name fix oops --- cli/datapipelines.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cli/datapipelines.go b/cli/datapipelines.go index 0e0b7221c77..392487019dc 100644 --- a/cli/datapipelines.go +++ b/cli/datapipelines.go @@ -92,7 +92,7 @@ type datapipelineUpdateArgs struct { Name string Schedule string MQL string - MqlFile string + MqlPath string } // DatapipelineUpdateAction updates an existing data pipeline. @@ -121,8 +121,8 @@ func DatapipelineUpdateAction(c *cli.Context, args datapipelineUpdateArgs) error } mqlBinary := current.GetMqlBinary() - if args.MQL != "" || args.MqlFile != "" { - mqlBinary, err = parseMQL(args.MQL, args.MqlFile) + if args.MQL != "" || args.MqlPath != "" { + mqlBinary, err = parseMQL(args.MQL, args.MqlPath) if err != nil { return err } From 50fa205437edc701628a50e17b985c06afa8b19b Mon Sep 17 00:00:00 2001 From: Sagie Maoz Date: Mon, 21 Apr 2025 13:01:19 -0400 Subject: [PATCH 36/37] updated ListDataPipelineRuns name --- cli/datapipelines.go | 14 +++++++------- go.mod | 2 +- go.sum | 2 ++ 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/cli/datapipelines.go b/cli/datapipelines.go index 392487019dc..95224c707e8 100644 --- a/cli/datapipelines.go +++ b/cli/datapipelines.go @@ -15,12 +15,12 @@ import ( ) // pipelineRunStatusMap maps pipeline run statuses to human-readable strings. -var pipelineRunStatusMap = map[datapipelinespb.PipelineRunStatus]string{ - datapipelinespb.PipelineRunStatus_PIPELINE_RUN_STATUS_UNSPECIFIED: "Unknown", - datapipelinespb.PipelineRunStatus_PIPELINE_RUN_STATUS_SCHEDULED: "Scheduled", - datapipelinespb.PipelineRunStatus_PIPELINE_RUN_STATUS_STARTED: "Running", - datapipelinespb.PipelineRunStatus_PIPELINE_RUN_STATUS_COMPLETED: "Success", - datapipelinespb.PipelineRunStatus_PIPELINE_RUN_STATUS_FAILED: "Failed", +var pipelineRunStatusMap = map[datapipelinespb.DataPipelineRunStatus]string{ + datapipelinespb.DataPipelineRunStatus_DATA_PIPELINE_RUN_STATUS_UNSPECIFIED: "Unknown", + datapipelinespb.DataPipelineRunStatus_DATA_PIPELINE_RUN_STATUS_SCHEDULED: "Scheduled", + datapipelinespb.DataPipelineRunStatus_DATA_PIPELINE_RUN_STATUS_STARTED: "Running", + datapipelinespb.DataPipelineRunStatus_DATA_PIPELINE_RUN_STATUS_COMPLETED: "Success", + datapipelinespb.DataPipelineRunStatus_DATA_PIPELINE_RUN_STATUS_FAILED: "Failed", } type datapipelineListArgs struct { @@ -183,7 +183,7 @@ func DatapipelineDescribeAction(c *cli.Context, args datapipelineDescribeArgs) e } pipeline := resp.GetDataPipeline() - runsResp, err := client.datapipelinesClient.ListPipelineRuns(context.Background(), &datapipelinespb.ListPipelineRunsRequest{ + runsResp, err := client.datapipelinesClient.ListDataPipelineRuns(context.Background(), &datapipelinespb.ListDataPipelineRunsRequest{ Id: args.ID, PageSize: 1, }) diff --git a/go.mod b/go.mod index cb0a7829835..b9f9fbf3581 100644 --- a/go.mod +++ b/go.mod @@ -77,7 +77,7 @@ require ( go.uber.org/atomic v1.11.0 go.uber.org/multierr v1.11.0 go.uber.org/zap v1.27.0 - go.viam.com/api v0.1.430 + go.viam.com/api v0.1.432 go.viam.com/test v1.2.4 go.viam.com/utils v0.1.140 goji.io v2.0.2+incompatible diff --git a/go.sum b/go.sum index 4b2828c64cd..6d598fd5cc4 100644 --- a/go.sum +++ b/go.sum @@ -1534,6 +1534,8 @@ go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= go.viam.com/api v0.1.430 h1:6CF3xIA5CGJb3qkQeKYG3FWNinEy6S3XtQBfcjaPUbk= go.viam.com/api v0.1.430/go.mod h1:drvlBWaiHFxPziz5jayHvibez1qG7lylcNCC1LF8onU= +go.viam.com/api v0.1.432 h1:XT2HfUC/nO/1otRbYat25E4dhk+9KiTU/yj2jmy/YMM= +go.viam.com/api v0.1.432/go.mod h1:drvlBWaiHFxPziz5jayHvibez1qG7lylcNCC1LF8onU= go.viam.com/test v1.2.4 h1:JYgZhsuGAQ8sL9jWkziAXN9VJJiKbjoi9BsO33TW3ug= go.viam.com/test v1.2.4/go.mod h1:zI2xzosHdqXAJ/kFqcN+OIF78kQuTV2nIhGZ8EzvaJI= go.viam.com/utils v0.1.140 h1:u3ohwE4zuJX5R0MfzbyLOdnhT/GGM8S98UaBBLdmRrE= From 736d558eb7569a44fa0ece5853ca7404c51a4cee Mon Sep 17 00:00:00 2001 From: Sagie Maoz Date: Mon, 21 Apr 2025 16:12:24 -0400 Subject: [PATCH 37/37] cleanup --- go.sum | 2 -- 1 file changed, 2 deletions(-) diff --git a/go.sum b/go.sum index 6d598fd5cc4..cf7f2e3b75a 100644 --- a/go.sum +++ b/go.sum @@ -1532,8 +1532,6 @@ go.uber.org/zap v1.18.1/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI= go.uber.org/zap v1.23.0/go.mod h1:D+nX8jyLsMHMYrln8A0rJjFt/T/9/bGgIhAqxv5URuY= go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= -go.viam.com/api v0.1.430 h1:6CF3xIA5CGJb3qkQeKYG3FWNinEy6S3XtQBfcjaPUbk= -go.viam.com/api v0.1.430/go.mod h1:drvlBWaiHFxPziz5jayHvibez1qG7lylcNCC1LF8onU= go.viam.com/api v0.1.432 h1:XT2HfUC/nO/1otRbYat25E4dhk+9KiTU/yj2jmy/YMM= go.viam.com/api v0.1.432/go.mod h1:drvlBWaiHFxPziz5jayHvibez1qG7lylcNCC1LF8onU= go.viam.com/test v1.2.4 h1:JYgZhsuGAQ8sL9jWkziAXN9VJJiKbjoi9BsO33TW3ug=