From kineticadb
Activate when the user is building Python applications that interact with Kinetica, using the Kinetica Python SDK (gpudb) or REST API, or building data pipelines that connect to a Kinetica GPU database. Also activate when the user mentions bulk data ingest, Kinetica connectors, or the gpudb package.
Slash command: /kineticadb:kinetica-code
Kinetica is a GPU-accelerated database. This skill teaches you to build applications that connect to Kinetica using the Python SDK, REST API, and common data pipeline patterns.
When you need to call Kinetica's REST API directly via curl (e.g., the user requests raw
REST calls, or neither Node.js nor Python SDK is available), you MUST read
references/curl-api-reference.md first — it covers
authentication (.env loading), required flags, common endpoints, response parsing with jq,
and critical gotchas (always POST, never use -u, data_str double-encoding).
Critical: Kinetica SQL has important deviations from standard PostgreSQL. Read references/kinetica-core-rules.md before writing any query.
This skill includes core Kinetica SQL knowledge so you can embed queries in application code.
pip install gpudb
import gpudb

# Basic connection
db = gpudb.GPUdb(
    host="https://your-instance.kinetica.com/",
    username="your_user",
    password="your_password"
)

# With options
options = gpudb.GPUdb.Options()
options.username = "your_user"
options.password = "your_password"
options.skip_ssl_cert_verification = False
db = gpudb.GPUdb(
    host="https://your-instance.kinetica.com/",
    options=options
)
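To keep credentials out of source code, the connection settings shown above can be read from environment variables. The following is a minimal sketch: the variable names KINETICA_URL, KINETICA_USER, and KINETICA_PASSWORD and the helper connection_kwargs_from_env are illustrative assumptions, not names defined by the SDK.

```python
import os

# Hypothetical variable names; adjust them to match your deployment.
REQUIRED_VARS = ("KINETICA_URL", "KINETICA_USER", "KINETICA_PASSWORD")


def connection_kwargs_from_env(env=None):
    """Collect connection settings from environment variables.

    Returns a dict with host, username, and password keys, matching the
    keyword arguments used in the basic connection example.
    """
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise KeyError("missing environment variables: " + ", ".join(missing))
    return {
        "host": env["KINETICA_URL"],
        "username": env["KINETICA_USER"],
        "password": env["KINETICA_PASSWORD"],
    }


# Usage (requires gpudb installed and the variables set):
# db = gpudb.GPUdb(**connection_kwargs_from_env())
```

Passing a plain dict as env makes the helper easy to exercise in tests without touching the real environment.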
# Simple query
response = db.execute_sql("SELECT * FROM my_table LIMIT 10")
records = response.records  # List of ordered dicts

# Parameterized query
response = db.execute_sql(
    "SELECT * FROM my_table WHERE id = ?",
    data=[42]
)

# DDL
db.execute_sql("""
    CREATE TABLE my_schema.events (
        id INT NOT NULL,
        event_time TIMESTAMP,
        payload VARCHAR(1024)
    )
""")
# Insert records into an existing table
table_name = "my_schema.sensor_data"

# Insert with SQL
db.execute_sql(f"""
    INSERT INTO {table_name} (sensor_id, reading, ts)
    VALUES (?, ?, ?)
""", data=[101, 23.5, "2024-01-15 10:30:00"])

# Bulk insert from records
records = [
    [101, 23.5, "2024-01-15 10:30:00"],
    [102, 18.2, "2024-01-15 10:30:01"],
    [103, 45.1, "2024-01-15 10:30:02"]
]
db.insert_records_from_payload(
    table_name=table_name,
    field_names=["sensor_id", "reading", "ts"],
    payload=records
)
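For large inputs, the bulk insert above is usually issued in fixed-size batches rather than as one giant payload. The sketch below shows only the batching step. The helper name batched and the batch size are illustrative assumptions, and the insert call itself is left as a comment.

```python
from itertools import islice


def batched(rows, batch_size):
    """Yield successive lists of at most batch_size rows from any iterable."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(rows)
    while True:
        chunk = list(islice(iterator, batch_size))
        if not chunk:
            return
        yield chunk


# Five sample rows in the same shape as the records list above.
rows = [[100 + i, 20.0 + i, "2024-01-15 10:30:00"] for i in range(5)]
for chunk in batched(rows, 2):
    # Each chunk would be sent as the payload of one bulk insert call.
    print(len(chunk))  # chunk sizes are 2, 2, then 1
```

Because the helper consumes an iterator lazily, it also works for rows streamed from a file or another database cursor.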
For environments where the Python SDK isn't available, use the REST API directly:
import requests

base_url = "https://your-instance.kinetica.com"
auth = ("your_user", "your_password")

# Execute SQL via REST
response = requests.post(
    f"{base_url}/execute/sql",
    json={
        "statement": "SELECT * FROM my_table LIMIT 10",
        "encoding": "json"
    },
    auth=auth
)
response.raise_for_status()  # Surface HTTP-level errors before parsing
result = response.json()
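The data_str double-encoding gotcha mentioned earlier also applies when parsing the REST result in Python. The sketch below assumes the response is a JSON envelope with a status field, an optional message field, and a data_str field that holds the actual payload as a JSON-encoded string. That layout and the helper name decode_envelope are assumptions to check against a real response.

```python
import json


def decode_envelope(envelope):
    """Unwrap an (assumed) REST response envelope.

    Assumes the payload is a JSON string stored under "data_str", so it
    needs a second json.loads after the HTTP body has been parsed.
    """
    status = envelope.get("status")
    if status != "OK":
        raise RuntimeError(envelope.get("message") or f"unexpected status: {status!r}")
    data_str = envelope.get("data_str")
    if isinstance(data_str, str):
        return json.loads(data_str)  # second decode step
    return envelope.get("data", {})


# Hand-built sample in the assumed shape, not captured from a server.
sample = {
    "status": "OK",
    "message": "",
    "data_str": json.dumps({"total_number_of_records": 2}),
}
payload = decode_envelope(sample)
print(payload["total_number_of_records"])  # prints 2
```

With the requests example above, this would be called as decode_envelope(result).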
def build_query(table, filters=None, columns="*", limit=100):
    """Build a parameterized Kinetica query."""
    # Only filter values are bound as parameters. The table, column names,
    # and limit are interpolated directly, so they must come from trusted code.
    query = f"SELECT {columns} FROM {table}"
    params = []
    if filters:
        conditions = []
        for col, val in filters.items():
            conditions.append(f"{col} = ?")
            params.append(val)
        query += " WHERE " + " AND ".join(conditions)
    query += f" LIMIT {limit}"
    return query, params
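Since build_query places table and column names directly into the SQL text, one option is to validate those names before they reach it. The following is a sketch under a deliberately conservative assumption: the pattern accepts only plain or schema-qualified names made of letters, digits, and underscores. It does not reflect Kinetica's full naming rules, and safe_identifier is a hypothetical helper.

```python
import re

# Conservative allow-list: name or schema.name, ASCII letters, digits, underscore.
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)?")


def safe_identifier(name):
    """Return name unchanged if it matches the allow-list, else raise ValueError."""
    if not isinstance(name, str) or not _IDENTIFIER.fullmatch(name):
        raise ValueError(f"unsafe SQL identifier: {name!r}")
    return name


print(safe_identifier("my_schema.events"))  # prints my_schema.events

try:
    safe_identifier("events; DROP TABLE users")
except ValueError as exc:
    print("rejected:", exc)
```

Rejecting anything outside the allow-list is stricter than necessary, but it keeps the check simple and leaves quoting rules out of application code.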
import gpudb


class KineticaPool:
    """Simple connection wrapper with retry."""

    def __init__(self, host, username, password):
        self.host = host
        self.username = username
        self.password = password
        self._conn = None

    @property
    def conn(self):
        # Create the connection lazily on first use
        if self._conn is None:
            self._conn = gpudb.GPUdb(
                host=self.host,
                username=self.username,
                password=self.password
            )
        return self._conn

    def query(self, sql, params=None):
        """Execute query with automatic reconnect on failure."""
        try:
            return self.conn.execute_sql(sql, data=params)
        except gpudb.GPUdbException:
            self._conn = None  # Force reconnect
            return self.conn.execute_sql(sql, data=params)
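The wrapper above retries exactly once and immediately. Where a short outage is plausible, a bounded retry with exponential backoff is a common alternative. The sketch below is generic and does not depend on the SDK. The name call_with_retry, its defaults, and the injectable sleep parameter are illustrative assumptions.

```python
import time


def call_with_retry(fn, attempts=3, base_delay=0.5, retry_on=(Exception,), sleep=time.sleep):
    """Call fn() up to `attempts` times, doubling the delay after each failure.

    Re-raises the last exception if every attempt fails.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        try:
            return fn()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: propagate the final error
            sleep(base_delay * 2 ** attempt)  # 0.5s, 1.0s, 2.0s, ...


# Demonstration with a stand-in function that fails twice, then succeeds.
state = {"calls": 0}


def flaky():
    state["calls"] += 1
    if state["calls"] < 3:
        raise ConnectionError("temporarily unavailable")
    return "ok"


# sleep is replaced with a no-op so the demo runs instantly.
print(call_with_retry(flaky, sleep=lambda seconds: None))  # prints ok
```

With the SDK, retry_on would typically be narrowed to (gpudb.GPUdbException,) so that programming errors are not retried.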
import pandas as pd


def query_to_dataframe(db, sql, params=None):
    """Execute a Kinetica query and return a pandas DataFrame."""
    response = db.execute_sql(sql, data=params)
    if response.total_number_of_records == 0:
        return pd.DataFrame()
    return pd.DataFrame(response.records)


# Usage
df = query_to_dataframe(db, """
    SELECT region, SUM(revenue) as total_revenue
    FROM sales
    GROUP BY region
    ORDER BY total_revenue DESC
""")
- Use LIMIT on exploration queries to avoid pulling entire tables
- Catch gpudb.GPUdbException for connection and query errors
- Prefer insert_records_from_payload over row-by-row INSERT