fix: instance_collector: refacto to handle cases where collector doesn't pick up all tasks (#134)

* fix: instance_collector: refacto to handle cases where collector doesn't pick up all tasks

Signed-off-by: Romain Beuque <[email protected]>

* engine: foreach step should inherit from Idempotent and Resources

Signed-off-by: Romain Beuque <[email protected]>

* instance_collector: implemented semaphore to limit concurrent task execution from crashed instance

Signed-off-by: Romain Beuque <[email protected]>

* resources: Plugins can provide default resources that will be used during execution

Signed-off-by: Romain Beuque <[email protected]>

* exit sequence: add a WaitGroup so shutdown does not wait when it is not needed

Signed-off-by: Romain Beuque <[email protected]>
rbeuque74 committed May 6, 2020
1 parent 6f24783 commit 7af1ad1
Showing 29 changed files with 490 additions and 131 deletions.
4 changes: 3 additions & 1 deletion api/api_test.go
@@ -9,6 +9,7 @@ import (
"os"
"strconv"
"strings"
"sync"
"testing"
"time"

@@ -61,7 +62,8 @@ func TestMain(m *testing.M) {
auth.Init(store)

ctx := context.Background()
if err := engine.Init(ctx, store); err != nil {
var wg sync.WaitGroup
if err := engine.Init(ctx, &wg, store); err != nil {
panic(err)
}

2 changes: 1 addition & 1 deletion api/handler/resolution.go
@@ -252,7 +252,7 @@ func RunResolution(c *gin.Context, in *runResolutionIn) error {

logrus.WithFields(logrus.Fields{"resolution_id": r.PublicID}).Debugf("Handler RunResolution: manual resolve %s", r.PublicID)

return engine.GetEngine().Resolve(in.PublicID)
return engine.GetEngine().Resolve(in.PublicID, nil)
}

type extendResolutionIn struct {
26 changes: 21 additions & 5 deletions cmd/utask/root.go
@@ -7,6 +7,7 @@ import (
"fmt"
"net/http"
"os"
"sync"
"time"

"github.com/juju/errors"
@@ -164,19 +165,34 @@ var rootCmd = &cobra.Command{
if err := tasktemplate.LoadFromDir(dbp, utask.FTemplatesFolder); err != nil {
return err
}

var wg sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())
defer func() {
// Stop collectors
cancel()
log.Info("Exiting...")

// Grace period to commit still-running resolutions
time.Sleep(time.Second)
gracePeriodWaitGroup := make(chan struct{})
go func() {
wg.Wait()
close(gracePeriodWaitGroup)
}()

t := time.NewTicker(5 * time.Second)
// Running steps have 3 seconds to stop running after context cancelation
// Grace period of 2 seconds (3+2=5) to commit still-running resolutions
select {
case <-gracePeriodWaitGroup:
// all important goroutines exited successfully, bye-bye!
case <-t.C:
// game over, exiting before everyone said bye :(
log.Warn("5 seconds timeout for exiting expired")
}

log.Info("Exiting...")
log.Info("Bye!")
}()

if err := engine.Init(ctx, store); err != nil {
if err := engine.Init(ctx, &wg, store); err != nil {
return err
}

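The new shutdown sequence gives in-flight resolutions a bounded grace period: sync.WaitGroup cannot be used directly in a select, so Wait is run in a goroutine that closes a channel, and that channel is raced against a timer. A standalone sketch of the same pattern (names and durations are illustrative, not uTask code):

```go
package main

import (
	"log"
	"sync"
	"time"
)

// waitTimeout blocks until the WaitGroup is done or the timeout expires,
// and reports whether all goroutines finished in time.
func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
	done := make(chan struct{})
	go func() {
		wg.Wait() // unblocks once every wg.Done() has been called
		close(done)
	}()
	select {
	case <-done:
		return true
	case <-time.After(timeout):
		return false
	}
}

func main() {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(2 * time.Second) // stand-in for a still-running resolution
	}()

	if waitTimeout(&wg, 5*time.Second) {
		log.Println("clean shutdown, everyone said bye")
	} else {
		log.Println("5 seconds timeout for exiting expired")
	}
}
```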
45 changes: 37 additions & 8 deletions config/README.md
@@ -81,14 +81,43 @@ postgres://user:pass@db/utask?sslmode=disable
"config_name": "database" // configuration entry where connection info can be found, default "database"
},
// concealed_secrets allows you to render some configstore items inaccessible to the task engine
"concealed_secrets": ["database", "encryption-key", "utask-cfg"],
// resource_limits allows you to define named resources and allocate a maximum number of concurrent actions on them (see Authoring task templates below)
"resource_limits": {
"openstack": 15,
},
// max_concurrent_executions defines a global maximum of concurrent actions running at any given time
// if none provided, no upper bound is enforced
"max_concurrent_executions": 100
"concealed_secrets": ["database", "encryption-key", "utask-cfg"],
// resource_limits allows you to define named resources and allocate a maximum number of concurrent actions on them (see Authoring task templates in /README.md)
"resource_limits": {
"openstack": 15,
"socket": 1024,
"fork": 50,
"url:example.org": 10
},
// max_concurrent_executions defines a global maximum of concurrent tasks running at any given time
// default value: 100; 0 stops all task processing; -1 means no limit
"max_concurrent_executions": 100,
// max_concurrent_executions_from_crashed defines a maximum of concurrent tasks from a crashed instance running at any given time
// default value: 20; 0 stops all task processing; -1 means no limit
"max_concurrent_executions_from_crashed": 20,
// delay_between_crashed_tasks_resolution defines the wait duration between two tasks from a crashed instance being scheduled on the current uTask instance
// default: 1, unit: seconds
"delay_between_crashed_tasks_resolution": 1,
// dashboard_path_prefix defines the path prefix for the dashboard UI. Should be used if the uTask instance is hosted with a ProxyPass, on a custom path
// default: empty, no prefix
"dashboard_path_prefix": "/my-utask-instance",
// editor_path_prefix defines the path prefix for the editor UI. Should be used if the uTask instance is hosted with a ProxyPass, on a custom path
// default: empty, no prefix
"editor_path_prefix": "/my-utask-instance",
// dashboard_api_path_prefix defines the path prefix for the uTask API. Should be used if the uTask instance is hosted with a ProxyPass, on a custom path.
// dashboard_api_path_prefix will be used by Dashboard UI to contact the uTask API
// default: empty, no prefix
"dashboard_api_path_prefix": "/my-utask-instance",
// dashboard_sentry_dsn defines the Sentry DSN for the Dashboard UI. Used to retrieve Javascript execution errors inside a Sentry instance.
// default: empty, no SENTRY_DSN
"dashboard_sentry_dsn": "",
// server_options holds configuration to fine-tune the uTask HTTP server
"server_options": {
// max_body_bytes defines the maximum size that will be read when sending a body to the uTask server.
// value can't be smaller than 1KB (1024), and can't be bigger than 10MB (10*1024*1024)
// default: 262144 (256KB), unit: byte
"max_body_bytes": 262144
}
}
```
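Each resource_limits entry bounds how many concurrent actions may hold the named resource. A minimal sketch of how such named limits can be enforced with weighted semaphores, assuming golang.org/x/sync/semaphore; this illustrates the semantics rather than uTask's exact engine code:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/semaphore"
)

func main() {
	// named limits, as they would come out of the resource_limits config entry
	limits := map[string]int64{"openstack": 15, "socket": 1024, "fork": 50}

	// one weighted semaphore per named resource
	sems := make(map[string]*semaphore.Weighted, len(limits))
	for name, n := range limits {
		sems[name] = semaphore.NewWeighted(n)
	}

	ctx := context.Background()
	declared := []string{"openstack", "fork"} // resources declared by a step

	// acquire every declared resource before running the action...
	for _, res := range declared {
		if err := sems[res].Acquire(ctx, 1); err != nil {
			fmt.Println("acquire failed:", err)
			return
		}
	}
	time.Sleep(10 * time.Millisecond) // ...run the action...
	// ...then release them so other actions can proceed
	for _, res := range declared {
		sems[res].Release(1)
	}
	fmt.Println("action completed within resource limits")
}
```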

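max_body_bytes caps how much of a request body the server will read. For illustration, this is how such a cap is commonly enforced with Go's net/http (a generic sketch, not uTask's actual handler):

```go
package main

import (
	"io"
	"net/http"
)

const maxBodyBytes = 262144 // 256 KB, mirroring the default above

func handler(w http.ResponseWriter, r *http.Request) {
	// MaxBytesReader returns an error as soon as the client sends more
	// than maxBodyBytes, protecting the server from oversized payloads.
	r.Body = http.MaxBytesReader(w, r.Body, maxBodyBytes)
	if _, err := io.ReadAll(r.Body); err != nil {
		http.Error(w, "request body too large", http.StatusRequestEntityTooLarge)
		return
	}
	w.WriteHeader(http.StatusNoContent)
}

func main() {
	http.HandleFunc("/", handler)
	_ = http.ListenAndServe(":8080", nil)
}
```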
2 changes: 1 addition & 1 deletion engine/collector_autorun.go
@@ -33,7 +33,7 @@ func AutorunCollector(ctx context.Context) error {
if r != nil {
sl.wakeup()
logrus.WithFields(logrus.Fields{"resolution_id": r.PublicID}).Debugf("Autorun Collector: collected resolution %s", r.PublicID)
_ = GetEngine().Resolve(r.PublicID)
_ = GetEngine().Resolve(r.PublicID, nil)
}
}
}
41 changes: 34 additions & 7 deletions engine/collector_instance.go
@@ -11,20 +11,26 @@ import (
"github.com/ovh/utask/models/resolution"
"github.com/ovh/utask/models/runnerinstance"
"github.com/sirupsen/logrus"
"golang.org/x/sync/semaphore"
)

// InstanceCollector launches a process that retrieves resolutions
// which might have been running on a dead instance and marks them as
// crashed, for examination
func InstanceCollector(ctx context.Context) error {
func InstanceCollector(ctx context.Context, maxConcurrentExecutions int, waitDuration time.Duration) error {
dbp, err := zesty.NewDBProvider(utask.DBName)
if err != nil {
return err
}

var sm *semaphore.Weighted
if maxConcurrentExecutions >= 0 {
sm = semaphore.NewWeighted(int64(maxConcurrentExecutions))
}

go func() {
// Start immediately
collect(dbp)
collect(dbp, sm, waitDuration)

for running := true; running; {
// wake up every minute
@@ -34,20 +40,21 @@ func InstanceCollector(ctx context.Context) error {
case <-ctx.Done():
running = false
default:
collect(dbp)
collect(dbp, sm, waitDuration)
}
}
}()

return nil
}

func collect(dbp zesty.DBProvider) error {
func collect(dbp zesty.DBProvider, sm *semaphore.Weighted, waitDuration time.Duration) error {
// get a list of all instances
instances, err := runnerinstance.ListInstances(dbp)
if err != nil {
return err
}
log := logrus.WithFields(logrus.Fields{"instance_id": utask.InstanceID, "collector": "instance_collector"})
for _, i := range instances {
// if an instance is dead
if i.IsDead() {
@@ -61,12 +68,18 @@ func collect(dbp zesty.DBProvider) error {
}
} else {
// run found resolution
logrus.WithFields(logrus.Fields{"resolution_id": r.PublicID}).Debugf("Instance Collector: collected crashed resolution %s", r.PublicID)
_ = GetEngine().Resolve(r.PublicID)
log.WithFields(logrus.Fields{"resolution_id": r.PublicID}).Debugf("collected crashed resolution %s", r.PublicID)
_ = GetEngine().Resolve(r.PublicID, sm)

// wait between two resolutions, so other instances can also select tasks
time.Sleep(waitDuration)
}
}
// no resolutions left to retry, delete instance
i.Delete(dbp)
if remaining, err := getRemainingResolution(dbp, i); err == nil && remaining == 0 {
log.Infof("collected all resolution from %d, deleting instance from instance list", i.ID)
i.Delete(dbp)
}
}
}

@@ -103,3 +116,17 @@ func getUpdateRunningResolution(dbp zesty.DBProvider, i *runnerinstance.Instance

return &r, nil
}

func getRemainingResolution(dbp zesty.DBProvider, i *runnerinstance.Instance) (int64, error) {
sqlStmt := `SELECT COUNT(id)
FROM "resolution"
WHERE instance_id = $1 AND state IN ($2,$3,$4,$5)`

return dbp.DB().SelectInt(sqlStmt,
i.ID,
resolution.StateCrashed,
resolution.StateRunning,
resolution.StateRetry,
resolution.StateAutorunning,
)
}
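The hunk above hands the semaphore to Resolve, but the acquire/release side lives outside this diff. The sketch below shows how a weighted semaphore caps concurrent crashed-instance resolutions while leaving nil callers (autorun, retry, manual runs) unthrottled; resolveWithLimit and runResolution are illustrative names, not uTask's engine API:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/semaphore"
)

// runResolution is a hypothetical stand-in for the engine's resolution runner.
func runResolution(publicID string) {
	time.Sleep(50 * time.Millisecond)
	fmt.Println("resolved", publicID)
}

// resolveWithLimit blocks until a slot is free when a semaphore is provided,
// and runs immediately when sm is nil.
func resolveWithLimit(ctx context.Context, sm *semaphore.Weighted, publicID string) error {
	if sm != nil {
		// at most max_concurrent_executions_from_crashed slots at once
		if err := sm.Acquire(ctx, 1); err != nil {
			return err
		}
		defer sm.Release(1)
	}
	runResolution(publicID)
	return nil
}

func main() {
	sm := semaphore.NewWeighted(20) // max_concurrent_executions_from_crashed
	_ = resolveWithLimit(context.Background(), sm, "crashed-resolution-id")
}
```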
2 changes: 1 addition & 1 deletion engine/collector_retry.go
@@ -33,7 +33,7 @@ func RetryCollector(ctx context.Context) error {
if r != nil {
sl.wakeup()
logrus.WithFields(logrus.Fields{"resolution_id": r.PublicID}).Debugf("Retry Collector: collected resolution %s", r.PublicID)
_ = GetEngine().Resolve(r.PublicID)
_ = GetEngine().Resolve(r.PublicID, nil)
}
}
}