Skip to content

Commit

Permalink
Add -input_location flag for gardener load bucket source (#408)
Browse files Browse the repository at this point in the history
* Add input location flag
* Add -input_location flag to k8s configs
  • Loading branch information
stephen-soltesz committed Aug 5, 2022
1 parent f047629 commit 689bcda
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 7 deletions.
3 changes: 2 additions & 1 deletion cmd/gardener/gardener.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ var (
statusPort = flag.String("status_port", ":0", "The public interface port where status (and pprof) will be published")
gardenerAddr = flag.String("gardener_addr", ":8080", "The listen address for the gardener jobs service")
configPath = flag.String("config_path", "config.yml", "Path to the config file.")
inputLocation = flag.String("input_location", "", "Load from this GCS bucket.")

// Context and injected variables to allow smoke testing of main()
mainCtx, mainCancel = context.WithCancel(context.Background())
Expand Down Expand Up @@ -274,7 +275,7 @@ func main() {
Client: nil,
}
bqConfig := NewBQConfig(cloudCfg)
monitor, err := ops.NewStandardMonitor(mainCtx, project, bqConfig, globalTracker)
monitor, err := ops.NewStandardMonitor(mainCtx, project, *inputLocation, bqConfig, globalTracker)
rtx.Must(err, "NewStandardMonitor failed")
go monitor.Watch(mainCtx, 5*time.Second)

Expand Down
1 change: 1 addition & 0 deletions k8s/data-processing/deployments/etl-gardener-universal.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ spec:
"-project={{GCLOUD_PROJECT}}",
"-shutdown_timeout=5m",
"-job_expiration_time=6h",
"-input_location=etl-{{GCLOUD_PROJECT}}", # must correspond to -output_location from etl parser.
]
ports:
- name: prometheus-port
Expand Down
11 changes: 6 additions & 5 deletions ops/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,13 @@ func newJoinConditionFunc(tk *tracker.Tracker, detail string) ConditionFunc {
}

type actionEnv struct {
project string
project string
inputBucket string
}

// NewStandardMonitor creates the standard monitor that handles several state transitions.
func NewStandardMonitor(ctx context.Context, project string, config cloud.BQConfig, tk *tracker.Tracker) (*Monitor, error) {
a := actionEnv{project: project}
func NewStandardMonitor(ctx context.Context, project, inputBucket string, config cloud.BQConfig, tk *tracker.Tracker) (*Monitor, error) {
a := actionEnv{project: project, inputBucket: inputBucket}
m, err := NewMonitor(ctx, config, tk)
if err != nil {
return nil, err
Expand Down Expand Up @@ -145,8 +146,8 @@ func waitAndCheck(ctx context.Context, bqJob bqiface.Job, j tracker.Job, label s
// would be a good place for the TableOps object.
func (a *actionEnv) tableOps(ctx context.Context, j tracker.Job) (*bq.TableOps, error) {
// TODO pass in the JobWithTarget, and get this info from Target.
loadSource := fmt.Sprintf("gs://etl-%s/%s/%s/%s/%s/*",
a.project, j.Bucket, j.Experiment, j.Datatype, j.Date.Format(timex.YYYYMMDDWithSlash))
loadSource := fmt.Sprintf("gs://%s/%s/%s/%s/%s/*",
a.inputBucket, j.Bucket, j.Experiment, j.Datatype, j.Date.Format(timex.YYYYMMDDWithSlash))
return bq.NewTableOps(ctx, j, a.project, loadSource)
}

Expand Down
2 changes: 1 addition & 1 deletion ops/actions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestStandardMonitor(t *testing.T) {
tk.AddJob(jobs[i])
}

m, err := ops.NewStandardMonitor(context.Background(), "mlab-testing", cloud.BQConfig{}, tk)
m, err := ops.NewStandardMonitor(context.Background(), "mlab-testing", "etl-mlab-testing", cloud.BQConfig{}, tk)
rtx.Must(err, "NewMonitor failure")
// We override some actions in place of the default Parser activity.
// The resulting sequence should be:
Expand Down

0 comments on commit 689bcda

Please sign in to comment.