Skip to content

Commit

Permalink
postgres/v2: Indexer implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Hank Donnay <[email protected]>
  • Loading branch information
hdonnay committed Nov 20, 2023
1 parent de6e170 commit 3df8829
Show file tree
Hide file tree
Showing 50 changed files with 3,350 additions and 0 deletions.
91 changes: 91 additions & 0 deletions datastore/postgres/v2/indexer_v1.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,66 @@
package postgres

import (
"context"
"fmt"
"runtime"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/jackc/pgx/v5/stdlib"
"github.com/remind101/migrate"

"github.com/quay/claircore/datastore/postgres/migrations"
"github.com/quay/claircore/indexer"
)

// NewIndexerV1 returns a configured [IndexerV1].
//
// The passed [pgxpool.Config] will have its tracing and lifecycle hooks
// overwritten.
//
// Values that can be used as IndexerOptions:
// - [WithMigrations]
func NewIndexerV1(ctx context.Context, cfg *pgxpool.Config, opt ...IndexerOption) (*IndexerV1, error) {
const prefix = `indexer`
var idxCfg indexerConfig
for _, o := range opt {
idxCfg = o.indexerConfig(idxCfg)
}

if idxCfg.Migrations {
cfg := cfg.ConnConfig.Copy()
cfg.DefaultQueryExecMode = pgx.QueryExecModeExec
err := func() error {
db := stdlib.OpenDB(*cfg)
defer db.Close()
migrator := migrate.NewPostgresMigrator(db)
migrator.Table = migrations.IndexerMigrationTable
err := migrator.Exec(migrate.Up, migrations.IndexerMigrations...)
if err != nil {
return fmt.Errorf("failed to perform migrations: %w", err)
}

Check warning on line 42 in datastore/postgres/v2/indexer_v1.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/v2/indexer_v1.go#L41-L42

Added lines #L41 - L42 were not covered by tests
return nil
}()
if err != nil {
return nil, err
}

Check warning on line 47 in datastore/postgres/v2/indexer_v1.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/v2/indexer_v1.go#L46-L47

Added lines #L46 - L47 were not covered by tests
}

var s IndexerV1
var err error
if err = s.init(ctx, cfg, prefix); err != nil {
return nil, err
}

Check warning on line 54 in datastore/postgres/v2/indexer_v1.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/v2/indexer_v1.go#L53-L54

Added lines #L53 - L54 were not covered by tests

_, file, line, _ := runtime.Caller(1)
runtime.SetFinalizer(&s, func(s *IndexerV1) {
panic(fmt.Sprintf("%s:%d: IndexerV1 not closed", file, line))

Check warning on line 58 in datastore/postgres/v2/indexer_v1.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/v2/indexer_v1.go#L58

Added line #L58 was not covered by tests
})

return &s, nil
}

