Configures and operates the Neo4j Connector for Kafka (sink + source) and native CDC API. Covers Cypher/Pattern/CUD sink strategies, CDC-based and query-based source, exactly-once semantics, DLQ error handling, Confluent Cloud managed connector, schema registry (Avro/JSON), and native db.cdc.query cursor-loop patterns.
Slash command: /neo4j-skills:neo4j-kafka-skill
Use this skill for:
- Writing Kafka events into Neo4j (sink connector: Cypher, Pattern, CDC, CUD strategies)
- Consuming CDC events in-process with the native CDC API (db.cdc.query)
Other skills referenced: neo4j-cypher-skill, neo4j-import-skill, neo4j-gds-skill
| Use case | Strategy |
|---|---|
| Custom transformation of Kafka payload → graph | Sink: Cypher |
| Mirror another Neo4j CDC source | Sink: CDC (schema or source-id sub-strategy) |
| Map Kafka JSON fields to graph nodes/rels with no code | Sink: Pattern |
| Consume pre-formatted CUD JSON messages | Sink: CUD |
| Stream all Neo4j changes to Kafka (real-time) | Source: CDC (Neo4j 5.13+ EE/Aura BC/VDC) |
| Stream specific query results on a schedule | Source: Query |
| Consume CDC events in-process, no Kafka | Native CDC API (db.cdc.query) |
{
"neo4j.uri": "neo4j+s://your-instance.databases.neo4j.io:7687",
"neo4j.authentication.type": "BASIC",
"neo4j.authentication.basic.username": "neo4j",
"neo4j.authentication.basic.password": "${file:/opt/secrets.properties:neo4j.password}",
"neo4j.database": "neo4j"
}
Authentication types: BASIC | BEARER | KERBEROS | CUSTOM | NONE
Never hardcode passwords — use Kafka Connect secrets provider (${file:...} or ${env:...}).
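A small pre-flight check can catch literal passwords before a config is submitted. The sketch below is only illustrative: the helper name find_hardcoded_secrets and the list of sensitive key suffixes are assumptions made for this example, not part of the connector or of Kafka Connect.

```python
import re

# Matches Kafka Connect config-provider placeholders such as
# ${file:/path/to/file.properties:key} or ${env:NAME}.
PLACEHOLDER = re.compile(r"^\$\{[A-Za-z0-9_.-]+:.+\}$")

# Assumed list of key suffixes that should never carry a literal value.
SENSITIVE_SUFFIXES = (".password", ".secret", ".token")


def find_hardcoded_secrets(config: dict) -> list:
    """Return the sensitive keys whose value is not a provider placeholder."""
    return [
        key
        for key, value in config.items()
        if key.endswith(SENSITIVE_SUFFIXES) and not PLACEHOLDER.match(str(value))
    ]


good = {
    "neo4j.authentication.basic.password":
        "${file:/opt/secrets.properties:neo4j.password}",
}
bad = {"neo4j.authentication.basic.password": "secret"}

print(find_hardcoded_secrets(good))  # []
print(find_hardcoded_secrets(bad))   # ['neo4j.authentication.basic.password']
```

Running a check like this in CI keeps literal credentials out of version-controlled connector configs.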
Connector auto-prepends UNWIND $events AS __value — write query using __value:
{
"connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
"topics": "person-creates,person-updates",
"neo4j.uri": "neo4j+s://...",
"neo4j.authentication.type": "BASIC",
"neo4j.authentication.basic.username": "neo4j",
"neo4j.authentication.basic.password": "${file:/opt/secrets.properties:neo4j.password}",
"neo4j.cypher.topic.person-creates":
"MERGE (p:Person {id: __value.id}) SET p += __value.properties",
"neo4j.cypher.topic.person-updates":
"MATCH (p:Person {id: __value.id}) SET p += __value.properties",
"neo4j.cypher.bind-value-as": "__value",
"neo4j.cypher.bind-key-as": "__key",
"neo4j.cypher.bind-header-as": "__header"
}
MERGE pattern — idempotent upsert:
MERGE (p:Person {id: __value.id})
ON CREATE SET p.createdAt = datetime(), p += __value.properties
ON MATCH SET p.updatedAt = datetime(), p += __value.properties
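The queries above expect each message value to carry an id plus a properties map. A producer-side sketch of that shape follows; the function name build_person_message and the sample values are hypothetical, and the null check is an assumption about where validation is cheapest, not something the connector requires.

```python
import json


def build_person_message(person_id, properties: dict) -> bytes:
    """Serialize a message value shaped as {"id": ..., "properties": {...}}."""
    if person_id is None:
        # A null key would make MERGE (p:Person {id: __value.id}) fail in the sink.
        raise ValueError("person_id must not be None")
    return json.dumps({"id": person_id, "properties": properties}).encode("utf-8")


value = build_person_message(42, {"name": "Ada", "city": "London"})
print(value.decode("utf-8"))
# {"id": 42, "properties": {"name": "Ada", "city": "London"}}
```

Rejecting a missing id before the message reaches Kafka avoids a sink-side failure on the MERGE key.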
No Cypher needed — map message fields to graph via pattern syntax:
{
"neo4j.pattern.topic.users": "(:User{!userId, name, email})",
"neo4j.pattern.topic.friendships":
"(:User{!userId: from.userId})-[:KNOWS{since}]->(:User{!userId: to.userId})"
}
Pattern rules:
- !prop = key property (used for MERGE)
- prop: field.path = map from nested message field
- * = map all message fields
- -prop = exclude property (cannot mix with inclusions)
{
"neo4j.cdc.schema.topics": "neo4j-cdc-events"
}
Or with source-id tracking (stores elementId as property):
{
"neo4j.cdc.source-id.topics": "neo4j-cdc-events",
"neo4j.cdc.source-id.label-name": "SourceEvent",
"neo4j.cdc.source-id.property-name": "sourceId"
}
Requires: connector ≥ 5.3.0, Kafka broker EOS support, and a NODE KEY constraint.
Step 1 — Create constraint:
CREATE CONSTRAINT kafka_offset_key IF NOT EXISTS
FOR (n:__KafkaOffset)
REQUIRE (n.strategy, n.topic, n.partition) IS NODE KEY;
Step 2 — Add to connector config:
{
"neo4j.eos-offset-label": "__KafkaOffset"
}
Without EOS: connector provides at-least-once — write idempotent Cypher (MERGE, not CREATE).
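The idea behind offset tracking can be shown with a toy model. This is a conceptual sketch in plain Python, not the connector's implementation: GraphStore and apply_batch are hypothetical names, and the offsets dictionary merely stands in for nodes carrying the offset label, keyed the same way as the constraint above (strategy, topic, partition).

```python
from dataclasses import dataclass, field


@dataclass
class GraphStore:
    created: list = field(default_factory=list)  # stands in for CREATE-style writes
    offsets: dict = field(default_factory=dict)  # stands in for offset-label nodes


def apply_batch(store, strategy, topic, partition, records):
    """Apply (offset, value) records, skipping any at or below the stored offset."""
    key = (strategy, topic, partition)
    last = store.offsets.get(key, -1)
    applied = 0
    for offset, value in records:
        if offset <= last:
            continue  # already written during an earlier delivery
        store.created.append(value)
        last = offset
        applied += 1
    # In a real database the data writes and this offset update
    # would have to share one transaction.
    store.offsets[key] = last
    return applied


store = GraphStore()
batch = [(0, "alice"), (1, "bob")]
print(apply_batch(store, "cypher", "person-creates", 0, batch))  # 2
print(apply_batch(store, "cypher", "person-creates", 0, batch))  # 0
print(store.created)  # ['alice', 'bob']
```

The second call models a redelivery after a restart: because the offset was stored alongside the data, the non-idempotent write is not repeated.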
{
"errors.tolerance": "all",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"errors.deadletterqueue.topic.name": "neo4j-dlq",
"errors.deadletterqueue.context.headers.enable": "true",
"errors.deadletterqueue.topic.replication.factor": "3"
}
errors.tolerance=none (default) — stops on first error. Use all + DLQ for production.
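The difference between the two tolerance settings can be modelled in a few lines. The sketch below is a simplified simulation, not Kafka Connect code: process_records and parse_person are hypothetical names, and the dead-letter list stands in for the DLQ topic.

```python
import json


def process_records(records, handler, tolerance="none"):
    """Run handler over raw records; return (processed, dead_letters)."""
    processed, dead_letters = [], []
    for raw in records:
        try:
            processed.append(handler(raw))
        except Exception as exc:
            if tolerance != "all":
                raise  # models errors.tolerance=none: stop on the first bad record
            dead_letters.append(
                {"value": raw, "error": f"{type(exc).__name__}: {exc}"}
            )
    return processed, dead_letters


def parse_person(raw):
    """Hypothetical handler: parse JSON and require an id field."""
    return json.loads(raw)["id"]


records = ['{"id": 1}', "not-json", '{"name": "no id"}']
ok, dlq = process_records(records, parse_person, tolerance="all")
print(ok)        # [1]
print(len(dlq))  # 2
```

With tolerance set to "none" the same input raises on the second record, which mirrors a connector task stopping on the first bad message.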
{
"connector.class": "org.neo4j.connectors.kafka.source.Neo4jConnector",
"neo4j.uri": "neo4j+s://...",
"neo4j.authentication.type": "BASIC",
"neo4j.authentication.basic.username": "neo4j",
"neo4j.authentication.basic.password": "${file:/opt/secrets.properties:neo4j.password}",
"neo4j.source-strategy": "CDC",
"neo4j.start-from": "NOW",
"neo4j.cdc.poll-interval": "1s",
"neo4j.cdc.poll-duration": "5s",
"neo4j.cdc.topic.person-creates.patterns.0.pattern": "(:Person)",
"neo4j.cdc.topic.person-creates.patterns.0.operation": "CREATE",
"neo4j.cdc.topic.person-updates.patterns.0.pattern": "(:Person)",
"neo4j.cdc.topic.person-updates.patterns.0.operation": "UPDATE",
"neo4j.cdc.topic.person-deletes.patterns.0.pattern": "(:Person)",
"neo4j.cdc.topic.person-deletes.patterns.0.operation": "DELETE"
}
neo4j.start-from options: NOW | EARLIEST | USER_PROVIDED (supply the cursor string in neo4j.start-from.value)
Multiple patterns per topic — indexed 0, 1, 2...:
{
"neo4j.cdc.topic.all-changes.patterns.0.pattern": "(:Person)",
"neo4j.cdc.topic.all-changes.patterns.1.pattern": "(:Organization)"
}
Cursor warning: after DB restore from backup, CDC cursors are invalidated. Reconfigure neo4j.start-from.
{
"neo4j.source-strategy": "QUERY",
"neo4j.query": "MATCH (p:Person) WHERE p.updatedAt > $lastCheck RETURN p.id AS id, p.name AS name, p.updatedAt AS updatedAt",
"neo4j.query.streaming-property": "updatedAt",
"neo4j.query.topic": "person-changes",
"neo4j.query.poll-interval": "5s",
"neo4j.query.poll-duration": "10s"
}
$lastCheck is auto-injected by connector. neo4j.query.streaming-property must be returned by the query and should be indexed.
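How a streaming property drives incremental polling can be illustrated without a database. This is a model of the assumed behaviour only: poll_once is a hypothetical helper, the in-memory rows stand in for query results, and the connector's real bookkeeping may differ.

```python
def poll_once(rows, last_check, streaming_property="updatedAt"):
    """Return rows newer than last_check and the advanced last_check value."""
    fresh = sorted(
        (r for r in rows if r[streaming_property] > last_check),
        key=lambda r: r[streaming_property],
    )
    next_check = fresh[-1][streaming_property] if fresh else last_check
    return fresh, next_check


rows = [
    {"id": 1, "name": "Ann", "updatedAt": 100},
    {"id": 2, "name": "Bob", "updatedAt": 250},
]

batch, last_check = poll_once(rows, last_check=0)
print([r["id"] for r in batch], last_check)  # [1, 2] 250

# A later update to person 1 is the only row picked up by the next poll.
rows.append({"id": 1, "name": "Ann B.", "updatedAt": 300})
batch, last_check = poll_once(rows, last_check)
print([r["id"] for r in batch], last_check)  # [1] 300
```

Because every poll filters on the streaming property, an index on it keeps each poll from scanning all matching nodes.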
Requires: Neo4j 5.13+ Enterprise, AuraDB BC, or AuraDB VDC.
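Enable CDC first (self-managed): CDC is switched on per database by setting its transaction log enrichment mode, not through neo4j.conf:
ALTER DATABASE neo4j SET OPTION txLogEnrichment 'FULL';
Use 'DIFF' instead of 'FULL' to capture only the changed values. On Aura: set the CDC mode for the instance in the Aura console (eligible tiers only).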
// Get cursor for "right now" — start tracking from this point forward
CALL db.cdc.current() YIELD id RETURN id AS cursor;
// Get earliest available cursor (replay from history start)
CALL db.cdc.earliest() YIELD id RETURN id AS cursor;
Cursors are exclusive: db.cdc.query returns only changes committed after the cursor, so a cursor from db.cdc.current() does NOT include the transaction it points to.
// All changes since cursor
CALL db.cdc.query($cursor, []) YIELD id, txId, seq, metadata, event
RETURN id, txId, seq, metadata, event
ORDER BY txId, seq;
Filtered — nodes with label Person, CREATE only:
CALL db.cdc.query($cursor, [
{select: 'n', labels: ['Person'], operation: 'c'}
]) YIELD id, txId, seq, event
RETURN id, event.state.after.properties AS newProps
ORDER BY txId, seq;
Filtered — specific relationship type with property change tracking:
CALL db.cdc.query($cursor, [
{select: 'r', type: 'KNOWS', changesTo: ['since', 'strength']}
]) YIELD id, txId, seq, event
RETURN id, event.state.before AS before, event.state.after AS after;
| Field | Values | Applies to |
|---|---|---|
| select | 'e' (all), 'n' (nodes), 'r' (rels) | both |
| operation | 'c' (create), 'u' (update), 'd' (delete) | both |
| labels | ['Label1','Label2'] (node must have ALL) | nodes |
| type | 'REL_TYPE' | relationships |
| elementId | specific element ID string | both |
| key | {propName: value} (requires key constraint) | both |
| changesTo | ['prop1','prop2'] (AND: all must change) | both |
| authenticatedUser | username string | both |
| executingUser | username string | both |
| txMetadata | {key: value} | both |
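The AND semantics in the table (all labels present, all changesTo properties changed) can be made concrete with a client-side model. The function below is a hypothetical illustration covering only a subset of selector fields; the actual filtering is done by the server inside db.cdc.query.

```python
def matches(selector: dict, event: dict) -> bool:
    """Evaluate a subset of selector fields against an event map."""
    select = selector.get("select", "e")
    if select != "e" and select != event["eventType"]:
        return False
    if "operation" in selector and selector["operation"] != event["operation"]:
        return False
    if "labels" in selector:
        # Every listed label must be present on the node.
        if not set(selector["labels"]) <= set(event.get("labels", [])):
            return False
    if "type" in selector and selector["type"] != event.get("type"):
        return False
    if "changesTo" in selector:
        state = event.get("state", {})
        before = (state.get("before") or {}).get("properties", {})
        after = (state.get("after") or {}).get("properties", {})
        # Every listed property must differ between before and after.
        if not all(before.get(p) != after.get(p) for p in selector["changesTo"]):
            return False
    return True


event = {
    "eventType": "n",
    "operation": "u",
    "labels": ["Person", "Employee"],
    "state": {
        "before": {"properties": {"name": "Ann", "age": 30}},
        "after": {"properties": {"name": "Ann", "age": 31}},
    },
}

print(matches({"select": "n", "labels": ["Person"]}, event))            # True
print(matches({"select": "n", "labels": ["Person", "Manager"]}, event)) # False
print(matches({"select": "n", "changesTo": ["age"]}, event))            # True
print(matches({"select": "n", "changesTo": ["age", "name"]}, event))    # False
```

A model like this is mainly useful for unit-testing selector lists before they are passed to the procedure.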
{
  id: STRING,            // cursor for this event (use as next $cursor)
  txId: INTEGER,         // transaction ID
  seq: INTEGER,          // ordering within transaction
  metadata: {
    executingUser: STRING,
    authenticatedUser: STRING,
    captureMode: STRING, // "DIFF" or "FULL"
    txStartTime: DATETIME,
    txCommitTime: DATETIME,
    txMetadata: MAP
  },
  event: {
    elementId: STRING,
    eventType: STRING,   // "n" or "r"
    operation: STRING,   // "c", "u", "d"
    labels: [STRING],    // nodes only
    type: STRING,        // relationships only
    keys: MAP,
    state: {
      before: { properties: MAP }, // null on CREATE
      after: { properties: MAP }   // null on DELETE
    }
  }
}
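Since state.before is null on create and state.after is null on delete, consumers usually need a null-safe way to compare the two. A minimal sketch follows; changed_properties is a hypothetical helper that operates on the event map as a plain dictionary.

```python
def changed_properties(event: dict) -> dict:
    """Map each changed property name to its (before, after) values."""
    state = event.get("state") or {}
    before = (state.get("before") or {}).get("properties") or {}
    after = (state.get("after") or {}).get("properties") or {}
    return {
        name: (before.get(name), after.get(name))
        for name in sorted(set(before) | set(after))
        if before.get(name) != after.get(name)
    }


create_event = {
    "operation": "c",
    "state": {"before": None, "after": {"properties": {"name": "Ann"}}},
}
update_event = {
    "operation": "u",
    "state": {
        "before": {"properties": {"name": "Ann", "age": 30}},
        "after": {"properties": {"name": "Ann", "age": 31}},
    },
}

print(changed_properties(create_event))  # {'name': (None, 'Ann')}
print(changed_properties(update_event))  # {'age': (30, 31)}
```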
import time

from neo4j import GraphDatabase

# Placeholder URI and credentials; load real values from a secrets store.
driver = GraphDatabase.driver("neo4j+s://...", auth=("neo4j", "password"))


def poll_changes(cursor: str, selectors: list) -> tuple[list, str]:
    """Fetch all changes after `cursor`; return them with the next cursor."""
    records, _, _ = driver.execute_query(
        "CALL db.cdc.query($cursor, $selectors) YIELD id, txId, seq, event "
        "RETURN id, txId, seq, event ORDER BY txId, seq",
        cursor=cursor, selectors=selectors,
        database_="neo4j"
    )
    events = [r.data() for r in records]
    # Advance cursor to last event id; keep current if no events
    next_cursor = events[-1]["id"] if events else cursor
    return events, next_cursor


# Bootstrap: start from the current position in the change log
with driver.session(database="neo4j") as s:
    cursor = s.run("CALL db.cdc.current() YIELD id RETURN id").single()["id"]

selectors = [{"select": "n", "labels": ["Person"]}]

# Poll once per second and print each change
while True:
    events, cursor = poll_changes(cursor, selectors)
    for e in events:
        print(e["event"]["operation"], e["event"]["elementId"])
    time.sleep(1)
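The loop above keeps its cursor only in memory, so a restart would lose the position. One possible way to checkpoint it is sketched below; the file name cdc_cursor.json and both helper names are assumptions for illustration, and a production consumer might prefer a database or another durable store.

```python
import json
import os
import tempfile
from pathlib import Path


def save_cursor(path: Path, cursor: str) -> None:
    """Write the cursor atomically: temp file first, then rename over the target."""
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    with os.fdopen(fd, "w", encoding="utf-8") as fh:
        json.dump({"cursor": cursor}, fh)
    os.replace(tmp_name, path)


def load_cursor(path: Path):
    """Return the saved cursor, or None if no checkpoint exists yet."""
    if not path.exists():
        return None
    return json.loads(path.read_text(encoding="utf-8"))["cursor"]


with tempfile.TemporaryDirectory() as tmp:
    checkpoint = Path(tmp) / "cdc_cursor.json"
    print(load_cursor(checkpoint))  # None
    save_cursor(checkpoint, "example-cursor")
    print(load_cursor(checkpoint))  # example-cursor
```

Saving the cursor only after a batch has been fully processed gives at-least-once behaviour on restart, so downstream handling should be idempotent.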
Confluent Cloud hosts the Neo4j Sink connector as a fully managed service (no JAR upload needed).
Config differences vs self-managed:
- No connector.class field: the connector is selected in the UI/API
Required Confluent Cloud fields:
{
"kafka.auth.mode": "KAFKA_API_KEY",
"kafka.api.key": "...",
"kafka.api.secret": "...",
"input.data.format": "JSON",
"neo4j.uri": "neo4j+s://...",
"neo4j.authentication.type": "BASIC",
"neo4j.authentication.basic.username": "neo4j",
"neo4j.authentication.basic.password": "..."
}
One strategy per topic — cannot mix Cypher and Pattern on same topic.
Source connector always generates messages with schema support — must configure converters:
{
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "https://your-schema-registry",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "https://your-schema-registry"
}
For JSON Schema:
{
"value.converter": "io.confluent.connect.json.JsonSchemaConverter",
"value.converter.schema.registry.url": "https://..."
}
Sink converter must match source — Avro sink cannot consume JSON schema source messages.
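A converter mismatch is easy to detect by comparing the two configs before deployment. The helper below is a hypothetical sketch; it only compares the converter class names and does not check schema registry URLs or compatibility settings.

```python
CONVERTER_KEYS = ("key.converter", "value.converter")


def converter_mismatches(source: dict, sink: dict) -> dict:
    """Return {setting: (source_value, sink_value)} for converters that differ."""
    return {
        key: (source.get(key), sink.get(key))
        for key in CONVERTER_KEYS
        if source.get(key) != sink.get(key)
    }


AVRO = "io.confluent.connect.avro.AvroConverter"
JSON_SCHEMA = "io.confluent.connect.json.JsonSchemaConverter"

source_cfg = {"key.converter": AVRO, "value.converter": JSON_SCHEMA}
sink_cfg = {"key.converter": AVRO, "value.converter": AVRO}

# Only value.converter differs between the two configs.
print(converter_mismatches(source_cfg, sink_cfg))
```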
| Error | Cause | Fix |
|---|---|---|
| CDC is not enabled | txLogEnrichment not set on the database / wrong tier | Run ALTER DATABASE <db> SET OPTION txLogEnrichment 'FULL' or upgrade to EE/BC/VDC |
| Invalid cursor after DB restore | Restore invalidates existing cursors | Reset neo4j.start-from to NOW or EARLIEST |
| Cannot merge node using null | Key property missing in message | Validate message schema; add null check in Cypher |
| Messages replayed after restart | No EOS configured | Add neo4j.eos-offset-label + NODE KEY constraint |
| Connector stops on bad message | errors.tolerance=none (default) | Set errors.tolerance=all + DLQ topic |
| SchemaException on sink | Converter mismatch source/sink | Match key/value converters on both ends |
| Empty events from db.cdc.query | Cursor already points to the latest transaction | Use db.cdc.earliest() to replay, or wait for new transactions |
Checklist:
- neo4j.eos-offset-label set, with its NODE KEY constraint, where exactly-once delivery is needed
- errors.tolerance=all + DLQ configured for production sink
- neo4j.query.streaming-property indexed
- Cursor handling planned for database restores (neo4j.start-from)