Skip to content

Commit

Permalink
Validate index with indexing rev (#4200)
Browse files Browse the repository at this point in the history
* Validate index with indexing rev

---------

Co-authored-by: Simon Dumas <[email protected]>
  • Loading branch information
imsdu and Simon Dumas committed Aug 25, 2023
1 parent 051463e commit 416bd5a
Show file tree
Hide file tree
Showing 30 changed files with 88 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
"View": "https://bluebrain.github.io/nexus/vocabulary/View",
"ElasticSearchView": "https://bluebrain.github.io/nexus/vocabulary/ElasticSearchView",
"AggregateElasticSearchView": "https://bluebrain.github.io/nexus/vocabulary/AggregateElasticSearchView",
"_uuid": "https://bluebrain.github.io/nexus/vocabulary/uuid",
"_indexingRev": "https://bluebrain.github.io/nexus/vocabulary/indexingRev"
"_uuid": "https://bluebrain.github.io/nexus/vocabulary/uuid"
},
"@id": "https://bluebrain.github.io/nexus/contexts/elasticsearch-metadata.json"
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@
"viewId": {
"@type": "@id"
},
"_uuid": "https://bluebrain.github.io/nexus/vocabulary/uuid",
"_indexingRev": "https://bluebrain.github.io/nexus/vocabulary/indexingRev"
"_uuid": "https://bluebrain.github.io/nexus/vocabulary/uuid"
},
"https://bluebrain.github.io/nexus/contexts/pipeline.json"
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.model._
import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext
import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.ApiMappings
import ch.epfl.bluebrain.nexus.delta.sdk.resolvers.ResolverContextResolution
import ch.epfl.bluebrain.nexus.delta.sdk.views.IndexingRev
import ch.epfl.bluebrain.nexus.delta.sourcing.ScopedEntityDefinition.Tagger
import ch.epfl.bluebrain.nexus.delta.sourcing._
import ch.epfl.bluebrain.nexus.delta.sourcing.config.EventLogConfig
Expand Down Expand Up @@ -384,12 +385,12 @@ object ElasticSearchViews {
def projectionName(state: ElasticSearchViewState): String =
projectionName(state.project, state.id, state.indexingRev)

def projectionName(project: ProjectRef, id: Iri, indexingRev: Int): String = {
s"elasticsearch-$project-$id-$indexingRev"
def projectionName(project: ProjectRef, id: Iri, indexingRev: IndexingRev): String = {
s"elasticsearch-$project-$id-${indexingRev.value}"
}

def index(uuid: UUID, rev: Int, prefix: String): IndexLabel =
IndexLabel.fromView(prefix, uuid, rev)
def index(uuid: UUID, indexingRev: IndexingRev, prefix: String): IndexLabel =
IndexLabel.fromView(prefix, uuid, indexingRev)

def apply(
fetchContext: FetchContext[ElasticSearchViewRejection],
Expand Down Expand Up @@ -424,15 +425,11 @@ object ElasticSearchViews {
// format: off
def created(e: ElasticSearchViewCreated): Option[ElasticSearchViewState] =
Option.when(state.isEmpty) {
ElasticSearchViewState(e.id, e.project, e.uuid, e.value, e.source, Tags.empty, e.rev, e.rev, deprecated = false, e.instant, e.subject, e.instant, e.subject)
ElasticSearchViewState(e.id, e.project, e.uuid, e.value, e.source, Tags.empty, e.rev, IndexingRev.init, deprecated = false, e.instant, e.subject, e.instant, e.subject)
}

def updated(e: ElasticSearchViewUpdated): Option[ElasticSearchViewState] = state.map { s =>
val newIndexingRev =
(e.value.asIndexingValue, s.value.asIndexingValue, Option(s.indexingRev))
.mapN(nextIndexingRev)
.getOrElse(s.indexingRev)

val newIndexingRev = nextIndexingRev(s.value, e.value, s.indexingRev, e.rev)
s.copy(rev = e.rev, indexingRev = newIndexingRev, value = e.value, source = e.source, updatedAt = e.instant, updatedBy = e.subject)
}
// format: on
Expand Down Expand Up @@ -465,7 +462,7 @@ object ElasticSearchViews {
for {
t <- IOUtils.instant
u <- uuidF()
_ <- validate(u, 1, c.value)
_ <- validate(u, IndexingRev.init, c.value)
} yield ElasticSearchViewCreated(c.id, c.project, u, c.value, c.source, 1, t, c.subject)
case Some(_) => IO.raiseError(ResourceAlreadyExists(c.id, c.project))
}
Expand All @@ -480,8 +477,9 @@ object ElasticSearchViews {
case Some(s) if c.value.tpe != s.value.tpe =>
IO.raiseError(DifferentElasticSearchViewType(s.id.toString, c.value.tpe, s.value.tpe))
case Some(s) =>
val newIndexingRev = nextIndexingRev(s.value, c.value, s.indexingRev, c.rev)
for {
_ <- validate(s.uuid, s.rev + 1, c.value)
_ <- validate(s.uuid, newIndexingRev, c.value)
t <- IOUtils.instant
} yield ElasticSearchViewUpdated(c.id, c.project, s.uuid, c.value, c.source, s.rev + 1, t, c.subject)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClient.HttpResult
import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClientError.HttpClientStatusError
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.Permissions
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.model.Permission
import ch.epfl.bluebrain.nexus.delta.sdk.views.ValidateAggregate
import ch.epfl.bluebrain.nexus.delta.sdk.views.{IndexingRev, ValidateAggregate}
import ch.epfl.bluebrain.nexus.delta.sourcing.Transactors
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{PipeChain, ProjectionErr}
import io.circe.JsonObject
Expand All @@ -22,12 +22,12 @@ import java.util.UUID
*/
trait ValidateElasticSearchView {

def apply(uuid: UUID, rev: Int, v: ElasticSearchViewValue): IO[ElasticSearchViewRejection, Unit]
def apply(uuid: UUID, indexingRev: IndexingRev, v: ElasticSearchViewValue): IO[ElasticSearchViewRejection, Unit]
}

object ValidateElasticSearchView {

val always: ValidateElasticSearchView = (_: UUID, _: Int, _: ElasticSearchViewValue) => IO.unit
val always: ValidateElasticSearchView = (_: UUID, _: IndexingRev, _: ElasticSearchViewValue) => IO.unit

def apply(
validatePipeChain: PipeChain => Either[ProjectionErr, Unit],
Expand Down Expand Up @@ -63,7 +63,7 @@ object ValidateElasticSearchView {
xas
)

private def validateIndexing(uuid: UUID, rev: Int, value: IndexingElasticSearchViewValue) =
private def validateIndexing(uuid: UUID, indexingRev: IndexingRev, value: IndexingElasticSearchViewValue) =
for {
defaultMapping <- defaultElasticsearchMapping
defaultSettings <- defaultElasticsearchSettings
Expand All @@ -72,7 +72,7 @@ object ValidateElasticSearchView {
}
_ <- IO.fromEither(value.pipeChain.traverse(validatePipeChain)).mapError(InvalidPipeline)
_ <- createIndex(
IndexLabel.fromView(prefix, uuid, rev),
IndexLabel.fromView(prefix, uuid, indexingRev),
value.mapping.orElse(Some(defaultMapping)),
value.settings.orElse(Some(defaultSettings))
)
Expand All @@ -82,12 +82,16 @@ object ValidateElasticSearchView {
}
} yield ()

override def apply(uuid: UUID, rev: Int, value: ElasticSearchViewValue): IO[ElasticSearchViewRejection, Unit] =
override def apply(
uuid: UUID,
indexingRev: IndexingRev,
value: ElasticSearchViewValue
): IO[ElasticSearchViewRejection, Unit] =
value match {
case v: AggregateElasticSearchViewValue =>
validateAggregate(v.views)
case v: IndexingElasticSearchViewValue =>
validateIndexing(uuid, rev, v)
validateIndexing(uuid, indexingRev, v)
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import ch.epfl.bluebrain.nexus.delta.kernel.error.FormatError
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.ExpandedJsonLdCursor
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.decoder.JsonLdDecoder
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.decoder.JsonLdDecoderError.ParsingFailure
import ch.epfl.bluebrain.nexus.delta.sdk.views.IndexingRev
import io.circe.{Decoder, Encoder}

import java.util.UUID
Expand Down Expand Up @@ -79,8 +80,8 @@ object IndexLabel {
* @param indexingRev
* the view's indexing revision
*/
final def fromView(prefix: String, uuid: UUID, indexingRev: Int): IndexLabel =
new IndexLabel(s"${prefix}_${uuid}_$indexingRev")
final def fromView(prefix: String, uuid: UUID, indexingRev: IndexingRev): IndexLabel =
new IndexLabel(s"${prefix}_${uuid}_${indexingRev.value}")

implicit val indexLabelJsonLdDecoder: JsonLdDecoder[IndexLabel] =
(cursor: ExpandedJsonLdCursor) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.{contexts, Elas
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.ContextValue.ContextObject
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution}
import ch.epfl.bluebrain.nexus.delta.sdk.stream.GraphResourceStream
import ch.epfl.bluebrain.nexus.delta.sdk.views.ViewRef
import ch.epfl.bluebrain.nexus.delta.sdk.views.{IndexingRev, ViewRef}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, Tag}
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
Expand Down Expand Up @@ -48,7 +48,7 @@ object IndexingViewDef {
mapping: JsonObject,
settings: JsonObject,
context: Option[ContextObject],
indexingRev: Int,
indexingRev: IndexingRev,
rev: Int
) extends IndexingViewDef {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,9 @@ object ElasticSearchView {
context: Option[ContextObject],
permission: Permission,
tags: Tags,
source: Json,
indexingRev: Int
source: Json
) extends ElasticSearchView {
override def metadata: Metadata = Metadata(Some(uuid), Some(indexingRev))
override def metadata: Metadata = Metadata(Some(uuid))

override def tpe: ElasticSearchViewType = ElasticSearchViewType.ElasticSearch
}
Expand Down Expand Up @@ -157,7 +156,7 @@ object ElasticSearchView {
tags: Tags,
source: Json
) extends ElasticSearchView {
override def metadata: Metadata = Metadata(None, None)
override def metadata: Metadata = Metadata(None)
override def tpe: ElasticSearchViewType = ElasticSearchViewType.AggregateElasticSearch
}

Expand All @@ -169,7 +168,7 @@ object ElasticSearchView {
* @param indexingRev
* the optionally available indexing revision
*/
final case class Metadata(uuid: Option[UUID], indexingRev: Option[Int])
final case class Metadata(uuid: Option[UUID])

val context: ContextValue = ContextValue(contexts.elasticsearch)

Expand Down Expand Up @@ -265,7 +264,6 @@ object ElasticSearchView {
Encoder.encodeJsonObject.contramapObject(meta =>
JsonObject.empty
.addIfExists("_uuid", meta.uuid)
.addIfExists("_indexingRev", meta.indexingRev)
)

implicit val elasticSearchMetadataJsonLdEncoder: JsonLdEncoder[Metadata] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.ElasticSearchVi
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.ElasticSearchViewValue._
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.sdk.model.{ResourceF, ResourceUris, Tags}
import ch.epfl.bluebrain.nexus.delta.sdk.views.IndexingRev
import ch.epfl.bluebrain.nexus.delta.sourcing.Serializer
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ProjectRef, ResourceRef}
Expand Down Expand Up @@ -55,7 +56,7 @@ final case class ElasticSearchViewState(
source: Json,
tags: Tags,
rev: Int,
indexingRev: Int,
indexingRev: IndexingRev,
deprecated: Boolean,
createdAt: Instant,
createdBy: Subject,
Expand Down Expand Up @@ -94,8 +95,7 @@ final case class ElasticSearchViewState(
context = context,
permission = permission,
tags = tags,
source = source,
indexingRev = indexingRev
source = source
)
case AggregateElasticSearchViewValue(name, description, views) =>
AggregateElasticSearchView(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model

import cats.syntax.all._
import cats.data.{NonEmptyChain, NonEmptySet}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.ElasticSearchViewValue.IndexingElasticSearchViewValue
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.ElasticSearchViewValue.IndexingElasticSearchViewValue.defaultPipeline
Expand All @@ -8,7 +9,7 @@ import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.ExpandedJsonLd
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.ContextValue.ContextObject
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.JsonLdContext.keywords
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.model.Permission
import ch.epfl.bluebrain.nexus.delta.sdk.views.{PipeStep, ViewRef}
import ch.epfl.bluebrain.nexus.delta.sdk.views.{IndexingRev, PipeStep, ViewRef}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.pipes.{DefaultLabelPredicates, DiscardMetadata, FilterDeprecated}
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{PipeChain, PipeRef}
Expand Down Expand Up @@ -135,12 +136,17 @@ object ElasticSearchViewValue {
* the next indexing revision based on the differences between the given views
*/
def nextIndexingRev(
view1: IndexingElasticSearchViewValue,
view2: IndexingElasticSearchViewValue,
currentRev: Int
): Int =
if (!view1.hasSameIndexingFields(view2)) currentRev + 1
else currentRev
view1: ElasticSearchViewValue,
view2: ElasticSearchViewValue,
currentIndexingRev: IndexingRev,
newEventRev: Int
): IndexingRev =
(view1.asIndexingValue, view2.asIndexingValue)
.mapN { case (v1, v2) =>
if (!v1.hasSameIndexingFields(v2)) IndexingRev(newEventRev)
else currentIndexingRev
}
.getOrElse(currentIndexingRev)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
],
"sourceAsText": true,
"_uuid": "f85d862a-9ec0-4b9a-8aed-2938d7ca9981",
"_indexingRev": 1,
"mapping": {
"properties": {
"@type": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
"resourceTypes": [],
"sourceAsText": false,
"_uuid": "f85d862a-9ec0-4b9a-8aed-2938d7ca9981",
"_indexingRev": 1,
"mapping": {
"properties": {
"@type": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
"resourceTypes": [],
"sourceAsText": false,
"_uuid": "f85d862a-9ec0-4b9a-8aed-2938d7ca9981",
"_indexingRev": 1,
"mapping": {
"properties": {
"@type": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,6 @@
"@value": "f85d862a-9ec0-4b9a-8aed-2938d7ca9981"
}
],
"https://bluebrain.github.io/nexus/vocabulary/indexingRev" : [
{
"@value" : 1
}
],
"@id": "https://bluebrain.github.io/nexus/vocabulary/myview"
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
"_outgoing" : "{{self}}/outgoing",
"_project" : "http://localhost/v1/projects/{{project}}",
"_rev" : {{rev}},
"_indexingRev": {{indexingRev}},
"_self" : "{{self}}",
"_updatedAt" : "1970-01-01T00:00:00Z",
"_updatedBy" : "{{updatedBy}}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
"_outgoing" : "{{self}}/outgoing",
"_project" : "http://localhost/v1/projects/{{project}}",
"_rev" : {{rev}},
"_indexingRev": {{indexingRev}},
"_self" : "{{self}}",
"_updatedAt" : "1970-01-01T00:00:00Z",
"_updatedBy" : "{{updatedBy}}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.ExpandedJsonLd
import ch.epfl.bluebrain.nexus.delta.sdk.syntax._
import ch.epfl.bluebrain.nexus.delta.sdk.views.ViewRef
import ch.epfl.bluebrain.nexus.delta.sdk.views.{IndexingRev, ViewRef}
import ch.epfl.bluebrain.nexus.delta.sourcing.PullRequest
import ch.epfl.bluebrain.nexus.delta.sourcing.PullRequest.PullRequestState
import ch.epfl.bluebrain.nexus.delta.sourcing.PullRequest.PullRequestState.PullRequestActive
Expand All @@ -33,7 +33,7 @@ class ElasticSearchIndexingActionSuite extends BioSuite with CirceLiteral with F

private val instant = Instant.EPOCH

private val indexingRev = 1
private val indexingRev = IndexingRev.init
private val rev = 2

private val project = ProjectRef.unsafe("org", "proj")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.{ElasticSearchViewState, ElasticSearchViewValue, ViewResource}
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.sdk.model.Tags
import ch.epfl.bluebrain.nexus.delta.sdk.views.IndexingRev
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.{Anonymous, Subject}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import io.circe.{Json, JsonObject}
Expand All @@ -19,7 +20,7 @@ object ElasticSearchViewGen {
uuid: UUID = UUID.randomUUID(),
source: Json = Json.obj(),
rev: Int = 1,
indexingRev: Int = 1,
indexingRev: IndexingRev = IndexingRev.init,
deprecated: Boolean = false,
tags: Tags = Tags.empty,
createdBy: Subject = Anonymous,
Expand Down Expand Up @@ -48,7 +49,7 @@ object ElasticSearchViewGen {
uuid: UUID = UUID.randomUUID(),
source: Json = Json.obj(),
rev: Int = 1,
indexingRev: Int = 1,
indexingRev: IndexingRev = IndexingRev.init,
deprecated: Boolean = false,
tags: Tags = Tags.empty,
createdBy: Subject = Anonymous,
Expand Down
Loading

0 comments on commit 416bd5a

Please sign in to comment.