Architecture

This document describes the architecture of GatheRing.

System Overview

┌─────────────────────────────────────────────────────────────────┐
│                         Dashboard (React)                        │
│   ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐           │
│   │  Agents  │ │ Circles  │ │ Workspace│ │ Calendar │           │
│   └──────────┘ └──────────┘ └──────────┘ └──────────┘           │
└─────────────────────────┬───────────────────────────────────────┘
                          │ HTTP/WebSocket
┌─────────────────────────┴───────────────────────────────────────┐
│                        FastAPI Backend                           │
│   ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐           │
│   │   API    │ │ WebSocket│ │ Event Bus│ │  Skills  │           │
│   │ Routers  │ │  Server  │ │ (In-Mem) │ │  System  │           │
│   └──────────┘ └──────────┘ └──────────┘ └──────────┘           │
└─────────────────────────┬───────────────────────────────────────┘
                          │
┌─────────────────────────┴───────────────────────────────────────┐
│                        Core Services                             │
│   ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐           │
│   │  Agent   │ │  Circle  │ │  Memory  │ │Conversation│          │
│   │ Service  │ │ Service  │ │ Service  │ │  Service  │           │
│   └──────────┘ └──────────┘ └──────────┘ └──────────┘           │
└─────────────────────────┬───────────────────────────────────────┘
                          │
┌─────────────────────────┴───────────────────────────────────────┐
│                       Data Layer                                 │
│   ┌────────────────┐ ┌────────────────┐ ┌────────────────┐      │
│   │   PostgreSQL   │ │  Redis (opt.)  │ │  File System   │      │
│   │ + pgvector     │ │ Rate limit bk  │ │  (Workspace)   │      │
│   │ + adv. locks   │ │                │ │                │      │
│   └────────────────┘ └────────────────┘ └────────────────┘      │
└─────────────────────────────────────────────────────────────────┘

Directory Structure

gathering/
├── gathering/              # Python backend
│   ├── api/               # FastAPI routers
│   ├── core/              # Core domain logic
│   ├── agents/            # Agent implementations
│   ├── circles/           # Circle management
│   ├── memory/            # Memory systems
│   ├── skills/            # Skills framework
│   ├── tools/             # Agent tools
│   ├── db/                # Database layer
│   └── websocket/         # WebSocket handlers
├── dashboard/             # React frontend
│   ├── src/
│   │   ├── components/    # React components
│   │   ├── pages/         # Page components
│   │   ├── stores/        # Zustand stores
│   │   ├── api/           # API client
│   │   └── lib/           # Utilities
│   └── public/
├── docs/                  # Documentation
├── tests/                 # Test suite
├── personas/              # Agent personas
└── scripts/               # Utility scripts

Core Components

Agent System

Agents are the primary actors in GatheRing.

class Agent:
    """Core agent class."""
    id: int
    name: str
    role: str
    provider: str  # openai, anthropic, ollama
    model: str     # provider-specific model name
    personality: PersonalityConfig
    memory: MemorySystem
    tools: list[Tool]

Key concepts:

  • Personality: Traits that influence behavior

  • Memory: Short-term and long-term storage

  • Tools: Capabilities the agent can use

  • Sessions: Track agent activity

Circle System

Circles organize agents into collaborative groups.

class Circle:
    """Circle for agent collaboration."""
    id: int
    name: str
    agents: list[Agent]
    config: CircleConfig
    state: CircleState

Features:

  • Agent membership management

  • Conversation hosting

  • Shared memory scope

  • Configurable conversation modes

Memory System

Multi-layered memory architecture:

┌─────────────────────────────────────┐
│         Semantic Search             │
│        (pgvector + RAG)             │
├─────────────────────────────────────┤
│         Long-term Memory            │
│      (PostgreSQL + embeddings)      │
├─────────────────────────────────────┤
│         Short-term Memory           │
│       (Session context)             │
└─────────────────────────────────────┘

Memory scopes:

  • global: Shared across all agents

  • circle: Shared within a circle

  • agent: Private to an agent

  • conversation: Specific to a conversation

Pipeline Engine

DAG-based workflow execution with topological traversal:

┌─────────┐     ┌─────────┐     ┌─────────┐
│  Start  │────>│ Agent A │────>│  End    │
└─────────┘     └────┬────┘     └─────────┘
                     │               ▲
                     ▼               │
                ┌─────────┐    ┌─────────┐
                │Condition│───>│ Agent B │
                └─────────┘    └─────────┘

