From qa-concurrency
Build per-database MVCC isolation-level tests - Read Uncommitted vs Read Committed vs Repeatable Read vs Serializable; verify which anomalies are prevented at each level (dirty read, non-repeatable read, phantom read, serialization anomaly, write skew). Per PostgreSQL transaction isolation docs; analogous patterns for MySQL InnoDB, SQL Server, and DynamoDB.
Slash command: /qa-concurrency:mvcc-isolation-tests
Per the PostgreSQL transaction isolation docs, the four standard SQL isolation levels permit different anomalies. Production code must run at the right level - and tests verify both that the chosen level prevents the anomalies the business cares about AND that the code copes when the level allows them.
Per the PostgreSQL transaction isolation docs:
| Anomaly | What happens |
|---|---|
| Dirty read | T2 reads changes that T1 has not yet committed |
| Non-repeatable read | T1 reads X = 5; T2 commits X = 6; T1 re-reads and sees X = 6 |
| Phantom read | T1 query returns 3 rows; T2 inserts and commits a row matching the predicate; T1 re-runs the query and gets 4 rows |
| Serialization anomaly | Concurrent execution produces a result that no serial order of the same transactions would |
| Write skew | T1 reads {x, y} = {0, 0}; T2 reads the same; each decides based on its read set and writes a different row; both commit; the final state violates the invariant |
Per the PostgreSQL transaction isolation docs, the anomalies that can occur at each level in PostgreSQL (YES = possible, NO = prevented):
| Level | Dirty read | Non-repeatable read | Phantom read | Serialization anomaly |
|---|---|---|---|---|
| Read Uncommitted (behaves as Read Committed in PG) | NO | YES | YES | YES |
| Read Committed | NO | YES | YES | YES |
| Repeatable Read | NO | NO | NO (PG is stricter than the standard) | YES |
| Serializable | NO | NO | NO | NO |
PostgreSQL's Repeatable Read is stricter than the SQL standard requires: it is implemented as snapshot isolation, which also prevents phantom reads. Other databases differ.
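
The PostgreSQL matrix above can be encoded as data so that tests are generated from it rather than hand-written per cell. This is a minimal sketch; the names `POSSIBLE_IN_POSTGRES`, `is_prevented`, and `expected_cases` are illustrative assumptions, not part of the skill.

```python
# Anomalies that can still occur at each level in PostgreSQL,
# transcribed from the matrix above (YES = possible).
POSSIBLE_IN_POSTGRES = {
    "read uncommitted": {"non_repeatable_read", "phantom_read", "serialization_anomaly"},
    "read committed": {"non_repeatable_read", "phantom_read", "serialization_anomaly"},
    "repeatable read": {"serialization_anomaly"},
    "serializable": set(),
}

ANOMALIES = ("dirty_read", "non_repeatable_read", "phantom_read", "serialization_anomaly")


def is_prevented(level: str, anomaly: str) -> bool:
    """True if the matrix says `anomaly` cannot occur at `level` in PostgreSQL."""
    return anomaly not in POSSIBLE_IN_POSTGRES[level.lower()]


def expected_cases():
    """Yield (level, anomaly, prevented) triples, e.g. for test parametrization."""
    for level in POSSIBLE_IN_POSTGRES:
        for anomaly in ANOMALIES:
            yield level, anomaly, is_prevented(level, anomaly)
```

Feeding `expected_cases()` into a parametrized test keeps the expectations in one place; a second dictionary per database would be needed for engines whose behavior differs.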

    import threading
    import time

    import psycopg


    def two_connection_test(workload_a, workload_b, isolation="read committed"):
        """Run two workloads concurrently, each on its own connection."""
        conn_a = psycopg.connect("dbname=test", autocommit=False)
        conn_b = psycopg.connect("dbname=test", autocommit=False)
        # Set isolation level for both. With autocommit off, this statement
        # opens the transaction that the workload then continues.
        for c in (conn_a, conn_b):
            with c.cursor() as cur:
                cur.execute(f"SET TRANSACTION ISOLATION LEVEL {isolation}")
        barrier = threading.Barrier(2)  # release both workers at the same moment
        results = {}

        def run(conn, work, key):
            barrier.wait()
            results[key] = work(conn)

        t_a = threading.Thread(target=run, args=(conn_a, workload_a, "a"))
        t_b = threading.Thread(target=run, args=(conn_b, workload_b, "b"))
        t_a.start(); t_b.start()
        t_a.join(); t_b.join()
        conn_a.close(); conn_b.close()
        return results

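
One gap in the harness above is that an exception raised inside a worker thread is lost, and the caller only sees a missing key in `results`. The sketch below shows one way to capture and re-raise worker errors. It uses only the standard library; `run_pair` and its `timeout` parameter are hypothetical names, and the workloads are zero-argument callables so the example runs without a database.

```python
import threading


def run_pair(work_a, work_b, timeout=10.0):
    """Run two zero-argument callables concurrently; re-raise any worker error."""
    barrier = threading.Barrier(2)
    results, errors = {}, {}

    def run(key, work):
        try:
            barrier.wait(timeout=timeout)
            results[key] = work()
        except Exception as exc:  # captured so the main thread can re-raise it
            errors[key] = exc

    threads = [
        threading.Thread(target=run, args=("a", work_a)),
        threading.Thread(target=run, args=("b", work_b)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join(timeout)
    if errors:
        # Surface the first failure instead of a confusing KeyError later.
        raise next(iter(errors.values()))
    return results
```

A connection can be bound to each workload with a lambda, for example `run_pair(lambda: reader(conn_a), lambda: writer(conn_b))`.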

    def test_non_repeatable_read_under_read_committed():
        setup_balance(account="acc1", balance=100)

        def reader(conn):
            with conn.cursor() as cur:
                cur.execute("SELECT balance FROM accounts WHERE id = 'acc1'")
                first = cur.fetchone()[0]
                time.sleep(0.5)  # let writer commit
                cur.execute("SELECT balance FROM accounts WHERE id = 'acc1'")
                second = cur.fetchone()[0]
            conn.commit()
            return (first, second)

        def writer(conn):
            # Wait so the update lands between the reader's two SELECTs;
            # without this the writer may commit before the first read.
            time.sleep(0.2)
            with conn.cursor() as cur:
                cur.execute("UPDATE accounts SET balance = 200 WHERE id = 'acc1'")
            conn.commit()
            return None

        results = two_connection_test(reader, writer, isolation="read committed")
        first, second = results["a"]
        assert first == 100
        assert second == 200, "Read Committed permits non-repeatable read"

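
The test above relies on `time.sleep` to place the writer's commit between the two reads, which can be flaky on a loaded machine. A possible alternative is to sequence the steps explicitly with `threading.Event`. The sketch below is database-free: `run_interleaved`, `read_once`, and `write_and_commit` are hypothetical names, and in a real test the two callables would wrap the reader's and writer's connections.

```python
import threading


def run_interleaved(read_once, write_and_commit, timeout=5.0):
    """Force the order: read, then write+commit, then read again."""
    first_read_done = threading.Event()
    write_committed = threading.Event()
    out = {}

    def reader():
        out["first"] = read_once()
        first_read_done.set()            # let the writer proceed
        write_committed.wait(timeout)    # wait until the writer has committed
        out["second"] = read_once()

    def writer():
        first_read_done.wait(timeout)
        write_and_commit()
        write_committed.set()

    threads = [threading.Thread(target=reader), threading.Thread(target=writer)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return out["first"], out["second"]


# In-memory stand-in for the accounts table, only to show the ordering.
state = {"balance": 100}
observed = run_interleaved(
    lambda: state["balance"],
    lambda: state.update(balance=200),
)
```

The timeouts matter on lock-based engines, where the writer may block on the reader's locks; without them the two threads could wait on each other indefinitely.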

    def test_phantom_read_prevented_under_repeatable_read():
        setup_orders(initial_count=3)

        def reader(conn):
            with conn.cursor() as cur:
                cur.execute("SELECT COUNT(*) FROM orders WHERE status = 'pending'")
                first_count = cur.fetchone()[0]
                time.sleep(0.5)
                cur.execute("SELECT COUNT(*) FROM orders WHERE status = 'pending'")
                second_count = cur.fetchone()[0]
            conn.commit()
            return (first_count, second_count)

        def writer(conn):
            # Insert after the reader's first COUNT so the snapshot predates it.
            time.sleep(0.2)
            with conn.cursor() as cur:
                cur.execute("INSERT INTO orders (status) VALUES ('pending')")
            conn.commit()
            return None

        results = two_connection_test(reader, writer, isolation="repeatable read")
        first, second = results["a"]
        assert first == second == 3, "PG Repeatable Read should prevent phantom"

Per the PostgreSQL transaction isolation docs, write skew is a serialization anomaly that Repeatable Read does NOT prevent:

    def test_write_skew_under_repeatable_read():
        """Two doctors on call; rule: at least one must remain on call.
        T1 reads {alice: on, bob: on}; decides safe to take alice off.
        T2 reads same; decides safe to take bob off.
        Both commit. Now nobody on call → anomaly."""
        setup_doctors([("alice", "on_call"), ("bob", "on_call")])

        def take_alice_off(conn):
            with conn.cursor() as cur:
                cur.execute("SELECT COUNT(*) FROM doctors WHERE status = 'on_call'")
                on_call_count = cur.fetchone()[0]
                if on_call_count >= 2:
                    cur.execute("UPDATE doctors SET status = 'off' WHERE name = 'alice'")
            conn.commit()

        def take_bob_off(conn):
            with conn.cursor() as cur:
                cur.execute("SELECT COUNT(*) FROM doctors WHERE status = 'on_call'")
                on_call_count = cur.fetchone()[0]
                if on_call_count >= 2:
                    cur.execute("UPDATE doctors SET status = 'off' WHERE name = 'bob'")
            conn.commit()

        two_connection_test(take_alice_off, take_bob_off, isolation="repeatable read")
        on_call_after = count_on_call()
        # Under Repeatable Read, write skew possible → 0 on call (anomaly)
        # Under Serializable, one transaction would error
        # 0 = anomaly demonstrated; 1 = the transactions happened not to overlap
        assert on_call_after in (0, 1)

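
To see why snapshot-based Repeatable Read lets this through, it can help to model the two transactions in memory. The sketch below is a toy model under stated assumptions, not PostgreSQL's implementation: each transaction reads from a private snapshot, and a commit fails only when a row it writes has changed since the snapshot. `ToySnapshotStore` and `go_off_call` are hypothetical names.

```python
class ToySnapshotStore:
    """Toy model: snapshot reads, conflicts detected only on overlapping writes."""

    def __init__(self, rows):
        self.rows = dict(rows)
        self.version = {key: 0 for key in rows}

    def begin(self):
        # A transaction is a private snapshot plus a buffered write set.
        return {"snapshot": dict(self.rows), "seen": dict(self.version), "writes": {}}

    def commit(self, txn):
        # Abort only if a row this transaction writes changed after its snapshot.
        for key in txn["writes"]:
            if self.version[key] != txn["seen"][key]:
                raise RuntimeError(f"write-write conflict on {key}")
        for key, value in txn["writes"].items():
            self.rows[key] = value
            self.version[key] += 1


def go_off_call(txn, name):
    """Go off call only if the snapshot shows at least two doctors on call."""
    on_call = sum(1 for status in txn["snapshot"].values() if status == "on_call")
    if on_call >= 2:
        txn["writes"][name] = "off"


store = ToySnapshotStore({"alice": "on_call", "bob": "on_call"})
t1, t2 = store.begin(), store.begin()  # both snapshots show two doctors on call
go_off_call(t1, "alice")
go_off_call(t2, "bob")
store.commit(t1)
store.commit(t2)  # write sets are disjoint, so no conflict is detected
on_call_after = sum(1 for status in store.rows.values() if status == "on_call")
```

Both commits succeed because neither transaction wrote a row the other wrote, yet each decision depended on a row the other changed. Catching that requires tracking reads as well as writes, which is the extra work the Serializable level does.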

    def test_serializable_aborts_one_of_skew_pair():
        setup_doctors([("alice", "on_call"), ("bob", "on_call")])
        aborted = [0]

        def take_off(name, conn):
            try:
                with conn.cursor() as cur:
                    cur.execute("SELECT COUNT(*) FROM doctors WHERE status = 'on_call'")
                    if cur.fetchone()[0] >= 2:
                        # Hold the transaction open briefly so both overlap;
                        # otherwise they may run back to back with no abort.
                        time.sleep(0.2)
                        cur.execute("UPDATE doctors SET status = 'off' WHERE name = %s", (name,))
                conn.commit()
            except psycopg.errors.SerializationFailure:
                conn.rollback()
                aborted[0] += 1

        two_connection_test(
            lambda c: take_off("alice", c),
            lambda c: take_off("bob", c),
            isolation="serializable",
        )
        assert aborted[0] >= 1, "Serializable must abort at least one"
        assert count_on_call() >= 1, "At least one doctor still on call"

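Per the PostgreSQL transaction isolation docs, a Serializable transaction that cannot be committed safely fails with `could not serialize access due to read/write dependencies among transactions` (SQLSTATE 40001). The application must catch the error and retry the whole transaction.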
| DB | Default | RR prevents phantoms? | Serializable cost |
|---|---|---|---|
| PostgreSQL | Read Committed | Yes (snapshot isolation) | Predicate locks; can be expensive |
| MySQL InnoDB | Repeatable Read | Mostly (snapshot for plain reads, gap/next-key locks for locking reads) | Plain SELECTs become shared-lock reads; default RR is "good enough" for most |
| SQL Server | Read Committed | No (lock-based RR); the separate SNAPSHOT isolation level does | Key-range locks; SNAPSHOT isolation available as an alternative |
| Oracle | Read Committed | No RR level; Serializable gives a transaction-level snapshot | Snapshot-based; conflicting updates fail with ORA-08177 |
| DynamoDB | Eventually consistent reads | N/A; strongly consistent reads are opt-in | Transactions: ACID with TransactWriteItems |
| MongoDB | "local" read concern | N/A; snapshot read concern inside multi-document transactions (4.0+) | 16 MB total transaction size limit in 4.0, lifted in 4.2 |
Test the actual DB you ship with - defaults differ.
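
One cheap guard is a test that fails when the database's reported default differs from what the code was designed for. The sketch below encodes the defaults from the table for the SQL engines. The dictionary keys and the function names are hypothetical, and how the reported value is obtained is left to the caller (for example, PostgreSQL's `SHOW default_transaction_isolation` or MySQL's `SELECT @@transaction_isolation`).

```python
# Default isolation levels, copied from the comparison table above.
EXPECTED_DEFAULT = {
    "postgresql": "read committed",
    "mysql-innodb": "repeatable read",
    "sqlserver": "read committed",
    "oracle": "read committed",
}


def normalize(level: str) -> str:
    """Normalize spellings such as 'REPEATABLE-READ' or 'read_committed'."""
    return " ".join(level.replace("-", " ").replace("_", " ").lower().split())


def check_default_isolation(db: str, reported: str) -> None:
    """Raise AssertionError if the reported default differs from the table."""
    expected = EXPECTED_DEFAULT[db]
    actual = normalize(reported)
    assert actual == expected, f"{db}: expected default {expected!r}, got {actual!r}"
```

A deployment that deliberately overrides the default would update `EXPECTED_DEFAULT` rather than delete the check, so that an accidental configuration change still fails the suite.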

    def with_serializable_retry(conn, fn, max_retries=3):
        """Run fn(conn) in a SERIALIZABLE transaction, retrying on conflicts."""
        for attempt in range(max_retries):
            try:
                with conn.cursor() as cur:
                    cur.execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
                result = fn(conn)
                conn.commit()
                return result
            except psycopg.errors.SerializationFailure:
                conn.rollback()  # discard the failed attempt before retrying
                if attempt == max_retries - 1:
                    raise
                time.sleep(0.01 * (2 ** attempt))  # exponential backoff

Test the retry logic itself:

    def test_serializable_retry_completes():
        # Simulate concurrent conflict; verify retry succeeds eventually
        ...

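
One way to exercise the retry loop without a database is to inject the failure. The sketch below is an assumption-laden stand-in: `FakeSerializationFailure` replaces `psycopg.errors.SerializationFailure`, `FakeConn` only counts commits and rollbacks, and `retry_on` mirrors the retry shape above with the exception type and sleep function made injectable (it omits the `SET TRANSACTION` statement).

```python
import time


class FakeSerializationFailure(Exception):
    """Stand-in for psycopg.errors.SerializationFailure (no database needed)."""


class FakeConn:
    """Records commit/rollback calls so a test can assert on them."""

    def __init__(self):
        self.commits = 0
        self.rollbacks = 0

    def commit(self):
        self.commits += 1

    def rollback(self):
        self.rollbacks += 1


def retry_on(conn, fn, retryable, max_retries=3, sleep=time.sleep):
    """Retry fn(conn) on `retryable`, with exponential backoff between attempts."""
    for attempt in range(max_retries):
        try:
            result = fn(conn)
            conn.commit()
            return result
        except retryable:
            conn.rollback()
            if attempt == max_retries - 1:
                raise
            sleep(0.01 * (2 ** attempt))


def flaky(failures):
    """Return a workload that raises `failures` times, then succeeds."""
    calls = {"n": 0}

    def work(conn):
        calls["n"] += 1
        if calls["n"] <= failures:
            raise FakeSerializationFailure()
        return "ok"

    return work


def test_retry_succeeds_after_two_conflicts():
    conn, delays = FakeConn(), []
    result = retry_on(conn, flaky(2), FakeSerializationFailure, sleep=delays.append)
    assert result == "ok"
    assert (conn.commits, conn.rollbacks) == (1, 2)
    assert delays == [0.01, 0.02]


def test_retry_gives_up_after_max_retries():
    conn = FakeConn()
    try:
        retry_on(conn, flaky(5), FakeSerializationFailure, sleep=lambda _: None)
    except FakeSerializationFailure:
        pass
    else:
        raise AssertionError("expected the final failure to propagate")
    assert (conn.commits, conn.rollbacks) == (0, 3)
```

Injecting `sleep` keeps the test fast and lets it assert on the backoff schedule. A test against a real database is still needed to confirm that the driver raises the exception type the loop catches.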
| Anti-pattern | Why it fails | Fix |
|---|---|---|
| Run at default isolation, hope for the best | Anomaly hits in prod | Choose level deliberately + test |
| Test only happy-path single-transaction | Concurrency bugs need a 2-connection harness | Step 3 harness (`two_connection_test`) |
| Use Serializable everywhere "to be safe" | Performance cliff; retries swamp throughput | Test perf at chosen level |
| Skip retry logic for Serializable | Serialization errors leak to production callers | Step 9 retry pattern (`with_serializable_retry`) |
| Test PG and assume MySQL behaves the same | RR semantics differ (gap locks vs snapshot) | Per-DB tests (Step 8) |
- race-condition-test-author, deadlock-detection-harness - in-process concurrency siblings
- jepsen-patterns - distributed consistency analogue

npx claudepluginhub testland/qa --plugin qa-concurrency