trace2e_core/traceability/
m2m.rs

1//! Middleware-to-Middleware (M2M) API service implementation.
2//!
3//! This module provides the service implementation for communication between distributed
4//! middleware instances in the traceability network. The M2M API enables consistent
5//! compliance checking, flow coordination, and provenance synchronization across
6//! multiple nodes in geographically distributed deployments.
7//!
8//! ## Service Architecture
9//!
10//! The `M2mApiService` coordinates between three core services:
11//! - **Sequencer Service**: For flow reservation and coordination across nodes
12//! - **Provenance Service**: For provenance tracking and ancestry synchronization
13//! - **Compliance Service**: For distributed policy evaluation and enforcement
14//!
15//! ## Distributed Operations
16//!
17//! **Cross-Node Compliance**: Query and evaluate policies across middleware boundaries
18//! to ensure consistent enforcement of organizational and regulatory requirements.
19//!
20//! **Flow Coordination**: Reserve and release distributed flows to prevent race
21//! conditions and ensure atomic operations across the network.
22//!
23//! **Provenance Synchronization**: Transfer provenance data between nodes to maintain
24//! complete audit trails for cross-boundary data flows.
25//!
26//! ## Network Considerations
27//!
28//! M2M operations involve network communication and may experience latency or failures.
29//! The service handles these conditions gracefully and provides appropriate error
30//! responses for downstream handling.
31
32use std::{future::Future, pin::Pin, task::Poll};
33
34use tower::Service;
35#[cfg(feature = "trace2e_tracing")]
36use tracing::info;
37
38use crate::traceability::{
39    api::{
40        ComplianceRequest, ComplianceResponse, M2mRequest, M2mResponse, ProvenanceRequest,
41        ProvenanceResponse, SequencerRequest, SequencerResponse,
42    },
43    error::TraceabilityError,
44    naming::NodeId,
45};
46
47/// M2M (Middleware-to-Middleware) API Service
48///
49/// This service handles communication between distributed middleware instances,
50/// enabling consistent compliance checking, flow coordination, and provenance updates
51/// across multiple nodes in the traceability network.
52#[derive(Debug, Clone)]
53pub struct M2mApiService<S, P, C> {
54    /// Service for managing flows sequencing
55    sequencer: S,
56    /// Service for tracking resources provenance
57    provenance: P,
58    /// Service for policy management and compliance checking
59    compliance: C,
60}
61
62impl<S, P, C> M2mApiService<S, P, C> {
63    /// Creates a new M2M API service with the provided sequencer, provenance, and compliance
64    /// services
65    pub fn new(sequencer: S, provenance: P, compliance: C) -> Self {
66        Self { sequencer, provenance, compliance }
67    }
68}
69
70impl<S, P, C> Service<M2mRequest> for M2mApiService<S, P, C>
71where
72    S: Service<SequencerRequest, Response = SequencerResponse, Error = TraceabilityError>
73        + Clone
74        + Send
75        + 'static,
76    S::Future: Send,
77    P: Service<ProvenanceRequest, Response = ProvenanceResponse, Error = TraceabilityError>
78        + Clone
79        + Send
80        + NodeId
81        + 'static,
82    P::Future: Send,
83    C: Service<ComplianceRequest, Response = ComplianceResponse, Error = TraceabilityError>
84        + Clone
85        + Send
86        + 'static,
87    C::Future: Send,
88{
89    type Response = M2mResponse;
90    type Error = TraceabilityError;
91    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
92
93    fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
94        Poll::Ready(Ok(()))
95    }
96
97    fn call(&mut self, request: M2mRequest) -> Self::Future {
98        let mut sequencer = self.sequencer.clone();
99        let mut provenance = self.provenance.clone();
100        let mut compliance = self.compliance.clone();
101        Box::pin(async move {
102            match request {
103                M2mRequest::GetDestinationCompliance { source, destination } => {
104                    #[cfg(feature = "trace2e_tracing")]
105                    info!(
106                        "[m2m-{}] GetDestinationCompliance: source: {:?}, destination: {:?}",
107                        provenance.node_id(),
108                        source,
109                        destination
110                    );
111                    match sequencer
112                        .call(SequencerRequest::ReserveFlow {
113                            source,
114                            destination: destination.clone(),
115                        })
116                        .await?
117                    {
118                        SequencerResponse::FlowReserved => {
119                            match compliance.call(ComplianceRequest::GetPolicy(destination)).await?
120                            {
121                                ComplianceResponse::Policy(policy) => {
122                                    Ok(M2mResponse::DestinationCompliance(policy))
123                                }
124                                _ => Err(TraceabilityError::InternalTrace2eError),
125                            }
126                        }
127                        _ => Err(TraceabilityError::InternalTrace2eError),
128                    }
129                }
130                M2mRequest::GetSourceCompliance { resources, .. } => {
131                    #[cfg(feature = "trace2e_tracing")]
132                    info!(
133                        "[m2m-{}] GetSourceCompliance: resources: {:?}",
134                        provenance.node_id(),
135                        resources
136                    );
137                    match compliance.call(ComplianceRequest::GetPolicies(resources)).await? {
138                        ComplianceResponse::Policies(policies) => {
139                            Ok(M2mResponse::SourceCompliance(policies))
140                        }
141                        _ => Err(TraceabilityError::InternalTrace2eError),
142                    }
143                }
144                M2mRequest::UpdateProvenance { source_prov, destination } => {
145                    #[cfg(feature = "trace2e_tracing")]
146                    info!(
147                        "[m2m-{}] UpdateProvenance: source_prov: {:?}, destination: {:?}",
148                        provenance.node_id(),
149                        source_prov,
150                        destination
151                    );
152                    match provenance
153                        .call(ProvenanceRequest::UpdateProvenanceRaw {
154                            source_prov,
155                            destination: destination.clone(),
156                        })
157                        .await?
158                    {
159                        ProvenanceResponse::ProvenanceUpdated
160                        | ProvenanceResponse::ProvenanceNotUpdated => {
161                            match sequencer
162                                .call(SequencerRequest::ReleaseFlow { destination })
163                                .await?
164                            {
165                                SequencerResponse::FlowReleased { .. } => Ok(M2mResponse::Ack),
166                                _ => Err(TraceabilityError::InternalTrace2eError),
167                            }
168                        }
169                        _ => Err(TraceabilityError::InternalTrace2eError),
170                    }
171                }
172            }
173        })
174    }
175}