trace2e_core/traceability/api/
p2m.rs

1//! Process-to-Middleware (P2M) API service implementation.
2//!
3//! This module provides the core service implementation for handling requests from application
4//! processes that need traceability tracking for their I/O operations. The P2M API is the
5//! primary interface through which applications integrate with the trace2e system.
6//!
7//! ## Service Architecture
8//!
9//! The `P2mApiService` acts as the central coordinator between four key services:
10//! - **Sequencer Service**: Manages flow ordering and resource reservations
11//! - **Provenance Service**: Tracks data provenance  
12//! - **Compliance Service**: Enforces policies and authorization decisions
13//! - **M2M Client**: Communicates with remote middleware for distributed flows
14//!
15//! ## Resource Management
16//!
17//! The service maintains two primary data structures:
18//! - **Resource Map**: Associates process/file descriptor pairs with source/destination resources
19//! - **Flow Map**: Tracks active flows by grant ID for operation completion reporting
20//!
21//! ## Operation Workflow
22//!
23//! 1. **Enrollment**: Processes register their files and streams before use
24//! 2. **Authorization**: Processes request permission for specific I/O operations
25//! 3. **Execution**: Middleware evaluates policies and grants/denies access
26//! 4. **Reporting**: Processes report completion status for audit trails
27//!
28//! ## Cross-Node Coordination
29//!
30//! For distributed flows involving remote resources, the service coordinates with
31//! remote middleware instances via the M2M API to ensure consistent policy
32//! enforcement and provenance tracking across the network.
33
34use std::{
35    collections::HashSet, future::Future, pin::Pin, sync::Arc, task::Poll, time::SystemTime,
36};
37
38use dashmap::DashMap;
39use tower::{Service, ServiceExt};
40use tracing::{debug, info};
41
42use crate::traceability::{
43    api::types::{
44        ComplianceRequest, ComplianceResponse, M2mRequest, M2mResponse, P2mRequest, P2mResponse,
45        ProvenanceRequest, ProvenanceResponse, SequencerRequest, SequencerResponse,
46    },
47    error::TraceabilityError,
48    infrastructure::{
49        naming::{NodeId, Resource},
50        validation::ResourceValidator,
51    },
52};
53
54/// Maps (process_id, file_descriptor) to (source_resource, destination_resource) pairs
55type ResourceMap = DashMap<(i32, i32), (Resource, Resource)>;
56/// Maps flow_id to (source_resource, destination_resource) pairs for active flows
57type FlowMap = DashMap<u128, (Resource, Resource)>;
58
59/// P2M (Process-to-Middleware) API Service.
60///
61/// Central orchestrator for process-initiated traceability operations. This service
62/// manages the complete lifecycle of tracked I/O operations from initial resource
63/// enrollment through final completion reporting.
64///
65/// ## Core Responsibilities
66///
67/// **Resource Enrollment**: Maintains registry of process file descriptors and their
68/// associated resources (files or network streams) for traceability tracking.
69///
70/// **Flow Authorization**: Coordinates with compliance and sequencer services to
71/// evaluate whether requested I/O operations should be permitted based on current policies.
72///
73/// **Distributed Coordination**: Communicates with remote middleware instances for
74/// cross-node flows, ensuring consistent policy enforcement across the network.
75///
76/// **Provenance Tracking**: Updates provenance records following successful operations
77/// to maintain complete audit trails for compliance and governance.
78///
79/// ## Concurrency and State Management
80///
81/// Uses concurrent data structures (`DashMap`) to handle multiple simultaneous requests
82/// from different processes while maintaining consistency. Resource and flow maps are
83/// shared across service instances using `Arc` for efficient cloning.
84///
85/// ## Generic Type Parameters
86///
87/// - `S`: Sequencer service for flow coordination and resource reservations
88/// - `P`: Provenance service for provenance tracking
89/// - `C`: Compliance service for policy evaluation and authorization decisions  
90/// - `M`: M2M client service for communication with remote middleware instances
91#[derive(Debug, Clone)]
92pub struct P2mApiService<S, P, C, M> {
93    /// Maps (process_id, file_descriptor) to (source_resource, destination_resource) pairs
94    resource_map: Arc<ResourceMap>,
95    /// Maps flow_id to (source_resource, destination_resource) pairs for active flows
96    flow_map: Arc<FlowMap>,
97    /// Service for managing flows sequencing
98    sequencer: S,
99    /// Service for tracking resources provenance
100    provenance: P,
101    /// Service for policy management and compliance checking
102    compliance: C,
103    /// Client service for Middleware-to-Middleware communication
104    m2m: M,
105    /// Whether to perform resource validation on incoming requests
106    enable_resource_validation: bool,
107}
108
109impl<S, P, C, M> P2mApiService<S, P, C, M> {
110    /// Creates a new P2M API service with the provided component services.
111    ///
112    /// Initializes empty resource and flow maps and stores references to the
113    /// core services needed for traceability operations. The service is ready
114    /// to handle process requests immediately after construction.
115    ///
116    /// # Arguments
117    /// * `sequencer` - Service for flow coordination and resource reservations
118    /// * `provenance` - Service for provenance tracking
119    /// * `compliance` - Service for policy evaluation and authorization decisions
120    /// * `m2m` - Client for communication with remote middleware instances
121    pub fn new(sequencer: S, provenance: P, compliance: C, m2m: M) -> Self {
122        Self {
123            resource_map: Arc::new(ResourceMap::new()),
124            flow_map: Arc::new(FlowMap::new()),
125            sequencer,
126            provenance,
127            compliance,
128            m2m,
129            enable_resource_validation: false,
130        }
131    }
132
133    /// Pre-enrolls resources for testing and simulation purposes.
134    ///
135    /// Creates mock enrollments for the specified number of processes, files, and streams
136    /// to support testing scenarios without requiring actual process interactions.
137    /// Should only be used in test environments or for system benchmarking.
138    ///
139    /// # Arguments
140    /// * `process_count` - Number of mock processes to enroll
141    /// * `per_process_file_count` - Number of files to enroll per process
142    /// * `per_process_stream_count` - Number of streams to enroll per process
143    ///
144    /// # Returns
145    /// The service instance with pre-enrolled mock resources
146    #[cfg(test)]
147    pub fn with_enrolled_resources(
148        self,
149        process_count: u32,
150        per_process_file_count: u32,
151        per_process_stream_count: u32,
152    ) -> Self {
153        // Pre-calculate all entries to avoid repeated allocations during insertion
154        let file_entries: Vec<_> = (0..process_count as i32)
155            .flat_map(|process_id| {
156                (3..(per_process_file_count + 3) as i32).map(move |file_id| {
157                    (
158                        (process_id, file_id),
159                        (
160                            Resource::new_process_mock(process_id),
161                            Resource::new_file(format!(
162                                "/file_{}",
163                                (process_id + file_id) % process_count as i32
164                            )),
165                        ),
166                    )
167                })
168            })
169            .collect();
170        let stream_entries: Vec<_> = (0..process_count as i32)
171            .flat_map(|process_id| {
172                ((per_process_file_count + 3) as i32
173                    ..(per_process_stream_count + per_process_file_count + 3) as i32)
174                    .map(move |stream_id| {
175                        (
176                            (process_id, stream_id),
177                            (
178                                Resource::new_process_mock(process_id),
179                                Resource::new_stream(
180                                    format!("127.0.0.1:{stream_id}",),
181                                    format!("127.0.0.2:{stream_id}",),
182                                ),
183                            ),
184                        )
185                    })
186            })
187            .collect();
188
189        // Batch insert all entries at once using DashMap's concurrent insert capabilities
190        for (key, value) in file_entries.into_iter().chain(stream_entries.into_iter()) {
191            self.resource_map.insert(key, value);
192        }
193        self
194    }
195
196    /// Enables or disables resource validation for incoming P2M requests.
197    ///
198    /// When validation is enabled, all incoming requests are validated for:
199    /// - Valid process IDs (must correspond to running processes)
200    /// - Valid stream addresses (must be well-formed and compatible)
201    ///
202    /// This method uses the same ResourceValidator logic as the Tower filter
203    /// but integrates it directly into the service to avoid complex Send/Sync
204    /// constraints with async runtimes.
205    ///
206    /// # Arguments
207    /// * `enable` - Whether to enable resource validation
208    ///
209    /// # Returns
210    /// Self with validation setting applied
211    pub fn with_resource_validation(mut self, enable: bool) -> Self {
212        self.enable_resource_validation = enable;
213        self
214    }
215
216    /// Validates a P2M request according to resource requirements.
217    ///
218    /// Applies the same validation rules as the ResourceValidator:
219    /// - `RemoteEnroll`: Validates both process and stream resources
220    /// - `LocalEnroll`, `IoRequest`: Validates process resources only  
221    /// - `IoReport`: Passes through without validation (grant ID is validated later)
222    ///
223    /// # Arguments
224    /// * `request` - The P2M request to validate
225    ///
226    /// # Returns
227    /// `Ok(())` if validation passes, `Err(TraceabilityError)` if validation fails
228    ///
229    /// # Errors
230    /// - `InvalidProcess`: When the process ID is not found or accessible
231    /// - `InvalidStream`: When socket addresses are malformed or incompatible
232    fn validate_request(request: &P2mRequest) -> Result<&P2mRequest, TraceabilityError> {
233        // Use the same validation logic as the ResourceValidator
234        match request {
235            P2mRequest::RemoteEnroll { pid, local_socket, peer_socket, .. } => {
236                if ResourceValidator.is_valid_process(*pid) {
237                    if ResourceValidator.is_valid_stream(local_socket, peer_socket) {
238                        Ok(request)
239                    } else {
240                        Err(TraceabilityError::InvalidStream(
241                            local_socket.clone(),
242                            peer_socket.clone(),
243                        ))
244                    }
245                } else {
246                    Err(TraceabilityError::InvalidProcess(*pid))
247                }
248            }
249            P2mRequest::LocalEnroll { pid, .. } | P2mRequest::IoRequest { pid, .. } => {
250                if ResourceValidator.is_valid_process(*pid) {
251                    Ok(request)
252                } else {
253                    Err(TraceabilityError::InvalidProcess(*pid))
254                }
255            }
256            P2mRequest::IoReport { .. } => Ok(request),
257        }
258    }
259
260    fn flow_id() -> u128 {
261        SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_nanos()
262    }
263}
264
265impl<S, P, C, M> Service<P2mRequest> for P2mApiService<S, P, C, M>
266where
267    S: Service<SequencerRequest, Response = SequencerResponse, Error = TraceabilityError>
268        + Clone
269        + Send
270        + 'static,
271    S::Future: Send,
272    P: Service<ProvenanceRequest, Response = ProvenanceResponse, Error = TraceabilityError>
273        + Clone
274        + Send
275        + NodeId
276        + 'static,
277    P::Future: Send,
278    C: Service<ComplianceRequest, Response = ComplianceResponse, Error = TraceabilityError>
279        + Clone
280        + Send
281        + 'static,
282    C::Future: Send,
283    M: Service<M2mRequest, Response = M2mResponse, Error = TraceabilityError>
284        + Clone
285        + Send
286        + 'static,
287    M::Future: Send,
288{
289    type Response = P2mResponse;
290    type Error = TraceabilityError;
291    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
292
293    fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
294        Poll::Ready(Ok(()))
295    }
296
297    fn call(&mut self, request: P2mRequest) -> Self::Future {
298        let resource_map = self.resource_map.clone();
299        let flow_map = self.flow_map.clone();
300        let mut sequencer = self.sequencer.clone();
301        let mut provenance = self.provenance.clone();
302        let mut compliance = self.compliance.clone();
303        let mut m2m = self.m2m.clone();
304        let enable_validation = self.enable_resource_validation;
305
306        Box::pin(async move {
307            // Perform resource validation if enabled
308            if enable_validation {
309                Self::validate_request(&request)?;
310            }
311
312            match request {
313                P2mRequest::LocalEnroll { pid, fd, path } => {
314                    info!(
315                        node_id = %provenance.node_id(),
316                        pid = %pid,
317                        fd = %fd,
318                        path = %path,
319                        "[p2m] LocalEnroll"
320                    );
321                    resource_map
322                        .insert((pid, fd), (Resource::new_process(pid), Resource::new_file(path)));
323                    Ok(P2mResponse::Ack)
324                }
325                P2mRequest::RemoteEnroll { pid, fd, local_socket, peer_socket } => {
326                    info!(
327                        node_id = %provenance.node_id(),
328                        pid = %pid,
329                        fd = %fd,
330                        local_socket = %local_socket,
331                        peer_socket = %peer_socket,
332                        "[p2m] RemoteEnroll"
333                    );
334                    resource_map.insert(
335                        (pid, fd),
336                        (
337                            Resource::new_process(pid),
338                            Resource::new_stream(local_socket, peer_socket),
339                        ),
340                    );
341                    Ok(P2mResponse::Ack)
342                }
343                P2mRequest::IoRequest { pid, fd, output } => {
344                    if let Some(resource) = resource_map.get(&(pid, fd)) {
345                        let (source, destination) = if output {
346                            (resource.0.to_owned(), resource.1.to_owned())
347                        } else {
348                            (resource.1.to_owned(), resource.0.to_owned())
349                        };
350                        info!(
351                            node_id = %provenance.node_id(),
352                            source = %source,
353                            destination = %destination,
354                            "[p2m] IoRequest"
355                        );
356                        match sequencer
357                            .call(SequencerRequest::ReserveFlow {
358                                source: source.clone(),
359                                destination: destination.clone(),
360                            })
361                            .await
362                        {
363                            Ok(SequencerResponse::FlowReserved) => {
364                                let localized_destination =
365                                    destination.clone().into_localized(provenance.node_id());
366                                let destination_policy = if localized_destination
367                                    .resource()
368                                    .is_stream()
369                                {
370                                    debug!(
371                                        node_id = %provenance.node_id(),
372                                        destination = %localized_destination,
373                                        "[p2m] Querying destination policy for remote stream"
374                                    );
375                                    match m2m
376                                        .ready()
377                                        .await?
378                                        .call(M2mRequest::GetDestinationPolicy(
379                                            localized_destination.clone(),
380                                        ))
381                                        .await
382                                    {
383                                        Ok(M2mResponse::DestinationPolicy(policy)) => Some(policy),
384                                        _ => None, // anyway, errors are handled later, but this may be improved
385                                    }
386                                } else {
387                                    None
388                                };
389                                let flow_id = match provenance
390                                    .call(ProvenanceRequest::GetReferences(source.clone()))
391                                    .await
392                                {
393                                    Ok(ProvenanceResponse::Provenance(references)) => {
394                                        let (local_references, remote_references): (
395                                            HashSet<_>,
396                                            HashSet<_>,
397                                        ) = references
398                                            .into_iter()
399                                            .partition(|r| *r.node_id() == provenance.node_id());
400                                        match compliance
401                                            .call(ComplianceRequest::EvalCompliance {
402                                                sources: local_references
403                                                    .iter()
404                                                    .map(|r| r.resource().to_owned())
405                                                    .collect(),
406                                                destination: localized_destination.clone(),
407                                                destination_policy: destination_policy.clone(),
408                                            })
409                                            .await
410                                        {
411                                            Ok(ComplianceResponse::Grant) => {
412                                                // Local compliance check passed. If we have no remote references, we can
413                                                // grant the flow, otherwise we need to check the compliance of the remote nodes.
414                                                // Destination policy is required for remote sources compliance checking.
415                                                if remote_references.is_empty() {
416                                                    debug!(
417                                                        node_id = %provenance.node_id(),
418                                                        "[p2m] Local compliance check passed, granting flow"
419                                                    );
420                                                    Ok(Self::flow_id())
421                                                } else {
422                                                    // It the destination is not a stream, so it is a local resource, we can get the policy from the compliance service
423                                                    // This could have been done earlier, but we do it here, to make this call only when it is really needed.
424                                                    let destination_policy = if let Some(policy) =
425                                                        destination_policy
426                                                    {
427                                                        policy
428                                                    } else if !destination.is_stream() {
429                                                        match compliance.call(ComplianceRequest::GetPolicy(destination.clone())).await {
430                                                            Ok(ComplianceResponse::Policy(policy)) => policy,
431                                                            _ => return Err(TraceabilityError::InternalTrace2eError),
432                                                        }
433                                                    } else {
434                                                        return Err(
435                                                            TraceabilityError::InternalTrace2eError,
436                                                        );
437                                                    };
438                                                    debug!(
439                                                        node_id = %provenance.node_id(),
440                                                        "[p2m] Querying remote sources compliance"
441                                                    );
442                                                    match m2m
443                                                        .ready()
444                                                        .await?
445                                                        .call(M2mRequest::CheckSourceCompliance {
446                                                            sources: remote_references,
447                                                            destination: (
448                                                                localized_destination,
449                                                                destination_policy,
450                                                            ),
451                                                        })
452                                                        .await
453                                                    {
454                                                        Ok(M2mResponse::Ack) => {
455                                                            // Remote sources compliance check passed
456                                                            // Flow can be granted, return the flow id
457                                                            Ok(Self::flow_id())
458                                                        }
459                                                        Err(e) => Err(e),
460                                                        _ => Err(
461                                                            TraceabilityError::InternalTrace2eError,
462                                                        ),
463                                                    }
464                                                }
465                                            }
466                                            Err(e) => Err(e),
467                                            _ => Err(TraceabilityError::InternalTrace2eError),
468                                        }
469                                    }
470                                    Err(e) => Err(e),
471                                    _ => Err(TraceabilityError::InternalTrace2eError),
472                                };
473                                match flow_id {
474                                    Ok(flow_id) => {
475                                        // Compliance check passed, flow can be granted, return the flow id
476                                        flow_map.insert(flow_id, (source, destination));
477                                        Ok(P2mResponse::Grant(flow_id))
478                                    }
479                                    Err(e) => {
480                                        debug!(
481                                            node_id = %provenance.node_id(),
482                                            error = ?e,
483                                            "[p2m] Compliance check failed, releasing flow"
484                                        );
485                                        // release the flow, and then forward the error
486                                        sequencer
487                                            .call(SequencerRequest::ReleaseFlow { destination })
488                                            .await?;
489                                        Err(e)
490                                    }
491                                }
492                            }
493                            _ => Err(TraceabilityError::InternalTrace2eError),
494                        }
495                    } else {
496                        Err(TraceabilityError::UndeclaredResource(pid, fd))
497                    }
498                }
499                P2mRequest::IoReport { grant_id, .. } => {
500                    if let Some((_, (source, destination))) = flow_map.remove(&grant_id) {
501                        info!(
502                            node_id = %provenance.node_id(),
503                            source = %source,
504                            destination = %destination,
505                            "[p2m] IoReport"
506                        );
507                        if let Some(remote_stream) = destination.try_into_localized_peer_stream() {
508                            match provenance
509                                .call(ProvenanceRequest::GetReferences(source.clone()))
510                                .await?
511                            {
512                                ProvenanceResponse::Provenance(references) => {
513                                    m2m.ready()
514                                        .await?
515                                        .call(M2mRequest::UpdateProvenance {
516                                            source_prov: references,
517                                            destination: remote_stream,
518                                        })
519                                        .await?;
520                                }
521                                _ => return Err(TraceabilityError::InternalTrace2eError),
522                            };
523                        }
524
525                        provenance
526                            .call(ProvenanceRequest::UpdateProvenance {
527                                source,
528                                destination: destination.clone(),
529                            })
530                            .await?;
531
532                        sequencer.call(SequencerRequest::ReleaseFlow { destination }).await?;
533                        Ok(P2mResponse::Ack)
534                    } else {
535                        Err(TraceabilityError::NotFoundFlow(grant_id))
536                    }
537                }
538            }
539        })
540    }
541}
542
543#[cfg(test)]
544mod tests {
545    use tower::Service;
546
547    use super::*;
548    use crate::{
549        traceability::services::{
550            compliance::ComplianceService, provenance::ProvenanceService,
551            sequencer::SequencerService,
552        },
553        transport::nop::M2mNop,
554    };
555
556    #[tokio::test]
557    async fn unit_trace2e_service_request_response() {
558        crate::trace2e_tracing::init();
559        let mut p2m_service = P2mApiService::new(
560            SequencerService::default(),
561            ProvenanceService::default(),
562            ComplianceService::default(),
563            M2mNop,
564        );
565
566        assert_eq!(
567            p2m_service
568                .call(P2mRequest::LocalEnroll { pid: 1, fd: 3, path: "/tmp/test.txt".to_string() })
569                .await
570                .unwrap(),
571            P2mResponse::Ack
572        );
573        assert_eq!(
574            p2m_service
575                .call(P2mRequest::RemoteEnroll {
576                    pid: 1,
577                    fd: 3,
578                    local_socket: "127.0.0.1:8080".to_string(),
579                    peer_socket: "127.0.0.1:8081".to_string()
580                })
581                .await
582                .unwrap(),
583            P2mResponse::Ack
584        );
585
586        let P2mResponse::Grant(flow_id) =
587            p2m_service.call(P2mRequest::IoRequest { pid: 1, fd: 3, output: true }).await.unwrap()
588        else {
589            panic!("Expected P2mResponse::Grant");
590        };
591        assert_eq!(
592            p2m_service
593                .call(P2mRequest::IoReport { pid: 1, fd: 3, grant_id: flow_id, result: true })
594                .await
595                .unwrap(),
596            P2mResponse::Ack
597        );
598
599        let P2mResponse::Grant(flow_id) =
600            p2m_service.call(P2mRequest::IoRequest { pid: 1, fd: 3, output: false }).await.unwrap()
601        else {
602            panic!("Expected P2mResponse::Grant");
603        };
604        assert_eq!(
605            p2m_service
606                .call(P2mRequest::IoReport { pid: 1, fd: 3, grant_id: flow_id, result: true })
607                .await
608                .unwrap(),
609            P2mResponse::Ack
610        );
611    }
612
613    #[tokio::test]
614    async fn unit_trace2e_service_validated_resources() {
615        crate::trace2e_tracing::init();
616        let mut p2m_service = P2mApiService::new(
617            SequencerService::default(),
618            ProvenanceService::default(),
619            ComplianceService::default(),
620            M2mNop,
621        )
622        .with_resource_validation(true);
623
624        // Test with invalid process
625        // This request is supposed to be filtered out by the validator
626        assert_eq!(
627            p2m_service
628                .call(P2mRequest::LocalEnroll { pid: 0, fd: 3, path: "/tmp/test.txt".to_string() })
629                .await
630                .unwrap_err()
631                .to_string(),
632            "Traceability error, process not found (pid: 0)"
633        );
634
635        // Test successful process instantiation with validation
636        assert_eq!(
637            p2m_service
638                .call(P2mRequest::LocalEnroll {
639                    pid: std::process::id() as i32,
640                    fd: 3,
641                    path: "/tmp/test.txt".to_string()
642                })
643                .await
644                .unwrap(),
645            P2mResponse::Ack
646        );
647    }
648
649    #[tokio::test]
650    async fn unit_trace2e_service_io_invalid_request() {
651        crate::trace2e_tracing::init();
652        let mut p2m_service = P2mApiService::new(
653            SequencerService::default(),
654            ProvenanceService::default(),
655            ComplianceService::default(),
656            M2mNop,
657        )
658        .with_resource_validation(true);
659
660        // Neither process nor fd are enrolled
661        assert_eq!(
662            p2m_service
663                .call(P2mRequest::IoRequest { pid: std::process::id() as i32, fd: 3, output: true })
664                .await
665                .unwrap_err()
666                .to_string(),
667            format!(
668                "Traceability error, undeclared resource (pid: {}, fd: 3)",
669                std::process::id() as i32
670            )
671        );
672
673        p2m_service
674            .call(P2mRequest::LocalEnroll {
675                pid: std::process::id() as i32,
676                fd: 4,
677                path: "/tmp/test.txt".to_string(),
678            })
679            .await
680            .unwrap();
681
682        // Only process is enrolled
683        assert_eq!(
684            p2m_service
685                .call(P2mRequest::IoRequest { pid: std::process::id() as i32, fd: 3, output: true })
686                .await
687                .unwrap_err()
688                .to_string(),
689            format!(
690                "Traceability error, undeclared resource (pid: {}, fd: 3)",
691                std::process::id() as i32
692            )
693        );
694    }
695
696    #[tokio::test]
697    async fn unit_trace2e_service_io_invalid_report() {
698        crate::trace2e_tracing::init();
699        let mut p2m_service = P2mApiService::new(
700            SequencerService::default(),
701            ProvenanceService::default(),
702            ComplianceService::default(),
703            M2mNop,
704        )
705        .with_resource_validation(true);
706
707        // Invalid grant id
708        assert_eq!(
709            p2m_service
710                .call(P2mRequest::IoReport { pid: 1, fd: 3, grant_id: 0, result: true })
711                .await
712                .unwrap_err()
713                .to_string(),
714            "Traceability error, flow not found (id: 0)"
715        );
716    }
717
718    #[tokio::test]
719    async fn unit_trace2e_service_integrated_validation() {
720        crate::trace2e_tracing::init();
721
722        // Test P2M service with integrated validation enabled
723        let mut p2m_service_with_validation = P2mApiService::new(
724            SequencerService::default(),
725            ProvenanceService::default(),
726            ComplianceService::default(),
727            M2mNop,
728        )
729        .with_resource_validation(true);
730
731        // Test P2M service with validation disabled
732        let mut p2m_service_without_validation = P2mApiService::new(
733            SequencerService::default(),
734            ProvenanceService::default(),
735            ComplianceService::default(),
736            M2mNop,
737        )
738        .with_resource_validation(false);
739
740        // Test invalid process - should fail with validation enabled
741        assert_eq!(
742            p2m_service_with_validation
743                .call(P2mRequest::LocalEnroll { pid: 0, fd: 3, path: "/tmp/test.txt".to_string() })
744                .await
745                .unwrap_err()
746                .to_string(),
747            "Traceability error, process not found (pid: 0)"
748        );
749
750        // Test invalid process - should succeed with validation disabled
751        assert_eq!(
752            p2m_service_without_validation
753                .call(P2mRequest::LocalEnroll { pid: 0, fd: 3, path: "/tmp/test.txt".to_string() })
754                .await
755                .unwrap(),
756            P2mResponse::Ack
757        );
758
759        // Test valid process - should succeed with validation enabled
760        assert_eq!(
761            p2m_service_with_validation
762                .call(P2mRequest::LocalEnroll {
763                    pid: std::process::id() as i32,
764                    fd: 3,
765                    path: "/tmp/test.txt".to_string()
766                })
767                .await
768                .unwrap(),
769            P2mResponse::Ack
770        );
771
772        // Test valid process - should succeed with validation disabled
773        assert_eq!(
774            p2m_service_without_validation
775                .call(P2mRequest::LocalEnroll {
776                    pid: std::process::id() as i32,
777                    fd: 3,
778                    path: "/tmp/test.txt".to_string()
779                })
780                .await
781                .unwrap(),
782            P2mResponse::Ack
783        );
784    }
785}