Skip to content

Commit

Permalink
Implement search updater as a hook (#4073)
Browse files Browse the repository at this point in the history
* Implement search updater as a hook

---------

Co-authored-by: Simon Dumas <[email protected]>
  • Loading branch information
imsdu and Simon Dumas committed Jul 19, 2023
1 parent d51a201 commit 413c02c
Show file tree
Hide file tree
Showing 15 changed files with 424 additions and 351 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,43 +10,41 @@ import scala.reflect.{classTag, ClassTag}
* Wrapper class that is used to be able to create log statements as UIOs. It is needed because "any type class from
* Sync and above will only work with IO[Throwable, A]" (see https://bio.monix.io/docs/cats-effect#sync-and-above)
*/
class Logger[A: ClassTag] extends Log4CatsLogger[UIO] {
trait Logger extends Log4CatsLogger[UIO]

implicit private val loggerName: LoggerName = LoggerName(classTag[A].runtimeClass.getName.stripSuffix("$"))
private val logger: Log4CatsLogger[Task] = Slf4jLogger.getLogger[Task]

override def error(t: Throwable)(message: => String): UIO[Unit] =
logger.error(t)(message).hideErrors
object Logger {
def apply[A: ClassTag]: Logger = new Logger {
implicit private val loggerName: LoggerName = LoggerName(classTag[A].runtimeClass.getName.stripSuffix("$"))
private val logger: Log4CatsLogger[Task] = Slf4jLogger.getLogger[Task]

override def warn(t: Throwable)(message: => String): UIO[Unit] =
logger.warn(t)(message).hideErrors
override def error(t: Throwable)(message: => String): UIO[Unit] =
logger.error(t)(message).hideErrors

override def info(t: Throwable)(message: => String): UIO[Unit] =
logger.info(t)(message).hideErrors
override def warn(t: Throwable)(message: => String): UIO[Unit] =
logger.warn(t)(message).hideErrors

override def debug(t: Throwable)(message: => String): UIO[Unit] =
logger.debug(t)(message).hideErrors
override def info(t: Throwable)(message: => String): UIO[Unit] =
logger.info(t)(message).hideErrors

override def trace(t: Throwable)(message: => String): UIO[Unit] =
logger.trace(t)(message).hideErrors
override def debug(t: Throwable)(message: => String): UIO[Unit] =
logger.debug(t)(message).hideErrors

override def error(message: => String): UIO[Unit] =
logger.error(message).hideErrors
override def trace(t: Throwable)(message: => String): UIO[Unit] =
logger.trace(t)(message).hideErrors

override def warn(message: => String): UIO[Unit] =
logger.warn(message).hideErrors
override def error(message: => String): UIO[Unit] =
logger.error(message).hideErrors

override def info(message: => String): UIO[Unit] =
logger.info(message).hideErrors
override def warn(message: => String): UIO[Unit] =
logger.warn(message).hideErrors

override def debug(message: => String): UIO[Unit] =
logger.debug(message).hideErrors
override def info(message: => String): UIO[Unit] =
logger.info(message).hideErrors

override def trace(message: => String): UIO[Unit] =
logger.trace(message).hideErrors
override def debug(message: => String): UIO[Unit] =
logger.debug(message).hideErrors

}

object Logger {
def apply[A: ClassTag]: Logger[A] = new Logger()
override def trace(message: => String): UIO[Unit] =
logger.trace(message).hideErrors
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ object BlazegraphCoordinator {

}

val metadata: ProjectionMetadata = ProjectionMetadata("system", "blazegraph-coordinator", None, None)
private val logger: Logger[BlazegraphCoordinator] = Logger[BlazegraphCoordinator]
val metadata: ProjectionMetadata = ProjectionMetadata("system", "blazegraph-coordinator", None, None)
private val logger: Logger = Logger[BlazegraphCoordinator]

def apply(
views: BlazegraphViews,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.BlazegraphClient
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.client.DeltaClient
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.config.CompositeViewsConfig
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.deletion.CompositeViewsDeletionTask
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.{CompositeSpaces, CompositeViewsCoordinator, MetadataPredicates}
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.{CompositeProjectionLifeCycle, CompositeSpaces, CompositeViewsCoordinator, MetadataPredicates}
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewRejection.ProjectContextRejection
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model._
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.projections.{CompositeIndexingDetails, CompositeProjections}
Expand Down Expand Up @@ -185,28 +185,41 @@ class CompositeViewsPluginModule(priority: Int) extends ModuleDef {
CompositeGraphStream(local, remote)
}

make[CompositeViewsCoordinator].fromEffect {
many[CompositeProjectionLifeCycle.Hook].addValue(CompositeProjectionLifeCycle.NoopHook)

make[CompositeProjectionLifeCycle].from {
(
compositeViews: CompositeViews,
supervisor: Supervisor,
hooks: Set[CompositeProjectionLifeCycle.Hook],
registry: ReferenceRegistry,
graphStream: CompositeGraphStream,
buildSpaces: CompositeSpaces.Builder,
compositeProjections: CompositeProjections,
config: CompositeViewsConfig,
cr: RemoteContextResolution @Id("aggregate")
) =>
CompositeViewsCoordinator(
compositeViews,
supervisor,
CompositeProjectionLifeCycle(
hooks,
PipeChain.compile(_, registry),
graphStream,
buildSpaces.apply,
compositeProjections,
config
compositeProjections
)(cr)
}

make[CompositeViewsCoordinator].fromEffect {
(
compositeViews: CompositeViews,
supervisor: Supervisor,
lifecycle: CompositeProjectionLifeCycle,
config: CompositeViewsConfig
) =>
CompositeViewsCoordinator(
compositeViews,
supervisor,
lifecycle,
config
)
}

many[ProjectDeletionTask].add { (views: CompositeViews) => CompositeViewsDeletionTask(views) }

many[MetadataContextValue].addEffect(MetadataContextValue.fromFile("contexts/composite-views-metadata.json"))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing

import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.CompositeViewDef.ActiveViewDef
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.projections.CompositeProjections
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.stream.CompositeGraphStream
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.RemoteContextResolution
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.ExecutionStrategy.TransientSingleNode
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{CompiledProjection, PipeChain}
import monix.bio.Task

/**
* Handle the different life stages of a composite view projection
*/
trait CompositeProjectionLifeCycle {

/**
* Initialise the projection related to the view
*/
def init(view: ActiveViewDef): Task[Unit]

/**
* Build the projection related to the view, applying any matching hook If none, start the regular indexing
*/
def build(view: ActiveViewDef): Task[CompiledProjection]

/**
* Destroy the projection related to the view
*/
def destroy(view: ActiveViewDef): Task[Unit]
}

object CompositeProjectionLifeCycle {

/**
* Hook that allows to capture changes to apply before starting the indexing of a composite view
*/
trait Hook {
def apply(view: ActiveViewDef): Option[Task[Unit]]
}

/**
* A default implementation that does nothing
*/
val NoopHook: Hook = (_: ActiveViewDef) => None

/**
* Constructs the lifecycle of the projection of a composite view including building the projection itself and how to
* create/destroy the namespaces and indices related to it
*/
def apply(
hooks: Set[Hook],
compilePipeChain: PipeChain.Compile,
graphStream: CompositeGraphStream,
buildSpaces: ActiveViewDef => CompositeSpaces,
compositeProjections: CompositeProjections
)(implicit cr: RemoteContextResolution): CompositeProjectionLifeCycle = {
def init(view: ActiveViewDef): Task[Unit] = buildSpaces(view).init

def index(view: ActiveViewDef): Task[CompiledProjection] =
CompositeViewDef.compile(view, buildSpaces(view), compilePipeChain, graphStream, compositeProjections)

def destroy(view: ActiveViewDef): Task[Unit] =
for {
_ <- buildSpaces(view).destroy
_ <- compositeProjections.deleteAll(view.ref, view.rev)
} yield ()

apply(hooks, init, index, destroy)
}

private[indexing] def apply(
hooks: Set[Hook],
onInit: ActiveViewDef => Task[Unit],
index: ActiveViewDef => Task[CompiledProjection],
onDestroy: ActiveViewDef => Task[Unit]
): CompositeProjectionLifeCycle = new CompositeProjectionLifeCycle {

override def init(view: ActiveViewDef): Task[Unit] = onInit(view)

override def build(view: ActiveViewDef): Task[CompiledProjection] = {
detectHook(view).getOrElse {
index(view)
}
}

private def detectHook(view: ActiveViewDef) = {
val initial: Option[Task[Unit]] = None
hooks.toList
.foldLeft(initial) { case (acc, hook) =>
(acc ++ hook(view)).reduceOption(_ >> _)
}
.map { task =>
Task.pure(CompiledProjection.fromTask(view.metadata, TransientSingleNode, task))
}
}

override def destroy(view: ActiveViewDef): Task[Unit] = onDestroy(view)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,21 @@ object CompositeViewDef {
*/
final case class ActiveViewDef(ref: ViewRef, uuid: UUID, rev: Int, value: CompositeViewValue)
extends CompositeViewDef {
def projection = s"composite-views-${ref.project}-${ref.viewId}-$rev"

/**
* The projection name for this view
*/
val projection = s"composite-views-${ref.project}-${ref.viewId}-$rev"

/**
* The projection metadata for this view
*/
val metadata: ProjectionMetadata = ProjectionMetadata(
CompositeViews.entityType.value,
projection,
Some(ref.project),
Some(ref.viewId)
)
}

/**
Expand Down Expand Up @@ -118,12 +132,7 @@ object CompositeViewDef {
graphStream: CompositeGraphStream,
compositeProjections: CompositeProjections
)(implicit cr: RemoteContextResolution): Task[CompiledProjection] = {
val metadata = ProjectionMetadata(
CompositeViews.entityType.value,
view.projection,
Some(view.ref.project),
Some(view.ref.viewId)
)
val metadata = view.metadata
val fetchProgress: UIO[CompositeProgress] = compositeProjections.progress(view.ref, view.rev)

def compileSource =
Expand Down
Loading

0 comments on commit 413c02c

Please sign in to comment.