From sui-dev-agents
Builds custom indexer pipelines for SUI blockchain data extraction, processing, and storage. Use for full historical event aggregation, custom analytics, real-time event streams, or when gRPC/GraphQL queries are insufficient.
Slash command: /sui-dev-agents:sui-indexer
**Build custom indexer pipelines for SUI blockchain data extraction and processing.**
The SUI Indexing Framework (sui-indexer-alt-framework) lets you build custom data pipelines that process blockchain checkpoints and write structured data to your own storage (typically PostgreSQL). Use this when gRPC/GraphQL queries are insufficient — e.g., you need full historical event aggregation, custom analytics, or real-time derived data.
When to use an indexer vs gRPC/GraphQL:
| Use Case | gRPC/GraphQL | Custom Indexer |
|---|---|---|
| Read current object state | ✓ | |
| Query recent events | ✓ | |
| Full historical event aggregation | | ✓ |
| Custom analytics / derived data | | ✓ |
| Real-time price feeds | | ✓ |
| Cross-object correlation at scale | | ✓ |
Checkpoint Stream → Ingestion Client → Processor(s) → Store (PostgreSQL / custom)
↓
Service lifecycle
(start, shutdown, metrics)
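The flow above can be modelled with nothing but standard-library channels. The following is a toy sketch, not the framework's API: `Checkpoint`, `run_pipeline`, and the in-memory `Vec` standing in for the store are all hypothetical names chosen for illustration.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical, simplified checkpoint: a sequence number plus event names.
#[derive(Debug, Clone)]
pub struct Checkpoint {
    pub sequence_number: u64,
    pub events: Vec<String>,
}

/// Streams `checkpoints` through a bounded channel to a single processor
/// and returns the rows the processor "stored" (here: an in-memory Vec).
pub fn run_pipeline(checkpoints: Vec<Checkpoint>) -> Vec<(u64, String)> {
    // Bounded channel: the ingestion side blocks when the processor falls behind.
    let (tx, rx) = mpsc::sync_channel::<Checkpoint>(4);

    // Ingestion side: pushes checkpoints in order, then drops the sender
    // so the processor sees the end of the stream.
    let ingestion = thread::spawn(move || {
        for cp in checkpoints {
            if tx.send(cp).is_err() {
                break; // the processor hung up
            }
        }
    });

    // Processor side: turns each event into a (checkpoint, event) row.
    let mut store = Vec::new();
    for cp in rx {
        for event in cp.events {
            store.push((cp.sequence_number, event));
        }
    }

    ingestion.join().expect("ingestion thread panicked");
    store
}
```

Calling `run_pipeline` with a handful of checkpoints returns one row per event, in checkpoint order. The real framework adds lifecycle management and metrics around the same basic shape.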
Components:
As of Protocol 119, IngestionClientTrait::checkpoint() returns a CheckpointEnvelope containing both checkpoint data and chain identification:
/// Returned by IngestionClientTrait::checkpoint()
pub struct CheckpointEnvelope {
/// The full checkpoint data (transactions, effects, events, objects)
pub data: CheckpointData,
/// Chain identifier — full 32-byte Base58-encoded digest
pub chain_id: String,
}
Breaking change from Protocol 118: The method was renamed from fetch() to checkpoint() and the return type changed from CheckpointData to CheckpointEnvelope. Update existing indexers accordingly.
#[async_trait]
pub trait IngestionClientTrait: Send + Sync {
/// Fetch a checkpoint by sequence number (renamed from `fetch` in Protocol 119)
async fn checkpoint(&self, checkpoint: u64) -> Result<Arc<CheckpointEnvelope>>;
}
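For unit tests it can help to have an in-memory stand-in for the ingestion client. The sketch below is an assumption-laden simplification: `ToyEnvelope`, `ToyIngestionClient`, and `InMemoryClient` are hypothetical names, the trait is synchronous rather than async, and the checkpoint data is reduced to a sequence number. It only illustrates the contract that a lookup by sequence number yields a shared envelope carrying a chain identifier.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Simplified stand-in for the envelope: real checkpoint data is replaced
/// by a plain sequence number.
#[derive(Debug, PartialEq)]
pub struct ToyEnvelope {
    pub sequence_number: u64,
    pub chain_id: String,
}

/// Synchronous, simplified analogue of the ingestion trait (hypothetical).
pub trait ToyIngestionClient {
    fn checkpoint(&self, checkpoint: u64) -> Result<Arc<ToyEnvelope>, String>;
}

/// Serves checkpoints from a map; intended as a test double.
pub struct InMemoryClient {
    checkpoints: HashMap<u64, Arc<ToyEnvelope>>,
}

impl InMemoryClient {
    /// Builds one envelope per sequence number, all tagged with `chain_id`.
    pub fn new(chain_id: &str, sequence_numbers: &[u64]) -> Self {
        let checkpoints = sequence_numbers
            .iter()
            .map(|&seq| {
                let envelope = ToyEnvelope {
                    sequence_number: seq,
                    chain_id: chain_id.to_string(),
                };
                (seq, Arc::new(envelope))
            })
            .collect();
        Self { checkpoints }
    }
}

impl ToyIngestionClient for InMemoryClient {
    fn checkpoint(&self, checkpoint: u64) -> Result<Arc<ToyEnvelope>, String> {
        self.checkpoints
            .get(&checkpoint)
            .cloned() // cheap: clones the Arc, not the envelope
            .ok_or_else(|| format!("checkpoint {checkpoint} not found"))
    }
}
```

Returning an `Arc` mirrors the signature above: several processors can hold the same envelope without copying it.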
Built-in implementations:
- StoreIngestionClient: reads from any object_store::ObjectStore (S3, GCS, local filesystem)

#[async_trait]
pub trait Processor: Send + Sync + 'static {
/// Human-readable name for logging and metrics
const NAME: &'static str;
/// Process a single checkpoint envelope
async fn process(&self, envelope: &CheckpointEnvelope) -> Result<()>;
}
Example — Event indexer:
use sui_indexer_alt_framework::prelude::*;
struct MyEventProcessor {
db: PgPool,
}
#[async_trait]
impl Processor for MyEventProcessor {
const NAME: &'static str = "my-event-processor";
async fn process(&self, envelope: &CheckpointEnvelope) -> Result<()> {
let checkpoint = &envelope.data;
for tx in &checkpoint.transactions {
for event in &tx.events {
if event.type_.module == "my_module" {
sqlx::query("INSERT INTO events (tx_digest, type, data, checkpoint, chain_id) VALUES ($1, $2, $3, $4, $5)")
.bind(&tx.transaction.digest().to_string())
.bind(&event.type_.to_string())
.bind(&serde_json::to_value(&event.parsed_json)?)
.bind(checkpoint.checkpoint_summary.sequence_number as i64)
.bind(&envelope.chain_id)
.execute(&self.db)
.await?;
}
}
}
Ok(())
}
}
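One design option for the processor above is to keep the filtering step as a pure function, so it can be unit-tested without a database. The sketch below assumes a hypothetical, simplified event type (`ToyEvent`) and helper (`rows_for_module`); neither is part of the framework.

```rust
/// Hypothetical, simplified event: only the fields the filter needs.
#[derive(Debug, Clone, PartialEq)]
pub struct ToyEvent {
    pub module: String,
    pub type_name: String,
}

/// A row ready for insertion: (checkpoint sequence number, event type).
pub type EventRow = (u64, String);

/// Keeps only events emitted by `module` and tags them with the checkpoint.
pub fn rows_for_module(
    sequence_number: u64,
    events: &[ToyEvent],
    module: &str,
) -> Vec<EventRow> {
    events
        .iter()
        .filter(|e| e.module == module)
        .map(|e| (sequence_number, e.type_name.clone()))
        .collect()
}
```

The database write then becomes a loop over the returned rows, and the selection logic can be tested in isolation.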
use sui_indexer_alt_framework::Service;
// Build and run the indexer service
let service = Service::builder()
.ingestion_client(store_client)
.add_processor(MyEventProcessor { db: pool.clone() })
.add_processor(MyObjectTracker { db: pool.clone() })
.build()
.await?;
// Run with clean shutdown handling
// Blocks until SIGINT/SIGTERM or fatal error
service.main().await?;
Key points:
- Service replaces the old JoinHandle<()> pattern (breaking change from v1.63).
- Call service.main() for clean shutdown handling (responds to SIGINT/SIGTERM).

cargo new my-indexer
cd my-indexer
Cargo.toml:
[package]
name = "my-indexer"
version = "0.1.0"
edition = "2021"
[dependencies]
sui-indexer-alt-framework = { git = "https://github.com/MystenLabs/sui.git", branch = "mainline" }
sqlx = { version = "0.8", features = ["runtime-tokio", "postgres"] }
tokio = { version = "1", features = ["full"] }
serde_json = "1"
anyhow = "1"
async-trait = "0.1"
See the Event indexer example in Core API section above.
use sui_indexer_alt_framework::{Service, StoreIngestionClient};
use sqlx::postgres::PgPoolOptions;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let pool = PgPoolOptions::new()
.max_connections(10)
.connect(&std::env::var("DATABASE_URL")?)
.await?;
let ingestion = StoreIngestionClient::new_remote(
"https://fullnode.testnet.sui.io:443".to_string(),
)?;
let service = Service::builder()
.ingestion_client(ingestion)
.add_processor(MyEventProcessor { db: pool.clone() })
.build()
.await?;
service.main().await
}
export DATABASE_URL="postgres://user:pass@localhost/my_indexer"
cargo run
Run multiple processors concurrently — each processes the same checkpoint stream independently:
let service = Service::builder()
.ingestion_client(ingestion)
.add_processor(EventProcessor { db: pool.clone() })
.add_processor(ObjectTracker { db: pool.clone() })
.add_processor(BalanceAggregator { db: pool.clone() })
.build()
.await?;
Each processor runs in its own task. Failures in one processor do not affect others. The service logs errors and continues.
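The isolation property described above can be illustrated with plain threads. This is a sketch under stated assumptions, not the framework's scheduler: `ToyProcessor`, `run_isolated`, `always_ok`, and `fails_on_odd` are hypothetical names, and OS threads stand in for async tasks.

```rust
use std::thread;

/// Hypothetical processor signature: handles one checkpoint number.
pub type ToyProcessor = fn(u64) -> Result<(), String>;

/// Runs every processor over the same checkpoints, each on its own thread.
/// Returns, per processor, its name and how many checkpoints succeeded.
pub fn run_isolated(
    processors: Vec<(&'static str, ToyProcessor)>,
    checkpoints: Vec<u64>,
) -> Vec<(&'static str, usize)> {
    let handles: Vec<_> = processors
        .into_iter()
        .map(|(name, process)| {
            let checkpoints = checkpoints.clone();
            thread::spawn(move || {
                let mut ok = 0;
                for cp in checkpoints {
                    match process(cp) {
                        Ok(()) => ok += 1,
                        // Log and continue: the error never reaches other threads.
                        Err(err) => eprintln!("{name}: checkpoint {cp} failed: {err}"),
                    }
                }
                (name, ok)
            })
        })
        .collect();

    // Join in spawn order so results line up with the input order.
    handles
        .into_iter()
        .map(|h| h.join().expect("processor thread panicked"))
        .collect()
}

/// Example processor that never fails.
pub fn always_ok(_cp: u64) -> Result<(), String> {
    Ok(())
}

/// Example processor that fails on odd checkpoint numbers.
pub fn fails_on_odd(cp: u64) -> Result<(), String> {
    if cp % 2 == 1 {
        Err("odd checkpoint".to_string())
    } else {
        Ok(())
    }
}
```

Running `always_ok` and `fails_on_odd` side by side over the same checkpoints shows the first processor completing every checkpoint while the second logs its failures and keeps going.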
For historical data, configure the starting checkpoint:
let service = Service::builder()
.ingestion_client(ingestion)
.add_processor(processor)
.start_checkpoint(0) // Start from genesis for full backfill
.build()
.await?;
Tips:
- Use StoreIngestionClient pointed at a checkpoint archive (S3/GCS); this is much faster than fetching from a full node.

Since Protocol 118, the framework uses Adaptive Concurrency Control instead of a fixed FANOUT:
// Old (removed):
// const FANOUT: usize = 10;
// New: framework automatically scales concurrency based on throughput
// No configuration needed — the framework adapts to your processor speed
Both checkpoint_lag and checkpoint_buffer_size were removed in v1.71. Sequential pipelines now participate in the same adaptive ingestion concurrency system as concurrent pipelines.
Available knobs:
- subscriber_channel_size: per-pipeline, under the pipeline's ingestion section. Defaults to max(num_cpus / 2, 4). Drives fetch concurrency via bounded-channel fill.
- pipeline-depth: new in v1.72; lets a sequential pipeline keep building batches while one is flushing.

Upgrade note (v1.72): rpc-index DB version bumped to 4. The first start after upgrading triggers a full re-index of object history. Duration scales with object count, so plan accordingly.
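To make the subscriber_channel_size default concrete, the sketch below computes max(num_cpus / 2, 4) and shows how a bounded channel pushes back once it is full. The function names are hypothetical, and the standard-library channel is only an analogy: how the framework actually measures channel fill is not shown here.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};
use std::thread;

/// The documented default: max(num_cpus / 2, 4).
pub fn default_channel_size(num_cpus: usize) -> usize {
    std::cmp::max(num_cpus / 2, 4)
}

/// Detects the CPU count with the standard library, falling back to 1.
pub fn detected_channel_size() -> usize {
    let cpus = thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(1);
    default_channel_size(cpus)
}

/// Fills a bounded channel that has no active consumer and returns how
/// many sends succeeded before the channel reported it was full.
pub fn sends_until_full(capacity: usize) -> usize {
    // `_rx` is kept alive so the channel stays connected.
    let (tx, _rx) = sync_channel::<u64>(capacity);
    let mut sent = 0;
    loop {
        match tx.try_send(sent as u64) {
            Ok(()) => sent += 1,
            Err(TrySendError::Full(_)) => return sent,
            Err(TrySendError::Disconnected(_)) => return sent,
        }
    }
}
```

On machines with eight or fewer cores the default stays at the floor of 4; it only grows beyond that from ten cores upward.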
The framework exposes Prometheus metrics automatically:
let service = Service::builder()
.ingestion_client(ingestion)
.add_processor(processor)
.metrics_address("0.0.0.0:9184".parse()?)
.build()
.await?;
Key metrics:
- indexer_checkpoint_processed_total: checkpoints processed per processor
- indexer_checkpoint_latency_seconds: processing time histogram
- indexer_ingestion_lag: how far behind the tip

| Version | Change |
|---|---|
| v1.73 (Protocol 125) | Testnet v1.73.0 / mainnet P125 (v1.72.3+). JSON-RPC permanent deactivation 2026-07-31 — migrate indexer reads to gRPC / GraphQL before the cutoff |
| v1.72 (Protocol 124) | rpc-index DB v4 — first start re-indexes full object history; added pipeline-depth for sequential pipelines |
| v1.71 (Protocol 123) | checkpoint_lag / checkpoint_buffer_size removed; sequential pipelines use adaptive concurrency + subscriber_channel_size |
| v1.69.1 (Protocol 119) | IngestionClientTrait::fetch → checkpoint; returns CheckpointEnvelope with chain_id |
| v1.68 (Protocol 118) | Processor::FANOUT removed; Adaptive Concurrency Control replaces fixed workers |
| v1.65.2 (Protocol 111) | RemoteIngestionClient renamed to StoreIngestionClient; supports any ObjectStore |
| v1.63.3 (Protocol 107) | Indexer/ingestion services return Service instead of JoinHandle<()>; use Service::main() |
npx claudepluginhub first-mover-tw/sui-dev-agents --plugin sui-dev-agents