Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add database layer for new architecture #132

Draft
wants to merge 20 commits into
base: rewrite
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 19 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
db-up:
docker compose up --detach
sleep 3

db-up-test:
dotenvy -f .env_files/test.env docker compose up --detach
sleep 3

db-down:
docker compose down

test:
dotenvy -f .env_files/test-server.env cargo test --package ratings_new

full-test: db-up-test test

clean:
docker rmi ratings-postgres -f
cargo clean
4 changes: 1 addition & 3 deletions crates/ratings/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
mod app;
mod features;
mod utils;
use ratings::{app, utils};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
Expand Down
9 changes: 6 additions & 3 deletions crates/ratings_new/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,18 @@ edition = "2021"

[dependencies]
chrono = "0.4.38"
dotenvy = "0.15.7"
envy = "0.4.2"
prost = "0.13.3"
prost-types = "0.13.3"
sqlx = { version = "0.8.2", features = ["runtime-tokio-rustls", "postgres", "chrono", "migrate"] }
serde = { version = "1.0.210", features = ["derive"] }
sqlx = { version = "0.8.2", features = ["runtime-tokio-rustls", "postgres", "chrono", "migrate", "time"] }
thiserror = "1.0.64"
tokio = "1.40.0"
tokio = { version = "1.40.0", features = ["full"] }
tonic = "0.12.2"
tonic-reflection = "0.12.2"
tracing = "0.1.40"
tracing-subscriber = "0.3.18"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

[dev-dependencies]
simple_test_case = "1.2.0"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
REVOKE ALL PRIVILEGES ON TABLE users FROM CURRENT_USER;
REVOKE USAGE, SELECT ON SEQUENCE users_id_seq FROM CURRENT_USER;
REVOKE ALL PRIVILEGES ON TABLE votes FROM CURRENT_USER;
REVOKE USAGE, SELECT ON SEQUENCE votes_id_seq FROM CURRENT_USER;
REVOKE CONNECT ON DATABASE ratings FROM CURRENT_USER;

DROP TABLE IF EXISTS votes;
DROP TABLE IF EXISTS users;

Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
-- Create database and then execute the second set of commands when connected
-- to the ratings database
--

-- Stage 1

-- CREATE DATABASE IF EXISTS ratings;

-- Stage 2
--

CREATE TABLE users (
id SERIAL PRIMARY KEY,
client_hash CHAR(64) NOT NULL UNIQUE, -- sha256([$user:$machineId])
created TIMESTAMPTZ NOT NULL DEFAULT NOW(),
last_seen TIMESTAMPTZ NOT NULL
);

CREATE TABLE votes (
id SERIAL PRIMARY KEY,
created TIMESTAMPTZ NOT NULL DEFAULT NOW(),
user_id_fk INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
snap_id CHAR(32) NOT NULL,
snap_revision INT NOT NULL CHECK (snap_revision > 0),
vote_up BOOLEAN NOT NULL
);

-- Create a unique index on user_id, snap_id, and snap_revision.
-- This ensures that the combination of user_id, snap_id, and snap_revision
-- is unique in the votes table. It helps enforce the rule that a user
-- can't vote more than once for the same snap revision.
CREATE UNIQUE INDEX idx_votes_unique_user_snap ON votes (user_id_fk, snap_id, snap_revision);

-- Grant privileges to the user currently running the script
GRANT ALL PRIVILEGES ON TABLE users TO CURRENT_USER;
GRANT USAGE, SELECT ON SEQUENCE users_id_seq TO CURRENT_USER;
GRANT ALL PRIVILEGES ON TABLE votes TO CURRENT_USER;
GRANT USAGE, SELECT ON SEQUENCE votes_id_seq TO CURRENT_USER;
GRANT CONNECT ON DATABASE ratings TO CURRENT_USER;
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- Add up migration script here

CREATE TABLE snap_categories (
id SERIAL PRIMARY KEY,
snap_id CHAR(32) NOT NULL,
category INTEGER NOT NULL,
CONSTRAINT category CHECK (category BETWEEN 0 AND 19)
);
156 changes: 156 additions & 0 deletions crates/ratings_new/src/db/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,158 @@
use crate::utils::{Config, Migrator};
use sqlx::{postgres::PgPoolOptions, PgPool};
use thiserror::Error;
use tokio::sync::OnceCell;
use tracing::info;

pub mod user;
pub mod vote;

pub type ClientHash = String;
pub type Result<T> = std::result::Result<T, DbError>;

