trace2e_core/traceability/api/m2m.rs
1//! Middleware-to-Middleware (M2M) API service implementation.
2//!
3//! This module provides the service implementation for communication between distributed
4//! middleware instances in the traceability network. The M2M API enables consistent
5//! compliance checking, flow coordination, and provenance synchronization across
6//! multiple nodes in geographically distributed deployments.
7//!
8//! ## Service Architecture
9//!
10//! The `M2mApiService` coordinates between three core services:
11//! - **Sequencer Service**: For flow reservation and coordination across nodes
12//! - **Provenance Service**: For provenance tracking and ancestry synchronization
13//! - **Compliance Service**: For distributed policy evaluation and enforcement
14//!
15//! ## Distributed Operations
16//!
17//! **Cross-Node Compliance**: Query and evaluate policies across middleware boundaries
18//! to ensure consistent enforcement of organizational and regulatory requirements.
19//!
20//! **Flow Coordination**: Reserve and release distributed flows to prevent race
21//! conditions and ensure atomic operations across the network.
22//!
23//! **Provenance Synchronization**: Transfer provenance data between nodes to maintain
24//! complete audit trails for cross-boundary data flows.
25//!
26//! ## Network Considerations
27//!
28//! M2M operations involve network communication and may experience latency or failures.
29//! The service handles these conditions gracefully and provides appropriate error
30//! responses for downstream handling.
31
32use std::{collections::HashSet, future::Future, pin::Pin, task::Poll};
33
34use tower::Service;
35
36use crate::traceability::infrastructure::naming::DisplayableResource;
37use tracing::info;
38
39use crate::traceability::{
40 api::types::{
41 ComplianceRequest, ComplianceResponse, M2mRequest, M2mResponse, ProvenanceRequest,
42 ProvenanceResponse, SequencerRequest, SequencerResponse,
43 },
44 error::TraceabilityError,
45 infrastructure::naming::{NodeId, Resource},
46};
47
/// Service front-end for the Middleware-to-Middleware (M2M) API.
///
/// Bundles the three inner services that incoming [`M2mRequest`]s are dispatched to:
/// a sequencer (flow reservation and release), a provenance tracker, and a compliance
/// engine. The type itself places no bounds on `S`, `P`, and `C`; the bounds live on the
/// `Service<M2mRequest>` implementation that actually needs them.
#[derive(Debug, Clone)]
pub struct M2mApiService<S, P, C> {
    /// Sequencer used to reserve and release flows on local resources.
    sequencer: S,
    /// Provenance tracker; also the source of the local node identifier.
    provenance: P,
    /// Compliance engine used for policy lookup, evaluation, and deletion marking.
    compliance: C,
}

