Skip to content

Commit

Permalink
cmd/atlas/cmdlog: move migrate status reporting to cmdlog
Browse files Browse the repository at this point in the history
  • Loading branch information
a8m committed Jul 22, 2024
1 parent 8b6e5ae commit 3904d10
Show file tree
Hide file tree
Showing 7 changed files with 437 additions and 457 deletions.
2 changes: 1 addition & 1 deletion cmd/atlas/internal/cmdapi/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -1226,7 +1226,7 @@ func migrateStatusRun(cmd *cobra.Command, _ []string, flags migrateStatusFlags)
if err := checkRevisionSchemaClarity(cmd, client, flags.revisionSchema); err != nil {
return err
}
report, err := (&cmdmigrate.StatusReporter{
report, err := (&cmdlog.StatusReporter{
Client: client,
Dir: dir,
DirURL: dirURL,
Expand Down
17 changes: 12 additions & 5 deletions cmd/atlas/internal/cmdapi/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,11 +637,7 @@ func setSchemaEnvFlags(cmd *cobra.Command, env *Env) error {
if err != nil {
return err
}
for i, s := range srcs {
if !isURL(s) {
srcs[i] = "file://" + s
}
}
srcs = fixFileURLs(srcs)
if err := maySetFlag(cmd, flagFile, strings.Join(srcs, ",")); err != nil {
return err
}
Expand Down Expand Up @@ -832,3 +828,14 @@ func fmtFile(task fmttask) (bool, error) {
}
return false, nil
}

// fixFileURLs converts all file paths to a URL format, if not already.
// For example, "schema.hcl" to "file://schema.hcl".
func fixFileURLs(src []string) []string {
for i, s := range src {
if !isURL(s) {
src[i] = "file://" + s
}
}
return src
}
143 changes: 110 additions & 33 deletions cmd/atlas/internal/cmdlog/cmdlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"net/url"
"slices"
Expand All @@ -16,6 +17,8 @@ import (
"text/template"
"time"

cmdmigrate "ariga.io/atlas/cmd/atlas/internal/migrate"
"ariga.io/atlas/cmd/atlas/internal/migrate/ent/revision"
"ariga.io/atlas/sql/migrate"
"ariga.io/atlas/sql/schema"
"ariga.io/atlas/sql/sqlclient"
Expand Down Expand Up @@ -186,6 +189,111 @@ func (r *MigrateStatus) FromCheckpoint() bool {
return ok && ck.IsCheckpoint()
}

// StatusReporter is used to gather information about migration status.
type StatusReporter struct {
// Client configures the connection to the database to file a MigrateStatus for.
Client *sqlclient.Client
// DirURL of the migration directory.
DirURL *url.URL
// Dir is used for scanning and validating the migration directory.
Dir migrate.Dir
// Schema name the revision table resides in.
Schema string
}

// Report creates and writes a MigrateStatus.
func (r *StatusReporter) Report(ctx context.Context) (*MigrateStatus, error) {
rep := &MigrateStatus{Env: NewEnv(r.Client, r.DirURL)}
// Check if there already is a revision table in the defined schema.
// Inspect schema and check if the table does already exist.
sch, err := r.Client.InspectSchema(ctx, r.Schema, &schema.InspectOptions{Tables: []string{revision.Table}})
if err != nil && !schema.IsNotExistError(err) {
return nil, err
}
if schema.IsNotExistError(err) || func() bool { _, ok := sch.Table(revision.Table); return !ok }() {
// Either schema or table does not exist.
if rep.Available, err = migrate.FilesFromLastCheckpoint(r.Dir); err != nil {
return nil, err
}
rep.Pending = rep.Available
} else {
// Both exist, fetch their data.
rrw, err := cmdmigrate.RevisionsForClient(ctx, r.Client, r.Schema)
if err != nil {
return nil, err
}
if err := rrw.Migrate(ctx); err != nil {
return nil, err
}
ex, err := migrate.NewExecutor(r.Client.Driver, r.Dir, rrw)
if err != nil {
return nil, err
}
rep.Applied, err = rrw.ReadRevisions(ctx)
if err != nil {
return nil, err
}
if rep.Pending, err = ex.Pending(ctx); err != nil && !errors.Is(err, migrate.ErrNoPendingFiles) {
if err1 := (*migrate.HistoryNonLinearError)(nil); errors.As(err, &err1) {
rep.Error = err1.Error()
rep.Status = statusPending
rep.Pending = err1.Pending
rep.OutOfOrder = err1.OutOfOrder
// Non-linear error means at least one file was applied.
rep.Current = rep.Applied[len(rep.Applied)-1].Version
return rep, nil
}
return nil, err
}
// If no files were applied, all pending files are
// available. The first one might be a checkpoint.
if len(rep.Applied) == 0 {
rep.Available = rep.Pending
} else if rep.Available, err = r.Dir.Files(); err != nil {
return nil, err
}
}
switch len(rep.Pending) {
case len(rep.Available):
rep.Current = "No migration applied yet"
default:
rep.Current = rep.Applied[len(rep.Applied)-1].Version
}
if len(rep.Pending) == 0 {
rep.Status = statusOK
rep.Next = "Already at latest version"
} else {
rep.Status = statusPending
rep.Next = rep.Pending[0].Version()
}
// If the last one is partially applied (and not manually resolved).
if len(rep.Applied) != 0 {
last := rep.Applied[len(rep.Applied)-1]
if !last.Type.Has(migrate.RevisionTypeResolved) && last.Applied < last.Total {
rep.SQL = strings.ReplaceAll(last.ErrorStmt, "\n", " ")
rep.Error = strings.ReplaceAll(last.Error, "\n", " ")
rep.Count = last.Applied
idx := migrate.FilesLastIndex(rep.Available, func(f migrate.File) bool {
return f.Version() == last.Version
})
if idx == -1 {
return nil, fmt.Errorf("migration file with version %q not found", last.Version)
}
stmts, err := migrate.FileStmts(r.Client.Driver, rep.Available[idx])
if err != nil {
return nil, err
}
rep.Total = len(stmts)
}
}
return rep, nil
}

const (
statusOK = "OK"
statusPending = "PENDING"
)

// MigrateSetTemplate holds the default template of the 'migrate set' command.
var MigrateSetTemplate = template.Must(template.New("set").
Funcs(ColorTemplateFuncs).Parse(`
Expand Down Expand Up @@ -812,38 +920,7 @@ func (s *SchemaInspect) MarshalSQL(indent ...string) (string, error) {
}

func sqlInspect(report *SchemaInspect, indent ...string) (string, error) {
var changes schema.Changes
for _, o := range report.Realm.Objects {
changes = append(changes, &schema.AddObject{O: o})
}
for _, s := range report.Realm.Schemas {
// Generate commands for creating the schemas on realm-mode.
if report.client.URL.Schema == "" {
changes = append(changes, &schema.AddSchema{S: s})
}
for _, o := range s.Objects {
changes = append(changes, &schema.AddObject{O: o})
}
for _, t := range s.Tables {
changes = append(changes, &schema.AddTable{T: t})
for _, r := range t.Triggers {
changes = append(changes, &schema.AddTrigger{T: r})
}
}
for _, v := range s.Views {
changes = append(changes, &schema.AddView{V: v})
for _, r := range v.Triggers {
changes = append(changes, &schema.AddTrigger{T: r})
}
}
for _, f := range s.Funcs {
changes = append(changes, &schema.AddFunc{F: f})
}
for _, p := range s.Procs {
changes = append(changes, &schema.AddProc{P: p})
}
}
return fmtPlan(report.ctx, report.client, changes, indent)
return fmtPlan(report.ctx, report.client, cmdmigrate.ChangesToRealm(report.client, report.Realm), indent)
}

// SchemaDiff contains a summary of the 'schema diff' command.
Expand Down Expand Up @@ -900,7 +977,7 @@ func fmtPlan(ctx context.Context, client *sqlclient.Client, changes schema.Chang
}
plan, err := client.PlanChanges(ctx, "plan", changes, func(o *migrate.PlanOptions) {
o.Mode = migrate.PlanModeDump
// Disable tables qualifier in schema-mode.
// Disable object qualifier in schema-mode.
if client.URL.Schema != "" {
o.SchemaQualifier = new(string)
}
Expand Down
Loading

0 comments on commit 3904d10

Please sign in to comment.