// IndexerOption is an option for configuring an indexer datastore.
type IndexerOption interface {
indexerConfig(indexerConfig) indexerConfig
Expand All @@ -9,3 +70,33 @@ type IndexerOption interface {
type indexerConfig struct {
Migrations bool
}

// Static assertion for the [indexer.Store] interface.
var _ indexer.Store = (*IndexerV1)(nil)

// IndexerV1 implements [indexer.Store] backed by a PostgreSQL database.
type IndexerV1 struct {
storeCommon
}

// Close implements [indexer.Store].
func (s *IndexerV1) Close(_ context.Context) error {
runtime.SetFinalizer(s, nil)
return s.storeCommon.Close()
}

// RegisterScanners is a bad name.
func (s *IndexerV1) RegisterScanners(ctx context.Context, vs indexer.VersionedScanners) (err error) {
ctx, done := s.method(ctx, &err)
defer done()
rvs := rotateVersionedScanners(vs)

err = s.pool.AcquireFunc(ctx, s.acquire(ctx, `register`, func(ctx context.Context, c *pgxpool.Conn, query string) (err error) {
_, err = c.Exec(ctx, query, rvs.Name, rvs.Version, rvs.Kind)
return err
}))
if err != nil {
return err
}

Check warning on line 100 in datastore/postgres/v2/indexer_v1.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/v2/indexer_v1.go#L99-L100

Added lines #L99 - L100 were not covered by tests
return nil
}
267 changes: 267 additions & 0 deletions datastore/postgres/v2/indexer_v1_affectedmanifests.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
package postgres

import (
"context"
"errors"
"fmt"
"runtime/pprof"
"strconv"

"github.com/jackc/pgx/v5"
"github.com/quay/zlog"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"

"github.com/quay/claircore"
)

var (
// ErrNotIndexed indicates the vulnerability being queried has a dist or repo not
// indexed into the database.
ErrNotIndexed = fmt.Errorf("vulnerability containers data not indexed by any scannners")
)

// AffectedManifests finds the manifests digests which are affected by the provided vulnerability.
//
// An exhaustive search for all indexed packages of the same name as the vulnerability is performed.
//
// The list of packages is filtered down to only the affected set.
//
// The manifest index is then queried to resolve a list of manifest hashes containing the affected
// artifacts.
func (s *IndexerV1) AffectedManifests(ctx context.Context, v claircore.Vulnerability, vulnFunc claircore.CheckVulnernableFunc) (_ []claircore.Digest, err error) {
ctx, done := s.method(ctx, &err)
defer done()
ctx = zlog.ContextWithValues(ctx, "vulnerability", v.Name)

out := []claircore.Digest{}
err = pgx.BeginTxFunc(ctx, s.pool, pgx.TxOptions{AccessMode: pgx.ReadOnly},
s.tx(ctx, `AffectedManifests`, func(ctx context.Context, tx pgx.Tx) (err error) {
var pr claircore.IndexRecord
span := trace.SpanFromContext(ctx)

err = pgx.BeginFunc(ctx, tx, s.tx(ctx, `protoRecord`, s.protoRecordCall(&pr, v)))
switch {
case err == nil:
case errors.Is(err, ErrNotIndexed):
// This is a common case: the system knows of a vulnerability but
// doesn't know of any manifests it could apply to.
zlog.Debug(ctx).Msg("not indexed")
trace.SpanFromContext(ctx).SetStatus(codes.Ok, "not indexed")
return nil
default:
return err

Check warning on line 54 in datastore/postgres/v2/indexer_v1_affectedmanifests.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/v2/indexer_v1_affectedmanifests.go#L47-L54

Added lines #L47 - L54 were not covered by tests
}

// Collect all packages which may be affected by the vulnerability
// in question.
pkgsToFilter := []claircore.Package{}

err = pgx.BeginFunc(ctx, tx,
s.call(ctx, `selectPackages`, func(ctx context.Context, tx pgx.Tx, query string) error {
rows, err := tx.Query(ctx, query, v.Package.Name)
if err != nil {
return fmt.Errorf("vulnerability %q: %w", v.ID, err)
}

Check warning on line 66 in datastore/postgres/v2/indexer_v1_affectedmanifests.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/v2/indexer_v1_affectedmanifests.go#L65-L66

Added lines #L65 - L66 were not covered by tests
defer rows.Close()

for rows.Next() {
var pkg claircore.Package
var id int64
var nKind *string
err := rows.Scan(
&id,
&pkg.Name,
&pkg.Version,
&pkg.Kind,
&nKind,
&pkg.NormalizedVersion,
&pkg.Module,
&pkg.Arch,
)
if err != nil {
return fmt.Errorf("unmarshal error: %w", err)
}

Check warning on line 85 in datastore/postgres/v2/indexer_v1_affectedmanifests.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/v2/indexer_v1_affectedmanifests.go#L84-L85

Added lines #L84 - L85 were not covered by tests
idStr := strconv.FormatInt(id, 10)
pkg.ID = idStr
if nKind != nil {
pkg.NormalizedVersion.Kind = *nKind
}
pkgsToFilter = append(pkgsToFilter, pkg)
}
trace.SpanFromContext(ctx).
AddEvent("loaded packages", trace.WithAttributes(attribute.Int("count", len(pkgsToFilter))))
zlog.Debug(ctx).Int("count", len(pkgsToFilter)).Msg("packages to filter")
if err := rows.Err(); err != nil {
return fmt.Errorf("error reading response: %w", err)
}

Check warning on line 98 in datastore/postgres/v2/indexer_v1_affectedmanifests.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/v2/indexer_v1_affectedmanifests.go#L97-L98

Added lines #L97 - L98 were not covered by tests
return nil
}))
if err != nil {
return fmt.Errorf("unable to select packages: %w", err)
}

Check warning on line 103 in datastore/postgres/v2/indexer_v1_affectedmanifests.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/v2/indexer_v1_affectedmanifests.go#L102-L103

Added lines #L102 - L103 were not covered by tests

// for each package discovered create an index record
// and determine if any in-tree matcher finds the record vulnerable
var filteredRecords []claircore.IndexRecord
for i := range pkgsToFilter {
pkg := &pkgsToFilter[i]
pr.Package = pkg
var match bool
var err error
pprof.Do(ctx, pprof.Labels("hook", "CheckVulnFunc"), func(ctx context.Context) {
match, err = vulnFunc(ctx, &pr, &v)
})
if err != nil {
return fmt.Errorf("error in check vulnerable hook: %w", err)
}

Check warning on line 118 in datastore/postgres/v2/indexer_v1_affectedmanifests.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/v2/indexer_v1_affectedmanifests.go#L117-L118

Added lines #L117 - L118 were not covered by tests
if match {
filteredRecords = append(filteredRecords, claircore.IndexRecord{
Package: pkg,
Distribution: pr.Distribution,
Repository: pr.Repository,
})
}
}
span.AddEvent("filtered packages", trace.WithAttributes(attribute.Int("count", len(filteredRecords))))
zlog.Debug(ctx).Int("count", len(filteredRecords)).Msg("vulnerable index records")
// Query the manifest index for manifests containing the vulnerable
// IndexRecords and create a set containing each unique manifest.
set := map[string]struct{}{}
selectAffected := func(id string, dist, repo *uint64) callFunc {
return func(ctx context.Context, tx pgx.Tx, query string) error {
rows, err := tx.Query(ctx, query, id, dist, repo)
if err != nil {
return err
}

Check warning on line 137 in datastore/postgres/v2/indexer_v1_affectedmanifests.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/v2/indexer_v1_affectedmanifests.go#L136-L137

Added lines #L136 - L137 were not covered by tests
defer rows.Close()
for rows.Next() {
var hash string
if err := rows.Scan(&hash); err != nil {
return err
}

Check warning on line 143 in datastore/postgres/v2/indexer_v1_affectedmanifests.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/v2/indexer_v1_affectedmanifests.go#L142-L143

Added lines #L142 - L143 were not covered by tests
if _, ok := set[hash]; ok {
continue
}
set[hash] = struct{}{}
i := len(out)
out = append(out, claircore.Digest{})
if err := out[i].UnmarshalText([]byte(hash)); err != nil {
return err
}

Check warning on line 152 in datastore/postgres/v2/indexer_v1_affectedmanifests.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/v2/indexer_v1_affectedmanifests.go#L151-L152

Added lines #L151 - L152 were not covered by tests
}
return rows.Err()
}
}

for _, record := range filteredRecords {
v, err := toValues(record)
if err != nil {
return fmt.Errorf("failed to get sql values for query: %w", err)
}

Check warning on line 162 in datastore/postgres/v2/indexer_v1_affectedmanifests.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/v2/indexer_v1_affectedmanifests.go#L161-L162

Added lines #L161 - L162 were not covered by tests
err = pgx.BeginFunc(ctx, tx, s.call(ctx, `selectAffected`, selectAffected(record.Package.ID, v[2], v[3])))
switch {
case errors.Is(err, nil):
default:
return fmt.Errorf("error selecting affected: %w", err)

Check warning on line 167 in datastore/postgres/v2/indexer_v1_affectedmanifests.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/v2/indexer_v1_affectedmanifests.go#L166-L167

Added lines #L166 - L167 were not covered by tests
}
}

span.AddEvent("affected manifests", trace.WithAttributes(attribute.Int("count", len(out))))
zlog.Debug(ctx).Int("count", len(out)).Msg("affected manifests")
return nil
}))
if err != nil {
return nil, err
}

Check warning on line 177 in datastore/postgres/v2/indexer_v1_affectedmanifests.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/v2/indexer_v1_affectedmanifests.go#L176-L177

Added lines #L176 - L177 were not covered by tests
return out, nil
}

func (s *IndexerV1) protoRecordCall(out *claircore.IndexRecord, v claircore.Vulnerability) txFunc {
return func(ctx context.Context, tx pgx.Tx) error {
// fill dist into prototype index record if exists
if (v.Dist != nil) && (v.Dist.Name != "") {
const name = `selectDist`
var did int64
err := pgx.BeginFunc(ctx, tx, s.call(ctx, name, protoRecordSelectDist(&did, v.Dist)))
switch {
case errors.Is(err, nil):
id := strconv.FormatInt(did, 10)
out.Distribution = &claircore.Distribution{
ID: id,
Arch: v.Dist.Arch,
CPE: v.Dist.CPE,
DID: v.Dist.DID,
Name: v.Dist.Name,
PrettyName: v.Dist.PrettyName,
Version: v.Dist.Version,
VersionCodeName: v.Dist.VersionCodeName,
VersionID: v.Dist.VersionID,
}
zlog.Debug(ctx).Str("id", id).Msg("discovered distribution id")
case errors.Is(err, pgx.ErrNoRows):

Check warning on line 203 in datastore/postgres/v2/indexer_v1_affectedmanifests.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/v2/indexer_v1_affectedmanifests.go#L203

Added line #L203 was not covered by tests
// OK
default:
return fmt.Errorf("failed to scan dist: %w", err)

Check warning on line 206 in datastore/postgres/v2/indexer_v1_affectedmanifests.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/v2/indexer_v1_affectedmanifests.go#L205-L206

Added lines #L205 - L206 were not covered by tests
}
} else {
zlog.Debug(ctx).Msg("no distribution")
}

// fill repo into prototype index record if exists
if (v.Repo != nil) && (v.Repo.Name != "") {
const name = `selectRepo`
var rid int64
err := pgx.BeginFunc(ctx, tx, s.call(ctx, name, protoRecordSelectRepo(&rid, v.Repo)))
switch {
case errors.Is(err, nil):
id := strconv.FormatInt(rid, 10)
out.Repository = &claircore.Repository{
ID: id,
Key: v.Repo.Key,
Name: v.Repo.Name,
URI: v.Repo.URI,
}
zlog.Debug(ctx).Str("id", id).Msg("discovered repo id")
case errors.Is(err, pgx.ErrNoRows):

Check warning on line 227 in datastore/postgres/v2/indexer_v1_affectedmanifests.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/v2/indexer_v1_affectedmanifests.go#L227

Added line #L227 was not covered by tests
// OK
default:
return fmt.Errorf("failed to scan repo: %w", err)

Check warning on line 230 in datastore/postgres/v2/indexer_v1_affectedmanifests.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/v2/indexer_v1_affectedmanifests.go#L229-L230

Added lines #L229 - L230 were not covered by tests
}
} else {
zlog.Debug(ctx).Msg("no repository")
}

// we need at least a repo or distribution to continue
if (out.Distribution == nil) && (out.Repository == nil) {
return ErrNotIndexed
}

Check warning on line 239 in datastore/postgres/v2/indexer_v1_affectedmanifests.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/v2/indexer_v1_affectedmanifests.go#L238-L239

Added lines #L238 - L239 were not covered by tests
return nil
}
}

func protoRecordSelectDist(out *int64, d *claircore.Distribution) callFunc {
return func(ctx context.Context, tx pgx.Tx, query string) error {
return tx.QueryRow(ctx, query,
d.Arch,
d.CPE,
d.DID,
d.Name,
d.PrettyName,
d.Version,
d.VersionCodeName,
d.VersionID,
).Scan(out)
}
}

func protoRecordSelectRepo(out *int64, r *claircore.Repository) callFunc {
return func(ctx context.Context, tx pgx.Tx, query string) error {
return tx.QueryRow(ctx, query,
r.Name,
r.Key,
r.URI,
).Scan(out)
}
}
Loading

0 comments on commit 3df8829

Please sign in to comment.