trace2e_core/traceability/api/p2m.rs
1//! Process-to-Middleware (P2M) API service implementation.
2//!
3//! This module provides the core service implementation for handling requests from application
4//! processes that need traceability tracking for their I/O operations. The P2M API is the
5//! primary interface through which applications integrate with the trace2e system.
6//!
7//! ## Service Architecture
8//!
9//! The `P2mApiService` acts as the central coordinator between four key services:
10//! - **Sequencer Service**: Manages flow ordering and resource reservations
11//! - **Provenance Service**: Tracks data provenance
12//! - **Compliance Service**: Enforces policies and authorization decisions
13//! - **M2M Client**: Communicates with remote middleware for distributed flows
14//!
15//! ## Resource Management
16//!
17//! The service maintains two primary data structures:
18//! - **Resource Map**: Associates process/file descriptor pairs with source/destination resources
19//! - **Flow Map**: Tracks active flows by grant ID for operation completion reporting
20//!
21//! ## Operation Workflow
22//!
23//! 1. **Enrollment**: Processes register their files and streams before use
24//! 2. **Authorization**: Processes request permission for specific I/O operations
25//! 3. **Execution**: Middleware evaluates policies and grants/denies access
26//! 4. **Reporting**: Processes report completion status for audit trails
27//!
28//! ## Cross-Node Coordination
29//!
30//! For distributed flows involving remote resources, the service coordinates with
31//! remote middleware instances via the M2M API to ensure consistent policy
32//! enforcement and provenance tracking across the network.
33
34use std::{
35 collections::HashSet, future::Future, pin::Pin, sync::Arc, task::Poll, time::SystemTime,
36};
37
38use dashmap::DashMap;
39use tower::{Service, ServiceExt};
40use tracing::{debug, info};
41
42use crate::traceability::{
43 api::types::{
44 ComplianceRequest, ComplianceResponse, M2mRequest, M2mResponse, P2mRequest, P2mResponse,
45 ProvenanceRequest, ProvenanceResponse, SequencerRequest, SequencerResponse,
46 },
47 error::TraceabilityError,
48 infrastructure::{
49 naming::{NodeId, Resource},
50 validation::ResourceValidator,
51 },
52};
53
54/// Maps (process_id, file_descriptor) to (source_resource, destination_resource) pairs
55type ResourceMap = DashMap<(i32, i32), (Resource, Resource)>;
56/// Maps flow_id to (source_resource, destination_resource) pairs for active flows
57type FlowMap = DashMap<u128, (Resource, Resource)>;
58
59/// P2M (Process-to-Middleware) API Service.
60///
61/// Central orchestrator for process-initiated traceability operations. This service
62/// manages the complete lifecycle of tracked I/O operations from initial resource
63/// enrollment through final completion reporting.
64///
65/// ## Core Responsibilities
66///
67/// **Resource Enrollment**: Maintains registry of process file descriptors and their
68/// associated resources (files or network streams) for traceability tracking.
69///
70/// **Flow Authorization**: Coordinates with compliance and sequencer services to
71/// evaluate whether requested I/O operations should be permitted based on current policies.
72///
73/// **Distributed Coordination**: Communicates with remote middleware instances for
74/// cross-node flows, ensuring consistent policy enforcement across the network.
75///
76/// **Provenance Tracking**: Updates provenance records following successful operations
77/// to maintain complete audit trails for compliance and governance.
78///
79/// ## Concurrency and State Management
80///
81/// Uses concurrent data structures (`DashMap`) to handle multiple simultaneous requests
82/// from different processes while maintaining consistency. Resource and flow maps are
83/// shared across service instances using `Arc` for efficient cloning.
84///
85/// ## Generic Type Parameters
86///
87/// - `S`: Sequencer service for flow coordination and resource reservations
88/// - `P`: Provenance service for provenance tracking
89/// - `C`: Compliance service for policy evaluation and authorization decisions
90/// - `M`: M2M client service for communication with remote middleware instances
91#[derive(Debug, Clone)]
92pub struct P2mApiService<S, P, C, M> {
93 /// Maps (process_id, file_descriptor) to (source_resource, destination_resource) pairs
94 resource_map: Arc<ResourceMap>,
95 /// Maps flow_id to (source_resource, destination_resource) pairs for active flows
96 flow_map: Arc<FlowMap>,
97 /// Service for managing flows sequencing
98 sequencer: S,
99 /// Service for tracking resources provenance
100 provenance: P,
101 /// Service for policy management and compliance checking
102 compliance: C,
103 /// Client service for Middleware-to-Middleware communication
104 m2m: M,
105 /// Whether to perform resource validation on incoming requests
106 enable_resource_validation: bool,
107}
108
109impl<S, P, C, M> P2mApiService<S, P, C, M> {
110 /// Creates a new P2M API service with the provided component services.
111 ///
112 /// Initializes empty resource and flow maps and stores references to the
113 /// core services needed for traceability operations. The service is ready
114 /// to handle process requests immediately after construction.
115 ///
116 /// # Arguments
117 /// * `sequencer` - Service for flow coordination and resource reservations
118 /// * `provenance` - Service for provenance tracking
119 /// * `compliance` - Service for policy evaluation and authorization decisions
120 /// * `m2m` - Client for communication with remote middleware instances
121 pub fn new(sequencer: S, provenance: P, compliance: C, m2m: M) -> Self {
122 Self {
123 resource_map: Arc::new(ResourceMap::new()),
124 flow_map: Arc::new(FlowMap::new()),
125 sequencer,
126 provenance,
127 compliance,
128 m2m,
129 enable_resource_validation: false,
130 }
131 }
132
133 /// Enables or disables resource validation for incoming P2M requests.
134 ///
135 /// When validation is enabled, all incoming requests are validated for:
136 /// - Valid process IDs (must correspond to running processes)
137 /// - Valid stream addresses (must be well-formed and compatible)
138 ///
139 /// This method uses the same ResourceValidator logic as the Tower filter
140 /// but integrates it directly into the service to avoid complex Send/Sync
141 /// constraints with async runtimes.
142 ///
143 /// # Arguments
144 /// * `enable` - Whether to enable resource validation
145 ///
146 /// # Returns
147 /// Self with validation setting applied
148 pub fn with_enrolled_resource(
149 self,
150 pid: i32,
151 fd: i32,
152 source: Resource,
153 destination: Resource,
154 ) -> Self {
155 self.resource_map.insert((pid, fd), (source, destination));
156 self
157 }
158
159 pub fn with_resource_validation(mut self, enable: bool) -> Self {
160 self.enable_resource_validation = enable;
161 self
162 }
163
164 /// Validates a P2M request according to resource requirements.
165 ///
166 /// Applies the same validation rules as the ResourceValidator:
167 /// - `RemoteEnroll`: Validates both process and stream resources
168 /// - `LocalEnroll`, `IoRequest`: Validates process resources only
169 /// - `IoReport`: Passes through without validation (grant ID is validated later)
170 ///
171 /// # Arguments
172 /// * `request` - The P2M request to validate
173 ///
174 /// # Returns
175 /// `Ok(())` if validation passes, `Err(TraceabilityError)` if validation fails
176 ///
177 /// # Errors
178 /// - `InvalidProcess`: When the process ID is not found or accessible
179 /// - `InvalidStream`: When socket addresses are malformed or incompatible
180 fn validate_request(request: &P2mRequest) -> Result<&P2mRequest, TraceabilityError> {
181 // Use the same validation logic as the ResourceValidator
182 match request {
183 P2mRequest::RemoteEnroll { pid, local_socket, peer_socket, .. } => {
184 if ResourceValidator.is_valid_process(*pid) {
185 if ResourceValidator.is_valid_stream(local_socket, peer_socket) {
186 Ok(request)
187 } else {
188 Err(TraceabilityError::InvalidStream(
189 local_socket.clone(),
190 peer_socket.clone(),
191 ))
192 }
193 } else {
194 Err(TraceabilityError::InvalidProcess(*pid))
195 }
196 }
197 P2mRequest::LocalEnroll { pid, .. } | P2mRequest::IoRequest { pid, .. } => {
198 if ResourceValidator.is_valid_process(*pid) {
199 Ok(request)
200 } else {
201 Err(TraceabilityError::InvalidProcess(*pid))
202 }
203 }
204 P2mRequest::IoReport { .. } => Ok(request),
205 }
206 }
207
208 fn flow_id() -> u128 {
209 SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_nanos()
210 }
211}
212
213impl<S, P, C, M> Service<P2mRequest> for P2mApiService<S, P, C, M>
214where
215 S: Service<SequencerRequest, Response = SequencerResponse, Error = TraceabilityError>
216 + Clone
217 + Send
218 + 'static,
219 S::Future: Send,
220 P: Service<ProvenanceRequest, Response = ProvenanceResponse, Error = TraceabilityError>
221 + Clone
222 + Send
223 + NodeId
224 + 'static,
225 P::Future: Send,
226 C: Service<ComplianceRequest, Response = ComplianceResponse, Error = TraceabilityError>
227 + Clone
228 + Send
229 + 'static,
230 C::Future: Send,
231 M: Service<M2mRequest, Response = M2mResponse, Error = TraceabilityError>
232 + Clone
233 + Send
234 + 'static,
235 M::Future: Send,
236{
237 type Response = P2mResponse;
238 type Error = TraceabilityError;
239 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
240
241 fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
242 Poll::Ready(Ok(()))
243 }
244
245 fn call(&mut self, request: P2mRequest) -> Self::Future {
246 let resource_map = self.resource_map.clone();
247 let flow_map = self.flow_map.clone();
248 let mut sequencer = self.sequencer.clone();
249 let mut provenance = self.provenance.clone();
250 let mut compliance = self.compliance.clone();
251 let mut m2m = self.m2m.clone();
252 let enable_validation = self.enable_resource_validation;
253
254 Box::pin(async move {
255 // Perform resource validation if enabled
256 if enable_validation {
257 Self::validate_request(&request)?;
258 }
259
260 match request {
261 P2mRequest::LocalEnroll { pid, fd, path } => {
262 info!(
263 node_id = %provenance.node_id(),
264 pid = %pid,
265 fd = %fd,
266 path = %path,
267 "[p2m] LocalEnroll"
268 );
269 resource_map
270 .insert((pid, fd), (Resource::new_process(pid), Resource::new_file(path)));
271 Ok(P2mResponse::Ack)
272 }
273 P2mRequest::RemoteEnroll { pid, fd, local_socket, peer_socket } => {
274 info!(
275 node_id = %provenance.node_id(),
276 pid = %pid,
277 fd = %fd,
278 local_socket = %local_socket,
279 peer_socket = %peer_socket,
280 "[p2m] RemoteEnroll"
281 );
282 resource_map.insert(
283 (pid, fd),
284 (
285 Resource::new_process(pid),
286 Resource::new_stream(local_socket, peer_socket),
287 ),
288 );
289 Ok(P2mResponse::Ack)
290 }
291 P2mRequest::IoRequest { pid, fd, output } => {
292 if let Some(resource) = resource_map.get(&(pid, fd)) {
293 let (source, destination) = if output {
294 (resource.0.to_owned(), resource.1.to_owned())
295 } else {
296 (resource.1.to_owned(), resource.0.to_owned())
297 };
298 info!(
299 node_id = %provenance.node_id(),
300 source = %source,
301 destination = %destination,
302 "[p2m] IoRequest"
303 );
304 match sequencer
305 .call(SequencerRequest::ReserveFlow {
306 source: source.clone(),
307 destination: destination.clone(),
308 })
309 .await
310 {
311 Ok(SequencerResponse::FlowReserved) => {
312 let localized_destination =
313 destination.clone().into_localized(provenance.node_id());
314 let destination_policy = if localized_destination
315 .resource()
316 .is_stream()
317 {
318 debug!(
319 node_id = %provenance.node_id(),
320 destination = %localized_destination,
321 "[p2m] Querying destination policy for remote stream"
322 );
323 match m2m
324 .ready()
325 .await?
326 .call(M2mRequest::GetDestinationPolicy(
327 localized_destination.clone(),
328 ))
329 .await
330 {
331 Ok(M2mResponse::DestinationPolicy(policy)) => Some(policy),
332 _ => None, // anyway, errors are handled later, but this may be improved
333 }
334 } else {
335 None
336 };
337 let flow_id = match provenance
338 .call(ProvenanceRequest::GetReferences(source.clone()))
339 .await
340 {
341 Ok(ProvenanceResponse::Provenance(references)) => {
342 let (local_references, remote_references): (
343 HashSet<_>,
344 HashSet<_>,
345 ) = references
346 .into_iter()
347 .partition(|r| *r.node_id() == provenance.node_id());
348 match compliance
349 .call(ComplianceRequest::EvalCompliance {
350 sources: local_references
351 .iter()
352 .map(|r| r.resource().to_owned())
353 .collect(),
354 destination: localized_destination.clone(),
355 destination_policy: destination_policy.clone(),
356 })
357 .await
358 {
359 Ok(ComplianceResponse::Grant) => {
360 // Local compliance check passed. If we have no remote references, we can
361 // grant the flow, otherwise we need to check the compliance of the remote nodes.
362 // Destination policy is required for remote sources compliance checking.
363 if remote_references.is_empty() {
364 debug!(
365 node_id = %provenance.node_id(),
366 "[p2m] Local compliance check passed, granting flow"
367 );
368 Ok(Self::flow_id())
369 } else {
370 // It the destination is not a stream, so it is a local resource, we can get the policy from the compliance service
371 // This could have been done earlier, but we do it here, to make this call only when it is really needed.
372 let destination_policy = if let Some(policy) =
373 destination_policy
374 {
375 policy
376 } else if !destination.is_stream() {
377 match compliance.call(ComplianceRequest::GetPolicy(destination.clone())).await {
378 Ok(ComplianceResponse::Policy(policy)) => policy,
379 _ => return Err(TraceabilityError::InternalTrace2eError),
380 }
381 } else {
382 return Err(
383 TraceabilityError::InternalTrace2eError,
384 );
385 };
386 debug!(
387 node_id = %provenance.node_id(),
388 "[p2m] Querying remote sources compliance"
389 );
390 match m2m
391 .ready()
392 .await?
393 .call(M2mRequest::CheckSourceCompliance {
394 sources: remote_references,
395 destination: (
396 localized_destination,
397 destination_policy,
398 ),
399 })
400 .await
401 {
402 Ok(M2mResponse::Ack) => {
403 // Remote sources compliance check passed
404 // Flow can be granted, return the flow id
405 Ok(Self::flow_id())
406 }
407 Err(e) => Err(e),
408 _ => Err(
409 TraceabilityError::InternalTrace2eError,
410 ),
411 }
412 }
413 }
414 Err(e) => Err(e),
415 _ => Err(TraceabilityError::InternalTrace2eError),
416 }
417 }
418 Err(e) => Err(e),
419 _ => Err(TraceabilityError::InternalTrace2eError),
420 };
421 match flow_id {
422 Ok(flow_id) => {
423 // Compliance check passed, flow can be granted, return the flow id
424 flow_map.insert(flow_id, (source, destination));
425 Ok(P2mResponse::Grant(flow_id))
426 }
427 Err(e) => {
428 debug!(
429 node_id = %provenance.node_id(),
430 error = ?e,
431 "[p2m] Compliance check failed, releasing flow"
432 );
433 // release the flow, and then forward the error
434 sequencer
435 .call(SequencerRequest::ReleaseFlow { destination })
436 .await?;
437 Err(e)
438 }
439 }
440 }
441 _ => Err(TraceabilityError::InternalTrace2eError),
442 }
443 } else {
444 Err(TraceabilityError::UndeclaredResource(pid, fd))
445 }
446 }
447 P2mRequest::IoReport { grant_id, .. } => {
448 if let Some((_, (source, destination))) = flow_map.remove(&grant_id) {
449 info!(
450 node_id = %provenance.node_id(),
451 source = %source,
452 destination = %destination,
453 "[p2m] IoReport"
454 );
455 if let Some(remote_stream) = destination.try_into_localized_peer_stream() {
456 match provenance
457 .call(ProvenanceRequest::GetReferences(source.clone()))
458 .await?
459 {
460 ProvenanceResponse::Provenance(references) => {
461 debug!(
462 remote_node_id = remote_stream.node_id(),
463 "[p2m] Updating remote provenance"
464 );
465 m2m.ready()
466 .await?
467 .call(M2mRequest::UpdateProvenance {
468 source_prov: references,
469 destination: remote_stream,
470 })
471 .await?;
472 }
473 _ => return Err(TraceabilityError::InternalTrace2eError),
474 };
475 } else {
476 info!(
477 node_id = %provenance.node_id(),
478 "[p2m] Updating local provenance"
479 );
480 provenance
481 .call(ProvenanceRequest::UpdateProvenance {
482 source,
483 destination: destination.clone(),
484 })
485 .await?;
486 }
487
488 sequencer.call(SequencerRequest::ReleaseFlow { destination }).await?;
489 Ok(P2mResponse::Ack)
490 } else {
491 Err(TraceabilityError::NotFoundFlow(grant_id))
492 }
493 }
494 }
495 })
496 }
497}
498
499#[cfg(test)]
500mod tests {
501 use tower::Service;
502
503 use super::*;
504 use crate::{
505 traceability::services::{
506 compliance::ComplianceService, provenance::ProvenanceService,
507 sequencer::SequencerService,
508 },
509 transport::nop::M2mNop,
510 };
511
512 #[tokio::test]
513 async fn unit_trace2e_service_request_response() {
514 crate::trace2e_tracing::init();
515 let mut p2m_service = P2mApiService::new(
516 SequencerService::default(),
517 ProvenanceService::default(),
518 ComplianceService::default(),
519 M2mNop,
520 );
521
522 assert_eq!(
523 p2m_service
524 .call(P2mRequest::LocalEnroll { pid: 1, fd: 3, path: "/tmp/test.txt".to_string() })
525 .await
526 .unwrap(),
527 P2mResponse::Ack
528 );
529 assert_eq!(
530 p2m_service
531 .call(P2mRequest::RemoteEnroll {
532 pid: 1,
533 fd: 3,
534 local_socket: "127.0.0.1:8080".to_string(),
535 peer_socket: "127.0.0.1:8081".to_string()
536 })
537 .await
538 .unwrap(),
539 P2mResponse::Ack
540 );
541
542 let P2mResponse::Grant(flow_id) =
543 p2m_service.call(P2mRequest::IoRequest { pid: 1, fd: 3, output: true }).await.unwrap()
544 else {
545 panic!("Expected P2mResponse::Grant");
546 };
547 assert_eq!(
548 p2m_service
549 .call(P2mRequest::IoReport { pid: 1, fd: 3, grant_id: flow_id, result: true })
550 .await
551 .unwrap(),
552 P2mResponse::Ack
553 );
554
555 let P2mResponse::Grant(flow_id) =
556 p2m_service.call(P2mRequest::IoRequest { pid: 1, fd: 3, output: false }).await.unwrap()
557 else {
558 panic!("Expected P2mResponse::Grant");
559 };
560 assert_eq!(
561 p2m_service
562 .call(P2mRequest::IoReport { pid: 1, fd: 3, grant_id: flow_id, result: true })
563 .await
564 .unwrap(),
565 P2mResponse::Ack
566 );
567 }
568
569 #[tokio::test]
570 async fn unit_trace2e_service_validated_resources() {
571 crate::trace2e_tracing::init();
572 let mut p2m_service = P2mApiService::new(
573 SequencerService::default(),
574 ProvenanceService::default(),
575 ComplianceService::default(),
576 M2mNop,
577 )
578 .with_resource_validation(true);
579
580 // Test with invalid process
581 // This request is supposed to be filtered out by the validator
582 assert_eq!(
583 p2m_service
584 .call(P2mRequest::LocalEnroll { pid: 0, fd: 3, path: "/tmp/test.txt".to_string() })
585 .await
586 .unwrap_err()
587 .to_string(),
588 "Traceability error, process not found (pid: 0)"
589 );
590
591 // Test successful process instantiation with validation
592 assert_eq!(
593 p2m_service
594 .call(P2mRequest::LocalEnroll {
595 pid: std::process::id() as i32,
596 fd: 3,
597 path: "/tmp/test.txt".to_string()
598 })
599 .await
600 .unwrap(),
601 P2mResponse::Ack
602 );
603 }
604
605 #[tokio::test]
606 async fn unit_trace2e_service_io_invalid_request() {
607 crate::trace2e_tracing::init();
608 let mut p2m_service = P2mApiService::new(
609 SequencerService::default(),
610 ProvenanceService::default(),
611 ComplianceService::default(),
612 M2mNop,
613 )
614 .with_resource_validation(true);
615
616 // Neither process nor fd are enrolled
617 assert_eq!(
618 p2m_service
619 .call(P2mRequest::IoRequest { pid: std::process::id() as i32, fd: 3, output: true })
620 .await
621 .unwrap_err()
622 .to_string(),
623 format!(
624 "Traceability error, undeclared resource (pid: {}, fd: 3)",
625 std::process::id() as i32
626 )
627 );
628
629 p2m_service
630 .call(P2mRequest::LocalEnroll {
631 pid: std::process::id() as i32,
632 fd: 4,
633 path: "/tmp/test.txt".to_string(),
634 })
635 .await
636 .unwrap();
637
638 // Only process is enrolled
639 assert_eq!(
640 p2m_service
641 .call(P2mRequest::IoRequest { pid: std::process::id() as i32, fd: 3, output: true })
642 .await
643 .unwrap_err()
644 .to_string(),
645 format!(
646 "Traceability error, undeclared resource (pid: {}, fd: 3)",
647 std::process::id() as i32
648 )
649 );
650 }
651
652 #[tokio::test]
653 async fn unit_trace2e_service_io_invalid_report() {
654 crate::trace2e_tracing::init();
655 let mut p2m_service = P2mApiService::new(
656 SequencerService::default(),
657 ProvenanceService::default(),
658 ComplianceService::default(),
659 M2mNop,
660 )
661 .with_resource_validation(true);
662
663 // Invalid grant id
664 assert_eq!(
665 p2m_service
666 .call(P2mRequest::IoReport { pid: 1, fd: 3, grant_id: 0, result: true })
667 .await
668 .unwrap_err()
669 .to_string(),
670 "Traceability error, flow not found (id: 0)"
671 );
672 }
673
674 #[tokio::test]
675 async fn unit_trace2e_service_integrated_validation() {
676 crate::trace2e_tracing::init();
677
678 // Test P2M service with integrated validation enabled
679 let mut p2m_service_with_validation = P2mApiService::new(
680 SequencerService::default(),
681 ProvenanceService::default(),
682 ComplianceService::default(),
683 M2mNop,
684 )
685 .with_resource_validation(true);
686
687 // Test P2M service with validation disabled
688 let mut p2m_service_without_validation = P2mApiService::new(
689 SequencerService::default(),
690 ProvenanceService::default(),
691 ComplianceService::default(),
692 M2mNop,
693 )
694 .with_resource_validation(false);
695
696 // Test invalid process - should fail with validation enabled
697 assert_eq!(
698 p2m_service_with_validation
699 .call(P2mRequest::LocalEnroll { pid: 0, fd: 3, path: "/tmp/test.txt".to_string() })
700 .await
701 .unwrap_err()
702 .to_string(),
703 "Traceability error, process not found (pid: 0)"
704 );
705
706 // Test invalid process - should succeed with validation disabled
707 assert_eq!(
708 p2m_service_without_validation
709 .call(P2mRequest::LocalEnroll { pid: 0, fd: 3, path: "/tmp/test.txt".to_string() })
710 .await
711 .unwrap(),
712 P2mResponse::Ack
713 );
714
715 // Test valid process - should succeed with validation enabled
716 assert_eq!(
717 p2m_service_with_validation
718 .call(P2mRequest::LocalEnroll {
719 pid: std::process::id() as i32,
720 fd: 3,
721 path: "/tmp/test.txt".to_string()
722 })
723 .await
724 .unwrap(),
725 P2mResponse::Ack
726 );
727
728 // Test valid process - should succeed with validation disabled
729 assert_eq!(
730 p2m_service_without_validation
731 .call(P2mRequest::LocalEnroll {
732 pid: std::process::id() as i32,
733 fd: 3,
734 path: "/tmp/test.txt".to_string()
735 })
736 .await
737 .unwrap(),
738 P2mResponse::Ack
739 );
740 }
741}