Skip to content

Commit

Permalink
Merge pull request #62 from m-lab/exec-b2
Browse files Browse the repository at this point in the history
Add NewTask
  • Loading branch information
gfr10598 committed Jul 3, 2018
2 parents ee3eb97 + 7713cc2 commit 46258a9
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 91 deletions.
54 changes: 47 additions & 7 deletions state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io"
"log"
"math/rand"
"regexp"
"time"

"cloud.google.com/go/datastore"
Expand Down Expand Up @@ -111,19 +112,58 @@ func (ds *DatastoreSaver) DeleteTask(t Task) error {
// Task contains the state of a single Task.
// These will be stored and retrieved from DataStore
// The unexported saver field is held only in memory and is not persisted.
type Task struct {
Name string // e.g. gs://archive-mlab-oti/ndt/2017/06/01/
TableSuffix string // e.g. 20170601
// Current processing state of the task.
State State
Queue string // The queue the task files are submitted to, or "".
JobID string // BigQuery JobID, when the state is Deduplicating
ErrMsg string // Task handling error, if any
ErrInfo string // More context about any error, if any
Name string // e.g. gs://archive-mlab-oti/ndt/2017/06/01/
// Date is parsed by NewTask from the YYYY/MM/DD component of Name.
Date time.Time
// Current processing state of the task; NewTask starts tasks in Initializing.
State State
Queue string // The queue the task files are submitted to, or "".
JobID string // BigQuery JobID, when the state is Deduplicating
ErrMsg string // Task handling error, if any
ErrInfo string // More context about any error, if any

// NOTE(review): presumably the time of the most recent save/update - confirm against Save/Update.
UpdateTime time.Time

saver Saver // Saver is used for Save operations. Stored locally, but not persisted.
}

// NewTask creates a Task in the Initializing state for the given name and
// queue, and attaches saver for later Save operations.
//
// The name must begin with the gs://<bucket>/<experiment>/YYYY/MM/DD/ layout
// recognized by ParsePrefix. The date component is parsed and stored in the
// Date field. If the name does not match, or the date component is not a
// valid calendar date, a nil task and the error are returned.
func NewTask(name string, queue string, saver Saver) (*Task, error) {
	task := &Task{
		Name:  name,
		State: Initializing,
		Queue: queue,
		saver: saver,
	}

	fields, err := task.ParsePrefix()
	if err != nil {
		return nil, err
	}

	// fields[3] holds the YYYY/MM/DD capture from the prefix pattern.
	if task.Date, err = time.Parse("2006/01/02", fields[3]); err != nil {
		return nil, err
	}
	return task, nil
}

// HACK - remove from tq package?
// start matches the leading "gs://<bucket>/<experiment>/" portion of a path.
// The bucket group uses a greedy ".*", so it may span "/" characters; the
// experiment group cannot contain "/".
const start = `^gs://(?P<bucket>.*)/(?P<exp>[^/]*)/`
// datePath matches a "YYYY/MM/DD/" path component. The digit classes are
// loose and also accept out-of-range values such as month 19 or day 39.
const datePath = `(?P<datepath>\d{4}/[01]\d/[0123]\d)/`

// These are here to facilitate use across queue-pusher and parsing components.
var (
// prefixPattern matches the gs://<bucket>/<experiment>/YYYY/MM/DD/ prefix of
// a path. It is anchored only at the start, so any text may follow the date.
// Capture groups: #1 bucket, #2 experiment, #3 date path.
prefixPattern = regexp.MustCompile(start + // #1 #2
datePath) // #3 - YYYY/MM/DD
)

// ParsePrefix matches the task's Name against prefixPattern and returns the
// resulting submatch slice:
//
//	[0] the whole matched prefix
//	[1] bucket
//	[2] experiment
//	[3] date path (YYYY/MM/DD)
//
// An error is returned when Name does not match the expected layout.
func (t *Task) ParsePrefix() ([]string, error) {
	matches := prefixPattern.FindStringSubmatch(t.Name)

	switch {
	case matches == nil:
		return nil, errors.New("Invalid test path: " + t.Name)
	case len(matches) < 4:
		// Defensive guard so callers can index the date field safely.
		return nil, errors.New("Path does not include all fields: " + t.Name)
	}
	return matches, nil
}

// String renders the task as a compact one-line summary containing its name,
// state name, queue, job ID, and error message with the extra error info.
func (t Task) String() string {
	stateName := StateNames[t.State]
	return fmt.Sprintf("{%s: %s, Q:%s, J:%s, E:%s (%s)}",
		t.Name, stateName, t.Queue, t.JobID, t.ErrMsg, t.ErrInfo)
}
Expand Down
107 changes: 107 additions & 0 deletions state/state_bb_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// +build integration

package state_test

import (
"bytes"
"context"
"log"
"strings"
"testing"
"time"

"github.com/m-lab/etl-gardener/state"
)

// TestStatus saves a task, renames it and updates it, then checks that
// GetStatus reports two tasks. Every task returned by GetStatus is deleted
// at the end so the test does not leave entries behind.
func TestStatus(t *testing.T) {
	saver, err := state.NewDatastoreSaver("mlab-testing")
	if err != nil {
		t.Fatal(err)
	}
	task, err := state.NewTask("gs://foo/bar/2000/01/01/task1", "Q1", saver)
	if err != nil {
		t.Fatal(err)
	}
	log.Println("saving")
	if err = task.Save(); err != nil {
		t.Fatal(err)
	}
	// Rename and update; the test expects this to show up as a second task.
	task.Name = "task2"
	task.Queue = "Q2"
	if err = task.Update(state.Queuing); err != nil {
		t.Fatal(err)
	}

	// Real datastore takes about 100 msec or more before consistency.
	// In travis, we use the emulator, which should provide consistency
	// much more quickly. So we use a modest number here that usually
	// is sufficient for running on workstation.
	time.Sleep(200 * time.Millisecond)
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	tasks, err := saver.GetStatus(ctx)
	if err != nil {
		t.Fatal(err)
	}
	if ctx.Err() != nil {
		t.Fatal(ctx.Err())
	}
	if len(tasks) != 2 {
		t.Error("Should be 2 tasks (see notes on consistency)", len(tasks))
		// Named "got" so the loop variable does not shadow the *testing.T.
		for _, got := range tasks {
			log.Println(got)
		}
	}

	for i := range tasks {
		if err := saver.DeleteTask(tasks[i]); err != nil {
			t.Error(err)
		}
	}
}

// TestWriteStatus saves a task under two names and checks that both names
// appear in the HTML produced by WriteHTMLStatusTo. Both saved tasks are
// deleted when the test finishes, including when it fails early.
func TestWriteStatus(t *testing.T) {
	saver, err := state.NewDatastoreSaver("mlab-testing")
	if err != nil {
		t.Fatal(err)
	}
	task, err := state.NewTask("gs://foo/bar/2000/01/01/task1", "Q1", saver)
	if err != nil {
		t.Fatal(err)
	}
	if err = task.Save(); err != nil {
		t.Fatal(err)
	}
	// NewTask returns a pointer, so copy the struct value here. Copying the
	// pointer would make t1 follow the rename below, and the task1 entry
	// would never be deleted.
	t1 := *task
	defer func() {
		if err := t1.Delete(); err != nil {
			t.Error(err)
		}
	}()

	task.Name = "task2"
	task.Queue = "Q2"
	if err = task.Update(state.Queuing); err != nil {
		t.Fatal(err)
	}
	t2 := *task
	defer func() {
		if err := t2.Delete(); err != nil {
			t.Error(err)
		}
	}()

	// Allow time for the writes to become visible to the status query.
	time.Sleep(200 * time.Millisecond)

	bb := make([]byte, 0, 500)
	buf := bytes.NewBuffer(bb)

	err = state.WriteHTMLStatusTo(buf, "mlab-testing")
	if err != nil {
		t.Fatal(err)
	}
	if !strings.Contains(buf.String(), "task1") {
		t.Error("Missing task1")
	}
	if !strings.Contains(buf.String(), "task2") {
		t.Error("Missing task2")
	}
}
84 changes: 0 additions & 84 deletions state/state_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
package state_test

import (
"bytes"
"context"
"errors"
"log"
"strings"
"testing"
"time"

"github.com/m-lab/etl-gardener/state"
)
Expand Down Expand Up @@ -79,83 +75,3 @@ func TestTaskBasics(t *testing.T) {
t.Fatal("Should have called delete")
}
}

