SQL Plugin Examples

This document provides practical examples of common database patterns and operations using the ElizaOS plugin-sql system.

Basic Operations

Creating Records

// Insert one row and read the stored record back
const aliceRow = {
  name: 'Alice',
  email: 'alice@example.com',
  isActive: true,
};
const newUser = await db.insert(userTable).values(aliceRow).returning();

// Insert several rows with a single statement
const bulkRows = [
  { name: 'Bob', email: 'bob@example.com' },
  { name: 'Charlie', email: 'charlie@example.com' },
];
const users = await db.insert(userTable).values(bulkRows).returning();

// Upsert: when the email already exists, overwrite the name instead of failing
const replacementName = 'Alice Updated';
await db
  .insert(userTable)
  .values({ email: 'alice@example.com', name: replacementName })
  .onConflictDoUpdate({
    target: userTable.email,
    set: { name: replacementName, updatedAt: sql`now()` },
  });

Querying Data

import {
  and,
  asc,
  avg,
  count,
  countDistinct,
  desc,
  eq,
  gte,
  inArray,
  like,
  lt,
  lte,
  or,
  sql,
  sum,
} from 'drizzle-orm';

// Fetch every row with all columns
const users = await db.select().from(userTable);

// The ten most recently created active users
const onlyActive = eq(userTable.isActive, true);
const newestFirst = desc(userTable.createdAt);
const activeUsers = await db
  .select()
  .from(userTable)
  .where(onlyActive)
  .orderBy(newestFirst)
  .limit(10);

// Project just the columns that are needed
const contactColumns = {
  email: userTable.email,
  name: userTable.name,
};
const userEmails = await db.select(contactColumns).from(userTable);

// Active users that either have a company address or a privileged role
const hasCompanyEmail = like(userTable.email, '%@company.com');
const hasPrivilegedRole = inArray(userTable.role, ['admin', 'moderator']);
const filteredUsers = await db
  .select()
  .from(userTable)
  .where(
    and(eq(userTable.isActive, true), or(hasCompanyEmail, hasPrivilegedRole))
  );

Updating Records

// Rename one user and stamp the modification time
const renamePatch = {
  name: 'Updated Name',
  updatedAt: sql`now()`,
};
await db.update(userTable).set(renamePatch).where(eq(userTable.id, userId));

// Deactivate everyone whose last login is older than thirty days
const thirtyDaysAgo = new Date();
thirtyDaysAgo.setDate(thirtyDaysAgo.getDate() - 30);

const inactiveSinceCutoff = lt(userTable.lastLoginAt, thirtyDaysAgo);
await db.update(userTable).set({ isActive: false }).where(inactiveSinceCutoff);

// Increment the level in the database and get the updated row back
const levelUp = { level: sql`${userTable.level} + 1` };
const updatedRows = await db
  .update(userTable)
  .set(levelUp)
  .where(eq(userTable.id, userId))
  .returning();
const [updatedUser] = updatedRows;

Deleting Records

// Remove one user by primary key
const targetUser = eq(userTable.id, userId);
await db.delete(userTable).where(targetUser);

// Purge sessions whose expiry lies in the past
const expiryCutoff = new Date();
await db.delete(sessionTable).where(lt(sessionTable.expiresAt, expiryCutoff));

// Soft delete: flag the row and record when, instead of removing it
const tombstone = {
  isDeleted: true,
  deletedAt: sql`now()`,
};
await db.update(userTable).set(tombstone).where(eq(userTable.id, userId));

Working with Relationships

One-to-Many

// One user joined with each of their posts (one result row per post;
// the left join keeps the user even when there are no posts)
const postBelongsToUser = eq(userTable.id, postTable.authorId);
const userWithPosts = await db
  .select({ user: userTable, posts: postTable })
  .from(userTable)
  .leftJoin(postTable, postBelongsToUser)
  .where(eq(userTable.id, userId));

