From neo4j-skills
Reads from and writes to Neo4j using Apache Spark or Databricks with the Neo4j Connector. Covers SparkSession setup, DataFrame reads/writes, partition tuning, and Delta Lake pipelines.
Slash command: /neo4j-skills:neo4j-spark-skill
Use this skill when:
- Reading Neo4j nodes/relationships into Spark DataFrames
Related skills: neo4j-driver-python-skill, neo4j-cypher-skill, neo4j-gds-skill, neo4j-spring-data-skill

| Connector | Spark | Scala | Databricks Runtime | Neo4j |
|---|---|---|---|---|
| 5.4.x | 3.3, 3.4, 3.5 | 2.12, 2.13 | 12.2, 13.3, 14.3 LTS | 4.4, 5.x, 2025.x |
Maven artifact (Scala 2.12, Spark 3):
org.neo4j:neo4j-connector-apache-spark_2.12:5.4.2_for_spark_3
Scala 2.13 variant:
org.neo4j:neo4j-connector-apache-spark_2.13:5.4.2_for_spark_3
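The two coordinates above differ only in the Scala suffix. A small helper along these lines could assemble the coordinate so the Scala version is chosen in one place. The function name `connector_coordinate` is hypothetical, and the defaults simply mirror the values listed above.

```python
def connector_coordinate(scala_version: str = "2.12",
                         connector_version: str = "5.4.2") -> str:
    """Build the Maven coordinate for the Spark 3 build of the connector."""
    # Only the two Scala lines listed in the compatibility table are accepted.
    if scala_version not in ("2.12", "2.13"):
        raise ValueError(f"unsupported Scala version: {scala_version}")
    return (f"org.neo4j:neo4j-connector-apache-spark_{scala_version}:"
            f"{connector_version}_for_spark_3")


print(connector_coordinate())        # Scala 2.12 coordinate
print(connector_coordinate("2.13"))  # Scala 2.13 coordinate
```

The returned string is what would be passed as the value of spark.jars.packages.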
from pyspark.sql import SparkSession
spark = (SparkSession.builder
.appName("neo4j-app")
.config("spark.jars.packages",
"org.neo4j:neo4j-connector-apache-spark_2.12:5.4.2_for_spark_3")
.config("neo4j.url", "neo4j+s://xxxx.databases.neo4j.io")
.config("neo4j.authentication.type", "basic")
.config("neo4j.authentication.basic.username", "neo4j")
.config("neo4j.authentication.basic.password", "password")
.getOrCreate())
val spark = SparkSession.builder
.appName("neo4j-app")
.config("spark.jars.packages",
"org.neo4j:neo4j-connector-apache-spark_2.12:5.4.2_for_spark_3")
.config("neo4j.url", "neo4j+s://xxxx.databases.neo4j.io")
.config("neo4j.authentication.type", "basic")
.config("neo4j.authentication.basic.username", "neo4j")
.config("neo4j.authentication.basic.password", "password")
.getOrCreate()
org.neo4j:neo4j-connector-apache-spark_2.12 (match the Scala version to the runtime)
neo4j.url neo4j+s://xxxx.databases.neo4j.io
neo4j.authentication.type basic
neo4j.authentication.basic.username {{secrets/neo4j/username}}
neo4j.authentication.basic.password {{secrets/neo4j/password}}
# Store credentials once:
# databricks secrets create-scope --scope neo4j
# databricks secrets put --scope neo4j --key url
# databricks secrets put --scope neo4j --key username
# databricks secrets put --scope neo4j --key password
neo4j_url = dbutils.secrets.get(scope="neo4j", key="url")
neo4j_user = dbutils.secrets.get(scope="neo4j", key="username")
neo4j_pass = dbutils.secrets.get(scope="neo4j", key="password")
spark.conf.set("neo4j.url", neo4j_url)
spark.conf.set("neo4j.authentication.type", "basic")
spark.conf.set("neo4j.authentication.basic.username", neo4j_user)
spark.conf.set("neo4j.authentication.basic.password", neo4j_pass)
| Option | Description | Default |
|---|---|---|
| neo4j.url | Bolt/Neo4j URI | (required) |
| neo4j.authentication.type | none, basic, kerberos, bearer | basic |
| neo4j.authentication.basic.username | Username | driver default |
| neo4j.authentication.basic.password | Password | driver default |
| neo4j.authentication.bearer.token | Bearer token | (none) |
| neo4j.database | Target database | driver default |
| neo4j.access.mode | read or write | read |
| neo4j.encryption.enabled | TLS (ignored with +s/+ssc URI) | false |
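As a rough sketch of how the options in this table fit together, the settings can be collected in a plain dictionary and applied to a session in one loop. The helper name `neo4j_session_conf` is hypothetical; it covers only basic authentication and uses only option keys that appear in the table.

```python
def neo4j_session_conf(url, username, password, database=None):
    """Return neo4j.* session settings for basic authentication as a dict."""
    if not url:
        raise ValueError("neo4j.url is required")
    conf = {
        "neo4j.url": url,
        "neo4j.authentication.type": "basic",
        "neo4j.authentication.basic.username": username,
        "neo4j.authentication.basic.password": password,
    }
    # Leave neo4j.database unset to fall back to the driver default.
    if database is not None:
        conf["neo4j.database"] = database
    return conf


conf = neo4j_session_conf("neo4j+s://xxxx.databases.neo4j.io", "neo4j", "password")
for key in sorted(conf):
    if not key.endswith("password"):  # avoid printing the secret
        print(key, "=", conf[key])
```

With a live session, the dictionary could be applied with `for k, v in conf.items(): spark.conf.set(k, v)`.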
Three mutually exclusive read modes — use exactly one per .read() call.
# PySpark
df = (spark.read.format("org.neo4j.spark.DataSource")
.option("labels", ":Person")
.load())
df.printSchema()
df.show()
// Scala
val df = spark.read
.format("org.neo4j.spark.DataSource")
.option("labels", ":Person")
.load()
Multi-label filter (AND): .option("labels", ":Person:Employee")
Result includes <id> (internal Neo4j id) and <labels> columns.
df = (spark.read.format("org.neo4j.spark.DataSource")
.option("query", "MATCH (p:Person)-[:ACTED_IN]->(m:Movie) RETURN p.name AS actor, m.title AS movie, m.year AS year")
.load())
Use explicit RETURN aliases — they become DataFrame column names. No SKIP/LIMIT in query (connector handles pagination).
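The no-SKIP/LIMIT rule could be checked before a query is handed to the connector. The following is a deliberately naive sketch: `has_pagination` is a hypothetical helper, and its regular expression would also flag the words SKIP or LIMIT inside string literals or as property names, so treat a hit as a prompt to review the query rather than a definitive verdict.

```python
import re

# Whole-word, case-insensitive match on the two pagination keywords.
_PAGINATION = re.compile(r"\b(SKIP|LIMIT)\b", re.IGNORECASE)


def has_pagination(cypher: str) -> bool:
    """Return True if the query text contains SKIP or LIMIT as a whole word."""
    return bool(_PAGINATION.search(cypher))


query = ("MATCH (p:Person)-[:ACTED_IN]->(m:Movie) "
         "RETURN p.name AS actor, m.title AS movie, m.year AS year")
print(has_pagination(query))
print(has_pagination(query + " LIMIT 10"))
```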
df = (spark.read.format("org.neo4j.spark.DataSource")
.option("relationship", "BOUGHT")
.option("relationship.source.labels", ":Customer")
.option("relationship.target.labels", ":Product")
.load())
Result columns: <rel.id>, <rel.type>, <source.*>, <target.*>, plus relationship properties.
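Because the three read modes (labels, query, relationship) are mutually exclusive, a small guard can catch option sets that mix them before Spark is involved. This is a sketch only; `read_mode` is a hypothetical helper that inspects a plain dictionary of options, not part of the connector.

```python
READ_MODES = ("labels", "query", "relationship")


def read_mode(options: dict) -> str:
    """Return the single read mode present in options, or raise ValueError."""
    # Exact key match, so relationship.source.labels does not count as "labels".
    present = [mode for mode in READ_MODES if mode in options]
    if len(present) != 1:
        raise ValueError(
            f"expected exactly one of {READ_MODES}, found {present or 'none'}")
    return present[0]


opts = {
    "relationship": "BOUGHT",
    "relationship.source.labels": ":Customer",
    "relationship.target.labels": ":Product",
}
print(read_mode(opts))
```

A validated dictionary could then be passed to the reader with `spark.read.format("org.neo4j.spark.DataSource").options(**opts).load()`.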
df = (spark.read.format("org.neo4j.spark.DataSource")
.option("labels", ":Transaction")
.option("partitions", "10") # parallel partitions (default: 1)
.option("batch.size", "5000") # rows per partition batch (default: 5000)
.option("schema.flatten.limit", "100") # rows sampled for schema inference
.load())
Full read options reference: references/read-patterns.md
| SaveMode | Cypher | Requires |
|---|---|---|
| Append | CREATE | nothing extra |
| Overwrite | MERGE | node.keys (nodes) or *.node.keys (rels) |
| ErrorIfExists | CREATE + error if exists | (none) |
Always create uniqueness constraints on node.keys properties before writing in Overwrite mode.
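One way to prepare such a constraint is to generate the Cypher statement from the label and key property used in the write. The sketch below only builds the statement text; `uniqueness_constraint` is a hypothetical helper, it handles a single property, and it assumes the label and property names contain no backticks. The statement still has to be run against Neo4j (for example through a driver or Cypher shell) before the Spark write.

```python
def uniqueness_constraint(label: str, prop: str) -> str:
    """Build a CREATE CONSTRAINT statement using the FOR ... REQUIRE syntax."""
    label = label.lstrip(":")  # accept ":Person" as written in the labels option
    return ("CREATE CONSTRAINT IF NOT EXISTS "
            f"FOR (n:`{label}`) REQUIRE n.`{prop}` IS UNIQUE")


print(uniqueness_constraint(":Person", "name"))
```

IF NOT EXISTS makes the statement safe to rerun at the start of every pipeline execution.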
from pyspark.sql import Row
people = spark.createDataFrame([
{"name": "Alice", "age": 30},
{"name": "Bob", "age": 25},
])
(people.write.format("org.neo4j.spark.DataSource")
.mode("Append")
.option("labels", ":Person")
.save())
(people.write.format("org.neo4j.spark.DataSource")
.mode("Overwrite")
.option("labels", ":Person")
.option("node.keys", "name") # comma-separated; df_col:node_prop if names differ
.save())
node.keys with rename: .option("node.keys", "df_col:node_property,id:personId")
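To make the rename format concrete, the sketch below parses a node.keys value into a mapping from DataFrame column to node property. It mirrors the format described above and is not the connector's own parser; `parse_node_keys` is a hypothetical name.

```python
def parse_node_keys(value: str) -> dict:
    """Map each DataFrame column to the node property it is written to."""
    mapping = {}
    for item in value.split(","):
        item = item.strip()
        if not item:
            continue
        # "df_col:node_prop" renames; a bare "name" maps to itself.
        column, _, prop = item.partition(":")
        mapping[column] = prop or column
    return mapping


print(parse_node_keys("df_col:node_property,id:personId"))
print(parse_node_keys("name"))
```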
import org.apache.spark.sql.SaveMode
peopleDF.write
.format("org.neo4j.spark.DataSource")
.mode(SaveMode.Overwrite)
.option("labels", ":Person")
.option("node.keys", "name")
.save()
Use coalesce(1) before relationship writes to avoid deadlocks.
rel_df = spark.createDataFrame([
{"cust_id": "C1", "prod_id": "P1", "qty": 3},
{"cust_id": "C2", "prod_id": "P2", "qty": 1},
])
(rel_df.coalesce(1)
.write.format("org.neo4j.spark.DataSource")
.mode("Append")
.option("relationship", "BOUGHT")
.option("relationship.save.strategy", "keys")
.option("relationship.source.labels", ":Customer")
.option("relationship.source.save.mode", "Match") # require existing nodes
.option("relationship.source.node.keys", "cust_id:id")
.option("relationship.target.labels", ":Product")
.option("relationship.target.save.mode", "Match")
.option("relationship.target.node.keys", "prod_id:id")
.option("relationship.properties", "qty:quantity")
.save())
relationship.source.save.mode / relationship.target.save.mode:
- Match: find existing nodes (fail if missing)
- Append: always CREATE new nodes
- Overwrite: MERGE nodes

Full write options reference: references/write-patterns.md
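The relationship options shown earlier are repetitive enough that a helper can reduce typos. The following sketch builds the option dictionary for the keys strategy; `relationship_write_options` is a hypothetical helper, and it applies the same node save mode to both ends, which is an assumption made here for brevity.

```python
def relationship_write_options(rel_type, source_label, source_keys,
                               target_label, target_keys,
                               properties=None, node_save_mode="Match"):
    """Build writer options for a relationship write with the keys strategy."""
    if node_save_mode not in ("Match", "Append", "Overwrite"):
        raise ValueError(f"unknown node save mode: {node_save_mode}")
    opts = {
        "relationship": rel_type,
        "relationship.save.strategy": "keys",
        "relationship.source.labels": source_label,
        "relationship.source.save.mode": node_save_mode,
        "relationship.source.node.keys": source_keys,
        "relationship.target.labels": target_label,
        "relationship.target.save.mode": node_save_mode,
        "relationship.target.node.keys": target_keys,
    }
    # relationship.properties is optional, e.g. "qty:quantity".
    if properties:
        opts["relationship.properties"] = properties
    return opts


opts = relationship_write_options("BOUGHT", ":Customer", "cust_id:id",
                                  ":Product", "prod_id:id",
                                  properties="qty:quantity")
for key in sorted(opts):
    print(key, "=", opts[key])
```

The dictionary could then be used as `rel_df.coalesce(1).write.format("org.neo4j.spark.DataSource").mode("Append").options(**opts).save()`.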
# Read from Delta table (Unity Catalog or DBFS)
delta_df = spark.read.format("delta").table("catalog.schema.customers")
# Optional: filter/transform in Spark before writing
filtered = delta_df.filter("active = true").select("customer_id", "name", "region")
# Write to Neo4j
(filtered.write.format("org.neo4j.spark.DataSource")
.mode("Overwrite")
.option("labels", ":Customer")
.option("node.keys", "customer_id")
.option("batch.size", "20000")
.save())
Pipeline pattern for relationships — load both node sets first, then write edges:
# Step 1: ensure nodes exist
customers_df.write.format("org.neo4j.spark.DataSource").mode("Overwrite") \
.option("labels", ":Customer").option("node.keys", "customer_id").save()
products_df.write.format("org.neo4j.spark.DataSource").mode("Overwrite") \
.option("labels", ":Product").option("node.keys", "product_id").save()
# Step 2: write relationships (single partition)
orders_df.coalesce(1).write.format("org.neo4j.spark.DataSource").mode("Append") \
.option("relationship", "ORDERED") \
.option("relationship.save.strategy", "keys") \
.option("relationship.source.labels", ":Customer") \
.option("relationship.source.save.mode", "Match") \
.option("relationship.source.node.keys", "customer_id:customer_id") \
.option("relationship.target.labels", ":Product") \
.option("relationship.target.save.mode", "Match") \
.option("relationship.target.node.keys", "product_id:product_id") \
.save()
| Scenario | Recommendation |
|---|---|
| Node writes (no lock contention) | repartition(N) where N ≤ Neo4j CPU cores |
| Relationship writes (lock risk) | coalesce(1) — single partition |
| Large datasets | batch.size 10000–20000 (adjust to heap) |
| MERGE-heavy loads | Add uniqueness constraint on node.keys properties first |
# Aggressive batch — monitor Neo4j heap; OOM risk above 50k
(big_df.repartition(8)
.write.format("org.neo4j.spark.DataSource")
.mode("Overwrite")
.option("labels", ":Event")
.option("node.keys", "event_id")
.option("batch.size", "20000")
.save())
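The partitioning recommendations in the tuning table can be reduced to a small rule of thumb. This sketch is an illustration of those two rows only; `write_partitions` is a hypothetical helper, and the Neo4j core count is something the caller has to supply.

```python
def write_partitions(kind: str, requested: int, neo4j_cores: int) -> int:
    """Pick a Spark partition count for a Neo4j write."""
    if kind == "relationship":
        # Single partition to avoid lock contention between writers.
        return 1
    if kind == "node":
        # Cap parallelism at the number of Neo4j CPU cores.
        return max(1, min(requested, neo4j_cores))
    raise ValueError(f"unknown write kind: {kind}")


print(write_partitions("node", 16, 8))
print(write_partitions("relationship", 16, 8))
```

The result would feed `df.repartition(n)` for node writes or `df.coalesce(1)` for relationship writes.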
| Error | Cause | Fix |
|---|---|---|
| ClassNotFoundException: org.neo4j.spark.DataSource | JAR not on classpath | Add spark.jars.packages or attach library |
| Deadlock on relationship write | Multiple partitions locking nodes | coalesce(1) before write |
| Duplicate nodes on Overwrite | No uniqueness constraint on keys | CREATE CONSTRAINT FOR (n:Label) REQUIRE n.prop IS UNIQUE |
| OOM on Neo4j side | batch.size too large | Reduce to 5000–10000; check heap |
| All columns inferred as string | No APOC, schema not sampled | Set schema.flatten.limit higher; or use query mode with explicit types |
| "Access mode is read" error on write | Session opened in read mode | Remove neo4j.access.mode or set to write |
| Databricks Shared cluster fails | Unity Catalog shared mode unsupported | Switch to Single User access mode |
- _for_spark_3)
- node.keys set when using Overwrite mode
- Uniqueness constraints created on node.keys properties before MERGE writes
- coalesce(1) applied before relationship writes
- batch.size sized to Neo4j heap (start 5000, tune up)
- query mode: no SKIP/LIMIT in Cypher (connector paginates internally)