trace2e_core/traceability/api/
m2m.rs

1//! Middleware-to-Middleware (M2M) API service implementation.
2//!
3//! This module provides the service implementation for communication between distributed
4//! middleware instances in the traceability network. The M2M API enables consistent
5//! compliance checking, flow coordination, and provenance synchronization across
6//! multiple nodes in geographically distributed deployments.
7//!
8//! ## Service Architecture
9//!
10//! The `M2mApiService` coordinates between three core services:
11//! - **Sequencer Service**: For flow reservation and coordination across nodes
12//! - **Provenance Service**: For provenance tracking and ancestry synchronization
13//! - **Compliance Service**: For distributed policy evaluation and enforcement
14//!
15//! ## Distributed Operations
16//!
17//! **Cross-Node Compliance**: Query and evaluate policies across middleware boundaries
18//! to ensure consistent enforcement of organizational and regulatory requirements.
19//!
20//! **Flow Coordination**: Reserve and release distributed flows to prevent race
21//! conditions and ensure atomic operations across the network.
22//!
23//! **Provenance Synchronization**: Transfer provenance data between nodes to maintain
24//! complete audit trails for cross-boundary data flows.
25//!
26//! ## Network Considerations
27//!
28//! M2M operations involve network communication and may experience latency or failures.
29//! The service handles these conditions gracefully and provides appropriate error
30//! responses for downstream handling.
31
32use std::{collections::HashSet, future::Future, pin::Pin, task::Poll};
33
34use tower::Service;
35
36use crate::traceability::infrastructure::naming::DisplayableResource;
37use tracing::info;
38
39use crate::traceability::{
40    api::types::{
41        ComplianceRequest, ComplianceResponse, M2mRequest, M2mResponse, ProvenanceRequest,
42        ProvenanceResponse, SequencerRequest, SequencerResponse,
43    },
44    error::TraceabilityError,
45    infrastructure::naming::{NodeId, Resource},
46};
47
48/// M2M (Middleware-to-Middleware) API Service
49///
50/// This service handles communication between distributed middleware instances,
51/// enabling consistent compliance checking, flow coordination, and provenance updates
52/// across multiple nodes in the traceability network.
53#[derive(Debug, Clone)]
54pub struct M2mApiService<S, P, C> {
55    /// Service for managing flows sequencing
56    sequencer: S,
57    /// Service for tracking resources provenance
58    provenance: P,
59    /// Service for policy management and compliance checking
60    compliance: C,
61}
62
63impl<S, P, C> M2mApiService<S, P, C> {
64    /// Creates a new M2M API service with the provided sequencer, provenance, and compliance
65    /// services
66    pub fn new(sequencer: S, provenance: P, compliance: C) -> Self {
67        Self { sequencer, provenance, compliance }
68    }
69}
70
71impl<S, P, C> Service<M2mRequest> for M2mApiService<S, P, C>
72where
73    S: Service<SequencerRequest, Response = SequencerResponse, Error = TraceabilityError>
74        + Clone
75        + Send
76        + 'static,
77    S::Future: Send,
78    P: Service<ProvenanceRequest, Response = ProvenanceResponse, Error = TraceabilityError>
79        + Clone
80        + Send
81        + NodeId
82        + 'static,
83    P::Future: Send,
84    C: Service<ComplianceRequest, Response = ComplianceResponse, Error = TraceabilityError>
85        + Clone
86        + Send
87        + 'static,
88    C::Future: Send,
89{
90    type Response = M2mResponse;
91    type Error = TraceabilityError;
92    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
93
94    fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
95        Poll::Ready(Ok(()))
96    }
97
98    fn call(&mut self, request: M2mRequest) -> Self::Future {
99        let mut sequencer = self.sequencer.clone();
100        let mut provenance = self.provenance.clone();
101        let mut compliance = self.compliance.clone();
102        Box::pin(async move {
103            match request {
104                M2mRequest::GetDestinationPolicy(destination) => {
105                    info!(
106                        node_id = %provenance.node_id(),
107                        destination = %destination,
108                        "[m2m] GetDestinationPolicy"
109                    );
110                    // check if the destination is local
111                    let destination = if *destination.node_id() == provenance.node_id() {
112                        destination.resource().to_owned()
113                    } else {
114                        return Err(TraceabilityError::NotLocalResource);
115                    };
116                    match sequencer
117                        .call(SequencerRequest::ReserveFlow {
118                            source: Resource::None, // placeholder for remote source resource
119                            destination: destination.clone(),
120                        })
121                        .await?
122                    {
123                        SequencerResponse::FlowReserved => {
124                            match compliance.call(ComplianceRequest::GetPolicy(destination)).await?
125                            {
126                                ComplianceResponse::Policy(policy) => {
127                                    Ok(M2mResponse::DestinationPolicy(policy))
128                                }
129                                _ => Err(TraceabilityError::InternalTrace2eError),
130                            }
131                        }
132                        _ => Err(TraceabilityError::InternalTrace2eError),
133                    }
134                }
135                M2mRequest::CheckSourceCompliance { sources, destination } => {
136                    info!(
137                        node_id = %provenance.node_id(),
138                        sources = %DisplayableResource::from(&sources),
139                        destination = %destination.0,
140                        destination_policy = ?destination.1,
141                        "[m2m] CheckSourceCompliance"
142                    );
143                    let sources = sources
144                        .iter()
145                        .filter(|r| *r.node_id() == provenance.node_id())
146                        .map(|r| r.resource().to_owned())
147                        .collect::<HashSet<_>>();
148                    match compliance
149                        .call(ComplianceRequest::EvalCompliance {
150                            sources,
151                            destination: destination.0,
152                            destination_policy: Some(destination.1),
153                        })
154                        .await
155                    {
156                        Ok(ComplianceResponse::Grant) => Ok(M2mResponse::Ack),
157                        Err(e) => Err(e),
158                        _ => Err(TraceabilityError::InternalTrace2eError),
159                    }
160                }
161                M2mRequest::UpdateProvenance { source_prov, destination } => {
162                    info!(
163                        node_id = %provenance.node_id(),
164                        source_prov = %DisplayableResource::from(&source_prov),
165                        destination = %destination,
166                        "[m2m] UpdateProvenance"
167                    );
168                    // check if the destination is local
169                    let destination = if *destination.node_id() == provenance.node_id() {
170                        destination.resource().to_owned()
171                    } else {
172                        return Err(TraceabilityError::NotLocalResource);
173                    };
174                    match provenance
175                        .call(ProvenanceRequest::UpdateProvenanceRaw {
176                            source_prov,
177                            destination: destination.clone(),
178                        })
179                        .await?
180                    {
181                        ProvenanceResponse::ProvenanceUpdated
182                        | ProvenanceResponse::ProvenanceNotUpdated => {
183                            match sequencer
184                                .call(SequencerRequest::ReleaseFlow { destination })
185                                .await?
186                            {
187                                SequencerResponse::FlowReleased { .. } => Ok(M2mResponse::Ack),
188                                _ => Err(TraceabilityError::InternalTrace2eError),
189                            }
190                        }
191                        _ => Err(TraceabilityError::InternalTrace2eError),
192                    }
193                }
194                M2mRequest::BroadcastDeletion(resource) => {
195                    info!(
196                        node_id = %provenance.node_id(),
197                        resource = %resource,
198                        "[m2m] BroadcastDeletion"
199                    );
200                    // check if the resource is local
201                    if *resource.node_id() == provenance.node_id() {
202                        match compliance
203                            .call(ComplianceRequest::SetDeleted(resource.resource().to_owned()))
204                            .await?
205                        {
206                            ComplianceResponse::PolicyUpdated => Ok(M2mResponse::Ack),
207                            _ => Err(TraceabilityError::InternalTrace2eError),
208                        }
209                    } else {
210                        // If the resource is not local, just return Ack, as no action is needed here.
211                        Ok(M2mResponse::Ack)
212                    }
213                }
214            }
215        })
216    }
217}