From ccfg-sqlite
This skill should be used when embedding SQLite in applications, managing concurrent access, implementing backup strategies, or using libSQL/Turso.
Slash command: /ccfg-sqlite:embedded-patterns
Patterns and best practices for embedding SQLite in applications across mobile, desktop, serverless, and edge computing platforms. This skill covers connection management, concurrent access, backup strategies, migration handling, and platform-specific considerations for production SQLite deployments.
When working with existing applications that use SQLite, respect the established patterns:
SQLite allows only one writer at a time. The recommended pattern is:
# CORRECT: Writer + reader pattern (Python)
import sqlite3
import threading

class Database:
    def __init__(self, path):
        self.path = path
        # One shared writer, serialized by a lock so any thread may call write()
        self._writer_lock = threading.Lock()
        self._writer = self._open_connection(check_same_thread=False)
        self._writer.execute("PRAGMA journal_mode = WAL")
        # One lazily opened read connection per thread
        self._readers = threading.local()

    def _open_connection(self, check_same_thread=True):
        conn = sqlite3.connect(self.path, check_same_thread=check_same_thread)
        conn.execute("PRAGMA foreign_keys = ON")
        conn.execute("PRAGMA busy_timeout = 5000")
        conn.execute("PRAGMA synchronous = NORMAL")
        conn.execute("PRAGMA cache_size = -64000")
        conn.execute("PRAGMA temp_store = MEMORY")
        return conn

    def read(self, sql, params=()):
        if not hasattr(self._readers, 'conn'):
            self._readers.conn = self._open_connection()
        return self._readers.conn.execute(sql, params).fetchall()

    def write(self, sql, params=()):
        # The connection context manager commits on success and rolls back on error
        with self._writer_lock, self._writer:
            self._writer.execute(sql, params)
# WRONG: Single connection shared across threads
db = sqlite3.connect('app.db')
# Thread 1: db.execute("INSERT ...")
# Thread 2: db.execute("SELECT ...")
# Python raises ProgrammingError by default (check_same_thread=True); with the
# check disabled and no locking: interleaved transactions, SQLITE_BUSY errors,
# undefined behavior

# WRONG: New connection per query (no PRAGMA reuse)
def get_user(user_id):
    conn = sqlite3.connect('app.db')  # No PRAGMAs configured!
    row = conn.execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchone()
    conn.close()  # Connection overhead on every query
    return row
| Component | Count | Purpose |
|---|---|---|
| Writer | 1 | All write operations |
| Readers | 1-4 | Concurrent read operations |
| Total | 2-5 | Keep total under 10 |
Mobile apps: 1 writer + 1-2 readers (limited concurrency). Web servers: 1 writer + 1 reader per worker thread. CLI tools: Single connection usually sufficient.
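// CORRECT: Separate writer and reader pools
// Assumes the mattn/go-sqlite3 driver (registered as "sqlite3"), which
// understands the underscore-prefixed DSN parameters used below.
func Open(path string) (*sql.DB, *sql.DB, error) {
	// The "file:" prefix makes the DSN a URI so that mode=ro is honored.
	dsn := "file:" + path + "?_journal_mode=WAL&_foreign_keys=ON&_busy_timeout=5000&_synchronous=NORMAL"
	writer, err := sql.Open("sqlite3", dsn)
	if err != nil {
		return nil, nil, err
	}
	writer.SetMaxOpenConns(1) // Single writer connection
	reader, err := sql.Open("sqlite3", dsn+"&mode=ro")
	if err != nil {
		writer.Close()
		return nil, nil, err
	}
	reader.SetMaxOpenConns(4) // Small pool of read-only connections
	return writer, reader, nil
}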
// CORRECT: better-sqlite3 (synchronous, recommended for Node.js)
const Database = require('better-sqlite3');
const db = new Database('app.db');
db.pragma('journal_mode = WAL');
db.pragma('foreign_keys = ON');
db.pragma('busy_timeout = 5000');
db.pragma('synchronous = NORMAL');
db.pragma('cache_size = -64000');
// Prepared statements for performance
const getUser = db.prepare('SELECT * FROM users WHERE id = ?');
const insertUser = db.prepare('INSERT INTO users (name, email) VALUES (@name, @email)');
// Transaction helper (automatic rollback on error)
const bulkInsert = db.transaction((users) => {
  for (const user of users) insertUser.run(user);
});
// WRONG: Async sqlite3 package without proper error handling
const sqlite3 = require('sqlite3');
const db = new sqlite3.Database('app.db');
// No PRAGMA configuration, callback hell, poor error handling
With WAL mode enabled:
| Operation | Blocks Writers? | Blocks Readers? |
|---|---|---|
| Reader | No | No |
| Writer | Yes (other writers) | No |
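The blocking rules in the table can be observed directly. The following is a minimal sketch, assuming Python's standard `sqlite3` module and a throwaway database file; the function name `wal_concurrency_demo` and the table `t` are hypothetical, and `timeout=0` is used only so that a blocked writer fails immediately instead of waiting.

```python
import os
import sqlite3
import tempfile

def wal_concurrency_demo():
    """Return (rows_seen_by_reader, second_writer_blocked) under WAL mode."""
    path = os.path.join(tempfile.mkdtemp(), "demo.db")

    # isolation_level=None disables the module's implicit transactions,
    # so the BEGIN/COMMIT statements below are exactly what SQLite runs.
    writer = sqlite3.connect(path, timeout=0, isolation_level=None)
    writer.execute("PRAGMA journal_mode = WAL")
    writer.execute("CREATE TABLE t (id INTEGER PRIMARY KEY)")
    writer.execute("INSERT INTO t VALUES (1)")

    # Open a write transaction and leave it uncommitted for now.
    writer.execute("BEGIN IMMEDIATE")
    writer.execute("INSERT INTO t VALUES (2)")

    # A reader is not blocked; it sees the last committed snapshot.
    reader = sqlite3.connect(path, timeout=0, isolation_level=None)
    rows_seen = reader.execute("SELECT COUNT(*) FROM t").fetchone()[0]

    # A second writer cannot take the write lock while the first holds it.
    other = sqlite3.connect(path, timeout=0, isolation_level=None)
    try:
        other.execute("BEGIN IMMEDIATE")
        blocked = False
        other.execute("ROLLBACK")
    except sqlite3.OperationalError:
        blocked = True

    writer.execute("COMMIT")
    for conn in (writer, reader, other):
        conn.close()
    return rows_seen, blocked
```

In production the second writer would normally wait up to `busy_timeout` milliseconds rather than fail at once.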
Even with busy_timeout set, applications should handle SQLITE_BUSY:
# CORRECT: Retry with backoff
import sqlite3
import time

def execute_with_retry(conn, sql, params=(), max_retries=3):
    for attempt in range(max_retries):
        try:
            result = conn.execute(sql, params)
            conn.commit()
            return result
        except sqlite3.OperationalError as e:
            # Retry only on lock contention, and only while attempts remain
            if "database is locked" in str(e) and attempt < max_retries - 1:
                time.sleep(0.1 * (attempt + 1))  # Linear backoff: 0.1s, 0.2s, ...
                continue
            raise
When many threads need to write, funnel through a write queue:
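# CORRECT: Write queue pattern
import queue
import sqlite3
import threading
from concurrent.futures import Future

class WriteQueue:
    def __init__(self, db_path):
        self._queue = queue.Queue()
        # check_same_thread=False: opened here, used only by the worker thread.
        # isolation_level=None: transactions are controlled explicitly below.
        self._conn = sqlite3.connect(db_path, check_same_thread=False, isolation_level=None)
        # ... configure PRAGMAs ...
        self._thread = threading.Thread(target=self._process, daemon=True)
        self._thread.start()

    def submit(self, sql, params=()):
        """Enqueue a write; the returned Future resolves to lastrowid."""
        future = Future()
        self._queue.put((sql, params, future))
        return future

    def _process(self):
        while True:
            batch = [self._queue.get()]
            # Drain queue for batching
            while len(batch) < 100:
                try:
                    batch.append(self._queue.get_nowait())
                except queue.Empty:
                    break
            self._conn.execute("BEGIN IMMEDIATE")
            for sql, params, future in batch:
                try:
                    result = self._conn.execute(sql, params)
                    # Note: the future resolves before the batch is committed
                    future.set_result(result.lastrowid)
                except Exception as e:
                    future.set_exception(e)
            self._conn.execute("COMMIT")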
-- CORRECT: Acquire write lock immediately
BEGIN IMMEDIATE;
INSERT INTO orders (customer_id, total) VALUES (42, 5000);
UPDATE inventory SET quantity = quantity - 1 WHERE product_id = 7;
COMMIT;
-- WRONG: Deferred BEGIN for write transactions
BEGIN; -- Doesn't acquire write lock yet
SELECT * FROM orders; -- Read succeeds
INSERT INTO orders ...; -- May get SQLITE_BUSY here mid-transaction!
COMMIT;
-- Use BEGIN IMMEDIATE for write transactions to fail fast at BEGIN.
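From Python, the same pattern needs the `sqlite3` module's implicit transaction handling turned off, otherwise the module decides when `BEGIN` is issued. The following is a minimal sketch under that assumption; the helper name `immediate_transaction` is hypothetical, and the `inventory` table mirrors the SQL example only for illustration.

```python
import sqlite3
from contextlib import contextmanager

@contextmanager
def immediate_transaction(conn):
    """Run the body inside BEGIN IMMEDIATE; commit on success, roll back on error."""
    conn.execute("BEGIN IMMEDIATE")  # Take the write lock up front
    try:
        yield conn
    except BaseException:
        conn.execute("ROLLBACK")
        raise
    else:
        conn.execute("COMMIT")

# Usage sketch with a throwaway in-memory database.
# isolation_level=None puts the connection in autocommit mode, so the
# explicit BEGIN IMMEDIATE above is the only transaction start.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute(
    "CREATE TABLE inventory (product_id INTEGER PRIMARY KEY, quantity INTEGER NOT NULL)"
)
conn.execute("INSERT INTO inventory VALUES (7, 3)")

with immediate_transaction(conn):
    conn.execute("UPDATE inventory SET quantity = quantity - 1 WHERE product_id = 7")

try:
    with immediate_transaction(conn):
        conn.execute("UPDATE inventory SET quantity = 0 WHERE product_id = 7")
        raise RuntimeError("simulated failure")  # Forces the rollback path
except RuntimeError:
    pass

remaining = conn.execute(
    "SELECT quantity FROM inventory WHERE product_id = 7"
).fetchone()[0]
```

The second block is rolled back, so only the first decrement is persisted.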
Safe hot backup while the database is in use:
# CORRECT: Online Backup API
import sqlite3

def backup(source_path, backup_path):
    source = sqlite3.connect(source_path)
    dest = sqlite3.connect(backup_path)
    with dest:
        # Copy 100 pages per step so other connections can work between steps
        source.backup(dest, pages=100, progress=lambda s, r, t: None)
    dest.close()
    source.close()
Creates a compacted copy:
-- CORRECT: Compacted backup
VACUUM INTO '/backups/app_2024_03_15.db';
-- Original database unchanged, backup is defragmented
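`VACUUM INTO` can also be issued from application code. The following is a minimal sketch, assuming Python's `sqlite3` module and SQLite 3.27 or later; the helper name `vacuum_backup` and the `notes` table are hypothetical, and the integrity check is an extra step not described above.

```python
import os
import sqlite3
import tempfile

def vacuum_backup(source_path, backup_path):
    """Write a compacted copy of source_path to backup_path and verify it."""
    if os.path.exists(backup_path):
        # VACUUM INTO refuses to overwrite an existing non-empty file.
        raise FileExistsError(backup_path)
    # isolation_level=None: VACUUM cannot run inside an open transaction.
    src = sqlite3.connect(source_path, isolation_level=None)
    try:
        # The target filename is an SQL expression, so it can be bound.
        src.execute("VACUUM INTO ?", (backup_path,))
    finally:
        src.close()

    check = sqlite3.connect(backup_path)
    try:
        result = check.execute("PRAGMA integrity_check").fetchone()[0]
    finally:
        check.close()
    if result != "ok":
        raise RuntimeError(f"backup failed integrity check: {result}")
    return backup_path

# Usage sketch against a throwaway database.
workdir = tempfile.mkdtemp()
source_path = os.path.join(workdir, "app.db")
seed = sqlite3.connect(source_path)
seed.execute("CREATE TABLE notes (id INTEGER PRIMARY KEY, body TEXT)")
seed.executemany("INSERT INTO notes (body) VALUES (?)", [("a",), ("b",), ("c",)])
seed.commit()
seed.close()

backup_path = vacuum_backup(source_path, os.path.join(workdir, "backup.db"))
```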
For simpler needs:
# CORRECT: Checkpoint then file copy
import shutil
import sqlite3

conn = sqlite3.connect(db_path)
conn.execute("PRAGMA wal_checkpoint(TRUNCATE)")  # Flush WAL to main file
conn.close()
# Copying just the .db file is safe only if no other connection writes during the copy
shutil.copy2(db_path, backup_path)
# WRONG: Copy without checkpoint
shutil.copy2('app.db', 'backup.db')
# With WAL mode, the .db file may not contain recent writes.
# The -wal and -shm files are also needed. Always checkpoint first.
# CORRECT: Startup migration pattern
import glob
import os
import sqlite3

def migrate(db_path, migrations_dir):
    conn = sqlite3.connect(db_path)
    conn.execute("PRAGMA journal_mode = WAL")
    conn.execute("PRAGMA foreign_keys = ON")
    current_version = conn.execute("PRAGMA user_version").fetchone()[0]
    # Files are expected to be named like <version>_<description>.up.sql
    for migration_file in sorted(glob.glob(f"{migrations_dir}/*.up.sql")):
        version = int(os.path.basename(migration_file).split("_")[0])
        if version <= current_version:
            continue
        with open(migration_file) as f:
            conn.executescript(f.read())
        # PRAGMA values cannot be bound as parameters; version is an int
        conn.execute(f"PRAGMA user_version = {version}")
        conn.commit()
    conn.close()
Make migrations idempotent: use CREATE TABLE IF NOT EXISTS, and check column existence before ALTER.
Track the schema version with PRAGMA user_version or a migrations table.
-- CORRECT: Idempotent migration
CREATE TABLE IF NOT EXISTS users (
    id INTEGER PRIMARY KEY,
    name TEXT NOT NULL,
    email TEXT NOT NULL UNIQUE
) STRICT;
-- Adding a column (check existence in application code first)
-- SELECT COUNT(*) FROM pragma_table_info('users') WHERE name = 'phone';
-- If 0: ALTER TABLE users ADD COLUMN phone TEXT;
-- WRONG: Non-idempotent migration
CREATE TABLE users (...); -- Fails if exists
ALTER TABLE users ADD COLUMN phone TEXT; -- Fails if column exists
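The column-existence check described in the comments above could be sketched in application code as follows. This assumes Python's `sqlite3` module and SQLite 3.16 or later (for the `pragma_table_info` table-valued function); the helper name `add_column_if_missing` is hypothetical.

```python
import sqlite3

def add_column_if_missing(conn, table, column, decl):
    """Add `column` to `table` unless pragma_table_info already lists it.

    `table`, `column`, and `decl` are interpolated into SQL, so they must
    come from trusted migration code, never from user input.
    """
    exists = conn.execute(
        "SELECT COUNT(*) FROM pragma_table_info(?) WHERE name = ?",
        (table, column),
    ).fetchone()[0]
    if exists:
        return False  # Nothing to do; the migration is already applied
    conn.execute(f'ALTER TABLE "{table}" ADD COLUMN "{column}" {decl}')
    return True

# Usage sketch: running the same step twice is harmless.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, name TEXT NOT NULL)"
)
first = add_column_if_missing(conn, "users", "phone", "TEXT")
second = add_column_if_missing(conn, "users", "phone", "TEXT")
columns = [row[1] for row in conn.execute("PRAGMA table_info(users)")]
```

Returning a boolean lets the caller log whether the step actually changed the schema.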
libSQL embedded replicas provide local reads with remote writes:
// CORRECT: Turso embedded replica
import { createClient } from '@libsql/client';
const db = createClient({
  url: 'file:local-replica.db',          // Local for fast reads
  syncUrl: 'libsql://your-db.turso.io',  // Remote for replication
  authToken: process.env.TURSO_AUTH_TOKEN,
  syncInterval: 60,                      // Seconds between syncs
});
// Reads hit local replica (fast, no network)
const users = await db.execute('SELECT * FROM users WHERE id = ?', [userId]);
// Writes go to remote, replicate back
await db.execute('INSERT INTO users (name) VALUES (?)', [name]);
// Manual sync
await db.sync();

// Remote-only (serverless)
const remote = createClient({
  url: 'libsql://your-db.turso.io',
  authToken: process.env.TURSO_AUTH_TOKEN,
});
// Local-only (same as SQLite)
const local = createClient({ url: 'file:app.db' });
// Embedded replica (best of both)
const replica = createClient({
  url: 'file:local.db',
  syncUrl: 'libsql://your-db.turso.io',
  authToken: process.env.TURSO_AUTH_TOKEN,
});
# CORRECT: In-memory database with production PRAGMAs
import sqlite3

def create_test_db():
    conn = sqlite3.connect(":memory:")
    conn.execute("PRAGMA foreign_keys = ON")
    # Apply same migrations as production
    with open("db/init.sql") as f:
        conn.executescript(f.read())
    return conn

# CORRECT: Multiple connections to same in-memory database
conn1 = sqlite3.connect("file::memory:?cache=shared", uri=True)
conn2 = sqlite3.connect("file::memory:?cache=shared", uri=True)
conn1.execute("PRAGMA foreign_keys = ON")
conn2.execute("PRAGMA foreign_keys = ON")
# Both connections share the same database
conn1.execute("CREATE TABLE test (id INTEGER PRIMARY KEY) STRICT")
conn1.commit()
conn2.execute("SELECT * FROM test")  # Works: same database

# CORRECT: Isolated test databases
import uuid

def isolated_test_db():
    # A unique name gives each test its own shared-cache in-memory database
    name = f"test_{uuid.uuid4().hex[:8]}"
    conn = sqlite3.connect(f"file:{name}?mode=memory&cache=shared", uri=True)
    conn.execute("PRAGMA foreign_keys = ON")
    return conn
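iOS: store the database in the Application Support directory (applicationSupportDirectory), set isExcludedFromBackup on files that should stay out of device backups, and choose a data-protection level with the SQLITE_OPEN_FILEPROTECTION_* flags.
Android: resolve the database location with context.getDatabasePath() and handle Room schema changes with Migration objects.
// CORRECT: Room database with WAL
Room.databaseBuilder(context, AppDatabase::class.java, "app.db")
    .setJournalMode(RoomDatabase.JournalMode.WRITE_AHEAD_LOGGING)
    .addMigrations(MIGRATION_1_2)
    .build()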
Electron: use better-sqlite3 (synchronous, faster, fewer bugs) and app.getPath('userData') for persistent storage.
Serverless (for example AWS Lambda): /tmp is ephemeral, so download the database on cold start; /tmp is 512MB by default (up to 10GB).
SQLite relies on POSIX file locking. Network filesystems do not reliably support this:
NEVER use SQLite on:
/mnt/nfs/... NFS mount
/Volumes/... macOS network volume
\\server\share\... Windows UNC path (SMB)
/mnt/cifs/... CIFS mount
Consequences: silent data corruption, WAL failures, lock starvation
# CORRECT: Named volume (local filesystem)
services:
  app:
    volumes:
      - sqlite-data:/app/data
volumes:
  sqlite-data:  # Local filesystem, reliable locks

# WRONG: Network filesystem bind mount
services:
  app:
    volumes:
      - /mnt/nfs/data:/app/data  # Unreliable locks!
# CORRECT: Restrict database file access
chmod 640 app.db app.db-wal app.db-shm
chmod 750 /app/data/ # Directory must be writable for temp files
| Pitfall | Cause | Solution |
|---|---|---|
| SQLITE_BUSY | No busy_timeout | PRAGMA busy_timeout = 5000 |
| FK not enforced | Default is OFF | PRAGMA foreign_keys = ON |
| WAL file grows | No size limit | PRAGMA journal_size_limit = 67108864 |
| Slow bulk inserts | Auto-commit per statement | Use explicit transactions |
| Data corruption | Network filesystem | Use local storage only |
| Slow reads | Missing indexes | Use EXPLAIN QUERY PLAN |
| Type coercion | Non-STRICT table | Use STRICT tables (3.37+) |
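Several of the fixes in the table can be applied in one place when a connection is opened. The following is a minimal sketch, assuming Python's `sqlite3` module and a throwaway database file; the helper names `open_configured` and `bulk_insert` and the `users` schema are hypothetical.

```python
import os
import sqlite3
import tempfile

def open_configured(path):
    """Open a connection with the PRAGMA fixes from the pitfalls table applied."""
    conn = sqlite3.connect(path)
    conn.execute("PRAGMA journal_mode = WAL")
    conn.execute("PRAGMA busy_timeout = 5000")            # Avoid immediate SQLITE_BUSY
    conn.execute("PRAGMA foreign_keys = ON")              # Off by default
    conn.execute("PRAGMA journal_size_limit = 67108864")  # Cap the WAL at 64 MiB
    return conn

def bulk_insert(conn, names):
    """Insert all rows in one explicit transaction instead of one commit per row."""
    with conn:  # Commits on success, rolls back on exception
        conn.executemany(
            "INSERT INTO users (name) VALUES (?)", [(n,) for n in names]
        )

# Usage sketch: configure, load data, then read the settings back.
path = os.path.join(tempfile.mkdtemp(), "app.db")
conn = open_configured(path)
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT NOT NULL)")
bulk_insert(conn, [f"user{i}" for i in range(1000)])

settings = {
    name: conn.execute(f"PRAGMA {name}").fetchone()[0]
    for name in ("journal_mode", "busy_timeout", "foreign_keys", "journal_size_limit")
}
row_count = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
```

Most of these PRAGMAs are per-connection settings, which is why they belong in the function that opens the connection; only `journal_mode = WAL` persists in the database file.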