
Spark Connector sends FloatArray as String #666

Open
piotrkan opened this issue Aug 30, 2024 · 3 comments
Comments

@piotrkan
Describe the bug
I am using Node2Vec to generate graph embeddings for my graph. I use the following code and it works fine: it successfully saves the embeddings on the graph stored in Neo4j under the 'topological_embeddings' property name.

gds.node2vec.write(graph, writeProperty='topological_embeddings')

But when I read the graph back in the next step, for dimensionality reduction (df is a Spark DataFrame with nodes and topological embeddings as columns):

df = df.withColumn("features", array_to_vector('topological_embeddings'))

I get the following error:

pyspark.errors.exceptions.captured.AnalysisException: [UNSUPPORTED_DESERIALIZER.DATA_TYPE_MISMATCH] The deserializer is not supported: need a(n) "ARRAY" field but got "STRING".  

This is not happening when I use GraphSage and write the embeddings in the following format

model, attr = gds.beta.graphsage.train(graph)
model = gds.model.get('topological_embeddings')
model.predict_write(graph, writeProperty='topological_embeddings')

Have you experienced anything like this? It seems like the .write() function in the Python client saves the embeddings as strings?

graphdatascience library version:
GDS plugin version: 2.7.0
Python version: 3.11
Neo4j version: 5.21.0
Operating system: macOS

@jjaderberg

Hi @piotrkan 👋

Can you share a fuller example? What happens between your gds.node2vec.write and df.withColumn calls? How do you get the dataframe from Neo4j?

Node2Vec in write mode writes arrays of 32-bit floating point values.

@piotrkan
Author

piotrkan commented Sep 4, 2024

Hi @jjaderberg sorry for delayed reply!

I reproduced the error with the code below. I now realize that the problem is not in writing the embeddings but in reading them using pyspark and Neo4j. Do you have any idea what's causing the issue?

from graphdatascience import GraphDataScience
from neo4j import GraphDatabase
from pyspark.sql import SparkSession

uri = "bolt://127.0.0.1:7687"
driver = GraphDatabase.driver(uri, auth=("user", "password"))

with driver.session() as session:
    result = session.run("""
    CREATE (alice:Person {name: 'Alice'})
    CREATE (bob:Person {name: 'Bob'})
    CREATE (carol:Person {name: 'Carol'})
    CREATE (dave:Person {name: 'Dave'})
    CREATE (eve:Person {name: 'Eve'})
    CREATE (guitar:Instrument {name: 'Guitar'})
    CREATE (synth:Instrument {name: 'Synthesizer'})
    CREATE (bongos:Instrument {name: 'Bongos'})
    CREATE (trumpet:Instrument {name: 'Trumpet'})

    CREATE (alice)-[:LIKES]->(guitar)
    CREATE (alice)-[:LIKES]->(synth)
    CREATE (alice)-[:LIKES]->(bongos)
    CREATE (bob)-[:LIKES]->(guitar)
    CREATE (bob)-[:LIKES]->(synth)
    CREATE (carol)-[:LIKES]->(bongos)
    CREATE (dave)-[:LIKES]->(guitar)
    CREATE (dave)-[:LIKES]->(synth)
    CREATE (dave)-[:LIKES]->(bongos);

    """)
with driver.session() as session:
    result = session.run("""
        CALL gds.graph.project(
            'myGraph',
            ['Person', 'Instrument'], // Node labels
            {
                LIKES: { orientation: 'UNDIRECTED' }
            }
        )
        YIELD graphName, nodeCount, relationshipCount
        RETURN graphName, nodeCount, relationshipCount
        """)



gds = GraphDataScience("bolt://127.0.0.1:7687", auth=('user', 'password'))
G = gds.graph.get('myGraph')

attr = gds.node2vec.write(G=G, writeProperty='topological_embedding')

topo_list = []
with driver.session() as session:
    result = session.run("""MATCH (n) RETURN n.topological_embedding as topological_embedding
        """)
    for record in result:
        topo_list.append(record['topological_embedding'])

spark = SparkSession.builder \
    .appName("Neo4j-Spark-Connector") \
    .config("spark.neo4j.url", 'bolt://127.0.0.1:7687') \
    .config("spark.jars", "gcs-connector-hadoop3-2.2.2-shaded.jar") \
    .config('spark.jars.packages', 'com.google.cloud.spark:spark-3.5-bigquery:0.39.0,org.neo4j:neo4j-connector-apache-spark_2.12:5.3.0_for_spark_3')\
    .config('spark.hadoop.fs.gs.impl', 'com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem')\
    .getOrCreate()

load_args = {'partitions': 16, 'labels': 'Person'}

# Basic-auth option keys for the Neo4j Spark Connector
neo4j_credentials = {
    'authentication.basic.username': 'user',
    'authentication.basic.password': 'password',
}

# Load data from Neo4j using the `labels` option
df = spark.read.format("org.neo4j.spark.DataSource") \
    .option("labels", "Person") \
    .option("partitions", 16) \
    .option("url", 'bolt://127.0.0.1:7687')\
    .options(**neo4j_credentials)\
    .options(**load_args).load()
print(df)

This returns the following DataFrame, where topological_embedding is a string, not an array:

DataFrame[<id>: bigint, <labels>: array<string>, topological_embedding: string, name: string]
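As a stopgap while this is investigated, the string column could be parsed back into floats on the client side. This is only a sketch, assuming the stringified form is JSON-like (e.g. '[0.1, 0.25]'); the parse_embedding helper is hypothetical:

```python
import json

def parse_embedding(s):
    # Hypothetical helper: parse a stringified float array such as
    # "[0.1, 0.25, -0.5]" back into a list of Python floats.
    # Assumes the connector's string form is JSON-compatible.
    return [float(x) for x in json.loads(s)]

vec = parse_embedding("[0.1, 0.25, -0.5]")
print(vec)  # [0.1, 0.25, -0.5]
```

In pyspark this could be wrapped in a UDF returning ArrayType(FloatType()) before calling array_to_vector, though fixing the type at the source (see below) is cleaner.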

@jjaderberg

@piotrkan I agree that your reproducer suggests something goes wrong reading the float array node properties via pyspark. I have raised it with the maintainers of the Neo4j Spark Connector.

If you want to work around the problem you should be able to convert the float array property values to double array values with the toFloatList Cypher function.

result = session.run("MATCH (n) RETURN toFloatList(n.topological_embedding) as topological_embedding")

and for the label-based loading with pyspark and the Spark Connector, you can overwrite the property with the type-converted double array value:

MATCH (n:Person) SET n.topological_embedding = toFloatList(n.topological_embedding)

If you do this before reading the Person label with pyspark, it should succeed (assuming our hypothesis is correct).
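If mutating the stored property is undesirable, a read-time cast might also work by using the connector's query mode instead of label mode, so toFloatList is applied as the data is read. A sketch only (the "query" option follows the connector's documented read API; adjust URL and credentials to your setup):

```python
# Sketch: build a read query that casts the 32-bit float array to a
# Cypher (64-bit) float list at read time, so the stored data is untouched.
read_query = (
    "MATCH (n:Person) "
    "RETURN n.name AS name, "
    "toFloatList(n.topological_embedding) AS topological_embedding"
)

# With a live SparkSession and Neo4j instance this would be used as:
# df = (spark.read.format("org.neo4j.spark.DataSource")
#       .option("url", "bolt://127.0.0.1:7687")
#       .option("query", read_query)
#       .load())
print(read_query)
```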

N.B. "float" in the function name "toFloatList" refers to the Cypher floating point type, which is 64 bits. Node2Vec writes 32-bit floating point values for a smaller memory footprint. Some languages use "float" for the 32-bit type and "double" for the 64-bit type. Neo4j supports storing both types and the Cypher runtime can handle both, but in the Cypher language there is only a single floating point type, and it is 64 bits.
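As an aside, the 32-bit vs 64-bit distinction is easy to see in plain Python by round-tripping a double through 32 bits with the standard struct module (a minimal illustration, unrelated to Neo4j itself):

```python
import struct

x = 0.1  # a Python float is 64-bit, like Cypher's floating point type
x32 = struct.unpack('f', struct.pack('f', x))[0]  # round-trip through 32 bits

# The 32-bit round-trip changes the value past ~7 significant digits.
print(x == x32)  # False
print(x32)       # 0.10000000149011612
```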

It's unfortunate the Node2Vec result couldn't be consumed via pyspark; hopefully the Connector maintainers can confirm or debunk our hypothesis and address the problem if it is indeed missing support in the Connector.

@jjaderberg jjaderberg changed the title Node2Vec writes embedding array as a string Spark Connector sends FloatArray as String Sep 16, 2024
@jjaderberg jjaderberg transferred this issue from neo4j/graph-data-science-client Sep 17, 2024