Skip to content

Commit

Permalink
Add reprocessing of yesterday at 3:00 am UTC. (#202)
Browse files Browse the repository at this point in the history
* Add code to always process yesterday

* add unit test and fix bugs

* include FakeTime test code

* fix race in test fixture

* various minor cleanups

* fix initialization race

* fix monkeypatch import

* minor improvements
  • Loading branch information
gfr10598 committed Nov 12, 2019
1 parent 288c871 commit efa2e39
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 14 deletions.
64 changes: 53 additions & 11 deletions reproc/dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,15 +271,58 @@ queueLoop:
return maxDate, nil
}

// dailyDelay is how long after the end of a UTC day we wait before that day
// is considered ready for its daily reprocessing.
var dailyDelay = 3 * time.Hour

// findNextRecentDay finds an appropriate date to start daily processing.
//
// With no date skipping (skip <= 0) it returns the most recent UTC day that
// is due, or about to become due, for daily reprocessing. Otherwise it walks
// forward from start in steps of 1+skip days and returns the last date in
// that sequence whose successor is still before that most recent day, so
// that daily processing stays aligned with the dates the main loop visits.
//
// The returned value for skip <= 0 is always truncated to midnight UTC; for
// skip > 0 it carries whatever time-of-day start has.
func findNextRecentDay(start time.Time, skip int) time.Time {
	// Normally we'll reprocess yesterday after 3:00 am UTC.
	// However, allow an extra hour of leeway when restarting.
	// This may mean yesterday gets processed twice in a row.
	//
	// The offset must be SUBTRACTED: looking back by dailyDelay+1h means that
	// until 04:00 UTC we still select the previous day, and afterwards we
	// select the current day (which becomes due at 03:00 tomorrow). Adding
	// the offset instead would select today, or even tomorrow after 20:00
	// UTC, and a day's reprocessing would be skipped on restart.
	yesterday := time.Now().Add(-(time.Hour + dailyDelay)).UTC().Truncate(24 * time.Hour)
	// A negative skip would make the step below zero or negative and the
	// walk would never terminate, so treat it like "no skipping".
	if skip <= 0 {
		log.Println("Most recent day to process is:", yesterday.Format("2006/01/02"))
		return yesterday
	}

	nextCurrentDate := start
	next := nextCurrentDate.AddDate(0, 0, 1+skip)

	for next.Before(yesterday) {
		nextCurrentDate = next
		next = nextCurrentDate.AddDate(0, 0, 1+skip)
	}

	return nextCurrentDate
}

