Skip to content

Envoy ExtProc Integration

The Semantic Router leverages Envoy's External Processing (ExtProc) filter to implement intelligent routing decisions. This integration provides a clean separation between traffic management (Envoy) and business logic (Semantic Router), enabling sophisticated routing capabilities while maintaining high performance.

Understanding Envoy ExtProc

What is ExtProc?

External Processing (ExtProc) is an Envoy filter that allows external services to participate in request and response processing. Unlike other extension mechanisms, ExtProc provides:

  • Streaming Processing: Handle requests and responses as they flow through Envoy
  • Full Control: Modify headers, body, and routing decisions
  • Low Latency: Optimized gRPC communication between Envoy and external services
  • Fault Tolerance: Built-in failure handling and timeout management

ExtProc vs Other Extension Methods

Extension Method Use Case Latency Flexibility Complexity
HTTP Filters Simple transformations Lowest Limited Low
WebAssembly (WASM) Sandboxed logic Low Medium Medium
ExtProc Complex business logic Medium High Medium
HTTP Callouts External API calls High High High

Why ExtProc for Semantic Routing? - Complex ML Models: Need full Python/Go ecosystem for BERT models - Dynamic Decision Making: Requires sophisticated classification logic - State Management: Needs caching and request tracking - Observability: Requires comprehensive metrics and logging

ExtProc Protocol Architecture

Communication Flow

sequenceDiagram
    participant C as Client
    participant E as Envoy
    participant R as Router (ExtProc)

    C->>E: HTTP Request

    Note over E: Envoy processes request

    E->>R: ProcessingRequest (RequestHeaders)
    R->>E: ProcessingResponse (Continue/Modify)

    E->>R: ProcessingRequest (RequestBody)
    Note over R: Classification & Routing Logic
    R->>E: ProcessingResponse (HeaderMutation)

    Note over E: Route based on headers
    E->>Backend: Forward to Selected Model
    Backend->>E: Model Response

    E->>R: ProcessingRequest (ResponseHeaders)
    R->>E: ProcessingResponse (Continue/Modify)

    E->>R: ProcessingRequest (ResponseBody)
    Note over R: Response Processing & Caching
    R->>E: ProcessingResponse (BodyMutation)

    E->>C: Final Response

Processing Modes

ExtProc can be configured to process different parts of the request/response lifecycle:

# Envoy ExtProc Configuration
processing_mode:
  request_header_mode: "SEND"      # Process request headers
  response_header_mode: "SEND"     # Process response headers  
  request_body_mode: "BUFFERED"    # Process entire request body
  response_body_mode: "BUFFERED"   # Process entire response body
  request_trailer_mode: "SKIP"     # Skip request trailers
  response_trailer_mode: "SKIP"    # Skip response trailers

Mode Options: - SKIP: Don't send to ExtProc (fastest) - SEND: Send headers/trailers only - BUFFERED: Send entire body (required for content analysis) - STREAMED: Send body in chunks (for streaming)

Semantic Router ExtProc Implementation

Go Implementation Structure

// Main ExtProc Server
type Server struct {
    router *OpenAIRouter
    server *grpc.Server  
    port   int
}

// Router implements the ExtProc service interface
type OpenAIRouter struct {
    Config               *config.RouterConfig
    CategoryDescriptions []string
    Classifier           *classification.Classifier
    PIIChecker           *pii.PolicyChecker
    Cache                *cache.SemanticCache
    ToolsDatabase        *tools.ToolsDatabase

    pendingRequests     map[string][]byte
    pendingRequestsLock sync.Mutex
}

// Implements the ExtProc service interface
var _ ext_proc.ExternalProcessorServer = &OpenAIRouter{}

gRPC Service Implementation

// Process handles the bidirectional streaming RPC
func (r *OpenAIRouter) Process(stream ext_proc.ExternalProcessor_ProcessServer) error {
    log.Println("Started processing a new request")

    ctx := &RequestContext{
        Headers: make(map[string]string),
        RequestID: generateRequestID(),
    }

    for {
        // Receive request from Envoy
        req, err := stream.Recv()
        if err != nil {
            return r.handleStreamError(err)
        }

        // Process based on request type
        response, err := r.processRequest(ctx, req)
        if err != nil {
            return err
        }

        // Send response back to Envoy
        if err := stream.Send(response); err != nil {
            return err
        }
    }
}

