Skip to content

Commit

Permalink
Implement listing of projection errors (#4106)
Browse files Browse the repository at this point in the history
* Implement listing of projection errors

* Fix compilation

---------

Co-authored-by: Simon Dumas <[email protected]>
  • Loading branch information
imsdu and Simon Dumas committed Jul 26, 2023
1 parent 7ee5332 commit a13c3b1
Show file tree
Hide file tree
Showing 39 changed files with 586 additions and 401 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -293,23 +293,23 @@ final class BlazegraphViews(
* Return the existing indexing views in a project in a finite stream
*/
def currentIndexingViews(project: ProjectRef): ElemStream[IndexingViewDef] =
log.currentStates(Predicate.Project(project)).evalMapFilter { envelope =>
log.currentStates(Scope.Project(project)).evalMapFilter { envelope =>
Task.pure(toIndexViewDef(envelope))
}

/**
* Return all existing indexing views in a finite stream
*/
def currentIndexingViews: ElemStream[IndexingViewDef] =
log.currentStates(Predicate.Root).evalMapFilter { envelope =>
log.currentStates(Scope.Root).evalMapFilter { envelope =>
Task.pure(toIndexViewDef(envelope))
}

/**
* Return the indexing views in a non-ending stream
*/
def indexingViews(start: Offset): ElemStream[IndexingViewDef] =
log.states(Predicate.Root, start).evalMapFilter { envelope =>
log.states(Scope.Root, start).evalMapFilter { envelope =>
Task.pure(toIndexViewDef(envelope))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag
import ch.epfl.bluebrain.nexus.delta.sourcing.model._
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.{Predicate, ScopedEntityDefinition, ScopedEventLog, StateMachine, Transactors}
import ch.epfl.bluebrain.nexus.delta.sourcing.{Scope, ScopedEntityDefinition, ScopedEventLog, StateMachine, Transactors}
import io.circe.Json
import monix.bio.{IO, Task, UIO}

Expand Down Expand Up @@ -400,7 +400,7 @@ final class CompositeViews private (
params: CompositeViewSearchParams,
ordering: Ordering[ViewResource]
): UIO[UnscoredSearchResults[ViewResource]] = {
val predicate = params.project.fold[Predicate](Predicate.Root)(ref => Predicate.Project(ref))
val predicate = params.project.fold[Scope](Scope.Root)(ref => Scope.Project(ref))
SearchResults(
log.currentStates(predicate, identity(_)).evalMapFilter[Task, ViewResource] { state =>
fetchContext.cacheOnReads
Expand All @@ -422,19 +422,19 @@ final class CompositeViews private (
* Return all existing views for the given project in a finite stream
*/
def currentViews(project: ProjectRef): ElemStream[CompositeViewDef] =
log.currentStates(Predicate.Project(project)).map(toCompositeViewDef)
log.currentStates(Scope.Project(project)).map(toCompositeViewDef)

/**
* Return all existing indexing views in a finite stream
*/
def currentViews: ElemStream[CompositeViewDef] =
log.currentStates(Predicate.Root).map(toCompositeViewDef)
log.currentStates(Scope.Root).map(toCompositeViewDef)

/**
* Return the indexing views in a non-ending stream
*/
def views(start: Offset): ElemStream[CompositeViewDef] =
log.states(Predicate.Root, start).map(toCompositeViewDef)
log.states(Scope.Root, start).map(toCompositeViewDef)

private def toCompositeViewDef(envelope: Envelope[CompositeViewState]) =
envelope.toElem { v => Some(v.project) }.map { v =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ object CompositeProjections {
Projection.persist(
progress,
compositeProgressStore.save(view, rev, branch, _),
failedElemLogStore.saveFailedElems(metadata, _)
failedElemLogStore.save(metadata, _)
)(batch)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,23 +325,23 @@ final class ElasticSearchViews private (
* Return the existing indexing views in a project in a finite stream
*/
def currentIndexingViews(project: ProjectRef): ElemStream[IndexingViewDef] =
log.currentStates(Predicate.Project(project)).evalMapFilter { envelope =>
log.currentStates(Scope.Project(project)).evalMapFilter { envelope =>
Task.pure(toIndexViewDef(envelope))
}

/**
* Return all existing indexing views in a finite stream
*/
def currentIndexingViews: ElemStream[IndexingViewDef] =
log.currentStates(Predicate.Root).evalMapFilter { envelope =>
log.currentStates(Scope.Root).evalMapFilter { envelope =>
Task.pure(toIndexViewDef(envelope))
}

/**
* Return the indexing views in a non-ending stream
*/
def indexingViews(start: Offset): ElemStream[IndexingViewDef] =
log.states(Predicate.Root, start).evalMapFilter { envelope =>
log.states(Scope.Root, start).evalMapFilter { envelope =>
Task.pure(toIndexViewDef(envelope))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Sink
import ch.epfl.bluebrain.nexus.delta.sourcing.stream._
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.pipes.AsJson
import ch.epfl.bluebrain.nexus.delta.sourcing.{MultiDecoder, Predicate, Transactors}
import ch.epfl.bluebrain.nexus.delta.sourcing.{MultiDecoder, Scope, Transactors}
import monix.bio.Task

trait EventMetricsProjection
Expand Down Expand Up @@ -73,8 +73,7 @@ object EventMetricsProjection {
MultiDecoder(metricEncoders.map { encoder => encoder.entityType -> encoder.toMetric }.toMap)

// define how to get metrics from a given offset
val metrics = (offset: Offset) =>
EventStreaming.fetchScoped(Predicate.root, allEntityTypes, offset, queryConfig, xas)
val metrics = (offset: Offset) => EventStreaming.fetchScoped(Scope.root, allEntityTypes, offset, queryConfig, xas)

val index = eventMetricsIndex(indexPrefix)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.model.IdSegment
import ch.epfl.bluebrain.nexus.delta.sdk.model.search.SortList
import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext
import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.{ApiMappings, ProjectBase}
import ch.epfl.bluebrain.nexus.delta.sourcing.Predicate
import ch.epfl.bluebrain.nexus.delta.sourcing.Scope
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Label, ProjectRef, ResourceRef}
import monix.bio.IO

Expand All @@ -33,9 +33,9 @@ sealed trait DefaultSearchRequest extends Product with Serializable {
def sort: SortList

/**
* If the search applies to the project/org/root level
* If the search applies to the project/org/root scope
*/
def predicate: Predicate
def scope: Scope

}

Expand All @@ -46,7 +46,7 @@ object DefaultSearchRequest {
*/
case class ProjectSearch(ref: ProjectRef, params: ResourcesSearchParams, pagination: Pagination, sort: SortList)
extends DefaultSearchRequest {
override def predicate: Predicate = Predicate.Project(ref)
override def scope: Scope = Scope(ref)
}

object ProjectSearch {
Expand Down Expand Up @@ -82,7 +82,7 @@ object DefaultSearchRequest {
*/
case class OrgSearch(label: Label, params: ResourcesSearchParams, pagination: Pagination, sort: SortList)
extends DefaultSearchRequest {
override def predicate: Predicate = Predicate.Org(label)
override def scope: Scope = Scope.Org(label)
}

object OrgSearch {
Expand All @@ -103,7 +103,7 @@ object DefaultSearchRequest {
*/
case class RootSearch(params: ResourcesSearchParams, pagination: Pagination, sort: SortList)
extends DefaultSearchRequest {
override def predicate: Predicate = Predicate.Root
override def scope: Scope = Scope.Root
}

object RootSearch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller
import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import ch.epfl.bluebrain.nexus.delta.sdk.model.search.{AggregationResult, SearchResults}
import ch.epfl.bluebrain.nexus.delta.sdk.views.View.IndexingView
import ch.epfl.bluebrain.nexus.delta.sourcing.{Predicate, Transactors}
import ch.epfl.bluebrain.nexus.delta.sourcing.{Scope, Transactors}
import io.circe.JsonObject
import monix.bio.{IO, UIO}

Expand Down Expand Up @@ -58,14 +58,14 @@ object DefaultViewsQuery {
}

def apply[Result, Aggregate](
fetchViews: Predicate => UIO[List[IndexingView]],
fetchViews: Scope => UIO[List[IndexingView]],
aclCheck: AclCheck,
listAction: (DefaultSearchRequest, Set[IndexingView]) => IO[ElasticSearchQueryError, Result],
aggregateAction: (DefaultSearchRequest, Set[IndexingView]) => IO[ElasticSearchQueryError, Aggregate]
): DefaultViewsQuery[Result, Aggregate] = new DefaultViewsQuery[Result, Aggregate] {

private def filterViews(predicate: Predicate)(implicit caller: Caller) =
fetchViews(predicate)
private def filterViews(scope: Scope)(implicit caller: Caller) =
fetchViews(scope)
.flatMap { allViews =>
aclCheck.mapFilter[IndexingView, IndexingView](
allViews,
Expand All @@ -80,7 +80,7 @@ object DefaultViewsQuery {
override def list(
searchRequest: DefaultSearchRequest
)(implicit caller: Caller): IO[ElasticSearchQueryError, Result] =
filterViews(searchRequest.predicate).flatMap { views =>
filterViews(searchRequest.scope).flatMap { views =>
listAction(searchRequest, views)
}

Expand All @@ -90,7 +90,7 @@ object DefaultViewsQuery {
override def aggregate(
searchRequest: DefaultSearchRequest
)(implicit caller: Caller): IO[ElasticSearchQueryError, Aggregate] =
filterViews(searchRequest.predicate).flatMap { views =>
filterViews(searchRequest.scope).flatMap { views =>
aggregateAction(searchRequest, views)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.ElasticSearchViews
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.{defaultViewId, permissions, ElasticSearchViewState}
import ch.epfl.bluebrain.nexus.delta.sdk.views.View.IndexingView
import ch.epfl.bluebrain.nexus.delta.sdk.views.ViewRef
import ch.epfl.bluebrain.nexus.delta.sourcing.{Predicate, Transactors}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag
import ch.epfl.bluebrain.nexus.delta.sourcing.{Scope, Transactors}
import doobie._
import doobie.implicits._
import io.circe.{Decoder, Json}
Expand All @@ -18,9 +18,9 @@ import monix.bio.{IO, UIO}
trait DefaultViewsStore {

/**
* Return views at the given predicate
* Return views at the given scope
*/
def find(predicate: Predicate): UIO[List[IndexingView]]
def find(scope: Scope): UIO[List[IndexingView]]
}

object DefaultViewsStore {
Expand All @@ -38,11 +38,11 @@ object DefaultViewsStore {
def apply(prefix: String, xas: Transactors): DefaultViewsStore = {
new DefaultViewsStore {
implicit val stateDecoder: Decoder[ElasticSearchViewState] = ElasticSearchViewState.serializer.codec
def find(predicate: Predicate): UIO[List[IndexingView]] =
def find(scope: Scope): UIO[List[IndexingView]] =
(fr"SELECT value FROM scoped_states" ++
Fragments.whereAndOpt(
Some(fr"type = ${ElasticSearchViews.entityType}"),
predicate.asFragment,
scope.asFragment,
Some(fr"tag = ${Tag.Latest.value}"),
Some(fr"id = $defaultViewId"),
Some(fr"deprecated = false")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller
import ch.epfl.bluebrain.nexus.delta.sdk.model.search.SortList
import ch.epfl.bluebrain.nexus.delta.sdk.views.View.IndexingView
import ch.epfl.bluebrain.nexus.delta.sdk.views.ViewRef
import ch.epfl.bluebrain.nexus.delta.sourcing.Predicate
import ch.epfl.bluebrain.nexus.delta.sourcing.Scope
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Anonymous, Group, User}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Label, ProjectRef}
import ch.epfl.bluebrain.nexus.testkit.bio.BioSuite
Expand Down Expand Up @@ -45,13 +45,13 @@ class DefaultViewsQuerySuite extends BioSuite {
(charlie.subject, AclAddress.Project(project1), Set(permissions.read))
)

private def fetchViews(predicate: Predicate) = UIO.pure {
private def fetchViews(predicate: Scope) = UIO.pure {
val viewRefs = predicate match {
case Predicate.Root => List(defaultView, defaultView2, defaultView3)
case Predicate.Org(`org`) => List(defaultView, defaultView2)
case Predicate.Org(`org2`) => List(defaultView3)
case Predicate.Org(_) => List.empty
case Predicate.Project(project) => List(ViewRef(project, defaultViewId))
case Scope.Root => List(defaultView, defaultView2, defaultView3)
case Scope.Org(`org`) => List(defaultView, defaultView2)
case Scope.Org(`org2`) => List(defaultView3)
case Scope.Org(_) => List.empty
case Scope.Project(project) => List(ViewRef(project, defaultViewId))
}
viewRefs.map { ref =>
IndexingView(ref, "index", permissions.read)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.{ElasticSearchViewGen
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv
import ch.epfl.bluebrain.nexus.delta.sdk.views.View.IndexingView
import ch.epfl.bluebrain.nexus.delta.sdk.views.ViewRef
import ch.epfl.bluebrain.nexus.delta.sourcing.Predicate
import ch.epfl.bluebrain.nexus.delta.sourcing.Scope
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{Label, ProjectRef, Tag}
import ch.epfl.bluebrain.nexus.delta.sourcing.state.ScopedStateStoreFixture
import ch.epfl.bluebrain.nexus.testkit.bio.{BioSuite, ResourceFixture}
Expand Down Expand Up @@ -75,7 +75,7 @@ class DefaultViewsStoreSuite extends BioSuite {

private lazy val viewStore = defaultViewsStore()

private def findDefaultRefs(predicate: Predicate) =
private def findDefaultRefs(predicate: Scope) =
viewStore.find(predicate).map(_.map(_.ref))

test("Construct indexing view correctly") {
Expand All @@ -89,14 +89,14 @@ class DefaultViewsStoreSuite extends BioSuite {
}

test(s"Get non-deprecated default views in '$project1'") {
findDefaultRefs(Predicate.Project(project1)).assert(List(defaultView1))
findDefaultRefs(Scope.Project(project1)).assert(List(defaultView1))
}

test(s"Get non-deprecated default views in '$org'") {
findDefaultRefs(Predicate.Org(org)).assert(List(defaultView1, defaultView2))
findDefaultRefs(Scope.Org(org)).assert(List(defaultView1, defaultView2))
}

test(s"Get non-deprecated in all orgs") {
findDefaultRefs(Predicate.Root).assert(List(defaultView1, defaultView2, defaultView3))
findDefaultRefs(Scope.Root).assert(List(defaultView1, defaultView2, defaultView3))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ final class Files(
.bimap(WrappedStorageRejection, _.value)
)
stream <- log
.states(Predicate.root, offset)
.states(Scope.root, offset)
.map { envelope =>
envelope.value match {
case f
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.ScopedEntityDefinition.Tagger
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, EntityType, ProjectRef, ResourceRef}
import ch.epfl.bluebrain.nexus.delta.sourcing.{Predicate, ScopedEntityDefinition, ScopedEventLog, StateMachine, Transactors}
import ch.epfl.bluebrain.nexus.delta.sourcing.{Scope, ScopedEntityDefinition, ScopedEventLog, StateMachine, Transactors}
import com.typesafe.scalalogging.Logger
import fs2.Stream
import io.circe.Json
Expand Down Expand Up @@ -288,7 +288,7 @@ final class Storages private (

private def fetchDefaults(project: ProjectRef): IO[StorageFetchRejection, Stream[Task, StorageResource]] =
fetchContext.onRead(project).map { pc =>
log.currentStates(Predicate.Project(project), _.toResource(pc.apiMappings, pc.base)).filter(_.value.default)
log.currentStates(Scope.Project(project), _.toResource(pc.apiMappings, pc.base)).filter(_.value.default)
}

/**
Expand All @@ -309,7 +309,7 @@ final class Storages private (
* Return the existing storages in a project in a finite stream
*/
def currentStorages(project: ProjectRef): ElemStream[StorageState] =
log.currentStates(Predicate.Project(project)).map {
log.currentStates(Scope.Project(project)).map {
_.toElem { s => Some(s.project) }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,16 +95,16 @@ final class ProjectsImpl private (
): UIO[SearchResults.UnscoredSearchResults[ProjectResource]] =
SearchResults(
log
.currentStates(params.organization.fold(Predicate.root)(Predicate.Org), _.toResource(defaultApiMappings))
.currentStates(params.organization.fold(Scope.root)(Scope.Org), _.toResource(defaultApiMappings))
.evalFilter(params.matches),
pagination,
ordering
).span("listProjects")

override def currentRefs: Stream[Task, ProjectRef] =
log.currentStates(Predicate.root).map(_.value.project)
log.currentStates(Scope.root).map(_.value.project)

override def states(offset: Offset): ElemStream[ProjectState] = log.states(Predicate.root, offset).map {
override def states(offset: Offset): ElemStream[ProjectState] = log.states(Scope.root, offset).map {
_.toElem { p => Some(p.project) }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,9 @@ final class ResolversImpl private (
params: ResolverSearchParams,
ordering: Ordering[ResolverResource]
): UIO[UnscoredSearchResults[ResolverResource]] = {
val predicate = params.project.fold[Predicate](Predicate.Root)(ref => Predicate.Project(ref))
val scope = params.project.fold[Scope](Scope.Root)(ref => Scope.Project(ref))
SearchResults(
log.currentStates(predicate, identity(_)).evalMapFilter[Task, ResolverResource] { state =>
log.currentStates(scope, identity(_)).evalMapFilter[Task, ResolverResource] { state =>
fetchContext.cacheOnReads
.onRead(state.project)
.redeemWith(
Expand Down
Loading

0 comments on commit a13c3b1

Please sign in to comment.