// Number of posts per user
const postCountColumns = {
  userId: userTable.id,
  userName: userTable.name,
  postCount: count(postTable.id),
};
const usersWithPostCount = await db
  .select(postCountColumns)
  .from(userTable)
  .leftJoin(postTable, eq(userTable.id, postTable.authorId))
  .groupBy(userTable.id);

Many-to-Many

// Roles assigned to one user, resolved through the junction table
const roleOfAssignment = eq(userRoleTable.roleId, roleTable.id);
const userRoles = await db
  .select({ role: roleTable, assignedAt: userRoleTable.assignedAt })
  .from(userRoleTable)
  .innerJoin(roleTable, roleOfAssignment)
  .where(eq(userRoleTable.userId, userId));

// Every user that holds the 'admin' role
const assignmentOfUser = eq(userTable.id, userRoleTable.userId);
const isAdminRole = eq(roleTable.name, 'admin');
const admins = await db
  .select({ user: userTable })
  .from(userTable)
  .innerJoin(userRoleTable, assignmentOfUser)
  .innerJoin(roleTable, eq(userRoleTable.roleId, roleTable.id))
  .where(isAdminRole);

Advanced Queries

Aggregations

// Count, sum, average
// Users and orders are aggregated in separate queries. Joining them first
// would repeat every user once per order, which inflates the user count and
// skews the average age towards users with many orders.
const [userStats] = await db
  .select({
    totalUsers: count(userTable.id),
    avgAge: avg(userTable.age),
  })
  .from(userTable);

// NOTE(review): this sums every order row; if orders can exist without a
// matching user and must be excluded, add the appropriate filter.
const [orderStats] = await db
  .select({
    totalRevenue: sum(orderTable.amount),
  })
  .from(orderTable);

// Single-row result with the fields totalUsers, avgAge and totalRevenue
const stats = [{ ...userStats, ...orderStats }];

// Group by with having
// Categories that contain at least five active products, with their
// product count and average price.
const activeCategories = await db
  .select({
    category: productTable.category,
    productCount: count(productTable.id),
    avgPrice: avg(productTable.price),
  })
  .from(productTable)
  .where(eq(productTable.isActive, true))
  .groupBy(productTable.category)
  .having(gte(count(productTable.id), 5));

Subqueries

// Subquery in select
// For each user, a correlated scalar subquery selects the id of that user's
// newest post (ordered by createdAt descending, limited to one row).
// NOTE(review): check the generated SQL to confirm that the columns inside
// the subquery stay table-qualified; if they are rendered as bare column
// names, the comparison would no longer refer to the outer user row.
const usersWithLatestPost = await db
  .select({
    user: userTable,
    latestPostId: sql<string>`(
      SELECT id FROM ${postTable}
      WHERE ${postTable.authorId} = ${userTable.id}
      ORDER BY ${postTable.createdAt} DESC
      LIMIT 1
    )`,
  })
  .from(userTable);

// Subquery in where
// Keeps only users for which at least one activity row exists that was
// created within the last 7 days (EXISTS stops at the first match).
const usersWithRecentActivity = await db
  .select()
  .from(userTable)
  .where(
    sql`EXISTS (
      SELECT 1 FROM ${activityTable}
      WHERE ${activityTable.userId} = ${userTable.id}
      AND ${activityTable.createdAt} > NOW() - INTERVAL '7 days'
    )`
  );

Window Functions

// Row number for pagination
// The id is used as a tie-breaker so users with equal scores always receive
// the same rank, which keeps page boundaries stable between requests.
// ROW_NUMBER() yields a bigint, which the driver may hand back as a string;
// mapWith(Number) makes the runtime value match the declared `number` type.
const rankedUsers = await db
  .select({
    id: userTable.id,
    name: userTable.name,
    score: userTable.score,
    rank: sql<number>`ROW_NUMBER() OVER (ORDER BY ${userTable.score} DESC, ${userTable.id})`.mapWith(
      Number
    ),
  })
  .from(userTable);