Request Processing Pipeline

1. Request Headers Processing

func (r *OpenAIRouter) handleRequestHeaders(
    ctx *RequestContext, 
    headers *ext_proc.ProcessingRequest_RequestHeaders,
) (*ext_proc.ProcessingResponse, error) {

    // Extract and store headers
    for _, header := range headers.RequestHeaders.Headers.Headers {
        ctx.Headers[header.Key] = header.Value
    }

    // Extract request metadata
    ctx.Method = ctx.Headers[":method"]
    ctx.Path = ctx.Headers[":path"] 
    ctx.ContentType = ctx.Headers["content-type"]

    // Continue processing - we need the body for classification
    return &ext_proc.ProcessingResponse{
        Response: &ext_proc.ProcessingResponse_RequestHeaders_{
            RequestHeaders: &ext_proc.ProcessingResponse_RequestHeaders{},
        },
    }, nil
}

2. Request Body Processing (Core Logic)

func (r *OpenAIRouter) handleRequestBody(
    ctx *RequestContext,
    body *ext_proc.ProcessingRequest_RequestBody,
) (*ext_proc.ProcessingResponse, error) {

    // Extract request body
    requestBody := body.RequestBody.Body

    // Parse OpenAI API request
    var openAIRequest OpenAIRequest
    if err := json.Unmarshal(requestBody, &openAIRequest); err != nil {
        return nil, fmt.Errorf("failed to parse OpenAI request: %w", err)
    }

    // Extract user query from messages
    userQuery := extractUserQuery(openAIRequest.Messages)

    // Step 1: Check semantic cache
    if cachedResponse, found := r.Cache.Get(userQuery); found {
        return r.handleCacheHit(cachedResponse)
    }

    // Step 2: Security checks
    if blocked, reason := r.performSecurityChecks(userQuery); blocked {
        return r.handleSecurityBlock(reason)
    }

    // Step 3: Classify query intent  
    classification, err := r.Classifier.ClassifyIntent(userQuery)
    if err != nil {
        return nil, err
    }

    // Step 4: Select optimal model
    selectedEndpoint := r.selectModelEndpoint(classification)

    // Step 5: Auto-select relevant tools
    selectedTools := r.autoSelectTools(userQuery, openAIRequest.Tools)

    // Step 6: Modify request if needed
    modifiedRequest := r.modifyRequest(openAIRequest, selectedTools)
    modifiedBody, _ := json.Marshal(modifiedRequest)

    // Step 7: Set routing headers for Envoy
    headerMutations := []*core.HeaderValueOption{
        {
            Header: &core.HeaderValue{
                Key:   "x-gateway-destination-endpoint", 
                Value: selectedEndpoint,
            },
            Append: &wrapperspb.BoolValue{Value: false},
        },
        {
            Header: &core.HeaderValue{
                Key:   "x-selected-model",
                Value: classification.Category,
            },
            Append: &wrapperspb.BoolValue{Value: false},
        },
        {
            Header: &core.HeaderValue{
                Key:   "x-routing-confidence",
                Value: fmt.Sprintf("%.3f", classification.Confidence),
            },
            Append: &wrapperspb.BoolValue{Value: false},
        },
    }

    // Record routing decision for monitoring
    r.recordRoutingDecision(ctx, classification, selectedEndpoint)

    return &ext_proc.ProcessingResponse{
        Response: &ext_proc.ProcessingResponse_RequestBody_{
            RequestBody: &ext_proc.ProcessingResponse_RequestBody{
                Response: &ext_proc.BodyResponse{
                    BodyMutation: &ext_proc.BodyMutation{
                        Mutation: &ext_proc.BodyMutation_Body{
                            Body: modifiedBody,
                        },
                    },
                },
            },
        },
        ModeOverride: &ext_proc.ProcessingMode{
            RequestHeaderMode: ext_proc.ProcessingMode_SEND,
            ResponseHeaderMode: ext_proc.ProcessingMode_SEND,
        },
        DynamicMetadata: r.buildDynamicMetadata(classification),
    }, nil
}

