postgres/v2: Matcher implementation
Signed-off-by: Hank Donnay <[email protected]>
hdonnay committed Jan 8, 2024
1 parent 2bad781 commit deacead
Showing 46 changed files with 3,144 additions and 2 deletions.
136 changes: 136 additions & 0 deletions datastore/postgres/v2/matcher_v1.go
@@ -1,5 +1,75 @@
package postgres

import (
	"context"
	"fmt"
	"runtime"
	"sync/atomic"

	"github.com/google/uuid"
	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/jackc/pgx/v5/stdlib"
	"github.com/quay/zlog"
	"github.com/remind101/migrate"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"

	"github.com/quay/claircore/datastore"
	"github.com/quay/claircore/datastore/postgres/migrations"
)

// NewMatcherV1 returns a configured [MatcherV1].
//
// The passed [pgxpool.Config] will have its tracing and lifecycle hooks
// overwritten.
//
// Values that can be used as MatcherOptions:
// - [WithMigrations]
// - [WithMinimumMigration]
func NewMatcherV1(ctx context.Context, cfg *pgxpool.Config, opt ...MatcherOption) (*MatcherV1, error) {
	const prefix = `matcher`
	var mCfg matcherConfig
	for _, o := range opt {
		mCfg = o.matcherConfig(mCfg)
	}

	if mCfg.Migrations {
		cfg := cfg.ConnConfig.Copy()
		cfg.DefaultQueryExecMode = pgx.QueryExecModeExec
		err := func() error {
			db := stdlib.OpenDB(*cfg)
			defer db.Close()
			migrator := migrate.NewPostgresMigrator(db)
			migrator.Table = migrations.MatcherMigrationTable
			err := migrator.Exec(migrate.Up, migrations.MatcherMigrations...)
			if err != nil {
				return fmt.Errorf("failed to perform migrations: %w", err)
			}
			return nil
		}()
		if err != nil {
			return nil, err
		}
	}
	var s MatcherV1
	if err := s.init(ctx, cfg, prefix); err != nil {
		return nil, err
	}

	if err := s.checkRevision(ctx, pgx.Identifier([]string{migrations.MatcherMigrationTable}), mCfg.MinMigration); err != nil {
		return nil, err
	}

	_, file, line, _ := runtime.Caller(1)
	runtime.SetFinalizer(&s, func(s *MatcherV1) {
		panic(fmt.Sprintf("%s:%d: MatcherV1 not closed", file, line))
	})

	return &s, nil
}
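// Example (illustrative sketch, not part of this commit): constructing a
// MatcherV1 and releasing it when done. The connection string and ctx are
// assumed placeholders, and any MatcherOption such as [WithMigrations] or
// [WithMinimumMigration] could be appended to the constructor call.
//
//	cfg, err := pgxpool.ParseConfig(`postgresql://claircore@localhost:5432/matcher`)
//	if err != nil {
//		// handle error
//	}
//	store, err := NewMatcherV1(ctx, cfg)
//	if err != nil {
//		// handle error
//	}
//	defer store.Close()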

type MatcherOption interface {
	matcherConfig(matcherConfig) matcherConfig
}
@@ -15,3 +85,69 @@ func newMatcherConfig() matcherConfig {
		MinMigration: MinimumMatcherMigration,
	}
}

// MatcherV1 implements all the relevant interfaces in the datastore package.
type MatcherV1 struct {
	storeCommon
	// initialized is used as an atomic bool for tracking initialization.
	initialized uint32
}

var _ datastore.MatcherV1 = (*MatcherV1)(nil)

// DeleteUpdateOperations implements [datastore.MatcherV1Updater].
func (s *MatcherV1) DeleteUpdateOperations(ctx context.Context, id ...uuid.UUID) (int64, error) {
	const query = `DELETE FROM update_operation WHERE ref = ANY($1::uuid[]);`
	ctx = zlog.ContextWithValues(ctx, "component", "internal/vulnstore/postgres/deleteUpdateOperations")
	if len(id) == 0 {
		return 0, nil
	}

	// Pgx seems unwilling to do the []uuid.UUID → uuid[] conversion, so we're
	// forced to make some garbage here.
	refStr := make([]string, len(id))
	for i := range id {
		refStr[i] = id[i].String()
	}
	tag, err := s.pool.Exec(ctx, query, refStr)
	if err != nil {
		return 0, fmt.Errorf("failed to delete: %w", err)
	}
	return tag.RowsAffected(), nil
}
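// Example (illustrative sketch, not part of this commit): deleting the update
// operations recorded by two earlier runs; refA and refB are assumed
// uuid.UUID values previously returned by an update.
//
//	deleted, err := store.DeleteUpdateOperations(ctx, refA, refB)
//	if err != nil {
//		// handle error
//	}
//	zlog.Debug(ctx).Int64("deleted", deleted).Msg("update operations removed")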

// Initialized implements [datastore.MatcherV1].
func (s *MatcherV1) Initialized(ctx context.Context) (ok bool, err error) {
	ctx, done := s.method(ctx, &err)
	defer done()
	span := trace.SpanFromContext(ctx)
	ok = atomic.LoadUint32(&s.initialized) != 0
	span.AddEvent(`loaded`, trace.WithAttributes(attribute.Bool("value", ok)))
	if ok {
		return true, nil
	}

	err = s.pool.AcquireFunc(ctx, s.acquire(ctx, `initialized`, func(ctx context.Context, c *pgxpool.Conn, query string) error {
		return c.QueryRow(ctx, query).Scan(&ok)
	}))
	if err != nil {
		return false, err
	}

	span.AddEvent(`initialized`, trace.WithAttributes(attribute.Bool("value", ok)))
	// There were no rows when we looked, so report that. Don't update the bool,
	// because it's either still in the 'false' state or another goroutine has
	// read from the database and will want to swap in 'true'.
	if !ok {
		return false, nil
	}
	// If this fails, it means a concurrent goroutine already swapped. Any
	// subsequent calls will see the 'true' value.
	atomic.CompareAndSwapUint32(&s.initialized, 0, 1)
	return true, nil
}

// Close releases held resources and clears the leak-detection finalizer set
// in [NewMatcherV1].
func (s *MatcherV1) Close() error {
	runtime.SetFinalizer(s, nil)
	return s.storeCommon.Close()
}
152 changes: 152 additions & 0 deletions datastore/postgres/v2/matcher_v1_enrichment.go
@@ -0,0 +1,152 @@
package postgres

import (
	"context"
	"crypto/md5"
	"errors"
	"fmt"
	"io"
	"sort"

	"github.com/google/uuid"
	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/quay/zlog"

	"github.com/quay/claircore/libvuln/driver"
)

// UpdateEnrichments creates a new UpdateOperation, inserts the provided
// EnrichmentRecord(s), and ensures enrichments from previous updates are not
// queried by clients.
func (s *MatcherV1) UpdateEnrichments(ctx context.Context, name string, fp driver.Fingerprint, es []driver.EnrichmentRecord) (_ uuid.UUID, err error) {
	ctx, done := s.method(ctx, &err)
	defer done()

	type digestPair struct {
		Kind   string
		Digest []byte
	}
	hashes := make([]digestPair, len(es))
	func() {
		_, span := tracer.Start(ctx, `doHashes`)
		defer span.End()
		for i := range es {
			hashes[i].Kind, hashes[i].Digest = hashEnrichment(&es[i])
		}
	}()

	var ref uuid.UUID
	err = pgx.BeginTxFunc(ctx, s.pool, txRW, s.tx(ctx, `UpdateEnrichments`, func(ctx context.Context, tx pgx.Tx) (err error) {
		var id uint64
		err = pgx.BeginFunc(ctx, tx, s.call(ctx, `create`, func(ctx context.Context, tx pgx.Tx, query string) error {
			if err := tx.QueryRow(ctx, query, name, string(fp)).Scan(&id, &ref); err != nil {
				return err
			}
			return nil
		}))
		if err != nil {
			return fmt.Errorf("unable to create enrichment update operation: %w", err)
		}
		zlog.Debug(ctx).
			Str("ref", ref.String()).
			Msg("update_operation created")

		err = pgx.BeginFunc(ctx, tx, s.call(ctx, `insert`, func(ctx context.Context, tx pgx.Tx, query string) error {
			var batch pgx.Batch
			for i := range es {
				batch.Queue(query, hashes[i].Kind, hashes[i].Digest, name, es[i].Tags, es[i].Enrichment)
			}
			res := tx.SendBatch(ctx, &batch)
			defer res.Close()
			for range es {
				if _, err := res.Exec(); err != nil {
					return err
				}
			}
			return nil
		}))
		if err != nil {
			return fmt.Errorf("unable to insert enrichments: %w", err)
		}

		err = pgx.BeginFunc(ctx, tx, s.call(ctx, `associate`, func(ctx context.Context, tx pgx.Tx, query string) error {
			var batch pgx.Batch
			for i := range es {
				batch.Queue(query, hashes[i].Kind, hashes[i].Digest, name, id)
			}
			res := tx.SendBatch(ctx, &batch)
			defer res.Close()
			for range es {
				if _, err := res.Exec(); err != nil {
					return err
				}
			}
			return nil
		}))
		if err != nil {
			return fmt.Errorf("unable to associate enrichments: %w", err)
		}

		return nil
	}))
	if err != nil {
		return uuid.Nil, err
	}

	zlog.Debug(ctx).
		Stringer("ref", ref).
		Int("inserted", len(es)).
		Msg("update_operation committed")

	_ = s.pool.AcquireFunc(ctx, s.acquire(ctx, `refresh`, func(ctx context.Context, c *pgxpool.Conn, query string) error {
		if _, err := c.Exec(ctx, query); err != nil {
			// TODO(hank) Log?
			return fmt.Errorf("unable to refresh update operations view: %w", err)
		}
		return nil
	}))

	return ref, nil
}

// hashEnrichment returns a deterministic digest for an EnrichmentRecord: the
// tags are sorted in place and each written followed by a NUL byte, then the
// raw enrichment blob is written. The returned kind names the hash algorithm.
func hashEnrichment(r *driver.EnrichmentRecord) (k string, d []byte) {
	h := md5.New()
	sort.Strings(r.Tags)
	for _, t := range r.Tags {
		io.WriteString(h, t)
		h.Write([]byte("\x00"))
	}
	h.Write(r.Enrichment)
	return "md5", h.Sum(nil)
}

// GetEnrichment reports the EnrichmentRecords associated with the named
// updater that match the provided tags.
func (s *MatcherV1) GetEnrichment(ctx context.Context, name string, tags []string) (res []driver.EnrichmentRecord, err error) {
	ctx, done := s.method(ctx, &err)
	defer done()

	res = make([]driver.EnrichmentRecord, 0, 8) // Guess at capacity.
	err = pgx.BeginTxFunc(ctx, s.pool, txRO, s.call(ctx, `get`, func(ctx context.Context, tx pgx.Tx, query string) (err error) {
		rows, err := tx.Query(ctx, query, name, tags)
		if err != nil {
			return err
		}
		for rows.Next() {
			i := len(res)
			res = append(res, driver.EnrichmentRecord{})
			r := &res[i]
			if err := rows.Scan(&r.Tags, &r.Enrichment); err != nil {
				return err
			}
		}
		return rows.Err()
	}))
	switch {
	case errors.Is(err, nil):
	case errors.Is(err, pgx.ErrNoRows):
		return nil, nil
	default:
		return nil, err
	}
	return res, nil
}
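// Example (illustrative sketch, not part of this commit): writing an
// enrichment record and reading it back by tag. The updater name,
// fingerprint, and payload are assumed values; the Tags and Enrichment field
// names come from the libvuln/driver package as used above.
//
//	rec := driver.EnrichmentRecord{
//		Tags:       []string{"CVE-2024-0001"},
//		Enrichment: []byte(`{"severity":"High"}`),
//	}
//	ref, err := store.UpdateEnrichments(ctx, "cve-enricher", driver.Fingerprint("v1"), []driver.EnrichmentRecord{rec})
//	if err != nil {
//		// handle error
//	}
//	zlog.Debug(ctx).Stringer("ref", ref).Msg("enrichments written")
//
//	recs, err := store.GetEnrichment(ctx, "cve-enricher", []string{"CVE-2024-0001"})
//	if err != nil {
//		// handle error
//	}
//	_ = recs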