Concurrency
Concurrency Philosophy
Core Principles
| Principle | Implementation | Benefit |
|---|---|---|
| Provider Isolation | Independent worker pools per provider | Fault tolerance, no cascade failures |
| Channel-Based Communication | Go channels for all async operations | Type-safe hand-off of data between goroutines |
| Resource Pooling | Object pools with lifecycle management | Predictable memory usage, reduced GC pressure |
| Non-Blocking Operations | Async processing throughout pipeline | High concurrency, no unbounded blocking waits |
| Backpressure Handling | Configurable buffers and flow control | Graceful degradation under load |
Threading Architecture Overview
```mermaid
graph TB
    subgraph "Main Thread"
        Main[Main Process<br/>HTTP Server]
        Router[Request Router<br/>Goroutine]
        PluginMgr[Plugin Manager<br/>Goroutine]
    end

    subgraph "Provider Worker Pools"
        subgraph "OpenAI Pool"
            OAI1[Worker 1<br/>Goroutine]
            OAI2[Worker 2<br/>Goroutine]
            OAIN[Worker N<br/>Goroutine]
        end
        subgraph "Anthropic Pool"
            ANT1[Worker 1<br/>Goroutine]
            ANT2[Worker 2<br/>Goroutine]
            ANTN[Worker N<br/>Goroutine]
        end
        subgraph "Bedrock Pool"
            BED1[Worker 1<br/>Goroutine]
            BED2[Worker 2<br/>Goroutine]
            BEDN[Worker N<br/>Goroutine]
        end
    end

    subgraph "Memory Pools"
        ChannelPool[Channel Pool<br/>sync.Pool]
        MessagePool[Message Pool<br/>sync.Pool]
        ResponsePool[Response Pool<br/>sync.Pool]
    end

    Main --> Router
    Router --> PluginMgr
    PluginMgr --> OAI1
    PluginMgr --> ANT1
    PluginMgr --> BED1

    OAI1 --> ChannelPool
    ANT1 --> MessagePool
    BED1 --> ResponsePool
```
Worker Pool Architecture
Provider-Isolated Worker Pools
```mermaid
stateDiagram-v2
    [*] --> PoolInit: Worker Pool Creation
    PoolInit --> WorkerSpawn: Spawn Worker Goroutines
    WorkerSpawn --> Listening: Workers Listen on Channels

    Listening --> Processing: Job Received
    Processing --> API_Call: Provider API Request
    API_Call --> Response: Process Response
    Response --> Listening: Job Complete

    Listening --> Shutdown: Graceful Shutdown
    Processing --> Shutdown: Complete Current Job
    Shutdown --> [*]: Pool Destroyed
```
Worker Pool Architecture:
The worker pool system balances resource efficiency against isolation between providers:
Key Components:
- Worker Pool Management - Pre-spawned workers reduce startup latency
- Job Queue System - Buffered channels provide smooth load balancing
- Resource Pools - HTTP clients and API keys are pooled for efficiency
- Health Monitoring - Circuit breakers detect and isolate failing providers
- Graceful Shutdown - Workers complete current jobs before terminating
Startup Process:
- Worker Pre-spawning - Workers are created during pool initialization
- Channel Setup - Job queues and worker channels are established
- Resource Allocation - HTTP clients and API keys are distributed
- Health Checks - Initial connectivity tests verify provider availability
- Ready State - Pool becomes available for request processing
Job Dispatch Logic:
- Round-Robin Assignment - Jobs are distributed evenly across available workers
- Load Balancing - Worker availability determines job assignment
- Overflow Handling - Excess jobs are queued or dropped based on configuration
Worker Lifecycle Management
```mermaid
sequenceDiagram
    participant Pool
    participant Worker
    participant HTTPClient
    participant Provider
    participant Metrics

    Pool->>Worker: Start()
    Worker->>Worker: Initialize HTTP Client
    Worker->>Pool: Ready Signal

    loop Job Processing
        Pool->>Worker: Job Assignment
        Worker->>HTTPClient: Prepare Request
        HTTPClient->>Provider: API Call
        Provider-->>HTTPClient: Response
        HTTPClient-->>Worker: Parsed Response
        Worker->>Metrics: Record Performance
        Worker->>Pool: Job Complete
    end

    Pool->>Worker: Shutdown Signal
    Worker->>Worker: Complete Current Job
    Worker-->>Pool: Shutdown Confirmed
```
Channel-Based Communication
Channel Architecture
```mermaid
graph TB
    subgraph "Channel Types"
        JobQueue[Job Queue<br/>Buffered Channel]
        WorkerPool[Worker Pool<br/>Buffered Channel]
        ResultChan[Result Channel<br/>Buffered Channel]
        QuitChan[Quit Channel<br/>Unbuffered]
    end

    subgraph "Flow Control"
        BackPressure[Backpressure<br/>Buffer Limits]
        Timeout[Timeout<br/>Context Cancellation]
        Graceful[Graceful Shutdown<br/>Channel Closing]
    end

    JobQueue --> BackPressure
    WorkerPool --> Timeout
    ResultChan --> Graceful
```
Channel Configuration Principles:
DeepIntShield’s channel system balances throughput and memory usage through careful buffer sizing:
Job Queuing Configuration:
- Job Queue Buffer - Sized based on expected burst traffic (100-1000 jobs)
- Worker Pool Size - Matches provider concurrency limits (10-100 workers)
- Result Buffer - Accommodates response processing delays (50-500 responses)
Flow Control Parameters:
- Queue Wait Limits - Maximum time jobs wait before timeout (1-10 seconds)
- Processing Timeouts - Per-job execution limits (30-300 seconds)
- Shutdown Timeouts - Graceful termination periods (5-30 seconds)
Backpressure Policies:
- Drop Policy - Discard excess jobs when queues are full
- Block Policy - Wait for queue space with timeout
- Error Policy - Immediately return error for full queues
Channel Type Selection:
- Buffered Channels - Used for async job processing and result handling
- Unbuffered Channels - Used for synchronization signals (quit, done)
- Context Cancellation - Used for timeout and cancellation propagation
Backpressure and Flow Control
```mermaid
flowchart TD
    Request[Incoming Request] --> QueueCheck{Queue Full?}
    QueueCheck -->|No| Queue[Add to Queue]
    QueueCheck -->|Yes| Policy{Queue Policy?}

    Policy -->|Drop| Drop[Drop Request<br/>Return Error]
    Policy -->|Block| Block[Block Until Space<br/>With Timeout]
    Policy -->|Error| Error[Return Queue Full Error]

    Queue --> Worker[Assign to Worker]
    Block --> TimeoutCheck{Timeout?}
    TimeoutCheck -->|Yes| Error
    TimeoutCheck -->|No| Queue

    Worker --> Processing[Process Request]
    Processing --> Complete[Complete]

    Drop --> Client[Client Response]
    Error --> Client
    Complete --> Client
```
Backpressure Implementation Strategy:
The backpressure system protects DeepIntShield from being overwhelmed while maintaining service availability:
Non-Blocking Job Submission:
- Immediate Queue Check - Jobs are submitted without blocking on queue space
- Success Path - Available queue space allows immediate job acceptance
- Overflow Detection - Full queues trigger backpressure policies
- Metrics Collection - All queue operations are tracked for monitoring
Backpressure Policy Execution:
- Drop Policy - Immediately rejects excess jobs with meaningful error messages
- Block Policy - Waits for queue space with configurable timeout limits
- Error Policy - Returns queue full errors for immediate client feedback
- Metrics Tracking - Dropped, blocked, and successful submissions are measured
Timeout Management:
- Context-Based Timeouts - All blocking operations respect timeout boundaries
- Graceful Degradation - Timeouts result in controlled error responses
- Resource Protection - Prevents goroutine leaks from infinite waits
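```go
import (
	"context"
	"errors"
)

// SubmitJob enqueues a job without blocking and applies the configured
// backpressure policy when the queue is full. Only the tail of this function
// survived; the signature, fast path, "drop" branch and the names
// QueuePolicy, QueueWaitTimeout and IncDroppedJobs are assumptions.
func (pool *ProviderWorkerPool) SubmitJob(ctx context.Context, job *Job) error {
	select {
	case pool.jobQueue <- job: // fast path: space available
		pool.metrics.IncQueuedJobs()
		return nil
	default: // queue full: apply the policy
		switch pool.config.QueuePolicy {
		case "drop":
			pool.metrics.IncDroppedJobs()
			return errors.New("queue full, job dropped")
		case "block":
			// Wait for space, bounded by the queue wait limit.
			ctx, cancel := context.WithTimeout(ctx, pool.config.QueueWaitTimeout)
			defer cancel()
			select {
			case pool.jobQueue <- job:
				pool.metrics.IncQueuedJobs()
				return nil
			case <-ctx.Done():
				pool.metrics.IncTimeoutJobs()
				return errors.New("queue full, timeout waiting")
			}
		case "error":
			pool.metrics.IncRejectedJobs()
			return errors.New("queue full, job rejected")
		default:
			return errors.New("unknown queue policy")
		}
	}
}
```
Memory Pool Concurrency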
Thread-Safe Object Pools
```mermaid
graph TD
    subgraph "sync.Pool Lifecycle"
        direction LR
        GetObject[Get Object<br/>sync.Pool.Get]
        PoolCheck{Is Pool Empty?}
        NewObject[New Object<br/>Factory Function]
        UseObject[Use Object<br/>Application Logic]
        ResetObject[Reset Object<br/>Clear State]
        ReturnObject[Return Object<br/>sync.Pool.Put]

        GetObject --> PoolCheck
        PoolCheck -- Yes --> NewObject
        PoolCheck -- No --> UseObject
        NewObject --> UseObject
        UseObject --> ResetObject
        ResetObject --> ReturnObject
        ReturnObject --> GetObject
    end

    subgraph "GC Interaction"
        direction TB
        GCRun[GC Runs]
        PoolCleanup[Pool Cleanup<br/>Removes idle objects]

        GCRun --> PoolCleanup
    end
```
Thread-Safe Pool Architecture:
DeepIntShield’s memory pool system ensures thread-safe object reuse across multiple goroutines:
Pool Structure Design:
- Multiple Pool Types - Separate pools for channels, messages, responses, and buffers
- Factory Functions - Dynamic object creation when pools are empty
- Statistics Tracking - Comprehensive metrics for pool performance monitoring
- Thread Safety - Synchronized access using Go’s sync.Pool and read-write mutexes
Object Lifecycle Management:
- Pool Initialization - Factory functions define object creation patterns
- Unique Identification - Each pooled object gets a unique ID for tracking
- Timestamp Tracking - Creation, acquisition, and return times are recorded
- Reusability Flags - Objects can be marked as non-reusable for single-use scenarios
Acquisition Strategy:
- Request Tracking - All pool requests are counted for monitoring
- Hit/Miss Tracking - Pool effectiveness is measured through hit ratios
- Fallback Creation - New objects are created when pools are empty
- Performance Metrics - Acquisition times and patterns are monitored
Return and Reset Process:
- State Validation - Only reusable objects are returned to pools
- Object Reset - All object state is cleared before returning to pool
- Return Tracking - Return operations are counted and timed
- Pool Replenishment - Returned objects become available for reuse
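The acquire, reset, and return cycle can be illustrated with Go's `sync.Pool`, which is safe for concurrent use. This sketch assumes Go 1.19+ (for `atomic.Int64`); `MessagePool`, its counters, and the 64 KiB `maxPooledCap` cutoff standing in for "non-reusable" objects are hypothetical and only show where request, miss, and return tracking would hook in.

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
	"sync/atomic"
)

// maxPooledCap is an assumed cutoff: larger buffers are treated as
// non-reusable and left to the garbage collector.
const maxPooledCap = 64 << 10

// MessagePool wraps sync.Pool and counts requests, misses (factory calls)
// and returns.
type MessagePool struct {
	pool     sync.Pool
	requests atomic.Int64
	misses   atomic.Int64
	returns  atomic.Int64
}

func NewMessagePool() *MessagePool {
	p := &MessagePool{}
	// The factory runs only when the pool has nothing to hand out: a miss.
	p.pool.New = func() any {
		p.misses.Add(1)
		return new(bytes.Buffer)
	}
	return p
}

// Get returns a clean buffer, reusing a pooled one when available.
func (p *MessagePool) Get() *bytes.Buffer {
	p.requests.Add(1)
	return p.pool.Get().(*bytes.Buffer)
}

// Put clears the buffer before pooling it so no request data leaks into
// the next user; oversized buffers are dropped instead of pooled.
func (p *MessagePool) Put(b *bytes.Buffer) {
	if b.Cap() > maxPooledCap {
		return
	}
	b.Reset()
	p.returns.Add(1)
	p.pool.Put(b)
}

func main() {
	mp := NewMessagePool()
	for i := 0; i < 3; i++ {
		b := mp.Get()
		b.WriteString("request payload")
		mp.Put(b)
	}
	req, miss := mp.requests.Load(), mp.misses.Load()
	fmt.Printf("requests=%d misses=%d hits=%d\n", req, miss, req-miss)
}
```

Because `sync.Pool` may discard idle objects during garbage collection, the hit count varies between runs and is best read as a trend rather than an exact figure.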
Pool Performance Monitoring
Comprehensive metrics provide insights into pool efficiency and system health:
Usage Statistics Collection:
- Request Counting - Track total pool requests by object type
- Creation Tracking - Monitor new object allocations when pools are empty
- Hit/Miss Ratios - Measure pool effectiveness through reuse rates
- Return Monitoring - Track successful object returns to pools
Performance Metrics Analysis:
- Acquisition Times - Measure how long it takes to get objects from pools
- Reset Performance - Track time spent cleaning objects for reuse
- Hit Ratio Calculation - Determine percentage of requests served from pools
- Memory Efficiency - Calculate memory savings from object reuse
Key Performance Indicators:
- Channel Pool Hit Ratio - Typically 85-95% in steady state
- Message Pool Efficiency - Usually 80-90% reuse rate
- Response Pool Utilization - Often 70-85% hit ratio
- Total Memory Savings - Measured reduction in garbage collection pressure
Monitoring Integration:
- Thread-Safe Access - All metrics collection is synchronized
- Real-Time Updates - Statistics are updated with each pool operation
- Export Capability - Metrics are available in JSON format for monitoring systems
- Alerting Support - Low hit ratios can trigger performance alerts
Goroutine Management
Goroutine Lifecycle Patterns
```mermaid
stateDiagram-v2
    [*] --> Created: go routine()
    Created --> Running: Execute Function
    Running --> Waiting: Channel/Mutex Block
    Waiting --> Running: Unblocked
    Running --> Syscall: Network I/O
    Syscall --> Running: I/O Complete
    Running --> GCAssist: GC Triggered
    GCAssist --> Running: GC Complete
    Running --> Terminated: Function Exit
    Terminated --> [*]: Cleanup
```
Goroutine Pool Management Strategy:
DeepIntShield’s goroutine management keeps resource usage bounded and prevents goroutine leaks:
Pool Configuration Management:
- Goroutine Limits - Maximum concurrent goroutines prevent resource exhaustion
- Active Counting - Atomic counters track currently running goroutines
- Idle Timeouts - Unused goroutines are cleaned up after configured periods
- Resource Boundaries - Hard limits prevent runaway goroutine creation
Lifecycle Orchestration:
- Spawn Channels - New goroutine creation is tracked through channels
- Completion Monitoring - Finished goroutines signal completion for cleanup
- Shutdown Coordination - Graceful shutdown ensures all goroutines complete properly
- Health Monitoring - Continuous monitoring tracks goroutine health and performance
Worker Creation Process:
- Limit Enforcement - Creation fails when maximum goroutine count is reached
- Unique Identification - Each goroutine gets a unique ID for tracking and debugging
- Lifecycle Tracking - Start times and names enable performance analysis
- Atomic Operations - Thread-safe counters prevent race conditions
Panic Recovery and Error Handling:
- Panic Isolation - Goroutine panics don’t crash the entire system
- Error Logging - Panic details are logged with goroutine context
- Metrics Updates - Panic counts are tracked for monitoring and alerting
- Resource Cleanup - Failed goroutines are properly cleaned up and counted
Health Monitoring System:
- Periodic Health Checks - Regular intervals check goroutine pool health
- Completion Tracking - Finished goroutines are recorded for performance analysis
- Shutdown Handling - Clean shutdown process ensures no goroutine leaks
Resource Leak Prevention
```mermaid
flowchart TD
    GoroutineStart[Goroutine Start] --> ResourceCheck[Resource Allocation Check]
    ResourceCheck --> Timeout[Set Timeout Context]
    Timeout --> Work[Execute Work]

    Work --> Complete{Work Complete?}
    Complete -->|Yes| Cleanup[Cleanup Resources]
    Complete -->|No| TimeoutCheck{Timeout?}

    TimeoutCheck -->|Yes| ForceCleanup[Force Cleanup]
    TimeoutCheck -->|No| Work

    Cleanup --> Return[Return Resources to Pool]
    ForceCleanup --> Return
    Return --> End[Goroutine End]
```
Resource Leak Prevention:
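```go
import (
	"context"
	"fmt"
)

func (worker *Worker) ExecuteWithCleanup(job *Job) {
	// Set timeout context
	ctx, cancel := context.WithTimeout(
		context.Background(),
		worker.config.ProcessTimeout,
	)
	defer cancel()

	// sendResult never blocks past the deadline, so a receiver that has
	// gone away cannot leak this goroutine. An immediate send is tried
	// first so a waiting receiver still gets the result after a timeout.
	sendResult := func(r *Result) {
		select {
		case job.resultChan <- r:
			return
		default:
		}
		select {
		case job.resultChan <- r:
		case <-ctx.Done():
			worker.metrics.IncTimeouts()
		}
	}

	// Acquire resources with timeout
	resources, err := worker.acquireResources(ctx)
	if err != nil {
		sendResult(&Result{Error: err})
		return
	}

	// Ensure cleanup happens, even if processJob panics
	defer func() {
		// Always return resources
		worker.returnResources(resources)

		// Handle panics (recover works because this deferred func calls it directly)
		if r := recover(); r != nil {
			worker.metrics.IncPanics()
			sendResult(&Result{Error: fmt.Errorf("worker panic: %v", r)})
		}
	}()

	// Execute job with context
	result := worker.processJob(ctx, job, resources)

	// Return result. The receiver must not close resultChan: a send on a
	// closed channel panics instead of blocking.
	sendResult(result)
}
```
Concurrency Optimization Strategies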
Load-Based Worker Scaling (Planned)
```mermaid
graph TB
    subgraph "Load Monitoring"
        QueueDepth[Queue Depth<br/>Monitoring]
        ResponseTime[Response Time<br/>Tracking]
        WorkerUtil[Worker Utilization<br/>Metrics]
    end

    subgraph "Scaling Decisions"
        ScaleUp{Scale Up?<br/>Load > 80%}
        ScaleDown{Scale Down?<br/>Load < 30%}
        Maintain[Maintain<br/>Current Size]
    end

    subgraph "Actions"
        AddWorkers[Spawn Additional<br/>Workers]
        RemoveWorkers[Graceful Worker<br/>Shutdown]
        NoAction[No Action<br/>Monitor Continue]
    end

    QueueDepth --> ScaleUp
    ResponseTime --> ScaleUp
    WorkerUtil --> ScaleDown

    ScaleUp -->|Yes| AddWorkers
    ScaleUp -->|No| ScaleDown
    ScaleDown -->|Yes| RemoveWorkers
    ScaleDown -->|No| Maintain

    Maintain --> NoAction
```
Adaptive Scaling Implementation:
```go
import (
	"sync"
	"time"
)

type AdaptiveScaler struct {
	pool          *ProviderWorkerPool
	config        ScalingConfig
	metrics       *ScalingMetrics
	lastScaleTime time.Time
	scalingMutex  sync.Mutex
}

func (scaler *AdaptiveScaler) EvaluateScaling() {
	scaler.scalingMutex.Lock()
	defer scaler.scalingMutex.Unlock()

	// Prevent frequent scaling
	if time.Since(scaler.lastScaleTime) < scaler.config.MinScaleInterval {
		return
	}

	current := scaler.getCurrentMetrics()

	// Scale up conditions
	if current.QueueUtilization > scaler.config.ScaleUpThreshold ||
		current.AvgResponseTime > scaler.config.MaxResponseTime {
		scaler.scaleUp(current)
		return
	}

	// Scale down conditions
	if current.QueueUtilization < scaler.config.ScaleDownThreshold &&
		current.AvgResponseTime < scaler.config.TargetResponseTime {
		scaler.scaleDown(current)
		return
	}
}

// scaleUp must be called with scalingMutex held.
func (scaler *AdaptiveScaler) scaleUp(metrics *CurrentMetrics) {
	currentWorkers := scaler.pool.GetWorkerCount()
	targetWorkers := int(float64(currentWorkers) * scaler.config.ScaleUpFactor)

	// Truncation can leave small pools unchanged (e.g. 1 * 1.5 -> 1),
	// so always grow by at least one worker.
	if targetWorkers <= currentWorkers {
		targetWorkers = currentWorkers + 1
	}

	// Respect maximum limits
	if targetWorkers > scaler.config.MaxWorkers {
		targetWorkers = scaler.config.MaxWorkers
	}

	additionalWorkers := targetWorkers - currentWorkers
	if additionalWorkers > 0 {
		scaler.pool.AddWorkers(additionalWorkers)
		scaler.lastScaleTime = time.Now()
		scaler.metrics.RecordScaleUp(additionalWorkers)
	}
}
```
Provider-Specific Optimization
```go
import (
	"math"
	"time"
)

type ProviderOptimization struct {
	// Provider characteristics
	ProviderName string        `json:"provider_name"`
	RateLimit    int           `json:"rate_limit"`  // Requests per second
	AvgLatency   time.Duration `json:"avg_latency"` // Average response time
	ErrorRate    float64       `json:"error_rate"`  // Historical error rate (0-1)

	// Optimal configuration
	OptimalWorkers int           `json:"optimal_workers"`
	OptimalBuffer  int           `json:"optimal_buffer"`
	TimeoutConfig  time.Duration `json:"timeout_config"`
	RetryStrategy  RetryConfig   `json:"retry_strategy"`
}

func CalculateOptimalConcurrency(provider ProviderOptimization) ConcurrencyConfig {
	// Little's law: requests in flight = request rate x latency
	// (e.g. 50 req/s at 800 ms average latency -> 40 concurrent requests).
	// Multiply in float64: int(AvgLatency.Seconds()) would truncate any
	// sub-second latency to 0 workers.
	workers := float64(provider.RateLimit) * provider.AvgLatency.Seconds()

	// Adjust for error rate (more workers for higher error rate, to absorb retries)
	workers *= 1.0 + provider.ErrorRate

	optimalWorkers := int(math.Ceil(workers))
	if optimalWorkers < 1 {
		optimalWorkers = 1
	}

	// Buffer should be 2-3x worker count for smooth operation
	optimalBuffer := optimalWorkers * 3

	return ConcurrencyConfig{
		Concurrency: optimalWorkers,
		BufferSize:  optimalBuffer,
		Timeout:     provider.AvgLatency * 2, // 2x avg latency for timeout
	}
}
```
Concurrency Monitoring & Metrics
Key Concurrency Metrics
```mermaid
graph TB
    subgraph "Worker Metrics"
        ActiveWorkers[Active Workers<br/>Current Count]
        IdleWorkers[Idle Workers<br/>Available Count]
        BusyWorkers[Busy Workers<br/>Processing Count]
    end

    subgraph "Queue Metrics"
        QueueDepth[Queue Depth<br/>Pending Jobs]
        QueueThroughput[Queue Throughput<br/>Jobs/Second]
        QueueWaitTime[Queue Wait Time<br/>Average Delay]
    end

    subgraph "Performance Metrics"
        GoroutineCount[Goroutine Count<br/>Total Active]
        MemoryUsage[Memory Usage<br/>Pool Utilization]
        GCPressure[GC Pressure<br/>Collection Frequency]
    end

    subgraph "Health Metrics"
        ErrorRate[Error Rate<br/>Failed Jobs %]
        PanicCount[Panic Count<br/>Crashed Goroutines]
        DeadlockDetection[Deadlock Detection<br/>Blocked Operations]
    end
```
Metrics Collection Strategy:
Comprehensive concurrency monitoring provides operational insights and performance optimization data:
Worker Pool Monitoring:
- Total Worker Tracking - Monitor configured vs actual worker counts
- Active Worker Monitoring - Track workers currently processing requests
- Idle Worker Analysis - Identify unused capacity and optimization opportunities
- Queue Depth Monitoring - Track pending job backlog and processing delays
Performance Data Collection:
- Throughput Metrics - Measure jobs processed per second across all pools
- Wait Time Analysis - Track how long jobs wait in queues before processing
- Memory Pool Performance - Monitor hit/miss ratios for memory pool effectiveness
- Goroutine Count Tracking - Ensure goroutine counts remain within healthy limits
Health and Reliability Metrics:
- Panic Recovery Tracking - Count and analyze worker panic occurrences
- Timeout Monitoring - Track jobs that exceed processing time limits
- Circuit Breaker Events - Monitor provider isolation events and recoveries
- Error Rate Analysis - Track failure patterns for capacity planning
Real-Time Updates:
- Live Metric Updates - Worker metrics are updated continuously during operation
- Processing Event Recording - Each job completion updates relevant metrics
- Performance Correlation - Queue times and processing times are correlated for analysis
- Success/Failure Tracking - All job outcomes are recorded for reliability analysis
Deadlock Prevention & Detection
Deadlock Prevention Strategies
```mermaid
flowchart TD
    Strategy1[Lock Ordering<br/>Consistent Acquisition]
    Strategy2[Timeout-Based Locks<br/>Context Cancellation]
    Strategy3[Channel Select<br/>Non-blocking Operations]
    Strategy4[Resource Hierarchy<br/>Layered Locking]

    Prevention[Deadlock Prevention<br/>Design Patterns]

    Prevention --> Strategy1
    Prevention --> Strategy2
    Prevention --> Strategy3
    Prevention --> Strategy4

    Strategy1 --> Success[No Deadlocks<br/>Guaranteed Order]
    Strategy2 --> Success
    Strategy3 --> Success
    Strategy4 --> Success
```
Deadlock Prevention Implementation Strategy:
DeepIntShield employs multiple complementary strategies to prevent deadlocks in concurrent operations:
Lock Ordering Management:
- Consistent Acquisition Order - All locks are acquired in a predetermined order
- Global Lock Registry - Centralized registry maintains lock ordering relationships
- Order Enforcement - Lock acquisition automatically sorts by predetermined order
- Dependency Tracking - Lock dependencies are mapped to prevent circular waits
Timeout-Based Protection:
- Default Timeouts - All lock acquisitions have reasonable timeout limits
- Context Cancellation - Operations respect context cancellation for cleanup
- Maximum Timeout Limits - Upper bounds prevent indefinite blocking
- Graceful Timeout Handling - Timeout errors provide meaningful context
Multi-Lock Acquisition Process:
- Ordered Sorting - Multiple locks are sorted before acquisition attempts
- Progressive Acquisition - Locks are acquired one by one in sorted order
- Failure Recovery - Failed acquisitions trigger automatic cleanup of held locks
- Resource Tracking - All acquired locks are tracked for proper release
Lock Acquisition Safety:
- Non-Blocking Detection - Channel-based lock attempts prevent indefinite blocking
- Timeout Enforcement - All lock attempts respect configured timeout limits
- Error Propagation - Lock failures are properly propagated with context
- Cleanup Guarantees - Failed operations always clean up partially acquired resources
Deadlock Detection and Recovery:
- Active Monitoring - Continuous monitoring for potential deadlock conditions
- Automatic Recovery - Detected deadlocks trigger automatic resolution procedures
- Resource Release - Deadlock resolution involves strategic resource release
- Prevention Learning - Deadlock patterns inform prevention strategy improvements
Related Architecture Documentation
- Request Flow - How concurrency fits in request processing
- Benchmarks - Concurrency performance characteristics
- Plugin System - Plugin concurrency considerations
- MCP System - MCP concurrency and worker integration
Usage Documentation
- Provider Configuration - Configure concurrency settings per provider
- Performance Analysis - Memory pool configuration and optimization
- Performance Monitoring - Monitor concurrency metrics and health
- Go SDK Usage - Use DeepIntShield concurrency in Go applications
- Gateway Setup - Deploy DeepIntShield with optimal concurrency settings
🎯 Next Step: Understand how plugins integrate with the concurrency model in Plugin System.