3. Response Processing

func (r *OpenAIRouter) handleResponseBody(
    ctx *RequestContext,
    responseBody *ext_proc.ProcessingRequest_ResponseBody,
) (*ext_proc.ProcessingResponse, error) {

    // Parse model response
    var modelResponse OpenAIResponse
    if err := json.Unmarshal(responseBody.ResponseBody.Body, &modelResponse); err != nil {
        return nil, err
    }

    // Store in semantic cache for future requests
    if ctx.UserQuery != "" {
        r.Cache.Store(ctx.UserQuery, modelResponse, ctx.SelectedModel)
    }

    // Record response metrics
    r.recordResponseMetrics(ctx, modelResponse)

    // Add routing metadata to response
    modifiedResponse := r.addRoutingMetadata(modelResponse, ctx)
    modifiedBody, _ := json.Marshal(modifiedResponse)

    return &ext_proc.ProcessingResponse{
        Response: &ext_proc.ProcessingResponse_ResponseBody_{
            ResponseBody: &ext_proc.ProcessingResponse_ResponseBody{
                Response: &ext_proc.BodyResponse{
                    BodyMutation: &ext_proc.BodyMutation{
                        Mutation: &ext_proc.BodyMutation_Body{
                            Body: modifiedBody,
                        },
                    },
                },
            },
        },
    }, nil
}

Envoy Configuration for ExtProc

Complete Configuration Example