// Running totals
// Cumulative sum of all amounts up to and including the current row's date.
// SUM() over integer or numeric columns is also returned as a string by the
// driver, hence the explicit conversion.
// NOTE(review): Number() loses precision for very large numeric totals —
// keep the string form if exact decimals are required.
const salesWithRunningTotal = await db
  .select({
    date: salesTable.date,
    amount: salesTable.amount,
    runningTotal: sql<number>`
      SUM(${salesTable.amount}) OVER (
        ORDER BY ${salesTable.date}
        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
      )
    `.mapWith(Number),
  })
  .from(salesTable)
  .orderBy(salesTable.date);

Transaction Patterns

Basic Transactions

// Create a user together with its profile and default settings atomically:
// if any insert throws, none of the three rows is persisted.
await db.transaction(async (tx) => {
  // Create user
  const insertedUsers = await tx
    .insert(userTable)
    .values({ name: 'John', email: 'john@example.com' })
    .returning();
  const user = insertedUsers[0];
  const ownerId = user.id;

  // Create profile
  const profileRow = { userId: ownerId, bio: 'New user' };
  await tx.insert(profileTable).values(profileRow);

  // Create initial settings
  const settingsRow = { userId: ownerId, theme: 'light' };
  await tx.insert(settingsTable).values(settingsRow);
});

Conditional Transactions

// Transaction with rollback
// Moves `amount` from `userId` to `receiverId`. Any error thrown inside the
// callback aborts the transaction, so either all three writes happen or none.
try {
  await db.transaction(async (tx) => {
    // A zero or negative amount would pass the balance check below and move
    // money in the wrong direction, so reject it up front.
    if (!(amount > 0)) {
      throw new Error('Transfer amount must be positive');
    }

    const [sender] = await tx
      .select()
      .from(userTable)
      .where(eq(userTable.id, userId))
      .for('update'); // Lock row so the balance cannot change under us

    // The select returns no row when the sender id is unknown
    if (!sender) {
      throw new Error('Sender not found');
    }

    if (sender.balance < amount) {
      throw new Error('Insufficient balance');
    }

    // Deduct from sender
    await tx
      .update(userTable)
      .set({ balance: sql`${userTable.balance} - ${amount}` })
      .where(eq(userTable.id, userId));

    // Add to receiver; returning() lets us detect an unknown receiver, which
    // would otherwise silently discard the deducted amount
    const credited = await tx
      .update(userTable)
      .set({ balance: sql`${userTable.balance} + ${amount}` })
      .where(eq(userTable.id, receiverId))
      .returning({ id: userTable.id });

    if (credited.length === 0) {
      throw new Error('Receiver not found');
    }

    // Log transaction
    await tx
      .insert(transactionTable)
      .values({
        fromUserId: userId,
        toUserId: receiverId,
        amount,
        type: 'transfer',
      });
  });
} catch (error) {
  console.error('Transaction failed:', error);
  // Transaction automatically rolled back
}

Plugin Integration Examples

Memory Storage Plugin

// Plugin schema definition
/**
 * Table `plugin_memories`: one row per stored memory of an agent.
 * Rows are removed automatically when the owning agent is deleted
 * (`onDelete: 'cascade'`).
 */
export const memoryTable = pgTable('plugin_memories', {
  id: uuid('id').primaryKey().defaultRandom(),
  agentId: uuid('agent_id')
    .notNull()
    .references(() => agentTable.id, { onDelete: 'cascade' }),

  type: text('type').notNull(),
  content: text('content').notNull(),

  // Vector for similarity search
  embedding: vector('embedding', { dimensions: 1536 }),

  // Metadata (defaults to an empty JSON object)
  metadata: jsonb('metadata').$type<{
    source?: string;
    confidence?: number;
    tags?: string[];
  }>().default(sql`'{}'::jsonb`),

  createdAt: timestamp('created_at').default(sql`now()`),
}, (table) => ({
  // Index for vector similarity search.
  // The indexed columns are passed to `using()` together with the index
  // method. The operator class is `vector_cosine_ops` because similarity
  // queries on this table order by the cosine-distance operator (`<=>`); an
  // inner-product index (`vector_ip_ops`) would not be used for those queries.
  embeddingIdx: index('memories_embedding_idx')
    .using('ivfflat', table.embedding.op('vector_cosine_ops')),

  // Regular indexes
  agentTypeIdx: index('memories_agent_type_idx')
    .on(table.agentId, table.type),
}));

// Memory service
/**
 * Stores agent memories and retrieves the ones closest to a query embedding.
 */
export class MemoryService {
  /**
   * Inserts one 'conversation' memory for the agent.
   *
   * @param agentId - Owner of the memory.
   * @param content - Text of the memory.
   * @param embedding - Embedding vector of `content`.
   * @returns The inserted row(s) as reported by `returning()`.
   */
  async storeMemory(agentId: string, content: string, embedding: number[]) {
    return await db
      .insert(memoryTable)
      .values({
        agentId,
        content,
        type: 'conversation',
        embedding,
        metadata: {
          source: 'chat',
          confidence: 0.95,
        },
      })
      .returning();
  }

  /**
   * Returns the agent's memories ordered by cosine distance to `embedding`
   * (closest first).
   *
   * @param agentId - Only memories of this agent are considered.
   * @param embedding - Query vector.
   * @param limit - Maximum number of rows to return (default 10).
   * @returns Rows with `id`, `content` and `similarity` (1 - cosine distance).
   */
  async findSimilarMemories(agentId: string, embedding: number[], limit = 10) {
    // A JS array interpolated into a sql template is expanded into a
    // parenthesised list of separate parameters, which is not a vector value.
    // Serialize it to the '[x,y,...]' text form and cast it instead.
    const queryVector = JSON.stringify(embedding);

    return await db
      .select({
        id: memoryTable.id,
        content: memoryTable.content,
        similarity: sql<number>`1 - (${memoryTable.embedding} <=> ${queryVector}::vector)`,
      })
      .from(memoryTable)
      .where(eq(memoryTable.agentId, agentId))
      .orderBy(sql`${memoryTable.embedding} <=> ${queryVector}::vector`)
      .limit(limit);
  }
}

Analytics Plugin

// Event tracking schema
// Table `analytics_events`: one row per tracked event. Only `id`, `agent_id`
// and `name` are required; every other column is optional context.
export const eventTable = pgTable('analytics_events', {
  id: uuid('id').primaryKey().defaultRandom(),
  agentId: uuid('agent_id').notNull(),
  
  // Event details
  name: text('name').notNull(),
  category: text('category'),
  
  // User context
  userId: uuid('user_id'),
  sessionId: uuid('session_id'),
  
  // Event data (free-form JSON; no shape is declared here)
  properties: jsonb('properties'),
  
  // Timing (set by the database to now() when not supplied)
  timestamp: timestamp('timestamp').default(sql`now()`),
}, (table) => ({
  // Indexes for common queries
  // Composite index on (agent, timestamp) and a separate index on the name
  agentTimestampIdx: index('events_agent_timestamp_idx')
    .on(table.agentId, table.timestamp),
  nameIdx: index('events_name_idx').on(table.name),
}));

// Analytics service
/**
 * Writes analytics events and reports per-event usage numbers.
 */
export class AnalyticsService {
  /**
   * Inserts `event` as a single row into the events table.
   */
  async trackEvent(event: {
    agentId: string;
    name: string;
    userId?: string;
    properties?: Record<string, any>;
  }) {
    const insertIntoEvents = db.insert(eventTable);
    await insertIntoEvents.values(event);
  }

  /**
   * Per event name: how often it occurred and how many distinct users
   * triggered it for the given agent during the last `days` days, most
   * frequent event first.
   */
  async getEventStats(agentId: string, days = 7) {
    // Lower bound of the reporting window
    const windowStart = new Date();
    windowStart.setDate(windowStart.getDate() - days);

    const forAgent = eq(eventTable.agentId, agentId);
    const insideWindow = gte(eventTable.timestamp, windowStart);

    const perEventColumns = {
      name: eventTable.name,
      count: count(eventTable.id),
      uniqueUsers: countDistinct(eventTable.userId),
    };
    const mostFrequentFirst = desc(count(eventTable.id));

    return await db
      .select(perEventColumns)
      .from(eventTable)
      .where(and(forAgent, insideWindow))
      .groupBy(eventTable.name)
      .orderBy(mostFrequentFirst);
  }
}

Task Queue Plugin

// Task queue schema
// Table `task_queue`: one row per queued unit of work, including its
// scheduling data, retry counters and the outcome of its execution.
// NOTE(review): apart from `id`, `type` and `payload`, no column is declared
// notNull — presumably those columns accept NULL; confirm this is intended
// for `status`, `attempts`, `maxAttempts` and `scheduledFor`.
export const taskQueueTable = pgTable('task_queue', {
  id: uuid('id').primaryKey().defaultRandom(),
  
  // Task identification
  type: text('type').notNull(),
  priority: integer('priority').default(0),
  
  // Task data
  payload: jsonb('payload').notNull(),
  
  // Execution control
  status: text('status').default('pending'), // pending, processing, completed, failed
  attempts: integer('attempts').default(0),
  maxAttempts: integer('max_attempts').default(3),
  
  // Scheduling (defaults to now(), i.e. the task is due immediately)
  scheduledFor: timestamp('scheduled_for').default(sql`now()`),
  
  // Execution results
  result: jsonb('result'),
  error: text('error'),
  
  // Timestamps
  createdAt: timestamp('created_at').default(sql`now()`),
  startedAt: timestamp('started_at'),
  completedAt: timestamp('completed_at'),
}, (table) => ({
  // Index for queue processing
  // Composite index over (status, scheduledFor, priority)
  queueIdx: index('task_queue_idx')
    .on(table.status, table.scheduledFor, table.priority),
}));

// Task queue service
/**
 * Enqueues tasks, hands them out to workers one at a time and records the
 * outcome of each execution.
 */
export class TaskQueueService {
  /**
   * Adds a task to the queue.
   *
   * @param task - Task type and payload; `priority` and `scheduledFor` fall
   *   back to the column defaults when omitted.
   * @returns The inserted row(s) as reported by `returning()`.
   */
  async enqueueTask(task: {
    type: string;
    payload: any;
    priority?: number;
    scheduledFor?: Date;
  }) {
    return await db.insert(taskQueueTable).values(task).returning();
  }

  /**
   * Claims the next runnable task: pending, due, and with attempts left.
   * Higher priority wins; among equal priorities the earliest scheduled task
   * is taken first.
   *
   * @returns The claimed task as stored after claiming (status 'processing',
   *   incremented `attempts`, `startedAt` set), or `null` when nothing is
   *   available.
   */
  async getNextTask() {
    return await db.transaction(async (tx) => {
      // Get next available task. The row is locked for update and rows that
      // other workers have already locked are skipped, so concurrent workers
      // never claim the same task and never block each other.
      const [candidate] = await tx
        .select()
        .from(taskQueueTable)
        .where(
          and(
            eq(taskQueueTable.status, 'pending'),
            lte(taskQueueTable.scheduledFor, new Date()),
            lt(taskQueueTable.attempts, taskQueueTable.maxAttempts)
          )
        )
        .orderBy(
          desc(taskQueueTable.priority),
          asc(taskQueueTable.scheduledFor)
        )
        .limit(1)
        .for('update', { skipLocked: true });

      if (!candidate) return null;

      // Mark as processing and return the row as it is stored now, so the
      // caller does not receive the stale pre-claim status and attempt count
      const [claimed] = await tx
        .update(taskQueueTable)
        .set({
          status: 'processing',
          startedAt: sql`now()`,
          attempts: sql`${taskQueueTable.attempts} + 1`,
        })
        .where(eq(taskQueueTable.id, candidate.id))
        .returning();

      return claimed === undefined ? null : claimed;
    });
  }

  /**
   * Marks the task as completed and stores its result.
   */
  async completeTask(taskId: string, result: any) {
    await db
      .update(taskQueueTable)
      .set({
        status: 'completed',
        result,
        completedAt: sql`now()`,
      })
      .where(eq(taskQueueTable.id, taskId));
  }

