trace2e_core/transport/
loopback.rs

1//! # Loopback Transport Implementation
2//!
3//! This module provides a loopback transport implementation for testing and
4//! development scenarios where multiple middleware instances need to communicate
5//! within a single process or test environment. It simulates network communication
6//! by routing M2M requests directly to local middleware instances.
7//!
8//! ## Features
9//!
10//! - **In-Process Communication**: Routes calls directly to registered middleware instances
11//! - **Network Simulation**: Supports configurable delays and jitter to simulate network latency
12//! - **Test Orchestration**: Provides utilities for spawning multiple middleware instances
13//! - **Resource Pre-enrollment**: Supports pre-populating middleware with test resources
14//!
15//! ## Use Cases
16//!
17//! - Unit and integration testing of distributed traceability scenarios
18//! - Development and debugging of multi-node workflows
19//! - Performance testing with controlled network conditions
20//! - Simulation of distributed systems in a single process
21//!
22//! ## Network Simulation
23//!
24//! The loopback transport can simulate network characteristics by introducing
25//! configurable delays and jitter to M2M calls, allowing testing of timeout
26//! handling and performance under various network conditions.
27
28use std::{
29    collections::VecDeque,
30    future::Future,
31    pin::Pin,
32    sync::Arc,
33    task::Poll,
34    time::{Duration, Instant},
35};
36
37use dashmap::DashMap;
38use tower::Service;
39
40use crate::{
41    traceability::{
42        M2mApiDefaultStack, O2mApiDefaultStack, P2mApiDefaultStack,
43        api::{M2mRequest, M2mResponse},
44        error::TraceabilityError,
45        init_middleware_with_enrolled_resources,
46    },
47    transport::eval_remote_ip,
48};
49
50/// Spawns multiple loopback middleware instances with no pre-enrolled resources.
51///
52/// Creates a set of middleware instances that can communicate with each other
53/// through the loopback transport. Each middleware is identified by an IP address
54/// for routing purposes.
55///
56/// # Arguments
57///
58/// * `ips` - Vector of IP addresses to assign to the middleware instances
59///
60/// # Returns
61///
62/// A queue of (P2M service, O2M service) tuples for each middleware instance.
63pub async fn spawn_loopback_middlewares(
64    ips: Vec<String>,
65) -> VecDeque<(P2mApiDefaultStack<M2mLoopback>, O2mApiDefaultStack)> {
66    spawn_loopback_middlewares_with_enrolled_resources(ips, 0, 0, 0).await
67}
68
69/// Spawns multiple loopback middleware instances with pre-enrolled test resources.
70///
71/// Creates middleware instances and pre-populates them with the specified number
72/// of processes, files, and network streams for testing purposes.
73///
74/// # Arguments
75///
76/// * `ips` - Vector of IP addresses for the middleware instances
77/// * `process_count` - Number of processes to enroll per middleware
78/// * `per_process_file_count` - Number of files to create per process
79/// * `per_process_stream_count` - Number of network streams to create per process
80pub async fn spawn_loopback_middlewares_with_enrolled_resources(
81    ips: Vec<String>,
82    process_count: u32,
83    per_process_file_count: u32,
84    per_process_stream_count: u32,
85) -> VecDeque<(P2mApiDefaultStack<M2mLoopback>, O2mApiDefaultStack)> {
86    spawn_loopback_middlewares_with_entropy(
87        ips,
88        0,
89        0,
90        process_count,
91        per_process_file_count,
92        per_process_stream_count,
93    )
94    .await
95}
96
97/// Spawns loopback middleware instances with network simulation and pre-enrolled resources.
98///
99/// Creates middleware instances with configurable network delay simulation and
100/// pre-populated test resources. This is the most comprehensive setup function
101/// for testing complex distributed scenarios.
102///
103/// # Arguments
104///
105/// * `ips` - Vector of IP addresses for the middleware instances
106/// * `base_delay_ms` - Base network delay in milliseconds
107/// * `jitter_max_ms` - Maximum additional random delay in milliseconds
108/// * `process_count` - Number of processes to enroll per middleware
109/// * `per_process_file_count` - Number of files to create per process
110/// * `per_process_stream_count` - Number of network streams to create per process
111pub async fn spawn_loopback_middlewares_with_entropy(
112    ips: Vec<String>,
113    base_delay_ms: u64,
114    jitter_max_ms: u64,
115    process_count: u32,
116    per_process_file_count: u32,
117    per_process_stream_count: u32,
118) -> VecDeque<(P2mApiDefaultStack<M2mLoopback>, O2mApiDefaultStack)> {
119    let m2m_loopback = M2mLoopback::new(base_delay_ms, jitter_max_ms);
120    let mut middlewares = VecDeque::new();
121    for ip in ips {
122        let (m2m, p2m, o2m) = init_middleware_with_enrolled_resources(
123            ip.clone(),
124            None,
125            0,
126            m2m_loopback.clone(),
127            false, // Disable resource validation for loopback tests
128            process_count,
129            per_process_file_count,
130            per_process_stream_count,
131        );
132        m2m_loopback.register_middleware(ip.clone(), m2m).await;
133        middlewares.push_back((p2m, o2m));
134    }
135    middlewares
136}
137
138/// Loopback transport service for in-process M2M communication.
139///
140/// `M2mLoopback` provides a transport implementation that routes M2M requests
141/// to local middleware instances within the same process. It maintains a
142/// registry of middleware instances indexed by IP address and supports
143/// configurable network delay simulation.
144///
145/// ## Network Simulation
146///
147/// The service can simulate network latency by introducing delays before
148/// processing requests. This includes both a base delay and random jitter
149/// to simulate real network conditions.
150///
151/// ## Thread Safety
152///
153/// The service is thread-safe and can be safely cloned and used across
154/// multiple concurrent tasks. All internal state is protected by appropriate
155/// synchronization primitives.
156#[derive(Clone)]
157pub struct M2mLoopback {
158    /// Registry of middleware instances indexed by IP address.
159    middlewares: Arc<DashMap<String, M2mApiDefaultStack>>,
160    /// Base network delay in milliseconds.
161    base_delay_ms: u64,
162    /// Maximum additional random delay in milliseconds.
163    jitter_max_ms: u64,
164    /// Timestamp of the last call for delay calculation.
165    last_call_time: Arc<std::sync::Mutex<Option<Instant>>>,
166}
167
168impl Default for M2mLoopback {
169    fn default() -> Self {
170        Self::new(0, 0)
171    }
172}
173
174impl M2mLoopback {
175    /// Creates a new loopback transport with the specified delay characteristics.
176    ///
177    /// # Arguments
178    ///
179    /// * `base_delay_ms` - Base delay to add to all requests in milliseconds
180    /// * `jitter_max_ms` - Maximum random additional delay in milliseconds
181    pub fn new(base_delay_ms: u64, jitter_max_ms: u64) -> Self {
182        Self {
183            middlewares: Arc::new(DashMap::new()),
184            base_delay_ms,
185            jitter_max_ms,
186            last_call_time: Arc::new(std::sync::Mutex::new(None)),
187        }
188    }
189
190    /// Registers a middleware instance with the specified IP address.
191    ///
192    /// This allows the loopback transport to route requests to the appropriate
193    /// middleware instance based on the target IP address extracted from requests.
194    ///
195    /// # Arguments
196    ///
197    /// * `ip` - IP address identifier for the middleware
198    /// * `middleware` - The middleware service instance to register
199    pub async fn register_middleware(&self, ip: String, middleware: M2mApiDefaultStack) {
200        self.middlewares.insert(ip, middleware);
201    }
202
203    /// Retrieves a middleware instance for the specified IP address.
204    ///
205    /// # Arguments
206    ///
207    /// * `ip` - IP address of the target middleware
208    ///
209    /// # Returns
210    ///
211    /// The middleware service instance, or an error if not found.
212    ///
213    /// # Errors
214    ///
215    /// Returns `TransportFailedToContactRemote` if no middleware is registered
216    /// for the specified IP address.
217    pub async fn get_middleware(
218        &self,
219        ip: String,
220    ) -> Result<M2mApiDefaultStack, TraceabilityError> {
221        self.middlewares
222            .get(&ip)
223            .map(|c| c.to_owned())
224            .ok_or(TraceabilityError::TransportFailedToContactRemote(ip))
225    }
226
227    /// Calculates the delay to apply based on the configured delay parameters.
228    ///
229    /// Combines the base delay with a random jitter component to simulate
230    /// realistic network latency characteristics.
231    ///
232    /// # Returns
233    ///
234    /// The total delay duration to apply to the current request.
235    fn calculate_delay(&self) -> Duration {
236        if self.base_delay_ms == 0 && self.jitter_max_ms == 0 {
237            return Duration::from_millis(0);
238        }
239
240        let base = Duration::from_millis(self.base_delay_ms);
241        if self.jitter_max_ms == 0 {
242            return base;
243        }
244
245        // Simple entropy based on current time
246        let seed = Instant::now().elapsed().as_nanos() as u64;
247        let jitter_ms = seed % (self.jitter_max_ms + 1);
248        base + Duration::from_millis(jitter_ms)
249    }
250}
251
252impl Service<M2mRequest> for M2mLoopback {
253    type Response = M2mResponse;
254    type Error = TraceabilityError;
255    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
256
257    fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
258        let now = Instant::now();
259        let delay = self.calculate_delay();
260
261        if delay.is_zero() {
262            return Poll::Ready(Ok(()));
263        }
264
265        // Check if we need to wait based on last call time
266        if let Ok(mut last_time) = self.last_call_time.lock() {
267            match *last_time {
268                Some(last) if now.duration_since(last) < delay => {
269                    // Still need to wait
270                    let waker = cx.waker().clone();
271                    let remaining = delay - now.duration_since(last);
272                    tokio::spawn(async move {
273                        tokio::time::sleep(remaining).await;
274                        waker.wake();
275                    });
276                    return Poll::Pending;
277                }
278                _ => {
279                    // Update last call time
280                    *last_time = Some(now);
281                }
282            }
283        }
284
285        Poll::Ready(Ok(()))
286    }
287
288    fn call(&mut self, request: M2mRequest) -> Self::Future {
289        let this = self.clone();
290        Box::pin(async move {
291            this.get_middleware(eval_remote_ip(request.clone())?).await?.call(request).await
292        })
293    }
294}