/// Errors that can occur when a user votes.
#[derive(Error, Debug)]
pub enum DbError {
/// A record could not be created for the user
#[error("failed to create user record")]
FailedToCreateUserRecord,
/// We were unable to delete a user with the given instance ID
#[error("failed to delete user by instance id")]
FailedToDeleteUserRecord,
/// We could not get a vote by a given user
#[error("failed to get user vote")]
FailedToGetUserVote,
/// The user was unable to cast a vote
#[error("failed to cast vote")]
FailedToCastVote,
/// An error that occurred in category updating
#[error(transparent)]
Sqlx(#[from] sqlx::Error),
/// An error that occurred in the configuration
#[error(transparent)]
Envy(#[from] envy::Error),
}

const MAX_POOL_CONNECTIONS: u32 = 5;

static POOL: OnceCell<PgPool> = OnceCell::const_new();

pub async fn init_pool_from_uri(postgres_uri: &str) -> Result<PgPool> {
info!("Initialising DB connection pool");
let pool = PgPoolOptions::new()
.max_connections(MAX_POOL_CONNECTIONS)
.connect(postgres_uri)
.await?;

Ok(pool)
}

pub async fn init_pool_from_uri_and_migrate(postgres_uri: &str) -> Result<PgPool> {
let pool = init_pool_from_uri(postgres_uri).await?;
info!("Running DB migrations");
let migrator = Migrator::new(postgres_uri).await?;
migrator.run().await?;

Ok(pool)
}

pub async fn get_pool() -> Result<&'static PgPool> {
let config = Config::get().await?;
let pool = POOL
.get_or_try_init(|| init_pool_from_uri_and_migrate(&config.postgres_uri))
.await?;
Ok(pool)
}

#[macro_export]
macro_rules! conn {
{ } => {
&mut *($crate::db::get_pool().await?.acquire().await?)
}
}

#[cfg(test)]
mod test {
use sqlx::types::time::OffsetDateTime;
use tracing_subscriber::EnvFilter;

use crate::conn;

use super::*;

#[tokio::test]
async fn save_and_read_votes() -> Result<()> {
let client_hash_1 = "0000000000000000000000000000000000000000000000000000000000000001";
let client_hash_2 = "0000000000000000000000000000000000000000000000000000000000000002";
let snap_id_1 = "00000000000000000000000000000001";
let snap_id_2 = "00000000000000000000000000000002";

tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.init();

let test_users = [
user::User::new(client_hash_1),
user::User::new(client_hash_2),
];

let test_votes = [
vote::Vote {
client_hash: String::from(client_hash_1),
snap_id: String::from(snap_id_1),
vote_up: true,
timestamp: OffsetDateTime::from_unix_timestamp(123).unwrap(),
snap_revision: 1,
},
vote::Vote {
client_hash: String::from(client_hash_2),
snap_id: String::from(snap_id_2),
vote_up: false,
timestamp: OffsetDateTime::from_unix_timestamp(456).unwrap(),
snap_revision: 2,
},
];

let conn = conn!();

for user in test_users.into_iter() {
user.create_or_seen(conn).await?;
}

for vote in test_votes.into_iter() {
vote.save_to_db(conn).await?;
}

let votes_client_1 = vote::Vote::get_all_by_client_hash(
String::from(client_hash_1),
Some(String::from(snap_id_1)),
conn,
)
.await
.unwrap();

let votes_client_2 = vote::Vote::get_all_by_client_hash(
String::from(client_hash_2),
Some(String::from(snap_id_2)),
conn,
)
.await
.unwrap();

assert_eq!(votes_client_1.len(), 1);
let first_vote = votes_client_1.first().unwrap();
assert_eq!(first_vote.snap_id, snap_id_1);
assert_eq!(first_vote.client_hash, client_hash_1);
assert_eq!(first_vote.snap_revision, 1);
assert!(first_vote.vote_up);

let second_vote = votes_client_2.first().unwrap();
assert_eq!(votes_client_2.len(), 1);
assert_eq!(second_vote.snap_id, snap_id_2);
assert_eq!(second_vote.client_hash, client_hash_2);
assert_eq!(second_vote.snap_revision, 2);
assert!(!second_vote.vote_up);

Ok(())
}
}
68 changes: 68 additions & 0 deletions crates/ratings_new/src/db/user.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use super::{ClientHash, DbError, Result};
use sqlx::{prelude::FromRow, types::time::OffsetDateTime, PgConnection};
use tracing::error;

/// Information about a user who may be rating snaps.
#[derive(Debug, FromRow)]
pub struct User {
/// The user's ID
pub id: i32,
/// A hash of the user's client
pub client_hash: ClientHash,
/// The time the user was created
pub created: OffsetDateTime,
/// The time the user was last seen
pub last_seen: OffsetDateTime,
}

impl User {
/// Creates a new user from the given [`ClientHash`]
pub fn new(client_hash: &str) -> Self {
let now = OffsetDateTime::now_utc();
Self {
id: -1,
client_hash: client_hash.to_string(),
last_seen: now,
created: now,
}
}

/// Create a [`User`] entry, or note that the user has recently been seen
pub async fn create_or_seen(self, conn: &mut PgConnection) -> Result<Self> {
let user_with_id = sqlx::query_as(
r#"
INSERT INTO users (client_hash, created, last_seen)
VALUES ($1, NOW(), NOW())
ON CONFLICT (client_hash)
DO UPDATE SET last_seen = NOW()
RETURNING id, client_hash, created, last_seen;
"#,
)
.bind(self.client_hash)
.fetch_one(conn)
.await
.map_err(|error| {
error!("{error:?}");
DbError::FailedToCreateUserRecord
})?;

Ok(user_with_id)
}

pub async fn delete_by_client_hash(client_hash: String, conn: &mut PgConnection) -> Result<()> {
sqlx::query(
r#"
DELETE FROM users
WHERE client_hash = $1
"#,
)
.bind(client_hash)
.execute(conn)
.await
.map_err(|error| {
error!("{error:?}");
DbError::FailedToDeleteUserRecord
})?;
Ok(())
}
}
Loading
Loading