From snowflake-pack
Implements production-ready Snowflake SDK patterns for Node.js (snowflake-sdk) and Python (snowflake-connector-python), including connection pooling, async wrappers, and streaming results.
How this skill is triggered — by the user, by Claude, or both
Slash command
/snowflake-pack:snowflake-sdk-patternsThis skill is limited to the following tools:
The summary Claude sees in its skill listing — used to decide when to auto-load this skill
Production-ready patterns for `snowflake-sdk` (Node.js) and `snowflake-connector-python` using real driver APIs.
Production-ready patterns for snowflake-sdk (Node.js) and snowflake-connector-python using real driver APIs.
snowflake-install-auth setup// src/snowflake/pool.ts
import snowflake from 'snowflake-sdk';
interface PoolConfig {
max: number;
idleTimeoutMs: number;
}
class SnowflakePool {
private pool: snowflake.Connection[] = [];
private available: snowflake.Connection[] = [];
private waiting: ((conn: snowflake.Connection) => void)[] = [];
private config: PoolConfig;
constructor(
private connConfig: snowflake.ConnectionOptions,
config: Partial<PoolConfig> = {}
) {
this.config = { max: 10, idleTimeoutMs: 60000, ...config };
}
async acquire(): Promise<snowflake.Connection> {
// Return available connection
if (this.available.length > 0) {
return this.available.pop()!;
}
// Create new if under limit
if (this.pool.length < this.config.max) {
const conn = snowflake.createConnection(this.connConfig);
await new Promise<void>((resolve, reject) => {
conn.connect((err) => (err ? reject(err) : resolve()));
});
this.pool.push(conn);
return conn;
}
// Wait for one to become available
return new Promise((resolve) => {
this.waiting.push(resolve);
});
}
release(conn: snowflake.Connection): void {
if (this.waiting.length > 0) {
const next = this.waiting.shift()!;
next(conn);
} else {
this.available.push(conn);
}
}
async withConnection<T>(fn: (conn: snowflake.Connection) => Promise<T>): Promise<T> {
const conn = await this.acquire();
try {
return await fn(conn);
} finally {
this.release(conn);
}
}
}
// Singleton pool
export const pool = new SnowflakePool({
account: process.env.SNOWFLAKE_ACCOUNT!,
username: process.env.SNOWFLAKE_USER!,
password: process.env.SNOWFLAKE_PASSWORD!,
warehouse: process.env.SNOWFLAKE_WAREHOUSE || 'COMPUTE_WH',
database: process.env.SNOWFLAKE_DATABASE!,
schema: process.env.SNOWFLAKE_SCHEMA || 'PUBLIC',
});
// src/snowflake/query.ts
import snowflake from 'snowflake-sdk';
interface QueryResult<T = Record<string, any>> {
rows: T[];
statement: snowflake.Statement;
sqlText: string;
}
export function query<T = Record<string, any>>(
conn: snowflake.Connection,
sqlText: string,
binds?: snowflake.Binds
): Promise<QueryResult<T>> {
return new Promise((resolve, reject) => {
conn.execute({
sqlText,
binds,
complete: (err, stmt, rows) => {
if (err) {
reject(Object.assign(err, { sqlText }));
} else {
resolve({ rows: (rows || []) as T[], statement: stmt, sqlText });
}
},
});
});
}
// Multi-statement execution
export async function multiQuery(
conn: snowflake.Connection,
statements: string[]
): Promise<QueryResult[]> {
const results: QueryResult[] = [];
for (const sql of statements) {
results.push(await query(conn, sql));
}
return results;
}
// src/snowflake/stream.ts
export async function* streamQuery<T = Record<string, any>>(
conn: snowflake.Connection,
sqlText: string,
binds?: snowflake.Binds
): AsyncGenerator<T> {
const stmt = await new Promise<snowflake.Statement>((resolve, reject) => {
conn.execute({
sqlText,
binds,
streamResult: true,
complete: (err, stmt) => {
if (err) reject(err);
else resolve(stmt);
},
});
});
const stream = stmt.streamRows();
for await (const row of stream) {
yield row as T;
}
}
// Usage: process millions of rows without memory pressure
// for await (const row of streamQuery(conn, 'SELECT * FROM big_table')) {
// await processRow(row);
// }
# src/snowflake_pool.py
import snowflake.connector
from contextlib import contextmanager
from typing import Generator, Any
class SnowflakePool:
def __init__(self, **conn_params):
self._params = conn_params
@contextmanager
def connection(self) -> Generator[snowflake.connector.SnowflakeConnection, None, None]:
conn = snowflake.connector.connect(**self._params)
try:
yield conn
finally:
conn.close()
@contextmanager
def cursor(self) -> Generator[snowflake.connector.cursor.SnowflakeCursor, None, None]:
with self.connection() as conn:
cur = conn.cursor()
try:
yield cur
finally:
cur.close()
def execute(self, sql: str, params: tuple = ()) -> list[dict[str, Any]]:
with self.cursor() as cur:
cur.execute(sql, params)
columns = [desc[0] for desc in cur.description] if cur.description else []
return [dict(zip(columns, row)) for row in cur.fetchall()]
def execute_many(self, sql: str, params_list: list[tuple]) -> int:
"""Batch insert — much faster than individual inserts."""
with self.cursor() as cur:
cur.executemany(sql, params_list)
return cur.rowcount
# Usage
pool = SnowflakePool(
account=os.environ['SNOWFLAKE_ACCOUNT'],
user=os.environ['SNOWFLAKE_USER'],
password=os.environ['SNOWFLAKE_PASSWORD'],
warehouse='COMPUTE_WH',
database='MY_DB',
schema='PUBLIC',
)
users = pool.execute("SELECT * FROM users WHERE status = %s", ('active',))
// src/snowflake/errors.ts
export class SnowflakeQueryError extends Error {
constructor(
message: string,
public readonly sqlState: string,
public readonly code: number,
public readonly sqlText: string,
public readonly retryable: boolean
) {
super(message);
this.name = 'SnowflakeQueryError';
}
}
const RETRYABLE_CODES = new Set([
390114, // Connection token expired — reconnect
390503, // Service unavailable
]);
export async function safeQuery<T>(
conn: snowflake.Connection,
sqlText: string,
binds?: snowflake.Binds,
maxRetries = 3
): Promise<T[]> {
for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
const { rows } = await query<T>(conn, sqlText, binds);
return rows;
} catch (err: any) {
const retryable = RETRYABLE_CODES.has(err.code);
if (!retryable || attempt === maxRetries) {
throw new SnowflakeQueryError(
err.message, err.sqlState, err.code, sqlText, retryable
);
}
const delay = 1000 * Math.pow(2, attempt - 1);
await new Promise((r) => setTimeout(r, delay));
}
}
throw new Error('Unreachable');
}
| Pattern | Use Case | Benefit |
|---|---|---|
| Connection pool | High concurrency apps | Reuses connections, prevents exhaustion |
| Promise wrapper | All Node.js code | Clean async/await instead of callbacks |
| Streaming | Large result sets (>100K rows) | Constant memory usage |
| Context manager | All Python code | Guarantees connection cleanup |
| Retry with backoff | Transient failures | Handles token expiry, service blips |
Apply patterns in snowflake-core-workflow-a for real-world data pipeline usage.
npx claudepluginhub jeremylongshore/claude-code-plugins-plus-skills --plugin snowflake-packCreates minimal Snowflake examples: connect/query in Node.js/TypeScript/Python, SQL for DB objects. For testing setups, first queries, SDK patterns.
Assists with Snowflake SQL best practices, data pipelines (Dynamic Tables, Streams, Tasks, Snowpipe), Cortex AI, Snowpark Python, dbt, performance tuning, and security hardening.
Searches MemPalace before answering questions about past work, people, projects, or prior decisions. Returns verbatim stored content instead of guessing from model memory.