Key components:

  • PipelineExecutor: Traverses DAG in topological order via graphlib.TopologicalSorter

  • Node dispatchers: 6 types – agent, condition, action, transform, filter, merge

  • CircuitBreaker: Per-node with configurable failure threshold

  • Retry: Exponential backoff via tenacity, only retries NodeExecutionError

  • PipelineRunManager: Tracks active runs, enforces timeout (asyncio.timeout), handles cancellation (cooperative then forced)

  • Validation: Rejects cyclic graphs, enforces node schema before execution

Scheduler

Cron-based action dispatch with crash recovery:

  • Action types: run_task, execute_pipeline, send_notification, call_api

  • Crash recovery: Detects missed runs on startup, deduplicates via execution history

  • Advisory locks: Multi-instance coordination via pg_try_advisory_xact_lock

  • Dispatcher functions: Module-level async functions, not methods, for clean testability

Event System

Publish-subscribe event bus (in-memory with backpressure):

# Publishing events
await event_bus.publish("agent.message", {
    "agent_id": 1,
    "content": "Hello!"
})

# Subscribing to events
@event_bus.subscribe("agent.message")
async def handle_message(event):
    print(f"Agent said: {event['content']}")

Event types:

  • agent.*: Agent lifecycle events

  • circle.*: Circle events

  • conversation.*: Conversation events

  • workspace.*: Workspace events

API Layer

REST Endpoints

Organized by domain:

/agents           # Agent CRUD
/circles          # Circle management
/conversations    # Conversation handling
/workspace        # Workspace operations
/memory           # Memory access

WebSocket Channels

Real-time communication:

/ws/circles/{name}     # Circle events
/ws/workspace/{id}     # Workspace updates
/ws/agents/{id}        # Agent streams

Message format:

{
  "type": "message",
  "data": {
    "agent_id": 1,
    "content": "Hello!",
    "timestamp": "2025-01-01T00:00:00Z"
  }
}

Database Schema

Core Tables

agent.agents           # Agent definitions
agent.sessions         # Agent sessions
agent.tools            # Agent tools

circle.circles         # Circle definitions
circle.memberships     # Agent-circle relationships

conversation.conversations  # Conversations
conversation.turns          # Conversation turns
conversation.messages       # Messages

memory.memories        # Long-term memories
memory.embeddings_cache    # Cached embeddings
memory.knowledge_base      # Shared knowledge

project.projects       # Projects
project.tasks          # Tasks

Schemas

  • public: Common types and functions

  • agent: Agent-related tables

  • circle: Circle-related tables

  • conversation: Conversation tables

  • memory: Memory and RAG

  • project: Project management

Frontend Architecture

Tech Stack

  • React 18: UI framework

  • TypeScript: Type safety

  • Zustand: State management

  • TanStack Query: Data fetching

  • Tailwind CSS: Styling

  • Monaco Editor: Code editing

  • xterm.js: Terminal emulation

State Management

// Zustand store example
const useAgentStore = create((set) => ({
  agents: [],
  selectedAgent: null,

  setAgents: (agents) => set({ agents }),
  selectAgent: (agent) => set({ selectedAgent: agent }),
}));

Component Structure

src/
├── components/
│   ├── ui/            # Base UI components
│   ├── agents/        # Agent components
│   ├── circles/       # Circle components
│   └── workspace/     # Workspace components
├── pages/
│   ├── Dashboard.tsx
│   ├── AgentDetail.tsx
│   ├── CircleView.tsx
│   └── Workspace.tsx
└── stores/
    ├── agentStore.ts
    ├── circleStore.ts
    └── workspaceStore.ts

Security

Authentication

  • JWT tokens: PyJWT with HS256, DB-persisted token blacklist (write-through LRU cache + PostgreSQL)

  • Password hashing: Direct bcrypt (>= 4.0.0), constant-time comparisons to prevent timing attacks

  • Audit logging: Auth events (login, logout, token creation, failed attempts) logged to audit table

  • Token lifecycle: Creation, expiry, blacklist cleanup, concurrent use, refresh – all tested

API Security

  • Input validation with Pydantic

  • SQL injection prevention – all queries use parameterized statements via safe_update_builder

  • Path traversal defense – validate_file_path double-decodes URLs, blocks ../, %2e%2e/, symlink escape

  • Rate limiting – per-endpoint tiers (strict/standard/relaxed/bulk) via slowapi with 429 + Retry-After headers

  • CORS configuration

  • Structured logging (structlog) with JSON output and request correlation IDs

