Architecture
This document describes the architecture of GatheRing.
System Overview
┌─────────────────────────────────────────────────────────────────┐
│                        Dashboard (React)                        │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐         │
│  │  Agents  │  │ Circles  │  │ Workspace│  │ Calendar │         │
│  └──────────┘  └──────────┘  └──────────┘  └──────────┘         │
└─────────────────────────┬───────────────────────────────────────┘
                          │ HTTP/WebSocket
┌─────────────────────────┴───────────────────────────────────────┐
│                         FastAPI Backend                         │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐         │
│  │   API    │  │ WebSocket│  │ Event Bus│  │  Skills  │         │
│  │ Routers  │  │  Server  │  │ (In-Mem) │  │  System  │         │
│  └──────────┘  └──────────┘  └──────────┘  └──────────┘         │
└─────────────────────────┬───────────────────────────────────────┘
                          │
┌─────────────────────────┴───────────────────────────────────────┐
│                          Core Services                          │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌────────────┐       │
│  │  Agent   │  │  Circle  │  │  Memory  │  │Conversation│       │
│  │ Service  │  │ Service  │  │ Service  │  │  Service   │       │
│  └──────────┘  └──────────┘  └──────────┘  └────────────┘       │
└─────────────────────────┬───────────────────────────────────────┘
                          │
┌─────────────────────────┴───────────────────────────────────────┐
│                           Data Layer                            │
│  ┌────────────────┐  ┌────────────────┐  ┌────────────────┐     │
│  │   PostgreSQL   │  │  Redis (opt.)  │  │  File System   │     │
│  │   + pgvector   │  │ Rate limit bk  │  │  (Workspace)   │     │
│  │  + adv. locks  │  │                │  │                │     │
│  └────────────────┘  └────────────────┘  └────────────────┘     │
└─────────────────────────────────────────────────────────────────┘
Directory Structure
gathering/
├── gathering/ # Python backend
│ ├── api/ # FastAPI routers
│ ├── core/ # Core domain logic
│ ├── agents/ # Agent implementations
│ ├── circles/ # Circle management
│ ├── memory/ # Memory systems
│ ├── skills/ # Skills framework
│ ├── tools/ # Agent tools
│ ├── db/ # Database layer
│ └── websocket/ # WebSocket handlers
├── dashboard/ # React frontend
│ ├── src/
│ │ ├── components/ # React components
│ │ ├── pages/ # Page components
│ │ ├── stores/ # Zustand stores
│ │ ├── api/ # API client
│ │ └── lib/ # Utilities
│ └── public/
├── docs/ # Documentation
├── tests/ # Test suite
├── personas/ # Agent personas
└── scripts/ # Utility scripts
Core Components
Agent System
Agents are the primary actors in GatheRing.
class Agent:
    """Core agent class."""
    id: int
    name: str
    role: str
    provider: str  # openai, anthropic, ollama
    model: str  # provider-specific model name
    personality: PersonalityConfig
    memory: MemorySystem
    tools: list[Tool]
Key concepts:
Personality: Traits that influence behavior
Memory: Short-term and long-term storage
Tools: Capabilities the agent can use
Sessions: Track agent activity
Circle System
Circles organize agents into collaborative groups.
class Circle:
    """Circle for agent collaboration."""
    id: int
    name: str
    agents: list[Agent]
    config: CircleConfig
    state: CircleState
Features:
Agent membership management
Conversation hosting
Shared memory scope
Configurable conversation modes
Memory System
Multi-layered memory architecture:
┌─────────────────────────────────────┐
│           Semantic Search           │
│          (pgvector + RAG)           │
├─────────────────────────────────────┤
│          Long-term Memory           │
│      (PostgreSQL + embeddings)      │
├─────────────────────────────────────┤
│          Short-term Memory          │
│          (Session context)          │
└─────────────────────────────────────┘
Memory scopes:
global: Shared across all agents
circle: Shared within a circle
agent: Private to an agent
conversation: Specific to a conversation
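A rough sketch of how the four scopes could gate visibility. The `MemoryScope`, `Memory`, `Reader`, and `visible_to` names are hypothetical and chosen for illustration; they are not taken from the GatheRing codebase.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class MemoryScope(str, Enum):
    GLOBAL = "global"
    CIRCLE = "circle"
    AGENT = "agent"
    CONVERSATION = "conversation"


@dataclass(frozen=True)
class Memory:
    content: str
    scope: MemoryScope
    # Id of the owning circle, agent, or conversation; None for global memories.
    owner_id: Optional[int] = None


@dataclass(frozen=True)
class Reader:
    agent_id: int
    circle_ids: frozenset = frozenset()
    conversation_id: Optional[int] = None


def visible_to(memory: Memory, reader: Reader) -> bool:
    """Return True if `reader` may see `memory` under the scope rules above."""
    if memory.scope is MemoryScope.GLOBAL:
        return True
    if memory.scope is MemoryScope.CIRCLE:
        return memory.owner_id in reader.circle_ids
    if memory.scope is MemoryScope.AGENT:
        return memory.owner_id == reader.agent_id
    # Conversation scope: only the matching conversation can read it.
    return memory.owner_id is not None and memory.owner_id == reader.conversation_id
```

Keeping the check in one pure function makes the isolation rule easy to unit-test independently of the storage layer.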
Pipeline Engine
DAG-based workflow execution with topological traversal:
┌─────────┐     ┌─────────┐     ┌─────────┐
│  Start  │────>│ Agent A │────>│   End   │
└─────────┘     └────┬────┘     └─────────┘
                     │               ▲
                     ▼               │
                ┌─────────┐     ┌─────────┐
                │Condition│────>│ Agent B │
                └─────────┘     └─────────┘
Key components:
PipelineExecutor: Traverses the DAG in topological order via graphlib.TopologicalSorter
Node dispatchers: 6 types – agent, condition, action, transform, filter, merge
CircuitBreaker: Per-node with configurable failure threshold
Retry: Exponential backoff via tenacity; only retries NodeExecutionError
PipelineRunManager: Tracks active runs, enforces timeout (asyncio.timeout), handles cancellation (cooperative, then forced)
Validation: Rejects cyclic graphs, enforces node schema before execution
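The traversal and cycle-rejection steps can be illustrated with the standard-library `graphlib` module. This is a minimal sketch, not the actual `PipelineExecutor`: the `execution_order` helper and the node ids are assumptions made for the example.

```python
from graphlib import CycleError, TopologicalSorter


def execution_order(dependencies: dict) -> list:
    """Return node ids in a valid execution order.

    `dependencies` maps each node id to the set of node ids it depends on.
    Raises ValueError if the graph contains a cycle.
    """
    sorter = TopologicalSorter(dependencies)
    try:
        return list(sorter.static_order())
    except CycleError as exc:
        # The second element of exc.args lists the nodes forming the cycle.
        raise ValueError(f"pipeline graph is cyclic: {exc.args[1]}") from exc


# The pipeline drawn above, expressed as "node -> nodes it waits for".
pipeline = {
    "start": set(),
    "agent_a": {"start"},
    "condition": {"agent_a"},
    "agent_b": {"condition"},
    "end": {"agent_a", "agent_b"},
}
order = execution_order(pipeline)
```

For this graph there is only one valid order (`start`, `agent_a`, `condition`, `agent_b`, `end`), because `end` must wait for both of its predecessors.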
Scheduler
Cron-based action dispatch with crash recovery:
Action types: run_task, execute_pipeline, send_notification, call_api
Crash recovery: Detects missed runs on startup, deduplicates via execution history
Advisory locks: Multi-instance coordination via pg_try_advisory_xact_lock
Dispatcher functions: Module-level async functions, not methods, for clean testability
Event System
Publish-subscribe event bus (in-memory with backpressure):
# Publishing events
await event_bus.publish("agent.message", {
    "agent_id": 1,
    "content": "Hello!"
})

# Subscribing to events
@event_bus.subscribe("agent.message")
async def handle_message(event):
    print(f"Agent said: {event['content']}")
Event types:
agent.*: Agent lifecycle events
circle.*: Circle events
conversation.*: Conversation events
workspace.*: Workspace events
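To make the publish/subscribe flow and the backpressure idea concrete, here is a minimal in-memory bus. It is a sketch under assumptions: the `EventBus` class below, its `max_concurrent_handlers` parameter, and the use of `fnmatch`-style patterns for the `agent.*` families are illustrative and may differ from GatheRing's real event bus. Time-windowed deduplication is not covered.

```python
import asyncio
from collections import defaultdict
from fnmatch import fnmatchcase
from typing import Any, Awaitable, Callable, Optional

Event = dict[str, Any]
Handler = Callable[[Event], Awaitable[None]]


class EventBus:
    """Hypothetical in-memory bus with semaphore-limited handlers."""

    def __init__(self, max_concurrent_handlers: int = 10) -> None:
        self._handlers = defaultdict(list)  # pattern -> list of handlers
        self._limit = max_concurrent_handlers
        self._semaphore: Optional[asyncio.Semaphore] = None

    def subscribe(self, pattern: str) -> Callable[[Handler], Handler]:
        def decorator(handler: Handler) -> Handler:
            self._handlers[pattern].append(handler)
            return handler
        return decorator

    async def _run(self, handler: Handler, event: Event) -> None:
        # Backpressure: at most `_limit` handlers run at the same time.
        async with self._semaphore:
            await handler(event)

    async def publish(self, event_type: str, event: Event) -> None:
        if self._semaphore is None:
            # Created lazily, inside the running event loop.
            self._semaphore = asyncio.Semaphore(self._limit)
        matching = [
            handler
            for pattern, handlers in self._handlers.items()
            if fnmatchcase(event_type, pattern)
            for handler in handlers
        ]
        await asyncio.gather(*(self._run(h, event) for h in matching))


event_bus = EventBus()
received = []


@event_bus.subscribe("agent.*")
async def record_content(event: Event) -> None:
    received.append(event["content"])


asyncio.run(event_bus.publish("agent.message", {"agent_id": 1, "content": "Hello!"}))
```

A subscriber registered for `agent.*` receives `agent.message` but not `circle.created`, which is one way the event families listed above could be routed.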
API Layer
REST Endpoints
Organized by domain:
/agents # Agent CRUD
/circles # Circle management
/conversations # Conversation handling
/workspace # Workspace operations
/memory # Memory access
WebSocket Channels
Real-time communication:
/ws/circles/{name} # Circle events
/ws/workspace/{id} # Workspace updates
/ws/agents/{id} # Agent streams
Message format:
{
  "type": "message",
  "data": {
    "agent_id": 1,
    "content": "Hello!",
    "timestamp": "2025-01-01T00:00:00Z"
  }
}
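A small sketch of how a server-side helper might build this envelope. The `make_message` function is hypothetical; only the field names and the timestamp shape come from the example above.

```python
import json
from datetime import datetime, timezone
from typing import Optional


def make_message(agent_id: int, content: str, now: Optional[datetime] = None) -> str:
    """Serialize a chat message in the envelope format shown above."""
    now = now or datetime.now(timezone.utc)
    envelope = {
        "type": "message",
        "data": {
            "agent_id": agent_id,
            "content": content,
            # UTC, second precision, with the "Z" suffix used in the example.
            "timestamp": now.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        },
    }
    return json.dumps(envelope)
```

Passing `now` explicitly keeps the function deterministic in tests.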
Database Schema
Core Tables
agent.agents # Agent definitions
agent.sessions # Agent sessions
agent.tools # Agent tools
circle.circles # Circle definitions
circle.memberships # Agent-circle relationships
conversation.conversations # Conversations
conversation.turns # Conversation turns
conversation.messages # Messages
memory.memories # Long-term memories
memory.embeddings_cache # Cached embeddings
memory.knowledge_base # Shared knowledge
project.projects # Projects
project.tasks # Tasks
Schemas
public: Common types and functions
agent: Agent-related tables
circle: Circle-related tables
conversation: Conversation tables
memory: Memory and RAG
project: Project management
Frontend Architecture
Tech Stack
React 18: UI framework
TypeScript: Type safety
Zustand: State management
TanStack Query: Data fetching
Tailwind CSS: Styling
Monaco Editor: Code editing
xterm.js: Terminal emulation
State Management
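// Zustand store example
import { create } from 'zustand';

const useAgentStore = create((set) => ({
  agents: [],
  selectedAgent: null,
  // Replace the full agent list (e.g. after a fetch)
  setAgents: (agents) => set({ agents }),
  // Track which agent is currently selected in the UI
  selectAgent: (agent) => set({ selectedAgent: agent }),
}));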
Component Structure
src/
├── components/
│ ├── ui/ # Base UI components
│ ├── agents/ # Agent components
│ ├── circles/ # Circle components
│ └── workspace/ # Workspace components
├── pages/
│ ├── Dashboard.tsx
│ ├── AgentDetail.tsx
│ ├── CircleView.tsx
│ └── Workspace.tsx
└── stores/
├── agentStore.ts
├── circleStore.ts
└── workspaceStore.ts
Security
Authentication
JWT tokens: PyJWT with HS256, DB-persisted token blacklist (write-through LRU cache + PostgreSQL)
Password hashing: Direct bcrypt (>= 4.0.0), constant-time comparisons to prevent timing attacks
Audit logging: Auth events (login, logout, token creation, failed attempts) logged to audit table
Token lifecycle: Creation, expiry, blacklist cleanup, concurrent use, refresh – all tested
API Security
Input validation with Pydantic
SQL injection prevention – all queries use parameterized statements via safe_update_builder
Path traversal defense – validate_file_path double-decodes URLs, blocks ../, %2e%2e/, symlink escape
Rate limiting – per-endpoint tiers (strict/standard/relaxed/bulk) via slowapi with 429 + Retry-After headers
CORS configuration
Structured logging (structlog) with JSON output and request correlation IDs
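The path traversal defense can be approximated with the standard library. The sketch below reuses the `validate_file_path` name from the list above, but its signature and body are assumptions, not the project's implementation.

```python
from pathlib import Path
from urllib.parse import unquote


def validate_file_path(root: Path, user_path: str) -> Path:
    """Resolve `user_path` under `root`; raise ValueError if it escapes."""
    # Decode twice so double-encoded sequences such as %252e%252e are unmasked.
    decoded = unquote(unquote(user_path))
    if "\x00" in decoded:
        raise ValueError("null byte in path")
    root_resolved = root.resolve()
    # resolve() collapses ".." segments and follows symlinks, so both kinds
    # of escape are caught by the containment check below.
    candidate = (root_resolved / decoded.lstrip("/")).resolve()
    if not candidate.is_relative_to(root_resolved):  # Python 3.9+
        raise ValueError(f"path escapes workspace: {user_path!r}")
    return candidate
```

Checking containment after resolution, rather than pattern-matching on `../`, means new encodings of the same trick do not need their own rule.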
Agent Security
Tool permission system
JSON Schema validation on all tool parameters before execution
Filesystem sandboxing
Code execution isolation
Memory scope isolation
Scalability
Multi-Instance Support
Advisory locks: pg_try_advisory_xact_lock(namespace, action_id) prevents duplicate task execution across instances
Fail-closed: DB errors return False (skip execution) rather than risk duplicates
Graceful shutdown: Ordered teardown – stop accepting requests, drain in-flight, stop scheduler, close DB pool last
Readiness probe: /health/ready returns 503 during shutdown for load balancer integration
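The advisory-lock and fail-closed behavior might look roughly like this. The sketch assumes a synchronous DB-API style cursor (`execute` / `fetchone` with `%s` placeholders) for brevity; the project itself uses an async pool, and `try_claim_action` is a hypothetical name.

```python
import logging
from typing import Any

logger = logging.getLogger(__name__)


def try_claim_action(cursor: Any, namespace: int, action_id: int) -> bool:
    """Return True only if this instance obtained the lock for `action_id`.

    Must run inside an open transaction: PostgreSQL releases a
    transaction-level advisory lock automatically at commit or rollback.
    """
    try:
        cursor.execute(
            "SELECT pg_try_advisory_xact_lock(%s, %s)", (namespace, action_id)
        )
        row = cursor.fetchone()
        return bool(row and row[0])
    except Exception:
        # Fail closed: skipping a run is safer than executing it twice.
        logger.exception("advisory lock check failed for action %s", action_id)
        return False
```

Because `pg_try_advisory_xact_lock` never blocks, an instance that loses the race returns immediately and skips the action.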
Horizontal Scaling
Stateless API servers with PostgreSQL advisory lock coordination
Redis optional (rate limiting backend, not required)
pycopg async connection pooling (min 4, max 20)
Load balancer compatible with graceful shutdown
Performance Optimizations
Async DB: pycopg AsyncPooledDatabase for non-blocking queries in async handlers
N+1 elimination: get_circle_members_full() single JOIN replaces 2N+1 queries
Bounded caches: BoundedLRUDict with configurable max size and LRU eviction
Event bus backpressure: Semaphore-limited concurrent handlers, time-windowed deduplication
Embedding caching
Connection pooling
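A bounded LRU cache of the kind named above can be built on `collections.OrderedDict`. This sketch borrows the `BoundedLRUDict` name, but the methods and internals are assumptions and may not match the real class.

```python
from collections import OrderedDict
from typing import Any, Hashable, Optional


class BoundedLRUDict:
    """Mapping that evicts the least recently used entry when full."""

    def __init__(self, max_size: int = 1024) -> None:
        if max_size <= 0:
            raise ValueError("max_size must be positive")
        self._max_size = max_size
        self._data = OrderedDict()

    def get(self, key: Hashable, default: Optional[Any] = None) -> Any:
        if key not in self._data:
            return default
        self._data.move_to_end(key)  # mark as most recently used
        return self._data[key]

    def put(self, key: Hashable, value: Any) -> None:
        self._data[key] = value
        self._data.move_to_end(key)
        if len(self._data) > self._max_size:
            self._data.popitem(last=False)  # evict the least recently used

    def __len__(self) -> int:
        return len(self._data)

    def __contains__(self, key: object) -> bool:
        return key in self._data
```

With `max_size=2`, inserting `a` and `b`, reading `a`, then inserting `c` evicts `b`, since `a` was touched more recently.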
Distributed Coordination
Problem
When running multiple GatheRing instances (horizontal scaling), several coordination challenges arise:
Task Distribution: Ensuring the same task isn’t executed by multiple instances
Distributed Locks: Preventing concurrent execution of exclusive operations
Event Consistency: Maintaining event ordering across instances
Current State
Apart from the scheduler's PostgreSQL advisory locks, the current implementation coordinates in-memory:
Event bus operates within a single process
Scheduler tracks running actions in-memory
No mechanism for sharing events or queued work across instances
Proposed Solution: Redis-Based Coordination
Redis provides primitives suitable for distributed coordination:
1. Task Queue (Atomic Claiming)
Use Redis lists for task distribution with atomic BRPOP:
┌─────────────┐     BRPOP      ┌─────────────┐
│ Instance A  │ ◄──────────────│ Redis Queue │
└─────────────┘                └─────────────┘
       │                              │
       │          LPUSH task          │
       │─────────────────────────────►│
Prevents duplicate execution: Only one instance can claim a task
Fair distribution: Tasks are claimed in FIFO order
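The proposed queue could be sketched as follows. The functions accept any client exposing redis-py's `lpush` and `brpop` methods; the `gathering:tasks` key and the function names are hypothetical.

```python
import json
from typing import Any, Optional

QUEUE_KEY = "gathering:tasks"  # hypothetical key name


def enqueue_task(client: Any, task: dict) -> None:
    """Push a task onto the head of the list."""
    client.lpush(QUEUE_KEY, json.dumps(task))


def claim_task(client: Any, timeout_s: int = 5) -> Optional[dict]:
    """Block for up to `timeout_s` seconds and claim the oldest task, if any."""
    # BRPOP pops from the tail atomically, so each task reaches one caller,
    # and LPUSH + BRPOP together yield FIFO order.
    item = client.brpop(QUEUE_KEY, timeout=timeout_s)
    if item is None:
        return None
    _key, payload = item
    return json.loads(payload)
```

One caveat with this pattern: a task popped by an instance that crashes before finishing is lost, so delivery is at-most-once unless a separate in-progress list or acknowledgement step is added.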
2. Distributed Locks
Use Redis SET NX with TTL for locking:
SET lock:pipeline:{id} {instance_id} NX EX 300
Use cases:
Pipeline execution locks (prevent parallel runs)
Agent session locks
Scheduler coordination
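A minimal lock utility along these lines, assuming a client with redis-py's `set` and `eval` methods. The function names are hypothetical, and the sketch stores a random per-acquisition token in place of the `{instance_id}` value shown above so that release can verify ownership.

```python
import uuid
from typing import Any, Optional

# Delete the key only if it still holds our token, so an instance whose lock
# already expired cannot release a lock that another instance now owns.
_RELEASE_SCRIPT = """
if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("del", KEYS[1])
end
return 0
"""


def acquire_lock(client: Any, name: str, ttl_s: int = 300) -> Optional[str]:
    """Try to take the lock; return an ownership token, or None if held."""
    token = uuid.uuid4().hex
    # Equivalent to: SET lock:<name> <token> NX EX <ttl_s>
    if client.set(f"lock:{name}", token, nx=True, ex=ttl_s):
        return token
    return None


def release_lock(client: Any, name: str, token: str) -> bool:
    """Release the lock if `token` still owns it; return True on success."""
    return client.eval(_RELEASE_SCRIPT, 1, f"lock:{name}", token) == 1
```

The check-and-delete runs as a Lua script because a separate `GET` followed by `DEL` would leave a window in which the lock could expire and be taken by another instance.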
3. Event Synchronization
Use Redis Pub/Sub for cross-instance events:
Instance A ──PUBLISH──► channel ──SUBSCRIBE──► Instance B
Use cases:
Task status updates
Agent availability
Circle membership changes
Implementation Priority
Phase 1: Add Redis dependency, implement distributed lock utility
Phase 2: Implement task queue for scheduler
Phase 3: Migrate event bus to Redis Pub/Sub (optional, for high-scale)
Files to Modify
gathering/orchestration/coordination.py (new) - Coordination primitives
gathering/orchestration/scheduler.py - Use Redis queue
gathering/orchestration/pipeline/executor.py - Use distributed locks
gathering/orchestration/events.py - Optional Redis-backed EventBus
Configuration
REDIS_URL=redis://localhost:6379
COORDINATION_ENABLED=true
Security Considerations
Redis connections should use password authentication in production
Lock TTLs should exceed the duration of the longest operation they protect
Consider Redis Cluster for high availability
Configuration
Environment Variables
# Database
DATABASE_URL=postgresql://...
REDIS_URL=redis://...
# AI Providers
ANTHROPIC_API_KEY=sk-ant-...
OPENAI_API_KEY=sk-...
# Server
API_HOST=0.0.0.0
API_PORT=8000
# Features
ENABLE_REDIS_CACHE=true
ENABLE_WEBSOCKET=true
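One way these variables might be read at startup is sketched below. The `Settings` class, its field names, and every default value are assumptions for illustration; only the variable names come from the listing above.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


def _as_bool(value: str) -> bool:
    return value.strip().lower() in {"1", "true", "yes", "on"}


@dataclass(frozen=True)
class Settings:
    database_url: str
    redis_url: Optional[str]
    api_host: str
    api_port: int
    enable_redis_cache: bool
    enable_websocket: bool

    @classmethod
    def from_env(cls, env: Mapping[str, str] = os.environ) -> "Settings":
        return cls(
            database_url=env["DATABASE_URL"],  # required; KeyError if missing
            redis_url=env.get("REDIS_URL"),  # optional, since Redis is optional
            api_host=env.get("API_HOST", "0.0.0.0"),  # assumed default
            api_port=int(env.get("API_PORT", "8000")),  # assumed default
            enable_redis_cache=_as_bool(env.get("ENABLE_REDIS_CACHE", "false")),
            enable_websocket=_as_bool(env.get("ENABLE_WEBSOCKET", "true")),
        )
```

Accepting the environment as a mapping lets tests pass a plain dictionary instead of mutating `os.environ`.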
Configuration Files
pyproject.toml: Python project config
pytest.ini: Test configuration
.env: Environment variables
dashboard/vite.config.ts: Frontend config