diff --git a/.github/workflows/ci-e2e-no-metrics-tests.yml b/.github/workflows/ci-e2e-no-metrics-tests.yml new file mode 100644 index 0000000..77e483a --- /dev/null +++ b/.github/workflows/ci-e2e-no-metrics-tests.yml @@ -0,0 +1,32 @@ +name: Continuous Integration (E2E Testing Checks without metrics database) + +on: + workflow_call: +jobs: + e2e-no-metrics-test: + runs-on: ubuntu-latest + steps: + - name: checkout repo from current commit + uses: actions/checkout@v3 + - name: set up Go + uses: actions/setup-go@v3 + with: + go-version: "1.21" + check-latest: true + cache: false + - name: pull pre-built images + run: sudo docker compose -f ci.docker-compose.yml pull + - name: build and start proxy service and its dependencies + # We need to provide an additional env file to override the METRIC_DATABASE_ENABLED variable, since it can't be overridden via an environment variable. + # Mentioned here: https://github.com/docker/compose/issues/9737 + run: sudo docker compose -f ci.docker-compose.yml --env-file .env --env-file no_metric.env up -d --build proxy redis + - name: wait for proxy service to be running + run: bash ${GITHUB_WORKSPACE}/scripts/wait-for-proxy-service-running.sh + env: + PROXY_CONTAINER_PORT: 7777 + - name: run e2e tests + run: SKIP_METRICS=true make e2e-test + - name: print proxy service logs + run: sudo docker compose -f ci.docker-compose.yml logs proxy + # because we especially want the logs if the test(s) fail 😅 + if: always() diff --git a/.github/workflows/ci-main.yml b/.github/workflows/ci-main.yml index 5e5eb5c..004b310 100644 --- a/.github/workflows/ci-main.yml +++ b/.github/workflows/ci-main.yml @@ -11,10 +11,14 @@ jobs: # run default ci checks against main branch default-checks: uses: ./.github/workflows/ci-default.yml - # run e2e testing ci checks against main branch + # run e2e testing ci checks for internal testnet against main branch e2e-tests: needs: [lint-checks, default-checks] uses: ./.github/workflows/ci-e2e-tests.yml + # run e2e testing ci checks (without metrics db) for internal testnet against main branch + e2e-no-metrics-tests: + needs: [lint-checks, default-checks] + uses: ./.github/workflows/ci-e2e-no-metrics-tests.yml # build, tag and publish new service docker images release-docker-images: needs: [e2e-tests] diff --git a/.github/workflows/ci-pr.yml b/.github/workflows/ci-pr.yml index d00f7ff..9a0b81b 100644 --- a/.github/workflows/ci-pr.yml +++ b/.github/workflows/ci-pr.yml @@ -11,3 +11,5 @@ jobs: uses: ./.github/workflows/ci-default.yml e2e-tests: uses: ./.github/workflows/ci-e2e-tests.yml + e2e-no-metrics-tests: + uses: ./.github/workflows/ci-e2e-no-metrics-tests.yml diff --git a/DEVELOPMENT.md b/DEVELOPMENT.md index 06e54e9..cb470d1 100644 --- a/DEVELOPMENT.md +++ b/DEVELOPMENT.md @@ -131,7 +131,7 @@ make it p=".*Eth_getBlockByNumberRequest" ## Migrations -On startup the proxy service will run any SQL based migration in the [migrations folder](./clients/database/migrations) that haven't already been run against the database being used. +On startup the proxy service will run any SQL based migrations in the [migrations folder](clients/database/postgres/migrations) that haven't already been run against the database being used. For lower level details on how the migration process works consult [these docs](https://bun.uptrace.dev/guide/migrations.html). 
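To illustrate how the relocated migrations get applied at startup, here is a minimal sketch against the `postgres.Client` introduced in this diff. It assumes the moved `migrations` package exposes a bun-style `Migrations` variable (a `*migrate.Migrations`); the helper name, config wiring, and `nil` logger are illustrative only, not the service's actual `main()`.

```go
package main

import (
	"context"

	"github.com/kava-labs/kava-proxy-service/clients/database/postgres"
	"github.com/kava-labs/kava-proxy-service/clients/database/postgres/migrations"
)

// runStartupMigrations is a hypothetical helper showing the Migrate flow:
// connect with NewClient, then apply any migrations that haven't run yet
// (bun records applied migrations in its own bookkeeping tables).
func runStartupMigrations(ctx context.Context, cfg postgres.DatabaseConfig) error {
	client, err := postgres.NewClient(cfg)
	if err != nil {
		return err
	}

	// Migrations is assumed to be created via migrate.NewMigrations() in the
	// moved migrations package; a nil logger is accepted, as in migrate_test.go.
	_, err = client.Migrate(ctx, *migrations.Migrations, nil)
	return err
}
```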
@@ -144,7 +144,7 @@ $ date '+%Y%m%d%H%M%S' > 20230306182227 ``` -Add new SQL file with commands to run in the new migration (add/delete/modify tables and or indices) in the in the [migrations folder](./clients/database/migrations) +Add a new SQL file with commands to run in the new migration (add/delete/modify tables and/or indices) in the [migrations folder](clients/database/postgres/migrations) ### Running migrations diff --git a/Makefile b/Makefile index 6e3608c..f9c3a2b 100644 --- a/Makefile +++ b/Makefile @@ -44,6 +44,11 @@ unit-test: e2e-test: go test -count=1 -v -cover -coverprofile cover.out --race ./... -run "^TestE2ETest*" +.PHONY: e2e-test-no-metrics +# run tests that execute against a local or remote instance of the API without a metrics database +e2e-test-no-metrics: + SKIP_METRICS=true go test -count=1 -v -cover -coverprofile cover.out --race ./... -run "^TestE2ETest*" + .PHONY: ci-setup # set up your local environment such that running `make e2e-test` runs against testnet (like in CI) ci-setup: diff --git a/architecture/ARCHITECTURE.md b/architecture/ARCHITECTURE.md index dfe953b..35f8e0e 100644 --- a/architecture/ARCHITECTURE.md +++ b/architecture/ARCHITECTURE.md @@ -20,7 +20,7 @@ The proxy functionality provides the foundation for all other proxy service feat ![API Observability Worfklow Conceptual Overview](./images/observability_workflow_conceptual.jpg) -For every request that is proxied by the proxy service, a [request metric](../decode/evm_rpc.go) is created and stored in a [postgres table](../clients/database/migrations/20230306182203_add_proxied_request_metrics_table.up.sql) that can be aggregated with other request metrics over a time range to answer ad hoc questions such as: +For every request that is proxied by the proxy service, a [request metric](../decode/evm_rpc.go) is created and stored in a [postgres table](../clients/database/postgres/migrations/20230306182203_add_proxied_request_metrics_table.up.sql) that can be aggregated with other request metrics over a time range to answer ad hoc questions such as: - what methods take the longest time? - what methods are called the most frequently? 
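The ad hoc questions above map directly onto SQL aggregates over the metrics table. Below is a minimal sketch of the first one (slowest methods); the `method_name` and `response_latency_milliseconds` column names are assumed from bun's default snake_case mapping of the `ProxiedRequestMetric` fields, and plain `database/sql` is used for brevity rather than the service's bun client.

```go
package main

import (
	"database/sql"
	"fmt"
)

// printSlowestMethods prints the average response latency per proxied
// method over the last day, slowest first.
func printSlowestMethods(db *sql.DB) error {
	rows, err := db.Query(`
		SELECT method_name, avg(response_latency_milliseconds) AS avg_latency_ms
		FROM proxied_request_metrics
		WHERE request_time > now() - interval '1' day
		GROUP BY method_name
		ORDER BY avg_latency_ms DESC;`)
	if err != nil {
		return err
	}
	defer rows.Close()

	for rows.Next() {
		var method string
		var avgLatencyMs float64
		if err := rows.Scan(&method, &avgLatencyMs); err != nil {
			return err
		}
		fmt.Printf("%s: %.1f ms\n", method, avgLatencyMs)
	}
	return rows.Err()
}
```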
diff --git a/architecture/MIGRATIONS.MD b/architecture/MIGRATIONS.MD index 419b609..8fb2716 100644 --- a/architecture/MIGRATIONS.MD +++ b/architecture/MIGRATIONS.MD @@ -14,7 +14,7 @@ Setting an environment variable named `RUN_DATABASE_MIGRATIONS` to true will cau ### Migration Format -New migration files must be placed in the [migrations directory](../clients/database/migrations/), have a unique name, and start with a timestamp in the below format: +New migration files must be placed in the [migrations directory](../clients/database/postgres/migrations/), have a unique name, and start with a timestamp in the below format: ```bash $ date '+%Y%m%d%H%M%S' diff --git a/ci.docker-compose.yml b/ci.docker-compose.yml index 51e22b1..bf52bfc 100644 --- a/ci.docker-compose.yml +++ b/ci.docker-compose.yml @@ -2,7 +2,7 @@ services: # run postgres for proxy service to store observability metrics postgres: - image: postgres:15 + image: postgres:13.12 env_file: .env ports: - "${POSTGRES_HOST_PORT}:${POSTGRES_CONTAINER_PORT}" @@ -32,6 +32,9 @@ services: # fake the shards by defining shards with existing backends PROXY_SHARD_BACKEND_HOST_URL_MAP: localhost:7777>10|https://evmrpcdata.internal.testnet.proxy.kava.io|20|https://evmrpcdata.internal.testnet.proxy.kava.io EVM_QUERY_SERVICE_URL: https://evmrpc.internal.testnet.proxy.kava.io + # METRIC_DATABASE_ENABLED must come from no_metric.env, or default to true, so that metrics collection is tested by default. + # Overriding it via an environment variable doesn't work, so an env file is needed. Mentioned here: https://github.com/docker/compose/issues/9737 + METRIC_DATABASE_ENABLED: "${METRIC_DATABASE_ENABLED}" ports: - "${PROXY_HOST_PORT}:${PROXY_CONTAINER_PORT}" - "${TEST_UNCONFIGURED_PROXY_PORT}:${PROXY_CONTAINER_PORT}" diff --git a/clients/database/interface.go b/clients/database/interface.go new file mode 100644 index 0000000..666b6db --- /dev/null +++ b/clients/database/interface.go @@ -0,0 +1,36 @@ +package database + +import ( + "context" + "time" +) + +// MetricsDatabase is an interface for storing and querying proxied request metrics +type MetricsDatabase interface { + SaveProxiedRequestMetric(ctx context.Context, metric *ProxiedRequestMetric) error + ListProxiedRequestMetricsWithPagination(ctx context.Context, cursor int64, limit int) ([]*ProxiedRequestMetric, int64, error) + CountAttachedProxiedRequestMetricPartitions(ctx context.Context) (int64, error) + GetLastCreatedAttachedProxiedRequestMetricsPartitionName(ctx context.Context) (string, error) + DeleteProxiedRequestMetricsOlderThanNDays(ctx context.Context, n int64) error + + HealthCheck() error + Partition(prefillPeriodDays int) error +} + +// ProxiedRequestMetric contains the metrics recorded for a single proxied request +type ProxiedRequestMetric struct { + ID int64 + MethodName string + BlockNumber *int64 + ResponseLatencyMilliseconds int64 + Hostname string + RequestIP string + RequestTime time.Time + UserAgent *string + Referer *string + Origin *string + ResponseBackend string + ResponseBackendRoute string + CacheHit bool + PartOfBatch bool +} diff --git a/clients/database/noop/database.go b/clients/database/noop/database.go new file mode 100644 index 0000000..fa133e8 --- /dev/null +++ b/clients/database/noop/database.go @@ -0,0 +1,41 @@ +package noop + +import ( + "context" + "github.com/kava-labs/kava-proxy-service/clients/database" +) + +// Noop is a database client that does nothing +type Noop struct{} + +func New() *Noop { + return &Noop{} +} + +func (e *Noop) SaveProxiedRequestMetric(ctx context.Context, metric *database.ProxiedRequestMetric) error { + return nil +} + +func (e *Noop) 
ListProxiedRequestMetricsWithPagination(ctx context.Context, cursor int64, limit int) ([]*database.ProxiedRequestMetric, int64, error) { + return []*database.ProxiedRequestMetric{}, 0, nil +} + +func (e *Noop) CountAttachedProxiedRequestMetricPartitions(ctx context.Context) (int64, error) { + return 0, nil +} + +func (e *Noop) GetLastCreatedAttachedProxiedRequestMetricsPartitionName(ctx context.Context) (string, error) { + return "", nil +} + +func (e *Noop) DeleteProxiedRequestMetricsOlderThanNDays(ctx context.Context, n int64) error { + return nil +} + +func (e *Noop) HealthCheck() error { + return nil +} + +func (e *Noop) Partition(prefillPeriodDays int) error { + return nil +} diff --git a/clients/database/postgres.go b/clients/database/postgres/database.go similarity index 84% rename from clients/database/postgres.go rename to clients/database/postgres/database.go index 7133831..06bd279 100644 --- a/clients/database/postgres.go +++ b/clients/database/postgres/database.go @@ -1,4 +1,4 @@ -package database +package postgres import ( "crypto/tls" @@ -13,15 +13,15 @@ import ( "github.com/uptrace/bun/extra/bundebug" ) -// PostgresDatabaseConfig contains values for creating a +// DatabaseConfig contains values for creating a // new connection to a postgres database -type PostgresDatabaseConfig struct { +type DatabaseConfig struct { DatabaseName string DatabaseEndpointURL string DatabaseUsername string DatabasePassword string ReadTimeoutSeconds int64 - WriteTimeousSeconds int64 + WriteTimeoutSeconds int64 DatabaseMaxIdleConnections int64 DatabaseConnectionMaxIdleSeconds int64 DatabaseMaxOpenConnections int64 @@ -31,14 +31,15 @@ type PostgresDatabaseConfig struct { Logger *logging.ServiceLogger } -// PostgresClient wraps a connection to a postgres database -type PostgresClient struct { - *bun.DB +// Client wraps a connection to a postgres database +type Client struct { + db *bun.DB + logger *logging.ServiceLogger } -// NewPostgresClient returns a new connection to the specified +// NewClient returns a new connection to the specified // postgres data and error (if any) -func NewPostgresClient(config PostgresDatabaseConfig) (PostgresClient, error) { +func NewClient(config DatabaseConfig) (Client, error) { // configure postgres database connection options var pgOptions *pgdriver.Connector @@ -54,7 +55,7 @@ func NewPostgresClient(config PostgresDatabaseConfig) (PostgresClient, error) { pgdriver.WithPassword(config.DatabasePassword), pgdriver.WithDatabase(config.DatabaseName), pgdriver.WithReadTimeout(time.Second*time.Duration(config.ReadTimeoutSeconds)), - pgdriver.WithWriteTimeout(time.Second*time.Duration(config.WriteTimeousSeconds)), + pgdriver.WithWriteTimeout(time.Second*time.Duration(config.WriteTimeoutSeconds)), ) } else { pgOptions = pgdriver.NewConnector( @@ -64,7 +65,7 @@ func NewPostgresClient(config PostgresDatabaseConfig) (PostgresClient, error) { pgdriver.WithPassword(config.DatabasePassword), pgdriver.WithDatabase(config.DatabaseName), pgdriver.WithReadTimeout(time.Second*time.Duration(config.ReadTimeoutSeconds)), - pgdriver.WithWriteTimeout(time.Second*time.Duration(config.WriteTimeousSeconds)), + pgdriver.WithWriteTimeout(time.Second*time.Duration(config.WriteTimeoutSeconds)), ) } @@ -86,13 +87,14 @@ func NewPostgresClient(config PostgresDatabaseConfig) (PostgresClient, error) { db.AddQueryHook(bundebug.NewQueryHook(bundebug.WithVerbose(true))) } - return PostgresClient{ - DB: db, + return Client{ + db: db, + logger: config.Logger, }, nil } // HealthCheck returns an error if the 
database can not // be connected to and queried, nil otherwise -func (pg *PostgresClient) HealthCheck() error { - return pg.Ping() +func (c *Client) HealthCheck() error { + return c.db.Ping() } diff --git a/clients/database/postgres/database_test.go b/clients/database/postgres/database_test.go new file mode 100644 index 0000000..d1cd9af --- /dev/null +++ b/clients/database/postgres/database_test.go @@ -0,0 +1,18 @@ +package postgres + +import ( + "github.com/stretchr/testify/require" + "testing" +) + +func TestDisabledDBCreation(t *testing.T) { + config := DatabaseConfig{} + _, err := NewClient(config) + require.Error(t, err) +} + +func TestHealthcheckNoDatabase(t *testing.T) { + config := DatabaseConfig{} + _, err := NewClient(config) + require.Error(t, err) +} diff --git a/clients/database/database.go b/clients/database/postgres/migrate.go similarity index 92% rename from clients/database/database.go rename to clients/database/postgres/migrate.go index acd40c1..20fa0b3 100644 --- a/clients/database/database.go +++ b/clients/database/postgres/migrate.go @@ -1,4 +1,4 @@ -package database +package postgres import ( "context" @@ -6,7 +6,6 @@ import ( "time" "github.com/kava-labs/kava-proxy-service/logging" - "github.com/uptrace/bun" "github.com/uptrace/bun/migrate" ) @@ -14,9 +13,9 @@ import ( // that haven't been run on the database being used by the proxy service // returning error (if any) and a list of migrations that have been // run and any that were not -func Migrate(ctx context.Context, db *bun.DB, migrations migrate.Migrations, logger *logging.ServiceLogger) (*migrate.MigrationSlice, error) { +func (c *Client) Migrate(ctx context.Context, migrations migrate.Migrations, logger *logging.ServiceLogger) (*migrate.MigrationSlice, error) { // set up migration config - migrator := migrate.NewMigrator(db, &migrations) + migrator := migrate.NewMigrator(c.db, &migrations) // create / verify tables used to tack migrations err := migrator.Init(ctx) diff --git a/clients/database/postgres/migrate_test.go b/clients/database/postgres/migrate_test.go new file mode 100644 index 0000000..bd0f9cb --- /dev/null +++ b/clients/database/postgres/migrate_test.go @@ -0,0 +1,15 @@ +package postgres + +import ( + "context" + "github.com/stretchr/testify/require" + "github.com/uptrace/bun/migrate" + "testing" +) + +func TestMigrateNoDatabase(t *testing.T) { + db := &Client{} + + _, err := db.Migrate(context.Background(), migrate.Migrations{}, nil) + require.Error(t, err) +} diff --git a/clients/database/migrations/20230306182203_add_proxied_request_metrics_table.up.sql b/clients/database/postgres/migrations/20230306182203_add_proxied_request_metrics_table.up.sql similarity index 100% rename from clients/database/migrations/20230306182203_add_proxied_request_metrics_table.up.sql rename to clients/database/postgres/migrations/20230306182203_add_proxied_request_metrics_table.up.sql diff --git a/clients/database/migrations/20230310154711_add_proxied_request_metrics_method_index.up.sql b/clients/database/postgres/migrations/20230310154711_add_proxied_request_metrics_method_index.up.sql similarity index 100% rename from clients/database/migrations/20230310154711_add_proxied_request_metrics_method_index.up.sql rename to clients/database/postgres/migrations/20230310154711_add_proxied_request_metrics_method_index.up.sql diff --git a/clients/database/migrations/20230310154712_add_proxied_request_metrics_block_number_index.up.sql 
b/clients/database/postgres/migrations/20230310154712_add_proxied_request_metrics_block_number_index.up.sql similarity index 100% rename from clients/database/migrations/20230310154712_add_proxied_request_metrics_block_number_index.up.sql rename to clients/database/postgres/migrations/20230310154712_add_proxied_request_metrics_block_number_index.up.sql diff --git a/clients/database/migrations/20230320161721_add_proxied_request_metrics_ip_and_hostname_columns.up.sql b/clients/database/postgres/migrations/20230320161721_add_proxied_request_metrics_ip_and_hostname_columns.up.sql similarity index 100% rename from clients/database/migrations/20230320161721_add_proxied_request_metrics_ip_and_hostname_columns.up.sql rename to clients/database/postgres/migrations/20230320161721_add_proxied_request_metrics_ip_and_hostname_columns.up.sql diff --git a/clients/database/migrations/20230320161722_add_proxied_request_metrics_hostname_index.up.sql b/clients/database/postgres/migrations/20230320161722_add_proxied_request_metrics_hostname_index.up.sql similarity index 100% rename from clients/database/migrations/20230320161722_add_proxied_request_metrics_hostname_index.up.sql rename to clients/database/postgres/migrations/20230320161722_add_proxied_request_metrics_hostname_index.up.sql diff --git a/clients/database/migrations/20230410142045_add_proxied_request_metrics_user_agent_and_referer_columns.up.sql b/clients/database/postgres/migrations/20230410142045_add_proxied_request_metrics_user_agent_and_referer_columns.up.sql similarity index 100% rename from clients/database/migrations/20230410142045_add_proxied_request_metrics_user_agent_and_referer_columns.up.sql rename to clients/database/postgres/migrations/20230410142045_add_proxied_request_metrics_user_agent_and_referer_columns.up.sql diff --git a/clients/database/migrations/20230410153743_add_proxied_request_metrics_origin_column.up.sql b/clients/database/postgres/migrations/20230410153743_add_proxied_request_metrics_origin_column.up.sql similarity index 100% rename from clients/database/migrations/20230410153743_add_proxied_request_metrics_origin_column.up.sql rename to clients/database/postgres/migrations/20230410153743_add_proxied_request_metrics_origin_column.up.sql diff --git a/clients/database/migrations/20230510135051_add_proxied_request_metrics_request_time_index.up.sql b/clients/database/postgres/migrations/20230510135051_add_proxied_request_metrics_request_time_index.up.sql similarity index 100% rename from clients/database/migrations/20230510135051_add_proxied_request_metrics_request_time_index.up.sql rename to clients/database/postgres/migrations/20230510135051_add_proxied_request_metrics_request_time_index.up.sql diff --git a/clients/database/migrations/20230512150351_add_proxied_request_metrics_id_index.up.sql b/clients/database/postgres/migrations/20230512150351_add_proxied_request_metrics_id_index.up.sql similarity index 100% rename from clients/database/migrations/20230512150351_add_proxied_request_metrics_id_index.up.sql rename to clients/database/postgres/migrations/20230512150351_add_proxied_request_metrics_id_index.up.sql diff --git a/clients/database/migrations/20230523101344_partition_proxied_request_metrics_table.up.sql b/clients/database/postgres/migrations/20230523101344_partition_proxied_request_metrics_table.up.sql similarity index 100% rename from clients/database/migrations/20230523101344_partition_proxied_request_metrics_table.up.sql rename to 
clients/database/postgres/migrations/20230523101344_partition_proxied_request_metrics_table.up.sql diff --git a/clients/database/migrations/20231013122742_add_response_backend.up.sql b/clients/database/postgres/migrations/20231013122742_add_response_backend.up.sql similarity index 100% rename from clients/database/migrations/20231013122742_add_response_backend.up.sql rename to clients/database/postgres/migrations/20231013122742_add_response_backend.up.sql diff --git a/clients/database/migrations/20231027165300_add_proxied_request_metrics_cachehit_column.up.sql b/clients/database/postgres/migrations/20231027165300_add_proxied_request_metrics_cachehit_column.up.sql similarity index 100% rename from clients/database/migrations/20231027165300_add_proxied_request_metrics_cachehit_column.up.sql rename to clients/database/postgres/migrations/20231027165300_add_proxied_request_metrics_cachehit_column.up.sql diff --git a/clients/database/migrations/20240415111242_add_proxied_request_metrics_partofbatch_column.up.sql b/clients/database/postgres/migrations/20240415111242_add_proxied_request_metrics_partofbatch_column.up.sql similarity index 100% rename from clients/database/migrations/20240415111242_add_proxied_request_metrics_partofbatch_column.up.sql rename to clients/database/postgres/migrations/20240415111242_add_proxied_request_metrics_partofbatch_column.up.sql diff --git a/clients/database/migrations/main.go b/clients/database/postgres/migrations/main.go similarity index 100% rename from clients/database/migrations/main.go rename to clients/database/postgres/migrations/main.go diff --git a/clients/database/postgres/partition.go b/clients/database/postgres/partition.go new file mode 100644 index 0000000..4c1cfad --- /dev/null +++ b/clients/database/postgres/partition.go @@ -0,0 +1,260 @@ +package postgres + +import ( + "context" + "fmt" + "github.com/kava-labs/kava-proxy-service/config" + "math" + "strings" + "time" +) + +const ( + PartitionBaseTableName = "proxied_request_metrics" +) + +// PartitionPeriod represents a single postgres partitioned +// table from a starting point (inclusive of that point in time) +// to an end point (exclusive of that point in time) +type PartitionPeriod struct { + TableName string + InclusiveStartPeriod time.Time + ExclusiveEndPeriod time.Time +} + +// daysInMonth returns the number of days in a month +func daysInMonth(t time.Time) int { + y, m, _ := t.Date() + return time.Date(y, m+1, 0, 0, 0, 0, 0, time.UTC).Day() +} + +// PartitionsForPeriod attempts to generate the partitions +// to create when prefilling numDaysToPrefill, returning the list +// of partitions and error (if any) +func PartitionsForPeriod(start time.Time, numDaysToPrefill int) ([]PartitionPeriod, error) { + var partitionPeriods []PartitionPeriod + // check function constraints needed to ensure expected behavior + if numDaysToPrefill > config.MaxMetricPartitioningPrefillPeriodDays { + return partitionPeriods, fmt.Errorf("more than %d prefill days specified %d", config.MaxMetricPartitioningPrefillPeriodDays, numDaysToPrefill) + } + + currentYear, currentMonth, currentDay := start.Date() + + daysInCurrentMonth := daysInMonth(start) + + // add one to include the current day + newDaysRemainingInCurrentMonth := daysInCurrentMonth - currentDay + 1 + + // total number of partitions to generate + totalPartitionsToGenerate := numDaysToPrefill + + partitionsToGenerateForCurrentMonth := int(math.Min(float64(newDaysRemainingInCurrentMonth), float64(numDaysToPrefill))) + + // generate partitions for 
current month + for partitionIndex := 0; partitionsToGenerateForCurrentMonth > 0; partitionIndex++ { + partitionPeriod := PartitionPeriod{ + TableName: fmt.Sprintf("%s_year_%d_month_%d_day_%d", PartitionBaseTableName, currentYear, currentMonth, currentDay+partitionIndex), + InclusiveStartPeriod: start.Add(time.Duration(partitionIndex) * 24 * time.Hour).Truncate(24 * time.Hour), + ExclusiveEndPeriod: start.Add(time.Duration(partitionIndex+1) * 24 * time.Hour).Truncate(24 * time.Hour), + } + + partitionPeriods = append(partitionPeriods, partitionPeriod) + + partitionsToGenerateForCurrentMonth-- + } + + // check to see if we need to create any partitions for the + // upcoming month + if totalPartitionsToGenerate > newDaysRemainingInCurrentMonth { + futureMonth := start.Add(time.Hour * 24 * time.Duration(newDaysRemainingInCurrentMonth+1)) + + nextYear, nextMonth, nextDay := futureMonth.Date() + + // on function entry we assert that pre-fill days won't + // overflow more than two unique months + // to generate partitions for + partitionsToGenerateForFutureMonth := totalPartitionsToGenerate - newDaysRemainingInCurrentMonth + + // generate partitions for future month + for partitionIndex := 0; partitionsToGenerateForFutureMonth > 0; partitionIndex++ { + partitionPeriod := PartitionPeriod{ + TableName: fmt.Sprintf("%s_year%d_month%d_day%d", PartitionBaseTableName, nextYear, nextMonth, nextDay+partitionIndex), + InclusiveStartPeriod: futureMonth.Add(time.Duration(partitionIndex) * 24 * time.Hour).Truncate(24 * time.Hour), + ExclusiveEndPeriod: futureMonth.Add(time.Duration(partitionIndex+1) * 24 * time.Hour).Truncate(24 * time.Hour), + } + + partitionPeriods = append(partitionPeriods, partitionPeriod) + + partitionsToGenerateForFutureMonth-- + } + } + + return partitionPeriods, nil +} + +// Partition attempts to create (idempotently) future partitions +// for storing proxied request metrics, returning error (if any) +func (c *Client) Partition(prefillPeriodDays int) error { + // calculate partition name and ranges to create + partitionsToCreate, err := PartitionsForPeriod(time.Now(), prefillPeriodDays) + + if err != nil { + return err + } + + c.logger.Trace().Msg(fmt.Sprintf("partitionsToCreate %+v", partitionsToCreate)) + + // create partition for each of those days + for _, partitionToCreate := range partitionsToCreate { + // do the below in a transaction to allow retries on + // each run of the routine to smooth over any transient issues + // such as dropped database connection or rolling service updates + // and support safe concurrency of multiple instances of the service + // attempting to create partitions + // https://go.dev/doc/database/execute-transactions + tx, err := c.db.BeginTx(context.Background(), nil) + + if err != nil { + c.logger.Error().Msg(fmt.Sprintf("error %s beginning transaction for partition %+v", err, partitionToCreate)) + + continue + } + + // check to see if partition already exists + _, err = tx.Exec(fmt.Sprintf("select * from %s limit 1;", partitionToCreate.TableName)) + + if err != nil { + if !strings.Contains(err.Error(), "42P01") { + c.logger.Error().Msg(fmt.Sprintf("error %s querying for partition %+v", err, partitionToCreate)) + + tx.Rollback() + + continue + } + + // else error indicates table doesn't exist so safe for us to create it + createTableStatement := fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s + (LIKE proxied_request_metrics INCLUDING DEFAULTS INCLUDING CONSTRAINTS); + `, partitionToCreate.TableName) + _, err = c.db.Exec(createTableStatement) + + if err != 
nil { + c.logger.Debug().Msg(fmt.Sprintf("error %s creating partition %+v using statement %s", err, partitionToCreate, createTableStatement)) + + err = tx.Rollback() + + if err != nil { + c.logger.Error().Msg(fmt.Sprintf("error %s rolling back statement %s", err, createTableStatement)) + } + + continue + } + + // attach partitions to main table + attachPartitionStatement := fmt.Sprintf(` + ALTER TABLE proxied_request_metrics ATTACH PARTITION %s + FOR VALUES FROM ('%s') TO ('%s'); + `, partitionToCreate.TableName, partitionToCreate.InclusiveStartPeriod.Format("2006-01-02 15:04:05"), partitionToCreate.ExclusiveEndPeriod.Format("2006-01-02 15:04:05")) + _, err = c.db.Exec(attachPartitionStatement) + + if err != nil { + c.logger.Debug().Msg(fmt.Sprintf("error %s attaching partition %+v using statement %s", err, + partitionToCreate, attachPartitionStatement)) + + err = tx.Rollback() + + if err != nil { + c.logger.Error().Msg(fmt.Sprintf("error %s rolling back statement %s", err, attachPartitionStatement)) + } + + continue + } + + err = tx.Commit() + + if err != nil { + c.logger.Error().Msg(fmt.Sprintf("error %s committing transaction to create partition %+v", err, partitionToCreate)) + + continue + } + + c.logger.Trace().Msg(fmt.Sprintf("created partition %+v", partitionToCreate)) + + continue + } else { + // table exists, no need to create it + c.logger.Trace().Msg(fmt.Sprintf("not creating table for partition %+v as it already exists", partitionToCreate)) + + // but check if it is attached + partitionIsAttachedQuery := fmt.Sprintf(` + SELECT + nmsp_parent.nspname AS parent_schema, + parent.relname AS parent, + nmsp_child.nspname AS child_schema, + child.relname AS child + FROM pg_inherits + JOIN pg_class parent ON pg_inherits.inhparent = parent.oid + JOIN pg_class child ON pg_inherits.inhrelid = child.oid + JOIN pg_namespace nmsp_parent ON nmsp_parent.oid = parent.relnamespace + JOIN pg_namespace nmsp_child ON nmsp_child.oid = child.relnamespace + WHERE parent.relname='proxied_request_metrics' and child.relname='%s';`, partitionToCreate.TableName) + result, err := c.db.Query(partitionIsAttachedQuery) + + if err != nil { + c.logger.Error().Msg(fmt.Sprintf("error %s querying %s to see if partition %+v is already attached", err, partitionIsAttachedQuery, partitionToCreate)) + + continue + } + + if !result.Next() { + c.logger.Trace().Msg(fmt.Sprintf("attaching created but dangling partition %+v", partitionToCreate)) + // table is not attached, attach it + attachPartitionStatement := fmt.Sprintf(` + ALTER TABLE proxied_request_metrics ATTACH PARTITION %s + FOR VALUES FROM ('%s') TO ('%s'); + `, partitionToCreate.TableName, partitionToCreate.InclusiveStartPeriod.Format("2006-01-02 15:04:05"), partitionToCreate.ExclusiveEndPeriod.Format("2006-01-02 15:04:05")) + _, err = c.db.Exec(attachPartitionStatement) + + if err != nil { + c.logger.Debug().Msg(fmt.Sprintf("error %s attaching partition %+v using statement %s", err, + partitionToCreate, attachPartitionStatement)) + + err = tx.Rollback() + + if err != nil { + c.logger.Error().Msg(fmt.Sprintf("error %s rolling back statement %s", err, attachPartitionStatement)) + } + + continue + } + + err = tx.Commit() + + if err != nil { + c.logger.Error().Msg(fmt.Sprintf("error %s committing transaction to create partition %+v", err, partitionToCreate)) + + continue + } + + c.logger.Trace().Msg(fmt.Sprintf("created partition %+v", partitionToCreate)) + + continue + } + + result.Close() + + c.logger.Trace().Msg(fmt.Sprintf("not attaching partition %+v as it is 
already attached", partitionToCreate)) + + err = tx.Commit() + + if err != nil { + c.logger.Error().Msg(fmt.Sprintf("error %s committing empty transaction for already created partition %+v", err, partitionToCreate)) + } + + continue + } + } + + return nil +} diff --git a/clients/database/postgres/partition_test.go b/clients/database/postgres/partition_test.go new file mode 100644 index 0000000..8e8d1f0 --- /dev/null +++ b/clients/database/postgres/partition_test.go @@ -0,0 +1,55 @@ +package postgres + +import ( + "github.com/kava-labs/kava-proxy-service/config" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestUnitTestPartitionsForPeriodReturnsExpectedNumPartitionsWhenPrefillPeriodIsNotContainedInCurrentMonth(t *testing.T) { + // prepare + + // pick a date in the middle of a month + startFrom := time.Date(1989, 5, 20, 12, 0, 0, 0, time.UTC) + + // set prefill period to more then days remaining in month + // from above date + daysToPrefill := 21 + + // execute + actualPartitionsForPeriod, err := PartitionsForPeriod(startFrom, daysToPrefill) + + // assert + assert.Nil(t, err) + assert.Equal(t, daysToPrefill, len(actualPartitionsForPeriod)) +} + +func TestUnitTestPartitionsForPeriodReturnsErrWhenTooManyPrefillDays(t *testing.T) { + // prepare + daysToPrefill := config.MaxMetricPartitioningPrefillPeriodDays + 1 + + // execute + _, err := PartitionsForPeriod(time.Now(), daysToPrefill) + + // assert + assert.NotNil(t, err) +} + +func TestUnitTestPartitionsForPeriodReturnsExpectedNumPartitionsWhenPrefillPeriodIsContainedInCurrentMonth(t *testing.T) { + // prepare + + // pick a date in the middle of a month + startFrom := time.Date(1989, 5, 11, 12, 0, 0, 0, time.UTC) + + // set prefill period to less then days remaining in month + // from above date + daysToPrefill := 3 + + // execute + actualPartitionsForPeriod, err := PartitionsForPeriod(startFrom, daysToPrefill) + + // assert + assert.Nil(t, err) + assert.Equal(t, daysToPrefill, len(actualPartitionsForPeriod)) +} diff --git a/clients/database/request_metric.go b/clients/database/postgres/request_metric.go similarity index 55% rename from clients/database/request_metric.go rename to clients/database/postgres/request_metric.go index 2ec3e9c..8ab72d1 100644 --- a/clients/database/request_metric.go +++ b/clients/database/postgres/request_metric.go @@ -1,42 +1,22 @@ -package database +package postgres import ( "context" + "database/sql" "fmt" - "time" - - "github.com/uptrace/bun" + "github.com/kava-labs/kava-proxy-service/clients/database" ) const ( ProxiedRequestMetricsTableName = "proxied_request_metrics" ) -// ProxiedRequestMetric contains request metrics for -// a single request proxied by the proxy service -type ProxiedRequestMetric struct { - bun.BaseModel `bun:"table:proxied_request_metrics,alias:prm"` - - ID int64 `bun:",pk,autoincrement"` - MethodName string - BlockNumber *int64 - ResponseLatencyMilliseconds int64 - Hostname string - RequestIP string `bun:"request_ip"` - RequestTime time.Time - UserAgent *string - Referer *string - Origin *string - ResponseBackend string - ResponseBackendRoute string - CacheHit bool - PartOfBatch bool -} - // Save saves the current ProxiedRequestMetric to -// the database, returning error (if any) -func (prm *ProxiedRequestMetric) Save(ctx context.Context, db *bun.DB) error { - _, err := db.NewInsert().Model(prm).Exec(ctx) +// the database, returning error (if any). +// If db is nil, returns nil error. 
+func (c *Client) SaveProxiedRequestMetric(ctx context.Context, metric *database.ProxiedRequestMetric) error { + prm := convertProxiedRequestMetric(metric) + _, err := c.db.NewInsert().Model(prm).Exec(ctx) return err } @@ -44,26 +24,33 @@ func (prm *ProxiedRequestMetric) Save(ctx context.Context, db *bun.DB) error { // ListProxiedRequestMetricsWithPagination returns a page of max // `limit` ProxiedRequestMetrics from the offset specified by`cursor` // error (if any) along with a cursor to use to fetch the next page -// if the cursor is 0 no more pages exists -func ListProxiedRequestMetricsWithPagination(ctx context.Context, db *bun.DB, cursor int64, limit int) ([]ProxiedRequestMetric, int64, error) { +// if the cursor is 0 no more pages exist. +// Used only in tests. +func (c *Client) ListProxiedRequestMetricsWithPagination(ctx context.Context, cursor int64, limit int) ([]*database.ProxiedRequestMetric, int64, error) { var proxiedRequestMetrics []ProxiedRequestMetric var nextCursor int64 - count, err := db.NewSelect().Model(&proxiedRequestMetrics).Where("ID > ?", cursor).Limit(limit).ScanAndCount(ctx) + count, err := c.db.NewSelect().Model(&proxiedRequestMetrics).Where("ID > ?", cursor).Limit(limit).ScanAndCount(ctx) // look up the id of the last if count == limit { nextCursor = proxiedRequestMetrics[count-1].ID } + metrics := make([]*database.ProxiedRequestMetric, 0, len(proxiedRequestMetrics)) + for _, metric := range proxiedRequestMetrics { + metrics = append(metrics, metric.ToProxiedRequestMetric()) + } + // otherwise leave nextCursor as 0 to signal no more rows - return proxiedRequestMetrics, nextCursor, err + return metrics, nextCursor, err } // CountAttachedProxiedRequestMetricPartitions returns the current // count of attached partitions for the ProxiedRequestMetricsTableName -// and error (if any) -func CountAttachedProxiedRequestMetricPartitions(ctx context.Context, db *bun.DB) (int64, error) { +// and error (if any). +// The count is read from the pg_inherits catalog. +func (c *Client) CountAttachedProxiedRequestMetricPartitions(ctx context.Context) (int64, error) { var count int64 countPartitionsQuery := fmt.Sprintf(` @@ -75,7 +62,7 @@ func CountAttachedProxiedRequestMetricPartitions(ctx context.Context, db *bun.DB JOIN pg_namespace nmsp_child ON nmsp_child.oid = child.relnamespace WHERE parent.relname='%s';`, ProxiedRequestMetricsTableName) - row := db.QueryRow(countPartitionsQuery) + row := c.db.QueryRow(countPartitionsQuery) err := row.Scan(&count) if err != nil { @@ -88,7 +75,8 @@ func CountAttachedProxiedRequestMetricPartitions(ctx context.Context, db *bun.DB // GetLastCreatedAttachedProxiedRequestMetricsPartitionName gets the table name // for the last created (and attached) proxied request metrics partition -func GetLastCreatedAttachedProxiedRequestMetricsPartitionName(ctx context.Context, db *bun.DB) (string, error) { +// Used for status checks. 
+func (c *Client) GetLastCreatedAttachedProxiedRequestMetricsPartitionName(ctx context.Context) (string, error) { var lastCreatedAttachedPartitionName string lastCreatedAttachedPartitionNameQuery := fmt.Sprintf(` @@ -101,7 +89,7 @@ FROM pg_inherits JOIN pg_namespace nmsp_child ON nmsp_child.oid = child.relnamespace WHERE parent.relname='%s' order by child.oid desc limit 1;`, ProxiedRequestMetricsTableName) - row := db.QueryRow(lastCreatedAttachedPartitionNameQuery) + row := c.db.QueryRow(lastCreatedAttachedPartitionNameQuery) err := row.Scan(&lastCreatedAttachedPartitionName) if err != nil { @@ -114,9 +102,15 @@ WHERE parent.relname='%s' order by child.oid desc limit 1;`, ProxiedRequestMetri // DeleteProxiedRequestMetricsOlderThanNDays deletes // all proxied request metrics older than the specified -// days, returning error (if any) -func DeleteProxiedRequestMetricsOlderThanNDays(ctx context.Context, db *bun.DB, n int64) error { - _, err := db.NewDelete().Model((*ProxiedRequestMetric)(nil)).Where(fmt.Sprintf("request_time < now() - interval '%d' day", n)).Exec(ctx) +// days, returning error (if any). +// Used during the pruning process. +func (c *Client) DeleteProxiedRequestMetricsOlderThanNDays(ctx context.Context, n int64) error { + _, err := c.db.NewDelete().Model((*ProxiedRequestMetric)(nil)).Where(fmt.Sprintf("request_time < now() - interval '%d' day", n)).Exec(ctx) return err } + +// Exec is not part of the database.MetricsDatabase interface; it exists on the postgres implementation for test purposes only. +func (c *Client) Exec(query string, args ...interface{}) (sql.Result, error) { + return c.db.Exec(query, args...) +} diff --git a/clients/database/postgres/request_metric_test.go b/clients/database/postgres/request_metric_test.go new file mode 100644 index 0000000..62507e6 --- /dev/null +++ b/clients/database/postgres/request_metric_test.go @@ -0,0 +1,44 @@ +package postgres + +import ( + "context" + "github.com/kava-labs/kava-proxy-service/clients/database" + "github.com/stretchr/testify/require" + "testing" +) + +func TestNoDatabaseSave(t *testing.T) { + db := &Client{} + + prm := &database.ProxiedRequestMetric{} + err := db.SaveProxiedRequestMetric(context.Background(), prm) + require.Error(t, err) +} + +func TestNoDatabaseListProxiedRequestMetricsWithPagination(t *testing.T) { + db := &Client{} + + _, _, err := db.ListProxiedRequestMetricsWithPagination(context.Background(), 0, 0) + require.Error(t, err) +} + +func TestNoDatabaseCountAttachedProxiedRequestMetricPartitions(t *testing.T) { + db := &Client{} + + _, err := db.CountAttachedProxiedRequestMetricPartitions(context.Background()) + require.Error(t, err) +} + +func TestGetLastCreatedAttachedProxiedRequestMetricsPartitionName(t *testing.T) { + db := &Client{} + + _, err := db.GetLastCreatedAttachedProxiedRequestMetricsPartitionName(context.Background()) + require.Error(t, err) +} + +func TestDeleteProxiedRequestMetricsOlderThanNDays(t *testing.T) { + db := &Client{} + + err := db.DeleteProxiedRequestMetricsOlderThanNDays(context.Background(), 0) + require.Error(t, err) +} diff --git a/clients/database/postgres/types.go b/clients/database/postgres/types.go new file mode 100644 index 0000000..4e17c5d --- /dev/null +++ b/clients/database/postgres/types.go @@ -0,0 +1,67 @@ +package postgres + +import ( + "time" + + "github.com/kava-labs/kava-proxy-service/clients/database" + "github.com/uptrace/bun" +) + +// ProxiedRequestMetric contains request metrics for +// a single request proxied by the proxy service 
+type ProxiedRequestMetric struct { + bun.BaseModel `bun:"table:proxied_request_metrics,alias:prm"` + + ID int64 `bun:",pk,autoincrement"` + MethodName string + BlockNumber *int64 + ResponseLatencyMilliseconds int64 + Hostname string + RequestIP string `bun:"request_ip"` + RequestTime time.Time + UserAgent *string + Referer *string + Origin *string + ResponseBackend string + ResponseBackendRoute string + CacheHit bool + PartOfBatch bool +} + +func (prm *ProxiedRequestMetric) ToProxiedRequestMetric() *database.ProxiedRequestMetric { + return &database.ProxiedRequestMetric{ + ID: prm.ID, + MethodName: prm.MethodName, + BlockNumber: prm.BlockNumber, + ResponseLatencyMilliseconds: prm.ResponseLatencyMilliseconds, + Hostname: prm.Hostname, + RequestIP: prm.RequestIP, + RequestTime: prm.RequestTime, + UserAgent: prm.UserAgent, + Referer: prm.Referer, + Origin: prm.Origin, + ResponseBackend: prm.ResponseBackend, + ResponseBackendRoute: prm.ResponseBackendRoute, + CacheHit: prm.CacheHit, + PartOfBatch: prm.PartOfBatch, + } +} + +func convertProxiedRequestMetric(metric *database.ProxiedRequestMetric) *ProxiedRequestMetric { + return &ProxiedRequestMetric{ + ID: metric.ID, + MethodName: metric.MethodName, + BlockNumber: metric.BlockNumber, + ResponseLatencyMilliseconds: metric.ResponseLatencyMilliseconds, + Hostname: metric.Hostname, + RequestIP: metric.RequestIP, + RequestTime: metric.RequestTime, + UserAgent: metric.UserAgent, + Referer: metric.Referer, + Origin: metric.Origin, + ResponseBackend: metric.ResponseBackend, + ResponseBackendRoute: metric.ResponseBackendRoute, + CacheHit: metric.CacheHit, + PartOfBatch: metric.PartOfBatch, + } +} diff --git a/config/config.go b/config/config.go index 3adcff8..029c349 100644 --- a/config/config.go +++ b/config/config.go @@ -48,6 +48,7 @@ type Config struct { MetricPruningRoutineInterval time.Duration MetricPruningRoutineDelayFirstRun time.Duration MetricPruningMaxRequestMetricsHistoryDays int + MetricDatabaseEnabled bool CacheEnabled bool RedisEndpointURL string RedisPassword string @@ -101,6 +102,8 @@ const ( DEFAULT_METRIC_PRUNING_ENABLED = true METRIC_PRUNING_ROUTINE_INTERVAL_SECONDS_ENVIRONMENT_KEY = "METRIC_PRUNING_ROUTINE_INTERVAL_SECONDS" // 60 seconds * 60 minutes * 24 hours = 1 day DEFAULT_METRIC_PRUNING_ROUTINE_INTERVAL_SECONDS = 86400 + METRIC_DATABASE_ENABLED_ENVIRONMENT_KEY = "METRIC_DATABASE_ENABLED" + DEFAULT_METRIC_DATABASE_ENABLED = true METRIC_PRUNING_ROUTINE_DELAY_FIRST_RUN_SECONDS_ENVIRONMENT_KEY = "METRIC_PRUNING_ROUTINE_DELAY_FIRST_RUN_SECONDS" DEFAULT_METRIC_PRUNING_ROUTINE_DELAY_FIRST_RUN_SECONDS = 10 @@ -380,6 +383,7 @@ func ReadConfig() Config { MetricPruningRoutineInterval: time.Duration(time.Duration(EnvOrDefaultInt(METRIC_PRUNING_ROUTINE_INTERVAL_SECONDS_ENVIRONMENT_KEY, DEFAULT_METRIC_PRUNING_ROUTINE_INTERVAL_SECONDS)) * time.Second), MetricPruningRoutineDelayFirstRun: time.Duration(time.Duration(EnvOrDefaultInt(METRIC_PRUNING_ROUTINE_DELAY_FIRST_RUN_SECONDS_ENVIRONMENT_KEY, DEFAULT_METRIC_PRUNING_ROUTINE_DELAY_FIRST_RUN_SECONDS)) * time.Second), MetricPruningMaxRequestMetricsHistoryDays: EnvOrDefaultInt(METRIC_PRUNING_MAX_REQUEST_METRICS_HISTORY_DAYS_ENVIRONMENT_KEY, DEFAULT_METRIC_PRUNING_MAX_REQUEST_METRICS_HISTORY_DAYS), + MetricDatabaseEnabled: EnvOrDefaultBool(METRIC_DATABASE_ENABLED_ENVIRONMENT_KEY, DEFAULT_METRIC_DATABASE_ENABLED), CacheEnabled: EnvOrDefaultBool(CACHE_ENABLED_ENVIRONMENT_KEY, false), RedisEndpointURL: os.Getenv(REDIS_ENDPOINT_URL_ENVIRONMENT_KEY), RedisPassword: 
os.Getenv(REDIS_PASSWORD_ENVIRONMENT_KEY), diff --git a/docker-compose.yml b/docker-compose.yml index 178fba2..b09fd5e 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,7 +2,7 @@ services: # run postgres for proxy service to store observability metrics postgres: - image: postgres:15 + image: postgres:13.12 env_file: .env ports: - "${POSTGRES_HOST_PORT}:${POSTGRES_CONTAINER_PORT}" diff --git a/main.go b/main.go index 62733b2..4f315b4 100644 --- a/main.go +++ b/main.go @@ -97,6 +97,7 @@ func startMetricCompactionRoutine(serviceConfig config.Config, service service.P func startMetricPruningRoutine(serviceConfig config.Config, service service.ProxyService, serviceLogger logging.ServiceLogger) <-chan error { if !serviceConfig.MetricPruningEnabled { serviceLogger.Info().Msg("skipping starting metric pruning routine since it is disabled via config") + return make(<-chan error) } @@ -141,33 +142,38 @@ func main() { serviceLogger.Panic().Msg(fmt.Sprintf("%v", errors.Unwrap(err))) } - // configure and run background routines - // metric partitioning routine - go func() { - metricPartitioningErrs := startMetricPartitioningRoutine(serviceConfig, service, serviceLogger) - - for routineErr := range metricPartitioningErrs { - serviceLogger.Error().Msg(fmt.Sprintf("metric partitioning routine encountered error %s", routineErr)) - } - }() - - // metric compaction routine - go func() { - metricCompactionErrs := startMetricCompactionRoutine(serviceConfig, service, serviceLogger) - - for routineErr := range metricCompactionErrs { - serviceLogger.Error().Msg(fmt.Sprintf("metric compaction routine encountered error %s", routineErr)) - } - }() - - // metric pruning routine - go func() { - metricPruningErrs := startMetricPruningRoutine(serviceConfig, service, serviceLogger) - - for routineErr := range metricPruningErrs { - serviceLogger.Error().Msg(fmt.Sprintf("metric pruning routine encountered error %s", routineErr)) - } - }() + // if the metrics database is enabled, run the metric background routines + if serviceConfig.MetricDatabaseEnabled { + // configure and run background routines + // metric partitioning routine + go func() { + metricPartitioningErrs := startMetricPartitioningRoutine(serviceConfig, service, serviceLogger) + + for routineErr := range metricPartitioningErrs { + serviceLogger.Error().Msg(fmt.Sprintf("metric partitioning routine encountered error %s", routineErr)) + } + }() + + // metric compaction routine + go func() { + metricCompactionErrs := startMetricCompactionRoutine(serviceConfig, service, serviceLogger) + + for routineErr := range metricCompactionErrs { + serviceLogger.Error().Msg(fmt.Sprintf("metric compaction routine encountered error %s", routineErr)) + } + }() + + // metric pruning routine + go func() { + metricPruningErrs := startMetricPruningRoutine(serviceConfig, service, serviceLogger) + + for routineErr := range metricPruningErrs { + serviceLogger.Error().Msg(fmt.Sprintf("metric pruning routine encountered error %s", routineErr)) + } + }() + } else { + serviceLogger.Info().Msg("skipping starting metric partitioning, compaction, and pruning routines since metric database is disabled") + } // run the proxy service finalErr := service.Run() diff --git a/main_batch_test.go b/main_batch_test.go index 0dec9f8..d598f0e 100644 --- a/main_batch_test.go +++ b/main_batch_test.go @@ -4,13 +4,13 @@ import ( "bytes" "encoding/json" "fmt" + "github.com/kava-labs/kava-proxy-service/clients/database/postgres" "io" "net/http" "strconv" "testing" "time" - 
"github.com/kava-labs/kava-proxy-service/clients/database" "github.com/kava-labs/kava-proxy-service/decode" "github.com/kava-labs/kava-proxy-service/service/cachemdw" "github.com/redis/go-redis/v9" @@ -44,7 +44,7 @@ func TestE2ETest_ValidBatchEvmRequests(t *testing.T) { cleanUpRedis(t, redisClient) expectKeysNum(t, redisClient, 0) - db, err := database.NewPostgresClient(databaseConfig) + db, err := postgres.NewClient(databaseConfig) require.NoError(t, err) cleanMetricsDb(t, db) @@ -230,6 +230,10 @@ func TestE2ETest_ValidBatchEvmRequests(t *testing.T) { require.Equal(t, resp.Header[accessControlAllowOriginHeaderName], []string{"*"}) } + if shouldSkipMetrics() { + return + } + // wait for all metrics to be created. // besides verification, waiting for the metrics ensures future tests don't fail b/c metrics are being processed waitForMetricsInWindow(t, tc.expectedNumMetrics, db, startTime, []string{}) diff --git a/main_test.go b/main_test.go index 6f554bd..1e8ae2a 100644 --- a/main_test.go +++ b/main_test.go @@ -28,6 +28,7 @@ import ( "github.com/stretchr/testify/require" "github.com/kava-labs/kava-proxy-service/clients/database" + "github.com/kava-labs/kava-proxy-service/clients/database/postgres" "github.com/kava-labs/kava-proxy-service/config" "github.com/kava-labs/kava-proxy-service/decode" "github.com/kava-labs/kava-proxy-service/logging" @@ -70,7 +71,7 @@ var ( databasePassword = os.Getenv("DATABASE_PASSWORD") databaseUsername = os.Getenv("DATABASE_USERNAME") databaseName = os.Getenv("DATABASE_NAME") - databaseConfig = database.PostgresDatabaseConfig{ + databaseConfig = postgres.DatabaseConfig{ DatabaseName: databaseName, DatabaseEndpointURL: databaseURL, DatabaseUsername: databaseUsername, @@ -88,22 +89,22 @@ var ( // lookup all the request metrics in the database paging as necessary // search for any request metrics between startTime and time.Now() for particular request methods // if testedmethods is empty, all metrics in timeframe are returned. -func findMetricsInWindowForMethods(db database.PostgresClient, startTime time.Time, testedmethods []string) []database.ProxiedRequestMetric { +func findMetricsInWindowForMethods(db postgres.Client, startTime time.Time, testedmethods []string) []*database.ProxiedRequestMetric { extension := time.Duration(testExtendMetricWindowMs) * time.Millisecond // add small buffer into future in case metrics are still being created endTime := time.Now().Add(extension) var nextCursor int64 - var proxiedRequestMetrics []database.ProxiedRequestMetric + var proxiedRequestMetrics []*database.ProxiedRequestMetric - proxiedRequestMetricsPage, nextCursor, err := database.ListProxiedRequestMetricsWithPagination(testContext, db.DB, nextCursor, 10000) + proxiedRequestMetricsPage, nextCursor, err := db.ListProxiedRequestMetricsWithPagination(testContext, nextCursor, 10000) if err != nil { panic(err) } proxiedRequestMetrics = proxiedRequestMetricsPage for nextCursor != 0 { - proxiedRequestMetricsPage, nextCursor, err = database.ListProxiedRequestMetricsWithPagination(testContext, db.DB, nextCursor, 10000) + proxiedRequestMetricsPage, nextCursor, err = db.ListProxiedRequestMetricsWithPagination(testContext, nextCursor, 10000) if err != nil { panic(err) } @@ -111,7 +112,7 @@ func findMetricsInWindowForMethods(db database.PostgresClient, startTime time.Ti proxiedRequestMetrics = append(proxiedRequestMetrics, proxiedRequestMetricsPage...) 
} - var requestMetricsDuringRequestWindow []database.ProxiedRequestMetric + var requestMetricsDuringRequestWindow []*database.ProxiedRequestMetric // iterate in reverse order to start checking the most recent request metrics first for i := len(proxiedRequestMetrics) - 1; i >= 0; i-- { requestMetric := proxiedRequestMetrics[i] @@ -143,13 +144,13 @@ func findMetricsInWindowForMethods(db database.PostgresClient, startTime time.Ti func waitForMetricsInWindow( t *testing.T, expected int, - db database.PostgresClient, + db postgres.Client, startTime time.Time, testedmethods []string, -) (metrics []database.ProxiedRequestMetric) { - timeoutMin := 1 * time.Second +) (metrics []*database.ProxiedRequestMetric) { + timeoutMin := 2 * time.Second // scale the timeout by the number of expected requests, or at least 1 second - timeout := time.Duration(expected+1) * 100 * time.Millisecond + timeout := time.Duration(expected+1)*100*time.Millisecond + time.Second if timeout < timeoutMin { timeout = timeoutMin } @@ -196,13 +197,17 @@ func TestE2ETestProxyProxiesForMultipleHosts(t *testing.T) { } func TestE2ETestProxyCreatesRequestMetricForEachRequest(t *testing.T) { + if shouldSkipMetrics() { + t.Skip("metrics are disabled") + } + testEthMethodName := "eth_getBlockByNumber" // create api and database clients client, err := ethclient.Dial(proxyServiceURL) require.NoError(t, err) - databaseClient, err := database.NewPostgresClient(databaseConfig) + databaseClient, err := postgres.NewClient(databaseConfig) require.NoError(t, err) @@ -233,7 +238,7 @@ func TestE2ETestProxyTracksBlockNumberForEth_getBlockByNumberRequest(t *testing. require.NoError(t, err) - databaseClient, err := database.NewPostgresClient(databaseConfig) + databaseClient, err := postgres.NewClient(databaseConfig) require.NoError(t, err) @@ -253,6 +258,10 @@ func TestE2ETestProxyTracksBlockNumberForEth_getBlockByNumberRequest(t *testing. require.NoError(t, err) + if shouldSkipMetrics() { + return + } + requestMetricsDuringRequestWindow := waitForMetricsInWindow( t, 1, databaseClient, startTime, []string{testEthMethodName}, ) @@ -271,7 +280,7 @@ func TestE2ETestProxyTracksBlockTagForEth_getBlockByNumberRequest(t *testing.T) require.NoError(t, err) - databaseClient, err := database.NewPostgresClient(databaseConfig) + databaseClient, err := postgres.NewClient(databaseConfig) require.NoError(t, err) @@ -283,6 +292,10 @@ func TestE2ETestProxyTracksBlockTagForEth_getBlockByNumberRequest(t *testing.T) require.NoError(t, err) + if shouldSkipMetrics() { + return + } + requestMetricsDuringRequestWindow := waitForMetricsInWindow( t, 1, databaseClient, startTime, []string{testEthMethodName}, ) @@ -304,7 +317,7 @@ func TestE2ETestProxyTracksBlockNumberForMethodsWithBlockNumberParam(t *testing. client, err := ethclient.Dial(proxyServiceURL) require.NoError(t, err) - databaseClient, err := database.NewPostgresClient(databaseConfig) + databaseClient, err := postgres.NewClient(databaseConfig) require.NoError(t, err) // get the latest queryable block number @@ -343,6 +356,10 @@ func TestE2ETestProxyTracksBlockNumberForMethodsWithBlockNumberParam(t *testing. 
// eth_call _, _ = client.CallContract(testContext, ethereum.CallMsg{}, requestBlockNumber) + if shouldSkipMetrics() { + return + } + requestMetricsDuringRequestWindow := waitForMetricsInWindow( t, 7, databaseClient, startTime, testedmethods, ) @@ -365,7 +382,7 @@ func TestE2ETestProxyTracksBlockNumberForMethodsWithBlockHashParam(t *testing.T) require.NoError(t, err) - databaseClient, err := database.NewPostgresClient(databaseConfig) + databaseClient, err := postgres.NewClient(databaseConfig) require.NoError(t, err) @@ -395,6 +412,10 @@ func TestE2ETestProxyTracksBlockNumberForMethodsWithBlockHashParam(t *testing.T) // eth_getTransactionByBlockHashAndIndex _, _ = client.TransactionInBlock(testContext, requestBlockHash, 0) + if shouldSkipMetrics() { + return + } + requestMetricsDuringRequestWindow := waitForMetricsInWindow( t, 3, databaseClient, startTime, testedmethods, ) @@ -413,7 +434,7 @@ func TestE2ETest_HeightBasedRouting(t *testing.T) { rpc, err := rpc.Dial(proxyServiceURL) require.NoError(t, err) - databaseClient, err := database.NewPostgresClient(databaseConfig) + databaseClient, err := postgres.NewClient(databaseConfig) require.NoError(t, err) testCases := []struct { @@ -490,6 +511,10 @@ func TestE2ETest_HeightBasedRouting(t *testing.T) { err := rpc.Call(nil, tc.method, tc.params...) require.NoError(t, err) + if shouldSkipMetrics() { + return + } + metrics := waitForMetricsInWindow(t, 1, databaseClient, startTime, []string{tc.method}) require.Len(t, metrics, 1) @@ -646,7 +671,7 @@ func containsHeaders(t *testing.T, headersMap1, headersMap2 http.Header, msg str func TestE2ETestCachingMdwWithBlockNumberParam_Metrics(t *testing.T) { client, err := ethclient.Dial(proxyServiceURL) require.NoError(t, err) - db, err := database.NewPostgresClient(databaseConfig) + db, err := postgres.NewClient(databaseConfig) require.NoError(t, err) redisClient := redis.NewClient(&redis.Options{ @@ -658,7 +683,6 @@ func TestE2ETestCachingMdwWithBlockNumberParam_Metrics(t *testing.T) { expectKeysNum(t, redisClient, 0) // startTime is a time before first request startTime := time.Now() - for _, tc := range []struct { desc string method string @@ -721,6 +745,12 @@ func TestE2ETestCachingMdwWithBlockNumberParam_Metrics(t *testing.T) { require.Equal(t, block1, block2, "blocks should be the same") } + if shouldSkipMetrics() { + cleanUpRedis(t, redisClient) + + return + } + // get metrics between startTime & now for eth_getBlockByNumber requests filteredMetrics := waitForMetricsInWindow(t, 4, db, startTime, []string{"eth_getBlockByNumber"}) @@ -1179,7 +1209,11 @@ func cleanUpRedis(t *testing.T, redisClient *redis.Client) { } } -func cleanMetricsDb(t *testing.T, db database.PostgresClient) { +func cleanMetricsDb(t *testing.T, db postgres.Client) { + if shouldSkipMetrics() { + return + } + _, err := db.Exec("TRUNCATE proxied_request_metrics;") require.NoError(t, err) } @@ -1639,3 +1673,8 @@ func (tx *getTxReceiptByHashResponse) IsIncludedInBlock() bool { tx.Result.BlockNumber != "" && tx.Result.TransactionIndex != "" } + +func shouldSkipMetrics() bool { + // Check if the environment variable SKIP_METRICS is set to "true" + return os.Getenv("SKIP_METRICS") == "true" +} diff --git a/no_metric.env b/no_metric.env new file mode 100644 index 0000000..2e282f5 --- /dev/null +++ b/no_metric.env @@ -0,0 +1,6 @@ +##### Local development config, used for CI to disable metrics + +# If we use metric database. 
diff --git a/no_metric.env b/no_metric.env
new file mode 100644
index 0000000..2e282f5
--- /dev/null
+++ b/no_metric.env
@@ -0,0 +1,6 @@
+##### Local development config, used for CI to disable metrics
+
+# Whether the metric database is used. If set to false,
+# all metric collection and the connection to the DB will be disabled
+# and the service will run without any metrics
+METRIC_DATABASE_ENABLED=false
\ No newline at end of file
diff --git a/routines/metric_compaction.go b/routines/metric_compaction.go
index 5a2d55a..d949bae 100644
--- a/routines/metric_compaction.go
+++ b/routines/metric_compaction.go
@@ -5,10 +5,10 @@ package routines

 import (
 	"fmt"
+	"github.com/kava-labs/kava-proxy-service/clients/database"
 	"time"

 	"github.com/google/uuid"
-	"github.com/kava-labs/kava-proxy-service/clients/database"
 	"github.com/kava-labs/kava-proxy-service/logging"
 )

@@ -16,7 +16,7 @@ import (
 // for creating a new metric compaction routine
 type MetricCompactionRoutineConfig struct {
 	Interval time.Duration
-	Database *database.PostgresClient
+	Database database.MetricsDatabase
 	Logger   logging.ServiceLogger
 }

@@ -26,7 +26,7 @@ type MetricCompactionRoutineConfig struct {
 type MetricCompactionRoutine struct {
 	id       string
 	interval time.Duration
-	*database.PostgresClient
+	db database.MetricsDatabase
 	logging.ServiceLogger
 }

@@ -52,9 +52,9 @@ func (mcr *MetricCompactionRoutine) Run() (<-chan error, error) {
 // using the provided config, returning the routine and error (if any)
 func NewMetricCompactionRoutine(config MetricCompactionRoutineConfig) (*MetricCompactionRoutine, error) {
 	return &MetricCompactionRoutine{
-		id:             uuid.New().String(),
-		interval:       config.Interval,
-		PostgresClient: config.Database,
-		ServiceLogger:  config.Logger,
+		id:            uuid.New().String(),
+		interval:      config.Interval,
+		db:            config.Database,
+		ServiceLogger: config.Logger,
 	}, nil
 }
diff --git a/routines/metric_partitioning.go b/routines/metric_partitioning.go
index 9d96026..ec62da8 100644
--- a/routines/metric_partitioning.go
+++ b/routines/metric_partitioning.go
@@ -1,29 +1,21 @@
 package routines

 import (
-	"context"
 	"fmt"
-	"math"
-	"strings"
+	"github.com/kava-labs/kava-proxy-service/clients/database"
 	"time"

 	"github.com/google/uuid"
-	"github.com/kava-labs/kava-proxy-service/clients/database"
-	"github.com/kava-labs/kava-proxy-service/config"
 	"github.com/kava-labs/kava-proxy-service/logging"
 )

-const (
-	PartitionBaseTableName = "proxied_request_metrics"
-)
-
 // MetricPartitioningRoutineConfig wraps values used
 // for creating a new metric partitioning routine
 type MetricPartitioningRoutineConfig struct {
 	Interval          time.Duration
 	DelayFirstRun     time.Duration
 	PrefillPeriodDays int
-	Database          *database.PostgresClient
+	Database          database.MetricsDatabase
 	Logger            logging.ServiceLogger
 }

@@ -35,7 +27,7 @@ type MetricPartitioningRoutine struct {
 	interval          time.Duration
 	delayFirstRun     time.Duration
 	prefillPeriodDays int
-	*database.PostgresClient
+	db database.MetricsDatabase
 	logging.ServiceLogger
 }

@@ -49,8 +41,7 @@ func (mcr *MetricPartitioningRoutine) Run() (<-chan error, error) {

 		time.Sleep(mcr.delayFirstRun)

-		err := mcr.partition()
-
+		err := mcr.db.Partition(mcr.prefillPeriodDays)
 		if err != nil {
 			errorChannel <- err
 		}
@@ -62,7 +53,7 @@ func (mcr *MetricPartitioningRoutine) Run() (<-chan error, error) {
 		for tick := range timer {
 			mcr.Trace().Msg(fmt.Sprintf("%s tick at %+v", mcr.id, tick))

-			err := mcr.partition()
+			err := mcr.db.Partition(mcr.prefillPeriodDays)

 			if err != nil {
 				errorChannel <- err
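The routines above now depend on a `database.MetricsDatabase` interface instead of the concrete `*database.PostgresClient`. The interface definition itself is not part of this diff; inferred from the call sites shown here, it plausibly looks like the following sketch (the actual definition in `clients/database` may differ in names and signatures):

```go
// Plausible shape of the new interface, inferred from call sites in this
// diff; the real definition lives in clients/database and may differ.
package database

import "context"

type MetricsDatabase interface {
	SaveProxiedRequestMetric(ctx context.Context, metric *ProxiedRequestMetric) error
	DeleteProxiedRequestMetricsOlderThanNDays(ctx context.Context, n int64) error
	CountAttachedProxiedRequestMetricPartitions(ctx context.Context) (int64, error)
	GetLastCreatedAttachedProxiedRequestMetricsPartitionName(ctx context.Context) (string, error)
	Partition(prefillPeriodDays int) error
}
```

With this seam in place, the partitioning, compaction, and pruning routines no longer know whether they talk to Postgres or to a no-op backend.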
@@ -81,253 +72,7 @@ func NewMetricPartitioningRoutine(config MetricPartitioningRoutineConfig) (*Metr
 		interval:          config.Interval,
 		delayFirstRun:     config.DelayFirstRun,
 		prefillPeriodDays: config.PrefillPeriodDays,
-		PostgresClient:    config.Database,
+		db:                config.Database,
 		ServiceLogger:     config.Logger,
 	}, nil
 }
-
-// PartitionPeriod represents a single postgres partitioned
-// table from a starting point (inclusive of that point in time)
-// to an end point (exclusive of that point in time)
-type PartitionPeriod struct {
-	TableName            string
-	InclusiveStartPeriod time.Time
-	ExclusiveEndPeriod   time.Time
-}
-
-// daysInMonth returns the number of days in a month
-func daysInMonth(t time.Time) int {
-	y, m, _ := t.Date()
-	return time.Date(y, m+1, 0, 0, 0, 0, 0, time.UTC).Day()
-}
-
-// partitionsForPeriod attempts to generate the partitions
-// to create when prefilling numDaysToPrefill, returning the list of
-// of partitions and error (if any)
-func partitionsForPeriod(start time.Time, numDaysToPrefill int) ([]PartitionPeriod, error) {
-	var partitionPeriods []PartitionPeriod
-	// check function constraints needed to ensure expected behavior
-	if numDaysToPrefill > config.MaxMetricPartitioningPrefillPeriodDays {
-		return partitionPeriods, fmt.Errorf("more than %d prefill days specified %d", config.MaxMetricPartitioningPrefillPeriodDays, numDaysToPrefill)
-	}
-
-	currentYear, currentMonth, currentDay := start.Date()
-
-	daysInCurrentMonth := daysInMonth(start)
-
-	// add one to include the current day
-	newDaysRemainingInCurrentMonth := daysInCurrentMonth - currentDay + 1
-
-	// generate partitions for current month
-	totalPartitionsToGenerate := numDaysToPrefill
-
-	partitionsToGenerateForCurrentMonth := int(math.Min(float64(newDaysRemainingInCurrentMonth), float64(numDaysToPrefill)))
-
-	// generate partitions for current month
-	for partitionIndex := 0; partitionsToGenerateForCurrentMonth > 0; partitionIndex++ {
-		partitionPeriod := PartitionPeriod{
-			TableName:            fmt.Sprintf("%s_year_%d_month_%d_day_%d", PartitionBaseTableName, currentYear, currentMonth, currentDay+partitionIndex),
-			InclusiveStartPeriod: start.Add(time.Duration(partitionIndex) * 24 * time.Hour).Truncate(24 * time.Hour),
-			ExclusiveEndPeriod:   start.Add(time.Duration(partitionIndex+1) * 24 * time.Hour).Truncate(24 * time.Hour),
-		}
-
-		partitionPeriods = append(partitionPeriods, partitionPeriod)
-
-		partitionsToGenerateForCurrentMonth--
-	}
-
-	// check to see if we need to create any partitions for the
-	// upcoming month
-	if totalPartitionsToGenerate > newDaysRemainingInCurrentMonth {
-		futureMonth := start.Add(time.Hour * 24 * time.Duration(newDaysRemainingInCurrentMonth+1))
-
-		nextYear, nextMonth, nextDay := futureMonth.Date()
-
-		// on function entry we assert that pre-fill days won't
-		// overflow more than two unique months
-		// to generate partitions for
-		partitionsToGenerateForFutureMonth := totalPartitionsToGenerate - newDaysRemainingInCurrentMonth
-
-		// generate partitions for future month
-		for partitionIndex := 0; partitionsToGenerateForFutureMonth > 0; partitionIndex++ {
-			partitionPeriod := PartitionPeriod{
-				TableName:            fmt.Sprintf("%s_year%d_month%d_day%d", PartitionBaseTableName, nextYear, nextMonth, nextDay+partitionIndex),
-				InclusiveStartPeriod: futureMonth.Add(time.Duration(partitionIndex) * 24 * time.Hour).Truncate(24 * time.Hour),
-				ExclusiveEndPeriod:   futureMonth.Add(time.Duration(partitionIndex+1) * 24 * time.Hour).Truncate(24 * time.Hour),
-			}
-
-			partitionPeriods = append(partitionPeriods, partitionPeriod)
-
-			partitionsToGenerateForFutureMonth--
-		}
-	}
-
-	return partitionPeriods, nil
-}
-
-// partition attempts to create (idempotently) future partitions
-// for storing proxied request metrics, returning error (if any)
-func (mcr *MetricPartitioningRoutine) partition() error {
-	// calculate partition name and ranges to create
-	partitionsToCreate, err := partitionsForPeriod(time.Now(), mcr.prefillPeriodDays)
-
-	if err != nil {
-		return err
-	}
-
-	mcr.Trace().Msg(fmt.Sprintf("partitionsToCreate %+v", partitionsToCreate))
-
-	// create partition for each of those days
-	for _, partitionToCreate := range partitionsToCreate {
-		// do below in a transaction to allow retries
-		// each run of the routine to smooth any over transient issues
-		// such as dropped database connection or rolling service updates
-		// and support safe concurrency of multiple instances of the service
-		// attempting to create partitions
-		// https://go.dev/doc/database/execute-transactions
-		tx, err := mcr.DB.BeginTx(context.Background(), nil)
-
-		if err != nil {
-			mcr.Error().Msg(fmt.Sprintf("error %s beginning transaction for partition %+v", err, partitionToCreate))
-
-			continue
-		}
-
-		// check to see if partition already exists
-		_, err = tx.Exec(fmt.Sprintf("select * from %s limit 1;", partitionToCreate.TableName))
-
-		if err != nil {
-			if !strings.Contains(err.Error(), "42P01") {
-				mcr.Error().Msg(fmt.Sprintf("error %s querying for partition %+v", err, partitionToCreate))
-
-				tx.Rollback()
-
-				continue
-			}
-
-			// else error indicates table doesn't exist so safe for us to create it
-			createTableStatement := fmt.Sprintf(`
-			CREATE TABLE IF NOT EXISTS %s
-			(LIKE proxied_request_metrics INCLUDING DEFAULTS INCLUDING CONSTRAINTS);
-			`, partitionToCreate.TableName)
-			_, err = mcr.DB.Exec(createTableStatement)
-
-			if err != nil {
-				mcr.Debug().Msg(fmt.Sprintf("error %s creating partition %+v using statement %s", err, partitionToCreate, createTableStatement))
-
-				err = tx.Rollback()
-
-				if err != nil {
-					mcr.Error().Msg(fmt.Sprintf("error %s rolling back statement %s", err, createTableStatement))
-				}
-
-				continue
-			}
-
-			// attach partitions to main table
-			attachPartitionStatement := fmt.Sprintf(`
-			ALTER TABLE proxied_request_metrics ATTACH PARTITION %s
-			FOR VALUES FROM ('%s') TO ('%s');
-			`, partitionToCreate.TableName, partitionToCreate.InclusiveStartPeriod.Format("2006-01-02 15:04:05"), partitionToCreate.ExclusiveEndPeriod.Format("2006-01-02 15:04:05"))
-			_, err = mcr.DB.Exec(attachPartitionStatement)
-
-			if err != nil {
-				mcr.Debug().Msg(fmt.Sprintf("error %s attaching partition %+v using statement %s", err,
-					partitionToCreate, attachPartitionStatement))
-
-				err = tx.Rollback()
-
-				if err != nil {
-					mcr.Error().Msg(fmt.Sprintf("error %s rolling back statement %s", err, attachPartitionStatement))
-				}
-
-				continue
-			}
-
-			err = tx.Commit()
-
-			if err != nil {
-				mcr.Error().Msg(fmt.Sprintf("error %s committing transaction to create partition %+v", err, partitionToCreate))
-
-				continue
-			}
-
-			mcr.Trace().Msg(fmt.Sprintf("created partition %+v", partitionToCreate))
-
-			continue
-		} else {
-			// table exists, no need to create it
-			mcr.Trace().Msg(fmt.Sprintf("not creating table for partition %+v as it already exists", partitionToCreate))
-
-			// but check if it is attached
-			partitionIsAttachedQuery := fmt.Sprintf(`
-			SELECT
-				nmsp_parent.nspname AS parent_schema,
-				parent.relname      AS parent,
-				nmsp_child.nspname  AS child_schema,
-				child.relname       AS child
-			FROM pg_inherits
-				JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
-				JOIN pg_class child ON pg_inherits.inhrelid = child.oid
-				JOIN pg_namespace nmsp_parent ON nmsp_parent.oid = parent.relnamespace
-				JOIN pg_namespace nmsp_child ON nmsp_child.oid = child.relnamespace
-			WHERE parent.relname='proxied_request_metrics' and
-				child.relname='%s';`, partitionToCreate.TableName)
-			result, err := mcr.DB.Query(partitionIsAttachedQuery)
-
-			if err != nil {
-				mcr.Error().Msg(fmt.Sprintf("error %s querying %s to see if partition %+v is already attached", err, partitionIsAttachedQuery, partitionToCreate))
-
-				continue
-			}
-
-			if !result.Next() {
-				mcr.Trace().Msg(fmt.Sprintf("attaching created but dangling partition %+v", partitionToCreate))
-				// table is not attached, attach it
-				attachPartitionStatement := fmt.Sprintf(`
-				ALTER TABLE proxied_request_metrics ATTACH PARTITION %s
-				FOR VALUES FROM ('%s') TO ('%s');
-				`, partitionToCreate.TableName, partitionToCreate.InclusiveStartPeriod.Format("2006-01-02 15:04:05"), partitionToCreate.ExclusiveEndPeriod.Format("2006-01-02 15:04:05"))
-				_, err = mcr.DB.Exec(attachPartitionStatement)
-
-				if err != nil {
-					mcr.Debug().Msg(fmt.Sprintf("error %s attaching partition %+v using statement %s", err,
-						partitionToCreate, attachPartitionStatement))
-
-					err = tx.Rollback()
-
-					if err != nil {
-						mcr.Error().Msg(fmt.Sprintf("error %s rolling back statement %s", err, attachPartitionStatement))
-					}
-
-					continue
-				}
-
-				err = tx.Commit()
-
-				if err != nil {
-					mcr.Error().Msg(fmt.Sprintf("error %s committing transaction to create partition %+v", err, partitionToCreate))
-
-					continue
-				}
-
-				mcr.Trace().Msg(fmt.Sprintf("created partition %+v", partitionToCreate))
-
-				continue
-			}
-
-			result.Close()
-
-			mcr.Trace().Msg(fmt.Sprintf("not attaching partition %+v as it is already attached", partitionToCreate))
-
-			err = tx.Commit()
-
-			if err != nil {
-				mcr.Error().Msg(fmt.Sprintf("error %s committing empty transaction for already created partition %+v", err, partitionToCreate))
-			}
-
-			continue
-		}
-	}
-
-	return nil
-}
diff --git a/routines/metric_partitioning_test.go b/routines/metric_partitioning_test.go
index 436b96f..439e103 100644
--- a/routines/metric_partitioning_test.go
+++ b/routines/metric_partitioning_test.go
@@ -2,13 +2,13 @@ package routines

 import (
 	"context"
-	"os"
-	"testing"
-	"time"
-
+	"github.com/kava-labs/kava-proxy-service/clients/database/postgres"
 	"github.com/kava-labs/kava-proxy-service/config"
 	"github.com/kava-labs/kava-proxy-service/service"
 	"github.com/stretchr/testify/assert"
+	"os"
+	"testing"
+	"time"
 )

 var (
@@ -31,10 +31,14 @@ var (
 )

 func TestE2ETestMetricPartitioningRoutinePrefillsExpectedPartitionsAfterStartupDelay(t *testing.T) {
+	if shouldSkipMetrics() {
+		t.Skip("Skipping test because environment variable SKIP_METRICS is set to true")
+	}
+
 	// prepare
 	time.Sleep(time.Duration(MetricPartitioningRoutineDelayFirstRunSeconds) * time.Second)

-	expectedPartitions, err := partitionsForPeriod(time.Now().UTC(), int(configuredPrefillDays))
+	expectedPartitions, err := postgres.PartitionsForPeriod(time.Now().UTC(), int(configuredPrefillDays))

 	assert.Nil(t, err)

@@ -47,49 +51,7 @@ func TestE2ETestMetricPartitioningRoutinePrefillsExpectedPartitionsAfterStartupD
 	assert.Equal(t, expectedPartitions[len(expectedPartitions)-1].TableName, databaseStatus.LatestProxiedRequestMetricPartitionTableName)
 }

-func TestUnitTestpartitionsForPeriodReturnsErrWhenTooManyPrefillDays(t *testing.T) {
-	// prepare
-	daysToPrefill := config.MaxMetricPartitioningPrefillPeriodDays + 1
-
-	// execute
-	_, err := partitionsForPeriod(time.Now(), daysToPrefill)
-
-	// assert
-	assert.NotNil(t, err)
-}
-
-func TestUnitTestpartitionsForPeriodReturnsExpectedNumPartitionsWhenPrefillPeriodIsContainedInCurrentMonth(t *testing.T) {
-	// prepare
-
-	// pick a date in the middle of a month
-	startFrom := time.Date(1989, 5, 11, 12, 0, 0, 0, time.UTC)
-
-	// set prefill period to less then days remaining in month
-	// from above date
-	daysToPrefill := 3
-
-	// execute
-	actualPartitionsForPeriod, err := partitionsForPeriod(startFrom, daysToPrefill)
-
-	// assert
-	assert.Nil(t, err)
-	assert.Equal(t, daysToPrefill, len(actualPartitionsForPeriod))
-}
-
-func TestUnitTestpartitionsForPeriodReturnsExpectedNumPartitionsWhenPrefillPeriodIsNotContainedInCurrentMonth(t *testing.T) {
-	// prepare
-
-	// pick a date in the middle of a month
-	startFrom := time.Date(1989, 5, 20, 12, 0, 0, 0, time.UTC)
-
-	// set prefill period to more then days remaining in month
-	// from above date
-	daysToPrefill := 21
-
-	// execute
-	actualPartitionsForPeriod, err := partitionsForPeriod(startFrom, daysToPrefill)
-
-	// assert
-	assert.Nil(t, err)
-	assert.Equal(t, daysToPrefill, len(actualPartitionsForPeriod))
+func shouldSkipMetrics() bool {
+	// Check if the environment variable SKIP_METRICS is set to "true"
+	return os.Getenv("SKIP_METRICS") == "true"
 }
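The deleted `partitionsForPeriod` unit tests presumably move alongside the now-exported `postgres.PartitionsForPeriod`. A sketch of the relocated month-boundary case, assuming the exported function keeps the old semantics (the test name and package are hypothetical):

```go
package postgres_test

import (
	"testing"
	"time"

	"github.com/kava-labs/kava-proxy-service/clients/database/postgres"
	"github.com/stretchr/testify/assert"
)

// 1989-05-20 with 21 prefill days spans a month boundary: 12 partitions
// remain in May (days 20 through 31) and the other 9 land in June.
func TestPartitionsForPeriodSpansMonthBoundary(t *testing.T) {
	startFrom := time.Date(1989, 5, 20, 12, 0, 0, 0, time.UTC)
	daysToPrefill := 21

	periods, err := postgres.PartitionsForPeriod(startFrom, daysToPrefill)

	assert.Nil(t, err)
	assert.Equal(t, daysToPrefill, len(periods))
}
```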
diff --git a/routines/metric_pruning.go b/routines/metric_pruning.go
index 6586c7e..95812fd 100644
--- a/routines/metric_pruning.go
+++ b/routines/metric_pruning.go
@@ -6,10 +6,10 @@ package routines
 import (
 	"context"
 	"fmt"
+	"github.com/kava-labs/kava-proxy-service/clients/database"
 	"time"

 	"github.com/google/uuid"
-	"github.com/kava-labs/kava-proxy-service/clients/database"
 	"github.com/kava-labs/kava-proxy-service/logging"
 )

@@ -19,7 +19,7 @@ type MetricPruningRoutineConfig struct {
 	Interval                     time.Duration
 	StartDelay                   time.Duration
 	MaxRequestMetricsHistoryDays int64
-	Database                     *database.PostgresClient
+	Database                     database.MetricsDatabase
 	Logger                       logging.ServiceLogger
 }

@@ -31,7 +31,7 @@ type MetricPruningRoutine struct {
 	interval                     time.Duration
 	startDelay                   time.Duration
 	maxRequestMetricsHistoryDays int64
-	*database.PostgresClient
+	db database.MetricsDatabase
 	logging.ServiceLogger
 }

@@ -50,7 +50,7 @@ func (mpr *MetricPruningRoutine) Run() (<-chan error, error) {
 		for tick := range timer {
 			mpr.Trace().Msg(fmt.Sprintf("%s tick at %+v", mpr.id, tick))

-			database.DeleteProxiedRequestMetricsOlderThanNDays(context.Background(), mpr.DB, mpr.maxRequestMetricsHistoryDays)
+			mpr.db.DeleteProxiedRequestMetricsOlderThanNDays(context.Background(), mpr.maxRequestMetricsHistoryDays)
 		}
 	}()

@@ -65,7 +65,7 @@ func NewMetricPruningRoutine(config MetricPruningRoutineConfig) (*MetricPruningR
 		interval:                     config.Interval,
 		startDelay:                   config.StartDelay,
 		maxRequestMetricsHistoryDays: config.MaxRequestMetricsHistoryDays,
-		PostgresClient:               config.Database,
+		db:                           config.Database,
 		ServiceLogger:                config.Logger,
 	}, nil
 }
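When `METRIC_DATABASE_ENABLED=false`, these routines receive the noop client instead (see the `service.go` changes below). The `clients/database/noop` package itself is not included in this diff, but a client satisfying the interface would look roughly like this sketch:

```go
// Rough sketch of a no-op MetricsDatabase implementation; the real
// clients/database/noop package is not shown in this diff and may differ.
package noop

import (
	"context"

	"github.com/kava-labs/kava-proxy-service/clients/database"
)

type Client struct{}

func New() *Client { return &Client{} }

func (c *Client) SaveProxiedRequestMetric(ctx context.Context, m *database.ProxiedRequestMetric) error {
	return nil // drop the metric
}

func (c *Client) DeleteProxiedRequestMetricsOlderThanNDays(ctx context.Context, n int64) error {
	return nil // nothing stored, nothing to prune
}

func (c *Client) Partition(prefillPeriodDays int) error {
	return nil // no tables to partition
}
```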
diff --git a/service/handlers.go b/service/handlers.go
index b7aa38c..91787e9 100644
--- a/service/handlers.go
+++ b/service/handlers.go
@@ -5,9 +5,8 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
-	"net/http"
-
 	"github.com/kava-labs/kava-proxy-service/clients/database"
+	"net/http"
 )

 // createHealthcheckHandler creates a health check handler function that
@@ -68,11 +67,11 @@ func createServicecheckHandler(service *ProxyService) func(http.ResponseWriter,
 // function responding to requests for the status of database related
 // operations such as proxied request metrics compaction and
 // partitioning
-func createDatabaseStatusHandler(service *ProxyService, db *database.PostgresClient) func(http.ResponseWriter, *http.Request) {
+func createDatabaseStatusHandler(service *ProxyService, db database.MetricsDatabase) func(http.ResponseWriter, *http.Request) {
 	return func(w http.ResponseWriter, r *http.Request) {
 		service.Debug().Msg("/database/status called")

-		proxiedRequestMetricPartitionsCount, err := database.CountAttachedProxiedRequestMetricPartitions(r.Context(), db.DB)
+		proxiedRequestMetricPartitionsCount, err := db.CountAttachedProxiedRequestMetricPartitions(r.Context())

 		if err != nil {
 			service.Error().Msg(fmt.Sprintf("error %s getting proxiedRequestMetricPartitionsCount", err))
@@ -80,7 +79,7 @@ func createDatabaseStatusHandler(service *ProxyService, db *database.PostgresCli
 			return
 		}

-		proxiedRequestMetricLatestAttachedPartitionName, err := database.GetLastCreatedAttachedProxiedRequestMetricsPartitionName(r.Context(), db.DB)
+		proxiedRequestMetricLatestAttachedPartitionName, err := db.GetLastCreatedAttachedProxiedRequestMetricsPartitionName(r.Context())

 		if err != nil {
 			service.Error().Msg(fmt.Sprintf("error %s getting proxiedRequestMetricPartitionsCount", err))
diff --git a/service/middleware.go b/service/middleware.go
index 8df5d9b..2d0cd52 100644
--- a/service/middleware.go
+++ b/service/middleware.go
@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"context"
 	"fmt"
+	"github.com/kava-labs/kava-proxy-service/clients/database"
 	"io"
 	"net/http"
 	"strings"
@@ -11,7 +12,6 @@ import (

 	"github.com/urfave/negroni"

-	"github.com/kava-labs/kava-proxy-service/clients/database"
 	"github.com/kava-labs/kava-proxy-service/config"
 	"github.com/kava-labs/kava-proxy-service/decode"
 	"github.com/kava-labs/kava-proxy-service/logging"
@@ -492,7 +492,7 @@ func createAfterProxyFinalizer(service *ProxyService, config config.Config) http
 		isCached := cachemdw.IsRequestCached(r.Context())

 		// create a metric for the request
-		metric := database.ProxiedRequestMetric{
+		metric := &database.ProxiedRequestMetric{
 			MethodName:                  decodedRequestBody.Method,
 			ResponseLatencyMilliseconds: originRoundtripLatencyMilliseconds,
 			RequestTime:                 requestStartTime,
@@ -511,7 +511,7 @@ func createAfterProxyFinalizer(service *ProxyService, config config.Config) http
 		// save metric to database async
 		go func() {
 			// using background context so save won't be terminated when request finishes
-			err = metric.Save(context.Background(), service.Database.DB)
+			err = service.Database.SaveProxiedRequestMetric(context.Background(), metric)

 			if err != nil {
 				// TODO: consider only logging
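One subtlety the finalizer comment above alludes to: the metric is saved in a goroutine with `context.Background()` rather than the request context, because a request-scoped context is canceled the moment the handler returns. A small runnable illustration of that principle (not code from this PR):

```go
package main

import (
	"context"
	"fmt"
)

// Why the async metric save uses context.Background(): the request
// context is canceled when the handler returns, which would cancel
// an in-flight database write issued with it.
func main() {
	reqCtx, cancel := context.WithCancel(context.Background())
	cancel() // simulate the handler returning; the request context is now dead

	fmt.Println(reqCtx.Err())               // context.Canceled
	fmt.Println(context.Background().Err()) // <nil>: safe for the detached save
}
```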
diff --git a/service/service.go b/service/service.go
index 7b41c6c..133a5b0 100644
--- a/service/service.go
+++ b/service/service.go
@@ -5,14 +5,16 @@ package service
 import (
 	"context"
 	"fmt"
+	"github.com/kava-labs/kava-proxy-service/clients/database"
+	"github.com/kava-labs/kava-proxy-service/clients/database/noop"
+	"github.com/kava-labs/kava-proxy-service/clients/database/postgres"
+	"github.com/kava-labs/kava-proxy-service/clients/database/postgres/migrations"
 	"net/http"
 	"time"

 	"github.com/ethereum/go-ethereum/ethclient"
 	"github.com/kava-labs/kava-proxy-service/clients/cache"
-	"github.com/kava-labs/kava-proxy-service/clients/database"
-	"github.com/kava-labs/kava-proxy-service/clients/database/migrations"
 	"github.com/kava-labs/kava-proxy-service/config"
 	"github.com/kava-labs/kava-proxy-service/logging"
 	"github.com/kava-labs/kava-proxy-service/service/batchmdw"
@@ -21,7 +23,7 @@ import (

 // ProxyService represents an instance of the proxy service API
 type ProxyService struct {
-	Database  *database.PostgresClient
+	Database  database.MetricsDatabase
 	Cache     *cachemdw.ServiceCache
 	httpProxy *http.Server
 	evmClient *ethclient.Client
@@ -32,10 +34,19 @@ type ProxyService struct {
 func New(ctx context.Context, config config.Config, serviceLogger *logging.ServiceLogger) (ProxyService, error) {
 	service := ProxyService{}

-	// create database client
-	db, err := createDatabaseClient(ctx, config, serviceLogger)
-	if err != nil {
-		return ProxyService{}, err
+	var (
+		db  database.MetricsDatabase
+		err error
+	)
+
+	if config.MetricDatabaseEnabled {
+		// create database client
+		db, err = createDatabaseClient(ctx, config, serviceLogger)
+		if err != nil {
+			return ProxyService{}, err
+		}
+	} else {
+		db = noop.New()
 	}

 	// create evm api client
@@ -138,10 +149,10 @@ func New(ctx context.Context, config config.Config, serviceLogger *logging.Servi

 // createDatabaseClient creates a connection to the database
 // using the specified config and runs migrations async
-// (only if migration flag in config is true) returning the
+// (only if migration flag in config is true)
 // returning the database connection and error (if any)
-func createDatabaseClient(ctx context.Context, config config.Config, logger *logging.ServiceLogger) (*database.PostgresClient, error) {
-	databaseConfig := database.PostgresDatabaseConfig{
+func createDatabaseClient(ctx context.Context, config config.Config, logger *logging.ServiceLogger) (*postgres.Client, error) {
+	databaseConfig := postgres.DatabaseConfig{
 		DatabaseName:                     config.DatabaseName,
 		DatabaseEndpointURL:              config.DatabaseEndpointURL,
 		DatabaseUsername:                 config.DatabaseUserName,
@@ -149,7 +160,7 @@ func createDatabaseClient(ctx context.Context, config config.Config, logger *log
 		SSLEnabled:                       config.DatabaseSSLEnabled,
 		QueryLoggingEnabled:              config.DatabaseQueryLoggingEnabled,
 		ReadTimeoutSeconds:               config.DatabaseReadTimeoutSeconds,
-		WriteTimeousSeconds:              config.DatabaseWriteTimeoutSeconds,
+		WriteTimeoutSeconds:              config.DatabaseWriteTimeoutSeconds,
 		DatabaseMaxIdleConnections:       config.DatabaseMaxIdleConnections,
 		DatabaseConnectionMaxIdleSeconds: config.DatabaseConnectionMaxIdleSeconds,
 		DatabaseMaxOpenConnections:       config.DatabaseMaxOpenConnections,
@@ -157,12 +168,12 @@ func createDatabaseClient(ctx context.Context, config config.Config, logger *log
 		RunDatabaseMigrations:            config.RunDatabaseMigrations,
 	}

-	serviceDatabase, err := database.NewPostgresClient(databaseConfig)
+	serviceDatabase, err := postgres.NewClient(databaseConfig)

 	if err != nil {
 		logger.Error().Msg(fmt.Sprintf("error %s creating database using config %+v", err, databaseConfig))

-		return &database.PostgresClient{}, err
+		return &postgres.Client{}, err
 	}

 	if !databaseConfig.RunDatabaseMigrations {
@@ -173,7 +184,7 @@ func createDatabaseClient(ctx context.Context, config config.Config, logger *log
 	// run migrations async so waiting for the database to
 	// be reachable doesn't block the ability of the proxy service
 	// to degrade gracefully and continue to proxy requests even
-	// without it's database
+	// without its database
 	go func() {
 		// wait for database to be reachable
 		var databaseOnline bool
@@ -195,7 +206,7 @@ func createDatabaseClient(ctx context.Context, config config.Config, logger *log

 		logger.Debug().Msg("running migrations on database")

-		migrations, err := database.Migrate(ctx, serviceDatabase.DB, *migrations.Migrations, logger)
+		migrations, err := serviceDatabase.Migrate(ctx, *migrations.Migrations, logger)

 		if err != nil {
 			// TODO: retry based on config
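Not shown in this diff is how `config.MetricDatabaseEnabled` gets populated. Given the `METRIC_DATABASE_ENABLED` variable introduced in no_metric.env, the config package plausibly reads it along these lines (a sketch only; the real config code may parse or default differently):

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// Sketch of how METRIC_DATABASE_ENABLED plausibly becomes
// config.MetricDatabaseEnabled; the actual config package is not
// part of this diff.
func metricDatabaseEnabled() bool {
	raw := os.Getenv("METRIC_DATABASE_ENABLED")
	if raw == "" {
		return true // assumed default: metrics stay on unless explicitly disabled
	}
	enabled, err := strconv.ParseBool(raw)
	if err != nil {
		return true // assumed: fall back to the previous (enabled) behavior
	}
	return enabled
}

func main() {
	fmt.Println(metricDatabaseEnabled())
}
```

This is also why the CI workflow passes the toggle through an extra `--env-file no_metric.env` rather than a plain environment variable: compose only substitutes env-file values into the container environment at build/up time.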