// doDispatchLoop just sequences through archives in date order.
// It will generally be blocked on the queues.
// It will start processing at startDate, and when it catches up to within 3 days of "now" it will restart at restartDate.
func doDispatchLoop(ctx context.Context, handler *TaskHandler, bucket string, exp string, startDate time.Time, restartDate time.Time, dateSkip int) {
func doDispatchLoop(ctx context.Context, handler *TaskHandler, bucket string, expAndType string, startDate time.Time, restartDate time.Time, dateSkip int) {
log.Println("(Re)starting at", startDate)
next := startDate

nextRecent := findNextRecentDay(startDate, dateSkip)
next := startDate.Truncate(24 * time.Hour)

for {
prefix := fmt.Sprintf("gs://%s/%s/", bucket, exp) + next.Format("2006/01/02/")
// If it is 3 hours past the end of the nextRecent day, we should process it now.
if time.Since(nextRecent) > 24*time.Hour+dailyDelay {
// Only process if next isn't same or later date.
if nextRecent.After(next.Add(time.Hour)) {
prefix := fmt.Sprintf("gs://%s/%s/", bucket, expAndType) + nextRecent.Format("2006/01/02/")

log.Println("Processing yesterday:", prefix)
// Note that this blocks until a queue is available.
err := handler.AddTask(ctx, prefix)
if err != nil {
// Only error expected here is ErrTerminating
log.Println(err)
}
}
nextRecent = nextRecent.AddDate(0, 0, 1+dateSkip)
}

prefix := fmt.Sprintf("gs://%s/%s/", bucket, expAndType) + next.Format("2006/01/02/")

// Note that this blocks until a queue is available.
err := handler.AddTask(ctx, prefix)
Expand All @@ -291,8 +334,8 @@ func doDispatchLoop(ctx context.Context, handler *TaskHandler, bucket string, ex
// Advance to next date, possibly skipping days if DATE_SKIP env var was set.
next = next.AddDate(0, 0, 1+dateSkip)

// If gardener has processed all dates up now, then start over.
if next.After(time.Now()) {
// Start over if next date is less than 3 days ago.
if time.Since(next) < 72*time.Hour {
// TODO - load this from DataStore
log.Println("Starting over at", restartDate)
next = restartDate
Expand All @@ -301,10 +344,8 @@ func doDispatchLoop(ctx context.Context, handler *TaskHandler, bucket string, ex
}

// RunDispatchLoop sets up dispatch loop.
// TODO inject DatastoreSaver to allow unit testing?? TaskHandler would require a fake task queue.
// However, this code will be replaced soon. Replacement code should have better unit tests, but it might
// be wasted effort to improve coverage on this code.
func RunDispatchLoop(ctx context.Context, th *TaskHandler, project string, bucket string, exp string, startDate time.Time, dateSkip int) error {
// TODO - refactor to take tasks as argument, instead of loading them.
func RunDispatchLoop(ctx context.Context, th *TaskHandler, project string, bucket string, expAndType string, startDate time.Time, dateSkip int) error {
ds, err := state.NewDatastoreSaver(ctx, project)
if err != nil {
log.Println(err)
Expand All @@ -313,13 +354,14 @@ func RunDispatchLoop(ctx context.Context, th *TaskHandler, project string, bucke

// Move the timeout into GetStatus?
taskCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
tasks, err := ds.FetchAllTasks(taskCtx, exp)
tasks, err := ds.FetchAllTasks(taskCtx, expAndType)
cancel()

if err != nil {
log.Println(err)
return err
}

maxDate, err := th.RestartTasks(ctx, tasks)
if err != nil {
log.Println(err)
Expand All @@ -335,7 +377,7 @@ func RunDispatchLoop(ctx context.Context, th *TaskHandler, project string, bucke
}

log.Println("Using start date of", startDate)
go doDispatchLoop(ctx, th, bucket, exp, restartDate, startDate, dateSkip)
go doDispatchLoop(ctx, th, bucket, expAndType, restartDate, startDate, dateSkip)

return nil
}
136 changes: 133 additions & 3 deletions reproc/dispatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,22 @@ package reproc_test

import (
"context"
"flag"
"log"
"math/rand"
"sync"
"sync/atomic"
"testing"
"time"

"bou.ke/monkey"

"github.com/m-lab/etl-gardener/reproc"
"github.com/m-lab/etl-gardener/state"
)

var verbose = false

func init() {
// Always prepend the filename and line number.
log.SetFlags(log.LstdFlags | log.Lshortfile)
Expand Down Expand Up @@ -71,12 +77,33 @@ func (s *testSaver) DeleteTask(ctx context.Context, t state.Task) error {
return nil
}

// getTask returns the slice of task states recorded under name, or nil if
// nothing has been recorded for that name. The lookup is done while holding
// the saver's lock.
func (s *testSaver) getTask(name string) []state.Task {
	s.lock.Lock()
	history := s.tasks[name]
	s.lock.Unlock()
	return history
}

// getTaskStates returns one entry per task name held by the saver, each
// entry being the slice of states stored for that name. The outer slice is
// built while holding the saver's lock; its order follows map iteration and
// is therefore unspecified.
func (s *testSaver) getTaskStates() [][]state.Task {
	s.lock.Lock()
	defer s.lock.Unlock()

	snapshot := make([][]state.Task, 0, len(s.tasks))
	for _, history := range s.tasks {
		snapshot = append(snapshot, history)
	}
	return snapshot
}

// assertPersistentStore exists only so that the compiler verifies that
// *testSaver satisfies state.PersistentStore; it has no runtime effect.
func assertPersistentStore() {
	var _ state.PersistentStore = &testSaver{}
}

// Exec is the stateless fake executor used by the tests in this file; a
// pointer to it is handed to reproc.NewTaskHandler, and its Next method
// stands in for real task execution.
type Exec struct{}

func (ex *Exec) Next(ctx context.Context, t *state.Task, terminate <-chan struct{}) error {
log.Println("Do", t)
if verbose {
log.Println("Do", t)
}

time.Sleep(time.Duration(1+rand.Intn(2)) * time.Millisecond)

switch t.State {
Expand Down Expand Up @@ -131,6 +158,8 @@ func TestBasic(t *testing.T) {
// may fail to complete. Also, running with -race may detect race
// conditions.
func TestWithTaskQueue(t *testing.T) {
verbose = true

ctx := context.Background()
// Start tracker with one queue.
exec := Exec{}
Expand All @@ -148,6 +177,8 @@ func TestWithTaskQueue(t *testing.T) {
}

func TestRestart(t *testing.T) {
verbose = true

ctx := context.Background()
exec := Exec{}
saver := NewTestSaver()
Expand All @@ -162,6 +193,105 @@ func TestRestart(t *testing.T) {
tasks := []state.Task{*t1}
th.RestartTasks(ctx, tasks)

time.Sleep(5 * time.Second)
log.Println(saver.tasks[taskName])
// Restarts are asynchronous, so wait up to 5 seconds for task to be started.
start := time.Now()
for time.Since(start) < 5*time.Second &&
saver.getTask(taskName) == nil {
time.Sleep(10 * time.Millisecond)
}
if saver.getTask(taskName) == nil {
t.Fatal("Task never started")
}
}

/*********************************
This block of code vvvvvvvv will move to go/test
***********************************************/

// FakeTime sets the current time to midnight UTC 1 month ago, then advances time at
// the speed indicated by the multiplier.
//
// multiplier is the number of virtual nanoseconds added per real nanosecond
// of ticker time: every (real) millisecond tick advances the fake clock by
// multiplier milliseconds.
//
// The returned function stops the fake clock from advancing and terminates
// the background goroutine that drives it. It is safe to call more than
// once. It does NOT restore time.Now; use StopFakeTime for that.
//
// NOTE: Since this replaces time.Now() for the entire process, it should not be used in
// parallel, e.g. for concurrent unit tests.
func FakeTime(multiplier int64) func() {
	if flag.Lookup("test.v") == nil {
		log.Fatal("package go/test should not be used outside unit tests")
	}

	// fakeNow holds the virtual time in Unix nanoseconds. It is shared
	// between the patched time.Now and the ticker goroutine, so every access
	// goes through sync/atomic.
	var fakeNow int64

	// This call happens before patching, so it reads the real clock.
	atomic.StoreInt64(&fakeNow, time.Now().AddDate(0, -1, 0).UTC().Truncate(24*time.Hour).UnixNano())

	f := func() time.Time {
		return time.Unix(0, atomic.LoadInt64(&fakeNow))
	}

	monkey.Patch(time.Now, f)

	ticker := time.NewTicker(time.Millisecond)
	done := make(chan struct{})
	go func() {
		// Ticker.Stop does not close ticker.C, so ranging over the channel
		// would block forever after stop; select on done to exit cleanly.
		for {
			select {
			case <-ticker.C:
				atomic.AddInt64(&fakeNow, multiplier*int64(time.Millisecond))
			case <-done:
				return
			}
		}
	}()

	var once sync.Once
	return func() {
		once.Do(func() {
			ticker.Stop()
			close(done)
		})
	}
}

// StopFakeTime restores the normal time.Now() function.
// stop is the function returned by FakeTime; it is invoked first so the
// fake clock is halted before the time.Now patch is removed.
func StopFakeTime(stop func()) {
log.Println("Stopping fake clock")
stop()
monkey.Unpatch(time.Now)
}

/*********************************
This block of code ^^^^^^ will move to go/test
***********************************************/

// TestDoDispatchLoop runs the dispatch loop against a fake clock for about
// three and a half virtual days and checks that at least three distinct
// "recent day" prefixes were handed to the task handler, in addition to the
// historical dates the loop walks through.
func TestDoDispatchLoop(t *testing.T) {
	verbose = false

	// Set up time to go at approximately 30 days/second.
	stop := FakeTime(int64((30 * 24 * time.Hour) / (1000 * time.Millisecond)))
	// Virtual start time.
	start := time.Now().UTC()

	ctx := context.Background()
	exec := Exec{}
	saver := NewTestSaver()
	th := reproc.NewTaskHandler("exp", &exec, []string{"queue-1", "queue-2", "queue-3"}, saver)

	restart := time.Date(2013, 1, 1, 0, 0, 0, 0, time.UTC)
	startDate := time.Date(2013, 2, 1, 0, 0, 0, 0, time.UTC)
	go reproc.DoDispatchLoop(ctx, th, "foobar", "exp", restart, startDate, 0)

	// Run for 3.5 virtual days. Sleep between polls instead of spinning, so
	// this goroutine does not burn a core and starve the fake clock and the
	// dispatch goroutines. One real millisecond is well under one virtual
	// hour at this clock speed, so the overshoot is negligible.
	for time.Since(start) <= (3*24+12)*time.Hour {
		time.Sleep(time.Millisecond)
	}

	// th.Terminate()

	StopFakeTime(stop)

	// FakeTime starts at midnight UTC, so we should see the previous day.
	recent := "gs://foobar/exp" + start.Add(-24*time.Hour).Format("/2006/01/02/")
	// We expect to see at least 3 distinct recent dates...
	recents := map[string]bool{}
	tasks := saver.getTaskStates()
	for _, task := range tasks {
		// Guard against an empty history so a bookkeeping quirk in the
		// saver shows up as a test failure below rather than a panic here.
		if len(task) == 0 {
			continue
		}
		taskEnd := task[len(task)-1]
		if taskEnd.Name >= recent {
			t.Log(taskEnd)
			recents[taskEnd.Name] = true
		}
	}

	// Count should be 3 or 4 days. There is some variation because of the randomness
	// in processing time in the fake Exec.Next() function.
	if len(recents) < 3 {
		t.Error("Should have seen at least 3 daily jobs", recents)
	}
}
3 changes: 3 additions & 0 deletions reproc/export_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package reproc

// DoDispatchLoop exposes the unexported doDispatchLoop so that tests in the
// external reproc_test package can call it as reproc.DoDispatchLoop.
var DoDispatchLoop = doDispatchLoop

0 comments on commit efa2e39

Please sign in to comment.