Cost-based federated SPARQL source selection + bind-vs-hash join planning over already-fetched source descriptors, plus an ANAPSID-style non-blocking streaming join with operator spill, via the opt-in sparq-fedplan crate. Use when planning a federated BGP across multiple SPARQL endpoints from their served statistics (VoID property/class partitions + mined scs: characteristic sets): deciding which sources can contribute to each triple pattern (HiBISCuS recall-safe pruning + CostFed skew-aware cardinality), choosing a join order with per-join bind-vs-hash-vs-streaming algorithm selection (characteristic-set star cardinality for intermediate sizes), and executing a memory-bounded non-blocking symmetric hash join over incrementally-arriving sub-results (StreamJoin, spill to a backing store, result multiset-equal to a blocking join). Pure + deterministic planning, no network I/O. Off by default; does not touch sparq-core/sparq-engine's lean build. Also covers live adaptive RE-planning at stage boundaries (mid-execution plan switching when observed cardinalities diverge from estimates, or when a source is observed to be slow — per-source EWMA-smoothed latency is folded into the cost model as a documented heuristic bias toward faster sources, smoothing out transient spikes) via the further opt-in adaptive-replan feature (AdaptiveExecutor) — sound because BGP join is order-independent (latency changes ordering/cost only, never results), with mid-operator swap + live source failover deferred.
Slash command: /sparq:federated-planning
`sparq-fedplan` plans a federated SPARQL **Basic Graph Pattern** (BGP) across several
remote endpoints from statistics already in hand — it never contacts the network. A
caller fetches each source's descriptor once (the W3C VoID document a sparq-server
serves at /.well-known/void, including the mined scs: characteristic sets), and the
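planner decides, deterministically, which sources can contribute to each triple pattern, the join order, and which algorithm (bind, hash, or streaming) each binary join uses.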
It is the opt-in public surface for cost-based federation planning. Add it
explicitly and enable the fedplan feature; it is not in sparq's default build
(sparq-core/sparq-engine stay lean, the wasm artifact is unchanged unless you pull it
in). There is no sparq-core/sparq-engine dependency — it plans over descriptors only.
```toml
[dependencies]
sparq-fedplan = { path = "crates/sparq-fedplan", features = ["fedplan"] }
oxrdf = { version = "0.3", features = ["rdf-12"] }
```
The whole planner is behind the fedplan feature (off by default), so even a crate that
depends on sparq-fedplan pays nothing for it unless the feature is enabled.
Build each source descriptor either programmatically via the builder or by parsing the served N-Triples document.
```rust
// `SourceDescriptorBuilder` is the (public, nameable) return type of `.builder(..)`; you only
// need to import it if you hold the builder in a `let` rather than chaining straight to `.build()`.
use sparq_fedplan::{SourceDescriptor, SourceDescriptorBuilder, SourceId, PredPartition, ClassPartition};

// Programmatic (VoID property/class partitions).
let src = SourceDescriptor::builder(SourceId::new("https://a.example/sparql"))
    .total_triples(10_000)
    .predicate(PredPartition { predicate: "http://xmlns.com/foaf/0.1/knows".into(),
        triples: 2000, distinct_subjects: 1000, distinct_objects: 1800 })
    .class(ClassPartition { class: "http://xmlns.com/foaf/0.1/Person".into(), entities: 1000 })
    .build();

// Or parse the served descriptor (the /.well-known/void N-Triples form, with scs: sets).
// `nt` is the fetched document text; `?` assumes an enclosing fn returning a compatible Result.
let parsed = SourceDescriptor::from_void_nt(SourceId::new("https://b.example/sparql"), nt)?;
```
A descriptor parsed from VoID partitions is authority-incomplete (it sees only
predicate/class authorities, never subject/object instance authorities), so
subject/object authority-pruning is disabled for it — recall-safe by construction. To
enable authority pruning, build via .builder(..) and call .authorities_complete()
only when you truly enumerate every authority the source mints (a HiBISCuS-style
capability set / void:uriSpace declaration).
```rust
use sparq_fedplan::{Bgp, TriplePattern, Term, Var, select_sources};

let bgp = Bgp::new(vec![
    TriplePattern::new(Term::Var(Var::new("s")),
        Term::Iri("http://xmlns.com/foaf/0.1/knows".into()), Term::Var(Var::new("o"))),
]);
let sources = [src];
let selection = select_sources(&bgp, &sources);
// selection[i].candidates: the sources retained for pattern i, with estimated_cardinality.
```
Recall-safety invariant: a source is pruned for a pattern only when the descriptor proves it holds no matching triple. A bound predicate absent from the source's (complete) predicate-partition set prunes; a bound class absent from a declared class section prunes; a bound subject/object whose authority is absent prunes only when the authority set is complete. On any uncertainty — open predicate, incomplete authority set, absent class section — the source is kept. The cardinality estimate never prunes (a source with a tiny or zero estimate is still retained). This is HiBISCuS's design goal: maximise pruning subject to never losing a result.
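The recall-safety rule can be sketched in std-only Rust. The sketch is illustrative, not sparq-fedplan's API. The `Descriptor` and `Pattern` types, the `authority` helper, and the choice to treat an rdf:type object as a class rather than an instance are all assumptions. What it shows is the shape of the decision: prune only on proof of absence, and keep the source on any uncertainty.

```rust
use std::collections::HashSet;

const RDF_TYPE: &str = "http://www.w3.org/1999/02/22-rdf-syntax-ns#type";

/// Simplified per-source statistics (hypothetical shape, not the crate's SourceDescriptor).
struct Descriptor {
    predicates: HashSet<String>,      // complete VoID predicate-partition set
    classes: Option<HashSet<String>>, // None = the source declares no class section
    authorities: HashSet<String>,     // IRI authorities known to be minted by the source
    authorities_complete: bool,       // true only if `authorities` is exhaustive
}

/// A triple pattern over IRIs; `None` marks a variable position.
struct Pattern<'a> {
    subject: Option<&'a str>,
    predicate: Option<&'a str>,
    object: Option<&'a str>,
}

/// "https://a.example/x" -> Some("a.example"); non-http terms have no authority.
fn authority(iri: &str) -> Option<&str> {
    let rest = iri.strip_prefix("https://").or_else(|| iri.strip_prefix("http://"))?;
    rest.split('/').next()
}

/// Returns false only when the descriptor proves the source holds no matching triple.
fn may_contribute(d: &Descriptor, p: &Pattern) -> bool {
    if let Some(pred) = p.predicate {
        if !d.predicates.contains(pred) {
            return false; // bound predicate absent from the (complete) partition set
        }
    }
    let object_is_class = p.predicate == Some(RDF_TYPE);
    if let (true, Some(class), Some(classes)) = (object_is_class, p.object, &d.classes) {
        if !classes.contains(class) {
            return false; // bound class absent from a declared class section
        }
    }
    if d.authorities_complete {
        // A class IRI is not an instance the source mints, so skip it here.
        let instance_object = if object_is_class { None } else { p.object };
        for term in [p.subject, instance_object].into_iter().flatten() {
            if let Some(auth) = authority(term) {
                if !d.authorities.contains(auth) {
                    return false; // authority absent from a COMPLETE authority set
                }
            }
        }
    }
    true // any uncertainty keeps the source; cardinality estimates never prune
}
```

The final `true` matters. A variable predicate, a missing class section, or an incomplete authority set gives no proof of absence, so the source is kept.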
```rust
use sparq_fedplan::{plan_bgp, PlanOptions, JoinAlgo, JoinNode};

let plan = plan_bgp(&bgp, &selection, &sources, &PlanOptions::default()).unwrap();
let order: Vec<usize> = plan.join_order(); // patterns in join order (left-deep)
let cost: f64 = plan.total_cost;
```
Each binary join is a bind join (cost ≈ L·(req + fan_out) — probe the right with the
left's bindings; cheap when the left is small and the right selective) or a hash /
symmetric join (cost ≈ R + L — scan both sides once; cheap when the left is large or
the right unselective). The decision flips as the left intermediate grows past the point
where per-row requests overtake a full scan; tune the round-trip penalty with
PlanOptions::request_cost. Star arms (?s p₁ ?a . ?s p₂ ?b) use characteristic-set
cardinality (Σ_{C⊇Q} count(C)·Π avg_mult) for intermediate sizes, capturing the
predicate correlation an independence product loses.
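The characteristic-set estimate can be sketched as follows. The sketch assumes each set records its subject count and the total number of triples per predicate, so that avg_mult is occurrences / count. `CharSet` and `star_cardinality` are hypothetical names, not the crate's scs: representation.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// One characteristic set (hypothetical shape): subjects whose predicate set is exactly `predicates`.
struct CharSet {
    predicates: BTreeSet<String>,
    count: u64,                         // number of such subjects
    occurrences: BTreeMap<String, u64>, // total triples per predicate across those subjects
}

/// Star cardinality for `?s p1 ?o1 . ?s p2 ?o2 ...` with predicate set `query` (Q):
/// sum over sets C with C ⊇ Q of count(C) · Π_{p ∈ Q} avg_mult(C, p).
fn star_cardinality(sets: &[CharSet], query: &BTreeSet<String>) -> f64 {
    sets.iter()
        .filter(|c| c.count > 0 && query.is_subset(&c.predicates))
        .map(|c| {
            let count = c.count as f64;
            // Average occurrences of each queried predicate per subject in this set.
            let mult: f64 = query
                .iter()
                .map(|p| c.occurrences.get(p).copied().unwrap_or(0) as f64 / count)
                .product();
            count * mult
        })
        .sum()
}
```

Only sets that contain every queried predicate contribute. A star over a predicate combination that no subject actually uses therefore estimates to zero rather than to an independence product.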
Non-blocking streaming join (StreamJoin, sq-vf7q)

The planner's JoinAlgo::Streaming choice corresponds to an execution-side operator:
StreamJoin, an ANAPSID/XJoin-style symmetric hash join over two
incrementally-arriving tuple streams. Feed tuples from either side as federated sub-results
arrive; each push returns the results that arrival newly completes — it never blocks on
either input finishing.
```rust
use sparq_fedplan::{StreamJoin, StreamJoinOptions, SpillStore, Tuple, Var, blocking_hash_join};

let opts = StreamJoinOptions { mem_budget_tuples: 100_000, spill_store: SpillStore::TempFile };
let mut join = StreamJoin::new([Var::new("s")], opts);
let _ = join.push_left(Tuple::new([(Var::new("s"), "a".into()), (Var::new("o"), "1".into())]));
let out = join.push_right(Tuple::new([(Var::new("s"), "a".into()), (Var::new("n"), "x".into())]));
assert_eq!(out.len(), 1); // emitted the moment both sides hold key s=a — non-blocking.
```
Bounded spill. Memory is capped at mem_budget_tuples; when an insert would exceed it,
the largest in-memory join-key partition is spilled to a backing run (a temp file under
std::env::temp_dir by default — std only, no new dependency; SpillStore::Memory is an
in-process simulation for tests). Spilling only relocates tuples; the probe consults both
the live bucket and every spilled run for the key.
Correctness invariant (load-bearing). The streamed + spilled result is multiset-equal
to blocking_hash_join(left, right, join_vars) — same tuples, no loss, no duplication — for
any stream interleaving and any budget (including one so low every partition spills).
Each matching pair (l, r) is emitted exactly once, when the second of the two arrives and
probes the other side (found in memory or a spill run). Proven by the streamed_equals_*,
spill_path_equals_*, duplicate_keys_*, and emits_before_inputs_exhausted tests.
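The emit-once argument can be checked against a toy version of the operator. This std-only sketch is not StreamJoin. It joins on a single string key, carries one payload string per side, and "spills" into a second in-memory map, similar in spirit to SpillStore::Memory. An arrival probes the opposite side's live bucket and spilled run before it is inserted, so each matching pair is emitted exactly once, when its second member arrives.

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum Side { Left, Right }

/// Toy symmetric hash join on one string key; spilled partitions move to a
/// second map that stands in for backing runs, and probes consult both.
struct SymJoin {
    budget: usize,
    in_memory: usize,
    live: HashMap<(Side, String), Vec<String>>,
    spilled: HashMap<(Side, String), Vec<String>>,
}

impl SymJoin {
    fn new(budget: usize) -> Self {
        SymJoin { budget, in_memory: 0, live: HashMap::new(), spilled: HashMap::new() }
    }

    /// Push one tuple from `side`; returns the (key, left, right) rows this arrival completes.
    fn push(&mut self, side: Side, key: &str, payload: &str) -> Vec<(String, String, String)> {
        let other = if side == Side::Left { Side::Right } else { Side::Left };
        let probe = (other, key.to_string());
        let mut out = Vec::new();
        // Probe the opposite side's live bucket AND its spilled run for this key.
        for bucket in [self.live.get(&probe), self.spilled.get(&probe)].iter().flatten() {
            for seen in bucket.iter() {
                let (l, r) = if side == Side::Left { (payload, seen.as_str()) } else { (seen.as_str(), payload) };
                out.push((key.to_string(), l.to_string(), r.to_string()));
            }
        }
        // Insert only after probing, so a pair is emitted once: when its second member arrives.
        if self.in_memory >= self.budget {
            self.spill_largest();
        }
        self.live.entry((side, key.to_string())).or_default().push(payload.to_string());
        self.in_memory += 1;
        out
    }

    /// Relocate the largest live partition: tuples move, none are dropped.
    fn spill_largest(&mut self) {
        let largest = self.live.iter().max_by_key(|(_, v)| v.len()).map(|(k, _)| k.clone());
        if let Some(k) = largest {
            let tuples = self.live.remove(&k).unwrap_or_default();
            self.in_memory -= tuples.len();
            self.spilled.entry(k).or_default().extend(tuples);
        }
    }
}

/// Reference blocking hash join: build on `right`, probe with `left`.
fn blocking_join(left: &[(&str, &str)], right: &[(&str, &str)]) -> Vec<(String, String, String)> {
    let mut table: HashMap<&str, Vec<&str>> = HashMap::new();
    for (k, r) in right {
        table.entry(*k).or_default().push(*r);
    }
    let mut out = Vec::new();
    for (k, l) in left {
        if let Some(rs) = table.get(k) {
            for r in rs {
                out.push((k.to_string(), l.to_string(), r.to_string()));
            }
        }
    }
    out
}
```

After sorting, the streamed output matches `blocking_join` for any budget, including a budget of zero where every partition spills. This is the multiset-equality property described above, here on a toy scale.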
The planner picks Streaming over plain Hash when a hash-class join's combined estimated
inputs L + R exceed PlanOptions::stream_threshold (default 100 000 rows; set to
f64::INFINITY to always use plain hash) — large joins run non-blocking + spillable rather
than materialising a side up front.
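The per-join choice can be sketched from the two cost formulas and the stream threshold given in the text. `JoinEstimate` and `choose` are hypothetical. The sketch also assumes that fan_out means the expected number of right-side matches per left binding, which the prose does not define precisely.

```rust
#[derive(Debug, PartialEq)]
enum Algo { Bind, Hash, Streaming }

/// Hypothetical join inputs; names follow the prose, not the crate's fields.
struct JoinEstimate {
    left_rows: f64,  // L: estimated left intermediate size
    right_rows: f64, // R: estimated right-side cardinality
    fan_out: f64,    // assumed: expected right matches per left binding
}

/// Bind costs L·(req + fan_out); hash/symmetric costs R + L. A hash-class join whose
/// combined inputs exceed `stream_threshold` runs as the non-blocking Streaming join.
fn choose(e: &JoinEstimate, request_cost: f64, stream_threshold: f64) -> (Algo, f64) {
    let bind = e.left_rows * (request_cost + e.fan_out);
    let hash = e.right_rows + e.left_rows;
    if bind < hash {
        (Algo::Bind, bind)
    } else if e.left_rows + e.right_rows > stream_threshold {
        (Algo::Streaming, hash)
    } else {
        (Algo::Hash, hash)
    }
}
```

With a request cost of 5 and a fan-out of 2, a 10-row left side bind-joins into a 50 000-row right side. A 20 000-row left side flips the choice to hash, and 50 000 + 80 000 rows crosses the 100 000-row default threshold into Streaming. Passing `f64::INFINITY` as the threshold keeps that last case as plain Hash.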
Adaptive re-planning (adaptive-replan feature, sq-7s4z)

Behind the off-by-default adaptive-replan cargo feature (which implies fedplan), the
crate adds the reactive half of ANAPSID adaptivity: mid-execution plan switching. A build
that does not enable the feature compiles zero adaptive code (#[cfg]-gated out), so the
lean default build and the fedplan-only build are byte-unchanged.
AdaptiveExecutor models execution as a sequence of stages (the left-deep join order)
and holds, at all times, the patterns already joined (the prefix) and the patterns still
to join (the suffix).
RuntimeStats records the observed per-pattern leaf cardinality (the real row counts the sources returned) and per-source latency, fed in as each stage completes. Latency is EWMA-smoothed per source (see EWMA smoothing below), so the cost model and the trigger track the trend, not the last raw sample.

Trigger. maybe_replan(&stats) checks whether a not-yet-executed pattern's observed cardinality o diverges from its estimate e past ReplanPolicy::divergence_factor k in either direction (o > k·e or e > k·o; default k = 4). It also checks (sq-b51o) whether a not-yet-executed pattern's slowest source is observed at more than k× the latency baseline. If either holds, it re-invokes the cost model on the remaining patterns, with the observed cardinalities substituted in (corrected_selection) and the join costs latency-weighted. Source membership is never re-pruned; only the order changes, so HiBISCuS recall-safety is preserved.

Latency weighting. Each pattern's join cost is scaled by factor = clamp(1 + latency_weight·(s − 1), latency_floor, latency_cap), where s = slowest_ewma_source_latency / latency_baseline over the pattern's retained sources. A source at baseline, or one with no observation, yields factor = 1.0, so a re-plan with no latency data is byte-identical to the cardinality-only planner. A 2×-slow source costs 1.5× at the default latency_weight = 0.5, and latency_cap (4.0) bounds any one outlier. The constants (latency_weight 0.5, latency_baseline 100, latency_floor 0.5, latency_cap 4.0) are hand-tuned, not derived. They are a deliberately gentle bias toward faster sources (deferring a slow one), not a claim to compute the latency-optimal plan. Latency enters only the cost term (and the suffix-selection score), never the output cardinality, so results are unchanged. Setting latency_weight = 0 disables it.

EWMA smoothing. record_source_latency folds each sample in as ewma = α·observed + (1−α)·prev (the first sample seeds it), with α = RuntimeStats::latency_alpha. The default is DEFAULT_LATENCY_ALPHA = 0.3, a hand-picked factor (latest 30%, history 70%), not workload-derived. This is a cleaner anti-thrash mechanism than a bare last-sample-plus-clamp: a single transient spike does not move the average over the trigger band, but a sustained shift converges past it in a few samples. The latency_floor/latency_cap clamp is kept as a final guard. RuntimeStats::with_latency_alpha(α) overrides the default (α = 1.0 gives un-smoothed last-sample behaviour). A higher α tracks faster but is twitchier; a lower α is calmer but laggier.

Hysteresis. A re-plan is adopted only when the new suffix beats the current one by ReplanPolicy::improvement_margin (default 10%), and a hard max_replans budget (default 8) caps the number of switches. Stable-but-noisy stats, including jittery latency, never thrash. maybe_replan returns ReplanOutcome::{NoDivergence, KeptWithinHysteresis, Switched, BudgetExhausted}.

Soundness boundary (load-bearing). Re-planning reorders only the not-yet-started suffix. It is NOT a mid-operator swap (an in-flight join is never torn down). A BGP answer is the natural join of the per-pattern solution multisets, which is commutative and associative, so any order over the same patterns yields the same result multiset. The already-produced prefix is carried across the switch unchanged (no binding lost or duplicated). The latency weighting does not move this boundary: it touches only cost and ordering, never the output cardinality or the pattern set, so a latency-driven reorder is the same kind of pure suffix permutation. EWMA smoothing changes only when the latency path fires, not the boundary itself; re-planning stays a sub-query / stage-boundary reorder, never mid-operator. This is proven by adaptive::tests::replan_result_equals_static (cardinality-driven), latency_replan_result_equals_static (latency-driven) and ewma_replan_result_equals_static (EWMA-smoothed-latency-driven). Each of these genuinely flips the order yet yields the multiset identical to the static plan. An exhaustive all-permutations order-independence test completes the set.
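The numeric re-plan rules are small enough to sketch directly. This std-only Rust block restates them with the defaults from the text: k = 4, latency_weight 0.5, latency_baseline 100, latency_floor 0.5, latency_cap 4.0, α = 0.3, a 10% margin and max_replans 8. The function names are hypothetical. Two details the prose leaves open are assumptions here: the margin is read as "candidate < current · (1 − margin)", and the budget is checked before the margin.

```rust
/// Outcome of one re-plan check; variants mirror the ReplanOutcome names in the text.
#[derive(Debug, PartialEq)]
enum Outcome { NoDivergence, KeptWithinHysteresis, Switched, BudgetExhausted }

/// Cardinality trigger: observed `o` vs estimate `e`, in either direction, by factor `k`.
fn diverges(o: f64, e: f64, k: f64) -> bool {
    o > k * e || e > k * o
}

/// Latency trigger: the pattern's slowest smoothed source exceeds k× the baseline.
fn latency_diverges(slowest_ewma: Option<f64>, baseline: f64, k: f64) -> bool {
    slowest_ewma.map_or(false, |l| l > k * baseline)
}

/// Cost multiplier for a pattern; no observation is neutral (1.0).
fn latency_factor(slowest_ewma: Option<f64>, baseline: f64, weight: f64, floor: f64, cap: f64) -> f64 {
    match slowest_ewma {
        None => 1.0,
        Some(l) => (1.0 + weight * (l / baseline - 1.0)).clamp(floor, cap),
    }
}

/// Per-source EWMA: the first sample seeds it, later ones blend with weight `alpha`.
struct Ewma { alpha: f64, value: Option<f64> }

impl Ewma {
    fn record(&mut self, observed: f64) -> f64 {
        let next = match self.value {
            None => observed,
            Some(prev) => self.alpha * observed + (1.0 - self.alpha) * prev,
        };
        self.value = Some(next);
        next
    }
}

/// Adopt a cheaper suffix only past the margin, and only while the budget remains.
fn decide(triggered: bool, current_cost: f64, candidate_cost: f64, margin: f64,
          replans: u32, max_replans: u32) -> Outcome {
    if !triggered {
        Outcome::NoDivergence
    } else if replans >= max_replans {
        Outcome::BudgetExhausted
    } else if candidate_cost < current_cost * (1.0 - margin) {
        Outcome::Switched
    } else {
        Outcome::KeptWithinHysteresis
    }
}
```

The EWMA claim can be checked with numbers. Seeded at 100, a single sample of 1000 lifts the average only to 370, below the 4 × 100 = 400 trigger. A second sample of 1000 takes it to 559, which fires the trigger.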
Federation client (sparq-fedclient, sq-dnko)

sparq-fedplan on its own is a planning brain with no consumer: it plans, but nothing
fetches descriptors or issues a query. The consumer is a separate opt-in crate,
sparq-fedclient (epic sq-dnko / sq-3183, design research/federation-client-design.md):
the streaming federation client that discovers each remote source's capability, lowers
a query BGP into this crate's Bgp, calls select_sources + plan_bgp, interprets the
resulting JoinTree into physical operators (Bind → VALUES bind-join, Hash/Streaming →
the StreamJoin above, Local → sparq-engine eval), and streams results back. It REUSES
this planner and the engine's service SRJ transport + SSRF guard; the dependency arrow
points one-way into the engine.
sparq-fedclient started as the Phase-0 skeleton (sq-s1uy) — the public module layout
(source / discovery / planner / pushdown / operators / stream) behind a
default-OFF fedclient feature, plus the load-bearing dependency-boundary proof
(sparq-core/sparq-engine have no edge to it, enforced by
scripts/fedclient-boundary-guard.sh + crates/sparq-fedclient/tests/boundary.rs). Landed
since:
Capability discovery (sq-nfxl): a Service-Description parser, reuse of the VoID/scs: descriptor, an SSRF-guarded fetch seam, and an ASK-probe fallback. Together they produce a Capability (plus an optional SourceDescriptor for this planner).

Endpoint adapter (sq-rsxf): the Endpoint source adapter over the engine's transport seam, behind a default-deny SSRF guard.

Plan interpreter (sq-j27p): the consumer of THIS planner's JoinTree. The plan speaks pattern/source indices only; it has no endpoint-URL mapping (the Phase-0 finding). sparq_fedclient::SourceResolver is the index-to-adapter resolution layer. It maps a plan pattern: usize to a TriplePattern and a source: usize to a source adapter. The resolver requires the descriptors and adapters slices to be in the same order, and it range-checks every lookup. lower_leaf lowers one BGP pattern to a single-pattern SELECT. materialize_single_source walks the JoinTree, fetches each leaf's SRJ through the Phase-2 adapter, parses it, and natural-joins in the plan's join order. The load-bearing property is that the materialised federated result equals local sparq-engine evaluation of the same query (a solutions_equal bag comparison). It is driven end-to-end in tests/planner_result_equals_local_eval.rs. The interpreter is single-source only (a multi-source plan yields InterpError::MultiSource).

Pushdown (sq-7byx): the pushdown module decides the most precise sub-query each source is asked.

- exclusive_groups(selection, bgp) derives the FedX exclusive groups. These are maximal connected groups of patterns that each retain exactly one source, share that same source, and are linked by shared variables (computed via union-find). A cross-source or zero-source pattern is excluded.
- push_group(...) builds the maximal sub-algebra per group. It trims the projection to the join and output vars, pushes the FILTER conjuncts that the source's FilterClass covers AND that pass the common-variable check, and adds ORDER/LIMIT when the capability allows. A full endpoint gets the whole group as one multi-pattern SELECT. A fragment source answers one pattern only, with no collapse and no filter pushed, which is honest about a fragment server's access unit.
- common_variable_check(filter, group_vars) is the exact check Comunica omits (#834/#609): push a conjunct only when every variable it references is bound by the group.
- render_values_block / bind_block_size are the cross-group bind-join block primitive: VALUES for endpoints (DEFAULT_BIND_BLOCK), maxMpR for brTPF, and none for plain TPF. They mirror sparq-engine's pub(crate) service.rs helpers.

Pushdown only ever narrows a source's result, so it is correctness-preserving. The FILTER model is light; wiring in the parsed-query FILTER algebra is Phase 5.

Streaming execution (sq-vtba): the streaming counterpart of the Phase-3 interpreter, built ON THIS crate's non-blocking StreamJoin.

- sparq_fedclient::stream's SolutionStream is a bounded, backpressured Iterator over a std::sync::mpsc::sync_channel; the channel bound IS the backpressure.
- operators::ScatterPool is a bounded blocking thread-pool over the blocking transport. This settles the async/runtime decision: no async runtime is pulled in, and all concurrency is std-only and confined to the opt-in crate.
- StreamingJoin drives THIS crate's StreamJoin over two SolutionStreams. It bridges oxrdf::Term rows into the Tuple model losslessly via the term's canonical N-Triples form (Term::Display ↔ Term::from_str).
- stream_single_source walks the same JoinTree, fans each leaf's blocking fetch onto the pool, and chains the leaves through streaming joins, so results are EMITTED before the inputs are exhausted.

The load-bearing invariant is that the streamed result is multiset-equal to the Phase-3 materialised result for any source-arrival interleaving, and both equal local eval. It is driven on the real engine path under injected per-leaf delays and a forced spill in tests/streaming_result_equals_phase3.rs. Multi-source UNION-per-leaf and the pushed-down bind-join (VALUES/maxMpR) remain deferred. A bind-classified join runs as the same streaming symmetric hash join, with an identical result multiset.

Fragment sources (sq-2qze): two adapters over the FragmentTransport seam.

- source::TpfSource handles plain TPF: it materialises a fragment to exhaustion, with no bind-join.
- source::BrTpfSource handles bindings-restricted TPF: it pushes maxMpR-bounded binding blocks per request, the standardised brTPF bind-join.

Both surface count-metadata (hydra:totalItems) cardinality as a one-pattern SourceDescriptor for this planner. The fragment adapters answer one triple pattern completely and return typed FragBindings via solutions(...). A fragment server speaks triples, not SPARQL-Results-JSON, so their FederatedSource::execute is a deliberate Unsupported that points at solutions.
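The exclusive-group rule and the common-variable check from the pushdown description can be sketched in std-only Rust. `Pat`, `exclusive_groups` and `common_variable_check` here are simplified stand-ins, not the sparq-fedclient signatures. Patterns are reduced to their variables and retained source indices, and filters to their variable lists. Keeping singleton groups as groups of one is also an assumption.

```rust
use std::collections::BTreeMap;

/// A pattern reduced to its variables and the source indices retained for it.
struct Pat {
    vars: Vec<&'static str>,
    sources: Vec<usize>,
}

/// Union-find root lookup with path compression.
fn find(parent: &mut [usize], x: usize) -> usize {
    let mut root = x;
    while parent[root] != root {
        root = parent[root];
    }
    let mut cur = x;
    while parent[cur] != root {
        let next = parent[cur];
        parent[cur] = root;
        cur = next;
    }
    root
}

/// Groups of patterns that each retain exactly one source, the SAME source, and are
/// connected through shared variables. Returns (source, pattern indices) per group.
fn exclusive_groups(pats: &[Pat]) -> Vec<(usize, Vec<usize>)> {
    let n = pats.len();
    let mut parent: Vec<usize> = (0..n).collect();
    let only_source = |i: usize| match pats[i].sources.as_slice() {
        [s] => Some(*s),
        _ => None, // zero or several sources: not exclusive
    };
    for i in 0..n {
        for j in (i + 1)..n {
            let same_source = matches!((only_source(i), only_source(j)), (Some(a), Some(b)) if a == b);
            let shares_var = pats[i].vars.iter().any(|v| pats[j].vars.contains(v));
            if same_source && shares_var {
                let (ri, rj) = (find(&mut parent, i), find(&mut parent, j));
                parent[ri] = rj;
            }
        }
    }
    let mut groups: BTreeMap<usize, (usize, Vec<usize>)> = BTreeMap::new();
    for i in 0..n {
        if let Some(src) = only_source(i) {
            let root = find(&mut parent, i);
            groups.entry(root).or_insert((src, Vec::new())).1.push(i);
        }
    }
    groups.into_values().collect()
}

/// Push a FILTER conjunct only if the group binds every variable it references.
fn common_variable_check<'a>(filter_vars: &[&'a str], group_vars: &[&'a str]) -> bool {
    filter_vars.iter().all(|v| group_vars.contains(v))
}
```

Two patterns that share ?s but are retained by different sources stay in separate groups. A pattern that two sources can answer joins no group at all and is left for a cross-source join.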
FragmentTransport + interpreter wiring (sq-yzca): source::HttpFragmentTransport is the production seam. It is native-only, using ureq behind the SAME default-deny SSRF resolver as the SRJ HttpTransport. It does four things:

- serialises a FragPattern into the Hydra TPF query string (?subject=&predicate=&object=, percent-encoded N-Triples terms);
- attaches a brTPF binding block as the values parameter (the server's text wire);
- follows the opaque hydra:next page URL to exhaustion;
- parses the Turtle/TriG body with oxttl's TriGParser (a Turtle superset), splitting Hydra/VoID control triples (hydra:totalItems/void:triples → count; hydra:next → next link) from data triples, which are kept only when they match the requested pattern.

The operators interpreter is wired. fetch_leaf_relation dispatches on source_type(): an endpoint/local leaf goes to the SRJ execute path, and a TPF/brTPF leaf goes to the typed solutions path (lowered via planner::lower_leaf_fragment). FragBinding rows are converted back to oxrdf::Term, so a fragment leaf equi-joins with an endpoint leaf. A brTPF leaf currently runs as a complete unbound scan that the interpreter hash-joins locally, the same discipline the Phase-3 interpreter applies to JoinAlgo::Bind. The streamed per-block bind-join feeder is a later phase.

Binary binding wire (wire module, sq-6ihg, follow-up to the server's sq-dxhb): the brTPF bind-join attaches a SET of upstream solution mappings (a &[FragBinding] block, at most maxMpR) to each fragment request, and that block is re-sent on every request of a bind nested-loop join. The sparq server parses it from a line-oriented TEXT wire: one mapping per line, space-separated position=term pairs, each term fully N-Triples-decorated. That format is readable but verbose, because it repeats the s=/p=/o= key and the <…>/"…"/^^<…> framing on every term.

The wire module adds the compact, self-describing BINARY mapping wire the bead asks for (encode_bindings / decode_bindings), which the client can emit instead. It also adds the text-wire writer (encode_bindings_text), so a client can speak EITHER form over the same FragBinding model (the server already parses the text one). The binary form is more compact in two ways:

- a 1-byte per-mapping header bitmask records which of s/p/o the mapping binds, so a position term carries no name bytes (the header bit IS the name);
- a 1-byte kind tag distinguishes IRI / blank / literal, so the bare lexical bytes follow length-prefixed with no <>/"" framing.

A binding over an arbitrary (non-position) variable name still round-trips losslessly via an overflow EXTRA section. The binary wire also carries literals with embedded =, whitespace, or newlines, which the one-mapping-per-line text wire cannot represent. (The text writer drops a non-position variable, since it has no brTPF slot.) The container leads with a 4-byte magic (BINARY_MAGIC, ASCII bTPF) followed by BINARY_VERSION, so a future revision is detectable. decode_bindings validates every length against the remaining input, so a truncated, bad-magic, bad-version, bad-kind, or varint-overflow buffer yields a clean WireError, never a panic or out-of-bounds read (the crate is forbid(unsafe_code)). Position keys decode in a deterministic canonical s→p→o order. The empty mapping μ₀ is skipped on encode, since it does not restrict a fragment, exactly as the server's parse_bindings skips an all-blank line.

HONEST scope: this is a codec only. It converts &[FragBinding] ↔ bytes / String and issues no request. The native HTTP FragmentTransport (HttpFragmentTransport, sq-yzca, above) attaches the text form on the values query parameter, the carrier the server reads. The binary wire is the compact alternative for a body-carrying transport to emit.

Client-side adaptive execution (sq-ij5x, the FINAL phase): the client-side ANAPSID feedback loop, behind the extra default-OFF fedclient-adaptive feature (which pulls in this planner's adaptive-replan). adaptive::execute_adaptive_single_source runs the plan in two phases:

- a leaf-scan phase fetches each leaf once through the real adapter and records its REAL observed row count into RuntimeStats;
- an adaptive join-ordering phase then drives this crate's AdaptiveExecutor. At each operator boundary it re-invokes the planner on the unjoined remainder when an observation diverges past divergence_factor, and adopts the cheaper suffix only when it clears the hysteresis margin, at most once per boundary, so there is no thrash.

The re-plan DECISION engine is this planner's AdaptiveExecutor; the client does not re-write it. The client supplies real observed cardinalities and joins the re-ordered suffix with the SAME materialised natural_join. Re-planning changes the plan, never the answer: tests/adaptive_result_equals_static.rs asserts that the adaptive result equals both the static interpreter and ground-truth local engine eval across a genuine large-divergence switch.

With Phase 7, the 8-phase streaming federation client is feature-complete (Phases 0–7 all landed; epic sq-dnko closed). Three items remain as future beads under epic sq-3183: multi-source UNION-per-leaf fan-out, the pushed-down streaming bind-join, and the ANAPSID "adaptive operator" refinement (estimating a leaf's cardinality from a prefix of its rows while still streaming it). See crates/sparq-fedclient/README.md.
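The header-bitmask and kind-tag layout of the binary binding wire can be made concrete with a hypothetical std-only Rust codec. It is not the crate's encode_bindings/decode_bindings. Several details are assumptions: the bit assignment (s = 1, p = 2, o = 4), the tag values, and the LEB128 varint. The sketch also leaves out the BINARY_MAGIC/BINARY_VERSION prefix, literal datatypes and language tags, and the EXTRA section. What it does show is a bounds-checked decode in which malformed input becomes an Err, never a panic.

```rust
/// Term kinds (hypothetical tags: 0 = IRI, 1 = blank node, 2 = literal lexical form).
#[derive(Clone, Debug, PartialEq)]
enum Term { Iri(String), Blank(String), Literal(String) }

/// A brTPF mapping over the triple positions; `None` = unbound.
#[derive(Clone, Debug, Default, PartialEq)]
struct Mapping { s: Option<Term>, p: Option<Term>, o: Option<Term> }

/// LEB128-style unsigned varint (an assumption about the length encoding).
fn put_varint(out: &mut Vec<u8>, mut n: u64) {
    loop {
        let byte = (n & 0x7f) as u8;
        n >>= 7;
        if n == 0 { out.push(byte); return; }
        out.push(byte | 0x80);
    }
}

fn get_varint(buf: &[u8], pos: &mut usize) -> Result<u64, String> {
    let mut n = 0u64;
    for shift in (0..64).step_by(7) {
        let byte = *buf.get(*pos).ok_or("truncated varint")?;
        *pos += 1;
        n |= u64::from(byte & 0x7f) << shift;
        if byte & 0x80 == 0 { return Ok(n); }
    }
    Err("varint longer than 10 bytes".into())
}

/// Header bit i marks a bound position: no s=/p=/o= names and no <>/"" framing.
fn encode(mappings: &[Mapping]) -> Vec<u8> {
    let mut out = Vec::new();
    for m in mappings {
        let slots = [m.s.as_ref(), m.p.as_ref(), m.o.as_ref()];
        let mut header = 0u8;
        for (i, t) in slots.iter().enumerate() {
            if t.is_some() { header |= 1 << i; }
        }
        if header == 0 { continue; } // μ₀ restricts nothing, so it is not sent
        out.push(header);
        for term in slots.iter().flatten() {
            let (tag, lex) = match term {
                Term::Iri(s) => (0u8, s),
                Term::Blank(s) => (1u8, s),
                Term::Literal(s) => (2u8, s),
            };
            out.push(tag);
            put_varint(&mut out, lex.len() as u64);
            out.extend_from_slice(lex.as_bytes());
        }
    }
    out
}

/// Every length is checked against the remaining input before slicing.
fn decode(buf: &[u8]) -> Result<Vec<Mapping>, String> {
    let (mut pos, mut out) = (0, Vec::new());
    while pos < buf.len() {
        let header = buf[pos];
        pos += 1;
        if header == 0 || header & !0b111 != 0 {
            return Err(format!("bad header byte {header:#04x}"));
        }
        let mut slots: [Option<Term>; 3] = [None, None, None];
        for (i, slot) in slots.iter_mut().enumerate() {
            if header & (1 << i) == 0 { continue; }
            let tag = *buf.get(pos).ok_or("truncated before kind tag")?;
            pos += 1;
            let len = get_varint(buf, &mut pos)? as usize;
            let end = pos.checked_add(len).filter(|&e| e <= buf.len())
                .ok_or("length runs past end of input")?;
            let lex = String::from_utf8(buf[pos..end].to_vec()).map_err(|e| e.to_string())?;
            pos = end;
            *slot = Some(match tag {
                0 => Term::Iri(lex),
                1 => Term::Blank(lex),
                2 => Term::Literal(lex),
                other => return Err(format!("bad kind tag {other}")),
            });
        }
        let [s, p, o] = slots;
        out.push(Mapping { s, p, o });
    }
    Ok(out)
}
```

The length prefix means a literal containing a newline or `=` round-trips without escaping. Those are exactly the cases the one-mapping-per-line text wire cannot carry.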
Mid-operator adaptivity (tearing down a join while it is producing output and resuming its half-built hash tables under a new algorithm) and live source failover (switching to a replica mid-stage when a source goes dark — observed latency now biases the join order toward faster sources, but hard failover needs the live multi-source execution layer this pure crate does not own) are out of scope. Filed as roadmap beads under epic sq-3183.
[OPUS-4.8] sq-a35t / sq-vf7q / sq-7s4z / sq-b51o / sq-7byx — flag for Fable re-review.
npx claudepluginhub jeswr/sparq --plugin sparq