Skip to content

Commit

Permalink
Support Neo4j 5.0 indexes and constraints syntax changes (#449)
Browse files Browse the repository at this point in the history
  • Loading branch information
conker84 committed Oct 4, 2022
1 parent 53e76d1 commit 81f5c28
Show file tree
Hide file tree
Showing 9 changed files with 100 additions and 165 deletions.
83 changes: 25 additions & 58 deletions common/src/main/scala/org/neo4j/spark/service/SchemaService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -452,48 +452,36 @@ class SchemaService(private val options: Neo4jOptions, private val driverCache:
case OptimizationType.NONE => log.info("No optimization type provided")
case _ => {
try {
val isNeo4j35 = neo4jVersion().versions(0).startsWith("3.5")
val quotedLabel = label.quote()
val quotedProps = props
.map(prop => s"${Neo4jUtil.NODE_ALIAS}.${prop.quote()}")
.mkString(", ")
val isNeo4j5 = neo4jVersion().versions(0).startsWith("5.")
val uniqueFieldName = if (isNeo4j5) "owningConstraint" else "uniqueness"
val dashSeparatedProps = props.mkString("-")
val (querySuffix, uniqueness) = action match {
case OptimizationType.INDEX => {
if (isNeo4j35) {
(s"ON :$quotedLabel(${props.map(_.quote()).mkString(",")})", "node_label_property")
} else {
(s"FOR (${Neo4jUtil.NODE_ALIAS}:$quotedLabel) ON ($quotedProps)", "NONUNIQUE")
}
}
val (querySuffix, uniqueCondition) = action match {
case OptimizationType.INDEX => (s"FOR (${Neo4jUtil.NODE_ALIAS}:$quotedLabel) ON ($quotedProps)",
if (isNeo4j5) s"$uniqueFieldName IS NULL" else s"$uniqueFieldName = 'NONUNIQUE'")
case OptimizationType.NODE_CONSTRAINTS => {
val assertType = if (props.size > 1) "NODE KEY" else "UNIQUE"
val uniquenessValue = if (isNeo4j35) {
"node_unique_property"
} else {
"UNIQUE"
}
(s"ON (${Neo4jUtil.NODE_ALIAS}:$quotedLabel) ASSERT ($quotedProps) IS $assertType", uniquenessValue)
(s"FOR (${Neo4jUtil.NODE_ALIAS}:$quotedLabel) REQUIRE ($quotedProps) IS $assertType",
if (isNeo4j5) s"$uniqueFieldName IS NOT NULL" else s"$uniqueFieldName = 'UNIQUE'")
}
}
val (labelIndexFieldName, uniqueFieldName, actionName) = if (isNeo4j35) {
("tokenNames", "type", "")
} else {
("labelsOrTypes", "uniqueness", s"spark_${action.toString}_${label}_$dashSeparatedProps".quote())
}
val actionName = s"spark_${action.toString}_${label}_$dashSeparatedProps".quote()
val queryPrefix = action match {
case OptimizationType.INDEX => s"CREATE INDEX $actionName"
case OptimizationType.NODE_CONSTRAINTS => s"CREATE CONSTRAINT $actionName"
}
val queryCheck =
s"""CALL db.indexes() YIELD $labelIndexFieldName, properties, $uniqueFieldName
|WHERE $labelIndexFieldName = ${'$'}labels
s"""SHOW INDEXES YIELD labelsOrTypes, properties, $uniqueFieldName
|WHERE labelsOrTypes = ${'$'}labels
|AND properties = ${'$'}properties
|AND $uniqueFieldName = ${'$'}uniqueness
|AND $uniqueCondition
|RETURN count(*) > 0 AS isPresent""".stripMargin
val isPresent = session.run(queryCheck, Map("labels" -> Seq(label).asJava,
"properties" -> props.asJava,
"uniqueness" -> uniqueness).asJava)
val params: util.Map[String, AnyRef] = Map("labels" -> Seq(label).asJava,
"properties" -> props.asJava).asJava.asInstanceOf[util.Map[String, AnyRef]]
val isPresent = session.run(queryCheck, params)
.single()
.get("isPresent")
.asBoolean()
Expand All @@ -514,42 +502,21 @@ class SchemaService(private val options: Neo4jOptions, private val driverCache:
}

def checkIndex(indexType: OptimizationType.Value, label: String, props: Seq[String]): Boolean = try {
val isNeo4j35 = neo4jVersion().versions(0).startsWith("3.5")
val quotedLabel = label.quote()
val quotedProps = props
.map(prop => s"${Neo4jUtil.NODE_ALIAS}.${prop.quote()}")
.mkString(", ")
val dashSeparatedProps = props.mkString("-")
val uniqueness = indexType match {
case OptimizationType.INDEX => {
if (isNeo4j35) {
"node_label_property"
} else {
"NONUNIQUE"
}
}
case OptimizationType.NODE_CONSTRAINTS => {
if (isNeo4j35) {
"node_unique_property"
} else {
"UNIQUE"
}
}
}
val (labelIndexFieldName, uniqueFieldName) = if (isNeo4j35) {
("tokenNames", "type")
} else {
("labelsOrTypes", "uniqueness")
val isNeo4j5 = neo4jVersion().versions(0).startsWith("5.")
val uniqueFieldName = if (isNeo4j5) "owningConstraint" else "uniqueness"
val uniqueCondition = indexType match {
case OptimizationType.INDEX => if (isNeo4j5) s"$uniqueFieldName = NULL" else s"$uniqueFieldName = 'NONUNIQUE'"
case OptimizationType.NODE_CONSTRAINTS => if (isNeo4j5) s"$uniqueFieldName IS NOT NULL" else s"$uniqueFieldName = 'UNIQUE'"
}
val queryCheck =
s"""CALL db.indexes() YIELD $labelIndexFieldName, properties, $uniqueFieldName
|WHERE $labelIndexFieldName = ${'$'}labels
s"""SHOW INDEXES YIELD labelsOrTypes, properties, $uniqueFieldName
|WHERE labelsOrTypes = ${'$'}labels
|AND properties = ${'$'}properties
|AND $uniqueFieldName = ${'$'}uniqueness
|AND $uniqueCondition
|RETURN count(*) > 0 AS isPresent""".stripMargin
session.run(queryCheck, Map("labels" -> Seq(label).asJava,
"properties" -> props.asJava,
"uniqueness" -> uniqueness).asJava)
val params: util.Map[String, AnyRef] = Map("labels" -> Seq(label).asJava,
"properties" -> props.asJava).asJava.asInstanceOf[util.Map[String, AnyRef]]
session.run(queryCheck, params)
.single()
.get("isPresent")
.asBoolean()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ case class ValidateReadStreaming(neo4jOptions: Neo4jOptions, jobId: String) exte
|The connector need to store intermediate results
|for pushing the data into Streaming tables.
|Please define a constraint into your Neo4j instance in this way:
|`CREATE CONSTRAINT ON (n:${Neo4jAccumulator.LABEL}) ASSERT (n.${Neo4jAccumulator.KEY}) IS UNIQUE`
|`CREATE CONSTRAINT FOR (n:${Neo4jAccumulator.LABEL}) REQUIRE (n.${Neo4jAccumulator.KEY}) IS UNIQUE`
|""".stripMargin)
}
case _ =>
Expand Down
8 changes: 4 additions & 4 deletions doc/docs/modules/ROOT/pages/writing.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,7 @@ ds.write
Before the import starts, the code above creates the following schema query:

----
CREATE CONSTRAINT ON (p:Person) ASSERT (p.surname) IS UNIQUE
CREATE CONSTRAINT FOR (p:Person) REQUIRE (p.surname) IS UNIQUE
----

*Take into consideration that the first label is used for the index creation.*
Expand All @@ -687,9 +687,9 @@ ds.write
.option("url", SparkConnectorScalaSuiteIT.server.getBoltUrl)
.option("query", "CREATE (n:Person{fullName: event.name + ' ' + event.surname, age: scriptResult[0].age})")
.option("script",
"""CREATE INDEX ON :Person(surname);
|CREATE CONSTRAINT ON (p:Product)
| ASSERT (p.name, p.sku)
"""CREATE INDEX person_surname FOR (p:Person) ON (p.surname);
|CREATE CONSTRAINT product_name_sku FOR (p:Product)
| REQUIRE (p.name, p.sku)
| IS NODE KEY;
|RETURN 36 AS age;
|""".stripMargin)
Expand Down
9 changes: 8 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
</licenses>

<properties>
<neo4j.version>4.0.8</neo4j.version>
<neo4j.version>4.4.10</neo4j.version>
<driver.version>4.4.9</driver.version>
<neo4j.experimental>false</neo4j.experimental>
<testcontainers.version>1.15.3</testcontainers.version>
Expand Down Expand Up @@ -91,6 +91,13 @@
<neo4j.experimental>false</neo4j.experimental>
</properties>
</profile>
<profile>
<id>neo4j-5.0-dev</id>
<properties>
<neo4j.version>5.0.0-drop09.0</neo4j.version>
<neo4j.experimental>false</neo4j.experimental>
</properties>
</profile>
<!-- end neo4j profiles -->
</profiles>

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ class DataSourceStreamingWriterTSE extends SparkConnectorScalaBaseTSE {
val partition = 5
val checkpointLocation = "/tmp/checkpoint/" + UUID.randomUUID().toString

SparkConnectorScalaSuiteIT.session().run("CREATE CONSTRAINT ON (t:Timestamp) ASSERT (t.value) IS UNIQUE")
SparkConnectorScalaSuiteIT.session().run("CREATE CONSTRAINT timestamp_value FOR (t:Timestamp) REQUIRE (t.value) IS UNIQUE")

query = memStream.toDF().writeStream
.format(classOf[DataSource].getName)
Expand Down Expand Up @@ -208,7 +208,7 @@ class DataSourceStreamingWriterTSE extends SparkConnectorScalaBaseTSE {
}
}, Matchers.equalTo(true), 30L, TimeUnit.SECONDS)

SparkConnectorScalaSuiteIT.session().run("DROP CONSTRAINT ON (t:Timestamp) ASSERT (t.value) IS UNIQUE")
SparkConnectorScalaSuiteIT.session().run("DROP CONSTRAINT timestamp_value")
}

@Test
Expand All @@ -223,8 +223,8 @@ class DataSourceStreamingWriterTSE extends SparkConnectorScalaBaseTSE {
.writeTransaction(
new TransactionWork[Unit] {
override def execute(tx: Transaction): Unit = {
tx.run("CREATE CONSTRAINT ON (p:From) ASSERT p.value IS UNIQUE")
tx.run("CREATE CONSTRAINT ON (p:To) ASSERT p.value IS UNIQUE")
tx.run("CREATE CONSTRAINT From_value FOR (p:From) REQUIRE p.value IS UNIQUE")
tx.run("CREATE CONSTRAINT To_value FOR (p:To) REQUIRE p.value IS UNIQUE")
}
})

Expand Down Expand Up @@ -274,8 +274,8 @@ class DataSourceStreamingWriterTSE extends SparkConnectorScalaBaseTSE {
.writeTransaction(
new TransactionWork[Unit] {
override def execute(tx: Transaction): Unit = {
tx.run("DROP CONSTRAINT ON (p:From) ASSERT p.value IS UNIQUE")
tx.run("DROP CONSTRAINT ON (p:To) ASSERT p.value IS UNIQUE")
tx.run("DROP CONSTRAINT From_value")
tx.run("DROP CONSTRAINT To_value")
}
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ class DataSourceWriterNeo4j4xTSE extends SparkConnectorScalaBaseTSE {
.writeTransaction(
new TransactionWork[Unit] {
override def execute(tx: Transaction): Unit = {
tx.run("CREATE CONSTRAINT ON (p:Person) ASSERT p.id IS UNIQUE")
tx.run("CREATE CONSTRAINT ON (p:Product) ASSERT p.id IS UNIQUE")
tx.run("CREATE CONSTRAINT person_id FOR (p:Person) REQUIRE p.id IS UNIQUE")
tx.run("CREATE CONSTRAINT product_id FOR (p:Product) REQUIRE p.id IS UNIQUE")
}
})

Expand Down Expand Up @@ -136,8 +136,8 @@ class DataSourceWriterNeo4j4xTSE extends SparkConnectorScalaBaseTSE {
.writeTransaction(
new TransactionWork[Unit] {
override def execute(tx: Transaction): Unit = {
tx.run("DROP CONSTRAINT ON (p:Person) ASSERT p.id IS UNIQUE")
tx.run("DROP CONSTRAINT ON (p:Product) ASSERT p.id IS UNIQUE")
tx.run("DROP CONSTRAINT person_id")
tx.run("DROP CONSTRAINT product_id")
}
})
}
Expand Down Expand Up @@ -174,8 +174,8 @@ class DataSourceWriterNeo4j4xTSE extends SparkConnectorScalaBaseTSE {
.writeTransaction(
new TransactionWork[Unit] {
override def execute(tx: Transaction): Unit = {
tx.run("CREATE CONSTRAINT ON (p:Person) ASSERT p.id IS UNIQUE")
tx.run("CREATE CONSTRAINT ON (p:Product) ASSERT p.id IS UNIQUE")
tx.run("CREATE CONSTRAINT person_id FOR (p:Person) REQUIRE p.id IS UNIQUE")
tx.run("CREATE CONSTRAINT product_id FOR (p:Product) REQUIRE p.id IS UNIQUE")
}
})

Expand Down Expand Up @@ -254,8 +254,8 @@ class DataSourceWriterNeo4j4xTSE extends SparkConnectorScalaBaseTSE {
.writeTransaction(
new TransactionWork[Unit] {
override def execute(tx: Transaction): Unit = {
tx.run("DROP CONSTRAINT ON (p:Person) ASSERT p.id IS UNIQUE")
tx.run("DROP CONSTRAINT ON (p:Product) ASSERT p.id IS UNIQUE")
tx.run("DROP CONSTRAINT person_id")
tx.run("DROP CONSTRAINT product_id")
}
})
}
Expand Down Expand Up @@ -344,7 +344,7 @@ class DataSourceWriterNeo4j4xTSE extends SparkConnectorScalaBaseTSE {
def `should read and write relations with MERGE and node keys`(): Unit = {
SparkConnectorScalaSuiteIT.session()
.writeTransaction(new TransactionWork[Result] {
override def execute(transaction: Transaction): Result = transaction.run("CREATE CONSTRAINT ON (i:Instrument) ASSERT i.name IS UNIQUE")
override def execute(transaction: Transaction): Result = transaction.run("CREATE CONSTRAINT instrument_name FOR (i:Instrument) REQUIRE i.name IS UNIQUE")
})

val total = 100
Expand Down Expand Up @@ -421,7 +421,7 @@ class DataSourceWriterNeo4j4xTSE extends SparkConnectorScalaBaseTSE {

SparkConnectorScalaSuiteIT.session()
.writeTransaction(new TransactionWork[Result] {
override def execute(transaction: Transaction): Result = transaction.run("DROP CONSTRAINT ON (i:Instrument) ASSERT i.name IS UNIQUE")
override def execute(transaction: Transaction): Result = transaction.run("DROP CONSTRAINT instrument_name")
})
}

Expand Down
Loading

0 comments on commit 81f5c28

Please sign in to comment.