# config/envoy.yaml
static_resources:
  listeners:
  - name: listener_0
    address:
      socket_address:
        address: 0.0.0.0
        port_value: 8801
    filter_chains:
    - filters:
      - name: envoy.filters.network.http_connection_manager
        typed_config:
          "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
          stat_prefix: ingress_http

          # Comprehensive access logging
          access_log:
          - name: envoy.access_loggers.stdout
            typed_config:
              "@type": type.googleapis.com/envoy.extensions.access_loggers.stream.v3.StdoutAccessLog
              log_format:
                json_format:
                  time: "%START_TIME%"
                  method: "%REQ(:METHOD)%"
                  path: "%REQ(X-ENVOY-ORIGINAL-PATH?:PATH)%"
                  response_code: "%RESPONSE_CODE%"
                  duration: "%DURATION%"
                  selected_model: "%REQ(X-SELECTED-MODEL)%"
                  selected_endpoint: "%REQ(X-GATEWAY-DESTINATION-ENDPOINT)%"
                  routing_confidence: "%REQ(X-ROUTING-CONFIDENCE)%"

          # Route configuration with dynamic routing
          route_config:
            name: local_route
            virtual_hosts:
            - name: local_service
              domains: ["*"]
              routes:
              # Dynamic routing based on ExtProc decisions
              - match:
                  prefix: "/"
                  headers:
                  - name: "x-gateway-destination-endpoint"
                    string_match:
                      exact: "endpoint1"
                route:
                  cluster: math_model_cluster
                  timeout: 300s
              - match:
                  prefix: "/"
                  headers:
                  - name: "x-gateway-destination-endpoint"  
                    string_match:
                      exact: "endpoint2"
                route:
                  cluster: creative_model_cluster
                  timeout: 300s
              - match:
                  prefix: "/"
                  headers:
                  - name: "x-gateway-destination-endpoint"
                    string_match:
                      exact: "endpoint3"
                route:
                  cluster: code_model_cluster
                  timeout: 300s
              # Fallback route
              - match:
                  prefix: "/"
                route:
                  cluster: general_model_cluster
                  timeout: 300s

          # HTTP filters chain
          http_filters:
          # ExtProc filter - MUST come before router filter
          - name: envoy.filters.http.ext_proc
            typed_config:
              "@type": type.googleapis.com/envoy.extensions.filters.http.ext_proc.v3.ExternalProcessor

              # gRPC service configuration
              grpc_service:
                envoy_grpc:
                  cluster_name: semantic_router_extproc
                timeout: 30s

              # Processing mode configuration  
              processing_mode:
                request_header_mode: "SEND"
                response_header_mode: "SEND"
                request_body_mode: "BUFFERED"     # Required for content analysis
                response_body_mode: "BUFFERED"    # Required for caching
                request_trailer_mode: "SKIP"
                response_trailer_mode: "SKIP"

              # Failure handling
              failure_mode_allow: true            # Continue on ExtProc failure
              allow_mode_override: true           # Allow ExtProc to change modes
              message_timeout: 300s               # Timeout for ExtProc responses
              max_message_timeout: 600s           # Maximum allowed timeout

              # Advanced configuration
              mutation_rules:
                allow_all_routing: true
                allow_envoy: true
                disallow_system: false
                disallow_x_forwarded: false

              # Stats configuration  
              stats_config:
                stats_matches:
                - name: "extproc_requests"
                  actions:
                  - name: "extproc_requests_total"
                    action: 
                      "@type": type.googleapis.com/envoy.extensions.filters.http.fault.v3.HTTPFault

          # Router filter - MUST come after ExtProc
          - name: envoy.filters.http.router
            typed_config:
              "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
              suppress_envoy_headers: true

  # Backend model clusters
  clusters:
  # ExtProc service cluster
  - name: semantic_router_extproc
    connect_timeout: 5s
    type: STATIC
    lb_policy: ROUND_ROBIN
    typed_extension_protocol_options:
      envoy.extensions.upstreams.http.v3.HttpProtocolOptions:
        "@type": type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions
        explicit_http_config:
          http2_protocol_options:
            # Optimize for ExtProc communication
            connection_keepalive:
              interval: 30s
              timeout: 5s
            max_concurrent_streams: 1000
    load_assignment:
      cluster_name: semantic_router_extproc
      endpoints:
      - lb_endpoints:
        - endpoint:
            address:
              socket_address:
                address: 127.0.0.1
                port_value: 50051

    # Health checking for ExtProc
    health_checks:
    - timeout: 5s
      interval: 10s
      unhealthy_threshold: 3
      healthy_threshold: 2
      grpc_health_check:
        service_name: "semantic_router"

  # Model endpoint clusters
  - name: math_model_cluster
    connect_timeout: 30s
    type: STRICT_DNS
    lb_policy: ROUND_ROBIN
    load_assignment:
      cluster_name: math_model_cluster
      endpoints:
      - lb_endpoints:
        - endpoint:
            address:
              socket_address:
                address: 192.168.12.90
                port_value: 11434
    # Health checks for model endpoints
    health_checks:
    - timeout: 10s
      interval: 15s
      unhealthy_threshold: 3
      healthy_threshold: 2
      http_health_check:
        path: "/health"
        expected_statuses:
        - start: 200
          end: 299

  - name: creative_model_cluster
    # Similar configuration for creative model...

  - name: code_model_cluster  
    # Similar configuration for code model...

  - name: general_model_cluster
    # Similar configuration for general model...

Performance Optimization

Reducing ExtProc Latency

1. Connection Pooling and Keepalives

# Optimize gRPC connection to ExtProc
grpc_service:
  envoy_grpc:
    cluster_name: semantic_router_extproc
  timeout: 10s  # Reduced from default 30s

# Cluster optimization
typed_extension_protocol_options:
  envoy.extensions.upstreams.http.v3.HttpProtocolOptions:
    explicit_http_config:
      http2_protocol_options:
        connection_keepalive:
          interval: 30s      # Keep connections alive
          timeout: 5s
        max_concurrent_streams: 1000  # Allow many parallel requests

2. Selective Processing

# Only process what's necessary
processing_mode:
  request_header_mode: "SEND"        # Always needed for routing
  response_header_mode: "SKIP"       # Skip if not needed
  request_body_mode: "BUFFERED"      # Required for classification
  response_body_mode: "BUFFERED"     # Only if caching enabled

