From 689bcda01c5167a435258fd32d37df1c56e18b75 Mon Sep 17 00:00:00 2001 From: Stephen Soltesz Date: Fri, 5 Aug 2022 15:06:13 -0400 Subject: [PATCH] Add -input_location flag for gardener load bucket source (#408) * Add input location flag * Add -input_location flag to k8s configs --- cmd/gardener/gardener.go | 3 ++- .../deployments/etl-gardener-universal.yml | 1 + ops/actions.go | 11 ++++++----- ops/actions_test.go | 2 +- 4 files changed, 10 insertions(+), 7 deletions(-) diff --git a/cmd/gardener/gardener.go b/cmd/gardener/gardener.go index f85466fb..8697168a 100644 --- a/cmd/gardener/gardener.go +++ b/cmd/gardener/gardener.go @@ -59,6 +59,7 @@ var ( statusPort = flag.String("status_port", ":0", "The public interface port where status (and pprof) will be published") gardenerAddr = flag.String("gardener_addr", ":8080", "The listen address for the gardener jobs service") configPath = flag.String("config_path", "config.yml", "Path to the config file.") + inputLocation = flag.String("input_location", "", "Load from this GCS bucket.") // Context and injected variables to allow smoke testing of main() mainCtx, mainCancel = context.WithCancel(context.Background()) @@ -274,7 +275,7 @@ func main() { Client: nil, } bqConfig := NewBQConfig(cloudCfg) - monitor, err := ops.NewStandardMonitor(mainCtx, project, bqConfig, globalTracker) + monitor, err := ops.NewStandardMonitor(mainCtx, project, *inputLocation, bqConfig, globalTracker) rtx.Must(err, "NewStandardMonitor failed") go monitor.Watch(mainCtx, 5*time.Second) diff --git a/k8s/data-processing/deployments/etl-gardener-universal.yml b/k8s/data-processing/deployments/etl-gardener-universal.yml index cb52a947..4b8856e9 100644 --- a/k8s/data-processing/deployments/etl-gardener-universal.yml +++ b/k8s/data-processing/deployments/etl-gardener-universal.yml @@ -44,6 +44,7 @@ spec: "-project={{GCLOUD_PROJECT}}", "-shutdown_timeout=5m", "-job_expiration_time=6h", + "-input_location=etl-{{GCLOUD_PROJECT}}", # must correspond to -output_location from etl parser. ] ports: - name: prometheus-port diff --git a/ops/actions.go b/ops/actions.go index 460b6bb8..7d7a0bc4 100644 --- a/ops/actions.go +++ b/ops/actions.go @@ -58,12 +58,13 @@ func newJoinConditionFunc(tk *tracker.Tracker, detail string) ConditionFunc { } type actionEnv struct { - project string + project string + inputBucket string } // NewStandardMonitor creates the standard monitor that handles several state transitions. -func NewStandardMonitor(ctx context.Context, project string, config cloud.BQConfig, tk *tracker.Tracker) (*Monitor, error) { - a := actionEnv{project: project} +func NewStandardMonitor(ctx context.Context, project, inputBucket string, config cloud.BQConfig, tk *tracker.Tracker) (*Monitor, error) { + a := actionEnv{project: project, inputBucket: inputBucket} m, err := NewMonitor(ctx, config, tk) if err != nil { return nil, err @@ -145,8 +146,8 @@ func waitAndCheck(ctx context.Context, bqJob bqiface.Job, j tracker.Job, label s // would be a good place for the TableOps object. func (a *actionEnv) tableOps(ctx context.Context, j tracker.Job) (*bq.TableOps, error) { // TODO pass in the JobWithTarget, and get this info from Target. - loadSource := fmt.Sprintf("gs://etl-%s/%s/%s/%s/%s/*", - a.project, j.Bucket, j.Experiment, j.Datatype, j.Date.Format(timex.YYYYMMDDWithSlash)) + loadSource := fmt.Sprintf("gs://%s/%s/%s/%s/%s/*", + a.inputBucket, j.Bucket, j.Experiment, j.Datatype, j.Date.Format(timex.YYYYMMDDWithSlash)) return bq.NewTableOps(ctx, j, a.project, loadSource) } diff --git a/ops/actions_test.go b/ops/actions_test.go index 7d9516dc..6647f8bf 100644 --- a/ops/actions_test.go +++ b/ops/actions_test.go @@ -80,7 +80,7 @@ func TestStandardMonitor(t *testing.T) { tk.AddJob(jobs[i]) } - m, err := ops.NewStandardMonitor(context.Background(), "mlab-testing", cloud.BQConfig{}, tk) + m, err := ops.NewStandardMonitor(context.Background(), "mlab-testing", "etl-mlab-testing", cloud.BQConfig{}, tk) rtx.Must(err, "NewMonitor failure") // We override some actions in place of the default Parser activity. // The resulting sequence should be: