Database
GatheRing uses PostgreSQL with pgvector for vector similarity search.
pycopg - Database Layer
GatheRing uses pycopg, a high-level Python API for PostgreSQL/PostGIS/TimescaleDB. It wraps asyncpg (async) and psycopg2 (sync) with a unified, pythonic interface.
from pycopg import Database, AsyncDatabase
# Sync usage
db = Database.from_env()
db.list_schemas()
db.list_tables("agent")
# Async usage (recommended for API)
db = AsyncDatabase.from_env()
schemas = await db.list_schemas()
users = await db.execute("SELECT * FROM agent.agents WHERE status = %s", ["active"])
# Connection pooling
from pycopg import PooledDatabase
db = PooledDatabase.from_env(min_size=5, max_size=20)
with db.connection() as conn:
result = conn.execute("SELECT * FROM agent.agents")
# Migrations
from pycopg import Migrator
migrator = Migrator(db, "gathering/db/migrations/")
migrator.migrate()
See pycopg README for full API documentation.
Async Database Service (v1.0)
For async route handlers, GatheRing provides AsyncDatabaseService with connection pooling:
from gathering.db.async_database import AsyncDatabaseService
# Created during FastAPI lifespan startup
async_db = AsyncDatabaseService(database_url)
await async_db.initialize() # Creates pool (min_size=4, max_size=20)
# Usage in route handlers
async with async_db.connection() as conn:
result = await conn.execute(
"SELECT * FROM agent.agents WHERE id = %s", [agent_id]
)
# Shutdown (last in lifespan teardown)
await async_db.shutdown()
The sync DatabaseService is preserved for CLI tools and migrations. Async route handlers should use AsyncDatabaseService for non-blocking DB access.
Lifespan Ordering
Startup: configure_logging -> async DB pool -> scheduler(async_db) -> rate limiter
Shutdown: set_shutting_down -> LB drain (3s) -> scheduler.stop -> task drain (2s)
-> executor.shutdown -> async_db.shutdown (LAST)
Advisory Locks (v1.0)
Multi-instance task coordination uses PostgreSQL advisory locks:
# Prevent duplicate task execution across instances
async with async_db.connection() as conn:
result = await conn.execute(
"SELECT pg_try_advisory_xact_lock(%s, %s)",
[namespace, action_id]
)
acquired = result[0][0] # True if lock acquired
Lock semantics: transaction-scoped, fail-closed on DB error (skip execution rather than risk duplicate).
Low-Level Drivers
Under the hood, pycopg uses:
Driver |
Type |
Usage |
|---|---|---|
|
Async |
Route handlers via AsyncDatabaseService |
|
Sync |
Migrations, CLI tools, blocking operations |
Database Diagram
Full database model available in gathering.dbm (pgModeler format)
Overview
The database is organized into schemas:
Schema |
Purpose |
|---|---|
|
Common types, functions, extensions |
|
Agent definitions and sessions |
|
Users, token blacklist, audit events (v1.0) |
|
Circle management |
|
Conversations and messages |
|
Long-term memory and RAG |
|
Pipeline definitions, runs, node runs (v1.0) |
|
Project and task management |
|
Scheduled actions with execution history (v1.0) |
Setup
Prerequisites
# Install PostgreSQL 15+
sudo apt install postgresql-15
# Install pgvector
sudo apt install postgresql-15-pgvector
Create Database
# Create user and database
sudo -u postgres createuser gathering
sudo -u postgres createdb gathering -O gathering
# Set password
sudo -u postgres psql -c "ALTER USER gathering PASSWORD 'your_password';"
Enable Extensions
-- Connect to database
\c gathering
-- Enable required extensions
CREATE EXTENSION IF NOT EXISTS vector;
CREATE EXTENSION IF NOT EXISTS pg_trgm;
Run Migrations
python -m gathering.db.migrate
Or manually:
for f in gathering/db/migrations/*.sql; do
psql -d gathering -f "$f"
done
Schema Details
Agent Schema
-- agent.agents
CREATE TABLE agent.agents (
id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
name VARCHAR(100) NOT NULL,
role VARCHAR(100),
provider VARCHAR(50) DEFAULT 'anthropic', -- openai, anthropic, ollama
model VARCHAR(100) DEFAULT 'claude-sonnet-4-20250514',
personality JSONB DEFAULT '{}',
config JSONB DEFAULT '{}',
status agent_status DEFAULT 'idle',
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- agent.sessions
CREATE TABLE agent.sessions (
id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
agent_id BIGINT REFERENCES agent.agents(id),
started_at TIMESTAMPTZ DEFAULT NOW(),
ended_at TIMESTAMPTZ,
context JSONB DEFAULT '{}'
);
Circle Schema
-- circle.circles
CREATE TABLE circle.circles (
id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
name VARCHAR(100) UNIQUE NOT NULL,
display_name VARCHAR(200),
description TEXT,
config JSONB DEFAULT '{}',
status circle_status DEFAULT 'created',
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- circle.memberships
CREATE TABLE circle.memberships (
circle_id BIGINT REFERENCES circle.circles(id),
agent_id BIGINT REFERENCES agent.agents(id),
role VARCHAR(50) DEFAULT 'member',
joined_at TIMESTAMPTZ DEFAULT NOW(),
PRIMARY KEY (circle_id, agent_id)
);
Memory Schema
-- memory.memories
CREATE TABLE memory.memories (
id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
scope memory_scope NOT NULL,
scope_id BIGINT,
agent_id BIGINT REFERENCES agent.agents(id),
memory_type memory_type DEFAULT 'fact',
key VARCHAR(200) NOT NULL,
value TEXT NOT NULL,
embedding vector(1536), -- pgvector
importance FLOAT DEFAULT 0.5,
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- Vector similarity index
CREATE INDEX idx_memories_embedding
ON memory.memories USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);
Vector Search
Creating Embeddings
from gathering.memory import EmbeddingService
service = EmbeddingService()
embedding = await service.create_embedding("Some text to embed")
Similarity Search
-- Find similar memories
SELECT id, key, value,
1 - (embedding <=> query_embedding) AS similarity
FROM memory.memories
WHERE embedding IS NOT NULL
AND agent_id = $1
ORDER BY embedding <=> query_embedding
LIMIT 10;
Using the helper function:
from gathering.memory import search_similar_memories
results = await search_similar_memories(
query="What is the user's name?",
agent_id=1,
limit=5,
threshold=0.7
)
Migrations
Creating a Migration
Create a new file in gathering/db/migrations/:
-- 010_add_new_feature.sql
-- Add new column
ALTER TABLE agent.agents ADD COLUMN new_field VARCHAR(100);
-- Create new table
CREATE TABLE agent.new_table (
id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
-- ...
);
-- Create indexes
CREATE INDEX idx_new_field ON agent.agents(new_field);
Migration Best Practices
Incremental changes: One feature per migration
Backward compatible: Don’t break existing code
Idempotent: Safe to run multiple times
Documented: Comment complex changes
Connection Pooling
GatheRing uses connection pooling for efficiency:
from gathering.db import DatabasePool
# Get pool
pool = await DatabasePool.get_pool()
# Use connection
async with pool.acquire() as conn:
result = await conn.fetch("SELECT * FROM agent.agents")
Configuration
# .env
DATABASE_URL=postgresql://user:pass@localhost:5432/gathering
DATABASE_POOL_MIN=5
DATABASE_POOL_MAX=20
Backup and Restore
Backup
# Full backup
pg_dump -Fc gathering > backup.dump
# Schema only
pg_dump -s gathering > schema.sql
# Data only
pg_dump -a gathering > data.sql
Restore
# Restore full backup
pg_restore -d gathering backup.dump
# Restore from SQL
psql -d gathering -f schema.sql
Performance
Indexes
Key indexes for performance:
-- Agent queries
CREATE INDEX idx_agents_status ON agent.agents(status);
CREATE INDEX idx_agents_name ON agent.agents(name);
-- Memory search
CREATE INDEX idx_memories_agent ON memory.memories(agent_id);
CREATE INDEX idx_memories_scope ON memory.memories(scope, scope_id);
-- Full-text search
CREATE INDEX idx_memories_value_fts
ON memory.memories USING gin(to_tsvector('english', value));
Query Optimization
-- Explain analyze for debugging
EXPLAIN ANALYZE
SELECT * FROM memory.memories
WHERE agent_id = 1 AND scope = 'agent';
-- Check index usage
SELECT indexrelname, idx_scan, idx_tup_read
FROM pg_stat_user_indexes
WHERE schemaname = 'memory';
Troubleshooting
Connection Issues
# Check PostgreSQL is running
sudo systemctl status postgresql
# Check logs
sudo tail -f /var/log/postgresql/postgresql-15-main.log
# Test connection
psql -h localhost -U gathering -d gathering -c "SELECT 1"
pgvector Issues
-- Check extension is installed
SELECT * FROM pg_extension WHERE extname = 'vector';
-- Reinstall if needed
DROP EXTENSION IF EXISTS vector CASCADE;
CREATE EXTENSION vector;
Performance Issues
-- Check table sizes
SELECT relname, pg_size_pretty(pg_total_relation_size(relid))
FROM pg_stat_user_tables
ORDER BY pg_total_relation_size(relid) DESC;
-- Vacuum and analyze
VACUUM ANALYZE memory.memories;