trace2e_core/traceability/m2m.rs
1//! Middleware-to-Middleware (M2M) API service implementation.
2//!
3//! This module provides the service implementation for communication between distributed
4//! middleware instances in the traceability network. The M2M API enables consistent
5//! compliance checking, flow coordination, and provenance synchronization across
6//! multiple nodes in geographically distributed deployments.
7//!
8//! ## Service Architecture
9//!
10//! The `M2mApiService` coordinates between three core services:
11//! - **Sequencer Service**: For flow reservation and coordination across nodes
12//! - **Provenance Service**: For provenance tracking and ancestry synchronization
13//! - **Compliance Service**: For distributed policy evaluation and enforcement
14//!
15//! ## Distributed Operations
16//!
17//! **Cross-Node Compliance**: Query and evaluate policies across middleware boundaries
18//! to ensure consistent enforcement of organizational and regulatory requirements.
19//!
20//! **Flow Coordination**: Reserve and release distributed flows to prevent race
21//! conditions and ensure atomic operations across the network.
22//!
23//! **Provenance Synchronization**: Transfer provenance data between nodes to maintain
24//! complete audit trails for cross-boundary data flows.
25//!
26//! ## Network Considerations
27//!
28//! M2M operations involve network communication and may experience latency or failures.
29//! The service handles these conditions gracefully and provides appropriate error
30//! responses for downstream handling.
31
32use std::{future::Future, pin::Pin, task::Poll};
33
34use tower::Service;
35#[cfg(feature = "trace2e_tracing")]
36use tracing::info;
37
38use crate::traceability::{
39 api::{
40 ComplianceRequest, ComplianceResponse, M2mRequest, M2mResponse, ProvenanceRequest,
41 ProvenanceResponse, SequencerRequest, SequencerResponse,
42 },
43 error::TraceabilityError,
44 naming::NodeId,
45};
46
47/// M2M (Middleware-to-Middleware) API Service
48///
49/// This service handles communication between distributed middleware instances,
50/// enabling consistent compliance checking, flow coordination, and provenance updates
51/// across multiple nodes in the traceability network.
52#[derive(Debug, Clone)]
53pub struct M2mApiService<S, P, C> {
54 /// Service for managing flows sequencing
55 sequencer: S,
56 /// Service for tracking resources provenance
57 provenance: P,
58 /// Service for policy management and compliance checking
59 compliance: C,
60}
61
62impl<S, P, C> M2mApiService<S, P, C> {
63 /// Creates a new M2M API service with the provided sequencer, provenance, and compliance
64 /// services
65 pub fn new(sequencer: S, provenance: P, compliance: C) -> Self {
66 Self { sequencer, provenance, compliance }
67 }
68}
69
70impl<S, P, C> Service<M2mRequest> for M2mApiService<S, P, C>
71where
72 S: Service<SequencerRequest, Response = SequencerResponse, Error = TraceabilityError>
73 + Clone
74 + Send
75 + 'static,
76 S::Future: Send,
77 P: Service<ProvenanceRequest, Response = ProvenanceResponse, Error = TraceabilityError>
78 + Clone
79 + Send
80 + NodeId
81 + 'static,
82 P::Future: Send,
83 C: Service<ComplianceRequest, Response = ComplianceResponse, Error = TraceabilityError>
84 + Clone
85 + Send
86 + 'static,
87 C::Future: Send,
88{
89 type Response = M2mResponse;
90 type Error = TraceabilityError;
91 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
92
93 fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
94 Poll::Ready(Ok(()))
95 }
96
97 fn call(&mut self, request: M2mRequest) -> Self::Future {
98 let mut sequencer = self.sequencer.clone();
99 let mut provenance = self.provenance.clone();
100 let mut compliance = self.compliance.clone();
101 Box::pin(async move {
102 match request {
103 M2mRequest::GetDestinationCompliance { source, destination } => {
104 #[cfg(feature = "trace2e_tracing")]
105 info!(
106 "[m2m-{}] GetDestinationCompliance: source: {:?}, destination: {:?}",
107 provenance.node_id(),
108 source,
109 destination
110 );
111 match sequencer
112 .call(SequencerRequest::ReserveFlow {
113 source,
114 destination: destination.clone(),
115 })
116 .await?
117 {
118 SequencerResponse::FlowReserved => {
119 match compliance.call(ComplianceRequest::GetPolicy(destination)).await?
120 {
121 ComplianceResponse::Policy(policy) => {
122 Ok(M2mResponse::DestinationCompliance(policy))
123 }
124 _ => Err(TraceabilityError::InternalTrace2eError),
125 }
126 }
127 _ => Err(TraceabilityError::InternalTrace2eError),
128 }
129 }
130 M2mRequest::GetSourceCompliance { resources, .. } => {
131 #[cfg(feature = "trace2e_tracing")]
132 info!(
133 "[m2m-{}] GetSourceCompliance: resources: {:?}",
134 provenance.node_id(),
135 resources
136 );
137 match compliance.call(ComplianceRequest::GetPolicies(resources)).await? {
138 ComplianceResponse::Policies(policies) => {
139 Ok(M2mResponse::SourceCompliance(policies))
140 }
141 _ => Err(TraceabilityError::InternalTrace2eError),
142 }
143 }
144 M2mRequest::UpdateProvenance { source_prov, destination } => {
145 #[cfg(feature = "trace2e_tracing")]
146 info!(
147 "[m2m-{}] UpdateProvenance: source_prov: {:?}, destination: {:?}",
148 provenance.node_id(),
149 source_prov,
150 destination
151 );
152 match provenance
153 .call(ProvenanceRequest::UpdateProvenanceRaw {
154 source_prov,
155 destination: destination.clone(),
156 })
157 .await?
158 {
159 ProvenanceResponse::ProvenanceUpdated
160 | ProvenanceResponse::ProvenanceNotUpdated => {
161 match sequencer
162 .call(SequencerRequest::ReleaseFlow { destination })
163 .await?
164 {
165 SequencerResponse::FlowReleased { .. } => Ok(M2mResponse::Ack),
166 _ => Err(TraceabilityError::InternalTrace2eError),
167 }
168 }
169 _ => Err(TraceabilityError::InternalTrace2eError),
170 }
171 }
172 }
173 })
174 }
175}