From antigravity-awesome-skills
Publishes CloudEvents and EventGridEvents to Azure Event Grid using Python SDK. Supports authentication and event-driven pub/sub architectures.
How this skill is triggered — by the user, by Claude, or both
Slash command
/antigravity-awesome-skills:azure-eventgrid-pyThe summary Claude sees in its skill listing — used to decide when to auto-load this skill
Event routing service for building event-driven applications with pub/sub semantics.
Event routing service for building event-driven applications with pub/sub semantics.
pip install azure-eventgrid azure-identity
EVENTGRID_TOPIC_ENDPOINT=https://<topic-name>.<region>.eventgrid.azure.net/api/events
EVENTGRID_NAMESPACE_ENDPOINT=https://<namespace>.<region>.eventgrid.azure.net
from azure.identity import DefaultAzureCredential
from azure.eventgrid import EventGridPublisherClient
credential = DefaultAzureCredential()
endpoint = "https://<topic-name>.<region>.eventgrid.azure.net/api/events"
client = EventGridPublisherClient(endpoint, credential)
| Format | Class | Use Case |
|---|---|---|
| Cloud Events 1.0 | CloudEvent | Standard, interoperable (recommended) |
| Event Grid Schema | EventGridEvent | Azure-native format |
from azure.eventgrid import EventGridPublisherClient, CloudEvent
from azure.identity import DefaultAzureCredential
client = EventGridPublisherClient(endpoint, DefaultAzureCredential())
# Single event
event = CloudEvent(
type="MyApp.Events.OrderCreated",
source="/myapp/orders",
data={"order_id": "12345", "amount": 99.99}
)
client.send(event)
# Multiple events
events = [
CloudEvent(
type="MyApp.Events.OrderCreated",
source="/myapp/orders",
data={"order_id": f"order-{i}"}
)
for i in range(10)
]
client.send(events)
from azure.eventgrid import EventGridEvent
from datetime import datetime, timezone
event = EventGridEvent(
subject="/myapp/orders/12345",
event_type="MyApp.Events.OrderCreated",
data={"order_id": "12345", "amount": 99.99},
data_version="1.0"
)
client.send(event)
event = CloudEvent(
type="MyApp.Events.ItemCreated", # Required: event type
source="/myapp/items", # Required: event source
data={"key": "value"}, # Event payload
subject="items/123", # Optional: subject/path
datacontenttype="application/json", # Optional: content type
dataschema="https://schema.example", # Optional: schema URL
time=datetime.now(timezone.utc), # Optional: timestamp
extensions={"custom": "value"} # Optional: custom attributes
)
event = EventGridEvent(
subject="/myapp/items/123", # Required: subject
event_type="MyApp.ItemCreated", # Required: event type
data={"key": "value"}, # Required: event payload
data_version="1.0", # Required: schema version
topic="/subscriptions/.../topics/...", # Optional: auto-set
event_time=datetime.now(timezone.utc) # Optional: timestamp
)
from azure.eventgrid.aio import EventGridPublisherClient
from azure.identity.aio import DefaultAzureCredential
async def publish_events():
credential = DefaultAzureCredential()
async with EventGridPublisherClient(endpoint, credential) as client:
event = CloudEvent(
type="MyApp.Events.Test",
source="/myapp",
data={"message": "hello"}
)
await client.send(event)
import asyncio
asyncio.run(publish_events())
For Event Grid Namespaces (pull delivery):
from azure.eventgrid.aio import EventGridPublisherClient
# Namespace endpoint (different from custom topic)
namespace_endpoint = "https://<namespace>.<region>.eventgrid.azure.net"
topic_name = "my-topic"
async with EventGridPublisherClient(
endpoint=namespace_endpoint,
credential=DefaultAzureCredential()
) as client:
await client.send(
event,
namespace_topic=topic_name
)
This skill is applicable to execute the workflow or actions described in the overview.
npx claudepluginhub sickn33/antigravity-awesome-skills --plugin antigravity-awesome-skillsPublishes events to Azure Event Grid using Python SDK. Supports CloudEvents 1.0 and EventGrid schemas with topic and namespace endpoints.
Publishes CloudEvents and EventGridEvents to Azure Event Grid using Python SDK. Supports authentication, batch sending for event-driven apps.
Publishes and consumes events with Azure Event Grid .NET SDK. Handles push/pull delivery to topics/domains/namespaces, CloudEvents, auth via keys/Entra ID/SAS.