Skip to content

Commit

Permalink
mosaic: enable console support
Browse files Browse the repository at this point in the history
  • Loading branch information
thdxr committed Jul 23, 2024
1 parent cd6b7b6 commit 08e20f4
Show file tree
Hide file tree
Showing 3 changed files with 309 additions and 41 deletions.
12 changes: 11 additions & 1 deletion cmd/sst/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,17 @@ var root = &cli.Command{
},
},
{
Name: "mosaic",
Name: "mosaic",
Flags: []cli.Flag{
{
Name: "simple",
Type: "bool",
Description: cli.Description{
Short: "Run in simple mode",
Long: "Run in simple mode.",
},
},
},
Hidden: true,
Run: mosaic.CmdMosaic,
},
Expand Down
98 changes: 58 additions & 40 deletions cmd/sst/mosaic/mosaic.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/sst/ion/cmd/sst/mosaic/deployer"
"github.com/sst/ion/cmd/sst/mosaic/multiplexer"
"github.com/sst/ion/cmd/sst/mosaic/server"
"github.com/sst/ion/cmd/sst/mosaic/socket"
"github.com/sst/ion/cmd/sst/mosaic/watcher"
"github.com/sst/ion/pkg/project"
"github.com/sst/ion/pkg/server/dev/cloudflare"
Expand Down Expand Up @@ -80,6 +81,7 @@ func CmdMosaic(c *cli.Cli) error {
if err != nil {
return err
}
os.Setenv("SST_STAGE", p.App().Stage)
slog.Info("mosaic", "project", p.PathRoot())

wg.Go(func() error {
Expand All @@ -91,6 +93,14 @@ func CmdMosaic(c *cli.Cli) error {
if err != nil {
return err
}

wg.Go(func() error {
defer c.Cancel()
socket.Start(c.Context, p, server)
return nil
})

os.Setenv("SST_SERVER", fmt.Sprintf("http://localhost:%v", server.Port))
for name, a := range p.App().Providers {
args := a
switch name {
Expand All @@ -113,51 +123,59 @@ func CmdMosaic(c *cli.Cli) error {
})

currentExecutable, _ := os.Executable()
multi := multiplexer.New(c.Context)
multiEnv := []string{
fmt.Sprintf("SST_SERVER=http://localhost:%v", server.Port),
"SST_STAGE=" + p.App().Stage,
}
multi.AddProcess("deploy", []string{currentExecutable, "mosaic-deploy"}, "⑆", "SST", "", false, multiEnv...)
wg.Go(func() error {
defer c.Cancel()
multi.Start()
return nil
})

wg.Go(func() error {
evts := bus.Subscribe(&project.CompleteEvent{})
defer c.Cancel()
for {
select {
case <-c.Context.Done():
return nil
case unknown := <-evts:
switch evt := unknown.(type) {
case *project.CompleteEvent:
for _, d := range evt.Devs {
if d.Command == "" {
continue
if !c.Bool("simple") {
multi := multiplexer.New(c.Context)
multiEnv := []string{
fmt.Sprintf("SST_SERVER=http://localhost:%v", server.Port),
"SST_STAGE=" + p.App().Stage,
}
multi.AddProcess("deploy", []string{currentExecutable, "mosaic-deploy"}, "⑆", "SST", "", false, multiEnv...)
wg.Go(func() error {
defer c.Cancel()
multi.Start()
return nil
})
wg.Go(func() error {
evts := bus.Subscribe(&project.CompleteEvent{})
defer c.Cancel()
for {
select {
case <-c.Context.Done():
return nil
case unknown := <-evts:
switch evt := unknown.(type) {
case *project.CompleteEvent:
for _, d := range evt.Devs {
if d.Command == "" {
continue
}
dir := filepath.Join(cwd, d.Directory)
slog.Info("mosaic", "dev", d.Name, "directory", dir)
multi.AddProcess(
d.Name,
append([]string{currentExecutable, "mosaic", "--"},
strings.Split(d.Command, " ")...),
// 𝝺 λ
"→",
d.Name,
dir,
true,
multiEnv...,
)
}
dir := filepath.Join(cwd, d.Directory)
slog.Info("mosaic", "dev", d.Name, "directory", dir)
multi.AddProcess(
d.Name,
append([]string{currentExecutable, "mosaic", "--"},
strings.Split(d.Command, " ")...),
// 𝝺 λ
"→",
d.Name,
dir,
true,
multiEnv...,
)
break
}
break
}
}
}
})
})
}

if c.Bool("simple") {
wg.Go(func() error {
return CmdMosaicDeploy(c)
})
}

wg.Go(func() error {
defer c.Cancel()
Expand Down
240 changes: 240 additions & 0 deletions cmd/sst/mosaic/socket/socket.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
package socket

import (
"context"
"encoding/json"
"log/slog"
"net/http"
"time"

"github.com/charmbracelet/x/ansi"
"github.com/gorilla/websocket"
"github.com/sst/ion/cmd/sst/mosaic/aws"
"github.com/sst/ion/cmd/sst/mosaic/bus"
"github.com/sst/ion/cmd/sst/mosaic/server"
"github.com/sst/ion/pkg/project"
)

var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
}

type CliDevEvent struct {
App string `json:"app"`
Stage string `json:"stage"`
Region string `json:"region"`
}

type Invocation struct {
ID string `json:"id,omitempty"`
Source string `json:"source,omitempty"`
Cold bool `json:"cold,omitempty"`
Input json.RawMessage `json:"input,omitempty"`
Output interface{} `json:"output,omitempty"`
Start int64 `json:"start,omitempty"`
End int64 `json:"end,omitempty"`
Errors []InvocationError `json:"errors"`
Logs []InvocationLog `json:"logs"`
Report *InvocationReport `json:"report,omitempty"`
}

type InvocationLog struct {
ID string `json:"id,omitempty"`
Timestamp int64 `json:"timestamp,omitempty"`
Message string `json:"message,omitempty"`
}

type InvocationReport struct {
Duration int64 `json:"duration"`
Init int64 `json:"init"`
Size int64 `json:"size"`
Memory int64 `json:"memory"`
Xray string `json:"xray"`
}

type InvocationError struct {
ID string `json:"id"`
Error string `json:"error"`
Message string `json:"message"`
Stack []Frame `json:"stack"`
Failed bool `json:"failed"`
}

type Frame struct {
Raw string `json:"raw"`
}

func Start(ctx context.Context, p *project.Project, server *server.Server) {
connected := make(chan *websocket.Conn)
disconnected := make(chan *websocket.Conn)
invocationClear := make(chan string)
server.Mux.HandleFunc("/socket", func(w http.ResponseWriter, r *http.Request) {
slog.Info("socket upgrading", "addr", r.RemoteAddr)
ws, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
defer ws.Close()

connected <- ws
defer func() {
disconnected <- ws
}()

for {
_, data, err := ws.ReadMessage()
if err != nil {
slog.Info("socket error", "err", err)
break
}

slog.Info("socket message", "message", string(data))
var message map[string]interface{}
err = json.Unmarshal(data, &message)
if err != nil {
continue
}

if message["type"] == "log.cleared" {
source := message["properties"].(map[string]interface{})["source"].(string)
invocationClear <- source
}

}
})
sockets := make(map[*websocket.Conn]struct{})
invocations := make(map[string]*Invocation)

evts := bus.SubscribeAll()

publish := func(evt interface{}) {
for ws := range sockets {
ws.WriteJSON(evt)
}
}

publishInvocation := func(invocation *Invocation) {
publish(map[string]interface{}{
"type": "invocation",
"properties": []*Invocation{
invocation,
},
})
}

var complete *project.CompleteEvent

for {
select {
case <-ctx.Done():
return
case source := <-invocationClear:
if source == "all" {
invocations = map[string]*Invocation{}
break
}
for id, invocation := range invocations {
if invocation.Source == source {
delete(invocations, id)
}
}
break

case unknown := <-evts:
switch evt := unknown.(type) {
case *project.CompleteEvent:
complete = evt
break
case *aws.FunctionInvokedEvent:
source := ""
if complete != nil {
for _, resource := range complete.Resources {
if resource.URN.Name() == evt.FunctionID && resource.Type == "sst:aws:Function" {
source = string(resource.URN)
}
}
}
invocation := &Invocation{
ID: evt.RequestID,
Source: source,
Input: json.RawMessage(evt.Input),
Start: time.Now().UnixMilli(),
Errors: []InvocationError{},
Logs: []InvocationLog{},
}
invocations[evt.RequestID] = invocation
publishInvocation(invocation)
break
case *aws.FunctionResponseEvent:
invocation, ok := invocations[evt.RequestID]
if ok {
invocation.Output = json.RawMessage(evt.Output)
invocation.End = time.Now().UnixMilli()
invocation.Report = &InvocationReport{
Duration: invocation.End - invocation.Start,
}
publishInvocation(invocation)
}
break
case *aws.FunctionErrorEvent:
invocation, ok := invocations[evt.RequestID]
if ok {
invocation.End = time.Now().UnixMilli()
invocation.Report = &InvocationReport{
Duration: invocation.End - invocation.Start,
}
error := InvocationError{
Message: evt.ErrorMessage,
Error: evt.ErrorType,
Failed: true,
Stack: []Frame{},
}
for _, frame := range evt.Trace {
error.Stack = append(error.Stack, Frame{
Raw: frame,
})
}
invocation.Errors = append(invocation.Errors, error)
publishInvocation(invocation)
}
break
case *aws.FunctionLogEvent:
invocation, ok := invocations[evt.RequestID]
if ok {
invocation.Logs = append(invocation.Logs, InvocationLog{
ID: time.Now().String(),
Timestamp: time.Now().UnixMilli(),
Message: ansi.Strip(evt.Line),
})
publishInvocation(invocation)
}
break
}
case ws := <-connected:
slog.Info("socket connected", "addr", ws.RemoteAddr())
sockets[ws] = struct{}{}
ws.WriteJSON(map[string]interface{}{
"type": "cli.dev",
"properties": CliDevEvent{
App: p.App().Name,
Stage: p.App().Stage,
},
})
all := []*Invocation{}
for _, invocation := range invocations {
all = append(all, invocation)
}
slog.Info("sending invocations", "count", len(all))
ws.WriteJSON(map[string]interface{}{
"type": "invocation",
"properties": all,
})
break
case ws := <-disconnected:
slog.Info("socket disconnected", "addr", ws.RemoteAddr())
delete(sockets, ws)
break
}
}

}

0 comments on commit 08e20f4

Please sign in to comment.