trace2e_core/transport/
loopback.rs

1//! # Loopback Transport Implementation
2//!
3//! This module provides a loopback transport implementation for testing and
4//! development scenarios where multiple middleware instances need to communicate
5//! within a single process or test environment. It simulates network communication
6//! by routing M2M requests directly to local middleware instances.
7//!
8//! ## Features
9//!
10//! - **In-Process Communication**: Routes calls directly to registered middleware instances
11//! - **Network Simulation**: Supports configurable delays and jitter to simulate network latency
12//! - **Test Orchestration**: Provides utilities for spawning multiple middleware instances
13//! - **Resource Pre-enrollment**: Supports pre-populating middleware with test resources
14//!
15//! ## Use Cases
16//!
17//! - Unit and integration testing of distributed traceability scenarios
18//! - Development and debugging of multi-node workflows
19//! - Performance testing with controlled network conditions
20//! - Simulation of distributed systems in a single process
21//!
22//! ## Network Simulation
23//!
24//! The loopback transport can simulate network characteristics by introducing
25//! configurable delays and jitter to M2M calls, allowing testing of timeout
26//! handling and performance under various network conditions.
27
28use std::{
29    collections::VecDeque,
30    future::Future,
31    pin::Pin,
32    sync::Arc,
33    task::Poll,
34    time::{Duration, Instant},
35};
36
37use dashmap::DashMap;
38use tower::Service;
39
40use crate::{
41    traceability::{
42        M2mApiDefaultStack, O2mApiDefaultStack, P2mApiDefaultStack,
43        api::{M2mRequest, M2mResponse},
44        error::TraceabilityError,
45        init_middleware,
46    },
47    transport::eval_remote_ip,
48};
49
50/// Spawns multiple loopback middleware instances.
51///
52/// Creates a set of middleware instances that can communicate with each other
53/// through the loopback transport. Each middleware is identified by an IP address
54/// for routing purposes.
55///
56/// # Arguments
57///
58/// * `ips` - Vector of IP addresses to assign to the middleware instances
59///
60/// # Returns
61///
62/// A queue of (P2M service, O2M service) tuples for each middleware instance.
63pub async fn spawn_loopback_middlewares(
64    ips: Vec<String>,
65) -> VecDeque<(P2mApiDefaultStack<M2mLoopback>, O2mApiDefaultStack<M2mLoopback>)> {
66    spawn_loopback_middlewares_with_delay(ips, 0, 0).await
67}
68
69/// Spawns multiple loopback middleware instances with network delay simulation.
70///
71/// Creates middleware instances with configurable network delay simulation
72/// for testing timeout handling and performance under various network conditions.
73///
74/// # Arguments
75///
76/// * `ips` - Vector of IP addresses for the middleware instances
77/// * `base_delay_ms` - Base network delay in milliseconds
78/// * `jitter_max_ms` - Maximum additional random delay in milliseconds
79pub async fn spawn_loopback_middlewares_with_delay(
80    ips: Vec<String>,
81    base_delay_ms: u64,
82    jitter_max_ms: u64,
83) -> VecDeque<(P2mApiDefaultStack<M2mLoopback>, O2mApiDefaultStack<M2mLoopback>)> {
84    let m2m_loopback = M2mLoopback::new(base_delay_ms, jitter_max_ms);
85    let mut middlewares = VecDeque::new();
86    for ip in ips {
87        let (m2m, p2m, o2m) = init_middleware(
88            ip.clone(),
89            None,
90            0,
91            m2m_loopback.clone(),
92            false, // Disable resource validation for loopback tests
93        );
94        m2m_loopback.register_middleware(ip.clone(), m2m).await;
95        middlewares.push_back((p2m, o2m));
96    }
97    middlewares
98}
99
100/// Loopback transport service for in-process M2M communication.
101///
102/// `M2mLoopback` provides a transport implementation that routes M2M requests
103/// to local middleware instances within the same process. It maintains a
104/// registry of middleware instances indexed by IP address and supports
105/// configurable network delay simulation.
106///
107/// ## Network Simulation
108///
109/// The service can simulate network latency by introducing delays before
110/// processing requests. This includes both a base delay and random jitter
111/// to simulate real network conditions.
112///
113/// ## Thread Safety
114///
115/// The service is thread-safe and can be safely cloned and used across
116/// multiple concurrent tasks. All internal state is protected by appropriate
117/// synchronization primitives.
118#[derive(Clone)]
119pub struct M2mLoopback {
120    /// Registry of middleware instances indexed by IP address.
121    middlewares: Arc<DashMap<String, M2mApiDefaultStack>>,
122    /// Base network delay in milliseconds.
123    base_delay_ms: u64,
124    /// Maximum additional random delay in milliseconds.
125    jitter_max_ms: u64,
126    /// Timestamp of the last call for delay calculation.
127    last_call_time: Arc<std::sync::Mutex<Option<Instant>>>,
128}
129
130impl Default for M2mLoopback {
131    fn default() -> Self {
132        Self::new(0, 0)
133    }
134}
135
136impl M2mLoopback {
137    /// Creates a new loopback transport with the specified delay characteristics.
138    ///
139    /// # Arguments
140    ///
141    /// * `base_delay_ms` - Base delay to add to all requests in milliseconds
142    /// * `jitter_max_ms` - Maximum random additional delay in milliseconds
143    pub fn new(base_delay_ms: u64, jitter_max_ms: u64) -> Self {
144        Self {
145            middlewares: Arc::new(DashMap::new()),
146            base_delay_ms,
147            jitter_max_ms,
148            last_call_time: Arc::new(std::sync::Mutex::new(None)),
149        }
150    }
151
152    /// Registers a middleware instance with the specified IP address.
153    ///
154    /// This allows the loopback transport to route requests to the appropriate
155    /// middleware instance based on the target IP address extracted from requests.
156    ///
157    /// # Arguments
158    ///
159    /// * `ip` - IP address identifier for the middleware
160    /// * `middleware` - The middleware service instance to register
161    pub async fn register_middleware(&self, ip: String, middleware: M2mApiDefaultStack) {
162        self.middlewares.insert(ip, middleware);
163    }
164
165    /// Retrieves a middleware instance for the specified IP address.
166    ///
167    /// # Arguments
168    ///
169    /// * `ip` - IP address of the target middleware
170    ///
171    /// # Returns
172    ///
173    /// The middleware service instance, or an error if not found.
174    ///
175    /// # Errors
176    ///
177    /// Returns `TransportFailedToContactRemote` if no middleware is registered
178    /// for the specified IP address.
179    pub fn get_middleware(&self, ip: String) -> Result<M2mApiDefaultStack, TraceabilityError> {
180        self.middlewares
181            .get(&ip)
182            .map(|c| c.to_owned())
183            .ok_or(TraceabilityError::TransportFailedToContactRemote(ip))
184    }
185
186    /// Calculates the delay to apply based on the configured delay parameters.
187    ///
188    /// Combines the base delay with a random jitter component to simulate
189    /// realistic network latency characteristics.
190    ///
191    /// # Returns
192    ///
193    /// The total delay duration to apply to the current request.
194    fn calculate_delay(&self) -> Duration {
195        if self.base_delay_ms == 0 && self.jitter_max_ms == 0 {
196            return Duration::from_millis(0);
197        }
198
199        let base = Duration::from_millis(self.base_delay_ms);
200        if self.jitter_max_ms == 0 {
201            return base;
202        }
203
204        // Simple entropy based on current time
205        let seed = Instant::now().elapsed().as_nanos() as u64;
206        let jitter_ms = seed % (self.jitter_max_ms + 1);
207        base + Duration::from_millis(jitter_ms)
208    }
209}
210
211impl Service<M2mRequest> for M2mLoopback {
212    type Response = M2mResponse;
213    type Error = TraceabilityError;
214    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
215
216    fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
217        let now = Instant::now();
218        let delay = self.calculate_delay();
219
220        if delay.is_zero() {
221            return Poll::Ready(Ok(()));
222        }
223
224        // Check if we need to wait based on last call time
225        if let Ok(mut last_time) = self.last_call_time.lock() {
226            match *last_time {
227                Some(last) if now.duration_since(last) < delay => {
228                    // Still need to wait
229                    let waker = cx.waker().clone();
230                    let remaining = delay - now.duration_since(last);
231                    tokio::spawn(async move {
232                        tokio::time::sleep(remaining).await;
233                        waker.wake();
234                    });
235                    return Poll::Pending;
236                }
237                _ => {
238                    // Update last call time
239                    *last_time = Some(now);
240                }
241            }
242        }
243
244        Poll::Ready(Ok(()))
245    }
246
247    fn call(&mut self, request: M2mRequest) -> Self::Future {
248        let this = self.clone();
249        let request_clone = request.clone();
250        match request_clone {
251            M2mRequest::BroadcastDeletion(_) => {
252                Box::pin(async move {
253                    // Spawn all middleware calls concurrently
254                    let mut handles = Vec::new();
255                    for entry in this.middlewares.iter() {
256                        let mut middleware = entry.value().clone();
257                        let request = request_clone.clone();
258                        let handle = tokio::spawn(async move { middleware.call(request).await });
259                        handles.push(handle);
260                    }
261
262                    // Collect all results and check for any errors
263                    for handle in handles {
264                        match handle.await {
265                            Ok(result) => {
266                                result?; // Check for service errors, ignore the response for the moment
267                            }
268                            Err(_) => {
269                                return Err(TraceabilityError::TransportFailedToContactRemote(
270                                    "BroadcastDeletion join failed".to_string(),
271                                ));
272                            }
273                        }
274                    }
275
276                    // All middleware calls succeeded, return Ack
277                    Ok(M2mResponse::Ack)
278                })
279            }
280            M2mRequest::CheckSourceCompliance { sources, destination } => Box::pin(async move {
281                // Partition sources by node_id
282                let mut partitioned: std::collections::HashMap<String, Vec<_>> =
283                    std::collections::HashMap::new();
284                for source in sources {
285                    partitioned
286                        .entry(source.node_id().clone())
287                        .or_insert_with(Vec::new)
288                        .push(source);
289                }
290
291                // Spawn tasks for each remote node
292                let mut handles = Vec::new();
293                for (node_id, sources) in partitioned {
294                    let mut middleware = this.get_middleware(node_id)?;
295                    let dest = destination.clone();
296                    let request = M2mRequest::CheckSourceCompliance {
297                        sources: sources.into_iter().collect(),
298                        destination: dest,
299                    };
300                    handles.push(tokio::spawn(async move { middleware.call(request).await }));
301                }
302
303                // Collect all results
304                for handle in handles {
305                    match handle.await.map_err(|_| TraceabilityError::InternalTrace2eError)?? {
306                        M2mResponse::Ack => continue,
307                        _ => return Err(TraceabilityError::InternalTrace2eError),
308                    }
309                }
310
311                Ok(M2mResponse::Ack)
312            }),
313            _ => Box::pin(async move {
314                this.get_middleware(eval_remote_ip(request.clone())?)?.call(request).await
315            }),
316        }
317    }
318}