impl<S, P, C> M2mApiService<S, P, C> {
    /// Builds an M2M API service that owns the given sequencer, provenance, and
    /// compliance services.
    ///
    /// The arguments are stored as-is; no call is made on any of them here.
    pub fn new(sequencer: S, provenance: P, compliance: C) -> Self {
        Self {
            sequencer,
            provenance,
            compliance,
        }
    }
}
70
71impl<S, P, C> Service<M2mRequest> for M2mApiService<S, P, C>
72where
73 S: Service<SequencerRequest, Response = SequencerResponse, Error = TraceabilityError>
74 + Clone
75 + Send
76 + 'static,
77 S::Future: Send,
78 P: Service<ProvenanceRequest, Response = ProvenanceResponse, Error = TraceabilityError>
79 + Clone
80 + Send
81 + NodeId
82 + 'static,
83 P::Future: Send,
84 C: Service<ComplianceRequest, Response = ComplianceResponse, Error = TraceabilityError>
85 + Clone
86 + Send
87 + 'static,
88 C::Future: Send,
89{
90 type Response = M2mResponse;
91 type Error = TraceabilityError;
92 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
93
94 fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
95 Poll::Ready(Ok(()))
96 }
97
98 fn call(&mut self, request: M2mRequest) -> Self::Future {
99 let mut sequencer = self.sequencer.clone();
100 let mut provenance = self.provenance.clone();
101 let mut compliance = self.compliance.clone();
102 Box::pin(async move {
103 match request {
104 M2mRequest::GetDestinationPolicy(destination) => {
105 info!(
106 node_id = %provenance.node_id(),
107 destination = %destination,
108 "[m2m] GetDestinationPolicy"
109 );
110 // check if the destination is local
111 let destination = if *destination.node_id() == provenance.node_id() {
112 destination.resource().to_owned()
113 } else {
114 return Err(TraceabilityError::NotLocalResource);
115 };
116 match sequencer
117 .call(SequencerRequest::ReserveFlow {
118 source: Resource::None, // placeholder for remote source resource
119 destination: destination.clone(),
120 })
121 .await?
122 {
123 SequencerResponse::FlowReserved => {
124 match compliance.call(ComplianceRequest::GetPolicy(destination)).await?
125 {
126 ComplianceResponse::Policy(policy) => {
127 Ok(M2mResponse::DestinationPolicy(policy))
128 }
129 _ => Err(TraceabilityError::InternalTrace2eError),
130 }
131 }
132 _ => Err(TraceabilityError::InternalTrace2eError),
133 }
134 }
135 M2mRequest::CheckSourceCompliance { sources, destination } => {
136 info!(
137 node_id = %provenance.node_id(),
138 sources = %DisplayableResource::from(&sources),
139 destination = %destination.0,
140 destination_policy = ?destination.1,
141 "[m2m] CheckSourceCompliance"
142 );
143 let sources = sources
144 .iter()
145 .filter(|r| *r.node_id() == provenance.node_id())
146 .map(|r| r.resource().to_owned())
147 .collect::<HashSet<_>>();
148 match compliance
149 .call(ComplianceRequest::EvalCompliance {
150 sources,
151 destination: destination.0,
152 destination_policy: Some(destination.1),
153 })
154 .await
155 {
156 Ok(ComplianceResponse::Grant) => Ok(M2mResponse::Ack),
157 Err(e) => Err(e),
158 _ => Err(TraceabilityError::InternalTrace2eError),
159 }
160 }
161 M2mRequest::UpdateProvenance { source_prov, destination } => {
162 info!(
163 node_id = %provenance.node_id(),
164 source_prov = %DisplayableResource::from(&source_prov),
165 destination = %destination,
166 "[m2m] UpdateProvenance"
167 );
168 // check if the destination is local
169 let destination = if *destination.node_id() == provenance.node_id() {
170 destination.resource().to_owned()
171 } else {
172 return Err(TraceabilityError::NotLocalResource);
173 };
174 match provenance
175 .call(ProvenanceRequest::UpdateProvenanceRaw {
176 source_prov,
177 destination: destination.clone(),
178 })
179 .await?
180 {
181 ProvenanceResponse::ProvenanceUpdated
182 | ProvenanceResponse::ProvenanceNotUpdated => {
183 match sequencer
184 .call(SequencerRequest::ReleaseFlow { destination })
185 .await?
186 {
187 SequencerResponse::FlowReleased { .. } => Ok(M2mResponse::Ack),
188 _ => Err(TraceabilityError::InternalTrace2eError),
189 }
190 }
191 _ => Err(TraceabilityError::InternalTrace2eError),
192 }
193 }
194 M2mRequest::BroadcastDeletion(resource) => {
195 info!(
196 node_id = %provenance.node_id(),
197 resource = %resource,
198 "[m2m] BroadcastDeletion"
199 );
200 // check if the resource is local
201 if *resource.node_id() == provenance.node_id() {
202 match compliance
203 .call(ComplianceRequest::SetDeleted(resource.resource().to_owned()))
204 .await?
205 {
206 ComplianceResponse::PolicyUpdated => Ok(M2mResponse::Ack),
207 _ => Err(TraceabilityError::InternalTrace2eError),
208 }
209 } else {
210 // If the resource is not local, just return Ack, as no action is needed here.
211 Ok(M2mResponse::Ack)
212 }
213 }
214 }
215 })
216 }
217}