diff --git a/datastore/postgres/v2/indexer_v1.go b/datastore/postgres/v2/indexer_v1.go index 8e3508ff6..8b9a0b46b 100644 --- a/datastore/postgres/v2/indexer_v1.go +++ b/datastore/postgres/v2/indexer_v1.go @@ -1,5 +1,71 @@ package postgres +import ( + "context" + "fmt" + "runtime" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/jackc/pgx/v5/stdlib" + "github.com/remind101/migrate" + + "github.com/quay/claircore/datastore/postgres/migrations" + "github.com/quay/claircore/indexer" +) + +// NewIndexerV1 returns a configured [IndexerV1]. +// +// The passed [pgxpool.Config] will have its tracing and lifecycle hooks +// overwritten. +// +// Values that can be used as IndexerOptions: +// - [WithMigrations] +// - [WithMinimumMigration] +func NewIndexerV1(ctx context.Context, cfg *pgxpool.Config, opt ...IndexerOption) (*IndexerV1, error) { + const prefix = `indexer` + idxCfg := newIndexerConfig() + for _, o := range opt { + idxCfg = o.indexerConfig(idxCfg) + } + + if idxCfg.Migrations { + cfg := cfg.ConnConfig.Copy() + cfg.DefaultQueryExecMode = pgx.QueryExecModeExec + err := func() error { + db := stdlib.OpenDB(*cfg) + defer db.Close() + migrator := migrate.NewPostgresMigrator(db) + migrator.Table = migrations.IndexerMigrationTable + err := migrator.Exec(migrate.Up, migrations.IndexerMigrations...) + if err != nil { + return fmt.Errorf("failed to perform migrations: %w", err) + } + return nil + }() + if err != nil { + return nil, err + } + } + + var s IndexerV1 + var err error + if err = s.init(ctx, cfg, prefix); err != nil { + return nil, err + } + + if err := s.checkRevision(ctx, pgx.Identifier([]string{migrations.IndexerMigrationTable}), idxCfg.MinMigration); err != nil { + return nil, err + } + + _, file, line, _ := runtime.Caller(1) + runtime.SetFinalizer(&s, func(s *IndexerV1) { + panic(fmt.Sprintf("%s:%d: IndexerV1 not closed", file, line)) + }) + + return &s, nil +} + // IndexerOption is an option for configuring an indexer datastore. 
type IndexerOption interface { indexerConfig(indexerConfig) indexerConfig @@ -17,3 +83,33 @@ func newIndexerConfig() indexerConfig { MinMigration: MinimumIndexerMigration, } } + +// Static assertion for the [indexer.Store] interface. +var _ indexer.Store = (*IndexerV1)(nil) + +// IndexerV1 implements [indexer.Store] backed by a PostgreSQL database. +type IndexerV1 struct { + storeCommon +} + +// Close implements [indexer.Store]. +func (s *IndexerV1) Close(_ context.Context) error { + runtime.SetFinalizer(s, nil) + return s.storeCommon.Close() +} + +// RegisterScanners is a bad name. +func (s *IndexerV1) RegisterScanners(ctx context.Context, vs indexer.VersionedScanners) (err error) { + ctx, done := s.method(ctx, &err) + defer done() + rvs := rotateVersionedScanners(vs) + + err = s.pool.AcquireFunc(ctx, s.acquire(ctx, `register`, func(ctx context.Context, c *pgxpool.Conn, query string) (err error) { + _, err = c.Exec(ctx, query, rvs.Name, rvs.Version, rvs.Kind) + return err + })) + if err != nil { + return err + } + return nil +} diff --git a/datastore/postgres/v2/indexer_v1_affectedmanifests.go b/datastore/postgres/v2/indexer_v1_affectedmanifests.go new file mode 100644 index 000000000..3d66311b5 --- /dev/null +++ b/datastore/postgres/v2/indexer_v1_affectedmanifests.go @@ -0,0 +1,267 @@ +package postgres + +import ( + "context" + "errors" + "fmt" + "runtime/pprof" + "strconv" + + "github.com/jackc/pgx/v5" + "github.com/quay/zlog" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" + + "github.com/quay/claircore" +) + +var ( + // ErrNotIndexed indicates the vulnerability being queried has a dist or repo not + // indexed into the database. + ErrNotIndexed = fmt.Errorf("vulnerability containers data not indexed by any scannners") +) + +// AffectedManifests finds the manifests digests which are affected by the provided vulnerability. 
+// +// An exhaustive search for all indexed packages of the same name as the vulnerability is performed. +// +// The list of packages is filtered down to only the affected set. +// +// The manifest index is then queried to resolve a list of manifest hashes containing the affected +// artifacts. +func (s *IndexerV1) AffectedManifests(ctx context.Context, v claircore.Vulnerability, vulnFunc claircore.CheckVulnernableFunc) (_ []claircore.Digest, err error) { + ctx, done := s.method(ctx, &err) + defer done() + ctx = zlog.ContextWithValues(ctx, "vulnerability", v.Name) + + out := []claircore.Digest{} + err = pgx.BeginTxFunc(ctx, s.pool, pgx.TxOptions{AccessMode: pgx.ReadOnly}, + s.tx(ctx, `AffectedManifests`, func(ctx context.Context, tx pgx.Tx) (err error) { + var pr claircore.IndexRecord + span := trace.SpanFromContext(ctx) + + err = pgx.BeginFunc(ctx, tx, s.tx(ctx, `protoRecord`, s.protoRecordCall(&pr, v))) + switch { + case err == nil: + case errors.Is(err, ErrNotIndexed): + // This is a common case: the system knows of a vulnerability but + // doesn't know of any manifests it could apply to. + zlog.Debug(ctx).Msg("not indexed") + trace.SpanFromContext(ctx).SetStatus(codes.Ok, "not indexed") + return nil + default: + return err + } + + // Collect all packages which may be affected by the vulnerability + // in question. 
+ pkgsToFilter := []claircore.Package{} + + err = pgx.BeginFunc(ctx, tx, + s.call(ctx, `selectPackages`, func(ctx context.Context, tx pgx.Tx, query string) error { + rows, err := tx.Query(ctx, query, v.Package.Name) + if err != nil { + return fmt.Errorf("vulnerability %q: %w", v.ID, err) + } + defer rows.Close() + + for rows.Next() { + var pkg claircore.Package + var id int64 + var nKind *string + err := rows.Scan( + &id, + &pkg.Name, + &pkg.Version, + &pkg.Kind, + &nKind, + &pkg.NormalizedVersion, + &pkg.Module, + &pkg.Arch, + ) + if err != nil { + return fmt.Errorf("unmarshal error: %w", err) + } + idStr := strconv.FormatInt(id, 10) + pkg.ID = idStr + if nKind != nil { + pkg.NormalizedVersion.Kind = *nKind + } + pkgsToFilter = append(pkgsToFilter, pkg) + } + trace.SpanFromContext(ctx). + AddEvent("loaded packages", trace.WithAttributes(attribute.Int("count", len(pkgsToFilter)))) + zlog.Debug(ctx).Int("count", len(pkgsToFilter)).Msg("packages to filter") + if err := rows.Err(); err != nil { + return fmt.Errorf("error reading response: %w", err) + } + return nil + })) + if err != nil { + return fmt.Errorf("unable to select packages: %w", err) + } + + // for each package discovered create an index record + // and determine if any in-tree matcher finds the record vulnerable + var filteredRecords []claircore.IndexRecord + for i := range pkgsToFilter { + pkg := &pkgsToFilter[i] + pr.Package = pkg + var match bool + var err error + pprof.Do(ctx, pprof.Labels("hook", "CheckVulnFunc"), func(ctx context.Context) { + match, err = vulnFunc(ctx, &pr, &v) + }) + if err != nil { + return fmt.Errorf("error in check vulnerable hook: %w", err) + } + if match { + filteredRecords = append(filteredRecords, claircore.IndexRecord{ + Package: pkg, + Distribution: pr.Distribution, + Repository: pr.Repository, + }) + } + } + span.AddEvent("filtered packages", trace.WithAttributes(attribute.Int("count", len(filteredRecords)))) + zlog.Debug(ctx).Int("count", 
len(filteredRecords)).Msg("vulnerable index records") + // Query the manifest index for manifests containing the vulnerable + // IndexRecords and create a set containing each unique manifest. + set := map[string]struct{}{} + selectAffected := func(id string, dist, repo *uint64) callFunc { + return func(ctx context.Context, tx pgx.Tx, query string) error { + rows, err := tx.Query(ctx, query, id, dist, repo) + if err != nil { + return err + } + defer rows.Close() + for rows.Next() { + var hash string + if err := rows.Scan(&hash); err != nil { + return err + } + if _, ok := set[hash]; ok { + continue + } + set[hash] = struct{}{} + i := len(out) + out = append(out, claircore.Digest{}) + if err := out[i].UnmarshalText([]byte(hash)); err != nil { + return err + } + } + return rows.Err() + } + } + + for _, record := range filteredRecords { + v, err := toValues(record) + if err != nil { + return fmt.Errorf("failed to get sql values for query: %w", err) + } + err = pgx.BeginFunc(ctx, tx, s.call(ctx, `selectAffected`, selectAffected(record.Package.ID, v[2], v[3]))) + switch { + case errors.Is(err, nil): + default: + return fmt.Errorf("error selecting affected: %w", err) + } + } + + span.AddEvent("affected manifests", trace.WithAttributes(attribute.Int("count", len(out)))) + zlog.Debug(ctx).Int("count", len(out)).Msg("affected manifests") + return nil + })) + if err != nil { + return nil, err + } + return out, nil +} + +func (s *IndexerV1) protoRecordCall(out *claircore.IndexRecord, v claircore.Vulnerability) txFunc { + return func(ctx context.Context, tx pgx.Tx) error { + // fill dist into prototype index record if exists + if (v.Dist != nil) && (v.Dist.Name != "") { + const name = `selectDist` + var did int64 + err := pgx.BeginFunc(ctx, tx, s.call(ctx, name, protoRecordSelectDist(&did, v.Dist))) + switch { + case errors.Is(err, nil): + id := strconv.FormatInt(did, 10) + out.Distribution = &claircore.Distribution{ + ID: id, + Arch: v.Dist.Arch, + CPE: v.Dist.CPE, + DID: 
v.Dist.DID, + Name: v.Dist.Name, + PrettyName: v.Dist.PrettyName, + Version: v.Dist.Version, + VersionCodeName: v.Dist.VersionCodeName, + VersionID: v.Dist.VersionID, + } + zlog.Debug(ctx).Str("id", id).Msg("discovered distribution id") + case errors.Is(err, pgx.ErrNoRows): + // OK + default: + return fmt.Errorf("failed to scan dist: %w", err) + } + } else { + zlog.Debug(ctx).Msg("no distribution") + } + + // fill repo into prototype index record if exists + if (v.Repo != nil) && (v.Repo.Name != "") { + const name = `selectRepo` + var rid int64 + err := pgx.BeginFunc(ctx, tx, s.call(ctx, name, protoRecordSelectRepo(&rid, v.Repo))) + switch { + case errors.Is(err, nil): + id := strconv.FormatInt(rid, 10) + out.Repository = &claircore.Repository{ + ID: id, + Key: v.Repo.Key, + Name: v.Repo.Name, + URI: v.Repo.URI, + } + zlog.Debug(ctx).Str("id", id).Msg("discovered repo id") + case errors.Is(err, pgx.ErrNoRows): + // OK + default: + return fmt.Errorf("failed to scan repo: %w", err) + } + } else { + zlog.Debug(ctx).Msg("no repository") + } + + // we need at least a repo or distribution to continue + if (out.Distribution == nil) && (out.Repository == nil) { + return ErrNotIndexed + } + return nil + } +} + +func protoRecordSelectDist(out *int64, d *claircore.Distribution) callFunc { + return func(ctx context.Context, tx pgx.Tx, query string) error { + return tx.QueryRow(ctx, query, + d.Arch, + d.CPE, + d.DID, + d.Name, + d.PrettyName, + d.Version, + d.VersionCodeName, + d.VersionID, + ).Scan(out) + } +} + +func protoRecordSelectRepo(out *int64, r *claircore.Repository) callFunc { + return func(ctx context.Context, tx pgx.Tx, query string) error { + return tx.QueryRow(ctx, query, + r.Name, + r.Key, + r.URI, + ).Scan(out) + } +} diff --git a/datastore/postgres/v2/indexer_v1_affectedmanifests_test.go b/datastore/postgres/v2/indexer_v1_affectedmanifests_test.go new file mode 100644 index 000000000..e07527cb9 --- /dev/null +++ 
b/datastore/postgres/v2/indexer_v1_affectedmanifests_test.go @@ -0,0 +1,300 @@ +package postgres + +import ( + "context" + "encoding/json" + "os" + "path/filepath" + "testing" + + "github.com/jackc/pgx/v5/pgxpool" + "github.com/quay/zlog" + + "github.com/quay/claircore" + "github.com/quay/claircore/indexer" + "github.com/quay/claircore/pkg/omnimatcher" + "github.com/quay/claircore/test/integration" + pgtest "github.com/quay/claircore/test/postgres/v2" +) + +type affectedE2E struct { + store indexer.Store + pool *pgxpool.Pool + ctx context.Context + ir claircore.IndexReport + vr claircore.VulnerabilityReport +} + +func TestAffectedE2E(t *testing.T) { + integration.NeedDB(t) + ctx := zlog.Test(context.Background(), t) + cfg := pgtest.TestIndexerDB(ctx, t) + store, err := NewIndexerV1(ctx, cfg, WithMigrations) + if err != nil { + t.Fatal(err) + } + defer store.Close(ctx) + pool, err := pgxpool.NewWithConfig(ctx, cfg) + if err != nil { + t.Fatal(err) + } + t.Cleanup(pool.Close) + + table := []struct { + // name of the defined affectedE2E test + name string + // file name of index report in ./testdata + irFName string + // file name of vuln report in ./testdata + vrFName string + }{ + // these fixtures + // were generated against the same database + // to ensure all ids are sequentially increasing + // + // if fixtures are added you must generate + // this current set *and* your new fixtures against the same database + // to ensure there are no ID overlaps + // + // generate them via go generate github.com/quay/claircore/datastore/postgres + { + name: "amazonlinux 1", + irFName: "docker.io-library-amazonlinux-1.index.json", + vrFName: "docker.io-library-amazonlinux-1.report.json", + }, + { + name: "debian 8", + irFName: "docker.io-library-debian-8.index.json", + vrFName: "docker.io-library-debian-8.report.json", + }, + { + name: "debian 9", + irFName: "docker.io-library-debian-9.index.json", + vrFName: "docker.io-library-debian-9.report.json", + }, + { + name: "debian 
10", + irFName: "docker.io-library-debian-10.index.json", + vrFName: "docker.io-library-debian-10.report.json", + }, + { + name: "ubi 8", + irFName: "registry.access.redhat.com-ubi8-ubi.index.json", + vrFName: "registry.access.redhat.com-ubi8-ubi.report.json", + }, + { + name: "ubuntu 16.04", + irFName: "docker.io-library-ubuntu-16.04.index.json", + vrFName: "docker.io-library-ubuntu-16.04.report.json", + }, + { + name: "ubuntu 18.04", + irFName: "docker.io-library-ubuntu-18.04.index.json", + vrFName: "docker.io-library-ubuntu-18.04.report.json", + }, + { + name: "ubuntu 19.10", + irFName: "docker.io-library-ubuntu-19.10.index.json", + vrFName: "docker.io-library-ubuntu-19.10.report.json", + }, + { + name: "ubuntu 20.04", + irFName: "docker.io-library-ubuntu-20.04.index.json", + vrFName: "docker.io-library-ubuntu-20.04.report.json", + }, + { + name: "mitmproxy 4.0.1", + irFName: "docker.io-mitmproxy-mitmproxy-4.0.1.index.json", + vrFName: "docker.io-mitmproxy-mitmproxy-4.0.1.report.json", + }, + } + + for _, tt := range table { + // grab and deserialize test data + irPath := filepath.Join("testdata", tt.irFName) + vrPath := filepath.Join("testdata", tt.vrFName) + irFD, err := os.Open(irPath) + if err != nil { + t.Fatalf("fd open for ir failed: %v", err) + } + vrFD, err := os.Open(vrPath) + if err != nil { + t.Fatalf("fd open for vr failed: %v", err) + } + + var ir claircore.IndexReport + var vr claircore.VulnerabilityReport + + err = json.NewDecoder(irFD).Decode(&ir) + if err != nil { + t.Fatalf("could not decode ir: %v", err) + } + + err = json.NewDecoder(vrFD).Decode(&vr) + if err != nil { + t.Fatalf("could not decode vr: %v", err) + } + + // create and run e2e test + e2e := &affectedE2E{ + store: store, + pool: pool, + ctx: ctx, + ir: ir, + vr: vr, + } + t.Run(tt.name, e2e.Run) + } +} + +func (e *affectedE2E) Run(t *testing.T) { + type subtest struct { + name string + do func(t *testing.T) + } + subtests := [...]subtest{ + {"IndexArtifacts", e.IndexArtifacts}, + 
{"IndexManifest", e.IndexManifest}, + {"AffectedManifests", e.AffectedManifests}, + } + if len(e.vr.Vulnerabilities) == 0 { + t.Fatal("bad test harness: no vulnerabilities") + } + for _, subtest := range subtests { + if !t.Run(subtest.name, subtest.do) { + t.FailNow() + } + } +} + +// IndexArtifacts manually writes all the necessary +// artifacts to the db. +// +// this is required so foreign key constraints do not +// fail in later tests. +func (e *affectedE2E) IndexArtifacts(t *testing.T) { + ctx := zlog.Test(e.ctx, t) + const ( + insertManifest = ` + INSERT INTO manifest + (hash) + VALUES ($1) + ON CONFLICT DO NOTHING; + ` + insertPkg = ` + INSERT INTO package (name, kind, version, norm_kind, norm_version, module, arch, id) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + ON CONFLICT DO NOTHING; + ` + insertDist = ` + INSERT INTO dist + (name, did, version, version_code_name, version_id, arch, cpe, pretty_name, id) + VALUES + ($1, $2, $3, $4, $5, $6, $7, $8, $9) + ON CONFLICT DO NOTHING; + ` + insertRepo = ` + INSERT INTO repo + (name, key, uri, id) + VALUES ($1, $2, $3, $4) + ON CONFLICT DO NOTHING; + ` + ) + _, err := e.pool.Exec(ctx, insertManifest, e.ir.Hash.String()) + if err != nil { + t.Fatalf("failed to insert manifest: %v", err) + } + for _, pkg := range e.ir.Packages { + _, err := e.pool.Exec(ctx, insertPkg, + pkg.Name, + pkg.Kind, + pkg.Version, + pkg.NormalizedVersion.Kind, + pkg.NormalizedVersion, + pkg.Module, + pkg.Arch, + pkg.ID, + ) + if err != nil { + t.Fatalf("failed to insert package: %v", err) + } + if pkg.Source != nil { + pkg := pkg.Source + _, err := e.pool.Exec(ctx, insertPkg, + pkg.Name, + pkg.Kind, + pkg.Version, + pkg.NormalizedVersion.Kind, + pkg.NormalizedVersion, + pkg.Module, + pkg.Arch, + pkg.ID, + ) + if err != nil { + t.Fatalf("failed to insert source package: %v", err) + } + } + } + for _, dist := range e.ir.Distributions { + _, err := e.pool.Exec(ctx, insertDist, + dist.Name, + dist.DID, + dist.Version, + dist.VersionCodeName, + 
dist.VersionID, + dist.Arch, + dist.CPE, + dist.PrettyName, + dist.ID, + ) + if err != nil { + t.Fatalf("failed to insert dist: %v", err) + } + } + for _, repo := range e.ir.Repositories { + _, err := e.pool.Exec(ctx, insertRepo, + repo.Name, + repo.Key, + repo.URI, + repo.ID, + ) + if err != nil { + t.Fatalf("failed to insert repo: %v", err) + } + } +} + +// IndexManifest confirms the contents of a manifest +// can be written to the manifest index table. +func (e *affectedE2E) IndexManifest(t *testing.T) { + ctx := zlog.Test(e.ctx, t) + err := e.store.IndexManifest(ctx, &e.ir) + if err != nil { + t.Fatalf("failed to index manifest: %v", err) + } +} + +// AffectedManifests confirms each vulnerability +// in the vulnereability report reports the associated +// manifest is affected. +func (e *affectedE2E) AffectedManifests(t *testing.T) { + ctx := zlog.Test(e.ctx, t) + om := omnimatcher.New(nil) + for _, vuln := range e.vr.Vulnerabilities { + t.Logf("vulnerability: %s (%s)", vuln.Name, vuln.Package.Name) + hashes, err := e.store.AffectedManifests(ctx, *vuln, om.Vulnerable) + if err != nil { + t.Fatalf("failed to retrieve affected manifest for vuln %s: %v", vuln.ID, err) + } + + if len(hashes) != 1 { + t.Fatalf("got: len(hashes)==%d, want: len(hashes)==1", len(hashes)) + } + + got := hashes[0].String() + wanted := e.ir.Hash.String() + if got != wanted { + t.Fatalf("got: %v, want: %v", got, wanted) + } + } +} diff --git a/datastore/postgres/v2/indexer_v1_bylayer.go b/datastore/postgres/v2/indexer_v1_bylayer.go new file mode 100644 index 000000000..8f90a34b9 --- /dev/null +++ b/datastore/postgres/v2/indexer_v1_bylayer.go @@ -0,0 +1,143 @@ +package postgres + +import ( + "context" + "fmt" + "reflect" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + + "github.com/quay/claircore" + "github.com/quay/claircore/indexer" +) + +// ArtifactByLayer is a helper that does the "easy" cases of the *ByLayer +// methods. 
+func artifactByLayer[T artifact](ctx context.Context, s *IndexerV1, hash claircore.Digest, vs indexer.VersionedScanners) (out []T, err error) { + rvs := rotateVersionedScanners(vs) + typ := reflect.TypeOf(out).Elem() + fn := fmt.Sprintf(`helper_%s_bylayer.sql`, typ.Name()) + err = pgx.BeginTxFunc(ctx, s.pool, txRO, s.callfile(ctx, fn, `query`, func(ctx context.Context, tx pgx.Tx, query string) error { + rows, err := tx.Query(ctx, query, hash, rvs.Name, rvs.Version, rvs.Kind) + if err != nil { + return err + } + out, err = pgx.CollectRows(rows, pgx.RowTo[T]) + return err + })) + return out, err +} + +// PtrSlice returns a slice of pointers to the values in the passed slice. +func ptrSlice[T any](s []T) []*T { + if s == nil { + return nil + } + + out := make([]*T, len(s)) + for i := range s { + out[i] = &s[i] + } + return out +} + +// DistributionsByLayer implements [indexer.Store]. +func (s *IndexerV1) DistributionsByLayer(ctx context.Context, hash claircore.Digest, vs indexer.VersionedScanners) (_ []*claircore.Distribution, err error) { + ctx, done := s.method(ctx, &err) + defer done() + + out, err := artifactByLayer[claircore.Distribution](ctx, s, hash, vs) + return ptrSlice(out), err +} + +// FilesByLayer implements [indexer.Store]. +func (s *IndexerV1) FilesByLayer(ctx context.Context, hash claircore.Digest, vs indexer.VersionedScanners) (_ []claircore.File, err error) { + ctx, done := s.method(ctx, &err) + defer done() + + out, err := artifactByLayer[claircore.File](ctx, s, hash, vs) + return out, err +} + +// RepositoriesByLayer implements [indexer.Store]. +func (s *IndexerV1) RepositoriesByLayer(ctx context.Context, hash claircore.Digest, vs indexer.VersionedScanners) (_ []*claircore.Repository, err error) { + ctx, done := s.method(ctx, &err) + defer done() + + out, err := artifactByLayer[claircore.Repository](ctx, s, hash, vs) + return ptrSlice(out), err +} + +// PackagesByLayer implements [indexer.Store]. 
+func (s *IndexerV1) PackagesByLayer(ctx context.Context, hash claircore.Digest, scnrs indexer.VersionedScanners) (_ []*claircore.Package, err error) { + ctx, done := s.method(ctx, &err) + defer done() + // This method is different from the others because Packages are very + // special. + + var ps []claircore.Package + lookup := make(map[string]int) + todo := make(map[string]string) + rvs := rotateVersionedScanners(scnrs) + + err = s.pool.AcquireFunc(ctx, s.acquire(ctx, `query`, func(ctx context.Context, c *pgxpool.Conn, query string) (err error) { + rows, err := c.Query(ctx, query, hash.String(), rvs.Name, rvs.Version, rvs.Kind) + if err != nil { + return err + } + defer rows.Close() + for rows.Next() { + i := len(ps) + ps = append(ps, claircore.Package{}) + pkg := &ps[i] + + var srcID, nKind, fPath *string + err = rows.Scan( + &pkg.ID, + &pkg.Name, + &pkg.Kind, + &pkg.Version, + &nKind, + &pkg.NormalizedVersion, + &pkg.Module, + &pkg.Arch, + &srcID, + &pkg.PackageDB, + &pkg.RepositoryHint, + &fPath, + ) + if err != nil { + return err + } + lookup[pkg.ID] = i + if nKind != nil { + pkg.NormalizedVersion.Kind = *nKind + } + if fPath != nil { + pkg.Filepath = *fPath + } + if srcID != nil { + if si, ok := lookup[*srcID]; ok { + pkg.Source = &ps[si] + } else { + todo[pkg.ID] = *srcID + } + } + } + return rows.Err() + })) + if err != nil { + return nil, err + } + for pkgID, srcID := range todo { + si, ok := lookup[srcID] + if !ok { + continue // No Source ? 
+ } + pkg := &ps[lookup[pkgID]] + pkg.Source = &ps[si] + } + + return ptrSlice(ps), nil +} diff --git a/datastore/postgres/v2/indexer_v1_bylayer_benchmark_test.go b/datastore/postgres/v2/indexer_v1_bylayer_benchmark_test.go new file mode 100644 index 000000000..bfebc7b88 --- /dev/null +++ b/datastore/postgres/v2/indexer_v1_bylayer_benchmark_test.go @@ -0,0 +1,164 @@ +package postgres + +import ( + "context" + "testing" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/quay/zlog" + + "github.com/quay/claircore" + "github.com/quay/claircore/indexer" + "github.com/quay/claircore/test" + "github.com/quay/claircore/test/integration" + pgtest "github.com/quay/claircore/test/postgres/v2" +) + +func Benchmark_PackagesByLayer(b *testing.B) { + integration.NeedDB(b) + ctx := context.Background() + benchmarks := []struct { + name string + hash claircore.Digest + pkgs int + scnrs int + }{ + { + name: "10 package, 5 scanners", + hash: test.RandomSHA256Digest(b), + pkgs: 10, + scnrs: 5, + }, + { + name: "50 packages, 25 scanners", + hash: test.RandomSHA256Digest(b), + pkgs: 50, + scnrs: 25, + }, + { + name: "100 packages, 50 scanners", + hash: test.RandomSHA256Digest(b), + pkgs: 100, + scnrs: 50, + }, + { + name: "500 packages, 250 scanners", + hash: test.RandomSHA256Digest(b), + pkgs: 500, + scnrs: 250, + }, + { + name: "1000 packages, 500 scanners", + hash: test.RandomSHA256Digest(b), + pkgs: 1000, + scnrs: 500, + }, + { + name: "2000 packages, 1000 scanners", + hash: test.RandomSHA256Digest(b), + pkgs: 2000, + scnrs: 1000, + }, + { + name: "3000 packages, 2000 scanners", + hash: test.RandomSHA256Digest(b), + pkgs: 3000, + scnrs: 1000, + }, + { + name: "3000 packages, 500 scanners", + hash: test.RandomSHA256Digest(b), + pkgs: 3000, + scnrs: 500, + }, + { + name: "3000 packages, 250 scanners", + hash: test.RandomSHA256Digest(b), + pkgs: 3000, + scnrs: 250, + }, + { + name: "3000 packages, 50 scanners", + hash: test.RandomSHA256Digest(b), + pkgs: 
2000, + scnrs: 50, + }, + { + name: "3000 packages, 10 scanners", + hash: test.RandomSHA256Digest(b), + pkgs: 2000, + scnrs: 10, + }, + } + + for _, bench := range benchmarks { + b.Run(bench.name, func(b *testing.B) { + ctx := zlog.Test(ctx, b) + cfg := pgtest.TestIndexerDB(ctx, b) + pool, err := pgxpool.NewWithConfig(ctx, cfg) + if err != nil { + b.Fatal(err) + } + defer pool.Close() + store, err := NewIndexerV1(ctx, cfg) + if err != nil { + b.Fatal(err) + } + defer store.Close(ctx) + + pkgs := pgtest.Generate[claircore.Package](ctx, bench.pkgs) + err = pgx.BeginFunc(ctx, pool, pkgs.Exec) + if err != nil { + b.Fatalf("failed to insert packages: %v", err) + } + + ps := pgtest.Generate[indexer.PackageScanner](ctx, bench.scnrs) + err = pgx.BeginFunc(ctx, pool, ps.Exec) + if err != nil { + b.Fatalf("failed to insert scanners: %v", err) + } + var vs []indexer.VersionedScanner + err = pgx.BeginFunc(ctx, pool, func(tx pgx.Tx) error { + rows, err := tx.Query(ctx, `SELECT name, version, kind FROM scanner WHERE id = ANY($1)`, ps.IDs) + if err != nil { + return err + } + defer rows.Close() + for rows.Next() { + var m mockScanner + if err := rows.Scan(&m.name, &m.version, &m.kind); err != nil { + return err + } + vs = append(vs, &m) + } + return rows.Err() + }) + if err != nil { + b.Fatalf("failed to read back scanners: %v", err) + } + + // create scanartifacts + err = pgx.BeginFunc(ctx, pool, pgtest.CreatePackageScanArtifacts(ctx, bench.hash, pkgs.IDs(), ps.IDs())) + if err != nil { + b.Fatalf("failed to insert scan artifacts for test: %v", err) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := store.PackagesByLayer(ctx, bench.hash, vs) + if err != nil { + b.Fatalf("failed to retrieve packages by layer: %v", err) + } + } + }) + } +} + +type mockScanner struct { + name, version, kind string +} + +func (m *mockScanner) Name() string { return m.name } +func (m *mockScanner) Version() string { return m.version } +func (m *mockScanner) Kind() string { return m.kind 
} diff --git a/datastore/postgres/v2/indexer_v1_deletemanifests.go b/datastore/postgres/v2/indexer_v1_deletemanifests.go new file mode 100644 index 000000000..cfd7a830d --- /dev/null +++ b/datastore/postgres/v2/indexer_v1_deletemanifests.go @@ -0,0 +1,66 @@ +package postgres + +import ( + "context" + + "github.com/jackc/pgx/v5" + "github.com/quay/zlog" + + "github.com/quay/claircore" +) + +// DeleteManifests implements [indexer.Store]. +func (s *IndexerV1) DeleteManifests(ctx context.Context, d ...claircore.Digest) (out []claircore.Digest, err error) { + ctx, done := s.method(ctx, &err) + defer done() + + out = make([]claircore.Digest, 0, len(d)) + err = pgx.BeginTxFunc(ctx, s.pool, txRW, s.tx(ctx, `DeleteManifests`, func(ctx context.Context, tx pgx.Tx) (err error) { + err = pgx.BeginFunc(ctx, tx, s.call(ctx, `deleteManifests`, deleteManifests(&out, d))) + if err != nil { + return err + } + zlog.Debug(ctx). + Int("count", len(out)). + Int("nonexistant", len(d)-len(out)). + Msg("deleted manifests") + err = pgx.BeginFunc(ctx, tx, s.call(ctx, `layerCleanup`, layerCleanup)) + if err != nil { + return err + } + return nil + })) + if err != nil { + return nil, err + } + return out, nil +} + +func deleteManifests(out *[]claircore.Digest, ds []claircore.Digest) callFunc { + return func(ctx context.Context, tx pgx.Tx, query string) (err error) { + rows, err := tx.Query(ctx, query, ds) + if err != nil { + return err + } + defer rows.Close() + for rows.Next() { + i := len(*out) + *out = (*out)[:i+1] + if err := rows.Scan(&(*out)[i]); err != nil { + return err + } + } + return rows.Err() + } +} + +func layerCleanup(ctx context.Context, tx pgx.Tx, query string) (err error) { + tag, err := tx.Exec(ctx, query) + if err != nil { + return err + } + zlog.Debug(ctx). + Int64("count", tag.RowsAffected()). 
+ Msg("deleted layers") + return nil +} diff --git a/datastore/postgres/v2/indexer_v1_deletemanifests_test.go b/datastore/postgres/v2/indexer_v1_deletemanifests_test.go new file mode 100644 index 000000000..975d70235 --- /dev/null +++ b/datastore/postgres/v2/indexer_v1_deletemanifests_test.go @@ -0,0 +1,248 @@ +package postgres + +import ( + "context" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/quay/zlog" + + "github.com/quay/claircore" + "github.com/quay/claircore/indexer" + "github.com/quay/claircore/test" + "github.com/quay/claircore/test/integration" + pgtest "github.com/quay/claircore/test/postgres/v2" +) + +func TestDeleteManifests(t *testing.T) { + integration.NeedDB(t) + ctx := zlog.Test(context.Background(), t) + cfg := pgtest.TestIndexerDB(ctx, t) + store, err := NewIndexerV1(ctx, cfg, WithMigrations) + if err != nil { + t.Fatal(err) + } + defer store.Close(ctx) + pool, err := pgxpool.NewWithConfig(ctx, cfg) + if err != nil { + t.Fatal(err) + } + t.Cleanup(pool.Close) + + t.Run("Nonexistent", func(t *testing.T) { + ctx := zlog.Test(ctx, t) + in := []claircore.Digest{ + test.RandomSHA256Digest(t), + } + got, err := store.DeleteManifests(ctx, in...) + if err != nil { + t.Error(err) + } + if len(got) != 0 { + t.Error(cmp.Diff(got, []claircore.Digest{}, cmpOpts)) + } + }) + t.Run("NonexistentMulti", func(t *testing.T) { + ctx := zlog.Test(ctx, t) + in := []claircore.Digest{ + test.RandomSHA256Digest(t), + test.RandomSHA256Digest(t), + test.RandomSHA256Digest(t), + test.RandomSHA256Digest(t), + test.RandomSHA256Digest(t), + } + got, err := store.DeleteManifests(ctx, in...) 
+ if err != nil { + t.Error(err) + } + if len(got) != 0 { + t.Error(cmp.Diff(got, []claircore.Digest{}, cmpOpts)) + } + }) + const insertManifest = `INSERT INTO manifest (hash) SELECT unnest($1::TEXT[]);` + t.Run("One", func(t *testing.T) { + ctx := zlog.Test(ctx, t) + want := []claircore.Digest{ + test.RandomSHA256Digest(t), + } + if _, err := pool.Exec(ctx, insertManifest, want); err != nil { + t.Error(err) + } + got, err := store.DeleteManifests(ctx, want...) + if err != nil { + t.Error(err) + } + if !cmp.Equal(got, want, cmpOpts) { + t.Error(cmp.Diff(got, want, cmpOpts)) + } + }) + t.Run("Subset", func(t *testing.T) { + ctx := zlog.Test(ctx, t) + in := make([]claircore.Digest, 8) + for i := range in { + in[i] = test.RandomSHA256Digest(t) + } + if _, err := pool.Exec(ctx, insertManifest, in); err != nil { + t.Error(err) + } + for _, want := range [][]claircore.Digest{in[:4], in[4:]} { + arg := append(want[:len(want):len(want)], test.RandomSHA256Digest(t), test.RandomSHA256Digest(t)) + got, err := store.DeleteManifests(ctx, arg...) 
+ if err != nil { + t.Error(err) + } + if !cmp.Equal(got, want, cmpOpts) { + t.Error(cmp.Diff(got, want, cmpOpts)) + } + } + }) + const ( + insertLayers = `INSERT INTO layer (hash) SELECT unnest($1::TEXT[]);` + assoc = `WITH + l AS (SELECT id FROM layer WHERE hash = ANY($1::TEXT[])), + m AS (SELECT id FROM manifest WHERE hash = $2::TEXT) +INSERT INTO manifest_layer (i, manifest_id, layer_id) + SELECT ROW_NUMBER() OVER (), m.id, l.id FROM m, l;` + ) + t.Run("Layers", func(t *testing.T) { + const ( + nManifests = 8 + layersPer = 4 + ) + ctx := zlog.Test(ctx, t) + ms := make([]claircore.Digest, nManifests) + for i := range ms { + ms[i] = test.RandomSHA256Digest(t) + } + ls := make([]claircore.Digest, nManifests+layersPer-1) + for i := range ls { + ls[i] = test.RandomSHA256Digest(t) + } + + if _, err := pool.Exec(ctx, insertManifest, ms); err != nil { + t.Error(err) + } + if _, err := pool.Exec(ctx, insertLayers, ls); err != nil { + t.Error(err) + } + var nLayers int + if err := pool.QueryRow(ctx, `SELECT COUNT(*) FROM layer;`).Scan(&nLayers); err != nil { + t.Error(err) + } + for i, m := range ms { + tag, err := pool.Exec(ctx, assoc, ls[i:i+layersPer], m) + t.Logf("affected: %d", tag.RowsAffected()) + if err != nil { + t.Error(err) + } + } + + prev := len(ls) + for _, m := range ms { + want := []claircore.Digest{m} + got, err := store.DeleteManifests(ctx, want...) 
+ if err != nil { + t.Error(err) + } + if !cmp.Equal(got, want, cmpOpts) { + t.Error(cmp.Diff(got, want, cmpOpts)) + } + var rem int + if err := pool.QueryRow(ctx, `SELECT COUNT(*) FROM layer;`).Scan(&rem); err != nil { + t.Error(err) + } + if got, want := rem, prev; got >= want { + t.Errorf("left overlayers: got: == %d, < want %d", got, want) + } + prev = rem + } + + var rem int + if err := pool.QueryRow(ctx, `SELECT COUNT(*) FROM layer;`).Scan(&rem); err != nil { + t.Error(err) + } + if got, want := rem, 0; got != want { + t.Errorf("left overlayers: got: %d, want %d", got, want) + } + }) + + t.Run("ManifestIndex", func(t *testing.T) { + const ( + layersN = 4 + manifestsN = 100 + packageN = 10 + ) + ctx := zlog.Test(ctx, t) + toDelete := make([]claircore.Digest, manifestsN) + for i := 0; i < manifestsN; i++ { + ir := &claircore.IndexReport{} + ir.Hash = test.RandomSHA256Digest(t) + toDelete[i] = ir.Hash + ls := make([]claircore.Digest, layersN) + for i := range ls { + ls[i] = test.RandomSHA256Digest(t) + } + + if _, err := pool.Exec(ctx, insertManifest, []claircore.Digest{ir.Hash}); err != nil { + t.Error(err) + } + if _, err := pool.Exec(ctx, insertLayers, ls); err != nil { + t.Error(err) + } + var nLayers int + if err := pool.QueryRow(ctx, `SELECT COUNT(*) FROM layer;`).Scan(&nLayers); err != nil { + t.Error(err) + } + tag, err := pool.Exec(ctx, assoc, ls, ir.Hash) + t.Logf("affected: %d", tag.RowsAffected()) + if err != nil { + t.Error(err) + } + + scnr := indexer.NewPackageScannerMock("mock", "1", "vulnerability") + if err := store.RegisterScanners(ctx, indexer.VersionedScanners{scnr}); err != nil { + t.Error(err) + } + + pkgs := test.GenUniquePackages(packageN) + layer := &claircore.Layer{Hash: ls[0]} + if err := store.IndexPackages(ctx, pkgs, layer, scnr); err != nil { + t.Error(err) + } + // Retrieve packages from DB so they are all correctly ID'd + if pkgs, err = store.PackagesByLayer(ctx, layer.Hash, []indexer.VersionedScanner{scnr}); err != nil { + 
t.Error(err) + } + + pkgMap := make(map[string]*claircore.Package, packageN) + envs := make(map[string][]*claircore.Environment, packageN) + for _, p := range pkgs { + pkgMap[p.ID] = p + envs[p.ID] = []*claircore.Environment{ + { + PackageDB: "pdb", + IntroducedIn: ls[0], + DistributionID: "d", + RepositoryIDs: []string{}, + }, + } + } + ir.Packages = pkgMap + ir.Environments = envs + + if err := store.IndexManifest(ctx, ir); err != nil { + t.Error(err) + } + } + got, err := store.DeleteManifests(ctx, toDelete...) + if err != nil { + t.Error(err) + } + if len(got) != manifestsN { + t.Error(cmp.Diff(got, toDelete, cmpOpts)) + } + }) +} + +var cmpOpts cmp.Option = cmp.Transformer("DigestTransformer", func(d claircore.Digest) string { return d.String() }) diff --git a/datastore/postgres/v2/indexer_v1_e2e_test.go b/datastore/postgres/v2/indexer_v1_e2e_test.go new file mode 100644 index 000000000..703a42be0 --- /dev/null +++ b/datastore/postgres/v2/indexer_v1_e2e_test.go @@ -0,0 +1,438 @@ +package postgres + +import ( + "context" + "reflect" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/quay/zlog" + + "github.com/quay/claircore" + "github.com/quay/claircore/indexer" + "github.com/quay/claircore/test" + "github.com/quay/claircore/test/integration" + pgtest "github.com/quay/claircore/test/postgres/v2" +) + +// mockScnr is a kind-agnostic scanner we will +// use for testing purposes. 
+type mockScnr struct { + name string + kind string + version string +} + +func (m mockScnr) Name() string { + return m.name +} +func (m mockScnr) Kind() string { + return m.kind +} +func (m mockScnr) Version() string { + return m.version +} + +type indexE2e struct { + name string + store *IndexerV1 + ctx context.Context + manifest claircore.Manifest + scnrs indexer.VersionedScanners + packageGen int + distGen int + repoGen int +} + +func TestIndexE2E(t *testing.T) { + integration.NeedDB(t) + ctx := zlog.Test(context.Background(), t) + + e2es := []indexE2e{ + { + name: "3 scanners gen small", + scnrs: indexer.VersionedScanners{ + mockScnr{ + name: "test-scanner", + kind: "test", + version: "v0.0.1", + }, + mockScnr{ + name: "test-scanner1", + kind: "test", + version: "v0.0.11", + }, + mockScnr{ + name: "test-scanner2", + kind: "test", + version: "v0.0.8", + }, + }, + packageGen: 100, + distGen: 150, + repoGen: 50, + }, + { + name: "6 scanners gen small", + scnrs: indexer.VersionedScanners{ + mockScnr{ + name: "test-scanner", + kind: "test", + version: "v0.0.1", + }, + mockScnr{ + name: "test-scanner1", + kind: "test", + version: "v0.0.11", + }, + mockScnr{ + name: "test-scanner2", + kind: "test", + version: "v0.0.8", + }, + mockScnr{ + name: "test-scanner3", + kind: "test", + version: "v0.0.8", + }, + mockScnr{ + name: "test-scanner4", + kind: "test", + version: "v0.0.8", + }, + mockScnr{ + name: "test-scanner5", + kind: "test", + version: "v0.0.8", + }, + }, + packageGen: 100, + distGen: 150, + repoGen: 50, + }, + { + name: "3 scanners gen large", + scnrs: indexer.VersionedScanners{ + mockScnr{ + name: "test-scanner", + kind: "test", + version: "v0.0.1", + }, + mockScnr{ + name: "test-scanner1", + kind: "test", + version: "v0.0.11", + }, + mockScnr{ + name: "test-scanner2", + kind: "test", + version: "v0.0.8", + }, + }, + packageGen: 1000, + distGen: 1500, + repoGen: 500, + }, + { + name: "6 scanners gen large", + scnrs: indexer.VersionedScanners{ + mockScnr{ + 
name: "test-scanner", + kind: "test", + version: "v0.0.1", + }, + mockScnr{ + name: "test-scanner1", + kind: "test", + version: "v0.0.11", + }, + mockScnr{ + name: "test-scanner2", + kind: "test", + version: "v0.0.8", + }, + mockScnr{ + name: "test-scanner3", + kind: "test", + version: "v0.0.8", + }, + mockScnr{ + name: "test-scanner4", + kind: "test", + version: "v0.0.8", + }, + mockScnr{ + name: "test-scanner5", + kind: "test", + version: "v0.0.8", + }, + }, + packageGen: 1000, + distGen: 1500, + repoGen: 500, + }, + } + + for _, e := range e2es { + cfg := pgtest.TestIndexerDB(ctx, t) + store, err := NewIndexerV1(ctx, cfg, WithMigrations) + if err != nil { + t.Fatal(err) + } + + layer := &claircore.Layer{ + Hash: claircore.MustParseDigest(`sha256:5f70bf18a086007016e948b04aed3b82103a36bea41755b6cddfaf10ace3c6ef`), + } + manifest := claircore.Manifest{ + Hash: claircore.MustParseDigest(`sha256:fc92eec5cac70b0c324cec2933cd7db1c0eae7c9e2649e42d02e77eb6da0d15f`), + Layers: []*claircore.Layer{layer}, + } + + e.store = store + e.ctx = ctx + e.manifest = manifest + + t.Run(e.name, e.Run) + } +} + +func (e *indexE2e) Run(t *testing.T) { + t.Cleanup(func() { + e.store.Close(context.Background()) + }) + type subtest struct { + name string + do func(t *testing.T) + } + subtests := [...]subtest{ + {"RegisterScanner", e.RegisterScanner}, + {"PersistManifest", e.PersistManifest}, + {"IndexAndRetrievePackages", e.IndexAndRetrievePackages}, + {"IndexAndRetrieveDistributions", e.IndexAndRetrieveDistributions}, + {"IndexAndRetrieveRepos", e.IndexAndRetrieveRepos}, + {"LayerScanned", e.LayerScanned}, + {"LayerScannedNotExists", e.LayerScannedNotExists}, + {"LayerScannedFalse", e.LayerScannedFalse}, + {"IndexReport", e.IndexReport}, + } + for _, subtest := range subtests { + if !t.Run(subtest.name, subtest.do) { + t.FailNow() + } + } +} + +// PersistManifest confirms we create the necessary +// Manifest and Layer identifies so layer code +// foreign key references do not fail. 
+func (e *indexE2e) PersistManifest(t *testing.T) {
+	ctx := zlog.Test(e.ctx, t)
+	err := e.store.PersistManifest(ctx, e.manifest)
+	if err != nil {
+		t.Fatalf("failed to persist manifest: %v", err)
+	}
+}
+
+// RegisterScanner confirms a scanner can be registered
+// and provides this scanner for other subtests to use.
+func (e *indexE2e) RegisterScanner(t *testing.T) {
+	ctx := zlog.Test(e.ctx, t)
+	err := e.store.RegisterScanners(ctx, e.scnrs)
+	if err != nil {
+		t.Fatalf("failed to register scnr: %v", err)
+	}
+}
+
+// IndexAndRetrievePackages confirms inserting and
+// selecting packages associated with a layer works
+// correctly.
+func (e *indexE2e) IndexAndRetrievePackages(t *testing.T) {
+	ctx := zlog.Test(e.ctx, t)
+	A := test.GenUniquePackages(e.packageGen)
+
+	for _, scnr := range e.scnrs {
+		err := e.store.IndexPackages(ctx, A, e.manifest.Layers[0], scnr)
+		if err != nil {
+			t.Fatalf("failed to index package: %v", err)
+		}
+	}
+
+	B, err := e.store.PackagesByLayer(ctx, e.manifest.Layers[0].Hash, e.scnrs)
+	if err != nil {
+		t.Fatalf("failed to retrieve packages by layer: %v", err)
+	}
+
+	if len(e.scnrs)*e.packageGen != len(B) {
+		t.Fatalf("wanted len: %v got: %v", len(e.scnrs)*e.packageGen, len(B))
+	}
+}
+
+// IndexAndRetrieveDistributions confirms inserting and
+// selecting distributions associated with a layer works
+// correctly.
+func (e *indexE2e) IndexAndRetrieveDistributions(t *testing.T) {
+	ctx := zlog.Test(e.ctx, t)
+	A := test.GenUniqueDistributions(e.distGen)
+
+	for _, scnr := range e.scnrs {
+		err := e.store.IndexDistributions(ctx, A, e.manifest.Layers[0], scnr)
+		if err != nil {
+			t.Fatalf("failed to index distributions: %v", err)
+		}
+	}
+
+	B, err := e.store.DistributionsByLayer(ctx, e.manifest.Layers[0].Hash, e.scnrs)
+	if err != nil {
+		t.Fatalf("failed to retrieve distributions by layer: %v", err)
+	}
+
+	if len(e.scnrs)*e.distGen != len(B) {
+		t.Fatalf("wanted len: %v got: %v", len(e.scnrs)*e.distGen, len(B))
+	}
+}
+
+// IndexAndRetrieveRepos confirms inserting and
+// selecting repositories associated with a layer works
+// correctly.
+func (e *indexE2e) IndexAndRetrieveRepos(t *testing.T) {
+	ctx := zlog.Test(e.ctx, t)
+	generated := test.GenUniqueRepositories(e.repoGen)
+	defer func() {
+		if t.Failed() {
+			dumptable(ctx, t, e.store.pool, "repo_scanartifact")
+		}
+	}()
+
+	for _, scnr := range e.scnrs {
+		err := e.store.IndexRepositories(ctx, generated, e.manifest.Layers[0], scnr)
+		if err != nil {
+			t.Fatalf("failed to index repos: %v", err)
+		}
+	}
+
+	got, err := e.store.RepositoriesByLayer(ctx, e.manifest.Layers[0].Hash, e.scnrs)
+	if err != nil {
+		t.Fatalf("failed to retrieve repos by layer: %v", err)
+	}
+
+	if len(e.scnrs)*e.repoGen != len(got) {
+		t.Fatalf("wanted len: %v got: %v", len(e.scnrs)*e.repoGen, len(got))
+	}
+}
+
+// LayerScanned confirms the book keeping involved in marking a layer
+// scanned works correctly.
+func (e *indexE2e) LayerScanned(t *testing.T) {
+	ctx := zlog.Test(e.ctx, t)
+	for _, scnr := range e.scnrs {
+		err := e.store.SetLayerScanned(ctx, e.manifest.Layers[0].Hash, scnr)
+		if err != nil {
+			t.Fatalf("failed to set layer scanned: %v", err)
+		}
+
+		b, err := e.store.LayerScanned(ctx, e.manifest.Layers[0].Hash, scnr)
+		if err != nil {
+			t.Fatalf("failed to query if layer is scanned: %v", err)
+		}
+		if !b {
+			t.Fatalf("expected layer to be scanned")
+		}
+	}
+}
+
+// LayerScannedNotExists confirms a false result (and no error) is returned
+// when querying whether a layer was scanned by a non-existent scanner.
+func (e *indexE2e) LayerScannedNotExists(t *testing.T) {
+	ctx := zlog.Test(e.ctx, t)
+	scnr := mockScnr{
+		name:    "invalid",
+		kind:    "invalid",
+		version: "invalid",
+	}
+
+	ok, err := e.store.LayerScanned(ctx, e.manifest.Layers[0].Hash, scnr)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if ok {
+		t.Fatalf("got: %v, want: %v", ok, false)
+	}
+}
+
+// LayerScannedFalse confirms a false boolean is returned when attempting
+// to obtain if a non-existent layer was scanned by a valid scanner.
+func (e *indexE2e) LayerScannedFalse(t *testing.T) {
+	ctx := zlog.Test(e.ctx, t)
+
+	// create a layer that has not been persisted to the store
+	layer := &claircore.Layer{
+		Hash: claircore.MustParseDigest(`sha256:5891b5b522d5df086d0ff0b110fbd9d21bb4fc7163af34d08286a2e846f6be03`),
+	}
+
+	b, err := e.store.LayerScanned(ctx, layer.Hash, e.scnrs[0])
+	if err != nil {
+		t.Fatalf("failed to query if layer is scanned: %v", err)
+	}
+	if b {
+		t.Fatalf("expected layer not to be scanned")
+	}
+}
+
+// IndexReport confirms the book keeping around index reports works
+// correctly.
+func (e *indexE2e) IndexReport(t *testing.T) { + ctx := zlog.Test(e.ctx, t) + opts := cmp.Options{ + cmp.Comparer(func(a, b claircore.Digest) bool { + return a.String() == b.String() + }), + cmp.FilterPath(func(p cmp.Path) bool { + return len(p) == 3 && + p[0].Type() == reflect.TypeOf((*claircore.IndexReport)(nil)) && + (p[2].String() != ".Hash" && p[2].String() != ".State") + }, cmp.Ignore()), + } + + A := &claircore.IndexReport{ + Hash: e.manifest.Hash, + State: "Testing", + } + + if err := e.store.SetIndexReport(ctx, A); err != nil { + t.Fatalf("failed to set index report: %v", err) + } + B, ok, err := e.store.IndexReport(ctx, e.manifest.Hash) + if err != nil { + t.Fatalf("failed to retrieve index report: %v", err) + } + if !ok { + t.Fatalf("no index report found") + } + if got, want := B, A; !cmp.Equal(got, want, opts) { + t.Fatal(cmp.Diff(got, want, opts)) + } + + A.State = "IndexFinished" + if err := e.store.SetIndexFinished(ctx, A, e.scnrs); err != nil { + t.Fatalf("failed to set index as finished: %v", err) + } + + ok, err = e.store.ManifestScanned(ctx, e.manifest.Hash, e.scnrs) + if err != nil { + t.Fatalf("failed to query if manifest was scanned: %v", err) + } + if !ok { + t.Fatalf("expected manifest to be scanned") + } + + B, ok, err = e.store.IndexReport(ctx, e.manifest.Hash) + if err != nil { + t.Fatalf("failed to retrieve index report: %v", err) + } + if !ok { + t.Fatalf("no index report found") + } + if got, want := B, A; !cmp.Equal(got, want, opts) { + t.Fatal(cmp.Diff(got, want, opts)) + } +} diff --git a/datastore/postgres/v2/indexer_v1_index.go b/datastore/postgres/v2/indexer_v1_index.go new file mode 100644 index 000000000..24677d084 --- /dev/null +++ b/datastore/postgres/v2/indexer_v1_index.go @@ -0,0 +1,259 @@ +package postgres + +import ( + "bytes" + "context" + "fmt" + "reflect" + "strconv" + "sync" + + "github.com/jackc/pgx/v5" + + "github.com/quay/claircore" + "github.com/quay/claircore/indexer" + "github.com/quay/zlog" +) + +func 
indexArtifact[T any](ctx context.Context, s *IndexerV1, tx pgx.Tx, hash claircore.Digest, v indexer.VersionedScanner, as []T) (err error) { + ras := rotateArtifacts(as) + typ := reflect.TypeOf(as).Elem() + for typ.Kind() == reflect.Pointer { + typ = typ.Elem() + } + var ids []int64 + + fn := fmt.Sprintf(`helper_%s_indexartifact.sql`, typ.Name()) + err = pgx.BeginFunc(ctx, tx, s.callfile(ctx, fn, `insert`, func(ctx context.Context, tx pgx.Tx, query string) error { + rows, err := tx.Query(ctx, query, ras...) + if err != nil { + return err + } + ids, err = pgx.CollectRows(rows, pgx.RowTo[int64]) + return err + })) + if err != nil { + return err + } + + fn = fmt.Sprintf(`helper_%s_associateartifact.sql`, typ.Name()) + err = pgx.BeginFunc(ctx, tx, s.callfile(ctx, fn, `associate`, func(ctx context.Context, tx pgx.Tx, query string) error { + _, err = tx.Exec(ctx, query, ids, hash, v.Name(), v.Version(), v.Kind()) + return err + })) + + return err +} + +// IndexDistributions implements [indexer.Store]. +func (s *IndexerV1) IndexDistributions(ctx context.Context, dists []*claircore.Distribution, l *claircore.Layer, v indexer.VersionedScanner) (err error) { + ctx, done := s.method(ctx, &err) + defer done() + zlog.Debug(ctx). + Str("name", v.Name()). + Str("version", v.Version()). + Str("kind", v.Kind()). + Stringer("layer", l.Hash). + Send() + + err = pgx.BeginTxFunc(ctx, s.pool, txRW, s.tx(ctx, `IndexDistributions`, func(ctx context.Context, tx pgx.Tx) (err error) { + return indexArtifact[*claircore.Distribution](ctx, s, tx, l.Hash, v, dists) + })) + return err +} + +// IndexFiles implements [indexer.Store]. +func (s *IndexerV1) IndexFiles(ctx context.Context, files []claircore.File, l *claircore.Layer, v indexer.VersionedScanner) (err error) { + ctx, done := s.method(ctx, &err) + defer done() + zlog.Debug(ctx). + Str("name", v.Name()). + Str("version", v.Version()). + Str("kind", v.Kind()). + Stringer("layer", l.Hash). 
+ Send() + + err = pgx.BeginTxFunc(ctx, s.pool, txRW, s.tx(ctx, `IndexFiles`, func(ctx context.Context, tx pgx.Tx) (err error) { + return indexArtifact[claircore.File](ctx, s, tx, l.Hash, v, files) + })) + return err +} + +// IndexRepositories implements [indexer.Store]. +func (s *IndexerV1) IndexRepositories(ctx context.Context, repos []*claircore.Repository, l *claircore.Layer, v indexer.VersionedScanner) (err error) { + ctx, done := s.method(ctx, &err) + defer done() + zlog.Debug(ctx). + Str("name", v.Name()). + Str("version", v.Version()). + Str("kind", v.Kind()). + Stringer("layer", l.Hash). + Send() + + err = pgx.BeginTxFunc(ctx, s.pool, txRW, s.tx(ctx, `IndexRepositories`, func(ctx context.Context, tx pgx.Tx) error { + return indexArtifact[*claircore.Repository](ctx, s, tx, l.Hash, v, repos) + })) + return err +} + +var ( + zeroPackage = claircore.Package{} + emptyNorm = "{}" +) + +// IndexPackages implements [indexer.Store]. +// +// IndexPackages indexes all provided packages along with creating a scan artifact. +// +// If a source package is nested inside a binary package we index the source +// package first and then create a relation between the binary package and +// source package. +// +// Scan artifacts are used to determine if a particular layer has been scanned by a +// particular scanner. See the LayerScanned method for more details. +func (s *IndexerV1) IndexPackages(ctx context.Context, pkgs []*claircore.Package, layer *claircore.Layer, scnr indexer.VersionedScanner) (err error) { + ctx, done := s.method(ctx, &err) + defer done() + + // Big bespoke routine to rotate the packages. 
+ insertPrep := struct { + Name, Kind, Version, Module, Arch, NormKind, NormVersion []*string + }{ + Name: make([]*string, 0, len(pkgs)), + Kind: make([]*string, 0, len(pkgs)), + Version: make([]*string, 0, len(pkgs)), + Module: make([]*string, 0, len(pkgs)), + Arch: make([]*string, 0, len(pkgs)), + NormKind: make([]*string, 0, len(pkgs)), + NormVersion: make([]*string, 0, len(pkgs)), + } + str := make([]byte, 0, 32) + insertRotate := func(pkg *claircore.Package) { + insertPrep.Name = append(insertPrep.Name, &pkg.Name) + insertPrep.Kind = append(insertPrep.Kind, &pkg.Kind) + insertPrep.Version = append(insertPrep.Version, &pkg.Version) + insertPrep.Module = append(insertPrep.Module, &pkg.Module) + insertPrep.Arch = append(insertPrep.Arch, &pkg.Arch) + if pkg.NormalizedVersion.Kind != "" { + insertPrep.NormKind = append(insertPrep.NormKind, &pkg.NormalizedVersion.Kind) + var buf bytes.Buffer + buf.Grow(32) + buf.WriteByte('{') + for i := 0; i < 10; i++ { + if i != 0 { + buf.WriteByte(',') + } + buf.Write(strconv.AppendInt(str, int64(pkg.NormalizedVersion.V[i]), 10)) + } + buf.WriteByte('}') + s := buf.String() + insertPrep.NormVersion = append(insertPrep.NormVersion, &s) + } else { + insertPrep.NormKind = append(insertPrep.NormKind, nil) + insertPrep.NormVersion = append(insertPrep.NormVersion, &emptyNorm) + } + } + var zOnce sync.Once + insertSkipCt := 0 + for _, pkg := range pkgs { + if pkg.Name == "" { + insertSkipCt++ + } + if pkg.Source == nil { + pkg.Source = &zeroPackage + zOnce.Do(func() { insertRotate(pkg.Source) }) + } else { + insertRotate(pkg.Source) + } + insertRotate(pkg) + } + + // Same for association. 
+ associatePrep := struct { + BinName, BinKind, BinVersion, BinModule, BinArch []*string + SrcName, SrcKind, SrcVersion, SrcModule, SrcArch []*string + PkgDB, Hint, Path []*string + }{ + BinName: make([]*string, 0, len(pkgs)), + BinKind: make([]*string, 0, len(pkgs)), + BinVersion: make([]*string, 0, len(pkgs)), + BinModule: make([]*string, 0, len(pkgs)), + BinArch: make([]*string, 0, len(pkgs)), + SrcName: make([]*string, 0, len(pkgs)), + SrcKind: make([]*string, 0, len(pkgs)), + SrcVersion: make([]*string, 0, len(pkgs)), + SrcModule: make([]*string, 0, len(pkgs)), + SrcArch: make([]*string, 0, len(pkgs)), + PkgDB: make([]*string, 0, len(pkgs)), + Hint: make([]*string, 0, len(pkgs)), + Path: make([]*string, 0, len(pkgs)), + } + associateRotate := func(pkg *claircore.Package) { + associatePrep.BinName = append(associatePrep.BinName, &pkg.Name) + associatePrep.BinKind = append(associatePrep.BinKind, &pkg.Kind) + associatePrep.BinVersion = append(associatePrep.BinVersion, &pkg.Version) + associatePrep.BinModule = append(associatePrep.BinModule, &pkg.Module) + associatePrep.BinArch = append(associatePrep.BinArch, &pkg.Arch) + associatePrep.SrcName = append(associatePrep.SrcName, &pkg.Source.Name) + associatePrep.SrcKind = append(associatePrep.SrcKind, &pkg.Source.Kind) + associatePrep.SrcVersion = append(associatePrep.SrcVersion, &pkg.Source.Version) + associatePrep.SrcModule = append(associatePrep.SrcModule, &pkg.Source.Module) + associatePrep.SrcArch = append(associatePrep.SrcArch, &pkg.Source.Arch) + associatePrep.PkgDB = append(associatePrep.PkgDB, &pkg.PackageDB) + associatePrep.Hint = append(associatePrep.Hint, &pkg.RepositoryHint) + associatePrep.Path = append(associatePrep.Path, &pkg.Filepath) + } + associateSkipCt := 0 + for _, pkg := range pkgs { + if pkg.Name == "" { + associateSkipCt++ + } + associateRotate(pkg) + } + + err = pgx.BeginTxFunc(ctx, s.pool, txRW, s.tx(ctx, `IndexPackages`, func(ctx context.Context, tx pgx.Tx) (err error) { + err = 
pgx.BeginFunc(ctx, tx, s.call(ctx, `insert`, func(ctx context.Context, tx pgx.Tx, query string) (err error) { + var ct int64 + defer func() { + zlog.Debug(ctx). + Int("skipped", insertSkipCt). + Int64("inserted", ct). + Msg("packages inserted") + }() + tag, err := tx.Exec(ctx, query, insertPrep.Name, insertPrep.Kind, insertPrep.Version, insertPrep.NormKind, insertPrep.NormVersion, insertPrep.Module, insertPrep.Arch) + ct = tag.RowsAffected() + return err + })) + if err != nil { + return err + } + + err = pgx.BeginFunc(ctx, tx, s.call(ctx, `associate`, func(ctx context.Context, tx pgx.Tx, query string) (err error) { + var ct int64 + defer func() { + zlog.Debug(ctx). + Int("skipped", associateSkipCt). + Int64("associated", ct). + Msg("packages associated") + }() + tag, err := tx.Exec(ctx, query, + associatePrep.BinName, associatePrep.BinKind, associatePrep.BinVersion, associatePrep.BinModule, associatePrep.BinArch, + associatePrep.SrcName, associatePrep.SrcKind, associatePrep.SrcVersion, associatePrep.SrcModule, associatePrep.SrcArch, + scnr.Name(), scnr.Version(), scnr.Kind(), + &layer.Hash, + associatePrep.PkgDB, associatePrep.Hint, associatePrep.Path, + ) + ct = tag.RowsAffected() + return err + })) + if err != nil { + return err + } + + return nil + })) + if err != nil { + return err + } + return nil +} diff --git a/datastore/postgres/v2/indexer_v1_index_benchmark_test.go b/datastore/postgres/v2/indexer_v1_index_benchmark_test.go new file mode 100644 index 000000000..3503d5cf5 --- /dev/null +++ b/datastore/postgres/v2/indexer_v1_index_benchmark_test.go @@ -0,0 +1,252 @@ +package postgres + +import ( + "context" + "testing" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/quay/zlog" + + "github.com/quay/claircore" + "github.com/quay/claircore/indexer" + "github.com/quay/claircore/test" + "github.com/quay/claircore/test/integration" + pgtest "github.com/quay/claircore/test/postgres/v2" +) + +func Benchmark_IndexPackages(b *testing.B) 
{ + integration.NeedDB(b) + ctx := context.Background() + benchmarks := []struct { + // the name of this benchmark + name string + // number of packages to index. + pkgs int + // the layer that holds the discovered packages + layer *claircore.Layer + // whether the generated package array contains duplicate packages + duplicates bool + }{ + { + name: "10 packages", + pkgs: 10, + layer: &claircore.Layer{ + Hash: test.RandomSHA256Digest(b), + }, + }, + { + name: "10 packages with duplicates", + pkgs: 10, + layer: &claircore.Layer{ + Hash: test.RandomSHA256Digest(b), + }, + duplicates: true, + }, + { + name: "50 packages", + pkgs: 50, + layer: &claircore.Layer{ + Hash: test.RandomSHA256Digest(b), + }, + }, + { + name: "50 packages with duplicates", + pkgs: 50, + layer: &claircore.Layer{ + Hash: test.RandomSHA256Digest(b), + }, + duplicates: true, + }, + { + name: "100 packages", + pkgs: 100, + layer: &claircore.Layer{ + Hash: test.RandomSHA256Digest(b), + }, + }, + { + name: "100 packages with duplicates", + pkgs: 100, + layer: &claircore.Layer{ + Hash: test.RandomSHA256Digest(b), + }, + duplicates: true, + }, + { + name: "250 packages", + pkgs: 250, + layer: &claircore.Layer{ + Hash: test.RandomSHA256Digest(b), + }, + }, + { + name: "250 packages", + pkgs: 250, + layer: &claircore.Layer{ + Hash: test.RandomSHA256Digest(b), + }, + duplicates: true, + }, + { + name: "500 packages", + pkgs: 500, + layer: &claircore.Layer{ + Hash: test.RandomSHA256Digest(b), + }, + }, + { + name: "500 packages with duplicates", + pkgs: 500, + layer: &claircore.Layer{ + Hash: test.RandomSHA256Digest(b), + }, + duplicates: true, + }, + { + name: "1000 packages", + pkgs: 1000, + layer: &claircore.Layer{ + Hash: test.RandomSHA256Digest(b), + }, + }, + { + name: "1000 packages with duplicates", + pkgs: 1000, + layer: &claircore.Layer{ + Hash: test.RandomSHA256Digest(b), + }, + duplicates: true, + }, + { + name: "2000 packages", + pkgs: 2000, + layer: &claircore.Layer{ + Hash: 
test.RandomSHA256Digest(b), + }, + }, + { + name: "2000 packages with duplicates", + pkgs: 2000, + layer: &claircore.Layer{ + Hash: test.RandomSHA256Digest(b), + }, + duplicates: true, + }, + { + name: "3000 packages", + pkgs: 3000, + layer: &claircore.Layer{ + Hash: test.RandomSHA256Digest(b), + }, + }, + { + name: "3000 packages with duplicates", + pkgs: 3000, + layer: &claircore.Layer{ + Hash: test.RandomSHA256Digest(b), + }, + duplicates: true, + }, + { + name: "4000 packages", + pkgs: 4000, + layer: &claircore.Layer{ + Hash: test.RandomSHA256Digest(b), + }, + }, + { + name: "4000 packages with duplicates", + pkgs: 4000, + layer: &claircore.Layer{ + Hash: test.RandomSHA256Digest(b), + }, + duplicates: true, + }, + { + name: "5000 packages", + pkgs: 5000, + layer: &claircore.Layer{ + Hash: test.RandomSHA256Digest(b), + }, + }, + { + name: "5000 packages with duplicates", + pkgs: 5000, + layer: &claircore.Layer{ + Hash: test.RandomSHA256Digest(b), + }, + duplicates: true, + }, + } + + for _, bench := range benchmarks { + b.Run(bench.name, func(b *testing.B) { + ctx := zlog.Test(ctx, b) + cfg := pgtest.TestIndexerDB(ctx, b) + pool, err := pgxpool.NewWithConfig(ctx, cfg) + if err != nil { + b.Fatal(err) + } + defer pool.Close() + store, err := NewIndexerV1(ctx, cfg) + if err != nil { + b.Fatal(err) + } + defer store.Close(ctx) + + ps := pgtest.Generate[indexer.PackageScanner](ctx, 1) + err = pgx.BeginFunc(ctx, pool, ps.Exec) + if err != nil { + b.Fatalf("failed to insert scanners: %v", err) + } + var vs []indexer.VersionedScanner + err = pgx.BeginFunc(ctx, pool, func(tx pgx.Tx) error { + rows, err := tx.Query(ctx, `SELECT name, version, kind FROM scanner WHERE id = ANY($1)`, ps.IDs()) + if err != nil { + return err + } + defer rows.Close() + for rows.Next() { + var m mockScanner + if err := rows.Scan(&m.name, &m.version, &m.kind); err != nil { + return err + } + vs = append(vs, &m) + } + return rows.Err() + }) + if err != nil { + b.Fatalf("failed to read back 
scanners: %v", err) + } + + // gen packages + var pkgs []*claircore.Package + if bench.duplicates { + pkgs, err = test.GenDuplicatePackages(bench.pkgs) + if err != nil { + b.Fatalf("failed to generate duplicate packages: %v", err) + } + } else { + pkgs = test.GenUniquePackages(bench.pkgs) + } + + // insert layer + insertLayer := `INSERT INTO layer (hash) VALUES ($1);` + _, err = pool.Exec(ctx, insertLayer, bench.layer.Hash) + if err != nil { + b.Fatalf("failed to insert test layer: %v", err) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + // run the indexing + err = store.IndexPackages(ctx, pkgs, bench.layer, vs[0]) + if err != nil { + b.Fatalf("failed to index packages: %v", err) + } + } + }) + } + +} diff --git a/datastore/postgres/v2/indexer_v1_indexreport.go b/datastore/postgres/v2/indexer_v1_indexreport.go new file mode 100644 index 000000000..4924a8efc --- /dev/null +++ b/datastore/postgres/v2/indexer_v1_indexreport.go @@ -0,0 +1,75 @@ +package postgres + +import ( + "context" + "errors" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + + "github.com/quay/claircore" + "github.com/quay/claircore/indexer" +) + +// IndexReport implements [indexer.Store]. +func (s *IndexerV1) IndexReport(ctx context.Context, hash claircore.Digest) (_ *claircore.IndexReport, exists bool, err error) { + ctx, done := s.method(ctx, &err) + defer done() + + // All the "real" work in this method is shoved into database hooks (see + // types_indexreport.go) and the function helpers (see metrics.go). 
+ + var ir claircore.IndexReport + err = s.pool.AcquireFunc(ctx, s.acquire(ctx, `cached`, func(ctx context.Context, c *pgxpool.Conn, query string) error { + return c.QueryRow(ctx, query, hash).Scan(&ir) + })) + switch { + case errors.Is(err, nil): + case errors.Is(err, pgx.ErrNoRows): + return nil, false, nil + default: + return nil, false, err + } + return &ir, true, nil +} + +func (s *IndexerV1) setIndexReport(ctx context.Context, ir *claircore.IndexReport) func(pgx.Tx) error { + return s.callfile(ctx, `helper_cache_indexreport.sql`, `setindexreport`, func(ctx context.Context, tx pgx.Tx, query string) error { + _, err := tx.Exec(ctx, query, ir.Hash, ir) + return err + }) +} + +// SetIndexReport implements [indexer.Store]. +func (s *IndexerV1) SetIndexReport(ctx context.Context, ir *claircore.IndexReport) (err error) { + ctx, done := s.method(ctx, &err) + defer done() + err = pgx.BeginTxFunc(ctx, s.pool, txRW, s.tx(ctx, `SetIndexReport`, func(ctx context.Context, tx pgx.Tx) error { + return pgx.BeginFunc(ctx, tx, s.setIndexReport(ctx, ir)) + })) + if err != nil { + return err + } + return nil +} + +// SetIndexFinished implements [indexer.Store]. 
+func (s *IndexerV1) SetIndexFinished(ctx context.Context, ir *claircore.IndexReport, vs indexer.VersionedScanners) (err error) { + ctx, done := s.method(ctx, &err) + defer done() + + rvs := rotateVersionedScanners(vs) + err = pgx.BeginTxFunc(ctx, s.pool, txRW, s.tx(ctx, `SetIndexFinished`, func(ctx context.Context, tx pgx.Tx) (err error) { + err = pgx.BeginFunc(ctx, tx, s.call(ctx, `insertmanifest`, func(ctx context.Context, tx pgx.Tx, query string) error { + _, err := tx.Exec(ctx, query, ir.Hash, rvs.Name, rvs.Version, rvs.Kind) + return err + })) + if err != nil { + return err + } + + err = pgx.BeginFunc(ctx, tx, s.setIndexReport(ctx, ir)) + return err + })) + return err +} diff --git a/datastore/postgres/v2/indexer_v1_layerscanned.go b/datastore/postgres/v2/indexer_v1_layerscanned.go new file mode 100644 index 000000000..3e8734889 --- /dev/null +++ b/datastore/postgres/v2/indexer_v1_layerscanned.go @@ -0,0 +1,50 @@ +package postgres + +import ( + "context" + "errors" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/quay/zlog" + + "github.com/quay/claircore" + "github.com/quay/claircore/indexer" +) + +// LayerScanned implements [indexer.Store]. +func (s *IndexerV1) LayerScanned(ctx context.Context, hash claircore.Digest, scnr indexer.VersionedScanner) (ok bool, err error) { + ctx, done := s.method(ctx, &err) + defer done() + + err = s.pool.AcquireFunc(ctx, s.acquire(ctx, `query`, func(ctx context.Context, c *pgxpool.Conn, query string) error { + return c.QueryRow(ctx, query, scnr.Name(), scnr.Version(), scnr.Kind(), hash).Scan(&ok) + })) + switch { + case errors.Is(err, nil): + case errors.Is(err, pgx.ErrNoRows): + return false, nil + default: + return false, err + } + + return ok, nil +} + +// SetLayerScanned implements [indexer.Store]. 
+func (s *IndexerV1) SetLayerScanned(ctx context.Context, hash claircore.Digest, vs indexer.VersionedScanner) (err error) {
+	ctx, done := s.method(ctx, &err)
+	defer done()
+	ctx = zlog.ContextWithValues(ctx,
+		"scanner", vs.Name(),
+	)
+
+	// Use the acquired connection, matching LayerScanned/ManifestScanned.
+	err = s.pool.AcquireFunc(ctx, s.acquire(ctx, `insert`, func(ctx context.Context, c *pgxpool.Conn, query string) error {
+		_, err := c.Exec(ctx, query, hash, vs.Name(), vs.Version(), vs.Kind())
+		return err
+	}))
+	if err != nil {
+		return err
+	}
+	return nil
+}
diff --git a/datastore/postgres/v2/indexer_v1_manifest.go b/datastore/postgres/v2/indexer_v1_manifest.go
new file mode 100644
index 000000000..6059f0075
--- /dev/null
+++ b/datastore/postgres/v2/indexer_v1_manifest.go
@@ -0,0 +1,184 @@
+package postgres
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"strconv"
+
+	"github.com/jackc/pgx/v5"
+	"github.com/jackc/pgx/v5/pgxpool"
+	"github.com/quay/zlog"
+
+	"github.com/quay/claircore"
+	"github.com/quay/claircore/indexer"
+)
+
+// IndexManifest implements [indexer.Store].
+func (s *IndexerV1) IndexManifest(ctx context.Context, ir *claircore.IndexReport) (err error) { + ctx, done := s.method(ctx, &err) + defer done() + d := ir.Hash.String() + if d == "" { + err = errors.New("invalid digest") + return err + } + records := ir.IndexRecords() + if len(records) == 0 { + zlog.Warn(ctx).Msg("manifest being indexed has 0 index records") + return nil + } + + doAssociate := func(id *uint64, v [4]*uint64) callFunc { + return func(ctx context.Context, tx pgx.Tx, query string) (err error) { + _, err = tx.Exec(ctx, query, id, v[2], v[3], d) + return err + } + } + + err = pgx.BeginTxFunc(ctx, s.pool, pgx.TxOptions{AccessMode: pgx.ReadWrite}, s.tx(ctx, `IndexManifest`, func(ctx context.Context, tx pgx.Tx) (err error) { + const name = `associate` + var v [4]*uint64 + for i, r := range records { + if r.Package == nil { + zlog.Debug(ctx).Int("index", i).Msg("ignoring nil Package") + continue + } + v, err = toValues(*r) + if err != nil { + err = fmt.Errorf("received a record with an invalid id: %v", err) + return err + } + if v[0] != nil { + err = pgx.BeginFunc(ctx, tx, s.call(ctx, name, doAssociate(v[0], v))) + if err != nil { + return err + } + } + err = pgx.BeginFunc(ctx, tx, s.call(ctx, name, doAssociate(v[1], v))) + if err != nil { + return err + } + } + return nil + })) + if err != nil { + return err + } + zlog.Debug(ctx).Msg("manifest indexed") + return nil +} + +// ToValues is a helper method which checks for nil pointers inside an +// IndexRecord before returning an associated pointer to the artifact in +// question. 
+// +// v[0] source package id or nil +// v[1] package id or nil +// v[2] distribution id or nil +// v[3] repository id or nil +func toValues(r claircore.IndexRecord) ([4]*uint64, error) { + res := [4]*uint64{} + + if r.Package != nil && r.Package.Source != nil { + id, err := strconv.ParseUint(r.Package.Source.ID, 10, 64) + if err != nil { + return res, fmt.Errorf("source package id %v: %v", r.Package.Source.ID, err) + } + res[0] = &id + } + + if r.Package != nil { + id, err := strconv.ParseUint(r.Package.ID, 10, 64) + if err != nil { + return res, fmt.Errorf("package id %v: %v", r.Package.ID, err) + } + res[1] = &id + + } + + if r.Distribution != nil { + id, err := strconv.ParseUint(r.Distribution.ID, 10, 64) + if err != nil { + return res, fmt.Errorf("distribution id %v: %v", r.Distribution.ID, err) + } + res[2] = &id + } + + if r.Repository != nil { + id, err := strconv.ParseUint(r.Repository.ID, 10, 64) + if err != nil { + // return res, fmt.Errorf("repository id %v: %v", r.Repository.ID, err) + return res, nil + } + res[3] = &id + } + + return res, nil +} + +// ManifestScanned implements [indexer.Store]. +// +// ManifestScanned determines if a manifest has been scanned by ALL the provided +// scanners. +func (s *IndexerV1) ManifestScanned(ctx context.Context, hash claircore.Digest, vs indexer.VersionedScanners) (ok bool, err error) { + ctx, done := s.method(ctx, &err) + defer done() + + rvs := rotateVersionedScanners(vs) + err = s.pool.AcquireFunc(ctx, s.acquire(ctx, `query`, func(ctx context.Context, c *pgxpool.Conn, query string) error { + return c.QueryRow(ctx, query, hash.String(), rvs.Name, rvs.Version, rvs.Kind).Scan(&ok) + })) + switch { + case errors.Is(err, nil): + case errors.Is(err, pgx.ErrNoRows): + return false, nil + default: + return false, err + } + + return ok, nil +} + +// PersistManifest implements [indexer.Store].
+func (s *IndexerV1) PersistManifest(ctx context.Context, manifest claircore.Manifest) (err error) { + ctx, done := s.method(ctx, &err) + defer done() + + layers := make([]string, len(manifest.Layers)) + for i, l := range manifest.Layers { + layers[i] = l.Hash.String() + } + + err = pgx.BeginTxFunc(ctx, s.pool, txRW, s.tx(ctx, `Persist`, func(ctx context.Context, tx pgx.Tx) (err error) { + err = pgx.BeginFunc(ctx, tx, s.call(ctx, `insertmanifest`, func(ctx context.Context, tx pgx.Tx, query string) (err error) { + _, err = tx.Exec(ctx, query, manifest.Hash) + return err + })) + if err != nil { + return err + } + + err = pgx.BeginFunc(ctx, tx, s.call(ctx, `insertlayers`, func(ctx context.Context, tx pgx.Tx, query string) (err error) { + _, err = tx.Exec(ctx, query, layers) + return err + })) + if err != nil { + return err + } + + err = pgx.BeginFunc(ctx, tx, s.call(ctx, `associate`, func(ctx context.Context, tx pgx.Tx, query string) (err error) { + _, err = tx.Exec(ctx, query, manifest.Hash, layers) + return err + })) + if err != nil { + return err + } + + return nil + })) + if err != nil { + return err + } + return nil +} diff --git a/datastore/postgres/v2/indexer_v1_test.go b/datastore/postgres/v2/indexer_v1_test.go new file mode 100644 index 000000000..1e3cb7dd9 --- /dev/null +++ b/datastore/postgres/v2/indexer_v1_test.go @@ -0,0 +1,7 @@ +package postgres + +import "testing" + +func TestIndexer(t *testing.T) { + t.Skip("TODO") +} diff --git a/datastore/postgres/v2/queries/indexer/affectedmanifests_selectaffected.sql b/datastore/postgres/v2/queries/indexer/affectedmanifests_selectaffected.sql new file mode 100644 index 000000000..8a85563c4 --- /dev/null +++ b/datastore/postgres/v2/queries/indexer/affectedmanifests_selectaffected.sql @@ -0,0 +1,9 @@ +SELECT + manifest.hash +FROM + manifest_index + JOIN manifest ON manifest_index.manifest_id = manifest.id +WHERE + package_id = $1 + AND dist_id IS NOT DISTINCT FROM $2 + AND repo_id IS NOT DISTINCT FROM $3; diff --git 
a/datastore/postgres/v2/queries/indexer/affectedmanifests_selectdist.sql b/datastore/postgres/v2/queries/indexer/affectedmanifests_selectdist.sql new file mode 100644 index 000000000..c418d22f6 --- /dev/null +++ b/datastore/postgres/v2/queries/indexer/affectedmanifests_selectdist.sql @@ -0,0 +1,13 @@ +SELECT + id +FROM + dist +WHERE + arch = $1 + AND cpe = $2 + AND did = $3 + AND name = $4 + AND pretty_name = $5 + AND version = $6 + AND version_code_name = $7 + AND version_id = $8; diff --git a/datastore/postgres/v2/queries/indexer/affectedmanifests_selectpackages.sql b/datastore/postgres/v2/queries/indexer/affectedmanifests_selectpackages.sql new file mode 100644 index 000000000..a23e32f09 --- /dev/null +++ b/datastore/postgres/v2/queries/indexer/affectedmanifests_selectpackages.sql @@ -0,0 +1,13 @@ +SELECT + id, + name, + version, + kind, + norm_kind, + norm_version, + module, + arch +FROM + package +WHERE + name = $1; diff --git a/datastore/postgres/v2/queries/indexer/affectedmanifests_selectrepo.sql b/datastore/postgres/v2/queries/indexer/affectedmanifests_selectrepo.sql new file mode 100644 index 000000000..f75217164 --- /dev/null +++ b/datastore/postgres/v2/queries/indexer/affectedmanifests_selectrepo.sql @@ -0,0 +1,8 @@ +SELECT + id +FROM + repo +WHERE + name = $1 + AND key = $2 + AND uri = $3; diff --git a/datastore/postgres/v2/queries/indexer/deletemanifests_deletemanifests.sql b/datastore/postgres/v2/queries/indexer/deletemanifests_deletemanifests.sql new file mode 100644 index 000000000..a3214434f --- /dev/null +++ b/datastore/postgres/v2/queries/indexer/deletemanifests_deletemanifests.sql @@ -0,0 +1,4 @@ +DELETE FROM manifest +WHERE hash = ANY ($1::text[]) +RETURNING + manifest.hash; diff --git a/datastore/postgres/v2/queries/indexer/deletemanifests_layercleanup.sql b/datastore/postgres/v2/queries/indexer/deletemanifests_layercleanup.sql new file mode 100644 index 000000000..402922ed5 --- /dev/null +++ 
b/datastore/postgres/v2/queries/indexer/deletemanifests_layercleanup.sql @@ -0,0 +1,7 @@ +DELETE FROM layer +WHERE NOT EXISTS ( + SELECT + FROM + manifest_layer + WHERE + manifest_layer.layer_id = layer.id); diff --git a/datastore/postgres/v2/queries/indexer/distributionsbylayer_query.sql b/datastore/postgres/v2/queries/indexer/distributionsbylayer_query.sql new file mode 100644 index 000000000..29d47aff1 --- /dev/null +++ b/datastore/postgres/v2/queries/indexer/distributionsbylayer_query.sql @@ -0,0 +1,31 @@ +WITH scanner AS MATERIALIZED ( + SELECT + id + FROM + scanner + JOIN UNNEST( + $2::text[], $3::text[], $4::text[] +) AS find ( + name, + version, + kind +) USING ( + name, version, kind +)) +SELECT + dist.id, + dist.name, + dist.did, + dist.version, + dist.version_code_name, + dist.version_id, + dist.arch, + dist.cpe, + dist.pretty_name +FROM + dist_scanartifact + JOIN dist ON dist_scanartifact.dist_id = dist.id + JOIN scanner ON dist_scanartifact.scanner_id = scanner.id + JOIN layer ON dist_scanartifact.layer_id = layer.id +WHERE + layer.hash = $1::text; diff --git a/datastore/postgres/v2/queries/indexer/filesbylayer_query.sql b/datastore/postgres/v2/queries/indexer/filesbylayer_query.sql new file mode 100644 index 000000000..3e5ab3797 --- /dev/null +++ b/datastore/postgres/v2/queries/indexer/filesbylayer_query.sql @@ -0,0 +1,24 @@ +WITH scanner AS MATERIALIZED ( + SELECT + id + FROM + scanner + JOIN UNNEST( + $2::text[], $3::text[], $4::text[] +) AS find ( + name, + version, + kind +) USING ( + name, version, kind +)) +SELECT + file.path, + file.kind +FROM + file_scanartifact + JOIN file ON file_scanartifact.file_id = file.id + JOIN scanner ON file_scanartifact.scanner_id = scanner.id + JOIN layer ON file_scanartifact.layer_id = layer.id +WHERE + layer.hash = $1::text; diff --git a/datastore/postgres/v2/queries/indexer/helper_cache_indexreport.sql b/datastore/postgres/v2/queries/indexer/helper_cache_indexreport.sql new file mode 100644 index 
000000000..e9988d811 --- /dev/null +++ b/datastore/postgres/v2/queries/indexer/helper_cache_indexreport.sql @@ -0,0 +1,12 @@ +INSERT INTO indexreport (manifest_id, scan_result) +SELECT + manifest.id, + $2::jsonb +FROM + manifest +WHERE + hash = $1::text +ON CONFLICT + (manifest_id) +DO + UPDATE SET scan_result = excluded.scan_result; diff --git a/datastore/postgres/v2/queries/indexer/helper_distribution_associateartifact.sql b/datastore/postgres/v2/queries/indexer/helper_distribution_associateartifact.sql new file mode 100644 index 000000000..efebd3fa7 --- /dev/null +++ b/datastore/postgres/v2/queries/indexer/helper_distribution_associateartifact.sql @@ -0,0 +1,28 @@ +WITH layer AS ( + SELECT + id + FROM + layer + WHERE + hash = $2 +), +scanner AS ( + SELECT + id + FROM + scanner + WHERE + scanner.name = $3 + AND scanner.version = $4 + AND scanner.kind = $5) +INSERT INTO dist_scanartifact (dist_id, layer_id, scanner_id) +SELECT + dist.id AS dist_id, + layer.id AS layer_id, + scanner.id AS scanner_id +FROM + UNNEST($1::int8[]) AS dist (id) + CROSS JOIN layer + CROSS JOIN scanner +ON CONFLICT + DO NOTHING; diff --git a/datastore/postgres/v2/queries/indexer/helper_distribution_bylayer.sql b/datastore/postgres/v2/queries/indexer/helper_distribution_bylayer.sql new file mode 100644 index 000000000..34078af04 --- /dev/null +++ b/datastore/postgres/v2/queries/indexer/helper_distribution_bylayer.sql @@ -0,0 +1,31 @@ +WITH scanner AS MATERIALIZED ( + SELECT + id + FROM + scanner + JOIN UNNEST( + $2::text[], $3::text[], $4::text[] +) AS find ( + name, + version, + kind +) USING ( + name, version, kind +)) +SELECT + ROW (dist.id::text, + dist.name, + dist.did, + dist.version, + dist.version_code_name, + dist.version_id, + dist.arch, + dist.cpe, + dist.pretty_name) +FROM + dist_scanartifact + JOIN dist ON dist_scanartifact.dist_id = dist.id + JOIN scanner ON dist_scanartifact.scanner_id = scanner.id + JOIN layer ON dist_scanartifact.layer_id = layer.id +WHERE + layer.hash = 
$1::text; diff --git a/datastore/postgres/v2/queries/indexer/helper_distribution_indexartifact.sql b/datastore/postgres/v2/queries/indexer/helper_distribution_indexartifact.sql new file mode 100644 index 000000000..cf53fb23c --- /dev/null +++ b/datastore/postgres/v2/queries/indexer/helper_distribution_indexartifact.sql @@ -0,0 +1,50 @@ +WITH input ( + name, + did, + version, + version_code_name, + version_id, + arch, + cpe, + pretty_name +) AS ( + SELECT + * + FROM + UNNEST($1::text[], $2::text[], $3::text[], $4::text[], $5::text[], $6::text[], $7::text[], $8::text[]) +), +inserted AS ( +INSERT INTO dist (name, did, version, version_code_name, version_id, arch, cpe, pretty_name) + SELECT + name, + did, + version, + version_code_name, + version_id, + arch, + cpe, + pretty_name + FROM + input + ON CONFLICT (name, + did, + version, + version_code_name, + version_id, + arch, + cpe, + pretty_name) + DO NOTHING + RETURNING + id +) +SELECT + id +FROM + inserted +UNION ALL +SELECT + dist.id +FROM + input + JOIN dist USING (name, did, version, version_code_name, version_id, arch, cpe, pretty_name); diff --git a/datastore/postgres/v2/queries/indexer/helper_file_associateartifact.sql b/datastore/postgres/v2/queries/indexer/helper_file_associateartifact.sql new file mode 100644 index 000000000..f1ae849eb --- /dev/null +++ b/datastore/postgres/v2/queries/indexer/helper_file_associateartifact.sql @@ -0,0 +1,28 @@ +WITH layer AS ( + SELECT + id + FROM + layer + WHERE + hash = $2 +), +scanner AS ( + SELECT + id + FROM + scanner + WHERE + scanner.name = $3 + AND scanner.version = $4 + AND scanner.kind = $5) +INSERT INTO file_scanartifact (file_id, layer_id, scanner_id) +SELECT + file.id AS file_id, + layer.id AS layer_id, + scanner.id AS scanner_id +FROM + UNNEST($1::int8[]) AS file (id) + CROSS JOIN layer + CROSS JOIN scanner +ON CONFLICT + DO NOTHING; diff --git a/datastore/postgres/v2/queries/indexer/helper_file_bylayer.sql 
b/datastore/postgres/v2/queries/indexer/helper_file_bylayer.sql new file mode 100644 index 000000000..232a6039c --- /dev/null +++ b/datastore/postgres/v2/queries/indexer/helper_file_bylayer.sql @@ -0,0 +1,24 @@ +WITH scanner AS MATERIALIZED ( + SELECT + id + FROM + scanner + JOIN UNNEST( + $2::text[], $3::text[], $4::text[] +) AS find ( + name, + version, + kind +) USING ( + name, version, kind +)) +SELECT + ROW (file.path, + file.kind) +FROM + file_scanartifact + JOIN file ON file_scanartifact.file_id = file.id + JOIN scanner ON file_scanartifact.scanner_id = scanner.id + JOIN layer ON file_scanartifact.layer_id = layer.id +WHERE + layer.hash = $1::text; diff --git a/datastore/postgres/v2/queries/indexer/helper_file_indexartifact.sql b/datastore/postgres/v2/queries/indexer/helper_file_indexartifact.sql new file mode 100644 index 000000000..ba02245e9 --- /dev/null +++ b/datastore/postgres/v2/queries/indexer/helper_file_indexartifact.sql @@ -0,0 +1,30 @@ +WITH input ( + path, + kind +) AS ( + SELECT * FROM + UNNEST($1::text[], $2::text[]) +), +inserted AS ( +INSERT INTO file (path, kind) + SELECT + input.path, + input.kind + FROM + input + ON CONFLICT (path, + kind) + DO NOTHING + RETURNING + id +) +SELECT + id +FROM + inserted +UNION ALL +SELECT + file.id +FROM + input + JOIN file USING (path, kind); diff --git a/datastore/postgres/v2/queries/indexer/helper_repository_associateartifact.sql b/datastore/postgres/v2/queries/indexer/helper_repository_associateartifact.sql new file mode 100644 index 000000000..664f9d7b5 --- /dev/null +++ b/datastore/postgres/v2/queries/indexer/helper_repository_associateartifact.sql @@ -0,0 +1,28 @@ +WITH layer AS ( + SELECT + id + FROM + layer + WHERE + hash = $2::text +), +scanner AS ( + SELECT + id + FROM + scanner + WHERE + scanner.name = $3::text + AND scanner.version = $4::text + AND scanner.kind = $5::text) +INSERT INTO repo_scanartifact (repo_id, layer_id, scanner_id) +SELECT + repo.id, + layer.id, + scanner.id +FROM + 
UNNEST($1::int8[]) AS repo (id) + CROSS JOIN layer + CROSS JOIN scanner +ON CONFLICT + DO NOTHING; diff --git a/datastore/postgres/v2/queries/indexer/helper_repository_bylayer.sql b/datastore/postgres/v2/queries/indexer/helper_repository_bylayer.sql new file mode 100644 index 000000000..0c58ec11c --- /dev/null +++ b/datastore/postgres/v2/queries/indexer/helper_repository_bylayer.sql @@ -0,0 +1,21 @@ +SELECT + ROW (repo.id::text, + repo.name, + repo.key, + repo.uri, + repo.cpe) +FROM + repo_scanartifact + LEFT JOIN repo ON repo_scanartifact.repo_id = repo.id + JOIN layer ON layer.hash = $1::text +WHERE + repo_scanartifact.layer_id = layer.id + AND repo_scanartifact.scanner_id = ANY ( + SELECT + id + FROM + scanner + JOIN UNNEST($2::text[], $3::text[], $4::text[]) AS input (name, + version, + kind) + USING (name, version, kind)); diff --git a/datastore/postgres/v2/queries/indexer/helper_repository_indexartifact.sql b/datastore/postgres/v2/queries/indexer/helper_repository_indexartifact.sql new file mode 100644 index 000000000..501c8254d --- /dev/null +++ b/datastore/postgres/v2/queries/indexer/helper_repository_indexartifact.sql @@ -0,0 +1,35 @@ +WITH input AS ( + SELECT + * + FROM + UNNEST($1::text[], $2::text[], $3::text[], $4::text[]) AS input (name, + key, + uri, + cpe) +), +inserted AS ( +INSERT INTO repo (name, key, uri, cpe) + SELECT + name, + key, + uri, + cpe + FROM + input + ON CONFLICT (name, + key, + uri) + DO NOTHING + RETURNING + id +) +SELECT + id +FROM + inserted +UNION ALL +SELECT + repo.id +FROM + input + JOIN repo USING (name, key, uri); diff --git a/datastore/postgres/v2/queries/indexer/indexdistributions_insertwith.sql b/datastore/postgres/v2/queries/indexer/indexdistributions_insertwith.sql new file mode 100644 index 000000000..2eebab5a8 --- /dev/null +++ b/datastore/postgres/v2/queries/indexer/indexdistributions_insertwith.sql @@ -0,0 +1,50 @@ +WITH distributions AS ( + SELECT + id AS dist_id + FROM + dist + WHERE + name = $1 + AND did = $2 + 
AND version = $3 + AND version_code_name = $4 + AND version_id = $5 + AND arch = $6 + AND cpe = $7 + AND pretty_name = $8 +), +scanner AS ( + SELECT + id AS scanner_id + FROM + scanner + WHERE + name = $9 + AND version = $10 + AND kind = $11 +), +layer AS ( + SELECT + id AS layer_id + FROM + layer + WHERE + layer.hash = $12) +INSERT INTO dist_scanartifact (layer_id, dist_id, scanner_id) + VALUES (( + SELECT + layer_id + FROM + layer), + ( + SELECT + dist_id + FROM + distributions), + ( + SELECT + scanner_id + FROM + scanner)) + ON CONFLICT + DO NOTHING; diff --git a/datastore/postgres/v2/queries/indexer/indexmanifest_associate.sql b/datastore/postgres/v2/queries/indexer/indexmanifest_associate.sql new file mode 100644 index 000000000..f3c099c34 --- /dev/null +++ b/datastore/postgres/v2/queries/indexer/indexmanifest_associate.sql @@ -0,0 +1,15 @@ +WITH manifests AS ( + SELECT + id AS manifest_id + FROM + manifest + WHERE + hash = $4) +INSERT INTO manifest_index (package_id, dist_id, repo_id, manifest_id) + VALUES ($1, $2, $3, ( + SELECT + manifest_id + FROM + manifests)) +ON CONFLICT + DO NOTHING; diff --git a/datastore/postgres/v2/queries/indexer/indexpackages_associate.sql b/datastore/postgres/v2/queries/indexer/indexpackages_associate.sql new file mode 100644 index 000000000..b22763874 --- /dev/null +++ b/datastore/postgres/v2/queries/indexer/indexpackages_associate.sql @@ -0,0 +1,39 @@ +INSERT INTO package_scanartifact (layer_id, package_db, repository_hint, filepath, package_id, source_id, scanner_id) +SELECT + layer.id, + input.package_db, + input.repository_hint, + input.filepath, + package.id, + source.id, + scanner.id +FROM + UNNEST($1::text[], $2::text[], $3::text[], $4::text[], $5::text[], $6::text[], $7::text[], $8::text[], $9::text[], $10::text[], $15::text[], $16::text[], $17::text[]) AS input (src_name, + src_kind, + src_version, + src_module, + src_arch, + bin_name, + bin_kind, + bin_version, + bin_module, + bin_arch, + package_db, + repository_hint, 
+ filepath) + JOIN layer ON layer.hash = $14::text + JOIN package ON package.name = input.bin_name + AND package.kind = input.bin_kind + AND package.version = input.bin_version + AND package.module = input.bin_module + AND package.arch = input.bin_arch + JOIN package AS source ON source.name = input.src_name + AND source.kind = input.src_kind + AND source.version = input.src_version + AND source.module = input.src_module + AND source.arch = input.src_arch + JOIN scanner ON scanner.name = $11 + AND scanner.version = $12 + AND scanner.kind = $13 + ON CONFLICT + DO NOTHING; diff --git a/datastore/postgres/v2/queries/indexer/indexpackages_insert.sql b/datastore/postgres/v2/queries/indexer/indexpackages_insert.sql new file mode 100644 index 000000000..dde478291 --- /dev/null +++ b/datastore/postgres/v2/queries/indexer/indexpackages_insert.sql @@ -0,0 +1,23 @@ +INSERT INTO package (name, kind, version, norm_kind, norm_version, module, arch) +SELECT + input.name, + input.kind, + input.version, + input.norm_kind, + input.norm_version::int[], + input.module, + input.arch +FROM + UNNEST($1::text[], $2::text[], $3::text[], $4::text[], $5::text[], $6::text[], $7::text[]) AS input (name, + kind, + version, + norm_kind, + norm_version, + module, + arch) +ON CONFLICT (name, + kind, + version, + module, + arch) + DO NOTHING; diff --git a/datastore/postgres/v2/queries/indexer/indexreport_cached.sql b/datastore/postgres/v2/queries/indexer/indexreport_cached.sql new file mode 100644 index 000000000..fa5305b59 --- /dev/null +++ b/datastore/postgres/v2/queries/indexer/indexreport_cached.sql @@ -0,0 +1,7 @@ +SELECT + scan_result +FROM + indexreport + JOIN manifest ON manifest.hash = $1 +WHERE + indexreport.manifest_id = manifest.id; diff --git a/datastore/postgres/v2/queries/indexer/layerscanned_query.sql b/datastore/postgres/v2/queries/indexer/layerscanned_query.sql new file mode 100644 index 000000000..8a5f950af --- /dev/null +++ 
b/datastore/postgres/v2/queries/indexer/layerscanned_query.sql @@ -0,0 +1,13 @@ +SELECT + EXISTS ( + SELECT + 1 + FROM + scanned_layer + JOIN scanner ON scanner.id = scanned_layer.scanner_id + JOIN layer ON layer.id = scanned_layer.layer_id + WHERE + scanner.name = $1 + AND scanner.version = $2 + AND scanner.kind = $3 + AND layer.hash = $4); diff --git a/datastore/postgres/v2/queries/indexer/manifestscanned_query.sql b/datastore/postgres/v2/queries/indexer/manifestscanned_query.sql new file mode 100644 index 000000000..ab70628c7 --- /dev/null +++ b/datastore/postgres/v2/queries/indexer/manifestscanned_query.sql @@ -0,0 +1,19 @@ +SELECT + bool_and(q.ok) +FROM ( + SELECT + scanner.id IS NOT NULL AS ok + FROM + scanned_manifest + JOIN manifest ON scanned_manifest.manifest_id = manifest.id + LEFT OUTER JOIN ( + SELECT + id + FROM + scanner + JOIN UNNEST($2::text[], $3::text[], $4::text[]) AS input (name, + version, + kind) + USING (name, version, kind)) AS scanner ON scanned_manifest.scanner_id = scanner.id + WHERE + manifest.hash = $1) AS q; diff --git a/datastore/postgres/v2/queries/indexer/packagesbylayer_query.sql b/datastore/postgres/v2/queries/indexer/packagesbylayer_query.sql new file mode 100644 index 000000000..fb0e0dfc3 --- /dev/null +++ b/datastore/postgres/v2/queries/indexer/packagesbylayer_query.sql @@ -0,0 +1,29 @@ +SELECT + package.id::text, + package.name, + package.kind, + package.version, + package.norm_kind, + package.norm_version, + package.module, + package.arch, + source_package.id, + package_scanartifact.package_db, + package_scanartifact.repository_hint, + package_scanartifact.filepath +FROM + package_scanartifact + JOIN layer ON layer.id = package_scanartifact.layer_id + LEFT JOIN package ON package_scanartifact.package_id = package.id + LEFT JOIN package AS source_package ON package_scanartifact.source_id = source_package.id + JOIN ( + SELECT + id + FROM + scanner + JOIN UNNEST($2::text[], $3::text[], $4::text[]) AS input (name, + version, + 
kind) + USING (name, version, kind)) AS scanner ON scanner.id = package_scanartifact.scanner_id +WHERE + layer.hash = $1; diff --git a/datastore/postgres/v2/queries/indexer/persistmanifest_associate.sql b/datastore/postgres/v2/queries/indexer/persistmanifest_associate.sql new file mode 100644 index 000000000..ce90229f4 --- /dev/null +++ b/datastore/postgres/v2/queries/indexer/persistmanifest_associate.sql @@ -0,0 +1,22 @@ +INSERT INTO manifest_layer (manifest_id, layer_id, i) +SELECT + m.id, + l.id, + l.n +FROM ( + SELECT + id + FROM + manifest + WHERE + hash = $1::text) AS m, + ( + SELECT + id, + n + FROM + layer + JOIN UNNEST($2::text[]) + WITH ORDINALITY AS ls (hash, n) ON layer.hash = ls.hash) AS l +ON CONFLICT + DO NOTHING; diff --git a/datastore/postgres/v2/queries/indexer/persistmanifest_insertlayers.sql b/datastore/postgres/v2/queries/indexer/persistmanifest_insertlayers.sql new file mode 100644 index 000000000..d64e0147a --- /dev/null +++ b/datastore/postgres/v2/queries/indexer/persistmanifest_insertlayers.sql @@ -0,0 +1,7 @@ +INSERT INTO layer (hash) +SELECT + * +FROM + UNNEST($1::text[]) +ON CONFLICT + DO NOTHING; diff --git a/datastore/postgres/v2/queries/indexer/persistmanifest_insertmanifest.sql b/datastore/postgres/v2/queries/indexer/persistmanifest_insertmanifest.sql new file mode 100644 index 000000000..889c57458 --- /dev/null +++ b/datastore/postgres/v2/queries/indexer/persistmanifest_insertmanifest.sql @@ -0,0 +1,4 @@ +INSERT INTO manifest (hash) + VALUES ($1) +ON CONFLICT + DO NOTHING; diff --git a/datastore/postgres/v2/queries/indexer/registerscanners_register.sql b/datastore/postgres/v2/queries/indexer/registerscanners_register.sql new file mode 100644 index 000000000..62e7c26b0 --- /dev/null +++ b/datastore/postgres/v2/queries/indexer/registerscanners_register.sql @@ -0,0 +1,11 @@ +INSERT INTO scanner (name, version, kind) +SELECT + name, + version, + kind +FROM + UNNEST($1::text[], $2::text[], $3::text[]) AS input (name, + version, + kind) 
+ON CONFLICT + DO NOTHING; diff --git a/datastore/postgres/v2/queries/indexer/repositoriesbylayer_select.sql b/datastore/postgres/v2/queries/indexer/repositoriesbylayer_select.sql new file mode 100644 index 000000000..3f1f0ea11 --- /dev/null +++ b/datastore/postgres/v2/queries/indexer/repositoriesbylayer_select.sql @@ -0,0 +1,21 @@ +SELECT + repo.id, + repo.name, + repo.key, + repo.uri, + repo.cpe +FROM + repo_scanartifact + LEFT JOIN repo ON repo_scanartifact.repo_id = repo.id + JOIN layer ON layer.hash = $1 +WHERE + repo_scanartifact.layer_id = layer.id + AND repo_scanartifact.scanner_id = ANY ( + SELECT + id + FROM + scanner + JOIN UNNEST($2::text[], $3::text[], $4::text[]) AS input (name, + version, + kind) + USING (name, version, kind)); diff --git a/datastore/postgres/v2/queries/indexer/setindexfinished_insertmanifest.sql b/datastore/postgres/v2/queries/indexer/setindexfinished_insertmanifest.sql new file mode 100644 index 000000000..ffe60c385 --- /dev/null +++ b/datastore/postgres/v2/queries/indexer/setindexfinished_insertmanifest.sql @@ -0,0 +1,17 @@ +INSERT INTO scanned_manifest (manifest_id, scanner_id) +SELECT + manifest.id, + scanner.id +FROM + manifest + CROSS JOIN ( + SELECT + id + FROM + scanner + JOIN UNNEST($2::text[], $3::text[], $4::text[]) AS input (name, + version, + kind) + USING (name, version, kind)) AS scanner +WHERE + hash = $1::text; diff --git a/datastore/postgres/v2/queries/indexer/setindexreport_update.sql b/datastore/postgres/v2/queries/indexer/setindexreport_update.sql new file mode 100644 index 000000000..e9988d811 --- /dev/null +++ b/datastore/postgres/v2/queries/indexer/setindexreport_update.sql @@ -0,0 +1,12 @@ +INSERT INTO indexreport (manifest_id, scan_result) +SELECT + manifest.id, + $2::jsonb +FROM + manifest +WHERE + hash = $1::text +ON CONFLICT + (manifest_id) +DO + UPDATE SET scan_result = excluded.scan_result; diff --git a/datastore/postgres/v2/queries/indexer/setlayerscanned_insert.sql 
b/datastore/postgres/v2/queries/indexer/setlayerscanned_insert.sql new file mode 100644 index 000000000..198bf12aa --- /dev/null +++ b/datastore/postgres/v2/queries/indexer/setlayerscanned_insert.sql @@ -0,0 +1,19 @@ +INSERT INTO scanned_layer (layer_id, scanner_id) + VALUES (( + SELECT + id + FROM + layer + WHERE + hash = $1), ( + SELECT + id + FROM + scanner + WHERE + name = $2 + AND version = $3 + AND kind = $4)) + ON CONFLICT (layer_id, + scanner_id) + DO NOTHING; diff --git a/datastore/postgres/v2/queries_test.go b/datastore/postgres/v2/queries_test.go index 82bb13aee..4215ddeda 100644 --- a/datastore/postgres/v2/queries_test.go +++ b/datastore/postgres/v2/queries_test.go @@ -153,6 +153,40 @@ var ( "0.Plan.0.Plans.1": "usage validated to be over a trivial number of rows", }, */ + "indexer/deletemanifests_layercleanup.sql": { + "0.Plan.Plans.0.Plans.0": "grandfathered", + "0.Plan.Plans.0.Plans.1.Plans.0": "grandfathered", + }, + "indexer/helper_distribution_bylayer.sql": { + "0.Plan.Plans.0.Plans.0": "TODO", + }, + "indexer/distributionsbylayer_query.sql": { + "0.Plan.Plans.0.Plans.0": "TODO", + }, + "indexer/helper_file_bylayer.sql": { + "0.Plan.Plans.0.Plans.0": "TODO", + }, + "indexer/filesbylayer_query.sql": { + "0.Plan.Plans.0.Plans.0": "TODO", + }, + "indexer/helper_distribution_indexartifact.sql": { + "0.Plan.Plans.3.Plans.0": "TODO", + }, + "indexer/helper_file_indexartifact.sql": { + "0.Plan.Plans.3.Plans.0": "TODO", + }, + "indexer/indexpackages_associate.sql": { + "0.Plan.Plans.0.Plans.0.Plans.0.Plans.0.Plans.0": "TODO", + }, + "indexer/helper_repository_indexartifact.sql": { + "0.Plan.Plans.3.Plans.0": "TODO", + }, + "indexer/persistmanifest_associate.sql": { + "0.Plan.Plans.0.Plans.1.Plans.0": "number of rows limited by passed input; select is not in a read query", + }, + "indexer/setindexfinished_insertmanifest.sql": { + "0.Plan.Plans.0.Plans.0.Plans.0": "TODO", + }, "matcher/initialized_initialized.sql": { "0.Plan.Plans.0": "only reads a single 
row", }, diff --git a/datastore/postgres/v2/query_metadata.go b/datastore/postgres/v2/query_metadata.go index 1fa3a03c3..71f62a707 100644 --- a/datastore/postgres/v2/query_metadata.go +++ b/datastore/postgres/v2/query_metadata.go @@ -7,6 +7,40 @@ var queryMetadata = struct { Op map[string]string }{ Table: map[string]string{ + "indexer/affectedmanifests_selectaffected.sql": "manifest_index", + "indexer/affectedmanifests_selectdist.sql": "dist", + "indexer/affectedmanifests_selectpackages.sql": "package", + "indexer/affectedmanifests_selectrepo.sql": "repo", + "indexer/deletemanifests_deletemanifests.sql": "manifest", + "indexer/deletemanifests_layercleanup.sql": "layer", + "indexer/distributionsbylayer_query.sql": "layer", + "indexer/filesbylayer_query.sql": "layer", + "indexer/helper_cache_indexreport.sql": "indexreport", + "indexer/helper_distribution_associateartifact.sql": "dist_scanartifact", + "indexer/helper_distribution_bylayer.sql": "layer", + "indexer/helper_distribution_indexartifact.sql": "inserted", + "indexer/helper_file_associateartifact.sql": "file_scanartifact", + "indexer/helper_file_bylayer.sql": "layer", + "indexer/helper_file_indexartifact.sql": "inserted", + "indexer/helper_repository_associateartifact.sql": "repo_scanartifact", + "indexer/helper_repository_bylayer.sql": "layer", + "indexer/helper_repository_indexartifact.sql": "inserted", + "indexer/indexdistributions_insertwith.sql": "dist_scanartifact", + "indexer/indexmanifest_associate.sql": "manifest_index", + "indexer/indexpackages_associate.sql": "package_scanartifact", + "indexer/indexpackages_insert.sql": "package", + "indexer/indexreport_cached.sql": "indexreport", + "indexer/layerscanned_query.sql": "layer", + "indexer/manifestscanned_query.sql": "scanned_manifest", + "indexer/packagesbylayer_query.sql": "package", + "indexer/persistmanifest_associate.sql": "manifest_layer", + "indexer/persistmanifest_insertlayers.sql": "layer", + "indexer/persistmanifest_insertmanifest.sql": 
"manifest", + "indexer/registerscanners_register.sql": "scanner", + "indexer/repositoriesbylayer_select.sql": "layer", + "indexer/setindexfinished_insertmanifest.sql": "scanned_manifest", + "indexer/setindexreport_update.sql": "indexreport", + "indexer/setlayerscanned_insert.sql": "scanned_layer", "matcher/gc_delete_ops.sql": "update_operation", "matcher/gc_distinct.sql": "update_operation", "matcher/gc_eligible.sql": "ordered_ops", @@ -41,6 +75,40 @@ var queryMetadata = struct { "matcher/updatevulnerabilities_refresh.sql": "latest_update_operations", }, Op: map[string]string{ + "indexer/affectedmanifests_selectaffected.sql": "SELECT", + "indexer/affectedmanifests_selectdist.sql": "SELECT", + "indexer/affectedmanifests_selectpackages.sql": "SELECT", + "indexer/affectedmanifests_selectrepo.sql": "SELECT", + "indexer/deletemanifests_deletemanifests.sql": "DELETE", + "indexer/deletemanifests_layercleanup.sql": "DELETE", + "indexer/distributionsbylayer_query.sql": "SELECT", + "indexer/filesbylayer_query.sql": "SELECT", + "indexer/helper_cache_indexreport.sql": "INSERT", + "indexer/helper_distribution_associateartifact.sql": "INSERT", + "indexer/helper_distribution_bylayer.sql": "SELECT", + "indexer/helper_distribution_indexartifact.sql": "SELECT", + "indexer/helper_file_associateartifact.sql": "INSERT", + "indexer/helper_file_bylayer.sql": "SELECT", + "indexer/helper_file_indexartifact.sql": "SELECT", + "indexer/helper_repository_associateartifact.sql": "INSERT", + "indexer/helper_repository_bylayer.sql": "SELECT", + "indexer/helper_repository_indexartifact.sql": "SELECT", + "indexer/indexdistributions_insertwith.sql": "INSERT", + "indexer/indexmanifest_associate.sql": "INSERT", + "indexer/indexpackages_associate.sql": "INSERT", + "indexer/indexpackages_insert.sql": "INSERT", + "indexer/indexreport_cached.sql": "SELECT", + "indexer/layerscanned_query.sql": "SELECT", + "indexer/manifestscanned_query.sql": "SELECT", + "indexer/packagesbylayer_query.sql": "SELECT", + 
"indexer/persistmanifest_associate.sql": "INSERT", + "indexer/persistmanifest_insertlayers.sql": "INSERT", + "indexer/persistmanifest_insertmanifest.sql": "INSERT", + "indexer/registerscanners_register.sql": "INSERT", + "indexer/repositoriesbylayer_select.sql": "SELECT", + "indexer/setindexfinished_insertmanifest.sql": "INSERT", + "indexer/setindexreport_update.sql": "INSERT", + "indexer/setlayerscanned_insert.sql": "INSERT", "matcher/gc_delete_ops.sql": "DELETE", "matcher/gc_distinct.sql": "SELECT", "matcher/gc_eligible.sql": "SELECT",