Skip to content

Commit

Permalink
Merge pull request #28 from Freyskeyd/feature/update-sqlx
Browse files Browse the repository at this point in the history
  • Loading branch information
Freyskeyd authored Mar 26, 2023
2 parents 9f4e1d3 + 8713db1 commit 934a739
Show file tree
Hide file tree
Showing 17 changed files with 47 additions and 37 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ default-members = ["crates/*"]
members = ["crates/*", "examples/*"]

[workspace.dependencies]
uuid = { version = "0.8.1", features = ["serde", "v4"] }
uuid = { version = "1.3.0", features = ["serde", "v4"] }
serde = { version = "1.0.117", features = ["derive"] }
sqlx = { version = "0.5.9", features = ["chrono", "time", "uuid", "json", "offline", "runtime-actix-native-tls"] }
sqlx = { version = "0.6.2", features = ["chrono", "time", "uuid", "json", "offline", "runtime-actix-native-tls"] }
async-trait = "0.1.51"
serde_json = "1.0.68"
actix = "0.12.0"
Expand Down
2 changes: 1 addition & 1 deletion crates/chekov/src/aggregate/instance/internal.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
pub struct CommandExecutionResult<E, A> {
pub events: Vec<E>,
pub new_version: i64,
pub new_version: u64,
pub state: A,
}
14 changes: 8 additions & 6 deletions crates/chekov/src/aggregate/instance/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ mod runtime;
// TODO: Only one aggregate per app, need to add generic APP
pub struct AggregateInstance<A: Aggregate> {
pub(crate) inner: A,
pub(crate) current_version: i64,
pub(crate) current_version: u64,
pub(crate) identity: String,
pub(crate) resolver: &'static EventResolverRegistry<A>,
}
Expand Down Expand Up @@ -132,11 +132,13 @@ impl<A: Aggregate> AggregateInstance<A> {

fn apply_recorded_event(&mut self, event: &RecordedEvent) -> Result<(), ApplyError> {
match event.stream_version {
Some(version) if self.current_version + 1 > version => Ok(()),
// FIXME: Use u64 instead of i64
Some(version) if (self.current_version + 1) as i64 > version => Ok(()),
// TODO: Replace Any by some more descriptive errors
None if self.current_version != 0 => Err(ApplyError::Any),
// TODO: Replace Any by some more descriptive errors
Some(version) if (self.current_version + 1) != version => Err(ApplyError::Any),
// FIXME: Use u64 instead of i64
Some(version) if (self.current_version + 1) as i64 != version => Err(ApplyError::Any),
_ => {
if let Some(resolver) = self.resolver.get_applier(&event.event_type) {
// TODO: Remove clone
Expand Down Expand Up @@ -197,7 +199,7 @@ impl<A: Aggregate> AggregateInstance<A> {
mut state: A,
correlation_id: Uuid,
stream_id: String,
current_version: i64,
current_version: u64,
) -> Result<CommandExecutionResult<E, A>, CommandExecutorError>
where
A: EventApplier<E>,
Expand All @@ -215,7 +217,7 @@ impl<A: Aggregate> AggregateInstance<A> {
// TODO deal with mailbox error
.await??;

let new_version = current_version + events.len() as i64;
let new_version = current_version + events.len() as u64;
Ok(CommandExecutionResult {
events,
new_version,
Expand All @@ -226,7 +228,7 @@ impl<A: Aggregate> AggregateInstance<A> {
async fn execute_and_apply<C: Command, APP: Application>(
state: A,
command: Dispatch<C, APP>,
current_version: i64,
current_version: u64,
) -> Result<CommandExecutionResult<C::Event, A>, CommandExecutorError>
where
A: CommandExecutor<C>,
Expand Down
2 changes: 1 addition & 1 deletion crates/chekov/src/aggregate/instance/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use tracing::trace;
use super::AggregateInstance;

impl<A: Aggregate> ActixHandler<AggregateVersion> for AggregateInstance<A> {
type Result = i64;
type Result = u64;

fn handle(&mut self, _: AggregateVersion, _: &mut Self::Context) -> Self::Result {
self.current_version
Expand Down
2 changes: 1 addition & 1 deletion crates/chekov/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ pub(crate) struct ExecuteAppender(pub(crate) event_store::prelude::Appender);
pub(crate) struct ExecuteStreamInfo(pub(crate) String);

#[derive(Message)]
#[rtype("i64")]
#[rtype("u64")]
pub(crate) struct AggregateVersion;

#[derive(Message)]
Expand Down
2 changes: 1 addition & 1 deletion crates/chekov/src/subscriber/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ where
.stream(&subscription.stream_uuid)
.unwrap()
.from(event_store::prelude::ReadVersion::Version(
subscription.first_stream_version as i64,
subscription.first_stream_version as u64,
))
.limit(
(subscription.last_stream_version - subscription.first_stream_version + 1)
Expand Down
1 change: 1 addition & 0 deletions crates/chekov/src/subscriber/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub struct EventNotification {
#[allow(dead_code)]
stream_id: i32,
stream_uuid: String,
// FIXME: use u64
first_stream_version: i32,
last_stream_version: i32,
}
Expand Down
10 changes: 6 additions & 4 deletions crates/event_store-backend-inmemory/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl Backend for InMemoryBackend {
}

self.stream_counter += 1;
stream.stream_id = self.stream_counter as i64;
stream.stream_id = self.stream_counter as u64;

self.streams.insert(stream_uuid.clone(), stream);
self.events.insert(stream_uuid.clone(), Vec::new());
Expand Down Expand Up @@ -134,10 +134,11 @@ impl Backend for InMemoryBackend {
.iter()
.enumerate()
.map(|(i, event)| RecordedEvent {
event_number: (e.len() + 1 + i) as i64,
// FIXME: Unsafe cast of usize to u64
event_number: (e.len() + 1 + i) as u64,
event_uuid: Uuid::new_v4(),
stream_uuid: event.stream_uuid.clone(),
stream_version: Some(event.stream_version),
stream_version: Some(event.stream_version as i64),
causation_id: event.causation_id,
correlation_id: event.correlation_id,
event_type: event.event_type.clone(),
Expand All @@ -148,7 +149,8 @@ impl Backend for InMemoryBackend {
.collect();

if let Some(s) = self.streams.get_mut(stream_uuid) {
s.stream_version += re.len() as i64;
// FIXME: Unsafe cast of usize to u64
s.stream_version += re.len() as u64;
} else {
return Box::pin(async move { Err(StorageError::StreamDoesntExists) });
}
Expand Down
7 changes: 5 additions & 2 deletions crates/event_store-backend-postgres/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,11 @@ pub fn stream_forward(

pub async fn read_stream(
conn: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
stream_id: i64,
stream_id: u64,
version: usize,
limit: usize,
) -> Result<Vec<RecordedEvent>, sqlx::Error> {
let stream_id: i64 = stream_id.try_into().unwrap();
let version: i64 = version.try_into().unwrap();
let limit: i64 = limit.try_into().unwrap();
trace!("Version {}, Limit: {}", version, limit);
Expand Down Expand Up @@ -205,7 +206,9 @@ FROM
.bind(event.created_at);
}
for event in events.iter() {
query = query.bind(event.event_uuid).bind(event.stream_version);
query = query
.bind(event.event_uuid)
.bind(event.stream_version as i64);
}

query.map(|row: PgRow| row.get(0)).fetch_all(conn).await
Expand Down
5 changes: 3 additions & 2 deletions crates/event_store-core/src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ pub trait Event: Serialize + Send + std::convert::TryFrom<RecordedEvent> {
#[rtype("()")]
pub struct RecordedEvent {
/// an incrementing and gapless integer used to order the event in a stream.
pub event_number: i64,
#[sqlx(try_from = "i64")]
pub event_number: u64,
/// Unique identifier for this event
pub event_uuid: Uuid,
/// The stream identifier for thie event
Expand Down Expand Up @@ -82,7 +83,7 @@ pub struct UnsavedEvent {
pub metadata: serde_json::Value,
pub event_uuid: Uuid,
pub stream_uuid: String,
pub stream_version: i64,
pub stream_version: u64,
pub created_at: DateTime<chrono::offset::Utc>,
}

Expand Down
8 changes: 4 additions & 4 deletions crates/event_store-core/src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ use error::StreamError;
/// A `Stream` represents an `Event` stream
#[derive(Clone, Debug, PartialEq, sqlx::FromRow)]
pub struct Stream {
// FIXME: Uncomment when sqlx is up to date
// #[sqlx(try_from = "i64")]
pub stream_id: i64,
#[sqlx(try_from = "i64")]
pub stream_id: u64,
/// The stream identifier which is unique
pub stream_uuid: String,
/// The current stream version number
pub stream_version: i64,
#[sqlx(try_from = "i64")]
pub stream_version: u64,
/// The creation date of the stream
pub created_at: DateTime<chrono::offset::Utc>,
/// The deletion date of the stream
Expand Down
4 changes: 2 additions & 2 deletions crates/event_store-core/src/versions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::stream::Stream;
#[derive(Debug)]
pub enum ReadVersion {
Origin,
Version(i64),
Version(u64),
}

/// The `ExpectedVersion` used to define optimistic concurrency
Expand All @@ -16,7 +16,7 @@ pub enum ExpectedVersion {
/// Define that we expect an existing stream
StreamExists,
/// Define that we expect a stream in a particular version
Version(i64),
Version(u64),
}

impl ExpectedVersion {
Expand Down
3 changes: 2 additions & 1 deletion crates/event_store/src/storage/appender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,8 @@ impl Appender {
.into_iter()
.enumerate()
.map(|(index, mut event)| {
event.stream_version = stream.stream_version + (index + 1) as i64;
// FIXME: Unsafe cast from usize to u64
event.stream_version = stream.stream_version + (index + 1) as u64;
event.stream_uuid = stream.stream_uuid.clone();
event
})
Expand Down
6 changes: 3 additions & 3 deletions crates/event_store/src/subscriptions/fsm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl<S: Storage> SubscriptionFSM<S> {
if self.state == InternalFSMState::Initial && self.data.transient {
self.data.reset_event_tracking();
if self.subscribe_to_events().await.is_ok() {
let start_from: i64 = self.data.start_from.into();
let start_from: u64 = self.data.start_from.into();
debug!("Start from is : {}", start_from);
self.data.last_received = start_from;
self.data.last_sent = start_from;
Expand Down Expand Up @@ -264,7 +264,7 @@ impl<S: Storage> SubscriptionFSM<S> {
}

#[tracing::instrument(skip(self))]
fn track_sent(&mut self, event_number: i64) {
fn track_sent(&mut self, event_number: u64) {
debug!("Executing track_sent for {}", self.data.subscription_name);
self.data.last_sent = std::cmp::max(self.data.last_sent, event_number);
// TODO: Improve this part to be more efficient
Expand All @@ -274,7 +274,7 @@ impl<S: Storage> SubscriptionFSM<S> {
}

#[tracing::instrument(skip(self))]
fn track_last_received(&mut self, event_number: i64) {
fn track_last_received(&mut self, event_number: u64) {
debug!(
"Executing track_last_received for {}",
self.data.subscription_name
Expand Down
4 changes: 2 additions & 2 deletions crates/event_store/src/subscriptions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ impl Default for SubscriptionOptions {
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum StartFrom {
Origin,
Version(i64),
Version(u64),
}

impl From<StartFrom> for i64 {
impl From<StartFrom> for u64 {
fn from(s: StartFrom) -> Self {
match s {
StartFrom::Origin => 0,
Expand Down
8 changes: 4 additions & 4 deletions crates/event_store/src/subscriptions/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ pub struct SubscriptionState<S: Storage> {
pub(crate) stream_uuid: String,
pub(crate) start_from: StartFrom,
pub(crate) subscription_name: String,
pub(crate) last_received: i64,
pub(crate) last_sent: i64,
pub(crate) last_ack: i64,
pub(crate) last_received: u64,
pub(crate) last_sent: u64,
pub(crate) last_ack: u64,
pub(crate) queue: VecDeque<Arc<RecordedEvent>>,
pub(crate) transient: bool,
pub(crate) in_flight_event_numbers: Vec<i64>,
pub(crate) in_flight_event_numbers: Vec<u64>,
}

impl<S: Storage> SubscriptionState<S> {
Expand Down
2 changes: 1 addition & 1 deletion crates/event_store/src/subscriptions/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use super::SubscriptionNotification;
pub struct Subscriber {
pub recipient: Recipient<SubscriptionNotification>,
pub(crate) in_flight: VecDeque<Arc<RecordedEvent>>,
last_sent: i64,
last_sent: u64,
}

impl Actor for Subscriber {
Expand Down

0 comments on commit 934a739

Please sign in to comment.