// TestStatus (previous version, removed by this commit) saves a task built
// from a struct literal, renames and updates it, then checks the number of
// tasks reported by GetStatus.
func TestStatus(t *testing.T) {
saver, err := state.NewDatastoreSaver("mlab-testing")
if err != nil {
t.Fatal(err)
}
task := state.Task{Name: "task1", Queue: "Q1", State: state.Initializing}
task.SetSaver(saver)
log.Println("saving")
err = task.Save()
if err != nil {
t.Fatal(err)
}
task.Name = "task2"
task.Queue = "Q2"
err = task.Update(state.Queuing)
if err != nil {
t.Fatal(err)
}

// Real datastore takes about 100 msec or more before consistency.
// In travis, we use the emulator, which should provide consistency
// much more quickly. So we use a modest number here that usually
// is sufficient for running on workstation.
time.Sleep(500 * time.Millisecond)
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
tasks, err := saver.GetStatus(ctx)
if err != nil {
t.Fatal(err)
}
if ctx.Err() != nil {
t.Fatal(ctx.Err())
}
// NOTE(review): only two names are written above; the third expected task
// presumably comes from elsewhere (e.g. another test) - confirm.
if len(tasks) != 3 {
log.Println("See notes in code about consistency.")
t.Error("Should be 3 tasks", len(tasks))
for _, t := range tasks {
log.Println(t)
}
}
// Delete is called once, on the task whose Name is now "task2".
err = task.Delete()
if err != nil {
t.Fatal(err)
}
}

// TestWriteStatus (previous version, removed by this commit) saves a task
// under two names and checks that both names appear in the output of
// WriteHTMLStatusTo.
func TestWriteStatus(t *testing.T) {
saver, err := state.NewDatastoreSaver("mlab-testing")
if err != nil {
t.Fatal(err)
}
task := state.Task{Name: "task1", Queue: "Q1", State: state.Initializing}
task.SetSaver(saver)
// The results of Save and Update are not checked here.
task.Save()
task.Name = "task2"
task.Queue = "Q2"
task.Update(state.Queuing)
time.Sleep(200 * time.Millisecond)

bb := make([]byte, 0, 500)
buf := bytes.NewBuffer(bb)

err = state.WriteHTMLStatusTo(buf, "mlab-testing")
if err != nil {
t.Fatal(err)
}
if !strings.Contains(buf.String(), "task1") {
t.Error("Missing task1")
}
if !strings.Contains(buf.String(), "task2") {
t.Error("Missing task2")
}

// Delete is called once, on the task whose Name is now "task2".
err = task.Delete()
if err != nil {
t.Fatal(err)
}
}

0 comments on commit 46258a9

Please sign in to comment.