3. Failure Mode Configuration

# Fast failure handling
failure_mode_allow: true     # Don't block traffic on ExtProc failure
message_timeout: 30s         # Reasonable timeout
max_message_timeout: 60s     # Emergency timeout

Memory Management

Request Context Pooling

// Pool request contexts to reduce GC pressure
var requestContextPool = sync.Pool{
    New: func() interface{} {
        return &RequestContext{
            Headers: make(map[string]string, 10),
        }
    },
}

func (r *OpenAIRouter) Process(stream ext_proc.ExternalProcessor_ProcessServer) error {
    // Get context from pool
    ctx := requestContextPool.Get().(*RequestContext)
    defer func() {
        // Clean and return to pool
        ctx.Reset()
        requestContextPool.Put(ctx)
    }()

    // Process request...
}

Error Handling and Resilience

ExtProc Error Handling

func (r *OpenAIRouter) handleStreamError(err error) error {
    if err == io.EOF {
        log.Println("Stream ended gracefully")
        return nil
    }

    // Handle gRPC status-based errors
    if s, ok := status.FromError(err); ok {
        switch s.Code() {
        case codes.Canceled, codes.DeadlineExceeded:
            log.Println("Stream canceled gracefully")
            return nil
        case codes.Unavailable:
            log.Printf("ExtProc temporarily unavailable: %v", err)
            return err
        default:
            log.Printf("gRPC error: %v", err)
            return err
        }
    }

    // Handle context cancellation
    if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
        log.Println("Stream canceled gracefully")
        return nil
    }

    log.Printf("Unexpected error receiving request: %v", err)
    return err
}

Graceful Degradation

func (r *OpenAIRouter) handleClassificationFailure(
    query string, 
    err error,
) *RoutingDecision {
    log.Printf("Classification failed: %v, using fallback", err)

    // Increment failure metrics
    classificationFailures.Inc()

    // Return safe fallback decision
    return &RoutingDecision{
        Category:       "general",
        Confidence:     0.0,
        SelectedModel:  r.Config.DefaultModel,
        Fallback:       true,
        FailureReason:  err.Error(),
    }
}

Monitoring ExtProc Integration

Key Metrics to Track

// ExtProc-specific metrics
var (
    extprocRequestsTotal = prometheus.NewCounterVec(
        prometheus.CounterOpts{
            Name: "extproc_requests_total",
            Help: "Total ExtProc requests by type",
        },
        []string{"request_type", "status"},
    )

    extprocProcessingDuration = prometheus.NewHistogramVec(
        prometheus.HistogramOpts{
            Name: "extproc_processing_duration_seconds",
            Help: "Time spent processing ExtProc requests",
            Buckets: []float64{0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0},
        },
        []string{"request_type"},
    )

    extprocStreamErrors = prometheus.NewCounterVec(
        prometheus.CounterOpts{
            Name: "extproc_stream_errors_total", 
            Help: "Total ExtProc stream errors",
        },
        []string{"error_type"},
    )
)

Health Check Implementation

// Health check for ExtProc service
func (s *Server) Check(
    ctx context.Context, 
    req *grpc_health_v1.HealthCheckRequest,
) (*grpc_health_v1.HealthCheckResponse, error) {

    // Check classifier health
    if !s.router.Classifier.IsHealthy() {
        return &grpc_health_v1.HealthCheckResponse{
            Status: grpc_health_v1.HealthCheckResponse_NOT_SERVING,
        }, nil
    }

    // Check cache health
    if !s.router.Cache.IsHealthy() {
        return &grpc_health_v1.HealthCheckResponse{
            Status: grpc_health_v1.HealthCheckResponse_NOT_SERVING,
        }, nil
    }

    return &grpc_health_v1.HealthCheckResponse{
        Status: grpc_health_v1.HealthCheckResponse_SERVING,
    }, nil
}

The ExtProc integration provides a powerful foundation for implementing complex routing logic while maintaining high performance and reliability. Next, explore the Router Implementation to understand the detailed classification and routing algorithms.