Agent Security

  • Tool permission system

  • JSON Schema validation on all tool parameters before execution

  • Filesystem sandboxing

  • Code execution isolation

  • Memory scope isolation

Scalability

Multi-Instance Support

  • Advisory locks: pg_try_advisory_xact_lock(namespace, action_id) prevents duplicate task execution across instances

  • Fail-closed: DB errors return False (skip execution) rather than risk duplicates

  • Graceful shutdown: Ordered teardown – stop accepting requests, drain in-flight, stop scheduler, close DB pool last

  • Readiness probe: /health/ready returns 503 during shutdown for load balancer integration

Horizontal Scaling

  • Stateless API servers with PostgreSQL advisory lock coordination

  • Redis optional (rate limiting backend, not required)

  • pycopg async connection pooling (min 4, max 20)

  • Load balancer compatible with graceful shutdown

Performance Optimizations

  • Async DB: pycopg AsyncPooledDatabase for non-blocking queries in async handlers

  • N+1 elimination: get_circle_members_full() single JOIN replaces 2N+1 queries

  • Bounded caches: BoundedLRUDict with configurable max size and LRU eviction

  • Event bus backpressure: Semaphore-limited concurrent handlers, time-windowed deduplication

  • Embedding caching

  • Connection pooling

Distributed Coordination

Problem

When running multiple GatheRing instances (horizontal scaling), several coordination challenges arise:

  • Task Distribution: Ensuring the same task isn’t executed by multiple instances

  • Distributed Locks: Preventing concurrent execution of exclusive operations

  • Event Consistency: Maintaining event ordering across instances

Current State

The current implementation uses in-memory coordination:

  • Event bus operates within a single process

  • Scheduler tracks running actions in-memory

  • No mechanism for cross-instance coordination

Proposed Solution: Redis-Based Coordination

Redis provides primitives suitable for distributed coordination:

1. Task Queue (Atomic Claiming)

Use Redis lists for task distribution with atomic BRPOP:

┌─────────────┐     BRPOP      ┌─────────────┐
│  Instance A │ ◄──────────────│ Redis Queue │
└─────────────┘                └─────────────┘
        │                            │
        │ LPUSH task                 │
        │───────────────────────────►│
  • Prevents duplicate execution: Only one instance can claim a task

  • Fair distribution: Tasks are claimed in FIFO order

2. Distributed Locks

Use Redis SET NX with TTL for locking:

SET lock:pipeline:{id} {instance_id} NX EX 300

Use cases:

  • Pipeline execution locks (prevent parallel runs)

  • Agent session locks

  • Scheduler coordination

3. Event Synchronization

Use Redis PUBSUB for cross-instance events:

Instance A ──PUBLISH──► channel ──SUBSCRIBE──► Instance B

Use cases:

  • Task status updates

  • Agent availability

  • Circle membership changes

Implementation Priority

  1. Phase 1: Add Redis dependency, implement distributed lock utility

  2. Phase 2: Implement task queue for scheduler

  3. Phase 3: Migrate event bus to Redis PUBSUB (optional, for high-scale)

Files to Modify

  • gathering/orchestration/coordination.py (new) - Coordination primitives

  • gathering/orchestration/scheduler.py - Use Redis queue

  • gathering/orchestration/pipeline/executor.py - Use distributed locks

  • gathering/orchestration/events.py - Optional Redis-backed EventBus

Configuration

REDIS_URL=redis://localhost:6379
COORDINATION_ENABLED=true

Security Considerations

  • Redis connection should use password authentication in production

  • Lock TTLs should be long enough to handle longest operation

  • Consider Redis Cluster for high availability

Configuration

Environment Variables

# Database
DATABASE_URL=postgresql://...
REDIS_URL=redis://...

# AI Providers
ANTHROPIC_API_KEY=sk-ant-...
OPENAI_API_KEY=sk-...

# Server
API_HOST=0.0.0.0
API_PORT=8000

# Features
ENABLE_REDIS_CACHE=true
ENABLE_WEBSOCKET=true

Configuration Files

  • pyproject.toml: Python project config

  • pytest.ini: Test configuration

  • .env: Environment variables

  • dashboard/vite.config.ts: Frontend config