Skip to content

Commit

Permalink
Merge pull request #24 from ripienaar/cli_add
Browse files Browse the repository at this point in the history
(misc) support queue add on the cli
  • Loading branch information
ripienaar authored Feb 2, 2022
2 parents c2197fb + 14f2a1c commit 21f715d
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 9 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ This package heavily inspired by [hibiken/asynq](https://github.com/hibiken/asyn

* [Status](#status)
* [Features](#features)
* [Examples](https://github.com/choria-io/asyncjobs/wiki/Introductory-Golang-Walkthrough)
* Examples
* [Golang](https://github.com/choria-io/asyncjobs/wiki/Introductory-Golang-Walkthrough)
* [CLI](https://github.com/choria-io/asyncjobs/wiki/Introductory-CLI-Walkthrough)

[![Go Reference](https://pkg.go.dev/badge/github.com/choria-io/asyncjobs.svg)](https://pkg.go.dev/github.com/choria-io/asyncjobs)
[![Go Report Card](https://goreportcard.com/badge/github.com/choria-io/asyncjobs)](https://goreportcard.com/report/github.com/choria-io/asyncjobs)
Expand Down
51 changes: 51 additions & 0 deletions ajc/queue_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"sort"
"time"

"github.com/choria-io/asyncjobs"
"github.com/dustin/go-humanize"
"gopkg.in/alecthomas/kingpin.v2"
)
Expand All @@ -22,13 +23,27 @@ type queueCommand struct {
maxTries int
maxTime time.Duration
maxConcurrent int
memory bool
replicas int
discardOld bool
}

func configureQueueCommand(app *kingpin.Application) {
c := &queueCommand{}

queues := app.Command("queues", "Manage Work Queues").Alias("q").Alias("queue")

add := queues.Command("new", "Creates a new Queue").Alias("add").Alias("n").Alias("a").Action(c.addAction)
add.Arg("queue", "Queue to Configure").Required().StringVar(&c.name)
add.Flag("age", "Sets the maximum age for entries to keep, 0s for unlimited").Default("0s").DurationVar(&c.maxAge)
add.Flag("entries", "Sets the maximum amount of entries to keep, 0 for unlimited").Default("0").IntVar(&c.maxEntries)
add.Flag("tries", "Maximum delivery attempts to allow per message, -1 for unlimited").Default(fmt.Sprintf("%d", asyncjobs.DefaultMaxTries)).IntVar(&c.maxTries)
add.Flag("run-time", "Maximum run-time to allow per task").Default(asyncjobs.DefaultJobRunTime.String()).DurationVar(&c.maxTime)
add.Flag("concurrent", "Maximum concurrent jobs that can be ran").Default(fmt.Sprintf("%d", asyncjobs.DefaultQueueMaxConcurrent)).IntVar(&c.maxConcurrent)
add.Flag("memory", "Store the Queue in memory").BoolVar(&c.memory)
add.Flag("replicas", "Number of storage replicas to configure").Default("1").IntVar(&c.replicas)
add.Flag("discard-old", "When full, discard old entries").BoolVar(&c.discardOld)

queues.Command("list", "List Queues").Alias("ls").Action(c.lsAction)

rm := queues.Command("delete", "Removes the entire work queue").Alias("rm").Action(c.rmAction)
Expand All @@ -51,6 +66,42 @@ func configureQueueCommand(app *kingpin.Application) {
cfg.Flag("concurrent", "Maximum concurrent jobs that can be ran").Default("-2").IntVar(&c.maxConcurrent)
}

func (c *queueCommand) addAction(_ *kingpin.ParseContext) error {
err := prepare()
if err != nil {
return err
}

_, err = admin.QueueInfo(c.name)
if err == nil {
return fmt.Errorf("queue %s already exist", c.name)
}

queue := &asyncjobs.Queue{
Name: c.name,
MaxAge: c.maxAge,
MaxEntries: c.maxEntries,
DiscardOld: c.discardOld,
MaxTries: c.maxTries,
MaxRunTime: c.maxTime,
MaxConcurrent: c.maxConcurrent,
}

err = admin.PrepareQueue(queue, c.replicas, c.memory)
if err != nil {
return err
}

nfo, err := admin.QueueInfo(c.name)
if err != nil {
return err
}

showQueue(nfo)

return nil
}

func (c *queueCommand) configureAction(_ *kingpin.ParseContext) error {
err := prepare()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion ajc/task_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ func (c *taskCommand) viewAction(_ *kingpin.ParseContext) error {
}

func (c *taskCommand) addAction(_ *kingpin.ParseContext) error {
err := prepare()
err := prepare(asyncjobs.BindWorkQueue(c.queue))
if err != nil {
return err
}
Expand Down
9 changes: 8 additions & 1 deletion ajc/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ func prepare(copts ...asyncjobs.ClientOpt) error {
conn := []nats.Option{
nats.MaxReconnects(10),
nats.Name("Choria Asynchronous Jobs CLI version " + version),
nats.ReconnectHandler(func(nc *nats.Conn) {
log.Printf("Reconnected to NATS server %s", nc.ConnectedUrl())
}),
nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
log.Printf("Disconnected from server: %v", err)
}),
nats.ErrorHandler(func(nc *nats.Conn, _ *nats.Subscription, err error) {
url := nc.ConnectedUrl()
if url == "" {
Expand All @@ -51,7 +57,8 @@ func prepare(copts ...asyncjobs.ClientOpt) error {
}

opts := []asyncjobs.ClientOpt{
asyncjobs.CustomLogger(log), asyncjobs.NatsContext(nctx, conn...),
asyncjobs.CustomLogger(log),
asyncjobs.NatsContext(nctx, conn...),
}
opts = append(opts, copts...)

Expand Down
16 changes: 16 additions & 0 deletions client_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,22 @@ func WorkQueue(queue *Queue) ClientOpt {
}
}

// BindWorkQueue binds the client to a work queue that should already exist
func BindWorkQueue(queue string) ClientOpt {
return func(opts *ClientOpts) error {
if queue == "" {
return fmt.Errorf("a queue name is required")
}
if opts.queue != nil {
return fmt.Errorf("a queue has already been defined")
}

opts.queue = &Queue{Name: queue, NoCreate: true}

return nil
}
}

// TaskRetention is the time tasks will be kept with.
//
// Used only when initially creating the underlying streams.
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/segmentio/ksuid v1.0.4
github.com/sirupsen/logrus v1.6.0
github.com/xlab/tablewriter v0.0.0-20160610135559-80b567a11ad5
golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce
golang.org/x/term v0.0.0-20210503060354-a79de5458b56
gopkg.in/alecthomas/kingpin.v2 v2.2.6
)

Expand All @@ -38,9 +38,9 @@ require (
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce // indirect
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 // indirect
golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27 // indirect
golang.org/x/term v0.0.0-20210503060354-a79de5458b56 // indirect
golang.org/x/text v0.3.6 // indirect
golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 // indirect
google.golang.org/protobuf v1.26.0 // indirect
Expand Down
4 changes: 0 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,6 @@ github.com/nats-io/jsm.go v0.0.28-0.20220128163911-90cd1007b323/go.mod h1:HU1JmK
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 h1:vU9tpM3apjYlLLeY23zRWJ9Zktr5jp+mloR942LEOpY=
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
github.com/nats-io/nats-server/v2 v2.7.2-0.20220126224453-26b692ee73c0/go.mod h1:tckmrt0M6bVaDT3kmh9UrIq/CBOBBse+TpXQi5ldaa8=
github.com/nats-io/nats-server/v2 v2.7.2-0.20220130232407-6b690bd5b635 h1:h6+xCwResqI/W7AFR0E8wyYS89zXBBS85Qd0gf+iW/E=
github.com/nats-io/nats-server/v2 v2.7.2-0.20220130232407-6b690bd5b635/go.mod h1:tckmrt0M6bVaDT3kmh9UrIq/CBOBBse+TpXQi5ldaa8=
github.com/nats-io/nats-server/v2 v2.7.2-0.20220131171338-74c0fdc3bb8b h1:h8EYD8Q7yUbjXmMT6z1XI7SAV+aiHhkNEc1O+WImMh4=
github.com/nats-io/nats-server/v2 v2.7.2-0.20220131171338-74c0fdc3bb8b/go.mod h1:tckmrt0M6bVaDT3kmh9UrIq/CBOBBse+TpXQi5ldaa8=
github.com/nats-io/nats-server/v2 v2.7.2-0.20220201222209-2ed7a812d8b2 h1:/ocgZt+pxx9ocGWWdeBAJfg0tqoz4uUoIgCNepKnnDQ=
github.com/nats-io/nats-server/v2 v2.7.2-0.20220201222209-2ed7a812d8b2/go.mod h1:tckmrt0M6bVaDT3kmh9UrIq/CBOBBse+TpXQi5ldaa8=
github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d h1:GRSmEJutHkdoxKsRypP575IIdoXe7Bm6yHQF6GcDBnA=
Expand Down

0 comments on commit 21f715d

Please sign in to comment.