Streaming
Overview
The Streaming package (framework/streaming) is a core utility within DeepIntShield designed to handle real-time data streams from AI providers. It gives plugins like Logging, OTel, and Maxim a shared mechanism to process, aggregate, and format streaming responses for chat completions, transcriptions, and other real-time AI interactions.
    sequenceDiagram
        participant Plugin
        participant BC as DeepIntShield Core
        participant Accumulator

        BC->>Plugin: PreLLMHook(StreamingRequest)
        activate Plugin
        Plugin->>Accumulator: CreateStreamAccumulator(requestID)
        activate Accumulator
        Accumulator-->>Plugin: ack
        deactivate Accumulator
        Plugin-->>BC: return
        deactivate Plugin

        loop For each response chunk
            BC->>Plugin: PostLLMHook(StreamChunk)
            activate Plugin
            Plugin->>Accumulator: ProcessStreamingResponse(StreamChunk)
            activate Accumulator
            alt Is NOT Final Chunk
                Accumulator-->>Plugin: return {Type: Delta}
            else Is Final Chunk
                Accumulator->>Accumulator: buildCompleteResponse()
                Accumulator-->>Plugin: return {Type: Final, CompleteData}
            end
            deactivate Accumulator
            Plugin-->>BC: return
            deactivate Plugin
        end

Its primary purpose is to hide the complexity of handling chunked data, so that plugins can work with complete, well-structured responses without implementing their own aggregation logic.
How It Works
The streaming package uses an Accumulator to manage the lifecycle of a streaming operation. The process uses sync.Pool to reuse objects and minimize memory allocations.
- Initialization: When a plugin that needs to process streams (like logging or otel) is initialized, it creates a new streaming.Accumulator.
- Stream Start: In the PreLLMHook phase of a request, if the request is identified as a streaming type, the plugin calls accumulator.CreateStreamAccumulator(requestID, timestamp) to prepare a dedicated buffer for the incoming chunks of that request.
- Chunk Processing: In the PostLLMHook phase, as each chunk of the streaming response arrives, the plugin passes it to accumulator.ProcessStreamingResponse().
  - For each delta chunk, the accumulator appends it to the buffer associated with the request ID.
  - The accumulator handles different types of streams, including chat, audio, and transcriptions, using specialized logic to correctly piece together the data. For example, it accumulates text deltas, tool call argument deltas, and other parts of the message.
- Finalization: When the final chunk of the stream is received (indicated by a finish_reason or other provider-specific signal), ProcessStreamingResponse performs the final assembly.
  - It reconstructs the complete ChatMessage or other response object from all the stored chunks.
  - It calculates total token usage, cost, and latency.
  - It returns a ProcessedStreamResponse object with StreamResponseTypeFinal and the complete, structured AccumulatedData.
- Cleanup: Once the final response is processed, the accumulator cleans up all buffered chunks for that request ID, returning them to the sync.Pool for reuse.
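The lifecycle above can be sketched in a few lines of Go. This is a minimal illustration under stated assumptions, not the package's actual implementation: the names toyAccumulator, stream, chunk, Create, and Process are hypothetical, and only plain text deltas are handled.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// chunk is a hypothetical, simplified stand-in for a stream chunk.
type chunk struct {
	Delta string
	Final bool
}

// stream holds the buffered chunks of one request.
type stream struct {
	mu     sync.Mutex
	chunks []*chunk
}

// toyAccumulator is an illustrative sketch, not the real streaming.Accumulator.
type toyAccumulator struct {
	streams sync.Map  // requestID -> *stream
	pool    sync.Pool // recycles *chunk values
}

func newToyAccumulator() *toyAccumulator {
	return &toyAccumulator{pool: sync.Pool{New: func() any { return new(chunk) }}}
}

// Create prepares a buffer for requestID (the "Stream Start" step).
func (a *toyAccumulator) Create(requestID string) {
	a.streams.LoadOrStore(requestID, &stream{})
}

// Process buffers one chunk. For a delta it returns ("", false); for the
// final chunk it returns the assembled text and true, then cleans up.
func (a *toyAccumulator) Process(requestID, delta string, final bool) (string, bool) {
	v, _ := a.streams.LoadOrStore(requestID, &stream{})
	s := v.(*stream)

	c := a.pool.Get().(*chunk)
	c.Delta, c.Final = delta, final

	s.mu.Lock()
	defer s.mu.Unlock()
	s.chunks = append(s.chunks, c)
	if !final {
		return "", false
	}

	// Finalization: join all deltas, then return the chunks to the pool.
	var b strings.Builder
	for _, sc := range s.chunks {
		b.WriteString(sc.Delta)
		*sc = chunk{} // reset before reuse
		a.pool.Put(sc)
	}
	s.chunks = nil
	a.streams.Delete(requestID)
	return b.String(), true
}

func main() {
	acc := newToyAccumulator()
	acc.Create("req-1")
	acc.Process("req-1", "Hel", false)
	acc.Process("req-1", "lo", false)
	text, done := acc.Process("req-1", "!", true)
	fmt.Println(text, done) // prints "Hello! true"
}
```

Resetting each chunk before putting it back into the pool keeps data from one request from leaking into the next one that reuses the object.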
Key Components
Accumulator
The central component of the package. It is a thread-safe manager that:
- Tracks stream chunks for multiple concurrent requests using a sync.Map.
- Uses sync.Pool to recycle *StreamChunk objects, reducing garbage collection overhead.
- Provides methods to add chunks (addChatStreamChunk, addAudioStreamChunk, etc.).
- Includes a periodic cleanup worker to remove stale accumulators for incomplete or orphaned requests.
ProcessStreamingResponse
This is the main entry point for plugins to process stream data. It inspects the response type and delegates to the appropriate handler:
- processChatStreamingResponse
- processAudioStreamingResponse
- processTranscriptionStreamingResponse
- processResponsesStreamingResponse
It returns a ProcessedStreamResponse, which indicates whether the chunk is a delta or the final aggregated response.
Stream-Specific Builders
The package includes internal logic to correctly build complete messages from chunks. For example, buildCompleteMessageFromChatStreamChunks iterates through the collected ChatStreamChunk objects, appending content deltas and assembling tool calls into a final, coherent schemas.ChatMessage.
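To make the assembly concrete, here is a small sketch of merging content deltas and tool call argument fragments. It is not the package's buildCompleteMessageFromChatStreamChunks: the types chatChunk, toolCallDelta, toolCall, and message are simplified, hypothetical stand-ins, and the sketch assumes that tool call fragments carry an index identifying which call they belong to, as is common in OpenAI-style streaming APIs.

```go
package main

import (
	"fmt"
	"strings"
)

// toolCallDelta is a hypothetical fragment of a streamed tool call.
type toolCallDelta struct {
	Index     int    // which tool call this fragment belongs to
	Name      string // usually present only on the first fragment
	Arguments string // partial JSON arguments
}

// chatChunk is a hypothetical, simplified chat stream chunk.
type chatChunk struct {
	Content   string
	ToolCalls []toolCallDelta
}

type toolCall struct {
	Name      string
	Arguments string
}

type message struct {
	Content   string
	ToolCalls []toolCall
}

// buildMessage concatenates content deltas in order and merges tool call
// fragments by index into complete tool calls.
func buildMessage(chunks []chatChunk) message {
	var content strings.Builder
	var calls []toolCall
	for _, c := range chunks {
		content.WriteString(c.Content)
		for _, d := range c.ToolCalls {
			if d.Index < 0 {
				continue // ignore malformed fragments
			}
			for len(calls) <= d.Index {
				calls = append(calls, toolCall{})
			}
			if d.Name != "" {
				calls[d.Index].Name = d.Name
			}
			calls[d.Index].Arguments += d.Arguments
		}
	}
	return message{Content: content.String(), ToolCalls: calls}
}

func main() {
	msg := buildMessage([]chatChunk{
		{Content: "Checking "},
		{Content: "weather.", ToolCalls: []toolCallDelta{{Index: 0, Name: "get_weather", Arguments: `{"city":`}}},
		{ToolCalls: []toolCallDelta{{Index: 0, Arguments: `"Paris"}`}}},
	})
	fmt.Println(msg.Content)                                       // prints "Checking weather."
	fmt.Println(msg.ToolCalls[0].Name, msg.ToolCalls[0].Arguments) // prints `get_weather {"city":"Paris"}`
}
```

The argument fragments are only valid JSON once every piece has arrived, which is one reason parsing is best deferred to the final chunk.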
Usage Example
The following snippet from the logging plugin shows how the streaming package is used in practice within a plugin’s PostLLMHook.
    // In plugins/logging/main.go
    func (p *LoggerPlugin) PostLLMHook(ctx *schemas.DeepIntShieldContext, result *schemas.DeepIntShieldResponse, bifrostErr *schemas.DeepIntShieldError) (*schemas.DeepIntShieldResponse, *schemas.DeepIntShieldError, error) {
        // ... setup, get requestID ...

        go func() {
            // ...
            if deepintshield.IsStreamRequestType(requestType) {
                p.logger.Debug("[logging] processing streaming response")

                // 1. Pass the response chunk to the accumulator
                streamResponse, err := p.accumulator.ProcessStreamingResponse(ctx, result, bifrostErr)
                if err != nil {
                    p.logger.Error("failed to process streaming response: %v", err)
                    // 2. Check if this is the final, aggregated response
                } else if streamResponse != nil && streamResponse.Type == streaming.StreamResponseTypeFinal {
                    // Prepare final log data
                    logMsg.Operation = LogOperationStreamUpdate
                    logMsg.StreamResponse = streamResponse

                    // 3. Update the log entry with the complete data
                    processingErr := retryOnNotFound(p.ctx, func() error {
                        return p.updateStreamingLogEntry(p.ctx, logMsg.RequestID, logMsg.SemanticCacheDebug, logMsg.StreamResponse, true)
                    })

                    // ... handle errors and callbacks ...
                }
            }
            // ... handle non-streaming responses ...
        }()

        return result, bifrostErr, nil
    }

This demonstrates how a plugin can remain agnostic to the details of stream aggregation and simply react to the final, complete data returned by the streaming package. This simplifies plugin development and keeps data handling consistent across the framework.