From a13c3b1c8a6657aa2a29b549915abd02ff521310 Mon Sep 17 00:00:00 2001
From: Simon
Date: Wed, 26 Jul 2023 12:30:31 +0200
Subject: [PATCH] Implement listing of projection errors (#4106)

* Implement listing of projection errors

* Fix compilation

---------

Co-authored-by: Simon Dumas
---
 .../plugins/blazegraph/BlazegraphViews.scala  |   6 +-
 .../compositeviews/CompositeViews.scala       |  10 +-
 .../projections/CompositeProjections.scala    |   2 +-
 .../elasticsearch/ElasticSearchViews.scala    |   6 +-
 .../EventMetricsProjection.scala              |   5 +-
 .../query/DefaultSearchRequest.scala          |  12 +-
 .../query/DefaultViewsQuery.scala             |  12 +-
 .../query/DefaultViewsStore.scala             |  10 +-
 .../query/DefaultViewsQuerySuite.scala        |  14 +-
 .../query/DefaultViewsStoreSuite.scala        |  10 +-
 .../delta/plugins/storage/files/Files.scala   |   2 +-
 .../plugins/storage/storages/Storages.scala   |   6 +-
 .../delta/sdk/projects/ProjectsImpl.scala     |   6 +-
 .../delta/sdk/resolvers/ResolversImpl.scala   |   4 +-
 .../nexus/delta/sdk/sse/SseEventLog.scala     |  19 +-
 .../nexus/delta/sdk/views/ViewsStore.scala    |  10 +-
 .../delta/sourcing/FragmentEncoder.scala      |  21 ++
 .../nexus/delta/sourcing/Predicate.scala      |  39 ----
 .../nexus/delta/sourcing/Scope.scala          |  38 ++++
 .../nexus/delta/sourcing/ScopedEventLog.scala |  48 ++---
 .../delta/sourcing/event/EventStreaming.scala |  20 +-
 .../sourcing/event/GlobalEventStore.scala     |   2 +-
 .../sourcing/event/ScopedEventStore.scala     |  24 +--
 .../implicits/TimeRangeInstances.scala        |  22 +++
 .../delta/sourcing/implicits/package.scala    |  10 +-
 .../sourcing/model/FailedElemLogRow.scala     |  94 +++++++++
 .../nexus/delta/sourcing/offset/Offset.scala  |  16 +-
 .../projections/FailedElemLogStore.scala      |  73 ++++++-
 .../projections/ProjectionErrors.scala        |   9 +-
 .../delta/sourcing/query/StreamingQuery.scala |   9 +-
 .../sourcing/state/ScopedStateStore.scala     |  59 +++---
 .../sourcing/stream/ProjectionStore.scala     |  86 +--------
 .../delta/sourcing/syntax/DoobieSyntax.scala  |  20 ++
 .../delta/sourcing/ScopedEventLogSuite.scala  |   2 +-
 .../sourcing/event/EventStreamingSuite.scala  |  14 +-
 .../event/ScopedEventStoreSuite.scala         |  14 +-
 .../projections/FailedElemLogStoreSuite.scala | 182 ++++++++++--------
 .../state/ScopedStateStoreSuite.scala         |  22 +--
 .../nexus/testkit/MutableClock.scala          |  29 +++
 39 files changed, 586 insertions(+), 401 deletions(-)
 create mode 100644 delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/FragmentEncoder.scala
 delete mode 100644 delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/Predicate.scala
 create mode 100644 delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/Scope.scala
 create mode 100644 delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/implicits/TimeRangeInstances.scala
 create mode 100644 delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/model/FailedElemLogRow.scala
 create mode 100644 delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/syntax/DoobieSyntax.scala
 create mode 100644 delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/MutableClock.scala

diff --git a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphViews.scala b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphViews.scala
index a5a9f51d1e..ca16cc06ce 100644
--- a/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphViews.scala
+++ b/delta/plugins/blazegraph/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/blazegraph/BlazegraphViews.scala
@@ -293,7 +293,7 @@ final class BlazegraphViews(
    * Return the existing indexing views in a project in a finite stream
    */
   def currentIndexingViews(project: ProjectRef): ElemStream[IndexingViewDef] =
-    log.currentStates(Predicate.Project(project)).evalMapFilter { envelope =>
+    log.currentStates(Scope.Project(project)).evalMapFilter { envelope =>
       Task.pure(toIndexViewDef(envelope))
     }
 
@@ -301,7 +301,7 @@ final class BlazegraphViews(
    * Return all existing indexing views in a finite stream
    */
   def currentIndexingViews: ElemStream[IndexingViewDef] =
-    log.currentStates(Predicate.Root).evalMapFilter { envelope =>
+    log.currentStates(Scope.Root).evalMapFilter { envelope =>
       Task.pure(toIndexViewDef(envelope))
     }
 
@@ -309,7 +309,7 @@ final class BlazegraphViews(
    * Return the indexing views in a non-ending stream
    */
   def indexingViews(start: Offset): ElemStream[IndexingViewDef] =
-    log.states(Predicate.Root, start).evalMapFilter { envelope =>
+    log.states(Scope.Root, start).evalMapFilter { envelope =>
       Task.pure(toIndexViewDef(envelope))
     }
 
diff --git a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViews.scala b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViews.scala
index 49dd0ca8c6..96a5ee691a 100644
--- a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViews.scala
+++ b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeViews.scala
@@ -36,7 +36,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject
 import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag
 import ch.epfl.bluebrain.nexus.delta.sourcing.model._
 import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
-import ch.epfl.bluebrain.nexus.delta.sourcing.{Predicate, ScopedEntityDefinition, ScopedEventLog, StateMachine, Transactors}
+import ch.epfl.bluebrain.nexus.delta.sourcing.{Scope, ScopedEntityDefinition, ScopedEventLog, StateMachine, Transactors}
 import io.circe.Json
 import monix.bio.{IO, Task, UIO}
 
@@ -400,7 +400,7 @@ final class CompositeViews private (
       params: CompositeViewSearchParams,
       ordering: Ordering[ViewResource]
   ): UIO[UnscoredSearchResults[ViewResource]] = {
-    val predicate = params.project.fold[Predicate](Predicate.Root)(ref => Predicate.Project(ref))
+    val predicate = params.project.fold[Scope](Scope.Root)(ref => Scope.Project(ref))
     SearchResults(
       log.currentStates(predicate, identity(_)).evalMapFilter[Task, ViewResource] { state =>
         fetchContext.cacheOnReads
@@ -422,19 +422,19 @@ final class CompositeViews private (
    * Return all existing views for the given project in a finite stream
    */
   def currentViews(project: ProjectRef): ElemStream[CompositeViewDef] =
-    log.currentStates(Predicate.Project(project)).map(toCompositeViewDef)
+    log.currentStates(Scope.Project(project)).map(toCompositeViewDef)
 
   /**
    * Return all existing indexing views in a finite stream
    */
   def currentViews: ElemStream[CompositeViewDef] =
-    log.currentStates(Predicate.Root).map(toCompositeViewDef)
+    log.currentStates(Scope.Root).map(toCompositeViewDef)
 
   /**
    * Return the indexing views in a non-ending stream
    */
   def views(start: Offset): ElemStream[CompositeViewDef] =
-    log.states(Predicate.Root, start).map(toCompositeViewDef)
+    log.states(Scope.Root, start).map(toCompositeViewDef)
 
   private def toCompositeViewDef(envelope: Envelope[CompositeViewState]) =
     envelope.toElem { v => Some(v.project) }.map { v =>
diff --git a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/projections/CompositeProjections.scala b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/projections/CompositeProjections.scala
index cd7575a140..7d4a0397f2 100644
--- a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/projections/CompositeProjections.scala
+++ b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/projections/CompositeProjections.scala
@@ -142,7 +142,7 @@ object CompositeProjections {
             Projection.persist(
               progress,
               compositeProgressStore.save(view, rev, branch, _),
-              failedElemLogStore.saveFailedElems(metadata, _)
+              failedElemLogStore.save(metadata, _)
             )(batch)
         )
 
diff --git a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchViews.scala b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchViews.scala
index 00525a7ac2..5acacf131f 100644
--- a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchViews.scala
+++ b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchViews.scala
@@ -325,7 +325,7 @@ final class ElasticSearchViews private (
    * Return the existing indexing views in a project in a finite stream
    */
   def currentIndexingViews(project: ProjectRef): ElemStream[IndexingViewDef] =
-    log.currentStates(Predicate.Project(project)).evalMapFilter { envelope =>
+    log.currentStates(Scope.Project(project)).evalMapFilter { envelope =>
       Task.pure(toIndexViewDef(envelope))
     }
 
@@ -333,7 +333,7 @@ final class ElasticSearchViews private (
    * Return all existing indexing views in a finite stream
    */
   def currentIndexingViews: ElemStream[IndexingViewDef] =
-    log.currentStates(Predicate.Root).evalMapFilter { envelope =>
+    log.currentStates(Scope.Root).evalMapFilter { envelope =>
       Task.pure(toIndexViewDef(envelope))
     }
 
@@ -341,7 +341,7 @@ final class ElasticSearchViews private (
    * Return the indexing views in a non-ending stream
    */
   def indexingViews(start: Offset): ElemStream[IndexingViewDef] =
-    log.states(Predicate.Root, start).evalMapFilter { envelope =>
+    log.states(Scope.Root, start).evalMapFilter { envelope =>
       Task.pure(toIndexViewDef(envelope))
     }
 
diff --git a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/EventMetricsProjection.scala b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/EventMetricsProjection.scala
index 55249fcff1..c7e8965b9b 100644
--- a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/EventMetricsProjection.scala
+++ b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/EventMetricsProjection.scala
@@ -14,7 +14,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
 import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Sink
 import ch.epfl.bluebrain.nexus.delta.sourcing.stream._
 import ch.epfl.bluebrain.nexus.delta.sourcing.stream.pipes.AsJson
-import ch.epfl.bluebrain.nexus.delta.sourcing.{MultiDecoder, Predicate, Transactors}
+import ch.epfl.bluebrain.nexus.delta.sourcing.{MultiDecoder, Scope, Transactors}
 import monix.bio.Task
 
 trait EventMetricsProjection
@@ -73,8 +73,7 @@ object EventMetricsProjection {
       MultiDecoder(metricEncoders.map { encoder => encoder.entityType -> encoder.toMetric }.toMap)
 
     // define how to get metrics from a given offset
-    val metrics = (offset: Offset) =>
-      EventStreaming.fetchScoped(Predicate.root, allEntityTypes, offset, queryConfig, xas)
+    val metrics = (offset: Offset) => EventStreaming.fetchScoped(Scope.root, allEntityTypes, offset, queryConfig, xas)
 
     val index = eventMetricsIndex(indexPrefix)
 
diff --git a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/query/DefaultSearchRequest.scala b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/query/DefaultSearchRequest.scala
index 46b9aeaff9..3a407a583d 100644
--- a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/query/DefaultSearchRequest.scala
+++ b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/query/DefaultSearchRequest.scala
@@ -8,7 +8,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.model.IdSegment
 import ch.epfl.bluebrain.nexus.delta.sdk.model.search.SortList
 import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext
 import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.{ApiMappings, ProjectBase}
-import ch.epfl.bluebrain.nexus.delta.sourcing.Predicate
+import ch.epfl.bluebrain.nexus.delta.sourcing.Scope
 import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Label, ProjectRef, ResourceRef}
 import monix.bio.IO
 
@@ -33,9 +33,9 @@ sealed trait DefaultSearchRequest extends Product with Serializable {
   def sort: SortList
 
   /**
-   * If the search applies to the project/org/root level
+   * If the search applies to the project/org/root scope
    */
-  def predicate: Predicate
+  def scope: Scope
 
 }
 
@@ -46,7 +46,7 @@ object DefaultSearchRequest {
    */
   case class ProjectSearch(ref: ProjectRef, params: ResourcesSearchParams, pagination: Pagination, sort: SortList)
       extends DefaultSearchRequest {
-    override def predicate: Predicate = Predicate.Project(ref)
+    override def scope: Scope = Scope(ref)
   }
 
   object ProjectSearch {
@@ -82,7 +82,7 @@ object DefaultSearchRequest {
    */
   case class OrgSearch(label: Label, params: ResourcesSearchParams, pagination: Pagination, sort: SortList)
       extends DefaultSearchRequest {
-    override def predicate: Predicate = Predicate.Org(label)
+    override def scope: Scope = Scope.Org(label)
   }
 
   object OrgSearch {
@@ -103,7 +103,7 @@ object DefaultSearchRequest {
    */
   case class RootSearch(params: ResourcesSearchParams, pagination: Pagination, sort: SortList)
       extends DefaultSearchRequest {
-    override def predicate: Predicate = Predicate.Root
+    override def scope: Scope = Scope.Root
   }
 
   object RootSearch {
diff --git a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/query/DefaultViewsQuery.scala b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/query/DefaultViewsQuery.scala
index 89c07424a1..2a977d3ab7 100644
--- a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/query/DefaultViewsQuery.scala
+++ b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/query/DefaultViewsQuery.scala
@@ -11,7 +11,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller
 import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
 import ch.epfl.bluebrain.nexus.delta.sdk.model.search.{AggregationResult, SearchResults}
 import ch.epfl.bluebrain.nexus.delta.sdk.views.View.IndexingView
-import ch.epfl.bluebrain.nexus.delta.sourcing.{Predicate, Transactors}
+import ch.epfl.bluebrain.nexus.delta.sourcing.{Scope, Transactors}
 import io.circe.JsonObject
 import monix.bio.{IO, UIO}
 
@@ -58,14 +58,14 @@ object DefaultViewsQuery {
   }
 
   def apply[Result, Aggregate](
-      fetchViews: Predicate => UIO[List[IndexingView]],
+      fetchViews: Scope => UIO[List[IndexingView]],
       aclCheck: AclCheck,
       listAction: (DefaultSearchRequest, Set[IndexingView]) => IO[ElasticSearchQueryError, Result],
       aggregateAction: (DefaultSearchRequest, Set[IndexingView]) => IO[ElasticSearchQueryError, Aggregate]
   ): DefaultViewsQuery[Result, Aggregate] = new DefaultViewsQuery[Result, Aggregate] {
 
-    private def filterViews(predicate: Predicate)(implicit caller: Caller) =
-      fetchViews(predicate)
+    private def filterViews(scope: Scope)(implicit caller: Caller) =
+      fetchViews(scope)
         .flatMap { allViews =>
           aclCheck.mapFilter[IndexingView, IndexingView](
             allViews,
@@ -80,7 +80,7 @@ object DefaultViewsQuery {
     override def list(
         searchRequest: DefaultSearchRequest
     )(implicit caller: Caller): IO[ElasticSearchQueryError, Result] =
-      filterViews(searchRequest.predicate).flatMap { views =>
+      filterViews(searchRequest.scope).flatMap { views =>
         listAction(searchRequest, views)
       }
 
@@ -90,7 +90,7 @@ object DefaultViewsQuery {
     override def aggregate(
         searchRequest: DefaultSearchRequest
     )(implicit caller: Caller): IO[ElasticSearchQueryError, Aggregate] =
-      filterViews(searchRequest.predicate).flatMap { views =>
+      filterViews(searchRequest.scope).flatMap { views =>
         aggregateAction(searchRequest, views)
       }
 
diff --git a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/query/DefaultViewsStore.scala b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/query/DefaultViewsStore.scala
index 20d83c1052..b9f35ce374 100644
--- a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/query/DefaultViewsStore.scala
+++ b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/query/DefaultViewsStore.scala
@@ -5,8 +5,8 @@ import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.ElasticSearchViews
 import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.{defaultViewId, permissions, ElasticSearchViewState}
 import ch.epfl.bluebrain.nexus.delta.sdk.views.View.IndexingView
 import ch.epfl.bluebrain.nexus.delta.sdk.views.ViewRef
-import ch.epfl.bluebrain.nexus.delta.sourcing.{Predicate, Transactors}
 import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag
+import ch.epfl.bluebrain.nexus.delta.sourcing.{Scope, Transactors}
 import doobie._
 import doobie.implicits._
 import io.circe.{Decoder, Json}
@@ -18,9 +18,9 @@ trait DefaultViewsStore {
 
   /**
-   * Return views at the given predicate
+   * Return views at the given scope
    */
-  def find(predicate: Predicate): UIO[List[IndexingView]]
+  def find(scope: Scope): UIO[List[IndexingView]]
 }
 
 object DefaultViewsStore {
@@ -38,11 +38,11 @@ object DefaultViewsStore {
   def apply(prefix: String, xas: Transactors): DefaultViewsStore = {
     new DefaultViewsStore {
       implicit val stateDecoder: Decoder[ElasticSearchViewState] = ElasticSearchViewState.serializer.codec
-      def find(predicate: Predicate): UIO[List[IndexingView]] =
+      def find(scope: Scope): UIO[List[IndexingView]] =
         (fr"SELECT value FROM scoped_states" ++
           Fragments.whereAndOpt(
             Some(fr"type = ${ElasticSearchViews.entityType}"),
-            predicate.asFragment,
+            scope.asFragment,
             Some(fr"tag = ${Tag.Latest.value}"),
             Some(fr"id = $defaultViewId"),
             Some(fr"deprecated = false")
diff --git a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/query/DefaultViewsQuerySuite.scala b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/query/DefaultViewsQuerySuite.scala
index cbc64dc787..88b859f521 100644
--- a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/query/DefaultViewsQuerySuite.scala
+++ b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/query/DefaultViewsQuerySuite.scala
@@ -10,7 +10,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller
 import ch.epfl.bluebrain.nexus.delta.sdk.model.search.SortList
 import ch.epfl.bluebrain.nexus.delta.sdk.views.View.IndexingView
 import ch.epfl.bluebrain.nexus.delta.sdk.views.ViewRef
-import ch.epfl.bluebrain.nexus.delta.sourcing.Predicate
+import ch.epfl.bluebrain.nexus.delta.sourcing.Scope
 import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Anonymous, Group, User}
 import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Label, ProjectRef}
 import ch.epfl.bluebrain.nexus.testkit.bio.BioSuite
@@ -45,13 +45,13 @@ class DefaultViewsQuerySuite extends BioSuite {
     (charlie.subject, AclAddress.Project(project1), Set(permissions.read))
   )
 
-  private def fetchViews(predicate: Predicate) = UIO.pure {
+  private def fetchViews(predicate: Scope) = UIO.pure {
     val viewRefs = predicate match {
-      case Predicate.Root             => List(defaultView, defaultView2, defaultView3)
-      case Predicate.Org(`org`)       => List(defaultView, defaultView2)
-      case Predicate.Org(`org2`)      => List(defaultView3)
-      case Predicate.Org(_)           => List.empty
-      case Predicate.Project(project) => List(ViewRef(project, defaultViewId))
+      case Scope.Root             => List(defaultView, defaultView2, defaultView3)
+      case Scope.Org(`org`)       => List(defaultView, defaultView2)
+      case Scope.Org(`org2`)      => List(defaultView3)
+      case Scope.Org(_)           => List.empty
+      case Scope.Project(project) => List(ViewRef(project, defaultViewId))
     }
     viewRefs.map { ref =>
       IndexingView(ref, "index", permissions.read)
diff --git a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/query/DefaultViewsStoreSuite.scala b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/query/DefaultViewsStoreSuite.scala
index 6bd8351d3e..f55e217c12 100644
--- a/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/query/DefaultViewsStoreSuite.scala
+++ b/delta/plugins/elasticsearch/src/test/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/query/DefaultViewsStoreSuite.scala
@@ -6,7 +6,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.{ElasticSearchViewGen
 import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv
 import ch.epfl.bluebrain.nexus.delta.sdk.views.View.IndexingView
 import ch.epfl.bluebrain.nexus.delta.sdk.views.ViewRef
-import ch.epfl.bluebrain.nexus.delta.sourcing.Predicate
+import ch.epfl.bluebrain.nexus.delta.sourcing.Scope
 import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Label, ProjectRef, Tag}
 import ch.epfl.bluebrain.nexus.delta.sourcing.state.ScopedStateStoreFixture
 import ch.epfl.bluebrain.nexus.testkit.bio.{BioSuite, ResourceFixture}
@@ -75,7 +75,7 @@ class DefaultViewsStoreSuite extends BioSuite {
 
   private lazy val viewStore = defaultViewsStore()
 
-  private def findDefaultRefs(predicate: Predicate) =
+  private def findDefaultRefs(predicate: Scope) =
     viewStore.find(predicate).map(_.map(_.ref))
 
   test("Construct indexing view correctly") {
@@ -89,14 +89,14 @@ class DefaultViewsStoreSuite extends BioSuite {
   }
 
   test(s"Get non-deprecated default views in '$project1'") {
-    findDefaultRefs(Predicate.Project(project1)).assert(List(defaultView1))
+    findDefaultRefs(Scope.Project(project1)).assert(List(defaultView1))
   }
 
   test(s"Get non-deprecated default views in '$org'") {
-    findDefaultRefs(Predicate.Org(org)).assert(List(defaultView1, defaultView2))
+    findDefaultRefs(Scope.Org(org)).assert(List(defaultView1, defaultView2))
   }
 
   test(s"Get non-deprecated in all orgs") {
-    findDefaultRefs(Predicate.Root).assert(List(defaultView1, defaultView2, defaultView3))
+    findDefaultRefs(Scope.Root).assert(List(defaultView1, defaultView2, defaultView3))
   }
 }
diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/Files.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/Files.scala
index c0e3f2c1f4..771f2b2221 100644
--- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/Files.scala
+++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/files/Files.scala
@@ -467,7 +467,7 @@ final class Files(
                    .bimap(WrappedStorageRejection, _.value)
                )
       stream <- log
-                  .states(Predicate.root, offset)
+                  .states(Scope.root, offset)
                  .map { envelope =>
                    envelope.value match {
                      case f
diff --git a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/Storages.scala b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/Storages.scala
index 400dabeb3e..b4f29e5717 100644
--- a/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/Storages.scala
+++ b/delta/plugins/storage/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/storage/storages/Storages.scala
@@ -31,7 +31,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.ScopedEntityDefinition.Tagger
 import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject
 import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag
 import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, EntityType, ProjectRef, ResourceRef}
-import ch.epfl.bluebrain.nexus.delta.sourcing.{Predicate, ScopedEntityDefinition, ScopedEventLog, StateMachine, Transactors}
+import ch.epfl.bluebrain.nexus.delta.sourcing.{Scope, ScopedEntityDefinition, ScopedEventLog, StateMachine, Transactors}
 import com.typesafe.scalalogging.Logger
 import fs2.Stream
 import io.circe.Json
@@ -288,7 +288,7 @@ final class Storages private (
   private def fetchDefaults(project: ProjectRef): IO[StorageFetchRejection, Stream[Task, StorageResource]] =
     fetchContext.onRead(project).map { pc =>
-      log.currentStates(Predicate.Project(project), _.toResource(pc.apiMappings, pc.base)).filter(_.value.default)
+      log.currentStates(Scope.Project(project), _.toResource(pc.apiMappings, pc.base)).filter(_.value.default)
     }
 
   /**
@@ -309,7 +309,7 @@ final class Storages private (
    * Return the existing storages in a project in a finite stream
    */
   def currentStorages(project: ProjectRef): ElemStream[StorageState] =
-    log.currentStates(Predicate.Project(project)).map {
+    log.currentStates(Scope.Project(project)).map {
       _.toElem { s =>
         Some(s.project)
       }
     }
diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/projects/ProjectsImpl.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/projects/ProjectsImpl.scala
index 1e344858d1..015f290332 100644
--- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/projects/ProjectsImpl.scala
+++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/projects/ProjectsImpl.scala
@@ -95,16 +95,16 @@ final class ProjectsImpl private (
   ): UIO[SearchResults.UnscoredSearchResults[ProjectResource]] =
     SearchResults(
       log
-        .currentStates(params.organization.fold(Predicate.root)(Predicate.Org), _.toResource(defaultApiMappings))
+        .currentStates(params.organization.fold(Scope.root)(Scope.Org), _.toResource(defaultApiMappings))
         .evalFilter(params.matches),
       pagination,
       ordering
     ).span("listProjects")
 
   override def currentRefs: Stream[Task, ProjectRef] =
-    log.currentStates(Predicate.root).map(_.value.project)
+    log.currentStates(Scope.root).map(_.value.project)
 
-  override def states(offset: Offset): ElemStream[ProjectState] = log.states(Predicate.root, offset).map {
+  override def states(offset: Offset): ElemStream[ProjectState] = log.states(Scope.root, offset).map {
     _.toElem { p =>
       Some(p.project)
     }
   }
diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/resolvers/ResolversImpl.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/resolvers/ResolversImpl.scala
index f87eec739d..e0beff4103 100644
--- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/resolvers/ResolversImpl.scala
+++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/resolvers/ResolversImpl.scala
@@ -153,9 +153,9 @@ final class ResolversImpl private (
       params: ResolverSearchParams,
       ordering: Ordering[ResolverResource]
   ): UIO[UnscoredSearchResults[ResolverResource]] = {
-    val predicate = params.project.fold[Predicate](Predicate.Root)(ref => Predicate.Project(ref))
+    val scope = params.project.fold[Scope](Scope.Root)(ref => Scope.Project(ref))
     SearchResults(
-      log.currentStates(predicate, identity(_)).evalMapFilter[Task, ResolverResource] { state =>
+      log.currentStates(scope, identity(_)).evalMapFilter[Task, ResolverResource] { state =>
         fetchContext.cacheOnReads
           .onRead(state.project)
           .redeemWith(
diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseEventLog.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseEventLog.scala
index 1ed9ecfdde..8e849ceb12 100644
--- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseEventLog.scala
+++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/sse/SseEventLog.scala
@@ -13,7 +13,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.event.EventStreaming
 import ch.epfl.bluebrain.nexus.delta.sourcing.model._
 import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
 import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset.{At, Start}
-import ch.epfl.bluebrain.nexus.delta.sourcing.{MultiDecoder, Predicate, Transactors}
+import ch.epfl.bluebrain.nexus.delta.sourcing.{MultiDecoder, Scope, Transactors}
 import com.typesafe.scalalogging.Logger
 import fs2.Stream
 import io.circe.syntax.EncoderOps
@@ -161,8 +161,7 @@ object SseEventLog {
       private def fetchUuids(ref: ProjectRef) =
         cache.getOrElseUpdate(ref, fetchProject(ref)).attempt.map(_.toOption)
 
-      private def stream(predicate: Predicate, selector: Option[Label], offset: Offset)
-          : Stream[Task, ServerSentEvent] = {
+      private def stream(scope: Scope, selector: Option[Label], offset: Offset): Stream[Task, ServerSentEvent] = {
         Stream
           .fromEither[Task](
             selector
@@ -174,7 +173,7 @@ object SseEventLog {
           .flatMap { entityTypes =>
             EventStreaming
               .fetchAll(
-                predicate,
+                scope,
                 entityTypes,
                 offset,
                 config.query,
@@ -184,25 +183,25 @@ object SseEventLog {
           }
       }
 
-      override def stream(offset: Offset): Stream[Task, ServerSentEvent] = stream(Predicate.root, None, offset)
+      override def stream(offset: Offset): Stream[Task, ServerSentEvent] = stream(Scope.root, None, offset)
 
       override def streamBy(selector: Label, offset: Offset): Stream[Task, ServerSentEvent] =
-        stream(Predicate.root, Some(selector), offset)
+        stream(Scope.root, Some(selector), offset)
 
       override def stream(org: Label, offset: Offset): IO[OrganizationRejection, Stream[Task, ServerSentEvent]] =
-        fetchOrg(org).as(stream(Predicate.Org(org), None, offset))
+        fetchOrg(org).as(stream(Scope.Org(org), None, offset))
 
       override def streamBy(selector: Label, org: Label, offset: Offset)
           : IO[OrganizationRejection, Stream[Task, ServerSentEvent]] =
-        fetchOrg(org).as(stream(Predicate.Org(org), Some(selector), offset))
+        fetchOrg(org).as(stream(Scope.Org(org), Some(selector), offset))
 
       override def stream(project: ProjectRef, offset: Offset)
           : IO[ProjectRejection, Stream[Task, ServerSentEvent]] =
-        fetchProject(project).as(stream(Predicate.Project(project), None, offset))
+        fetchProject(project).as(stream(Scope.Project(project), None, offset))
 
       override def streamBy(selector: Label, project: ProjectRef, offset: Offset)
           : IO[ProjectRejection, Stream[Task, ServerSentEvent]] =
-        fetchProject(project).as(stream(Predicate.Project(project), Some(selector), offset))
+        fetchProject(project).as(stream(Scope.Project(project), Some(selector), offset))
     }
   }
     .tapEval { sseLog =>
diff --git a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/views/ViewsStore.scala b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/views/ViewsStore.scala
index 4baa62f2f9..553b935260 100644
--- a/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/views/ViewsStore.scala
+++ b/delta/sdk/src/main/scala/ch/epfl/bluebrain/nexus/delta/sdk/views/ViewsStore.scala
@@ -7,7 +7,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.implicits._
 import ch.epfl.bluebrain.nexus.delta.sdk.model.IdSegmentRef
 import ch.epfl.bluebrain.nexus.delta.sdk.views.View.{AggregateView, IndexingView}
 import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, ProjectRef, Tag}
-import ch.epfl.bluebrain.nexus.delta.sourcing.{EntityDependencyStore, Predicate, Serializer, Transactors}
+import ch.epfl.bluebrain.nexus.delta.sourcing.{EntityDependencyStore, Scope, Serializer, Transactors}
 import com.typesafe.scalalogging.Logger
 import doobie._
 import doobie.implicits._
@@ -28,11 +28,11 @@ trait ViewsStore[Rejection] {
 
   /**
    * Fetch default views and combine them in an aggregate view
-   * @param predicate
+   * @param scope
    *   to get all default view from the system / a given organization / a given project
    * @return
    */
-  def fetchDefaultViews(predicate: Predicate): UIO[AggregateView]
+  def fetchDefaultViews(scope: Scope): UIO[AggregateView]
 
 }
 
@@ -74,11 +74,11 @@ object ViewsStore {
         }
       } yield singleOrMultiple
 
-    override def fetchDefaultViews(predicate: Predicate): UIO[AggregateView] = {
+    override def fetchDefaultViews(scope: Scope): UIO[AggregateView] = {
       (fr"SELECT value FROM scoped_states" ++
         Fragments.whereAndOpt(
           Some(fr"type = $entityType"),
-          predicate.asFragment,
+          scope.asFragment,
          Some(fr"tag = ${Tag.Latest.value}"),
          Some(fr"id = $defaultViewId"),
          Some(fr"deprecated = false")
diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/FragmentEncoder.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/FragmentEncoder.scala
new file mode 100644
index 0000000000..97035587ca
--- /dev/null
+++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/FragmentEncoder.scala
@@ -0,0 +1,21 @@
+package ch.epfl.bluebrain.nexus.delta.sourcing
+
+import doobie.util.fragment.Fragment
+
+/**
+  * A type class that provides a conversion from a value of type `A` to a doobie [[Fragment]].
+  */
+trait FragmentEncoder[A] {
+
+  def apply(value: A): Option[Fragment]
+
+}
+
+object FragmentEncoder {
+
+  /**
+    * Construct an instance from a function
+    */
+  def instance[A](f: A => Option[Fragment]): FragmentEncoder[A] = (value: A) => f(value)
+
+}
diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/Predicate.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/Predicate.scala
deleted file mode 100644
index ac665e5a43..0000000000
--- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/Predicate.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-package ch.epfl.bluebrain.nexus.delta.sourcing
-
-import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Label, ProjectRef}
-import doobie.implicits._
-import doobie.util.fragment.Fragment
-
-/**
-  * Allows to filter results when querying the database for scoped entities
-  */
-sealed trait Predicate extends Product with Serializable {
-  def asFragment: Option[Fragment]
-}
-
-object Predicate {
-
-  val root: Predicate = Root
-
-  /**
-    * Get all results for any org and any project
-    */
-  final case object Root extends Predicate {
-    override def asFragment: Option[Fragment] = None
-  }
-
-  /**
-    * Get all results within the given org
-    */
-  final case class Org(label: Label) extends Predicate {
-    override def asFragment: Option[Fragment] = Some(fr"org = $label")
-  }
-
-  /**
-    * Get all results within the given project
-    */
-  final case class Project(ref: ProjectRef) extends Predicate {
-    override def asFragment: Option[Fragment] = Some(fr"org = ${ref.organization} and project = ${ref.project}")
-  }
-
-}
diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/Scope.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/Scope.scala
new file mode 100644
index 0000000000..e16e7618dd
--- /dev/null
+++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/Scope.scala
@@ -0,0 +1,38 @@
+package ch.epfl.bluebrain.nexus.delta.sourcing
+
+import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Label, ProjectRef}
+import doobie.implicits._
+
+/**
+  * Allows to filter results when querying the database for scoped entities
+  */
+sealed trait Scope extends Product with Serializable
+
+object Scope {
+
+  val root: Scope = Root
+
+  def apply(project: ProjectRef): Scope = Project(project)
+
+  /**
+    * Get all results for any org and any project
+    */
+  final case object Root extends Scope
+
+  /**
+    * Get all results within the given org
+    */
+  final case class Org(label: Label) extends Scope
+
+  /**
+    * Get all results within the given project
+    */
+  final case class Project(ref: ProjectRef) extends Scope
+
+  implicit val scopeFragmentEncoder: FragmentEncoder[Scope] = FragmentEncoder.instance {
+    case Root         => None
+    case Org(label)   => Some(fr"org = $label")
+    case Project(ref) => Some(fr"org = ${ref.organization} and project = ${ref.project}")
+  }
+
+}
diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/ScopedEventLog.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/ScopedEventLog.scala
index 41e41c6853..bfcfbf4d8a 100644
--- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/ScopedEventLog.scala
+++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/ScopedEventLog.scala
@@ -114,66 +114,66 @@ trait ScopedEventLog[Id, S <: ScopedState, Command, E <: ScopedEvent, Rejection]
 
   /**
    * Allow to stream all current events within [[Envelope]] s
-   * @param predicate
+   * @param scope
    *   to filter returned events
    * @param offset
    *   offset to start from
    */
-  def currentEvents(predicate: Predicate, offset: Offset): EnvelopeStream[E]
+  def currentEvents(scope: Scope, offset: Offset): EnvelopeStream[E]
 
   /**
    * Allow to stream all current events within [[Envelope]] s
-   * @param predicate
+   * @param scope
    *   to filter returned events
    * @param offset
    *   offset to start from
    */
-  def events(predicate: Predicate, offset: Offset): EnvelopeStream[E]
 
+  def events(scope: Scope, offset: Offset): EnvelopeStream[E]
 
   /**
    * Allow to stream all latest states within [[Envelope]] s without applying transformation
-   * @param predicate
+   * @param scope
    *   to filter returned states
    * @param offset
    *   offset to start from
    */
-  def currentStates(predicate: Predicate, offset: Offset): EnvelopeStream[S]
+  def currentStates(scope: Scope, offset: Offset): EnvelopeStream[S]
 
   /**
    * Allow to stream all latest states from the beginning within [[Envelope]] s without applying transformation
-   * @param predicate
+   * @param scope
    *   to filter returned states
    */
-  def currentStates(predicate: Predicate): EnvelopeStream[S] = currentStates(predicate, Offset.Start)
+  def currentStates(scope: Scope): EnvelopeStream[S] = currentStates(scope, Offset.Start)
 
   /**
    * Allow to stream all current states from the provided offset
-   * @param predicate
+   * @param scope
    *   to filter returned states
    * @param offset
    *   offset to start from
    * @param f
    *   the function to apply on each state
   */
-  def currentStates[T](predicate: Predicate, offset: Offset, f: S => T): Stream[Task, T]
+  def currentStates[T](scope: Scope, offset: Offset, f: S => T): Stream[Task, T]
 
   /**
    * Allow to stream all current states from the beginning
-   * @param predicate
+   * @param scope
    *   to filter returned states
    * @param f
    *   the function to apply on each state
   */
-  def currentStates[T](predicate: Predicate, f: S => T): Stream[Task, T] = currentStates(predicate, Offset.Start, f)
+  def currentStates[T](scope: Scope, f: S => T): Stream[Task, T] = currentStates(scope, Offset.Start, f)
 
   /**
    * Stream the state changes continuously from the provided offset.
-   * @param predicate
+   * @param scope
    *   to filter returned states
    * @param offset
    *   the start offset
    */
-  def states(predicate: Predicate, offset: Offset): EnvelopeStream[S]
+  def states(scope: Scope, offset: Offset): EnvelopeStream[S]
 }
 
 object ScopedEventLog {
@@ -313,22 +313,22 @@ object ScopedEventLog {
           stateMachine.evaluate(state, command, maxDuration)
       }
 
-    override def currentEvents(predicate: Predicate, offset: Offset): EnvelopeStream[E] =
-      eventStore.currentEvents(predicate, offset)
+    override def currentEvents(scope: Scope, offset: Offset): EnvelopeStream[E] =
+      eventStore.currentEvents(scope, offset)
 
-    override def events(predicate: Predicate, offset: Offset): EnvelopeStream[E] =
-      eventStore.events(predicate, offset)
+    override def events(scope: Scope, offset: Offset): EnvelopeStream[E] =
+      eventStore.events(scope, offset)
 
-    override def currentStates(predicate: Predicate, offset: Offset): EnvelopeStream[S] =
-      stateStore.currentStates(predicate, offset)
+    override def currentStates(scope: Scope, offset: Offset): EnvelopeStream[S] =
+      stateStore.currentStates(scope, offset)
 
-    override def currentStates[T](predicate: Predicate, offset: Offset, f: S => T): Stream[Task, T] =
-      currentStates(predicate, offset).map { s =>
+    override def currentStates[T](scope: Scope, offset: Offset, f: S => T): Stream[Task, T] =
+      currentStates(scope, offset).map { s =>
         f(s.value)
       }
 
-    override def states(predicate: Predicate, offset: Offset): EnvelopeStream[S] =
-      stateStore.states(predicate, offset)
+    override def states(scope: Scope, offset: Offset): EnvelopeStream[S] =
+      stateStore.states(scope, offset)
   }
 }
diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/EventStreaming.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/EventStreaming.scala
index 2b8e26c5b5..a77743e9b1 100644
--- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/EventStreaming.scala
+++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/EventStreaming.scala
@@ -1,12 +1,12 @@
 package ch.epfl.bluebrain.nexus.delta.sourcing.event
 
 import cats.data.NonEmptyList
-import ch.epfl.bluebrain.nexus.delta.sourcing.Predicate.Root
+import ch.epfl.bluebrain.nexus.delta.sourcing.Scope.Root
 import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig
 import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, Envelope, EnvelopeStream}
 import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
 import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._
-import ch.epfl.bluebrain.nexus.delta.sourcing.{MultiDecoder, Predicate, Transactors}
+import ch.epfl.bluebrain.nexus.delta.sourcing.{MultiDecoder, Scope, Transactors}
 import doobie.implicits._
 import doobie.{Fragment, Fragments}
 import io.circe.Json
@@ -14,7 +14,7 @@ import io.circe.Json
 object EventStreaming {
 
   def fetchAll[A](
-      predicate: Predicate,
+      scope: Scope,
       types: List[EntityType],
       offset: Offset,
       config: QueryConfig,
@@ -25,12 +25,12 @@ object EventStreaming {
     Envelope.streamA(
       offset,
       offset =>
-        predicate match {
+        scope match {
           case Root =>
-            sql"""(${globalEvents(typeIn, offset, config)}) UNION ALL (${scopedEvents(typeIn, predicate, offset, config)})
+            sql"""(${globalEvents(typeIn, offset, config)}) UNION ALL (${scopedEvents(typeIn, scope, offset, config)})
                  |ORDER BY ordering
                  |LIMIT ${config.batchSize}""".stripMargin.query[Envelope[Json]]
-          case _    => scopedEvents(typeIn, predicate, offset, config).query[Envelope[Json]]
+          case _    => scopedEvents(typeIn, scope, offset, config).query[Envelope[Json]]
         },
       xas,
       config
@@ -38,7 +38,7 @@ object EventStreaming {
   }
 
   def fetchScoped[A](
-      predicate: Predicate,
+      scope: Scope,
       types: List[EntityType],
       offset: Offset,
       config: QueryConfig,
@@ -48,7 +48,7 @@ object EventStreaming {
 
     Envelope.streamA(
       offset,
-      offset => scopedEvents(typeIn, predicate, offset, config).query[Envelope[Json]],
+      offset => scopedEvents(typeIn, scope, offset, config).query[Envelope[Json]],
       xas,
       config
     )
@@ -60,9 +60,9 @@ object EventStreaming {
          |ORDER BY ordering
          |LIMIT ${cfg.batchSize}""".stripMargin
 
-  private def scopedEvents(typeIn: Option[Fragment], predicate: Predicate, o: Offset, cfg: QueryConfig) =
+  private def scopedEvents(typeIn: Option[Fragment], scope: Scope, o: Offset, cfg: QueryConfig) =
     fr"""SELECT type, id, value, rev, instant, ordering FROM public.scoped_events
-        |${Fragments.whereAndOpt(typeIn, predicate.asFragment, o.asFragment)}
+        |${Fragments.whereAndOpt(typeIn, scope.asFragment, o.asFragment)}
         |ORDER BY ordering
         |LIMIT ${cfg.batchSize}""".stripMargin
 
diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/GlobalEventStore.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/GlobalEventStore.scala
index adb1d6bbc5..402eafebb9 100644
--- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/GlobalEventStore.scala
+++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/GlobalEventStore.scala
@@ -4,7 +4,7 @@ import cats.syntax.all._
 import ch.epfl.bluebrain.nexus.delta.sourcing.{Serializer, Transactors}
 import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig
 import ch.epfl.bluebrain.nexus.delta.sourcing.event.Event.GlobalEvent
-import ch.epfl.bluebrain.nexus.delta.sourcing.implicits.IriInstances
+import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._
 import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, Envelope, EnvelopeStream}
 import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
 import ch.epfl.bluebrain.nexus.delta.sourcing.query.{RefreshStrategy, StreamingQuery}
diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/ScopedEventStore.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/ScopedEventStore.scala
index 2206d0e8b9..bacad6308d 100644
--- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/ScopedEventStore.scala
+++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/ScopedEventStore.scala
@@ -3,11 +3,11 @@ package ch.epfl.bluebrain.nexus.delta.sourcing.event
 import cats.syntax.all._
 import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig
 import ch.epfl.bluebrain.nexus.delta.sourcing.event.Event.ScopedEvent
-import ch.epfl.bluebrain.nexus.delta.sourcing.implicits.IriInstances
+import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._
 import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, Envelope, EnvelopeStream, ProjectRef}
 import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
 import ch.epfl.bluebrain.nexus.delta.sourcing.query.{RefreshStrategy, StreamingQuery}
-import ch.epfl.bluebrain.nexus.delta.sourcing.{Execute, PartitionInit, Predicate, Serializer, Transactors}
+import ch.epfl.bluebrain.nexus.delta.sourcing.{Execute, PartitionInit, Scope, Serializer, Transactors}
 import doobie._
 import doobie.implicits._
 import doobie.postgres.implicits._
@@ -54,21 +54,21 @@ trait ScopedEventStore[Id, E <: ScopedEvent] {
 
   /**
    * Allow to stream all current events within [[Envelope]] s
-   * @param predicate
+   * @param scope
    *   to filter returned events
    * @param offset
    *   offset to start from
   */
-  def currentEvents(predicate: Predicate, offset: Offset): EnvelopeStream[E]
+  def currentEvents(scope: Scope, offset: Offset): EnvelopeStream[E]
 
   /**
    * Allow to stream all current events within [[Envelope]] s
-   * @param predicate
+   * @param scope
    *   to filter returned events
    * @param offset
    *   offset to start from
   */
-  def events(predicate: Predicate, offset: Offset): EnvelopeStream[E]
+  def events(scope: Scope, offset: Offset): EnvelopeStream[E]
 
 }
 
@@ -129,14 +129,14 @@ object ScopedEventStore {
       }
 
       private def events(
-          predicate: Predicate,
+          scope: Scope,
           offset: Offset,
           strategy: RefreshStrategy
       ): Stream[Task, Envelope[E]] =
         StreamingQuery[Envelope[E]](
           offset,
           offset => sql"""SELECT type, id, value, rev, instant, ordering FROM public.scoped_events
-                         |${Fragments.whereAndOpt(Some(fr"type = $tpe"), predicate.asFragment, offset.asFragment)}
+                         |${Fragments.whereAndOpt(Some(fr"type = $tpe"), scope.asFragment, offset.asFragment)}
                          |ORDER BY ordering
                          |LIMIT ${config.batchSize}""".stripMargin.query[Envelope[E]],
           _.offset,
          strategy,
          xas
        )
 
-      override def currentEvents(predicate: Predicate, offset: Offset): Stream[Task, Envelope[E]] =
-        events(predicate, offset, RefreshStrategy.Stop)
+      override def currentEvents(scope: Scope, offset: Offset): Stream[Task, Envelope[E]] =
+        events(scope, offset, RefreshStrategy.Stop)
 
-      override def events(predicate: Predicate, offset: Offset): Stream[Task, Envelope[E]] =
-        events(predicate, offset, config.refreshStrategy)
+      override def events(scope: Scope, offset: Offset): Stream[Task, Envelope[E]] =
+        events(scope, offset, config.refreshStrategy)
     }
 }
diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/implicits/TimeRangeInstances.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/implicits/TimeRangeInstances.scala
new file mode 100644
index 0000000000..0a48413007
--- /dev/null
+++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/implicits/TimeRangeInstances.scala
@@ -0,0 +1,22 @@
+package ch.epfl.bluebrain.nexus.delta.sourcing.implicits
+
+import ch.epfl.bluebrain.nexus.delta.kernel.search.TimeRange
+import ch.epfl.bluebrain.nexus.delta.kernel.search.TimeRange._
+import ch.epfl.bluebrain.nexus.delta.sourcing.FragmentEncoder
+import doobie.implicits._
+import doobie.util.fragment.Fragment
+import doobie.postgres.implicits._
+
+trait TimeRangeInstances {
+
+  def createTimeRangeFragmentEncoder(columnName: String): FragmentEncoder[TimeRange] = {
+    val column = Fragment.const(columnName)
+    FragmentEncoder.instance {
+      case Anytime             => None
+      case After(value)        => Some(fr"$column >= $value")
+      case Before(value)       => Some(fr"$column <= $value")
+      case Between(start, end) => Some(fr"$column >= $start and $column <= $end")
+    }
+  }
+
+}
diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/implicits/package.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/implicits/package.scala
index 91d441f21f..437f1b7efe 100644
--- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/implicits/package.scala
+++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/implicits/package.scala
@@ -1,3 +1,11 @@
 package ch.epfl.bluebrain.nexus.delta.sourcing
 
-package object implicits extends InstantInstances with IriInstances with CirceInstances with DurationInstances
+import ch.epfl.bluebrain.nexus.delta.sourcing.syntax.DoobieSyntax
+
+package object implicits
+    extends InstantInstances
+    with IriInstances
+    with CirceInstances
+    with DurationInstances
+    with TimeRangeInstances
+    with DoobieSyntax
diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/model/FailedElemLogRow.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/model/FailedElemLogRow.scala
new file mode 100644
index 0000000000..3c31706f23
--- /dev/null
+++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/model/FailedElemLogRow.scala
@@ -0,0 +1,94 @@
+package ch.epfl.bluebrain.nexus.delta.sourcing.model
+
+import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
+import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.contexts
+import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.ContextValue
+import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.encoder.JsonLdEncoder
+import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._
+import ch.epfl.bluebrain.nexus.delta.sourcing.model.FailedElemLogRow.FailedElemData
+import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
+import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ProjectionMetadata
+import doobie._
+import doobie.postgres.implicits._
+import io.circe.Encoder
+import io.circe.generic.semiauto.deriveEncoder
+
+import java.time.Instant
+
+/**
+  * The row of the failed_elem_log table
+  */
+final case class FailedElemLogRow(
+    ordering: Offset,
+    projectionMetadata: ProjectionMetadata,
+    failedElemData: FailedElemData,
+    instant: Instant
+)
+
+object FailedElemLogRow {
+  private type Row =
+    (
+        Offset,
+        String,
+        String,
+        Option[ProjectRef],
+        Option[Iri],
+        EntityType,
+        Offset,
+        Iri,
+        Option[ProjectRef],
+        Int,
+        String,
+        String,
+        String,
+        Instant
+    )
+
+  /**
+    * Helper case class to structure FailedElemLogRow
+    */
+  final case class FailedElemData(
+      id: Iri,
+      project: Option[ProjectRef],
+      entityType: EntityType,
+      offset: Offset,
+      rev: Int,
+      errorType: String,
+      message: String,
+      stackTrace: String
+  )
+
+  implicit val failedElemDataEncoder: Encoder.AsObject[FailedElemData]    =
+    deriveEncoder[FailedElemData]
+      .mapJsonObject(_.remove("stackTrace"))
+      .mapJsonObject(_.remove("entityType"))
+
+  implicit val failedElemDataJsonLdEncoder: JsonLdEncoder[FailedElemData] =
+    JsonLdEncoder.computeFromCirce(ContextValue(contexts.error))
+
+  implicit val failedElemLogRow: Read[FailedElemLogRow] = {
+    Read[Row].map {
+      case (
+            ordering,
+            name,
+            module,
+            project,
+            resourceId,
+            entityType,
+            elemOffset,
+            elemId,
+            elemProject,
+            revision,
+            errorType,
+            message,
+            stackTrace,
+            instant
+          ) =>
+        FailedElemLogRow(
+          ordering,
+          ProjectionMetadata(module, name, project, resourceId),
+          FailedElemData(elemId, elemProject, entityType, elemOffset, revision, errorType, message, stackTrace),
+          instant
+        )
+    }
+  }
+}
diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/offset/Offset.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/offset/Offset.scala
index 2c7269dbdf..594fd70245 100644
--- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/offset/Offset.scala
+++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/offset/Offset.scala
@@ -5,9 +5,9 @@ import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.contexts
 import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.ContextValue
 import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.JsonLdContext.keywords
 import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.encoder.JsonLdEncoder
+import ch.epfl.bluebrain.nexus.delta.sourcing.FragmentEncoder
 import doobie._
 import doobie.implicits._
-import doobie.util.fragment.Fragment
 import io.circe.Codec
 import io.circe.generic.extras.Configuration
 import io.circe.generic.extras.semiauto.deriveConfiguredCodec
@@ -18,8 +18,6 @@ sealed trait Offset extends Product with Serializable {
 
   def value: Long
 
-  def asFragment: Option[Fragment]
-
   def ordering: Long = this match {
     case Offset.Start     => 0L
     case Offset.At(value) => value
@@ -32,10 +30,7 @@ object Offset {
    * To fetch all rows from the beginning
    */
  final case object Start extends Offset {
-
    override val value: Long = 0L
-
-    override def asFragment: Option[Fragment] = None
  }
 
   def from(value: Long): Offset = if (value > 0L) Offset.at(value) else Offset.Start
@@ -43,9 +38,7 @@ object Offset {
   /**
    * To fetch rows from the given offset
    */
-  final case class At(value: Long) extends Offset {
-    override def asFragment: Option[Fragment] = Some(fr"ordering > $value")
-  }
+  final case class At(value: Long) extends Offset
 
   val start: Offset = Start
 
@@ -68,6 +61,11 @@ object Offset {
   implicit final val offsetGet: Get[Offset] = Get[Long].map(from)
   implicit final val offsetPut: Put[Offset] = Put[Long].contramap(_.value)
 
+  implicit val offsetFragmentEncoder: FragmentEncoder[Offset] = FragmentEncoder.instance {
+    case Start     => None
+    case At(value) => Some(fr"ordering > $value")
+  }
+
   implicit val offsetJsonLdEncoder: JsonLdEncoder[Offset] =
     JsonLdEncoder.computeFromCirce(ContextValue(contexts.offset))
 
diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/FailedElemLogStore.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/FailedElemLogStore.scala
index 8bf844b2bb..632956120a 100644
--- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/FailedElemLogStore.scala
+++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/FailedElemLogStore.scala
@@ -3,17 +3,18 @@ package ch.epfl.bluebrain.nexus.delta.sourcing.projections
 import cats.effect.Clock
 import cats.implicits._
 import ch.epfl.bluebrain.nexus.delta.kernel.Logger
+import ch.epfl.bluebrain.nexus.delta.kernel.search.Pagination.FromPagination
+import ch.epfl.bluebrain.nexus.delta.kernel.search.TimeRange
 import ch.epfl.bluebrain.nexus.delta.kernel.utils.IOUtils
 import ch.epfl.bluebrain.nexus.delta.kernel.utils.ThrowableUtils._
 import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
-import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
 import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig
 import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._
-import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
+import ch.epfl.bluebrain.nexus.delta.sourcing.model.{FailedElemLogRow, ProjectRef}
 import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
 import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.FailedElem
-import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ProjectionStore.FailedElemLogRow
 import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{ProjectionMetadata, ProjectionStore}
+import ch.epfl.bluebrain.nexus.delta.sourcing.{FragmentEncoder, Transactors}
 import doobie._
 import doobie.implicits._
 import doobie.postgres.implicits._
@@ -27,6 +28,11 @@ import java.time.Instant
  */
 trait FailedElemLogStore {
 
+  /**
+   * Returns the total number of elems
+   */
+  def count: UIO[Long]
+
   /**
    * Saves a list of failed elems
    *
@@ -35,7 +41,7 @@ trait FailedElemLogStore {
    * @param failures
    *   the FailedElem to save
    */
-  def saveFailedElems(metadata: ProjectionMetadata, failures: List[FailedElem]): UIO[Unit]
+  def save(metadata: ProjectionMetadata, failures: List[FailedElem]): UIO[Unit]
 
   /**
    * Saves one failed elem
@@ -53,7 +59,7 @@ trait FailedElemLogStore {
    * @param offset
    *   failed elem offset
    */
-  def failedElemEntries(
+  def stream(
       projectionProject: ProjectRef,
       projectionId: Iri,
       offset: Offset
@@ -68,11 +74,30 @@ trait FailedElemLogStore {
    *   failed elem offset
    * @return
   */
-  def failedElemEntries(
+  def stream(
       projectionName: String,
       offset: Offset
   ): Stream[Task, FailedElemLogRow]
 
+  /**
+   * Return a list of errors for the given projection ordered by instant
+   * @param project
+   *   the project of the projection
+   * @param projectionId
+   *   its identifier
+   * @param pagination
+   *   the pagination to apply
+   * @param timeRange
+   *   the time range to restrict on
+   * @return
+   */
+  def list(
+      project: ProjectRef,
+      projectionId: Iri,
+      pagination: FromPagination,
+      timeRange: TimeRange
+  ): UIO[List[FailedElemLogRow]]
+
 }
 
 object FailedElemLogStore {
@@ -82,7 +107,16 @@ object FailedElemLogStore {
 
   def apply(xas: Transactors, config: QueryConfig)(implicit clock: Clock[UIO]): FailedElemLogStore =
     new FailedElemLogStore {
 
-      override def saveFailedElems(metadata: ProjectionMetadata, failures: List[FailedElem]): UIO[Unit] = {
+      implicit val timeRangeFragmentEncoder: FragmentEncoder[TimeRange] = createTimeRangeFragmentEncoder("instant")
+
+      override def count: UIO[Long] =
+        sql"SELECT count(ordering) FROM public.failed_elem_logs"
+          .query[Long]
+          .unique
+          .transact(xas.read)
+          .hideErrors
+
+      override def save(metadata: ProjectionMetadata, failures: List[FailedElem]): UIO[Unit] = {
         val log  = logger.debug(s"[${metadata.name}] Saving ${failures.length} failed elems.")
         val save = IOUtils.instant.flatMap { instant =>
           failures.traverse(elem => saveFailedElem(metadata, elem, instant)).transact(xas.write).void.hideErrors
@@ -127,7 +161,7 @@ object FailedElemLogStore {
             |  $instant
             | )""".stripMargin.update.run.void
 
-      override def failedElemEntries(
+      override def stream(
          projectionProject: ProjectRef,
          projectionId: Iri,
          offset: Offset
@@ -141,7 +175,7 @@ object FailedElemLogStore {
          .streamWithChunkSize(config.batchSize)
          .transact(xas.read)
 
-      override def failedElemEntries(projectionName: String, offset: Offset): Stream[Task, FailedElemLogRow] =
+      override def stream(projectionName: String, offset: Offset): Stream[Task, FailedElemLogRow] =
         sql"""SELECT * from public.failed_elem_logs
              |WHERE projection_name = $projectionName
              |AND ordering > $offset
             |ORDER BY ordering ASC""".stripMargin
          .query[FailedElemLogRow]
          .streamWithChunkSize(config.batchSize)
          .transact(xas.read)
+
+      override def list(
+          project: ProjectRef,
+          projectionId: Iri,
+          pagination: FromPagination,
+          timeRange: TimeRange
+      ): UIO[List[FailedElemLogRow]] = {
+        val where = Fragments.whereAndOpt(
+          Some(fr"projection_project = $project"),
+          Some(fr"projection_id = $projectionId"),
+          timeRange.asFragment
+        )
+        sql"""SELECT * from public.failed_elem_logs
+             |$where
+             |ORDER BY ordering ASC
+             |LIMIT ${pagination.size} OFFSET ${pagination.from}""".stripMargin
+          .query[FailedElemLogRow]
+          .to[List]
+          .transact(xas.read)
+          .hideErrors
+      }
     }
 }
diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/ProjectionErrors.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/ProjectionErrors.scala
index ec2da0f119..cbd7b36187 100644
--- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/ProjectionErrors.scala
+++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/ProjectionErrors.scala
@@ -8,11 +8,10 @@ import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.{JsonLdApi, JsonLdJavaApi}
 import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution
 import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
 import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig
-import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
+import ch.epfl.bluebrain.nexus.delta.sourcing.model.{FailedElemLogRow, ProjectRef}
 import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
 import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.FailedElem
 import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ProjectionMetadata
-import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ProjectionStore.FailedElemLogRow
 import fs2.Stream
 import io.circe.Printer
 import monix.bio.{Task, UIO}
@@ -84,16 +83,16 @@ object ProjectionErrors {
     val store = FailedElemLogStore(xas, config)
 
     override def saveFailedElems(metadata: ProjectionMetadata, failures: List[FailedElem]): UIO[Unit] =
-      store.saveFailedElems(metadata, failures)
+      store.save(metadata, failures)
 
     override def failedElemEntries(
         projectionProject: ProjectRef,
         projectionId: Iri,
         offset: Offset
-    ): Stream[Task, FailedElemLogRow] = store.failedElemEntries(projectionProject, projectionId, offset)
+    ): Stream[Task, FailedElemLogRow] = store.stream(projectionProject, projectionId, offset)
 
     override def failedElemEntries(projectionName: String, offset: Offset): Stream[Task, FailedElemLogRow] =
-      store.failedElemEntries(projectionName, offset)
+      store.stream(projectionName, offset)
 
     override def failedElemSses(projectionProject: ProjectRef, projectionId: Iri, offset: Offset)(implicit
         rcr: RemoteContextResolution
diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query/StreamingQuery.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query/StreamingQuery.scala
index 77d3406556..a5e698ce47 100644
--- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query/StreamingQuery.scala
+++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/query/StreamingQuery.scala
@@ -2,8 +2,7 @@ package ch.epfl.bluebrain.nexus.delta.sourcing.query
 
 import cats.effect.ExitCase
 import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
-import ch.epfl.bluebrain.nexus.delta.sourcing.Predicate.Project
-import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
+import ch.epfl.bluebrain.nexus.delta.sourcing.{Scope, Transactors}
 import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig
 import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._
 import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, Label, ProjectRef, Tag}
@@ -41,7 +40,7 @@ object StreamingQuery {
    *   the transactors
    */
   def remaining(project: ProjectRef, tag: Tag, start: Offset, xas: Transactors): UIO[Option[RemainingElems]] = {
-    val where = Fragments.whereAndOpt(Project(project).asFragment, Some(fr"tag = $tag"), start.asFragment)
+    val where = Fragments.whereAndOpt(Scope(project).asFragment, Some(fr"tag = $tag"), start.asFragment)
     sql"""SELECT count(ordering), max(instant)
          |FROM public.scoped_states
         |$where
@@ -81,7 +80,7 @@ object StreamingQuery {
       xas: Transactors
   ): Stream[Task, Elem[Unit]] = {
     def query(offset: Offset): Query0[Elem[Unit]] = {
-      val where = Fragments.whereAndOpt(Project(project).asFragment, Some(fr"tag = $tag"), offset.asFragment)
+      val where = Fragments.whereAndOpt(Scope(project).asFragment, Some(fr"tag = $tag"), offset.asFragment)
       sql"""((SELECT 'newState', type, id, org, project, instant, ordering, rev
            |FROM public.scoped_states
           |$where
@@ -137,7 +136,7 @@ object StreamingQuery {
       decodeValue: (EntityType, Json) => Task[A]
   ): Stream[Task, Elem[A]] = {
     def query(offset: Offset): Query0[Elem[Json]] = {
-      val where = Fragments.whereAndOpt(Project(project).asFragment, Some(fr"tag = $tag"), offset.asFragment)
+      val where = Fragments.whereAndOpt(Scope(project).asFragment, Some(fr"tag = $tag"), offset.asFragment)
       sql"""((SELECT 'newState', type, id, org, project, value, instant, ordering, rev
            |FROM public.scoped_states
           |$where
diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/ScopedStateStore.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/ScopedStateStore.scala
index 933ad772fc..f49b62a7f5 100644
--- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/ScopedStateStore.scala
+++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/ScopedStateStore.scala
@@ -6,12 +6,13 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig
 import ch.epfl.bluebrain.nexus.delta.sourcing.implicits.IriInstances
 import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.Latest
 import ch.epfl.bluebrain.nexus.delta.sourcing.model._
+import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._
 import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
 import ch.epfl.bluebrain.nexus.delta.sourcing.query.{RefreshStrategy, StreamingQuery}
 import ch.epfl.bluebrain.nexus.delta.sourcing.state.ScopedStateStore.StateNotFound
 import ch.epfl.bluebrain.nexus.delta.sourcing.state.ScopedStateStore.StateNotFound.{TagNotFound, UnknownState}
 import ch.epfl.bluebrain.nexus.delta.sourcing.state.State.ScopedState
-import ch.epfl.bluebrain.nexus.delta.sourcing.{Execute, PartitionInit, Predicate, Serializer, Transactors}
+import ch.epfl.bluebrain.nexus.delta.sourcing.{Execute, PartitionInit, Scope, Serializer, Transactors}
 import doobie._
 import doobie.implicits._
 import doobie.postgres.implicits._
@@ -19,7 +20,7 @@ import io.circe.Decoder
 import monix.bio.IO
 
 /**
- * Allows to save/fetch [[ScopeState]] from the database
+ * Allows to save/fetch [[ScopedState]] from the database
  */
 trait ScopedStateStore[Id, S <: ScopedState] {
 
@@ -72,48 +73,48 @@ trait ScopedStateStore[Id, S <: ScopedState] {
    * Fetches latest states from the given type from the beginning.
    *
    * The stream is completed when it reaches the end.
-   * @param predicate
+   * @param scope
    *   to filter returned states
   */
-  def currentStates(predicate: Predicate): EnvelopeStream[S] =
-    currentStates(predicate, Offset.Start)
+  def currentStates(scope: Scope): EnvelopeStream[S] =
+    currentStates(scope, Offset.Start)
 
   /**
    * Fetches states from the given type with the given tag from the beginning.
    *
    * The stream is completed when it reaches the end.
- * @param predicate + * @param scope * to filter returned states * @param tag * only states with this tag will be selected */ - def currentStates(predicate: Predicate, tag: Tag): EnvelopeStream[S] = - currentStates(predicate, tag, Offset.Start) + def currentStates(scope: Scope, tag: Tag): EnvelopeStream[S] = + currentStates(scope, tag, Offset.Start) /** * Fetches latest states from the given type from the provided offset. * * The stream is completed when it reaches the end. - * @param predicate + * @param scope * to filter returned states * @param offset * the offset */ - def currentStates(predicate: Predicate, offset: Offset): EnvelopeStream[S] = - currentStates(predicate, Latest, offset) + def currentStates(scope: Scope, offset: Offset): EnvelopeStream[S] = + currentStates(scope, Latest, offset) /** * Fetches states from the given type with the given tag from the provided offset. * * The stream is completed when it reaches the end. - * @param predicate + * @param scope * to filter returned states * @param tag * only states with this tag will be selected * @param offset * the offset */ - def currentStates(predicate: Predicate, tag: Tag, offset: Offset): EnvelopeStream[S] + def currentStates(scope: Scope, tag: Tag, offset: Offset): EnvelopeStream[S] /** * Fetches latest states from the given type from the beginning @@ -121,11 +122,11 @@ trait ScopedStateStore[Id, S <: ScopedState] { * The stream is not completed when it reaches the end of the existing events, but it continues to push new events * when new events are persisted. * - * @param predicate + * @param scope * to filter returned states */ - def states(predicate: Predicate): EnvelopeStream[S] = - states(predicate, Latest, Offset.Start) + def states(scope: Scope): EnvelopeStream[S] = + states(scope, Latest, Offset.Start) /** * Fetches states from the given type with the given tag from the beginning @@ -133,12 +134,12 @@ trait ScopedStateStore[Id, S <: ScopedState] { * The stream is not completed when it reaches the end of the existing events, but it continues to push new events * when new states are persisted. * - * @param predicate + * @param scope * to filter returned states * @param tag * only states with this tag will be selected */ - def states(predicate: Predicate, tag: Tag): EnvelopeStream[S] = states(predicate, tag, Offset.Start) + def states(scope: Scope, tag: Tag): EnvelopeStream[S] = states(scope, tag, Offset.Start) /** * Fetches latest states from the given type from the provided offset @@ -146,13 +147,13 @@ trait ScopedStateStore[Id, S <: ScopedState] { * The stream is not completed when it reaches the end of the existing events, but it continues to push new events * when new events are persisted. * - * @param predicate + * @param scope * to filter returned states * @param offset * the offset */ - def states(predicate: Predicate, offset: Offset): EnvelopeStream[S] = - states(predicate, Latest, offset) + def states(scope: Scope, offset: Offset): EnvelopeStream[S] = + states(scope, Latest, offset) /** * Fetches states from the given type with the given tag from the provided offset @@ -160,14 +161,14 @@ trait ScopedStateStore[Id, S <: ScopedState] { * The stream is not completed when it reaches the end of the existing events, but it continues to push new events * when new states are persisted. 
* - * @param predicate + * @param scope * to filter returned states * @param tag * only states with this tag will be selected * @param offset * the offset */ - def states(predicate: Predicate, tag: Tag, offset: Offset): EnvelopeStream[S] + def states(scope: Scope, tag: Tag, offset: Offset): EnvelopeStream[S] } @@ -274,7 +275,7 @@ object ScopedStateStore { } private def states( - predicate: Predicate, + scope: Scope, tag: Tag, offset: Offset, strategy: RefreshStrategy @@ -284,7 +285,7 @@ object ScopedStateStore { offset => // format: off sql"""SELECT type, id, value, rev, instant, ordering FROM public.scoped_states - |${Fragments.whereAndOpt(Some(fr"type = $tpe"), predicate.asFragment, Some(fr"tag = $tag"), offset.asFragment)} + |${Fragments.whereAndOpt(Some(fr"type = $tpe"), scope.asFragment, Some(fr"tag = $tag"), offset.asFragment)} |ORDER BY ordering |LIMIT ${config.batchSize}""".stripMargin.query[Envelope[S]], _.offset, @@ -292,11 +293,11 @@ object ScopedStateStore { xas ) - override def currentStates(predicate: Predicate, tag: Tag, offset: Offset): EnvelopeStream[S] = - states(predicate, tag, offset, RefreshStrategy.Stop) + override def currentStates(scope: Scope, tag: Tag, offset: Offset): EnvelopeStream[S] = + states(scope, tag, offset, RefreshStrategy.Stop) - override def states(predicate: Predicate, tag: Tag, offset: Offset): EnvelopeStream[S] = - states(predicate, tag, offset, config.refreshStrategy) + override def states(scope: Scope, tag: Tag, offset: Offset): EnvelopeStream[S] = + states(scope, tag, offset, config.refreshStrategy) } } diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectionStore.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectionStore.scala index 439ee869d1..68d5075dbb 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectionStore.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/ProjectionStore.scala @@ -3,22 +3,16 @@ package ch.epfl.bluebrain.nexus.delta.sourcing.stream import cats.effect.Clock import ch.epfl.bluebrain.nexus.delta.kernel.utils.IOUtils import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri -import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.contexts -import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.ContextValue -import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.encoder.JsonLdEncoder import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._ -import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, ProjectRef} +import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset -import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ProjectionStore.FailedElemLogRow.FailedElemData import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ProjectionStore.ProjectionProgressRow import doobie._ import doobie.implicits._ import doobie.postgres.implicits._ import fs2.Stream -import io.circe.Encoder -import io.circe.generic.semiauto.deriveEncoder import monix.bio.{Task, UIO} import java.time.Instant @@ -141,82 +135,4 @@ object ProjectionStore { } } - /** - * The row of the failed_elem_log table - */ - final case class FailedElemLogRow( - ordering: Offset, - projectionMetadata: ProjectionMetadata, - failedElemData: FailedElemData, - instant: Instant - ) - - object FailedElemLogRow { - 
private type Row = - ( - Offset, - String, - String, - Option[ProjectRef], - Option[Iri], - EntityType, - Offset, - Iri, - Option[ProjectRef], - Int, - String, - String, - String, - Instant - ) - - /** - * Helper case class to structure FailedElemLogRow - */ - final case class FailedElemData( - id: Iri, - project: Option[ProjectRef], - entityType: EntityType, - offset: Offset, - rev: Int, - errorType: String, - message: String, - stackTrace: String - ) - - implicit val failedElemDataEncoder: Encoder.AsObject[FailedElemData] = - deriveEncoder[FailedElemData] - .mapJsonObject(_.remove("stackTrace")) - .mapJsonObject(_.remove("entityType")) - implicit val failedElemDataJsonLdEncoder: JsonLdEncoder[FailedElemData] = - JsonLdEncoder.computeFromCirce(ContextValue(contexts.error)) - - implicit val failedElemLogRow: Read[FailedElemLogRow] = { - Read[Row].map { - case ( - ordering, - name, - module, - project, - resourceId, - entityType, - elemOffset, - elemId, - elemProject, - revision, - errorType, - message, - stackTrace, - instant - ) => - FailedElemLogRow( - ordering, - ProjectionMetadata(module, name, project, resourceId), - FailedElemData(elemId, elemProject, entityType, elemOffset, revision, errorType, message, stackTrace), - instant - ) - } - } - } - } diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/syntax/DoobieSyntax.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/syntax/DoobieSyntax.scala new file mode 100644 index 0000000000..f1c7b465d9 --- /dev/null +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/syntax/DoobieSyntax.scala @@ -0,0 +1,20 @@ +package ch.epfl.bluebrain.nexus.delta.sourcing.syntax + +import ch.epfl.bluebrain.nexus.delta.sourcing.FragmentEncoder +import ch.epfl.bluebrain.nexus.delta.sourcing.syntax.DoobieSyntax.FragmentEncoderOps +import doobie.util.fragment.Fragment + +/** + * This package provides syntax via enrichment classes for Doobie + */ +trait DoobieSyntax { + + implicit final def fragmentEncoderOps[A](value: A): FragmentEncoderOps[A] = new FragmentEncoderOps[A](value) + +} + +object DoobieSyntax { + implicit class FragmentEncoderOps[A](private val value: A) extends AnyVal { + def asFragment(implicit encoder: FragmentEncoder[A]): Option[Fragment] = encoder(value) + } +} diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/ScopedEventLogSuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/ScopedEventLogSuite.scala index 5664f3d6a2..568059b14e 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/ScopedEventLogSuite.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/ScopedEventLogSuite.scala @@ -204,7 +204,7 @@ class ScopedEventLogSuite extends BioSuite with Doobie.Fixture { test("Stream continuously the current states") { for { queue <- Queue.unbounded[Task, Envelope[PullRequestState]] - _ <- eventLog.states(Predicate.root, Offset.Start).through(queue.enqueue).compile.drain.timeout(500.millis) + _ <- eventLog.states(Scope.root, Offset.Start).through(queue.enqueue).compile.drain.timeout(500.millis) elems <- queue.tryDequeueChunk1(Int.MaxValue).map(opt => opt.map(_.toList).getOrElse(Nil)) _ = elems.assertSize(2) } yield () diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/EventStreamingSuite.scala 
b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/EventStreamingSuite.scala index 5fdd1313d7..51274c660e 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/EventStreamingSuite.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/EventStreamingSuite.scala @@ -13,7 +13,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Anonymous import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.query.RefreshStrategy -import ch.epfl.bluebrain.nexus.delta.sourcing.{Arithmetic, MultiDecoder, Predicate, PullRequest} +import ch.epfl.bluebrain.nexus.delta.sourcing.{Arithmetic, MultiDecoder, PullRequest, Scope} import ch.epfl.bluebrain.nexus.testkit.bio.BioSuite import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie import doobie.implicits._ @@ -82,7 +82,7 @@ class EventStreamingSuite extends BioSuite with Doobie.Fixture with Doobie.Asser test("Get events of all types from the start") { EventStreaming .fetchAll( - Predicate.root, + Scope.root, List.empty, Offset.Start, queryConfig, @@ -102,7 +102,7 @@ class EventStreamingSuite extends BioSuite with Doobie.Fixture with Doobie.Asser test("Get events of all types from offset 2") { EventStreaming .fetchAll( - Predicate.root, + Scope.root, List.empty, Offset.at(2L), queryConfig, @@ -120,7 +120,7 @@ class EventStreamingSuite extends BioSuite with Doobie.Fixture with Doobie.Asser test("Get PR events from offset 2") { EventStreaming .fetchAll( - Predicate.root, + Scope.root, List(PullRequest.entityType), Offset.at(2L), queryConfig, @@ -137,7 +137,7 @@ class EventStreamingSuite extends BioSuite with Doobie.Fixture with Doobie.Asser test("Get events from project 1 from offset 1") { EventStreaming .fetchAll( - Predicate.Project(project1), + Scope.Project(project1), List.empty, Offset.at(1L), queryConfig, @@ -153,7 +153,7 @@ class EventStreamingSuite extends BioSuite with Doobie.Fixture with Doobie.Asser test("Get events from org 1 from offset 1") { EventStreaming .fetchAll( - Predicate.Org(project1.organization), + Scope.Org(project1.organization), List.empty, Offset.at(1L), queryConfig, @@ -170,7 +170,7 @@ class EventStreamingSuite extends BioSuite with Doobie.Fixture with Doobie.Asser test("Get all scoped events from offset 1") { EventStreaming .fetchScoped( - Predicate.Root, + Scope.Root, List.empty, Offset.at(1L), queryConfig, diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/ScopedEventStoreSuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/ScopedEventStoreSuite.scala index b4597337f1..a3545b8d52 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/ScopedEventStoreSuite.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/event/ScopedEventStoreSuite.scala @@ -10,7 +10,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Anonymous, User} import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Envelope, Label, ProjectRef} import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.query.RefreshStrategy -import ch.epfl.bluebrain.nexus.delta.sourcing.{Predicate, PullRequest} +import ch.epfl.bluebrain.nexus.delta.sourcing.{PullRequest, Scope} import ch.epfl.bluebrain.nexus.testkit.bio.BioSuite import 
ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie import doobie.implicits._ @@ -90,28 +90,28 @@ class ScopedEventStoreSuite extends BioSuite with Doobie.Fixture with Doobie.Ass test("Fetch all current events from the beginning") { store - .currentEvents(Predicate.Root, Offset.Start) + .currentEvents(Scope.Root, Offset.Start) .assert(envelope1, envelope2, envelope3, envelope4, envelope5, envelope6) } test("Fetch current events for `org` from offset 2") { - store.currentEvents(Predicate.Org(Label.unsafe("org")), Offset.at(2L)).assert(envelope3, envelope4, envelope5) + store.currentEvents(Scope.Org(Label.unsafe("org")), Offset.at(2L)).assert(envelope3, envelope4, envelope5) } test("Fetch current events for `proj1` from the beginning") { - store.currentEvents(Predicate.Project(project1), Offset.Start).assert(envelope1, envelope2, envelope3, envelope4) + store.currentEvents(Scope.Project(project1), Offset.Start).assert(envelope1, envelope2, envelope3, envelope4) } test("Fetch all events from the beginning") { - store.events(Predicate.Root, Offset.Start).assert(envelope1, envelope2, envelope3, envelope4, envelope5, envelope6) + store.events(Scope.Root, Offset.Start).assert(envelope1, envelope2, envelope3, envelope4, envelope5, envelope6) } test(s"Fetch current events for `${project1.organization}` from offset 2") { - store.events(Predicate.Org(project1.organization), Offset.at(2L)).assert(envelope3, envelope4, envelope5) + store.events(Scope.Org(project1.organization), Offset.at(2L)).assert(envelope3, envelope4, envelope5) } test(s"Fetch current events for `$project1` from the beginning") { - store.events(Predicate.Project(project1), Offset.Start).assert(envelope1, envelope2, envelope3, envelope4) + store.events(Scope.Project(project1), Offset.Start).assert(envelope1, envelope2, envelope3, envelope4) } } diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/FailedElemLogStoreSuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/FailedElemLogStoreSuite.scala index ad332d9c83..651a98660e 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/FailedElemLogStoreSuite.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/projections/FailedElemLogStoreSuite.scala @@ -1,7 +1,10 @@ package ch.epfl.bluebrain.nexus.delta.sourcing.projections +import cats.syntax.all._ +import ch.epfl.bluebrain.nexus.delta.kernel.search.Pagination.FromPagination +import ch.epfl.bluebrain.nexus.delta.kernel.search.{Pagination, TimeRange} +import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv -import ch.epfl.bluebrain.nexus.delta.rdf.syntax._ import ch.epfl.bluebrain.nexus.delta.sourcing.PurgeElemFailures import ch.epfl.bluebrain.nexus.delta.sourcing.config.QueryConfig import ch.epfl.bluebrain.nexus.delta.sourcing.model.{EntityType, ProjectRef} @@ -10,138 +13,161 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie import ch.epfl.bluebrain.nexus.delta.sourcing.query.RefreshStrategy import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.FailedElem import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ProjectionMetadata -import ch.epfl.bluebrain.nexus.testkit.IOFixedClock +import ch.epfl.bluebrain.nexus.testkit.MutableClock import ch.epfl.bluebrain.nexus.testkit.bio.BioSuite -import munit.AnyFixture +import munit.{AnyFixture, Location} import java.time.Instant import 
scala.concurrent.duration.{DurationInt, FiniteDuration} -class FailedElemLogStoreSuite extends BioSuite with IOFixedClock with Doobie.Fixture with Doobie.Assertions { +class FailedElemLogStoreSuite extends BioSuite with MutableClock.Fixture with Doobie.Fixture with Doobie.Assertions { - override def munitFixtures: Seq[AnyFixture[_]] = List(doobie) + override def munitFixtures: Seq[AnyFixture[_]] = List(doobie, clock) private lazy val xas = doobie() + implicit private lazy val mutableClock: MutableClock = clock() + private lazy val store = FailedElemLogStore(xas, QueryConfig(10, RefreshStrategy.Stop)) - private val name = "offset" - private val project = ProjectRef.unsafe("org", "proj") - private val resource = iri"https://resource" - private val metadata = ProjectionMetadata("test", name, Some(project), Some(resource)) + private def createMetadata(project: ProjectRef, id: Iri) = + ProjectionMetadata("test", s"$project|$id", Some(project), Some(id)) + + private val project1 = ProjectRef.unsafe("org", "proj") + private val projection11 = nxv + "projection11" + private val metadata11 = createMetadata(project1, projection11) + private val projection12 = nxv + "projection12" + private val metadata12 = createMetadata(project1, projection12) + + private val project2 = ProjectRef.unsafe("org", "proj2") + private val metadata21 = createMetadata(project2, projection12) private val id = nxv + "id" private val error = new RuntimeException("boom") private val rev = 1 - private val fail1 = FailedElem(EntityType("ACL"), id, Some(project), Instant.EPOCH, Offset.At(42L), error, rev) - private val fail2 = FailedElem(EntityType("Schema"), id, Some(project), Instant.EPOCH, Offset.At(42L), error, rev) - test("Return no failed elem entries by name") { - for { - entries <- store.failedElemEntries(name, Offset.start).compile.toList - _ = entries.assertEmpty() - } yield () - } + private val entityType = EntityType("Test") + private def createFailedElem(project: ProjectRef, offset: Long) = + FailedElem(entityType, id, Some(project), Instant.EPOCH.plusSeconds(offset), Offset.at(offset), error, rev) - test("Return no failed elem entries by (project, id)") { + private val fail1 = createFailedElem(project1, 1L) + private val fail2 = createFailedElem(project1, 2L) + private val fail3 = createFailedElem(project1, 3L) + private val fail4 = createFailedElem(project1, 4L) + private val fail5 = createFailedElem(project2, 5L) + + private def assertSave(metadata: ProjectionMetadata, failed: FailedElem) = for { - entries <- store.failedElemEntries(project, resource, Offset.start).compile.toList - _ = entries.assertEmpty() + _ <- mutableClock.set(failed.instant) + _ <- store.save(metadata, List(failed)) } yield () - } - test("Insert empty list of failed elem") { + private def assertStream(metadata: ProjectionMetadata, offset: Offset, expected: List[FailedElem])(implicit + loc: Location + ) = { + val expectedOffsets = expected.map(_.offset) for { - _ <- store.saveFailedElems(metadata, List.empty) - entries <- store.failedElemEntries(name, Offset.start).compile.toList - _ = entries.assertEmpty() + _ <- store.stream(metadata.name, offset).map(_.failedElemData.offset).assert(expectedOffsets) + _ <- (metadata.project, metadata.resourceId).traverseN { case (project, resourceId) => + store.stream(project, resourceId, offset).map(_.failedElemData.offset).assert(expectedOffsets) + } } yield () } - test("Return no failed elem entries by name") { - for { - entries <- store.failedElemEntries(name, Offset.start).compile.toList - _ =
entries.assertEmpty() - } yield () + private def assertList( + project: ProjectRef, + projectionId: Iri, + pagination: FromPagination, + timeRange: TimeRange, + expected: List[FailedElem] + )(implicit loc: Location) = { + val expectedOffsets = expected.map(_.offset) + store + .list(project, projectionId, pagination, timeRange) + .map(_.map(_.failedElemData.offset)) + .assert(expectedOffsets) } - test("Return no failed elem entries by (project, id)") { + test("Insert empty list of failures") { for { - entries <- store.failedElemEntries(project, resource, Offset.start).compile.toList - _ = entries.assertEmpty() + _ <- store.save(metadata11, List.empty) + _ <- assertStream(metadata11, Offset.Start, List.empty) } yield () } - test("Insert empty list of failed elem") { + test("Insert several failures") { for { - _ <- store.saveFailedElems(metadata, List.empty) - entries <- store.failedElemEntries(name, Offset.start).compile.toList - _ = entries.assertEmpty() + _ <- assertSave(metadata11, fail1) + _ <- assertSave(metadata12, fail2) + _ <- assertSave(metadata12, fail3) + _ <- assertSave(metadata12, fail4) + _ <- assertSave(metadata21, fail5) } yield () } - test("Insert failed elem") { + test(s"Get stream of failures for ${metadata11.name}") { for { - _ <- store.saveFailedElems(metadata, List(fail1)) - entries <- store.failedElemEntries(name, Offset.start).compile.toList + entries <- store.stream(metadata11.name, Offset.start).compile.toList r = entries.assertOneElem - _ = assertEquals(r.projectionMetadata, metadata) + _ = assertEquals(r.projectionMetadata, metadata11) _ = assertEquals(r.ordering, Offset.At(1L)) - _ = assertEquals(r.instant, Instant.EPOCH) + _ = assertEquals(r.instant, fail1.instant) elem = r.failedElemData - _ = assertEquals(elem.offset, Offset.At(42L)) + _ = assertEquals(elem.offset, Offset.At(1L)) _ = assertEquals(elem.errorType, "java.lang.RuntimeException") _ = assertEquals(elem.id, id) - _ = assertEquals(elem.entityType, EntityType("ACL")) + _ = assertEquals(elem.entityType, entityType) _ = assertEquals(elem.rev, rev) - _ = assertEquals(elem.project, Some(project)) + _ = assertEquals(elem.project, Some(project1)) } yield () } - test("Insert several failed elem") { - for { - _ <- store.saveFailedElems(metadata, List(fail1, fail2)) - entries <- store.failedElemEntries(name, Offset.start).compile.toList - _ = entries.assertSize(3) - } yield () + test(s"Get stream of failures for ${metadata12.name}") { + assertStream(metadata12, Offset.start, List(fail2, fail3, fail4)) } - test("Return failed elem entries by (project, id)") { - for { - entries <- store.failedElemEntries(project, resource, Offset.start).compile.toList - _ = entries.assertSize(3) - } yield () + test("Get an empty stream for an unknown projection") { + val unknownMetadata = createMetadata(ProjectRef.unsafe("xxx", "xxx"), nxv + "xxx") + assertStream(unknownMetadata, Offset.start, List.empty) } - test("Return empty if no failed elem is found by name") { - for { - entries <- store.failedElemEntries("other", Offset.start).compile.toList - _ = entries.assertEmpty() - } yield () + test(s"List all failures for ${metadata12.name}") { + assertList(project1, projection12, Pagination.OnePage, TimeRange.Anytime, List(fail2, fail3, fail4)) } - test("Return empty if not found by (project, id)") { - for { - entries <- store.failedElemEntries(project, iri"https://example.com", Offset.start).compile.toList - _ = entries.assertEmpty() - } yield () + test(s"Paginate to list 'fail3' for ${metadata12.name}") { + assertList(project1, 
projection12, FromPagination(1, 1), TimeRange.Anytime, List(fail3)) + } + + test(s"Paginate to list 'fail3' and 'fail4' for ${metadata12.name}") { + assertList(project1, projection12, FromPagination(1, 2), TimeRange.Anytime, List(fail3, fail4)) + } + + test(s"List failures before ${fail3.instant} for ${metadata12.name}") { + assertList(project1, projection12, Pagination.OnePage, TimeRange.Before(fail3.instant), List(fail2, fail3)) + } + + private val between = TimeRange.Between(fail2.instant.plusMillis(1L), fail3.instant.plusMillis(1L)) + test(s"List failures between ${between.start} and ${between.end} for ${metadata12.name}") { + assertList(project1, projection12, Pagination.OnePage, between, List(fail3)) } - test("Purge failed elements after predefined ttl") { - val failedElemTtl = 14.days + test("Purge failures after predefined ttl") { + val failedElemTtl = 14.days + val purgeElemFailures = new PurgeElemFailures(xas, failedElemTtl) - lazy val purgeElemFailures: FiniteDuration => PurgeElemFailures = timeTravel => - new PurgeElemFailures(xas, failedElemTtl)( - IOFixedClock.ioClock(Instant.EPOCH.plusMillis(timeTravel.toMillis)) - ) + def timeTravel(duration: FiniteDuration) = mutableClock.set(Instant.EPOCH.plusMillis(duration.toMillis)) for { - _ <- purgeElemFailures(failedElemTtl - 500.millis)() - entries <- store.failedElemEntries(project, resource, Offset.start).compile.toList - _ = entries.assertSize(3) // no elements are deleted after 13 days - _ <- purgeElemFailures(failedElemTtl + 500.millis)() - entries2 <- store.failedElemEntries(project, resource, Offset.start).compile.toList - _ = entries2.assertEmpty() // all elements were deleted after 14 days + _ <- store.count.assert(5L) + _ <- timeTravel(failedElemTtl - 500.millis) + _ <- purgeElemFailures() + // no elements are deleted after 13 days + _ <- store.count.assert(5L) + _ <- timeTravel(failedElemTtl + 10.seconds) + _ <- purgeElemFailures() + // all elements were deleted after 14 days + _ <- store.count.assert(0L) } yield () } diff --git a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/ScopedStateStoreSuite.scala b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/ScopedStateStoreSuite.scala index 0ce01a5b67..2092d5fd50 100644 --- a/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/ScopedStateStoreSuite.scala +++ b/delta/sourcing-psql/src/test/scala/ch/epfl/bluebrain/nexus/delta/sourcing/state/ScopedStateStoreSuite.scala @@ -13,7 +13,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Envelope, Label, ProjectRef import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset import ch.epfl.bluebrain.nexus.delta.sourcing.query.RefreshStrategy import ch.epfl.bluebrain.nexus.delta.sourcing.state.ScopedStateStore.StateNotFound.{TagNotFound, UnknownState} -import ch.epfl.bluebrain.nexus.delta.sourcing.{EntityCheck, Predicate, PullRequest} +import ch.epfl.bluebrain.nexus.delta.sourcing.{EntityCheck, PullRequest, Scope} import ch.epfl.bluebrain.nexus.testkit.bio.BioSuite import ch.epfl.bluebrain.nexus.delta.sourcing.postgres.Doobie import doobie.implicits._ @@ -84,35 +84,35 @@ class ScopedStateStoreSuite extends BioSuite with Doobie.Fixture with Doobie.Ass } test("Fetch all current latest states from the beginning") { - store.currentStates(Predicate.Root).assert(envelope1, envelope2, envelope3, envelope4) + store.currentStates(Scope.Root).assert(envelope1, envelope2, envelope3, envelope4) } test("Fetch all latest states from the beginning") { 
- store.states(Predicate.Root).assert(envelope1, envelope2, envelope3, envelope4) + store.states(Scope.Root).assert(envelope1, envelope2, envelope3, envelope4) } test(s"Fetch current states for ${project1.organization} from the beginning") { - store.currentStates(Predicate.Org(project1.organization)).assert(envelope1, envelope2, envelope3) + store.currentStates(Scope.Org(project1.organization)).assert(envelope1, envelope2, envelope3) } test(s"Fetch states for ${project1.organization} from the beginning") { - store.states(Predicate.Org(project1.organization)).assert(envelope1, envelope2, envelope3) + store.states(Scope.Org(project1.organization)).assert(envelope1, envelope2, envelope3) } test(s"Fetch current states for $project1 from offset 2") { - store.currentStates(Predicate.Project(project1), Offset.at(1L)).assert(envelope2) + store.currentStates(Scope.Project(project1), Offset.at(1L)).assert(envelope2) } test(s"Fetch states for $project1 from offset 2") { - store.states(Predicate.Project(project1), Offset.at(1L)).assert(envelope2) + store.states(Scope.Project(project1), Offset.at(1L)).assert(envelope2) } test(s"Fetch all current states from the beginning for tag `$customTag`") { - store.currentStates(Predicate.Root, customTag).assert(envelope1Tagged, envelope3Tagged) + store.currentStates(Scope.Root, customTag).assert(envelope1Tagged, envelope3Tagged) } test(s"Fetch all states from the beginning for tag `$customTag`") { - store.states(Predicate.Root, customTag).assert(envelope1Tagged, envelope3Tagged) + store.states(Scope.Root, customTag).assert(envelope1Tagged, envelope3Tagged) } test("Update state 1 successfully") { @@ -124,7 +124,7 @@ class ScopedStateStoreSuite extends BioSuite with Doobie.Fixture with Doobie.Ass } test("Fetch all current latest states from the beginning") { - store.currentStates(Predicate.Root).assert(envelope2, envelope3, envelope4, envelopeUpdated1) + store.currentStates(Scope.Root).assert(envelope2, envelope3, envelope4, envelopeUpdated1) } test("Delete tagged state 3 successfully") { @@ -136,7 +136,7 @@ class ScopedStateStoreSuite extends BioSuite with Doobie.Fixture with Doobie.Ass } test(s"Fetch all states from the beginning for tag `$customTag` after deletion of `state3`") { - store.states(Predicate.Root, customTag).assert(envelope1Tagged) + store.states(Scope.Root, customTag).assert(envelope1Tagged) } test("Check that the given ids does exist") { diff --git a/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/MutableClock.scala b/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/MutableClock.scala new file mode 100644 index 0000000000..77418f34d1 --- /dev/null +++ b/delta/testkit/src/main/scala/ch/epfl/bluebrain/nexus/testkit/MutableClock.scala @@ -0,0 +1,29 @@ +package ch.epfl.bluebrain.nexus.testkit + +import cats.effect.concurrent.Ref +import cats.effect.{Clock, Resource} +import ch.epfl.bluebrain.nexus.testkit.bio.ResourceFixture.TaskFixture +import ch.epfl.bluebrain.nexus.testkit.bio.{BioSuite, ResourceFixture} +import monix.bio.{Task, UIO} + +import java.time.Instant +import scala.concurrent.duration.TimeUnit + +final class MutableClock(value: Ref[Task, Instant]) extends Clock[UIO] { + + def set(instant: Instant): UIO[Unit] = value.set(instant).hideErrors + override def realTime(unit: TimeUnit): UIO[Long] = value.get.map(_.toEpochMilli).hideErrors + + override def monotonic(unit: TimeUnit): UIO[Long] = value.get.map(_.toEpochMilli).hideErrors +} +object MutableClock { + private def suiteLocalFixture: TaskFixture[MutableClock] = { 
+ val clock = Ref.of[Task, Instant](Instant.EPOCH).map(new MutableClock(_)) + ResourceFixture.suiteLocal("clock", Resource.eval(clock)) + } + + trait Fixture { + self: BioSuite => + val clock: ResourceFixture.TaskFixture[MutableClock] = suiteLocalFixture + } +}
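A few illustrative usage sketches for the APIs introduced above follow; they are reviewer notes rather than part of the diff, and any name that does not appear in the patch is hypothetical. First, the Predicate to Scope rename that runs through the query layer: Scope expresses the three filtering levels the stores accept, sketched here as plain construction (Scope(project) in StreamingQuery reads as shorthand for Scope.Project(project)).

  import ch.epfl.bluebrain.nexus.delta.sourcing.Scope
  import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Label, ProjectRef}

  object ScopeLevelsSketch {
    // Root matches every org and project, Org narrows to one organization,
    // Project narrows to a single project.
    val root: Scope = Scope.Root
    val org: Scope  = Scope.Org(Label.unsafe("org"))
    val proj: Scope = Scope.Project(ProjectRef.unsafe("org", "proj"))
  }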
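Next, the FragmentEncoder type class plus the new DoobieSyntax replaces the asFragment methods that used to live directly on Offset and Predicate. A minimal sketch of how an encoded value composes into a WHERE clause, assuming the sourcing implicits package object mixes in DoobieSyntax (the diffstat shows implicits/package.scala being updated accordingly):

  import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._
  import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
  import doobie._
  import doobie.implicits._

  object WhereClauseSketch {
    // Offset.Start encodes to None, so whereAndOpt simply drops that clause;
    // Offset.At(2) contributes the fragment `ordering > 2`.
    val where: Fragment =
      Fragments.whereAndOpt(Offset.at(2L).asFragment, Offset.Start.asFragment)
  }

None-returning encoders are what let call sites such as FailedElemLogStore.list pass timeRange.asFragment unconditionally: an unbounded TimeRange just contributes no constraint.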
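The listing API itself would be called along these lines; the store wiring and the projection id are assumptions for the sake of the example:

  import ch.epfl.bluebrain.nexus.delta.kernel.search.Pagination.FromPagination
  import ch.epfl.bluebrain.nexus.delta.kernel.search.TimeRange
  import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv
  import ch.epfl.bluebrain.nexus.delta.sourcing.model.{FailedElemLogRow, ProjectRef}
  import ch.epfl.bluebrain.nexus.delta.sourcing.projections.FailedElemLogStore
  import monix.bio.UIO

  object ListErrorsSketch {
    // `store` would come from FailedElemLogStore(xas, queryConfig) in real wiring.
    def firstPage(store: FailedElemLogStore): UIO[List[FailedElemLogRow]] = {
      val project      = ProjectRef.unsafe("org", "proj")
      val projectionId = nxv + "myProjection" // hypothetical projection id
      // First 20 errors of the projection over its whole history,
      // ordered by offset (ORDER BY ordering ASC in the underlying query).
      store.list(project, projectionId, FromPagination(0, 20), TimeRange.Anytime)
    }
  }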
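Finally, the new MutableClock fixture is consumed the way FailedElemLogStoreSuite does above; a sketch with a hypothetical suite name and assertion:

  import ch.epfl.bluebrain.nexus.testkit.MutableClock
  import ch.epfl.bluebrain.nexus.testkit.bio.BioSuite
  import munit.AnyFixture

  import java.time.Instant
  import java.util.concurrent.TimeUnit

  class MutableClockSketchSuite extends BioSuite with MutableClock.Fixture {

    // Register the suite-local fixture so munit manages its lifecycle.
    override def munitFixtures: Seq[AnyFixture[_]] = List(clock)

    // Code under test picks this up implicitly as a Clock[UIO].
    implicit private lazy val mutableClock: MutableClock = clock()

    test("Observe a controlled instant") {
      for {
        _   <- mutableClock.set(Instant.EPOCH.plusSeconds(60L))
        now <- mutableClock.realTime(TimeUnit.MILLISECONDS)
        _    = assertEquals(now, 60000L)
      } yield ()
    }
  }

Note that realTime and monotonic both report the value in epoch milliseconds regardless of the requested unit, which is sufficient for these suites but worth keeping in mind if a nanosecond-based test ever relies on it.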