  /**
   * Marks the task as failed and stores the error message.
   *
   * NOTE(review): the status becomes 'failed' regardless of the attempt
   * count, and getNextTask only picks up 'pending' tasks, so a failed task is
   * not retried by this class — confirm whether a retry path is handled
   * elsewhere.
   */
  async failTask(taskId: string, error: string) {
    await db
      .update(taskQueueTable)
      .set({
        status: 'failed',
        error,
        completedAt: sql`now()`,
      })
      .where(eq(taskQueueTable.id, taskId));
  }
}

Performance Optimization

Batch Operations

// Batch insert with chunks
/**
 * Inserts `data` into `table` in consecutive chunks, one insert statement
 * per chunk, awaiting each statement before starting the next.
 *
 * The chunks are not wrapped in a transaction here: when a later chunk
 * fails, earlier chunks have already been written.
 *
 * @param table - Target table.
 * @param data - Rows to insert; an empty array results in no statement.
 * @param chunkSize - Maximum number of rows per statement (default 1000).
 * @throws RangeError when there is data to insert and `chunkSize` is not a
 *   number of at least 1 (such a value would otherwise never advance
 *   through `data`).
 */
async function batchInsert<T>(
  table: any,
  data: T[],
  chunkSize = 1000
) {
  if (data.length === 0) return;

  // Written as a negated comparison so that NaN is rejected as well
  if (!(chunkSize >= 1)) {
    throw new RangeError(`chunkSize must be at least 1, received ${chunkSize}`);
  }

  for (let start = 0; start < data.length; start += chunkSize) {
    const chunk = data.slice(start, start + chunkSize);
    await db.insert(table).values(chunk);
  }
}

// Batch update
/**
 * Applies each `{ id, data }` update to the user table, running at most
 * `concurrency` updates at the same time.
 *
 * The input is split into groups first and the update operations of a group
 * are only created once the previous group has finished. The limit therefore
 * holds even if creating an operation starts it immediately.
 *
 * @param updates - Rows to update, identified by `id`.
 * @param concurrency - Maximum number of simultaneous updates (default 10).
 * @returns The results of all updates, in the order of `updates`.
 * @throws RangeError when `concurrency` is not a number of at least 1.
 */
async function batchUpdate(
  updates: Array<{ id: string; data: any }>,
  concurrency = 10
) {
  // Written as a negated comparison so that NaN is rejected as well
  if (!(concurrency >= 1)) {
    throw new RangeError(`concurrency must be at least 1, received ${concurrency}`);
  }

  const results = [];
  for (let start = 0; start < updates.length; start += concurrency) {
    const group = updates.slice(start, start + concurrency);
    const groupResults = await Promise.all(
      group.map(({ id, data }) =>
        db.update(userTable).set(data).where(eq(userTable.id, id))
      )
    );
    results.push(...groupResults);
  }
  return results;
}

Query Optimization

// Select only the columns that are needed (a prerequisite for a covering
// index to be able to answer the query)
const listColumns = {
  id: userTable.id,
  name: userTable.name,
  email: userTable.email,
};
const activeOnly = eq(userTable.isActive, true);
const optimizedQuery = await db
  .select(listColumns)
  .from(userTable)
  .where(activeOnly)
  .orderBy(userTable.createdAt)
  .limit(100);

// Avoid N+1 queries: load every user together with a JSON array of their
// posts (newest first, empty array when there are none) in one statement
const postBelongsToUser = eq(userTable.id, postTable.authorId);
const usersWithPosts = await db
  .select({
    user: userTable,
    posts: sql<any[]>`
      COALESCE(
        json_agg(
          json_build_object(
            'id', ${postTable.id},
            'title', ${postTable.title}
          ) ORDER BY ${postTable.createdAt} DESC
        ) FILTER (WHERE ${postTable.id} IS NOT NULL),
        '[]'
      )
    `,
  })
  .from(userTable)
  .leftJoin(postTable, postBelongsToUser)
  .groupBy(userTable.id);