Skip to content

Commit

Permalink
Fix blazegraph indexing when the resource is indexed twice in the sam…
Browse files Browse the repository at this point in the history
…e chunk (#4181)

Co-authored-by: Simon Dumas <[email protected]>
  • Loading branch information
imsdu and Simon Dumas committed Aug 21, 2023
1 parent b71d5a8 commit badb008
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ final class BlazegraphSink(
private val endpoint: Iri = base.endpoint.toIri

override def apply(elements: Chunk[Elem[NTriples]]): Task[Chunk[Elem[Unit]]] = {
val bulk = elements.foldLeft(BlazegraphBulk(Set.empty, List.empty, endpoint)) {
val bulk = elements.foldLeft(BlazegraphBulk.empty(endpoint)) {
case (acc, Elem.SuccessElem(_, id, _, _, _, triples, _)) =>
acc.replace(id, triples)
case (acc, Elem.DroppedElem(_, id, _, _, _, _)) =>
Expand Down Expand Up @@ -82,21 +82,25 @@ object BlazegraphSink {
def apply(client: BlazegraphClient, batchConfig: BatchConfig, namespace: String)(implicit base: BaseUri) =
new BlazegraphSink(client, batchConfig.maxElements, batchConfig.maxInterval, namespace = namespace)

final case class BlazegraphBulk(invalidIds: Set[Iri], queries: List[SparqlWriteQuery], endpoint: Iri) {
final case class BlazegraphBulk(invalidIds: Set[Iri], queries: Vector[SparqlWriteQuery], endpoint: Iri) {

private def parseUri(id: Iri) = id.resolvedAgainst(endpoint).toUri

def replace(id: Iri, triples: NTriples): BlazegraphBulk =
parseUri(id).fold(
_ => copy(invalidIds = invalidIds + id),
uri => copy(queries = SparqlWriteQuery.replace(uri, triples) :: queries)
uri => copy(queries = queries :+ SparqlWriteQuery.replace(uri, triples))
)

def drop(id: Iri): BlazegraphBulk =
parseUri(id).fold(
_ => copy(invalidIds = invalidIds + id),
uri => copy(queries = SparqlWriteQuery.drop(uri) :: queries)
uri => copy(queries = queries :+ SparqlWriteQuery.drop(uri))
)

}

object BlazegraphBulk {
def empty(endpoint: Iri): BlazegraphBulk = BlazegraphBulk(Set.empty, Vector.empty, endpoint)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<https://bbp.epfl.ch/resource1> <https://bbp.epfl.ch/outgoing> <https://bbp.epfl.ch/resource2> .
<https://bbp.epfl.ch/resource1> <https://bbp.epfl.ch/outgoing> <https://bbp.epfl.ch/resource3> .
<https://bbp.epfl.ch/resource1> <https://bbp.epfl.ch/outgoing> <https://bbp.epfl.ch/resource4> .
<https://bbp.epfl.ch/resource1> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <https://bbp.epfl.ch/resource1/type> .
<https://bbp.epfl.ch/resource1> <https://bluebrain.github.io/nexus/vocabulary/self> <https://bbp.epfl.ch/resource1/self> .
<https://bbp.epfl.ch/resource1> <https://bluebrain.github.io/nexus/vocabulary/constrainedBy> <https://bbp.epfl.ch/resource1/schema> .
<https://bbp.epfl.ch/resource1> <https://bluebrain.github.io/nexus/vocabulary/project> <http://localhost/v1/projects/org/proj> .
<https://bbp.epfl.ch/resource1> <https://bluebrain.github.io/nexus/vocabulary/schemaProject> <http://localhost/v1/projects/org/proj> .
<https://bbp.epfl.ch/resource1> <https://bluebrain.github.io/nexus/vocabulary/rev> "3"^^<http://www.w3.org/2001/XMLSchema#integer> .
<https://bbp.epfl.ch/resource1> <https://bluebrain.github.io/nexus/vocabulary/deprecated> "false"^^<http://www.w3.org/2001/XMLSchema#boolean> .
<https://bbp.epfl.ch/resource1> <https://bluebrain.github.io/nexus/vocabulary/createdAt> "1970-01-01T00:00:00.000Z"^^<http://www.w3.org/2001/XMLSchema#dateTume> .
<https://bbp.epfl.ch/resource1> <https://bluebrain.github.io/nexus/vocabulary/createdBy> <http://localhost/v1/anonymous> .
<https://bbp.epfl.ch/resource1> <https://bluebrain.github.io/nexus/vocabulary/updatedAt> "1970-01-01T00:00:00.000Z"^^<http://www.w3.org/2001/XMLSchema#dateTime> .
<https://bbp.epfl.ch/resource1> <https://bluebrain.github.io/nexus/vocabulary/updatedBy> <http://localhost/v1/anonymous> .
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.indexing

import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.BlazegraphClientSetup
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.SparqlQueryResponseType
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.rdf.Vocabulary.nxv
import ch.epfl.bluebrain.nexus.delta.rdf.RdfError.InvalidIri
import ch.epfl.bluebrain.nexus.delta.rdf.graph.{Graph, NTriples}
Expand All @@ -27,10 +28,14 @@ class BlazegraphSinkSuite extends BioSuite with BlazegraphClientSetup.Fixture wi

private lazy val client = blazegraphClient()
private val namespace = "test_sink"
private lazy val sink = new BlazegraphSink(client, 2, 50.millis, namespace)

private val resource1Id = iri"https://bbp.epfl.ch/resource1"
private val resource1Ntriples = NTriples(contentOf("sparql/resource1.ntriples"), resource1Id)
def createSink(namespace: String) = new BlazegraphSink(client, 2, 50.millis, namespace)

private lazy val sink = createSink(namespace)

private val resource1Id = iri"https://bbp.epfl.ch/resource1"
private val resource1Ntriples = NTriples(contentOf("sparql/resource1.ntriples"), resource1Id)
private val resource1NtriplesUpdated = NTriples(contentOf("sparql/resource1_updated.ntriples"), resource1Id)

private val resource2Id = iri"https://bbp.epfl.ch/resource2"
private val resource2Ntriples = NTriples(contentOf("sparql/resource2.ntriples"), resource2Id)
Expand All @@ -48,58 +53,99 @@ class BlazegraphSinkSuite extends BioSuite with BlazegraphClientSetup.Fixture wi
resource3Id -> resource3Ntriples
)

private val resource123Graph = allResources.foldLeft(Graph.empty) { case (acc, (_, ntriples)) =>
private def asElems(chunk: Chunk[(Iri, NTriples)]) =
chunk.zipWithIndex.map { case ((id, ntriples), index) =>
SuccessElem(entityType, id, None, Instant.EPOCH, Offset.at(index.toLong + 1), ntriples, 1)
}

private def createGraph(chunk: Chunk[(Iri, NTriples)]) = chunk.foldLeft(Graph.empty) { case (acc, (_, ntriples)) =>
acc ++ Graph(ntriples).getOrElse(Graph.empty)
}

private val resource13Graph = List(resource1Ntriples, resource3Ntriples).foldLeft(Graph.empty) {
case (acc, ntriples) => acc ++ Graph(ntriples).getOrElse(Graph.empty)
}
private def dropped(id: Iri, offset: Offset) = DroppedElem(entityType, id, None, Instant.EPOCH, offset, 1)

private def query(namespace: String) =
client
.query(Set(namespace), constructQuery, SparqlQueryResponseType.SparqlNTriples)
.map { response => Graph(response.value).toOption }

test("Create the namespace") {
client.createNamespace(namespace)
}

test("Push a chunk of elements and retrieve them") {
val chunk = allResources.zipWithIndex.map { case ((id, ntriples), index) =>
SuccessElem(entityType, id, None, Instant.EPOCH, Offset.at(index.toLong + 1), ntriples, 1)
}
val input = asElems(allResources)
val expected = createGraph(allResources)

for {
_ <- sink.apply(chunk).assert(chunk.map(_.void))
_ <- client
.query(Set(namespace), constructQuery, SparqlQueryResponseType.SparqlNTriples)
.map { response => Graph(response.value).toOption }
.assertSome(resource123Graph)
_ <- sink.apply(asElems(allResources)).assert(input.map(_.void))
_ <- query(namespace).assertSome(expected)
} yield ()
}

test("Delete dropped elements from the namespace") {
val chunk = Chunk(
DroppedElem(entityType, resource2Id, None, Instant.EPOCH, Offset.at(4L), 1)
)
val input = Chunk(dropped(resource2Id, Offset.at(4L)))

val expected = createGraph(Chunk(resource1Id -> resource1Ntriples, resource3Id -> resource3Ntriples))

for {
_ <- sink.apply(chunk).assert(chunk.map(_.void))
_ <- client
.query(Set(namespace), constructQuery, SparqlQueryResponseType.SparqlNTriples)
.map { response => Graph(response.value).toOption }
.assertSome(resource13Graph)
_ <- sink.apply(input).assert(input.map(_.void))
_ <- query(namespace).assertSome(expected)
} yield ()

}

test("Report errors when the id is not a valid absolute iri") {
val chunk = Chunk(
val chunk = Chunk(
SuccessElem(entityType, nxv + "é-wrong", None, Instant.EPOCH, Offset.at(5L), resource1Ntriples, 1)
)
val expected = createGraph(Chunk(resource1Id -> resource1Ntriples, resource3Id -> resource3Ntriples))

for {
_ <- sink.apply(chunk).assert(chunk.map(_.failed(InvalidIri)))
_ <- client
.query(Set(namespace), constructQuery, SparqlQueryResponseType.SparqlNTriples)
.map { response => Graph(response.value).toOption }
.assertSome(resource13Graph)
_ <- query(namespace).assertSome(expected)
} yield ()
}

test("When the same resource appears twice in a chunk, only the last update prevails") {
val namespace = "test_last_update"
val sink = createSink(namespace)

val input = Chunk(
resource1Id -> resource1Ntriples,
resource2Id -> resource2Ntriples,
resource1Id -> resource1NtriplesUpdated
)

val expected = createGraph(Chunk(resource2Id -> resource2Ntriples, resource1Id -> resource1NtriplesUpdated))

for {
_ <- client.createNamespace(namespace).assert(true)
_ <- sink.apply(asElems(input))
_ <- query(namespace).assertSome(expected)
} yield ()
}

test("When the same resource appears twice in a chunk, only the last delete prevails") {
val namespace = "test_last_delete"
val sink = createSink(namespace)

val indexingChunk = asElems(
Chunk(
resource1Id -> resource1Ntriples,
resource2Id -> resource2Ntriples
)
)

val deleteChunk = Chunk.singleton(dropped(resource1Id, Offset.at(3L)))
val chunk = Chunk.concat(List(indexingChunk, deleteChunk))

val expected = createGraph(Chunk.singleton(resource2Id -> resource2Ntriples))

for {
_ <- client.createNamespace(namespace).assert(true)
_ <- sink.apply(chunk)
_ <- query(namespace).assertSome(expected)
} yield ()
}

Expand Down

0 comments on commit badb008

Please sign in to comment.