SQL
Examples
Practical code examples and patterns
SQL Plugin Examples
This document provides practical examples of common database patterns and operations using the ElizaOS plugin-sql system.
Basic Operations
Creating Records
Copy
Ask AI
// Simple insert
const newUser = await db
.insert(userTable)
.values({
name: 'Alice',
email: 'alice@example.com',
isActive: true,
})
.returning();
// Bulk insert
const users = await db
.insert(userTable)
.values([
{ name: 'Bob', email: 'bob@example.com' },
{ name: 'Charlie', email: 'charlie@example.com' },
])
.returning();
// Insert with conflict handling
await db
.insert(userTable)
.values({ email: 'alice@example.com', name: 'Alice Updated' })
.onConflictDoUpdate({
target: userTable.email,
set: { name: 'Alice Updated', updatedAt: sql`now()` },
});
Querying Data
Copy
Ask AI
import { eq, and, or, like, inArray, desc, lt, gte, sql } from 'drizzle-orm';
// Simple select
const users = await db.select().from(userTable);
// Select with conditions
const activeUsers = await db
.select()
.from(userTable)
.where(eq(userTable.isActive, true))
.orderBy(desc(userTable.createdAt))
.limit(10);
// Select specific columns
const userEmails = await db
.select({
email: userTable.email,
name: userTable.name,
})
.from(userTable);
// Complex conditions
const filteredUsers = await db
.select()
.from(userTable)
.where(
and(
eq(userTable.isActive, true),
or(
like(userTable.email, '%@company.com'),
inArray(userTable.role, ['admin', 'moderator'])
)
)
);
Updating Records
Copy
Ask AI
// Update single record
await db
.update(userTable)
.set({
name: 'Updated Name',
updatedAt: sql`now()`,
})
.where(eq(userTable.id, userId));
// Update multiple records
const thirtyDaysAgo = new Date();
thirtyDaysAgo.setDate(thirtyDaysAgo.getDate() - 30);
await db
.update(userTable)
.set({ isActive: false })
.where(lt(userTable.lastLoginAt, thirtyDaysAgo));
// Update with returning
const [updatedUser] = await db
.update(userTable)
.set({ level: sql`${userTable.level} + 1` })
.where(eq(userTable.id, userId))
.returning();
Deleting Records
Copy
Ask AI
// Delete single record
await db.delete(userTable).where(eq(userTable.id, userId));
// Delete with conditions
await db
.delete(sessionTable)
.where(lt(sessionTable.expiresAt, new Date()));
// Soft delete pattern
await db
.update(userTable)
.set({
isDeleted: true,
deletedAt: sql`now()`,
})
.where(eq(userTable.id, userId));
Working with Relationships
One-to-Many
Copy
Ask AI
// Get user with their posts
const userWithPosts = await db
.select({
user: userTable,
posts: postTable,
})
.from(userTable)
.leftJoin(postTable, eq(userTable.id, postTable.authorId))
.where(eq(userTable.id, userId));
// Group posts by user
const usersWithPostCount = await db
.select({
userId: userTable.id,
userName: userTable.name,
postCount: count(postTable.id),
})
.from(userTable)
.leftJoin(postTable, eq(userTable.id, postTable.authorId))
.groupBy(userTable.id);
Many-to-Many
Copy
Ask AI
// Get user's roles through junction table
const userRoles = await db
.select({
role: roleTable,
assignedAt: userRoleTable.assignedAt,
})
.from(userRoleTable)
.innerJoin(roleTable, eq(userRoleTable.roleId, roleTable.id))
.where(eq(userRoleTable.userId, userId));
// Get users with specific role
const admins = await db
.select({
user: userTable,
})
.from(userTable)
.innerJoin(userRoleTable, eq(userTable.id, userRoleTable.userId))
.innerJoin(roleTable, eq(userRoleTable.roleId, roleTable.id))
.where(eq(roleTable.name, 'admin'));
Advanced Queries
Aggregations
Copy
Ask AI
// Count, sum, average
const stats = await db
.select({
totalUsers: count(userTable.id),
avgAge: avg(userTable.age),
totalRevenue: sum(orderTable.amount),
})
.from(userTable)
.leftJoin(orderTable, eq(userTable.id, orderTable.userId));
// Group by with having
const activeCategories = await db
.select({
category: productTable.category,
productCount: count(productTable.id),
avgPrice: avg(productTable.price),
})
.from(productTable)
.where(eq(productTable.isActive, true))
.groupBy(productTable.category)
.having(gte(count(productTable.id), 5));
Subqueries
Copy
Ask AI
// Subquery in select
const usersWithLatestPost = await db
.select({
user: userTable,
latestPostId: sql<string>`(
SELECT id FROM ${postTable}
WHERE ${postTable.authorId} = ${userTable.id}
ORDER BY ${postTable.createdAt} DESC
LIMIT 1
)`,
})
.from(userTable);
// Subquery in where
const usersWithRecentActivity = await db
.select()
.from(userTable)
.where(
sql`EXISTS (
SELECT 1 FROM ${activityTable}
WHERE ${activityTable.userId} = ${userTable.id}
AND ${activityTable.createdAt} > NOW() - INTERVAL '7 days'
)`
);
Window Functions
Copy
Ask AI
// Row number for pagination
const rankedUsers = await db
.select({
id: userTable.id,
name: userTable.name,
score: userTable.score,
rank: sql<number>`ROW_NUMBER() OVER (ORDER BY ${userTable.score} DESC)`,
})
.from(userTable);
// Running totals
const salesWithRunningTotal = await db
.select({
date: salesTable.date,
amount: salesTable.amount,
runningTotal: sql<number>`
SUM(${salesTable.amount}) OVER (
ORDER BY ${salesTable.date}
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
)
`,
})
.from(salesTable)
.orderBy(salesTable.date);
Transaction Patterns
Basic Transactions
Copy
Ask AI
// Simple transaction
await db.transaction(async (tx) => {
// Create user
const [user] = await tx
.insert(userTable)
.values({ name: 'John', email: 'john@example.com' })
.returning();
// Create profile
await tx
.insert(profileTable)
.values({ userId: user.id, bio: 'New user' });
// Create initial settings
await tx
.insert(settingsTable)
.values({ userId: user.id, theme: 'light' });
});
Conditional Transactions
Copy
Ask AI
// Transaction with rollback
try {
await db.transaction(async (tx) => {
const [user] = await tx
.select()
.from(userTable)
.where(eq(userTable.id, userId))
.for('update'); // Lock row
if (user.balance < amount) {
throw new Error('Insufficient balance');
}
// Deduct from sender
await tx
.update(userTable)
.set({ balance: sql`${userTable.balance} - ${amount}` })
.where(eq(userTable.id, userId));
// Add to receiver
await tx
.update(userTable)
.set({ balance: sql`${userTable.balance} + ${amount}` })
.where(eq(userTable.id, receiverId));
// Log transaction
await tx
.insert(transactionTable)
.values({
fromUserId: userId,
toUserId: receiverId,
amount,
type: 'transfer',
});
});
} catch (error) {
console.error('Transaction failed:', error);
// Transaction automatically rolled back
}
Plugin Integration Examples
Memory Storage Plugin
Copy
Ask AI
// Plugin schema definition
export const memoryTable = pgTable('plugin_memories', {
id: uuid('id').primaryKey().defaultRandom(),
agentId: uuid('agent_id')
.notNull()
.references(() => agentTable.id, { onDelete: 'cascade' }),
type: text('type').notNull(),
content: text('content').notNull(),
// Vector for similarity search
embedding: vector('embedding', { dimensions: 1536 }),
// Metadata
metadata: jsonb('metadata').$type<{
source?: string;
confidence?: number;
tags?: string[];
}>().default(sql`'{}'::jsonb`),
createdAt: timestamp('created_at').default(sql`now()`),
}, (table) => ({
// Index for vector similarity search
embeddingIdx: index('memories_embedding_idx')
.using('ivfflat')
.on(table.embedding.op('vector_ip_ops')),
// Regular indexes
agentTypeIdx: index('memories_agent_type_idx')
.on(table.agentId, table.type),
}));
// Memory service
export class MemoryService {
async storeMemory(agentId: string, content: string, embedding: number[]) {
return await db
.insert(memoryTable)
.values({
agentId,
content,
type: 'conversation',
embedding,
metadata: {
source: 'chat',
confidence: 0.95,
},
})
.returning();
}
async findSimilarMemories(agentId: string, embedding: number[], limit = 10) {
return await db
.select({
id: memoryTable.id,
content: memoryTable.content,
similarity: sql<number>`1 - (${memoryTable.embedding} <=> ${embedding})`,
})
.from(memoryTable)
.where(eq(memoryTable.agentId, agentId))
.orderBy(sql`${memoryTable.embedding} <=> ${embedding}`)
.limit(limit);
}
}
Analytics Plugin
Copy
Ask AI
// Event tracking schema
export const eventTable = pgTable('analytics_events', {
id: uuid('id').primaryKey().defaultRandom(),
agentId: uuid('agent_id').notNull(),
// Event details
name: text('name').notNull(),
category: text('category'),
// User context
userId: uuid('user_id'),
sessionId: uuid('session_id'),
// Event data
properties: jsonb('properties'),
// Timing
timestamp: timestamp('timestamp').default(sql`now()`),
}, (table) => ({
// Indexes for common queries
agentTimestampIdx: index('events_agent_timestamp_idx')
.on(table.agentId, table.timestamp),
nameIdx: index('events_name_idx').on(table.name),
}));
// Analytics service
export class AnalyticsService {
async trackEvent(event: {
agentId: string;
name: string;
userId?: string;
properties?: Record<string, any>;
}) {
await db.insert(eventTable).values(event);
}
async getEventStats(agentId: string, days = 7) {
const startDate = new Date();
startDate.setDate(startDate.getDate() - days);
return await db
.select({
name: eventTable.name,
count: count(eventTable.id),
uniqueUsers: countDistinct(eventTable.userId),
})
.from(eventTable)
.where(
and(
eq(eventTable.agentId, agentId),
gte(eventTable.timestamp, startDate)
)
)
.groupBy(eventTable.name)
.orderBy(desc(count(eventTable.id)));
}
}
Task Queue Plugin
Copy
Ask AI
// Task queue schema
export const taskQueueTable = pgTable('task_queue', {
id: uuid('id').primaryKey().defaultRandom(),
// Task identification
type: text('type').notNull(),
priority: integer('priority').default(0),
// Task data
payload: jsonb('payload').notNull(),
// Execution control
status: text('status').default('pending'), // pending, processing, completed, failed
attempts: integer('attempts').default(0),
maxAttempts: integer('max_attempts').default(3),
// Scheduling
scheduledFor: timestamp('scheduled_for').default(sql`now()`),
// Execution results
result: jsonb('result'),
error: text('error'),
// Timestamps
createdAt: timestamp('created_at').default(sql`now()`),
startedAt: timestamp('started_at'),
completedAt: timestamp('completed_at'),
}, (table) => ({
// Index for queue processing
queueIdx: index('task_queue_idx')
.on(table.status, table.scheduledFor, table.priority),
}));
// Task queue service
export class TaskQueueService {
async enqueueTask(task: {
type: string;
payload: any;
priority?: number;
scheduledFor?: Date;
}) {
return await db.insert(taskQueueTable).values(task).returning();
}
async getNextTask() {
return await db.transaction(async (tx) => {
// Get next available task
const [task] = await tx
.select()
.from(taskQueueTable)
.where(
and(
eq(taskQueueTable.status, 'pending'),
lte(taskQueueTable.scheduledFor, new Date()),
lt(taskQueueTable.attempts, taskQueueTable.maxAttempts)
)
)
.orderBy(
desc(taskQueueTable.priority),
asc(taskQueueTable.scheduledFor)
)
.limit(1)
.for('update skip locked'); // Skip locked rows
if (!task) return null;
// Mark as processing
await tx
.update(taskQueueTable)
.set({
status: 'processing',
startedAt: sql`now()`,
attempts: sql`${taskQueueTable.attempts} + 1`,
})
.where(eq(taskQueueTable.id, task.id));
return task;
});
}
async completeTask(taskId: string, result: any) {
await db
.update(taskQueueTable)
.set({
status: 'completed',
result,
completedAt: sql`now()`,
})
.where(eq(taskQueueTable.id, taskId));
}
async failTask(taskId: string, error: string) {
await db
.update(taskQueueTable)
.set({
status: 'failed',
error,
completedAt: sql`now()`,
})
.where(eq(taskQueueTable.id, taskId));
}
}
Performance Optimization
Batch Operations
Copy
Ask AI
// Batch insert with chunks
async function batchInsert<T>(
table: any,
data: T[],
chunkSize = 1000
) {
for (let i = 0; i < data.length; i += chunkSize) {
const chunk = data.slice(i, i + chunkSize);
await db.insert(table).values(chunk);
}
}
// Batch update
async function batchUpdate(updates: Array<{ id: string; data: any }>) {
const updatePromises = updates.map(({ id, data }) =>
db.update(userTable).set(data).where(eq(userTable.id, id))
);
// Execute in parallel with concurrency limit
const results = [];
for (let i = 0; i < updatePromises.length; i += 10) {
const batch = updatePromises.slice(i, i + 10);
results.push(...(await Promise.all(batch)));
}
return results;
}
Query Optimization
Copy
Ask AI
// Use covering indexes
const optimizedQuery = await db
.select({
id: userTable.id,
name: userTable.name,
email: userTable.email,
})
.from(userTable)
.where(eq(userTable.isActive, true))
.orderBy(userTable.createdAt)
.limit(100);
// Avoid N+1 queries - use joins
const usersWithPosts = await db
.select({
user: userTable,
posts: sql<any[]>`
COALESCE(
json_agg(
json_build_object(
'id', ${postTable.id},
'title', ${postTable.title}
) ORDER BY ${postTable.createdAt} DESC
) FILTER (WHERE ${postTable.id} IS NOT NULL),
'[]'
)
`,
})
.from(userTable)
.leftJoin(postTable, eq(userTable.id, postTable.authorId))
.groupBy(userTable.id);
Was this page helpful?
On this page
- SQL Plugin Examples
- Basic Operations
- Creating Records
- Querying Data
- Updating Records
- Deleting Records
- Working with Relationships
- One-to-Many
- Many-to-Many
- Advanced Queries
- Aggregations
- Subqueries
- Window Functions
- Transaction Patterns
- Basic Transactions
- Conditional Transactions
- Plugin Integration Examples
- Memory Storage Plugin
- Analytics Plugin
- Task Queue Plugin
- Performance Optimization
- Batch Operations
- Query Optimization
Assistant
Responses are generated using AI and may contain mistakes.