From deacead13af1f5b176891875b3a977fe354f8d01 Mon Sep 17 00:00:00 2001 From: Hank Donnay Date: Thu, 19 Oct 2023 12:28:28 -0500 Subject: [PATCH] postgres/v2: Matcher implementation Signed-off-by: Hank Donnay --- datastore/postgres/v2/matcher_v1.go | 136 +++++ .../postgres/v2/matcher_v1_enrichment.go | 152 +++++ datastore/postgres/v2/matcher_v1_gc.go | 111 ++++ datastore/postgres/v2/matcher_v1_gc_test.go | 193 ++++++ .../postgres/v2/matcher_v1_updateoperation.go | 301 ++++++++++ .../v2/matcher_v1_updateoperation_test.go | 189 ++++++ .../postgres/v2/matcher_v1_updater_test.go | 558 ++++++++++++++++++ .../postgres/v2/matcher_v1_updaterstatus.go | 66 +++ .../postgres/v2/matcher_v1_vulnerabilities.go | 454 ++++++++++++++ .../v2/matcher_v1_vulnerabilities_test.go | 448 ++++++++++++++ datastore/postgres/v2/queries.go | 1 + .../v2/queries/matcher/gc_delete_ops.sql | 2 + .../v2/queries/matcher/gc_distinct.sql | 1 + .../v2/queries/matcher/gc_eligible.sql | 14 + .../v2/queries/matcher/gc_orphaned.sql | 5 + .../postgres/v2/queries/matcher/get_get.sql | 62 ++ .../v2/queries/matcher/get_latestupdates.sql | 9 + .../v2/queries/matcher/getenrichment_get.sql | 21 + .../matcher/getlatestupdateref_any.sql | 7 + .../matcher/getlatestupdateref_enrichment.sql | 9 + .../getlatestupdateref_vulnerability.sql | 9 + .../matcher/getlatestupdaterefs_any.sql | 10 + .../getlatestupdaterefs_enrichment.sql | 12 + .../getlatestupdaterefs_vulnerability.sql | 12 + .../queries/matcher/getupdatediff_confirm.sql | 2 + .../v2/queries/matcher/getupdatediff_load.sql | 70 +++ .../matcher/getupdatediff_populaterefs.sql | 1 + .../matcher/getupdateoperations_any.sql | 11 + .../getupdateoperations_enrichment.sql | 12 + .../getupdateoperations_getupdaters.sql | 4 + .../getupdateoperations_vulnerability.sql | 12 + .../matcher/initialized_initialized.sql | 7 + .../matcher/recordupdatersetstatus_update.sql | 8 + .../matcher/recordupdaterstatus_failure.sql | 7 + .../matcher/recordupdaterstatus_success.sql | 7 + .../matcher/updateenrichment_assoc.sql | 22 + .../matcher/updateenrichment_create.sql | 7 + .../matcher/updateenrichment_insert.sql | 9 + .../matcher/updateenrichment_refresh.sql | 1 + .../updatevulnerabilities_associate.sql | 11 + .../matcher/updatevulnerabilities_create.sql | 4 + .../matcher/updatevulnerabilities_insert.sql | 4 + .../matcher/updatevulnerabilities_refresh.sql | 1 + datastore/postgres/v2/queries_test.go | 18 + datastore/postgres/v2/query_metadata.go | 70 ++- .../postgres/v2/testdata/matcher_helpers.psql | 76 +++ 46 files changed, 3144 insertions(+), 2 deletions(-) create mode 100644 datastore/postgres/v2/matcher_v1_enrichment.go create mode 100644 datastore/postgres/v2/matcher_v1_gc.go create mode 100644 datastore/postgres/v2/matcher_v1_gc_test.go create mode 100644 datastore/postgres/v2/matcher_v1_updateoperation.go create mode 100644 datastore/postgres/v2/matcher_v1_updateoperation_test.go create mode 100644 datastore/postgres/v2/matcher_v1_updater_test.go create mode 100644 datastore/postgres/v2/matcher_v1_updaterstatus.go create mode 100644 datastore/postgres/v2/matcher_v1_vulnerabilities.go create mode 100644 datastore/postgres/v2/matcher_v1_vulnerabilities_test.go create mode 100644 datastore/postgres/v2/queries/matcher/gc_delete_ops.sql create mode 100644 datastore/postgres/v2/queries/matcher/gc_distinct.sql create mode 100644 datastore/postgres/v2/queries/matcher/gc_eligible.sql create mode 100644 datastore/postgres/v2/queries/matcher/gc_orphaned.sql create mode 100644 
datastore/postgres/v2/queries/matcher/get_get.sql create mode 100644 datastore/postgres/v2/queries/matcher/get_latestupdates.sql create mode 100644 datastore/postgres/v2/queries/matcher/getenrichment_get.sql create mode 100644 datastore/postgres/v2/queries/matcher/getlatestupdateref_any.sql create mode 100644 datastore/postgres/v2/queries/matcher/getlatestupdateref_enrichment.sql create mode 100644 datastore/postgres/v2/queries/matcher/getlatestupdateref_vulnerability.sql create mode 100644 datastore/postgres/v2/queries/matcher/getlatestupdaterefs_any.sql create mode 100644 datastore/postgres/v2/queries/matcher/getlatestupdaterefs_enrichment.sql create mode 100644 datastore/postgres/v2/queries/matcher/getlatestupdaterefs_vulnerability.sql create mode 100644 datastore/postgres/v2/queries/matcher/getupdatediff_confirm.sql create mode 100644 datastore/postgres/v2/queries/matcher/getupdatediff_load.sql create mode 100644 datastore/postgres/v2/queries/matcher/getupdatediff_populaterefs.sql create mode 100644 datastore/postgres/v2/queries/matcher/getupdateoperations_any.sql create mode 100644 datastore/postgres/v2/queries/matcher/getupdateoperations_enrichment.sql create mode 100644 datastore/postgres/v2/queries/matcher/getupdateoperations_getupdaters.sql create mode 100644 datastore/postgres/v2/queries/matcher/getupdateoperations_vulnerability.sql create mode 100644 datastore/postgres/v2/queries/matcher/initialized_initialized.sql create mode 100644 datastore/postgres/v2/queries/matcher/recordupdatersetstatus_update.sql create mode 100644 datastore/postgres/v2/queries/matcher/recordupdaterstatus_failure.sql create mode 100644 datastore/postgres/v2/queries/matcher/recordupdaterstatus_success.sql create mode 100644 datastore/postgres/v2/queries/matcher/updateenrichment_assoc.sql create mode 100644 datastore/postgres/v2/queries/matcher/updateenrichment_create.sql create mode 100644 datastore/postgres/v2/queries/matcher/updateenrichment_insert.sql create mode 100644 datastore/postgres/v2/queries/matcher/updateenrichment_refresh.sql create mode 100644 datastore/postgres/v2/queries/matcher/updatevulnerabilities_associate.sql create mode 100644 datastore/postgres/v2/queries/matcher/updatevulnerabilities_create.sql create mode 100644 datastore/postgres/v2/queries/matcher/updatevulnerabilities_insert.sql create mode 100644 datastore/postgres/v2/queries/matcher/updatevulnerabilities_refresh.sql create mode 100644 datastore/postgres/v2/testdata/matcher_helpers.psql diff --git a/datastore/postgres/v2/matcher_v1.go b/datastore/postgres/v2/matcher_v1.go index bae718a43..21bf7100a 100644 --- a/datastore/postgres/v2/matcher_v1.go +++ b/datastore/postgres/v2/matcher_v1.go @@ -1,5 +1,75 @@ package postgres +import ( + "context" + "fmt" + "runtime" + "sync/atomic" + + "github.com/google/uuid" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/jackc/pgx/v5/stdlib" + "github.com/quay/zlog" + "github.com/remind101/migrate" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" + + "github.com/quay/claircore/datastore" + "github.com/quay/claircore/datastore/postgres/migrations" +) + +// NewMatcherV1 returns a configured [MatcherV1]. +// +// The passed [pgxpool.Config] will have its tracing and lifecycle hooks +// overwritten. 
+// +// Values that can be used as MatcherOptions: +// - [WithMigrations] +// - [WithMinimumMigration] +func NewMatcherV1(ctx context.Context, cfg *pgxpool.Config, opt ...MatcherOption) (*MatcherV1, error) { + const prefix = `matcher` + var mCfg matcherConfig + for _, o := range opt { + mCfg = o.matcherConfig(mCfg) + } + + if mCfg.Migrations { + cfg := cfg.ConnConfig.Copy() + cfg.DefaultQueryExecMode = pgx.QueryExecModeExec + err := func() error { + db := stdlib.OpenDB(*cfg) + defer db.Close() + migrator := migrate.NewPostgresMigrator(db) + migrator.Table = migrations.MatcherMigrationTable + err := migrator.Exec(migrate.Up, migrations.MatcherMigrations...) + if err != nil { + return fmt.Errorf("failed to perform migrations: %w", err) + } + return nil + }() + if err != nil { + return nil, err + } + } + var s MatcherV1 + if err := s.init(ctx, cfg, prefix); err != nil { + return nil, err + + } + + if err := s.checkRevision(ctx, pgx.Identifier([]string{migrations.MatcherMigrationTable}), mCfg.MinMigration); err != nil { + return nil, err + } + + _, file, line, _ := runtime.Caller(1) + runtime.SetFinalizer(&s, func(s *MatcherV1) { + panic(fmt.Sprintf("%s:%d: MatcherV1 not closed", file, line)) + }) + + return &s, nil +} + type MatcherOption interface { matcherConfig(matcherConfig) matcherConfig } @@ -15,3 +85,69 @@ func newMatcherConfig() matcherConfig { MinMigration: MinimumMatcherMigration, } } + +// MatcherV1 implements all the relevant interfaces in the datastore package +type MatcherV1 struct { + storeCommon + // Initialized is used as an atomic bool for tracking initialization. + initialized uint32 +} + +var _ datastore.MatcherV1 = (*MatcherV1)(nil) + +// DeleteUpdateOperations implements [datastore.MatcherV1Updater]. +func (s *MatcherV1) DeleteUpdateOperations(ctx context.Context, id ...uuid.UUID) (int64, error) { + const query = `DELETE FROM update_operation WHERE ref = ANY($1::uuid[]);` + ctx = zlog.ContextWithValues(ctx, "component", "internal/vulnstore/postgres/deleteUpdateOperations") + if len(id) == 0 { + return 0, nil + } + + // Pgx seems unwilling to do the []uuid.UUID → uuid[] conversion, so we're + // forced to make some garbage here. + refStr := make([]string, len(id)) + for i := range id { + refStr[i] = id[i].String() + } + tag, err := s.pool.Exec(ctx, query, refStr) + if err != nil { + return 0, fmt.Errorf("failed to delete: %w", err) + } + return tag.RowsAffected(), nil +} + +// Initialized implements [datastore.MatcherV1]. +func (s *MatcherV1) Initialized(ctx context.Context) (ok bool, err error) { + ctx, done := s.method(ctx, &err) + defer done() + span := trace.SpanFromContext(ctx) + ok = atomic.LoadUint32(&s.initialized) != 0 + span.AddEvent(`loaded`, trace.WithAttributes(attribute.Bool("value", ok))) + if ok { + return true, nil + } + + err = s.pool.AcquireFunc(ctx, s.acquire(ctx, `initialized`, func(ctx context.Context, c *pgxpool.Conn, query string) error { + return c.QueryRow(ctx, query).Scan(&ok) + })) + if err != nil { + return false, err + } + + span.AddEvent(`initialized`, trace.WithAttributes(attribute.Bool("value", ok))) + // There were no rows when we looked, so report that. Don't update the bool, + // because it's in the 'false' state or another goroutine has read from the + // database and will want to swap in 'true'. + if !ok { + return false, nil + } + // If this fails, it means a concurrent goroutine already swapped. Any + // subsequent calls will see the 'true' value. 
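NewMatcherV1 above installs a finalizer that panics if a store is garbage-collected without Close being called, so construction should always be paired with Close. A minimal caller sketch using only names that appear in this patch (WithMigrations, Close); the helper itself is hypothetical and assumes it lives in the same package:

func withMatcherV1(ctx context.Context, cfg *pgxpool.Config, fn func(*MatcherV1) error) error {
	// WithMigrations asks the constructor to run the matcher migrations first.
	store, err := NewMatcherV1(ctx, cfg, WithMigrations)
	if err != nil {
		return err
	}
	// Close disarms the leak-detecting finalizer installed in NewMatcherV1.
	defer store.Close()
	return fn(store)
}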
+ atomic.CompareAndSwapUint32(&s.initialized, 0, 1) + return true, nil +} + +func (s *MatcherV1) Close() error { + runtime.SetFinalizer(s, nil) + return s.storeCommon.Close() +} diff --git a/datastore/postgres/v2/matcher_v1_enrichment.go b/datastore/postgres/v2/matcher_v1_enrichment.go new file mode 100644 index 000000000..605b0164e --- /dev/null +++ b/datastore/postgres/v2/matcher_v1_enrichment.go @@ -0,0 +1,152 @@ +package postgres + +import ( + "context" + "crypto/md5" + "errors" + "fmt" + "io" + "sort" + + "github.com/google/uuid" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/quay/zlog" + + "github.com/quay/claircore/libvuln/driver" +) + +// UpdateEnrichments creates a new UpdateOperation, inserts the provided +// EnrichmentRecord(s), and ensures enrichments from previous updates are not +// queried by clients. +func (s *MatcherV1) UpdateEnrichments(ctx context.Context, name string, fp driver.Fingerprint, es []driver.EnrichmentRecord) (_ uuid.UUID, err error) { + ctx, done := s.method(ctx, &err) + defer done() + + type digestPair struct { + Kind string + Digest []byte + } + hashes := make([]digestPair, len(es)) + func() { + _, span := tracer.Start(ctx, `doHashes`) + defer span.End() + for i := range es { + hashes[i].Kind, hashes[i].Digest = hashEnrichment(&es[i]) + } + }() + + var ref uuid.UUID + err = pgx.BeginTxFunc(ctx, s.pool, txRW, s.tx(ctx, `UpdateEnrichments`, func(ctx context.Context, tx pgx.Tx) (err error) { + var id uint64 + err = pgx.BeginFunc(ctx, tx, s.call(ctx, `create`, func(ctx context.Context, tx pgx.Tx, query string) error { + if err := tx.QueryRow(ctx, query, name, string(fp)).Scan(&id, &ref); err != nil { + return err + } + return nil + })) + if err != nil { + return fmt.Errorf("unable to create enrichment update operation: %w", err) + } + zlog.Debug(ctx). + Str("ref", ref.String()). + Msg("update_operation created") + + err = pgx.BeginFunc(ctx, tx, s.call(ctx, `insert`, func(ctx context.Context, tx pgx.Tx, query string) error { + var batch pgx.Batch + for i := range es { + batch.Queue(query, hashes[i].Kind, hashes[i].Digest, name, es[i].Tags, es[i].Enrichment) + } + res := tx.SendBatch(ctx, &batch) + defer res.Close() + for range es { + if _, err := res.Exec(); err != nil { + return err + } + } + return nil + })) + if err != nil { + return fmt.Errorf("unable to insert enrichments: %w", err) + } + + err = pgx.BeginFunc(ctx, tx, s.call(ctx, `associate`, func(ctx context.Context, tx pgx.Tx, query string) error { + var batch pgx.Batch + for i := range es { + batch.Queue(query, hashes[i].Kind, hashes[i].Digest, name, id) + } + res := tx.SendBatch(ctx, &batch) + defer res.Close() + for range es { + if _, err := res.Exec(); err != nil { + return err + } + } + return nil + })) + if err != nil { + return fmt.Errorf("unable to associate enrichments: %w", err) + } + + return nil + })) + if err != nil { + return uuid.Nil, err + } + + zlog.Debug(ctx). + Stringer("ref", ref). + Int("inserted", len(es)). + Msg("update_operation committed") + + _ = s.pool.AcquireFunc(ctx, s.acquire(ctx, `refresh`, func(ctx context.Context, c *pgxpool.Conn, query string) error { + if _, err := c.Exec(ctx, query); err != nil { + // TODO(hank) Log? 
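The Initialized method in matcher_v1.go above only pays for a database round trip until the first positive answer: it reads an atomic flag, queries on a miss, and publishes success with a compare-and-swap so concurrent callers converge without a lock. The idiom reduced to a standalone sketch (these types and names are illustrative, not part of the patch):

type cachedFlag struct {
	set  uint32
	load func(context.Context) (bool, error)
}

func (c *cachedFlag) get(ctx context.Context) (bool, error) {
	if atomic.LoadUint32(&c.set) != 0 {
		return true, nil // Cached positive answer; skip the database.
	}
	ok, err := c.load(ctx)
	if err != nil || !ok {
		return false, err // Stay in the "unknown/false" state.
	}
	// Another goroutine may have already swapped; either way the flag ends up 1.
	atomic.CompareAndSwapUint32(&c.set, 0, 1)
	return true, nil
}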
+ return fmt.Errorf("unable to refresh update operations view: %w", err) + } + return nil + })) + + return ref, nil +} + +func hashEnrichment(r *driver.EnrichmentRecord) (k string, d []byte) { + h := md5.New() + sort.Strings(r.Tags) + for _, t := range r.Tags { + io.WriteString(h, t) + h.Write([]byte("\x00")) + } + h.Write(r.Enrichment) + return "md5", h.Sum(nil) +} + +func (s *MatcherV1) GetEnrichment(ctx context.Context, name string, tags []string) (res []driver.EnrichmentRecord, err error) { + ctx, done := s.method(ctx, &err) + defer done() + + res = make([]driver.EnrichmentRecord, 0, 8) // Guess at capacity. + err = pgx.BeginTxFunc(ctx, s.pool, txRO, s.call(ctx, `get`, func(ctx context.Context, tx pgx.Tx, query string) (err error) { + rows, err := tx.Query(ctx, query, name, tags) + if err != nil { + return err + } + for rows.Next() { + i := len(res) + res = append(res, driver.EnrichmentRecord{}) + r := &res[i] + if err := rows.Scan(&r.Tags, &r.Enrichment); err != nil { + return err + } + } + return rows.Err() + })) + switch { + case errors.Is(err, nil): + case errors.Is(err, pgx.ErrNoRows): + return nil, nil + default: + return nil, err + } + return res, nil +} diff --git a/datastore/postgres/v2/matcher_v1_gc.go b/datastore/postgres/v2/matcher_v1_gc.go new file mode 100644 index 000000000..7fc2ae52f --- /dev/null +++ b/datastore/postgres/v2/matcher_v1_gc.go @@ -0,0 +1,111 @@ +package postgres + +import ( + "context" + + "github.com/google/uuid" + "github.com/jackc/pgx/v5" + "github.com/quay/zlog" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +// GCThrottle sets a limit for the number of deleted update operations (and +// subsequent cascade deletes in the uo_vuln table) that can occur in a GC run. +const GCThrottle = 50 + +// GC performs garbage collection on tables in the Matcher store. +// +// GC is split into two phases, first it will identify any update operations +// which are older then the provided keep value and delete these. +// +// Next it will perform updater-based deletions of any vulnerabilities from the +// vuln table which are not longer referenced by update operations. +// +// The GC is throttled to not overload the database with CASCADE deletes. If a +// full GC is required, run this method until the returned value is 0. +func (s *MatcherV1) GC(ctx context.Context, keep int) (_ int64, err error) { + ctx, done := s.method(ctx, &err) + defer done() + var ( + total int64 + deleted int64 + ) + + err = pgx.BeginTxFunc(ctx, s.pool, txRW, s.tx(ctx, `GC`, func(ctx context.Context, tx pgx.Tx) error { + var ops []uuid.UUID + span := trace.SpanFromContext(ctx) + + err = pgx.BeginFunc(ctx, tx, s.call(ctx, `eligible`, func(ctx context.Context, tx pgx.Tx, query string) (err error) { + rows, err := tx.Query(ctx, query, keep+1, keep) + if err != nil { + return err + } + tmp, err := pgx.CollectRows(rows, pgx.RowTo[[]uuid.UUID]) + if err != nil { + return err + } + for _, t := range tmp { + ops = append(ops, t...) 
+ } + return nil + })) + if err != nil { + return err + } + + total = int64(len(ops)) + switch { + case len(ops) > GCThrottle: + ops = ops[:GCThrottle] + case len(ops) == 0: + return nil + } + span.SetAttributes(attribute.Int64("total", total), attribute.Int("ops", len(ops))) + + err = pgx.BeginFunc(ctx, tx, s.call(ctx, `delete_ops`, func(ctx context.Context, tx pgx.Tx, query string) error { + tag, err := s.pool.Exec(ctx, query, ops) + deleted = tag.RowsAffected() + return err + })) + if err != nil { + return err + } + + var updaters []string + err = pgx.BeginFunc(ctx, tx, s.call(ctx, `distinct`, func(ctx context.Context, tx pgx.Tx, query string) (err error) { + rows, err := tx.Query(ctx, query) + if err != nil { + return err + } + updaters, err = pgx.CollectRows(rows, pgx.RowTo[string]) + return err + })) + if err != nil { + return err + } + + for _, u := range updaters { + err = pgx.BeginFunc(ctx, tx, s.call(ctx, `orphaned`, func(ctx context.Context, tx pgx.Tx, query string) (err error) { + ctx = zlog.ContextWithValues(ctx, "updater", u) + trace.SpanFromContext(ctx).SetAttributes(attribute.String("updater", u)) + zlog.Debug(ctx). + Msg("clean up start") + zlog.Debug(ctx).Msg("clean up done") + + _, err = tx.Exec(ctx, query, u) + return err + })) + if err != nil { + return err + } + } + + return nil + })) + if err != nil { + return 0, err + } + + return total - deleted, nil +} diff --git a/datastore/postgres/v2/matcher_v1_gc_test.go b/datastore/postgres/v2/matcher_v1_gc_test.go new file mode 100644 index 000000000..f04a6402e --- /dev/null +++ b/datastore/postgres/v2/matcher_v1_gc_test.go @@ -0,0 +1,193 @@ +package postgres + +import ( + "context" + "crypto/rand" + "encoding/hex" + "io" + "net/http" + "testing" + + "github.com/jackc/pgx/v5/pgxpool" + "github.com/quay/zlog" + + "github.com/quay/claircore" + "github.com/quay/claircore/libvuln/driver" + "github.com/quay/claircore/libvuln/updates" + "github.com/quay/claircore/locksource/pglock" + "github.com/quay/claircore/test" + "github.com/quay/claircore/test/integration" + pgtest "github.com/quay/claircore/test/postgres/v2" +) + +type updaterMock struct { + _name func() string + _fetch func(_ context.Context, _ driver.Fingerprint) (io.ReadCloser, driver.Fingerprint, error) + _parse func(ctx context.Context, contents io.ReadCloser) ([]*claircore.Vulnerability, error) +} + +func (u *updaterMock) Name() string { + return u._name() +} + +func (u *updaterMock) Fetch(ctx context.Context, fp driver.Fingerprint) (io.ReadCloser, driver.Fingerprint, error) { + return u._fetch(ctx, fp) +} + +func (u *updaterMock) Parse(ctx context.Context, contents io.ReadCloser) ([]*claircore.Vulnerability, error) { + return u._parse(ctx, contents) +} + +type gcTestcase struct { + Name string + UpdateOps int + Keep int +} + +// TestGC confirms the garbage collection of +// vulnerabilities works correctly. +func TestGC(t *testing.T) { + integration.NeedDB(t) + + // mock returns exactly one random vuln each time its Parse method is called. + // each update operation will be associated with a single vuln. 
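Because the GC method above is throttled to GCThrottle update operations per call, a full collection may take several passes; the "run this method until the returned value is 0" loop from its doc comment looks like this (caller sketch, helper name hypothetical):

func gcAll(ctx context.Context, store *MatcherV1, keep int) error {
	for {
		remaining, err := store.GC(ctx, keep)
		if err != nil {
			return err
		}
		if remaining == 0 {
			return nil // Everything eligible has been collected.
		}
	}
}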
+ mock := &updaterMock{ + _name: func() string { return "MockUpdater" }, + _fetch: func(_ context.Context, _ driver.Fingerprint) (io.ReadCloser, driver.Fingerprint, error) { + return nil, "", nil + }, + _parse: func(ctx context.Context, contents io.ReadCloser) ([]*claircore.Vulnerability, error) { + return []*claircore.Vulnerability{ + { + Name: randString(t), + Updater: "MockUpdater", + Package: test.GenUniquePackages(1)[0], + }, + }, nil + }, + } + max := func(x, y int) int { + if x < y { + return y + } + return x + } + + // These tests maintain a one-to-one relationship between update operations + // and a linked vulnerability for simplicity. In other words, each update + // operation inserts one vulnerability and each deletion of an update + // operation should delete one vulnerability. + table := []gcTestcase{ + { + Name: "Small", + UpdateOps: 4, + Keep: 3, + }, + { + Name: "Large", + UpdateOps: 100, + Keep: 50, + }, + { + Name: "Odd", + UpdateOps: 37, + Keep: 23, + }, + { + Name: "Inverted", + UpdateOps: 10, + Keep: 50, + }, + { + Name: "Throttle", + UpdateOps: 60, + Keep: 5, + }, + } + + for _, tc := range table { + t.Run(tc.Name, func(t *testing.T) { + ctx := zlog.Test(context.Background(), t) + cfg := pgtest.TestMatcherDB(ctx, t) + pool, err := pgxpool.NewWithConfig(ctx, cfg.Copy()) + if err != nil { + t.Fatal(err) + } + defer pool.Close() + store, err := NewMatcherV1(ctx, cfg, WithMigrations) + if err != nil { + t.Fatal(err) + } + defer store.Close() + locks, err := pglock.New(ctx, cfg) + if err != nil { + t.Fatal(err) + } + defer locks.Close() + mgr, err := updates.NewManager( + ctx, + store, + locks, + http.DefaultClient, + updates.WithEnabled([]string{}), + updates.WithOutOfTree([]driver.Updater{mock}), + ) + if err != nil { + t.Fatalf("failed creating update manager: %v", err) + } + defer func() { + if t.Failed() { + dumptable(ctx, t, pool, "update_operation") + } + }() + + for i := 0; i < tc.UpdateOps; i++ { + if err := mgr.Run(ctx); err != nil { + t.Fatalf("manager failed to run: %v", err) + } + } + + // confirm update operations exist + ops, err := store.GetUpdateOperations(ctx, driver.VulnerabilityKind) + if err != nil { + t.Fatalf("failed obtaining update ops: %v", err) + } + if got, want := len(ops[mock.Name()]), tc.UpdateOps; got != want { + t.Fatalf("unexpected number of update operations: got: %d want: %d", got, want) + } + + // Run GC + expectedNotDone := max(tc.UpdateOps-tc.Keep-GCThrottle, 0) + notDone, err := store.GC(ctx, tc.Keep) + if err != nil { + t.Fatalf("error while performing GC: %v", err) + } + if got, want := notDone, int64(expectedNotDone); got != want { + t.Fatalf("unexpected number of leftover update operations: got: %d, want: %d", got, want) + } + + wantKeep := tc.Keep + if tc.UpdateOps < tc.Keep { + wantKeep = tc.UpdateOps + } + ops, err = store.GetUpdateOperations(ctx, driver.VulnerabilityKind) + if err != nil { + t.Fatalf("failed obtaining update ops: %v", err) + } + //t.Logf("ops: %v", ops) + expectedRemaining := wantKeep + expectedNotDone + if got, want := len(ops[mock.Name()]), expectedRemaining; got != want { + t.Fatalf("unexpected number of update operations remaining: got: %d want: %d", got, want) + } + }) + } +} + +func randString(t *testing.T) string { + buf := make([]byte, 4, 4) + _, err := io.ReadAtLeast(rand.Reader, buf, len(buf)) + if err != nil { + t.Fatalf("failed to generate random string: %v", err) + } + return hex.EncodeToString(buf) +} diff --git a/datastore/postgres/v2/matcher_v1_updateoperation.go 
b/datastore/postgres/v2/matcher_v1_updateoperation.go new file mode 100644 index 000000000..4d050d61d --- /dev/null +++ b/datastore/postgres/v2/matcher_v1_updateoperation.go @@ -0,0 +1,301 @@ +package postgres + +import ( + "context" + "errors" + "fmt" + "strconv" + + "github.com/google/uuid" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/quay/zlog" + + "github.com/quay/claircore" + "github.com/quay/claircore/libvuln/driver" +) + +// GetUpdateDiff implements [datastore.MatcherV1]. +func (s *MatcherV1) GetUpdateDiff(ctx context.Context, prev, cur uuid.UUID) (_ *driver.UpdateDiff, err error) { + ctx, done := s.method(ctx, &err) + defer done() + if cur == uuid.Nil { + err = errors.New(`nil uuid is invalid as "current" endpoint`) + return nil, err + } + + var ret driver.UpdateDiff + + err = pgx.BeginTxFunc(ctx, s.pool, txRO, s.tx(ctx, `GetDiff`, func(ctx context.Context, tx pgx.Tx) (err error) { + err = pgx.BeginFunc(ctx, tx, s.call(ctx, `confirm`, func(ctx context.Context, tx pgx.Tx, query string) (err error) { + var discard int + return tx.QueryRow(ctx, query, prev, cur).Scan(&discard) + })) + if err != nil { + return err + } + + for _, v := range []struct { + Attr string + ID uuid.UUID + Dst *driver.UpdateOperation + }{ + {Attr: "cur", ID: cur, Dst: &ret.Cur}, + {Attr: "prev", ID: prev, Dst: &ret.Prev}, + } { + v.Dst.Ref = v.ID + if v.ID == uuid.Nil { + continue + } + err = pgx.BeginFunc(ctx, tx, s.call(ctx, `populaterefs`, func(ctx context.Context, tx pgx.Tx, query string) (err error) { + err = tx.QueryRow(ctx, query, v.ID).Scan( + &v.Dst.Updater, + &v.Dst.Fingerprint, + &v.Dst.Date, + ) + return err + })) + if err != nil { + return err + } + } + + for _, v := range []struct { + Attr string + A, B uuid.UUID + Dst *[]claircore.Vulnerability + }{ + {Attr: "added", A: cur, B: prev, Dst: &ret.Added}, + {Attr: "removed", A: prev, B: cur, Dst: &ret.Removed}, + } { + if v.A == uuid.Nil { + continue + } + err = pgx.BeginFunc(ctx, tx, s.call(ctx, `load`, func(ctx context.Context, tx pgx.Tx, query string) (err error) { + rows, err := tx.Query(ctx, query, v.A, v.B) + if err != nil { + return err + } + defer rows.Close() + for rows.Next() { + i := len(*v.Dst) + *v.Dst = append(*v.Dst, claircore.Vulnerability{ + Package: &claircore.Package{}, + Dist: &claircore.Distribution{}, + Repo: &claircore.Repository{}, + }) + if err = scanVulnerability(&(*v.Dst)[i], rows); err != nil { + return err + } + } + return rows.Err() + })) + if err != nil { + return err + } + } + + return nil + })) + if err != nil { + return nil, err + } + + return &ret, nil +} + +// GetLatestUpdateRef implements [driver.Updater]. +func (s *MatcherV1) GetLatestUpdateRef(ctx context.Context, kind driver.UpdateKind) (_ uuid.UUID, err error) { + ctx, done := s.method(ctx, &err) + defer done() + + var name string + switch kind { + case "": + name = `any` + case driver.EnrichmentKind: + name = `enrichment` + case driver.VulnerabilityKind: + name = `vulnerability` + } + var ref uuid.UUID + err = s.pool.AcquireFunc(ctx, s.acquire(ctx, name, func(ctx context.Context, c *pgxpool.Conn, query string) error { + return c.QueryRow(ctx, query).Scan(&ref) + })) + switch { + case errors.Is(err, nil): + case errors.Is(err, pgx.ErrNoRows): + return uuid.Nil, nil + default: + return uuid.Nil, err + } + + return ref, nil +} + +// GetLatestUpdateRefs implements [driver.Updater]. 
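GetUpdateDiff above compares two update operations, where uuid.Nil is accepted for prev (the first operation an updater ever produced) but rejected for cur. A small caller sketch (helper name hypothetical):

func diffCounts(ctx context.Context, store *MatcherV1, prev, cur uuid.UUID) (added, removed int, err error) {
	diff, err := store.GetUpdateDiff(ctx, prev, cur)
	if err != nil {
		return 0, 0, err
	}
	return len(diff.Added), len(diff.Removed), nil
}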
+func (s *MatcherV1) GetLatestUpdateRefs(ctx context.Context, kind driver.UpdateKind) (_ map[string][]driver.UpdateOperation, err error) { + ctx, done := s.method(ctx, &err) + defer done() + + var name string + switch kind { + case "": + name = `any` + case driver.EnrichmentKind: + name = `enrichment` + case driver.VulnerabilityKind: + name = `vulnerability` + } + ret := make(map[string][]driver.UpdateOperation) + + err = s.pool.AcquireFunc(ctx, s.acquire(ctx, name, func(ctx context.Context, c *pgxpool.Conn, query string) error { + rows, err := c.Query(ctx, query) + if err != nil { + return err + } + defer rows.Close() + for rows.Next() { + var op driver.UpdateOperation + err := rows.Scan( + &op.Updater, + &op.Ref, + &op.Fingerprint, + &op.Date, + ) + if err != nil { + return err + } + ret[op.Updater] = append(ret[op.Updater], op) + } + ev := zlog.Debug(ctx) + if ev.Enabled() { + ct := 0 + for _, ops := range ret { + ct += len(ops) + } + ev = ev.Int("count", ct) + } + ev.Msg("found updaters") + return nil + })) + switch { + case errors.Is(err, nil): + case errors.Is(err, pgx.ErrNoRows): + return nil, nil + default: + return nil, err + } + + return ret, nil +} + +func (s *MatcherV1) GetUpdateOperations(ctx context.Context, kind driver.UpdateKind, updater ...string) (_ map[string][]driver.UpdateOperation, err error) { + ctx, done := s.method(ctx, &err) + defer done() + + var name string + switch kind { + case "": + name = `any` + case driver.EnrichmentKind: + name = `enrichment` + case driver.VulnerabilityKind: + name = `vulnerability` + } + ret := make(map[string][]driver.UpdateOperation) + + err = pgx.BeginTxFunc(ctx, s.pool, txRO, s.tx(ctx, `GetUpdateOperations`, func(ctx context.Context, tx pgx.Tx) (err error) { + if len(updater) == 0 { + err = pgx.BeginFunc(ctx, tx, s.call(ctx, `getupdaters`, func(ctx context.Context, tx pgx.Tx, query string) error { + rows, err := tx.Query(ctx, query) + if err != nil { + return err + } + defer rows.Close() + for rows.Next() { + var u string + if err := rows.Scan(&u); err != nil { + return err + } + updater = append(updater, u) + } + return rows.Err() + })) + if err != nil { + return err + } + } + + err = pgx.BeginFunc(ctx, tx, s.call(ctx, name, func(ctx context.Context, tx pgx.Tx, query string) error { + rows, err := tx.Query(ctx, query, updater) + if err != nil { + return err + } + defer rows.Close() + for rows.Next() { + var uo driver.UpdateOperation + if err := rows.Scan( + &uo.Ref, + &uo.Updater, + &uo.Fingerprint, + &uo.Date, + ); err != nil { + return fmt.Errorf("failed to scan update operation for updater %q: %w", uo.Updater, err) + } + ret[uo.Updater] = append(ret[uo.Updater], uo) + } + return rows.Err() + })) + if err != nil { + return err + } + return nil + })) + switch { + case errors.Is(err, nil): + case errors.Is(err, pgx.ErrNoRows): + return ret, nil + default: + return nil, err + } + + return ret, nil +} + +// TODO(hank) Do this differently. 
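The accessors above all map an empty UpdateKind to the "any" query and otherwise pick the enrichment- or vulnerability-specific one. An illustrative use of GetLatestUpdateRefs to check whether a particular updater has ever completed a vulnerability update operation (helper name hypothetical):

func hasVulnerabilityUpdate(ctx context.Context, store *MatcherV1, updater string) (bool, error) {
	refs, err := store.GetLatestUpdateRefs(ctx, driver.VulnerabilityKind)
	if err != nil {
		return false, err
	}
	return len(refs[updater]) > 0, nil
}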
+func scanVulnerability(v *claircore.Vulnerability, row pgx.CollectableRow) error { + var id uint64 + if err := row.Scan( + &id, + &v.Name, + &v.Updater, + &v.Description, + &v.Issued, + &v.Links, + &v.Severity, + &v.NormalizedSeverity, + &v.Package.Name, + &v.Package.Version, + &v.Package.Module, + &v.Package.Arch, + &v.Package.Kind, + &v.Dist.DID, + &v.Dist.Name, + &v.Dist.Version, + &v.Dist.VersionCodeName, + &v.Dist.VersionID, + &v.Dist.Arch, + &v.Dist.CPE, + &v.Dist.PrettyName, + &v.ArchOperation, + &v.Repo.Name, + &v.Repo.Key, + &v.Repo.URI, + &v.FixedInVersion, + ); err != nil { + return err + } + v.ID = strconv.FormatUint(id, 10) + return nil +} diff --git a/datastore/postgres/v2/matcher_v1_updateoperation_test.go b/datastore/postgres/v2/matcher_v1_updateoperation_test.go new file mode 100644 index 000000000..a57865e4c --- /dev/null +++ b/datastore/postgres/v2/matcher_v1_updateoperation_test.go @@ -0,0 +1,189 @@ +package postgres + +import ( + "context" + "fmt" + "testing" + + "github.com/google/uuid" + "github.com/quay/zlog" + + "github.com/quay/claircore" + "github.com/quay/claircore/libvuln/driver" + "github.com/quay/claircore/test/integration" + pgtest "github.com/quay/claircore/test/postgres/v2" +) + +const updater string = "test-updater" + +type diffTestCase struct { + Added, Removed int + FirstOp, SecondOp []*claircore.Vulnerability +} + +// TestGetUpdateDiff creates two update operations in the test DB and calculates +// their diff. This flow is also tested in TestE2E. However, not all the cases +// are captured there, e.g. if there's no difference between the two operations. +func TestGetUpdateDiff(t *testing.T) { + integration.NeedDB(t) + ctx := zlog.Test(context.Background(), t) + + cases := []diffTestCase{ + // second op adds two new vulns + { + Added: 2, + Removed: 0, + FirstOp: []*claircore.Vulnerability{ + { + Updater: updater, + Package: &claircore.Package{ + Name: "vi", + }, + }, + }, + SecondOp: []*claircore.Vulnerability{ + { + Updater: updater, + Package: &claircore.Package{ + Name: "vi", + }, + }, + { + Updater: updater, + Package: &claircore.Package{ + Name: "vim", + }, + }, + { + Updater: updater, + Package: &claircore.Package{ + Name: "nano", + }, + }, + }, + }, + // one vuln is the same for first and second op, the other one differs + { + Added: 1, + Removed: 1, + FirstOp: []*claircore.Vulnerability{ + { + Updater: updater, + Package: &claircore.Package{ + Name: "grep", + }, + }, + { + Updater: updater, + Package: &claircore.Package{ + Name: "sed", + }, + }, + }, + SecondOp: []*claircore.Vulnerability{ + { + Updater: updater, + Package: &claircore.Package{ + Name: "grep", + }, + }, + { + Updater: updater, + Package: &claircore.Package{ + Name: "awk", + }, + }, + }, + }, + // first op has two more vulns that the second op + { + Added: 0, + Removed: 2, + FirstOp: []*claircore.Vulnerability{ + { + Updater: updater, + Package: &claircore.Package{ + Name: "python3", + }, + }, + { + Updater: updater, + Package: &claircore.Package{ + Name: "python3-crypto", + }, + }, + { + Updater: updater, + Package: &claircore.Package{ + Name: "python3-urllib3", + }, + }, + }, + SecondOp: []*claircore.Vulnerability{ + { + Updater: updater, + Package: &claircore.Package{ + Name: "python3", + }, + }, + }, + }, + // no difference between first and second op + { + Added: 0, + Removed: 0, + FirstOp: []*claircore.Vulnerability{ + { + Updater: updater, + Package: &claircore.Package{ + Name: "perl", + }, + }, + }, + SecondOp: []*claircore.Vulnerability{ + { + Updater: updater, + 
Package: &claircore.Package{ + Name: "perl", + }, + }, + }, + }, + } + + // prepare DB + cfg := pgtest.TestMatcherDB(ctx, t) + store, err := NewMatcherV1(ctx, cfg, WithMigrations) + if err != nil { + t.Fatal(err) + } + defer store.Close() + + // run test cases + for _, tc := range cases { + name := getTestCaseName(tc) + prev, err := store.UpdateVulnerabilities(ctx, updater, driver.Fingerprint(uuid.New().String()), tc.FirstOp) + if err != nil { + t.Fatalf("failed to perform update for first op: %v", err) + } + cur, err := store.UpdateVulnerabilities(ctx, updater, driver.Fingerprint(uuid.New().String()), tc.SecondOp) + if err != nil { + t.Fatalf("failed to perform update for second op: %v", err) + } + diff, err := store.GetUpdateDiff(ctx, prev, cur) + if err != nil { + t.Fatalf("received error getting UpdateDiff: %v", err) + } + + if l := len(diff.Added); l != tc.Added { + t.Fatalf("%s: got %d added vulns, want %d", name, l, tc.Added) + } + if l := len(diff.Removed); l != tc.Removed { + t.Fatalf("%s: got %d removed vulns, want %d", name, l, tc.Removed) + } + } +} + +func getTestCaseName(tc diffTestCase) string { + return fmt.Sprintf("%d added and %d removed", tc.Added, tc.Removed) +} diff --git a/datastore/postgres/v2/matcher_v1_updater_test.go b/datastore/postgres/v2/matcher_v1_updater_test.go new file mode 100644 index 000000000..711c27ef5 --- /dev/null +++ b/datastore/postgres/v2/matcher_v1_updater_test.go @@ -0,0 +1,558 @@ +package postgres + +import ( + "context" + "encoding/binary" + "errors" + "fmt" + "hash/fnv" + "strconv" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/google/uuid" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/quay/zlog" + + "github.com/quay/claircore" + "github.com/quay/claircore/datastore" + "github.com/quay/claircore/libvuln/driver" + "github.com/quay/claircore/test" + "github.com/quay/claircore/test/integration" + pgtest "github.com/quay/claircore/test/postgres/v2" +) + +// TestUpdateE2E performs an end to end test of update operations and diffing +func TestUpdateE2E(t *testing.T) { + integration.NeedDB(t) + ctx := zlog.Test(context.Background(), t) + + cases := []updateE2e{ + { + Name: "10Add2", + Insert: 10, + Updates: 2, + }, + { + Name: "100Add2", + Insert: 100, + Updates: 2, + }, + { + Name: "10Add20", + Insert: 10, + Updates: 20, + }, + } + for _, tc := range cases { + c := &tc + t.Run(c.Name, c.Run(ctx)) + } +} + +// UpdateE2e implements a multi-phase test ensuring an update operation and +// diff works end to end. +type updateE2e struct { + Name string + Insert int + Updates int + + // These are all computed values or results that need to hang around between + // tests. 
+ updater string + s datastore.MatcherV1 + pool *pgxpool.Pool + updateOps []driver.UpdateOperation +} + +func (e *updateE2e) Run(ctx context.Context) func(*testing.T) { + h := fnv.New64a() + h.Write([]byte(e.Name)) + binary.Write(h, binary.BigEndian, int64(e.Insert)) + binary.Write(h, binary.BigEndian, int64(e.Updates)) + e.updater = strconv.FormatUint(h.Sum64(), 36) + order := []struct { + Name string + Test func(context.Context) func(*testing.T) + }{ + {"Update", e.Update}, + {"GetUpdateOperations", e.GetUpdateOperations}, + {"recordUpdaterStatus", e.recordUpdaterStatus}, + {"Diff", e.Diff}, + {"DeleteUpdateOperations", e.DeleteUpdateOperations}, + } + return func(t *testing.T) { + ctx := zlog.Test(ctx, t) + cfg := pgtest.TestMatcherDB(ctx, t) + var err error + e.pool, err = pgxpool.NewWithConfig(ctx, cfg) + if err != nil { + t.Fatal(err) + } + store, err := NewMatcherV1(ctx, cfg) + if err != nil { + t.Fatal(err) + } + e.s = store + t.Cleanup(func() { + e.pool.Close() + store.Close() + }) + + for _, sub := range order { + if !t.Run(sub.Name, sub.Test(ctx)) { + t.FailNow() + } + } + } +} + +const ( + opStep = 10 +) + +func (e *updateE2e) vulns() [][]*claircore.Vulnerability { + sz := e.Insert + (opStep * e.Updates) + vs := test.GenUniqueVulnerabilities(sz, e.updater) + r := make([][]*claircore.Vulnerability, e.Updates) + for i := 0; i < e.Updates; i++ { + off := i * opStep + r[i] = vs[off : off+e.Insert] + } + return r +} + +var updateOpCmp = cmpopts.IgnoreFields(driver.UpdateOperation{}, "Date") + +// Update confirms multiple updates to the vulstore +// do the correct things. +func (e *updateE2e) Update(ctx context.Context) func(*testing.T) { + fp := driver.Fingerprint(uuid.New().String()) + return func(t *testing.T) { + ctx := zlog.Test(ctx, t) + e.updateOps = make([]driver.UpdateOperation, 0, e.Updates) + for _, vs := range e.vulns() { + ref, err := e.s.UpdateVulnerabilities(ctx, e.updater, fp, vs) + if err != nil { + t.Fatalf("failed to perform update: %v", err) + } + + // attach generated UpdateOperations to test retrieval + // date can be ignored. add in stack order to compare + e.updateOps = append(e.updateOps, driver.UpdateOperation{ + Ref: ref, + Fingerprint: fp, + Updater: e.updater, + }) + + checkInsertedVulns(ctx, t, e.pool, ref, vs) + } + t.Log("ok") + } +} + +// GetUpdateOperations confirms retrieving an update +// operation returns the expected results. 
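The vulns helper above generates Insert + opStep*Updates unique vulnerabilities and hands each update operation a window of Insert of them, shifted by opStep each time. For the "100Add2" case that is a pool of 100 + 10*2 = 120 vulnerabilities, with operation 0 covering [0,100) and operation 1 covering [10,110), so consecutive operations always differ by exactly opStep (10) added and 10 removed entries; the Diff subtest later asserts exactly that, treating the first operation as adding all Insert entries and removing none.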
+func (e *updateE2e) GetUpdateOperations(ctx context.Context) func(*testing.T) { + return func(t *testing.T) { + ctx := zlog.Test(ctx, t) + out, err := e.s.GetUpdateOperations(ctx, driver.VulnerabilityKind, e.updater) + if err != nil { + t.Fatalf("failed to get UpdateOperations: %v", err) + } + // confirm number of update operations + if got, want := len(out[e.updater]), e.Updates; got != want { + t.Fatalf("wrong number of update operations: got: %d, want: %d", got, want) + } + // confirm retrieved update operations match + // test generated values + for i := 0; i < e.Updates; i++ { + ri := e.Updates - i - 1 + want, got := e.updateOps[ri], out[e.updater][i] + if !cmp.Equal(want, got, updateOpCmp) { + t.Fatal(cmp.Diff(want, got, updateOpCmp)) + } + } + t.Log("ok") + } +} + +type update struct { + UpdaterName string `json:"updater_name"` + LastAttempt time.Time `json:"last_attempt"` + LastSuccess *time.Time `json:"last_success"` + LastRunSucceeded bool `json:"last_run_succeeded"` + LastAttemptFingerprint driver.Fingerprint `json:"last_attempt_fingerprint"` + LastError *string `json:"last_error"` +} + +// recordUpdaterStatus confirms multiple updates to record last update times +// and then an update to an whole updater set +func (e *updateE2e) recordUpdaterStatus(ctx context.Context) func(*testing.T) { + return func(t *testing.T) { + ctx := zlog.Test(ctx, t) + errorText := "test error" + firstUpdateDate := time.Date(2020, time.Month(1), 22, 2, 10, 30, 0, time.UTC) + secondUpdateDate := time.Date(2021, time.Month(2), 21, 1, 10, 30, 0, time.UTC) + var emptyFingerprint driver.Fingerprint + updates := []update{ + { + UpdaterName: "test-updater-1", + LastAttempt: firstUpdateDate, + LastSuccess: &firstUpdateDate, + LastRunSucceeded: true, + LastAttemptFingerprint: driver.Fingerprint(uuid.New().String()), + }, + { + UpdaterName: "test-updater-1", + LastAttempt: secondUpdateDate, + LastSuccess: &secondUpdateDate, + LastRunSucceeded: true, + LastAttemptFingerprint: driver.Fingerprint(uuid.New().String()), + }, + { + UpdaterName: "test-updater-2", + LastAttempt: firstUpdateDate, + LastSuccess: &firstUpdateDate, + LastRunSucceeded: true, + LastAttemptFingerprint: emptyFingerprint, + }, + { + UpdaterName: "test-updater-3", + LastAttempt: firstUpdateDate, + LastRunSucceeded: false, + LastAttemptFingerprint: driver.Fingerprint(uuid.New().String()), + LastError: &errorText, + }, + } + expectedTableContents := make(map[string]update) + for _, update := range updates { + var updateError error + if update.LastError != nil { + updateError = errors.New(*update.LastError) + } + err := e.s.RecordUpdaterStatus(ctx, update.UpdaterName, update.LastAttempt, update.LastAttemptFingerprint, updateError) + if err != nil { + t.Fatalf("failed to perform update: %v", err) + } + expectedTableContents[update.UpdaterName] = update + } + checkUpdateTimes(ctx, t, e.pool, expectedTableContents) + + newUpdaterSetTime := time.Date(2021, time.Month(2), 25, 1, 10, 30, 0, time.UTC) + e.s.RecordUpdaterSetStatus(ctx, "test", newUpdaterSetTime) + for updater, row := range expectedTableContents { + row.LastAttempt = newUpdaterSetTime + row.LastSuccess = &newUpdaterSetTime + row.LastRunSucceeded = true + expectedTableContents[updater] = row + } + checkUpdateTimes(ctx, t, e.pool, expectedTableContents) + t.Log("ok") + } +} + +var vulnCmp = cmp.Options{ + cmpopts.IgnoreFields(claircore.Vulnerability{}, "ID", "Package.ID", "Dist.ID", "Repo.ID"), +} + +func orNoIndex(a int) string { + if a < 0 { + return "no index" + } + return 
fmt.Sprintf("index %d", a) +} + +// Diff fetches Operation diffs from the database and compares them against +// independently calculated diffs. +func (e *updateE2e) Diff(ctx context.Context) func(t *testing.T) { + return func(t *testing.T) { + ctx := zlog.Test(ctx, t) + for n := range e.vulns() { + // This does a bunch of checks so that the first operation is + // compared appropriately. + prev := uuid.Nil + if n != 0 { + prev = e.updateOps[n-1].Ref + } + cur := e.updateOps[n].Ref + t.Logf("comparing %v (%s) and %v (index %d)", prev, orNoIndex(n-1), cur, n) + + diff, err := e.s.GetUpdateDiff(ctx, prev, cur) + if err != nil { + t.Fatalf("received error getting UpdateDiff: %v", err) + } + + expectSz := opStep + if n == 0 { + expectSz = e.Insert + } + if l := len(diff.Added); l != expectSz { + t.Fatalf("got: len == %d, want len == %d", l, expectSz) + } + if n == 0 { + expectSz = 0 + } + if l := len(diff.Removed); l != expectSz { + t.Fatalf("got: len == %d, want len == %d", l, expectSz) + } + + // make sure update operations match generated test values + if prev != diff.Prev.Ref { + t.Errorf("want: %v, got: %v", diff.Prev.Ref, prev) + } + if cur != diff.Cur.Ref { + t.Errorf("want: %v, got: %v", diff.Cur.Ref, cur) + } + + // confirm removed and added vulnerabilities are the ones we expect + pair := e.calcDiff(n) + if n == 0 { + pair[0] = []*claircore.Vulnerability{} + } + // I can't figure out how to make a cmp.Option that does this. + added := make([]*claircore.Vulnerability, len(pair[1])) + for i := range diff.Added { + added[i] = &diff.Added[i] + } + if want, got := pair[1], added; !cmp.Equal(got, want, vulnCmp) { + t.Error(cmp.Diff(got, want, vulnCmp)) + } + + removed := make([]*claircore.Vulnerability, len(pair[0])) + for i := range diff.Removed { + removed[i] = &diff.Removed[i] + } + if want, got := pair[0], removed; !cmp.Equal(want, got, vulnCmp) { + t.Error(cmp.Diff(want, got, vulnCmp)) + } + } + t.Log("ok") + } +} + +func (e *updateE2e) calcDiff(i int) [2][]*claircore.Vulnerability { + if i >= e.Updates { + panic(fmt.Sprintf("update %d out of bounds (%d)", i, e.Updates)) + } + sz := e.Insert + (opStep * e.Updates) + vs := test.GenUniqueVulnerabilities(sz, e.updater) + if i == 0 { + return [...][]*claircore.Vulnerability{{}, vs[:e.Insert]} + } + loff, lend := (i-1)*opStep, i*opStep + roff, rend := loff+e.Insert, lend+e.Insert + return [...][]*claircore.Vulnerability{vs[loff:lend], vs[roff:rend]} +} + +// DeleteUpdateOperations performs a deletion of all UpdateOperations used in +// the test and confirms both the UpdateOperation and vulnerabilities are +// removed from the vulnstore. +func (e *updateE2e) DeleteUpdateOperations(ctx context.Context) func(*testing.T) { + return func(t *testing.T) { + const ( + opExists = `SELECT EXISTS(SELECT 1 FROM update_operation WHERE ref = $1::uuid);` + assocExists = `SELECT EXISTS(SELECT 1 FROM uo_vuln JOIN update_operation uo ON (uo_vuln.uo = uo.id) WHERE uo.ref = $1::uuid);` + ) + var exists bool + ctx := zlog.Test(ctx, t) + for _, op := range e.updateOps { + _, err := e.s.DeleteUpdateOperations(ctx, op.Ref) + if err != nil { + t.Fatalf("failed to get delete UpdateOperation: %v", err) + } + + // Check that the update_operation is removed from the table. + if err := e.pool.QueryRow(ctx, opExists, op.Ref).Scan(&exists); err != nil { + t.Errorf("query failed: %v", err) + } + t.Logf("operation %v exists: %v", op.Ref, exists) + if exists { + t.Error() + } + + // This really shouldn't happen because of the foreign constraint. 
+ if err := e.pool.QueryRow(ctx, assocExists, op.Ref).Scan(&exists); err != nil { + t.Errorf("query failed: %v", err) + } + t.Logf("operation %v exists: %v", op.Ref, exists) + if exists { + t.Error() + } + } + t.Log("ok") + } +} + +// checkInsertedVulns confirms vulnerabilitiles are inserted into the database correctly when +// store.UpdateVulnerabilities is called. +func checkInsertedVulns(ctx context.Context, t *testing.T, pool *pgxpool.Pool, id uuid.UUID, vulns []*claircore.Vulnerability) { + const query = `SELECT + vuln.hash_kind, + vuln.hash, + vuln.updater, + vuln.id, + vuln.name, + vuln.description, + vuln.issued, + vuln.links, + vuln.normalized_severity, + vuln.severity, + vuln.package_name, + vuln.package_version, + vuln.package_module, + vuln.package_arch, + vuln.package_kind, + vuln.dist_id, + vuln.dist_name, + vuln.dist_version, + vuln.dist_version_code_name, + vuln.dist_version_id, + vuln.dist_arch, + vuln.dist_cpe, + vuln.dist_pretty_name, + vuln.arch_operation, + vuln.repo_name, + vuln.repo_key, + vuln.repo_uri, + vuln.fixed_in_version +FROM uo_vuln +JOIN vuln ON vuln.id = uo_vuln.vuln +JOIN update_operation uo ON uo.id = uo_vuln.uo +WHERE uo.ref = $1::uuid;` + expectedVulns := map[string]*claircore.Vulnerability{} + for _, vuln := range vulns { + expectedVulns[vuln.Name] = vuln + } + rows, err := pool.Query(ctx, query, id) + if err != nil { + t.Fatalf("query failed: %v", err) + } + defer rows.Close() + + queriedVulns := map[string]*claircore.Vulnerability{} + for rows.Next() { + var id int64 + var hashKind string + var hash []byte + vuln := claircore.Vulnerability{ + Package: &claircore.Package{}, + Dist: &claircore.Distribution{}, + Repo: &claircore.Repository{}, + } + err := rows.Scan( + &hashKind, + &hash, + &vuln.Updater, + &id, + &vuln.Name, + &vuln.Description, + &vuln.Issued, + &vuln.Links, + &vuln.NormalizedSeverity, + &vuln.Severity, + &vuln.Package.Name, + &vuln.Package.Version, + &vuln.Package.Module, + &vuln.Package.Arch, + &vuln.Package.Kind, + &vuln.Dist.DID, + &vuln.Dist.Name, + &vuln.Dist.Version, + &vuln.Dist.VersionCodeName, + &vuln.Dist.VersionID, + &vuln.Dist.Arch, + &vuln.Dist.CPE, + &vuln.Dist.PrettyName, + &vuln.ArchOperation, + &vuln.Repo.Name, + &vuln.Repo.Key, + &vuln.Repo.URI, + &vuln.FixedInVersion, + ) + vuln.ID = strconv.FormatInt(id, 10) + if err != nil { + t.Fatalf("failed to scan vulnerability: %v", err) + } + // confirm a hash was generated + if hashKind == "" || len(hash) == 0 { + t.Fatalf("failed to identify hash for inserted vulnerability %+v", vuln) + } + queriedVulns[vuln.Name] = &vuln + } + if err := rows.Err(); err != nil { + t.Error(err) + } + + // confirm we did not receive unexpected vulns or bad fields + for name, got := range queriedVulns { + if want, ok := expectedVulns[name]; !ok { + t.Fatalf("received unexpected vuln: %v", got.Name) + } else { + // compare vuln fields. ignore id's + if !cmp.Equal(want, got, vulnCmp) { + t.Fatal(cmp.Diff(want, got, vulnCmp)) + } + } + } + + // confirm queriedVulns contain all expected vulns + for name := range expectedVulns { + if _, ok := queriedVulns[name]; !ok { + t.Fatalf("expected vuln %v was not found in query", name) + } + } +} + +// checkUpdateTimes confirms updater update times are upserted into the database correctly when +// store.RecordUpaterUptdateTime is called. 
+func checkUpdateTimes(ctx context.Context, t *testing.T, pool *pgxpool.Pool, updates map[string]update) { + const query = `SELECT updater_name, last_attempt, last_success, last_run_succeeded, last_attempt_fingerprint, last_error +FROM updater_status` + + rows, err := pool.Query(ctx, query) + if err != nil { + t.Fatalf("query failed: %v", err) + } + defer rows.Close() + + queriedUpdates := make(map[string]update) + for rows.Next() { + var updateEntry update + err := rows.Scan( + &updateEntry.UpdaterName, + &updateEntry.LastAttempt, + &updateEntry.LastSuccess, + &updateEntry.LastRunSucceeded, + &updateEntry.LastAttemptFingerprint, + &updateEntry.LastError, + ) + if err != nil { + t.Fatalf("failed to scan update: %v", err) + } + queriedUpdates[updateEntry.UpdaterName] = updateEntry + } + if err := rows.Err(); err != nil { + t.Error(err) + } + + // confirm we did not receive unexpected updates + for name, got := range queriedUpdates { + if want, ok := updates[name]; !ok { + t.Fatalf("received unexpected update: %s %v", name, got) + } else { + if !cmp.Equal(want, got) { + t.Fatal(cmp.Diff(want, got)) + } + } + } + + // confirm queriedUpdates contain all expected updates + for name := range updates { + if _, ok := queriedUpdates[name]; !ok { + t.Fatalf("expected update %v was not found in query", name) + } + } +} diff --git a/datastore/postgres/v2/matcher_v1_updaterstatus.go b/datastore/postgres/v2/matcher_v1_updaterstatus.go new file mode 100644 index 000000000..10ca4adee --- /dev/null +++ b/datastore/postgres/v2/matcher_v1_updaterstatus.go @@ -0,0 +1,66 @@ +package postgres + +import ( + "context" + "errors" + "time" + + "github.com/jackc/pgx/v5" + "github.com/quay/claircore/libvuln/driver" + "github.com/quay/zlog" +) + +// RecordUpdaterStatus records that an updater is up to date with vulnerabilities at this time +func (s *MatcherV1) RecordUpdaterStatus(ctx context.Context, updaterName string, updateTime time.Time, fingerprint driver.Fingerprint, updaterError error) (err error) { + ctx, done := s.method(ctx, &err) + defer done() + + failure := !errors.Is(updaterError, nil) + zlog.Debug(ctx). + Str("updater", updaterName). + Bool("failure", failure). + Msg("start recording update") + var returnedUpdaterName string + defer func() { + zlog.Debug(ctx). + Str("updater", returnedUpdaterName). + Msg("done recording update") + }() + + err = pgx.BeginTxFunc(ctx, s.pool, txRW, s.tx(ctx, `RecordUpdaterStatus`, func(ctx context.Context, tx pgx.Tx) (err error) { + // TODO(hank) Consolidate these queries. There's no real reason they + // need to be separated. + if failure { + err = pgx.BeginFunc(ctx, tx, s.call(ctx, `failure`, func(ctx context.Context, tx pgx.Tx, query string) error { + return tx.QueryRow(ctx, query, updaterName, updateTime, fingerprint, updaterError.Error()).Scan(&returnedUpdaterName) + })) + } else { + err = pgx.BeginFunc(ctx, tx, s.call(ctx, `success`, func(ctx context.Context, tx pgx.Tx, query string) error { + return tx.QueryRow(ctx, query, updaterName, updateTime, fingerprint).Scan(&returnedUpdaterName) + })) + } + return err + })) + if err != nil { + return err + } + + return nil +} + +// RecordUpdaterSetStatus records that all updaters from a updater set are up to +// date with vulnerabilities at this time. 
+func (s *MatcherV1) RecordUpdaterSetStatus(ctx context.Context, updaterSet string, updateTime time.Time) (err error) { + ctx, done := s.method(ctx, &err) + defer done() + + err = pgx.BeginFunc(ctx, s.pool, s.call(ctx, `update`, func(ctx context.Context, tx pgx.Tx, query string) error { + _, err = tx.Exec(ctx, query, updateTime, updaterSet) + return err + })) + if err != nil { + return err + } + + return nil +} diff --git a/datastore/postgres/v2/matcher_v1_vulnerabilities.go b/datastore/postgres/v2/matcher_v1_vulnerabilities.go new file mode 100644 index 000000000..03356ae6e --- /dev/null +++ b/datastore/postgres/v2/matcher_v1_vulnerabilities.go @@ -0,0 +1,454 @@ +package postgres + +import ( + "bytes" + "context" + "crypto/md5" + "errors" + "fmt" + "sort" + "strconv" + "strings" + + "github.com/google/uuid" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/quay/zlog" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" + + "github.com/quay/claircore" + "github.com/quay/claircore/datastore" + "github.com/quay/claircore/libvuln/driver" +) + +// Get implements [datastore.MatcherV1Vulnerability]. +func (s *MatcherV1) Get(ctx context.Context, records []*claircore.IndexRecord, opts datastore.MatcherV1VulnerabilityGetOpts) (_ map[string][]*claircore.Vulnerability, err error) { + ctx, done := s.method(ctx, &err) + defer done() + + // Current semantics is that callers must match on package name. + // Whether that's a good idea is up for debate. + cs := make([]driver.MatchConstraint, len(opts.Matchers)+1) + cs[0] = driver.PackageName + copy(cs[1:], opts.Matchers) + sort.Slice(cs, func(i, j int) bool { return cs[i] < cs[j] }) + cs = compact(cs) + va := newVulnArena() + zlog.Debug(ctx). + Int("count", len(records)). + Stringers("constraints", stringers(cs)). + Msg("fetching vulnerabilities") + + err = pgx.BeginTxFunc(ctx, s.pool, txRO, s.call(ctx, `get`, func(ctx context.Context, tx pgx.Tx, query string) (err error) { + var batch pgx.Batch + for _, record := range records { + batch.Queue(query, + constraint(cs, record, driver.PackageName), // $1 + constraint(cs, record, driver.PackageSourceName), // $2 + constraint(cs, record, driver.PackageModule), // $3 + constraint(cs, record, driver.DistributionDID), // $4 + constraint(cs, record, driver.DistributionName), // $5 + constraint(cs, record, driver.DistributionVersionID), // $6 + constraint(cs, record, driver.DistributionVersion), // $7 + constraint(cs, record, driver.DistributionVersionCodeName), // $8 + constraint(cs, record, driver.DistributionPrettyName), // $9 + constraint(cs, record, driver.DistributionCPE), // $10 + constraint(cs, record, driver.DistributionArch), // $11 + constraint(cs, record, driver.RepositoryName), // $12 + normVersionKind(opts.VersionFiltering, record), // $13 + normVersion(opts.VersionFiltering, record), // $14 + ) + } + res := tx.SendBatch(ctx, &batch) + // Make sure to assign to "err" and then return so that the defer can + // add if need be. + defer func() { + err = errors.Join(err, res.Close()) + }() + trace.SpanFromContext(ctx).AddEvent("queries submitted") + + var rowct int + for _, record := range records { + var rows pgx.Rows + rows, err = res.Query() + if err != nil { + err = fmt.Errorf("unable to read query results: %w", err) + return err + } + + va.CurrentID(record.Package.ID) + for rows.Next() { + rowct++ + if err := rows.Scan(va); err != nil { + err = fmt.Errorf("failed to scan vulnerability: %w", err) + return err + } + } + } + zlog.Debug(ctx). 
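RecordUpdaterStatus and RecordUpdaterSetStatus in matcher_v1_updaterstatus.go above are the two ways update runs report their outcome: per updater (a nil error selects the success query, a non-nil one the failure query), or once for a whole updater set. A caller sketch; the helper names and the "example-set" value are illustrative:

func reportRun(ctx context.Context, store *MatcherV1, name string, fp driver.Fingerprint, runErr error) error {
	// A nil runErr records a success row; otherwise the error text is stored.
	return store.RecordUpdaterStatus(ctx, name, time.Now(), fp, runErr)
}

func reportSet(ctx context.Context, store *MatcherV1) error {
	// Marks every updater in the set as up to date at this time.
	return store.RecordUpdaterSetStatus(ctx, "example-set", time.Now())
}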
+ Int("count", rowct). + Msg("read rows") + return nil + })) + switch { + case errors.Is(err, nil): + case errors.Is(err, pgx.ErrNoRows): + return nil, nil + default: + return nil, err + } + return va.Result(), nil +} + +// TODO(hank) Use the [slices] package for the constraint preparation when +// go1.20 support is dropped. + +// Compact is a port of [slices.Compact]. +// +// Remove this when we can use go1.21. +func compact(s []driver.MatchConstraint) []driver.MatchConstraint { + if len(s) < 2 { + return s + } + i := 1 + for k := 1; k < len(s); k++ { + if s[k] != s[k-1] { + if i != k { + s[i] = s[k] + } + i++ + } + } + return s[:i] +} + +// Contstraint returns the correct value for the constraint "which," if it's in +// the constraint set "cs". +func constraint(cs []driver.MatchConstraint, r *claircore.IndexRecord, which driver.MatchConstraint) (ret *string) { + // TODO(hank) Use [slices.Contains]. + contains := false + for i := range cs { + if which == cs[i] { + contains = true + break + } + } + if !contains { + return nil + } + switch which { + case driver.PackageName, driver.PackageSourceName, driver.PackageModule: + pkg := r.Package + switch which { + case driver.PackageName: + ret = &pkg.Name + case driver.PackageSourceName: + src := pkg.Source + ret = &src.Name + case driver.PackageModule: + ret = &pkg.Module + default: + panic(fmt.Sprintf("unimplemented package match constraint: %v", which)) + } + case driver.DistributionDID, driver.DistributionName, driver.DistributionVersion, + driver.DistributionVersionCodeName, driver.DistributionVersionID, + driver.DistributionArch, driver.DistributionCPE, driver.DistributionPrettyName: + dist := r.Distribution + switch which { + case driver.DistributionDID: + ret = &dist.DID + case driver.DistributionName: + ret = &dist.Name + case driver.DistributionVersion: + ret = &dist.Version + case driver.DistributionVersionCodeName: + ret = &dist.VersionCodeName + case driver.DistributionVersionID: + ret = &dist.VersionID + case driver.DistributionArch: + ret = &dist.Arch + case driver.DistributionCPE: + ret = new(string) + *ret = dist.CPE.BindFS() + case driver.DistributionPrettyName: + ret = &dist.PrettyName + default: + panic(fmt.Sprintf("unimplemented distribution match constraint: %v", which)) + } + case driver.RepositoryName: + repo := r.Repository + switch which { + case driver.RepositoryName: + ret = &repo.Name + default: + panic(fmt.Sprintf("unimplemented repository match constraint: %v", which)) + } + default: + panic(fmt.Sprintf("unimplemented match constraint: %v", which)) + } + return ret +} + +// NormVersion returns the encoded NormalizedVersion if "ok". +func normVersion(ok bool, r *claircore.IndexRecord) *string { + if !ok { + return nil + } + var lit strings.Builder + v := r.Package.NormalizedVersion + b := make([]byte, 0, 16) + lit.WriteString("'{") + for i := 0; i < 10; i++ { + if i != 0 { + lit.WriteByte(',') + } + lit.Write(strconv.AppendInt(b, int64(v.V[i]), 10)) + } + lit.WriteString("}'") + ret := lit.String() + return &ret +} + +// NormVersionKind returns the NormalizedVersion Kind if "ok". +func normVersionKind(ok bool, r *claircore.IndexRecord) *string { + if !ok { + return nil + } + return &r.Package.NormalizedVersion.Kind +} + +func stringers[E fmt.Stringer, S ~[]E](s S) []fmt.Stringer { + ret := make([]fmt.Stringer, len(s)) + for i := range s { + ret[i] = s[i] + } + return ret +} + +var ( + zeroRepo claircore.Repository + zeroDist claircore.Distribution +) + +// UpdateVulnerabilities implements [vulnstore.Updater]. 
+// +// It creates a new UpdateOperation for this update call, inserts the +// provided vulnerabilities and computes a diff comprising the removed +// and added vulnerabilities for this UpdateOperation. +func (s *MatcherV1) UpdateVulnerabilities(ctx context.Context, updater string, fingerprint driver.Fingerprint, vulns []*claircore.Vulnerability) (_ uuid.UUID, err error) { + ctx, done := s.method(ctx, &err) + defer done() + + const hashKind = `md5` + var ref uuid.UUID + type todo struct { + Vulnerability *claircore.Vulnerability + Digest []byte + } + todos := make([]todo, 0, len(vulns)) + for _, v := range vulns { + if v.Package == nil || v.Package.Name == "" { + continue + } + _, d := md5Vuln(v) + todos = append(todos, todo{ + Vulnerability: v, + Digest: d, + }) + } + + err = pgx.BeginTxFunc(ctx, s.pool, txRW, s.tx(ctx, `UpdateVulnerabilities`, func(ctx context.Context, tx pgx.Tx) (err error) { + var id int64 + + err = pgx.BeginFunc(ctx, tx, s.call(ctx, `create`, func(ctx context.Context, tx pgx.Tx, query string) (err error) { + return s.pool.QueryRow(ctx, query, updater, string(fingerprint)).Scan(&id, &ref) + })) + if err != nil { + return err + } + zlog.Debug(ctx). + Str("ref", ref.String()). + Msg("update_operation created") + + err = pgx.BeginFunc(ctx, tx, s.call(ctx, `insert`, func(ctx context.Context, tx pgx.Tx, query string) (err error) { + var batch pgx.Batch + for i, todo := range todos { + vuln := todo.Vulnerability + pkg := vuln.Package + dist := vuln.Dist + repo := vuln.Repo + if dist == nil { + dist = &zeroDist + } + if repo == nil { + repo = &zeroRepo + } + vKind, vrLower, vrUpper := rangefmt(vuln.Range) + + batch.Queue(query, + hashKind, todo.Digest, + vuln.Name, vuln.Updater, vuln.Description, vuln.Issued, vuln.Links, vuln.Severity, vuln.NormalizedSeverity, + pkg.Name, pkg.Version, pkg.Module, pkg.Arch, pkg.Kind, + dist.DID, dist.Name, dist.Version, dist.VersionCodeName, dist.VersionID, dist.Arch, dist.CPE, dist.PrettyName, + repo.Name, repo.Key, repo.URI, + vuln.FixedInVersion, vuln.ArchOperation, vKind, vrLower, vrUpper, + ) + + if i%2000 == 0 && batch.Len() != 0 { + res := tx.SendBatch(ctx, &batch) + for n, lim := 0, batch.Len(); n < lim; n++ { + if _, err := res.Exec(); err != nil { + return fmt.Errorf("failed to queue vulnerability: %w", err) + } + } + if err := res.Close(); err != nil { + return err + } + batch = pgx.Batch{} + } + } + res := tx.SendBatch(ctx, &batch) + for n, lim := 0, batch.Len(); n < lim; n++ { + if _, err := res.Exec(); err != nil { + return fmt.Errorf("failed to queue vulnerability: %w", err) + } + } + return res.Close() + })) + if err != nil { + return err + } + + err = pgx.BeginFunc(ctx, tx, s.call(ctx, `associate`, func(ctx context.Context, tx pgx.Tx, query string) (err error) { + var batch pgx.Batch + for i, todo := range todos { + batch.Queue(query, hashKind, todo.Digest, id) + + if i%2000 == 0 && batch.Len() != 0 { + res := tx.SendBatch(ctx, &batch) + for n, lim := 0, batch.Len(); n < lim; n++ { + if _, err := res.Exec(); err != nil { + return fmt.Errorf("failed to queue association: %w", err) + } + } + if err := res.Close(); err != nil { + return err + } + batch = pgx.Batch{} + } + } + res := tx.SendBatch(ctx, &batch) + for n, lim := 0, batch.Len(); n < lim; n++ { + if _, err := res.Exec(); err != nil { + return fmt.Errorf("failed to queue association: %w", err) + } + } + return res.Close() + })) + if err != nil { + return err + } + + return nil + })) + switch { + case errors.Is(err, nil): + zlog.Debug(ctx). + Str("ref", ref.String()). 
+ Int("skipped", len(vulns)-len(todos)). + Int("inserted", len(todos)). + Msg("update_operation committed") + default: + return uuid.Nil, err + } + + s.pool.AcquireFunc(ctx, s.acquire(ctx, `refresh`, func(ctx context.Context, c *pgxpool.Conn, query string) error { + if _, err := c.Exec(ctx, query); err != nil { + span := trace.SpanFromContext(ctx) + span.SetStatus(codes.Error, "refresh failed") + span.RecordError(fmt.Errorf("could not refresh latest_update_operations: %w", err)) + } + return nil + })) + + return ref, nil +} + +// Md5Vuln creates an md5 hash from the members of the passed-in Vulnerability, +// giving us a stable, context-free identifier for this revision of the +// Vulnerability. +func md5Vuln(v *claircore.Vulnerability) (string, []byte) { + var b bytes.Buffer + b.WriteString(v.Name) + b.WriteString(v.Description) + b.WriteString(v.Issued.String()) + b.WriteString(v.Links) + b.WriteString(v.Severity) + if v.Package != nil { + b.WriteString(v.Package.Name) + b.WriteString(v.Package.Version) + b.WriteString(v.Package.Module) + b.WriteString(v.Package.Arch) + b.WriteString(v.Package.Kind) + } + if v.Dist != nil { + b.WriteString(v.Dist.DID) + b.WriteString(v.Dist.Name) + b.WriteString(v.Dist.Version) + b.WriteString(v.Dist.VersionCodeName) + b.WriteString(v.Dist.VersionID) + b.WriteString(v.Dist.Arch) + b.WriteString(v.Dist.CPE.BindFS()) + b.WriteString(v.Dist.PrettyName) + } + if v.Repo != nil { + b.WriteString(v.Repo.Name) + b.WriteString(v.Repo.Key) + b.WriteString(v.Repo.URI) + } + b.WriteString(v.ArchOperation.String()) + b.WriteString(v.FixedInVersion) + if k, l, u := rangefmt(v.Range); k != nil { + b.WriteString(*k) + b.WriteString(l) + b.WriteString(u) + } + s := md5.Sum(b.Bytes()) + return "md5", s[:] +} + +func rangefmt(r *claircore.Range) (kind *string, lower, upper string) { + lower, upper = "{}", "{}" + if r == nil || r.Lower.Kind != r.Upper.Kind { + return kind, lower, upper + } + + kind = &r.Lower.Kind // Just tested the both kinds are the same. 
+ v := &r.Lower + var buf strings.Builder + b := make([]byte, 0, 16) // 16 byte wide scratch buffer + + buf.WriteByte('{') + for i := 0; i < 10; i++ { + if i != 0 { + buf.WriteByte(',') + } + buf.Write(strconv.AppendInt(b, int64(v.V[i]), 10)) + } + buf.WriteByte('}') + lower = buf.String() + buf.Reset() + v = &r.Upper + buf.WriteByte('{') + for i := 0; i < 10; i++ { + if i != 0 { + buf.WriteByte(',') + } + buf.Write(strconv.AppendInt(b, int64(v.V[i]), 10)) + } + buf.WriteByte('}') + upper = buf.String() + + return kind, lower, upper +} diff --git a/datastore/postgres/v2/matcher_v1_vulnerabilities_test.go b/datastore/postgres/v2/matcher_v1_vulnerabilities_test.go new file mode 100644 index 000000000..905990cc8 --- /dev/null +++ b/datastore/postgres/v2/matcher_v1_vulnerabilities_test.go @@ -0,0 +1,448 @@ +package postgres + +import ( + "context" + "strings" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/uuid" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/quay/claircore/toolkit/types/cpe" + "github.com/quay/zlog" + + "github.com/quay/claircore" + "github.com/quay/claircore/datastore" + "github.com/quay/claircore/libvuln/driver" + "github.com/quay/claircore/test/integration" + pgtest "github.com/quay/claircore/test/postgres/v2" +) + +func TestGet(t *testing.T) { + integration.NeedDB(t) + ctx := zlog.Test(context.Background(), t) + cfg := pgtest.TestMatcherDB(ctx, t) + pool, err := pgxpool.NewWithConfig(ctx, cfg) + if err != nil { + t.Fatal(err) + } + defer pool.Close() + ref := uuid.New() + if _, err := pool.Exec(ctx, `SELECT GetTestSetup(10, 'test', $1);`, ref); err != nil { + t.Fatal(err) + } + store, err := NewMatcherV1(ctx, cfg, WithMigrations) + if err != nil { + t.Fatal(err) + } + defer store.Close() + + tt := []getTestcase{ + { + Name: "MatchName", + IndexRecord: []*claircore.IndexRecord{ + {Package: &claircore.Package{ID: "0", Name: `package_0`}}, + }, + Check: func(t *testing.T, res map[string][]*claircore.Vulnerability, err error) { + if err != nil { + t.Fatal(err) + } + got, want := len(res["0"]), 9 + t.Logf("got: %d results, want: %d results", got, want) + if got != want { + t.Fail() + } + }, + }, + { + Name: "MatchSourceName", + IndexRecord: []*claircore.IndexRecord{ + {Package: &claircore.Package{ID: "0", Name: `none`, Source: &claircore.Package{ID: "1", Name: `package_0`}}}, + }, + Constraints: []driver.MatchConstraint{driver.PackageSourceName}, + Check: func(t *testing.T, res map[string][]*claircore.Vulnerability, err error) { + if err != nil { + t.Fatal(err) + } + got, want := len(res["0"]), 1 + t.Logf("got: %d results, want: %d results", got, want) + if got != want { + t.Fail() + } + }, + }, + { + Name: "MatchModule", + IndexRecord: []*claircore.IndexRecord{ + {Package: &claircore.Package{ID: "pkg1", Name: `package_0`, Module: `module_1`}}, + {Package: &claircore.Package{ID: "pkg2", Name: `package_0`, Module: `module_2`}}, + {Package: &claircore.Package{ID: "pkg3", Name: `package_0`, Module: `module_3`}}, + {Package: &claircore.Package{ID: "pkg0", Name: `package_0`, Module: `module_0`}}, + }, + Constraints: []driver.MatchConstraint{driver.PackageModule}, + Check: func(t *testing.T, res map[string][]*claircore.Vulnerability, err error) { + if err != nil { + t.Fatal(err) + } + for _, id := range []string{"pkg0", "pkg1", "pkg2", "pkg3"} { + got, want := len(res[id]), 1 + t.Logf("%v: got: %d results, want: %d results", id, got, want) + if got != want { + t.Fail() + } + } + }, + }, + { + Name: "MatchDistributionID", + IndexRecord: []*claircore.IndexRecord{ 
+ { + Package: &claircore.Package{ID: "0", Name: `package_0`}, + Distribution: &claircore.Distribution{DID: "distribution_1"}, + }, + { + Package: &claircore.Package{ID: "0", Name: `package_0`}, + Distribution: &claircore.Distribution{DID: "distribution_2"}, + }, + }, + Constraints: []driver.MatchConstraint{driver.DistributionDID}, + Check: func(t *testing.T, res map[string][]*claircore.Vulnerability, err error) { + if err != nil { + t.Fatal(err) + } + got, want := len(res["0"]), 2 + t.Logf("got: %d results, want: %d results", got, want) + if got != want { + t.Fail() + } + }, + }, + { + Name: "MatchDistributionName", + IndexRecord: []*claircore.IndexRecord{ + { + Package: &claircore.Package{ID: "0", Name: `package_0`}, + Distribution: &claircore.Distribution{Name: "Test"}, + }, + }, + Constraints: []driver.MatchConstraint{driver.DistributionName}, + Check: func(t *testing.T, res map[string][]*claircore.Vulnerability, err error) { + if err != nil { + t.Fatal(err) + } + got, want := len(res["0"]), 7 + t.Logf("got: %d results, want: %d results", got, want) + if got != want { + t.Fail() + } + }, + }, + { + Name: "MatchDistributionVersion", + IndexRecord: []*claircore.IndexRecord{ + { + Package: &claircore.Package{ID: "0", Name: `package_0`}, + Distribution: &claircore.Distribution{Version: "1"}, + }, + { + Package: &claircore.Package{ID: "0", Name: `package_0`}, + Distribution: &claircore.Distribution{Version: "2"}, + }, + { + Package: &claircore.Package{ID: "0", Name: `package_0`}, + Distribution: &claircore.Distribution{Version: "4"}, + }, + { + Package: &claircore.Package{ID: "0", Name: `package_0`}, + Distribution: &claircore.Distribution{Version: "5"}, + }, + { + Package: &claircore.Package{ID: "0", Name: `package_0`}, + Distribution: &claircore.Distribution{Version: "6"}, + }, + { + Package: &claircore.Package{ID: "0", Name: `package_0`}, + Distribution: &claircore.Distribution{Version: "8"}, + }, + { + Package: &claircore.Package{ID: "0", Name: `package_0`}, + Distribution: &claircore.Distribution{Version: "9"}, + }, + }, + Constraints: []driver.MatchConstraint{driver.DistributionVersion}, + Check: func(t *testing.T, res map[string][]*claircore.Vulnerability, err error) { + if err != nil { + t.Fatal(err) + } + got, want := len(res["0"]), 7 + t.Logf("got: %d results, want: %d results", got, want) + if got != want { + t.Fail() + } + }, + }, + { + Name: "MatchDistributionVersionID", + IndexRecord: []*claircore.IndexRecord{ + { + Package: &claircore.Package{ID: "0", Name: `package_0`}, + Distribution: &claircore.Distribution{VersionID: "1"}, + }, + { + Package: &claircore.Package{ID: "0", Name: `package_0`}, + Distribution: &claircore.Distribution{VersionID: "2"}, + }, + { + Package: &claircore.Package{ID: "0", Name: `package_0`}, + Distribution: &claircore.Distribution{VersionID: "4"}, + }, + { + Package: &claircore.Package{ID: "0", Name: `package_0`}, + Distribution: &claircore.Distribution{VersionID: "5"}, + }, + { + Package: &claircore.Package{ID: "0", Name: `package_0`}, + Distribution: &claircore.Distribution{VersionID: "6"}, + }, + { + Package: &claircore.Package{ID: "0", Name: `package_0`}, + Distribution: &claircore.Distribution{VersionID: "8"}, + }, + { + Package: &claircore.Package{ID: "0", Name: `package_0`}, + Distribution: &claircore.Distribution{VersionID: "9"}, + }, + }, + Constraints: []driver.MatchConstraint{driver.DistributionVersionID}, + Check: func(t *testing.T, res map[string][]*claircore.Vulnerability, err error) { + if err != nil { + t.Fatal(err) + } + got, want := 
len(res["0"]), 7 + t.Logf("got: %d results, want: %d results", got, want) + if got != want { + t.Fail() + } + }, + }, + { + Name: "MatchDistributionVersionCodeName", + IndexRecord: []*claircore.IndexRecord{ + { + Package: &claircore.Package{ID: "0", Name: `package_0`}, + Distribution: &claircore.Distribution{VersionCodeName: "Chicago"}, + }, + }, + Constraints: []driver.MatchConstraint{driver.DistributionVersionCodeName}, + Check: func(t *testing.T, res map[string][]*claircore.Vulnerability, err error) { + if err != nil { + t.Fatal(err) + } + got, want := len(res["0"]), 7 + t.Logf("got: %d results, want: %d results", got, want) + if got != want { + t.Fail() + } + }, + }, + { + Name: "MatchDistributionArch", + IndexRecord: []*claircore.IndexRecord{ + { + Package: &claircore.Package{ID: "0", Name: `package_0`}, + Distribution: &claircore.Distribution{Arch: "aarch64"}, + }, + }, + Constraints: []driver.MatchConstraint{driver.DistributionArch}, + Check: func(t *testing.T, res map[string][]*claircore.Vulnerability, err error) { + if err != nil { + t.Fatal(err) + } + got, want := len(res["0"]), 2 + t.Logf("got: %d results, want: %d results", got, want) + if got != want { + t.Fail() + } + }, + }, + { + Name: "MatchDistributionCPE", + IndexRecord: []*claircore.IndexRecord{ + { + Package: &claircore.Package{ID: "0", Name: `package_0`}, + Distribution: &claircore.Distribution{CPE: cpe.MustUnbind("cpe:2.3" + strings.Repeat(":*", 11))}, + }, + }, + Constraints: []driver.MatchConstraint{driver.DistributionCPE}, + Check: func(t *testing.T, res map[string][]*claircore.Vulnerability, err error) { + if err != nil { + t.Fatal(err) + } + got, want := len(res["0"]), 7 + t.Logf("got: %d results, want: %d results", got, want) + if got != want { + t.Fail() + } + }, + }, + { + Name: "MatchDistributionPrettyName", + IndexRecord: []*claircore.IndexRecord{ + { + Package: &claircore.Package{ID: "0", Name: `package_0`}, + Distribution: &claircore.Distribution{PrettyName: "Test 1"}, + }, + }, + Constraints: []driver.MatchConstraint{driver.DistributionPrettyName}, + Check: func(t *testing.T, res map[string][]*claircore.Vulnerability, err error) { + if err != nil { + t.Fatal(err) + } + got, want := len(res["0"]), 1 + t.Logf("got: %d results, want: %d results", got, want) + if got != want { + t.Fail() + } + }, + }, + { + Name: "MatchRepositoryName", + IndexRecord: []*claircore.IndexRecord{ + { + Package: &claircore.Package{ID: "0", Name: `package_0`}, + Repository: &claircore.Repository{Name: "repository_1"}, + }, + { + Package: &claircore.Package{ID: "0", Name: `package_0`}, + Repository: &claircore.Repository{Name: "repository_4"}, + }, + }, + Constraints: []driver.MatchConstraint{driver.RepositoryName}, + Check: func(t *testing.T, res map[string][]*claircore.Vulnerability, err error) { + if err != nil { + t.Fatal(err) + } + got, want := len(res["0"]), 2 + t.Logf("got: %d results, want: %d results", got, want) + if got != want { + t.Fail() + } + }, + }, + } + for _, tc := range tt { + t.Run(tc.Name, tc.Run(ctx, store)) + } +} + +type getTestcase struct { + Name string + IndexRecord []*claircore.IndexRecord + Constraints []driver.MatchConstraint + Check func(*testing.T, map[string][]*claircore.Vulnerability, error) +} + +func (tc getTestcase) Run(ctx context.Context, store *MatcherV1) func(*testing.T) { + opts := datastore.MatcherV1VulnerabilityGetOpts{ + Matchers: tc.Constraints, + } + return func(t *testing.T) { + t.Helper() + ctx := zlog.Test(ctx, t) + res, err := store.Get(ctx, tc.IndexRecord, opts) + tc.Check(t, 
res, err) + if t.Failed() { + t.Logf("result: %s", cmp.Diff(nil, res)) + t.Logf("error: %v", err) + } + } +} + +type latestTestCase struct { + Vulnerable int + Ops [][]*claircore.Vulnerability + Records []*claircore.IndexRecord +} + +// TestLatestVulns checks that only the latest update operations are considered +// when querying for vulnerabilities. +func TestLatestVulns(t *testing.T) { + integration.NeedDB(t) + ctx := zlog.Test(context.Background(), t) + + cases := []latestTestCase{ + { + Vulnerable: 2, + Ops: [][]*claircore.Vulnerability{ + { + { + Updater: "test-updater", + Package: &claircore.Package{ + Name: "vi", + Version: "v2.0.0", + Kind: "binary", + }, + }, + }, + { + { + Updater: "test-updater2", + Package: &claircore.Package{ + Name: "vi", + Version: "v3.0.0", + Kind: "binary", + }, + }, + { + Updater: "test-updater2", + Package: &claircore.Package{ + Name: "vi", + Version: "v3.1.0", + Kind: "binary", + }, + }, + }, + }, + Records: []*claircore.IndexRecord{ + { + Package: &claircore.Package{ + ID: "1", + Name: "vi", + Source: &claircore.Package{ + Name: "vi", + Version: "v1.0.0", + }, + }, + }, + }, + }, + } + + cfg := pgtest.TestMatcherDB(ctx, t) + store, err := NewMatcherV1(ctx, cfg, WithMigrations) + if err != nil { + t.Fatal(err) + } + defer store.Close() + + for _, tc := range cases { + for _, op := range tc.Ops { + _, err := store.UpdateVulnerabilities(ctx, updater, driver.Fingerprint(uuid.New().String()), op) + if err != nil { + t.Fatalf("failed to perform update for first op: %v", err) + } + } + + res, err := store.Get(ctx, tc.Records, datastore.MatcherV1VulnerabilityGetOpts{}) + if err != nil { + t.Fatalf("failed to get vulnerabilities: %v", err) + } + vulns := []*claircore.Vulnerability{} + for _, vs := range res { + vulns = append(vulns, vs...) 
+ } + if len(vulns) != tc.Vulnerable { + t.Fatalf("wrong number of vulns, got %d want %d", len(vulns), tc.Vulnerable) + } + } +} diff --git a/datastore/postgres/v2/queries.go b/datastore/postgres/v2/queries.go index c4615a3db..d7fed16e8 100644 --- a/datastore/postgres/v2/queries.go +++ b/datastore/postgres/v2/queries.go @@ -11,6 +11,7 @@ import ( "go.opentelemetry.io/otel/trace" ) +//go:embed queries var queries embed.FS //go:generate go run github.com/quay/claircore/internal/cmd/querymetadata diff --git a/datastore/postgres/v2/queries/matcher/gc_delete_ops.sql b/datastore/postgres/v2/queries/matcher/gc_delete_ops.sql new file mode 100644 index 000000000..2f52813ca --- /dev/null +++ b/datastore/postgres/v2/queries/matcher/gc_delete_ops.sql @@ -0,0 +1,2 @@ +DELETE FROM update_operation +WHERE ref = ANY ($1::uuid[]); diff --git a/datastore/postgres/v2/queries/matcher/gc_distinct.sql b/datastore/postgres/v2/queries/matcher/gc_distinct.sql new file mode 100644 index 000000000..46c18832e --- /dev/null +++ b/datastore/postgres/v2/queries/matcher/gc_distinct.sql @@ -0,0 +1 @@ +SELECT DISTINCT(updater) FROM update_operation; diff --git a/datastore/postgres/v2/queries/matcher/gc_eligible.sql b/datastore/postgres/v2/queries/matcher/gc_eligible.sql new file mode 100644 index 000000000..183b697be --- /dev/null +++ b/datastore/postgres/v2/queries/matcher/gc_eligible.sql @@ -0,0 +1,14 @@ +WITH ordered_ops AS ( + SELECT + array_agg(ref ORDER BY date DESC) AS refs + FROM + update_operation + GROUP BY + updater +) +SELECT + ordered_ops.refs[$1:] +FROM + ordered_ops +WHERE + array_length(ordered_ops.refs, 1) > $2; diff --git a/datastore/postgres/v2/queries/matcher/gc_orphaned.sql b/datastore/postgres/v2/queries/matcher/gc_orphaned.sql new file mode 100644 index 000000000..c385550ed --- /dev/null +++ b/datastore/postgres/v2/queries/matcher/gc_orphaned.sql @@ -0,0 +1,5 @@ +DELETE FROM vuln v1 USING vuln v2 + LEFT JOIN uo_vuln uvl ON v2.id = uvl.vuln + WHERE uvl.vuln IS NULL + AND v2.updater = $1 + AND v1.id = v2.id; diff --git a/datastore/postgres/v2/queries/matcher/get_get.sql b/datastore/postgres/v2/queries/matcher/get_get.sql new file mode 100644 index 000000000..c2b0d18bb --- /dev/null +++ b/datastore/postgres/v2/queries/matcher/get_get.sql @@ -0,0 +1,62 @@ +SELECT + vuln.id, + name, + description, + issued, + links, + severity, + normalized_severity, + package_name, + package_version, + package_module, + package_arch, + package_kind, + dist_id, + dist_name, + dist_version, + dist_version_code_name, + dist_version_id, + dist_arch, + dist_cpe, + dist_pretty_name, + arch_operation, + repo_name, + repo_key, + repo_uri, + fixed_in_version, + vuln.updater, + vulnerable_range, + version_kind +FROM + vuln + INNER JOIN uo_vuln ON (vuln.id = uo_vuln.vuln) + INNER JOIN latest_update_operations ON (latest_update_operations.id = uo_vuln.uo) +WHERE ((("package_name" = $1::text) + AND ("package_kind" = 'binary')) + OR ($2::text IS NOT NULL + AND ("package_name" = $2::text) + AND ("package_kind" = 'source'))) +AND ($3::text IS NULL + OR "package_module" = $3::text) +AND ($4::text IS NULL + OR "dist_id" = $4::text) +AND ($5::text IS NULL + OR "dist_name" = $5::text) +AND ($6::text IS NULL + OR "dist_version_id" = $6::text) +AND ($7::text IS NULL + OR "dist_version" = $7::text) +AND ($8::text IS NULL + OR "dist_version_code_name" = $8::text) +AND ($9::text IS NULL + OR "dist_pretty_name" = $9::text) +AND ($10::text IS NULL + OR "dist_cpe" = $10::text) +AND ($11::text IS NULL + OR "dist_arch" = $11::text) +AND ($12::text 
IS NULL + OR "repo_name" = $12::text) +AND ($13::text IS NULL + OR ("version_kind" = $13::text + AND "vulnerable_range" @> $14::int[])) +AND ("latest_update_operations"."kind" = 'vulnerability'); diff --git a/datastore/postgres/v2/queries/matcher/get_latestupdates.sql b/datastore/postgres/v2/queries/matcher/get_latestupdates.sql new file mode 100644 index 000000000..4f7e13644 --- /dev/null +++ b/datastore/postgres/v2/queries/matcher/get_latestupdates.sql @@ -0,0 +1,9 @@ +SELECT DISTINCT ON (updater) + id +FROM + update_operation +WHERE + kind = 'vulnerability' +ORDER BY + updater, + id DESC; diff --git a/datastore/postgres/v2/queries/matcher/getenrichment_get.sql b/datastore/postgres/v2/queries/matcher/getenrichment_get.sql new file mode 100644 index 000000000..49b9680b0 --- /dev/null +++ b/datastore/postgres/v2/queries/matcher/getenrichment_get.sql @@ -0,0 +1,21 @@ +WITH latest AS ( + SELECT + id + FROM + latest_update_operations + WHERE + updater = $1 + AND kind = 'enrichment' + LIMIT 1 +) +SELECT + e.tags, + e.data +FROM + enrichment AS e, + uo_enrich AS uo, + latest +WHERE + uo.uo = latest.id + AND uo.enrich = e.id + AND e.tags && $2::text[]; diff --git a/datastore/postgres/v2/queries/matcher/getlatestupdateref_any.sql b/datastore/postgres/v2/queries/matcher/getlatestupdateref_any.sql new file mode 100644 index 000000000..097d1d357 --- /dev/null +++ b/datastore/postgres/v2/queries/matcher/getlatestupdateref_any.sql @@ -0,0 +1,7 @@ +SELECT + ref +FROM + update_operation +ORDER BY + id USING > +LIMIT 1; diff --git a/datastore/postgres/v2/queries/matcher/getlatestupdateref_enrichment.sql b/datastore/postgres/v2/queries/matcher/getlatestupdateref_enrichment.sql new file mode 100644 index 000000000..75cba245c --- /dev/null +++ b/datastore/postgres/v2/queries/matcher/getlatestupdateref_enrichment.sql @@ -0,0 +1,9 @@ +SELECT + ref +FROM + update_operation +WHERE + kind = 'enrichment' +ORDER BY + id USING > +LIMIT 1; diff --git a/datastore/postgres/v2/queries/matcher/getlatestupdateref_vulnerability.sql b/datastore/postgres/v2/queries/matcher/getlatestupdateref_vulnerability.sql new file mode 100644 index 000000000..8fcb20648 --- /dev/null +++ b/datastore/postgres/v2/queries/matcher/getlatestupdateref_vulnerability.sql @@ -0,0 +1,9 @@ +SELECT + ref +FROM + update_operation +WHERE + kind = 'vulnerability' +ORDER BY + id USING > +LIMIT 1; diff --git a/datastore/postgres/v2/queries/matcher/getlatestupdaterefs_any.sql b/datastore/postgres/v2/queries/matcher/getlatestupdaterefs_any.sql new file mode 100644 index 000000000..6e1d654b3 --- /dev/null +++ b/datastore/postgres/v2/queries/matcher/getlatestupdaterefs_any.sql @@ -0,0 +1,10 @@ +SELECT DISTINCT ON (updater) + updater, + ref, + fingerprint, + date +FROM + update_operation +ORDER BY + updater, + id USING >; diff --git a/datastore/postgres/v2/queries/matcher/getlatestupdaterefs_enrichment.sql b/datastore/postgres/v2/queries/matcher/getlatestupdaterefs_enrichment.sql new file mode 100644 index 000000000..30de61e8e --- /dev/null +++ b/datastore/postgres/v2/queries/matcher/getlatestupdaterefs_enrichment.sql @@ -0,0 +1,12 @@ +SELECT DISTINCT ON (updater) + updater, + ref, + fingerprint, + date +FROM + update_operation +WHERE + kind = 'enrichment' +ORDER BY + updater, + id USING >; diff --git a/datastore/postgres/v2/queries/matcher/getlatestupdaterefs_vulnerability.sql b/datastore/postgres/v2/queries/matcher/getlatestupdaterefs_vulnerability.sql new file mode 100644 index 000000000..dde669262 --- /dev/null +++ 
b/datastore/postgres/v2/queries/matcher/getlatestupdaterefs_vulnerability.sql @@ -0,0 +1,12 @@ +SELECT DISTINCT ON (updater) + updater, + ref, + fingerprint, + date +FROM + update_operation +WHERE + kind = 'vulnerability' +ORDER BY + updater, + id USING >; diff --git a/datastore/postgres/v2/queries/matcher/getupdatediff_confirm.sql b/datastore/postgres/v2/queries/matcher/getupdatediff_confirm.sql new file mode 100644 index 000000000..cf1fe0a9b --- /dev/null +++ b/datastore/postgres/v2/queries/matcher/getupdatediff_confirm.sql @@ -0,0 +1,2 @@ +SELECT 1 WHERE ROW ('vulnerability') = ALL (SELECT kind FROM update_operation + WHERE ref = $1 OR ref = $2); diff --git a/datastore/postgres/v2/queries/matcher/getupdatediff_load.sql b/datastore/postgres/v2/queries/matcher/getupdatediff_load.sql new file mode 100644 index 000000000..749a5a603 --- /dev/null +++ b/datastore/postgres/v2/queries/matcher/getupdatediff_load.sql @@ -0,0 +1,70 @@ +WITH lhs AS ( + SELECT + id, + updater + FROM + update_operation + WHERE + ref = $1 +), +rhs AS ( + SELECT + id, + updater + FROM + update_operation + WHERE + ref = $2 +) +SELECT + id, + name, + updater, + description, + issued, + links, + severity, + normalized_severity, + package_name, + package_version, + package_module, + package_arch, + package_kind, + dist_id, + dist_name, + dist_version, + dist_version_code_name, + dist_version_id, + dist_arch, + dist_cpe, + dist_pretty_name, + arch_operation, + repo_name, + repo_key, + repo_uri, + fixed_in_version +FROM + vuln +WHERE + vuln.id IN ( + SELECT + vuln AS id + FROM + uo_vuln + JOIN lhs ON (uo_vuln.uo = lhs.id) + EXCEPT ALL + SELECT + vuln AS id + FROM + uo_vuln + JOIN rhs ON (uo_vuln.uo = rhs.id)) + AND (vuln.updater = ( + SELECT + updater + FROM + rhs) + OR vuln.updater = ( + SELECT + updater + FROM + lhs)); diff --git a/datastore/postgres/v2/queries/matcher/getupdatediff_populaterefs.sql b/datastore/postgres/v2/queries/matcher/getupdatediff_populaterefs.sql new file mode 100644 index 000000000..d13f001df --- /dev/null +++ b/datastore/postgres/v2/queries/matcher/getupdatediff_populaterefs.sql @@ -0,0 +1 @@ +SELECT updater, fingerprint, date FROM update_operation WHERE ref = $1; diff --git a/datastore/postgres/v2/queries/matcher/getupdateoperations_any.sql b/datastore/postgres/v2/queries/matcher/getupdateoperations_any.sql new file mode 100644 index 000000000..5d37f1079 --- /dev/null +++ b/datastore/postgres/v2/queries/matcher/getupdateoperations_any.sql @@ -0,0 +1,11 @@ +SELECT + ref, + updater, + fingerprint, + date +FROM + update_operation +WHERE + updater = ANY ($1) +ORDER BY + id DESC; diff --git a/datastore/postgres/v2/queries/matcher/getupdateoperations_enrichment.sql b/datastore/postgres/v2/queries/matcher/getupdateoperations_enrichment.sql new file mode 100644 index 000000000..92f485f72 --- /dev/null +++ b/datastore/postgres/v2/queries/matcher/getupdateoperations_enrichment.sql @@ -0,0 +1,12 @@ +SELECT + ref, + updater, + fingerprint, + date +FROM + update_operation +WHERE + updater = ANY ($1) + AND kind = 'enrichment' +ORDER BY + id DESC; diff --git a/datastore/postgres/v2/queries/matcher/getupdateoperations_getupdaters.sql b/datastore/postgres/v2/queries/matcher/getupdateoperations_getupdaters.sql new file mode 100644 index 000000000..cf649bb00 --- /dev/null +++ b/datastore/postgres/v2/queries/matcher/getupdateoperations_getupdaters.sql @@ -0,0 +1,4 @@ +SELECT DISTINCT + (updater) +FROM + update_operation; diff --git a/datastore/postgres/v2/queries/matcher/getupdateoperations_vulnerability.sql 
b/datastore/postgres/v2/queries/matcher/getupdateoperations_vulnerability.sql new file mode 100644 index 000000000..90402866f --- /dev/null +++ b/datastore/postgres/v2/queries/matcher/getupdateoperations_vulnerability.sql @@ -0,0 +1,12 @@ +SELECT + ref, + updater, + fingerprint, + date +FROM + update_operation +WHERE + updater = ANY ($1) + AND kind = 'vulnerability' +ORDER BY + id DESC; diff --git a/datastore/postgres/v2/queries/matcher/initialized_initialized.sql b/datastore/postgres/v2/queries/matcher/initialized_initialized.sql new file mode 100644 index 000000000..d94ee0500 --- /dev/null +++ b/datastore/postgres/v2/queries/matcher/initialized_initialized.sql @@ -0,0 +1,7 @@ +SELECT + EXISTS ( + SELECT + 1 + FROM + vuln + LIMIT 1); diff --git a/datastore/postgres/v2/queries/matcher/recordupdatersetstatus_update.sql b/datastore/postgres/v2/queries/matcher/recordupdatersetstatus_update.sql new file mode 100644 index 000000000..0d6d72bc3 --- /dev/null +++ b/datastore/postgres/v2/queries/matcher/recordupdatersetstatus_update.sql @@ -0,0 +1,8 @@ +UPDATE + updater_status +SET + last_attempt = $1, + last_success = $1, + last_run_succeeded = 'true' +WHERE + updater_name LIKE $2 || '%'; diff --git a/datastore/postgres/v2/queries/matcher/recordupdaterstatus_failure.sql b/datastore/postgres/v2/queries/matcher/recordupdaterstatus_failure.sql new file mode 100644 index 000000000..68b71789c --- /dev/null +++ b/datastore/postgres/v2/queries/matcher/recordupdaterstatus_failure.sql @@ -0,0 +1,7 @@ +INSERT INTO updater_status (updater_name, last_attempt, last_run_succeeded, last_attempt_fingerprint, last_error) + VALUES ($1, $2, 'false', $3, $4) +ON CONFLICT (updater_name) + DO UPDATE SET + last_attempt = $2, last_run_succeeded = 'false', last_attempt_fingerprint = $3, last_error = $4 + RETURNING + updater_name; diff --git a/datastore/postgres/v2/queries/matcher/recordupdaterstatus_success.sql b/datastore/postgres/v2/queries/matcher/recordupdaterstatus_success.sql new file mode 100644 index 000000000..a3c3551ef --- /dev/null +++ b/datastore/postgres/v2/queries/matcher/recordupdaterstatus_success.sql @@ -0,0 +1,7 @@ +INSERT INTO updater_status (updater_name, last_attempt, last_success, last_run_succeeded, last_attempt_fingerprint) + VALUES ($1, $2, $2, 'true', $3) +ON CONFLICT (updater_name) + DO UPDATE SET + last_attempt = $2, last_success = $2, last_run_succeeded = 'true', last_attempt_fingerprint = $3 + RETURNING + updater_name; diff --git a/datastore/postgres/v2/queries/matcher/updateenrichment_assoc.sql b/datastore/postgres/v2/queries/matcher/updateenrichment_assoc.sql new file mode 100644 index 000000000..95a18c06b --- /dev/null +++ b/datastore/postgres/v2/queries/matcher/updateenrichment_assoc.sql @@ -0,0 +1,22 @@ +INSERT +INTO + uo_enrich (enrich, updater, uo, date) +VALUES + ( + ( + SELECT + id + FROM + enrichment + WHERE + hash_kind = $1 + AND hash = $2 + AND updater = $3 + ), + $3, + $4, + transaction_timestamp() + ) +ON CONFLICT +DO + NOTHING; diff --git a/datastore/postgres/v2/queries/matcher/updateenrichment_create.sql b/datastore/postgres/v2/queries/matcher/updateenrichment_create.sql new file mode 100644 index 000000000..dabd2b621 --- /dev/null +++ b/datastore/postgres/v2/queries/matcher/updateenrichment_create.sql @@ -0,0 +1,7 @@ +INSERT +INTO + update_operation (updater, fingerprint, kind) +VALUES + ($1, $2, 'enrichment') +RETURNING + id, ref; diff --git a/datastore/postgres/v2/queries/matcher/updateenrichment_insert.sql b/datastore/postgres/v2/queries/matcher/updateenrichment_insert.sql 
new file mode 100644 index 000000000..5e229035e --- /dev/null +++ b/datastore/postgres/v2/queries/matcher/updateenrichment_insert.sql @@ -0,0 +1,9 @@ +INSERT +INTO + enrichment (hash_kind, hash, updater, tags, data) +VALUES + ($1, $2, $3, $4, $5) +ON CONFLICT + (hash_kind, hash) +DO + NOTHING; diff --git a/datastore/postgres/v2/queries/matcher/updateenrichment_refresh.sql b/datastore/postgres/v2/queries/matcher/updateenrichment_refresh.sql new file mode 100644 index 000000000..5aa954d2f --- /dev/null +++ b/datastore/postgres/v2/queries/matcher/updateenrichment_refresh.sql @@ -0,0 +1 @@ +REFRESH MATERIALIZED VIEW latest_update_operations; diff --git a/datastore/postgres/v2/queries/matcher/updatevulnerabilities_associate.sql b/datastore/postgres/v2/queries/matcher/updatevulnerabilities_associate.sql new file mode 100644 index 000000000..b9c1500f6 --- /dev/null +++ b/datastore/postgres/v2/queries/matcher/updatevulnerabilities_associate.sql @@ -0,0 +1,11 @@ +INSERT INTO uo_vuln (uo, vuln) + VALUES ($3, ( + SELECT + id + FROM + vuln + WHERE + hash_kind = $1 + AND hash = $2)) +ON CONFLICT + DO NOTHING; diff --git a/datastore/postgres/v2/queries/matcher/updatevulnerabilities_create.sql b/datastore/postgres/v2/queries/matcher/updatevulnerabilities_create.sql new file mode 100644 index 000000000..28e514747 --- /dev/null +++ b/datastore/postgres/v2/queries/matcher/updatevulnerabilities_create.sql @@ -0,0 +1,4 @@ +INSERT INTO update_operation (updater, fingerprint, kind) + VALUES ($1, $2, 'vulnerability') +RETURNING + id, ref; diff --git a/datastore/postgres/v2/queries/matcher/updatevulnerabilities_insert.sql b/datastore/postgres/v2/queries/matcher/updatevulnerabilities_insert.sql new file mode 100644 index 000000000..6943fbce2 --- /dev/null +++ b/datastore/postgres/v2/queries/matcher/updatevulnerabilities_insert.sql @@ -0,0 +1,4 @@ +INSERT INTO vuln (hash_kind, hash, name, updater, description, issued, links, severity, normalized_severity, package_name, package_version, package_module, package_arch, package_kind, dist_id, dist_name, dist_version, dist_version_code_name, dist_version_id, dist_arch, dist_cpe, dist_pretty_name, repo_name, repo_key, repo_uri, fixed_in_version, arch_operation, version_kind, vulnerable_range) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, VersionRange ($29, $30)) +ON CONFLICT (hash_kind, hash) + DO NOTHING; diff --git a/datastore/postgres/v2/queries/matcher/updatevulnerabilities_refresh.sql b/datastore/postgres/v2/queries/matcher/updatevulnerabilities_refresh.sql new file mode 100644 index 000000000..d10c792f9 --- /dev/null +++ b/datastore/postgres/v2/queries/matcher/updatevulnerabilities_refresh.sql @@ -0,0 +1 @@ +REFRESH MATERIALIZED VIEW CONCURRENTLY latest_update_operations; diff --git a/datastore/postgres/v2/queries_test.go b/datastore/postgres/v2/queries_test.go index 3cdfe8673..82bb13aee 100644 --- a/datastore/postgres/v2/queries_test.go +++ b/datastore/postgres/v2/queries_test.go @@ -153,6 +153,24 @@ var ( "0.Plan.0.Plans.1": "usage validated to be over a trivial number of rows", }, */ + "matcher/initialized_initialized.sql": { + "0.Plan.Plans.0": "only reads a single row", + }, + "matcher/gc_distinct.sql": { + "0.Plan.Plans.0": "TODO", + }, + "matcher/getlatestupdaterefs_any.sql": { + "0.Plan.Plans.0.Plans.0": "TODO", + }, + "matcher/getupdateoperations_getupdaters.sql": { + "0.Plan.Plans.0": "TODO", + }, + "matcher/getupdatediff_load.sql": { + "0.Plan.Plans.4": "TODO", 
+ }, + "matcher/recordupdatersetstatus_update.sql": { + "0.Plan.Plans.0": "TODO", + }, } ) diff --git a/datastore/postgres/v2/query_metadata.go b/datastore/postgres/v2/query_metadata.go index cd678828b..1fa3a03c3 100644 --- a/datastore/postgres/v2/query_metadata.go +++ b/datastore/postgres/v2/query_metadata.go @@ -6,6 +6,72 @@ var queryMetadata = struct { Table map[string]string Op map[string]string }{ - Table: map[string]string{}, - Op: map[string]string{}, + Table: map[string]string{ + "matcher/gc_delete_ops.sql": "update_operation", + "matcher/gc_distinct.sql": "update_operation", + "matcher/gc_eligible.sql": "ordered_ops", + "matcher/gc_orphaned.sql": "vuln", + "matcher/get_get.sql": "latest_update_operations", + "matcher/get_latestupdates.sql": "update_operation", + "matcher/getenrichment_get.sql": "enrichment", + "matcher/getlatestupdateref_any.sql": "update_operation", + "matcher/getlatestupdateref_enrichment.sql": "update_operation", + "matcher/getlatestupdateref_vulnerability.sql": "update_operation", + "matcher/getlatestupdaterefs_any.sql": "update_operation", + "matcher/getlatestupdaterefs_enrichment.sql": "update_operation", + "matcher/getlatestupdaterefs_vulnerability.sql": "update_operation", + "matcher/getupdatediff_confirm.sql": "update_operation", + "matcher/getupdatediff_load.sql": "vuln", + "matcher/getupdatediff_populaterefs.sql": "update_operation", + "matcher/getupdateoperations_any.sql": "update_operation", + "matcher/getupdateoperations_enrichment.sql": "update_operation", + "matcher/getupdateoperations_getupdaters.sql": "update_operation", + "matcher/getupdateoperations_vulnerability.sql": "update_operation", + "matcher/initialized_initialized.sql": "vuln", + "matcher/recordupdatersetstatus_update.sql": "updater_status", + "matcher/recordupdaterstatus_failure.sql": "updater_status", + "matcher/recordupdaterstatus_success.sql": "updater_status", + "matcher/updateenrichment_assoc.sql": "uo_enrich", + "matcher/updateenrichment_create.sql": "update_operation", + "matcher/updateenrichment_insert.sql": "enrichment", + "matcher/updateenrichment_refresh.sql": "latest_update_operations", + "matcher/updatevulnerabilities_associate.sql": "uo_vuln", + "matcher/updatevulnerabilities_create.sql": "update_operation", + "matcher/updatevulnerabilities_insert.sql": "vuln", + "matcher/updatevulnerabilities_refresh.sql": "latest_update_operations", + }, + Op: map[string]string{ + "matcher/gc_delete_ops.sql": "DELETE", + "matcher/gc_distinct.sql": "SELECT", + "matcher/gc_eligible.sql": "SELECT", + "matcher/gc_orphaned.sql": "DELETE", + "matcher/get_get.sql": "SELECT", + "matcher/get_latestupdates.sql": "SELECT", + "matcher/getenrichment_get.sql": "SELECT", + "matcher/getlatestupdateref_any.sql": "SELECT", + "matcher/getlatestupdateref_enrichment.sql": "SELECT", + "matcher/getlatestupdateref_vulnerability.sql": "SELECT", + "matcher/getlatestupdaterefs_any.sql": "SELECT", + "matcher/getlatestupdaterefs_enrichment.sql": "SELECT", + "matcher/getlatestupdaterefs_vulnerability.sql": "SELECT", + "matcher/getupdatediff_confirm.sql": "SELECT", + "matcher/getupdatediff_load.sql": "SELECT", + "matcher/getupdatediff_populaterefs.sql": "SELECT", + "matcher/getupdateoperations_any.sql": "SELECT", + "matcher/getupdateoperations_enrichment.sql": "SELECT", + "matcher/getupdateoperations_getupdaters.sql": "SELECT", + "matcher/getupdateoperations_vulnerability.sql": "SELECT", + "matcher/initialized_initialized.sql": "SELECT", + "matcher/recordupdatersetstatus_update.sql": "UPDATE", + 
"matcher/recordupdaterstatus_failure.sql": "INSERT", + "matcher/recordupdaterstatus_success.sql": "INSERT", + "matcher/updateenrichment_assoc.sql": "INSERT", + "matcher/updateenrichment_create.sql": "INSERT", + "matcher/updateenrichment_insert.sql": "INSERT", + "matcher/updateenrichment_refresh.sql": "REFRESH MATERIALIZED VIEW", + "matcher/updatevulnerabilities_associate.sql": "INSERT", + "matcher/updatevulnerabilities_create.sql": "INSERT", + "matcher/updatevulnerabilities_insert.sql": "INSERT", + "matcher/updatevulnerabilities_refresh.sql": "REFRESH MATERIALIZED VIEW", + }, } diff --git a/datastore/postgres/v2/testdata/matcher_helpers.psql b/datastore/postgres/v2/testdata/matcher_helpers.psql new file mode 100644 index 000000000..59e295213 --- /dev/null +++ b/datastore/postgres/v2/testdata/matcher_helpers.psql @@ -0,0 +1,76 @@ +CREATE EXTENSION IF NOT EXISTS pgcrypto; + +CREATE FUNCTION GetTestSetup (ct integer, name text, ref uuid) + RETURNS void + AS $$ +DECLARE + op_id bigint; +BEGIN + INSERT INTO update_operation (updater, ref, fingerprint, kind) + VALUES (name, ref, '', 'vulnerability') + RETURNING + update_operation.id INTO op_id; + FOR n IN 0..ct-1 LOOP + DECLARE r vuln%ROWTYPE; + BEGIN + -- "Vuln.id" is NOT NULL, and passing the whole row means we don't get the default. + r.id = nextval('vuln_id_seq'); + r.hash_kind = 'sha256'; + r.hash = digest(name || n, 'sha256'); + r.vulnerable_range = 'empty'; + r.fixed_in_version = '5.5.5'; + r.severity = '???'; + r.normalized_severity = 'Unknown'; + -- NOT NULL constraint. + r.name = 'TEST-' || n || ' (' || name || ')'; + r.updater = name; + -- No way to do fallthrough in a case, so big batch of `IF` it is. + -- Unconditional parts: + r.package_name = 'package_' || n/10; + r.package_version = (random() * 10)::integer || '.' || (random() * 10)::integer || '.' || (random() * 10)::integer; + IF n+1 % 10 > 6 THEN + r.package_module = 'module_' || (n % 10) - 6; + END IF; + IF n % 10 < 3 THEN + r.package_arch = 'aarch64'; + ELSE + r.package_arch = 'x86_64'; + END IF; + if n % 10 = 0 THEN + r.package_kind = 'source'; + ELSE + r.package_kind = 'binary'; + END IF; + -- Add a Distribution: + IF n % 4 < 3 THEN + r.dist_id = 'distribution_'||n%10; + r.dist_name = 'Test'; + r.dist_version = n%10; + r.dist_version_code_name = 'Chicago'; + r.dist_version_id = n%10; + r.dist_arch = r.package_arch; + r.dist_cpe = 'cpe:2.3'||repeat(':*', 11); + r.dist_pretty_name = 'Test '|| n%10; + END IF; + -- Add a Repository: + IF n % 4 < 2 THEN + r.repo_name = 'repository_'||n%10; + r.repo_key = ''; + r.repo_uri = 'TODO'; + END IF; + -- Add a VersionRange: + IF n % 4 < 1 THEN + r.arch_operation = ''; + r.version_kind = 'semver'; + r.vulnerable_range = VersionRange('{0,0,0,0,0,0,0,0,0,1}'::int[], '{0,5,5,5,0,0,0,0,0,0}'::int[], '[)'); + END IF; + INSERT INTO vuln + VALUES (r.*); + INSERT INTO uo_vuln (uo, vuln) + VALUES (op_id, r.id); + END; + END LOOP; + EXECUTE 'REFRESH MATERIALIZED VIEW latest_update_operations'; +END; +$$ +LANGUAGE plpgsql;