trace2e_core/traceability/api/
p2m.rs

1//! Process-to-Middleware (P2M) API service implementation.
2//!
3//! This module provides the core service implementation for handling requests from application
4//! processes that need traceability tracking for their I/O operations. The P2M API is the
5//! primary interface through which applications integrate with the trace2e system.
6//!
7//! ## Service Architecture
8//!
9//! The `P2mApiService` acts as the central coordinator between four key services:
10//! - **Sequencer Service**: Manages flow ordering and resource reservations
11//! - **Provenance Service**: Tracks data provenance  
12//! - **Compliance Service**: Enforces policies and authorization decisions
13//! - **M2M Client**: Communicates with remote middleware for distributed flows
14//!
15//! ## Resource Management
16//!
17//! The service maintains two primary data structures:
18//! - **Resource Map**: Associates process/file descriptor pairs with source/destination resources
19//! - **Flow Map**: Tracks active flows by grant ID for operation completion reporting
20//!
21//! ## Operation Workflow
22//!
23//! 1. **Enrollment**: Processes register their files and streams before use
24//! 2. **Authorization**: Processes request permission for specific I/O operations
25//! 3. **Execution**: Middleware evaluates policies and grants/denies access
26//! 4. **Reporting**: Processes report completion status for audit trails
27//!
28//! ## Cross-Node Coordination
29//!
30//! For distributed flows involving remote resources, the service coordinates with
31//! remote middleware instances via the M2M API to ensure consistent policy
32//! enforcement and provenance tracking across the network.
33
34use std::{
35    collections::HashSet, future::Future, pin::Pin, sync::Arc, task::Poll, time::SystemTime,
36};
37
38use dashmap::DashMap;
39use tower::{Service, ServiceExt};
40use tracing::{debug, info};
41
42use crate::traceability::{
43    api::types::{
44        ComplianceRequest, ComplianceResponse, M2mRequest, M2mResponse, P2mRequest, P2mResponse,
45        ProvenanceRequest, ProvenanceResponse, SequencerRequest, SequencerResponse,
46    },
47    error::TraceabilityError,
48    infrastructure::{
49        naming::{NodeId, Resource},
50        validation::ResourceValidator,
51    },
52};
53
54/// Maps (process_id, file_descriptor) to (source_resource, destination_resource) pairs
55type ResourceMap = DashMap<(i32, i32), (Resource, Resource)>;
56/// Maps flow_id to (source_resource, destination_resource) pairs for active flows
57type FlowMap = DashMap<u128, (Resource, Resource)>;
58
59/// P2M (Process-to-Middleware) API Service.
60///
61/// Central orchestrator for process-initiated traceability operations. This service
62/// manages the complete lifecycle of tracked I/O operations from initial resource
63/// enrollment through final completion reporting.
64///
65/// ## Core Responsibilities
66///
67/// **Resource Enrollment**: Maintains registry of process file descriptors and their
68/// associated resources (files or network streams) for traceability tracking.
69///
70/// **Flow Authorization**: Coordinates with compliance and sequencer services to
71/// evaluate whether requested I/O operations should be permitted based on current policies.
72///
73/// **Distributed Coordination**: Communicates with remote middleware instances for
74/// cross-node flows, ensuring consistent policy enforcement across the network.
75///
76/// **Provenance Tracking**: Updates provenance records following successful operations
77/// to maintain complete audit trails for compliance and governance.
78///
79/// ## Concurrency and State Management
80///
81/// Uses concurrent data structures (`DashMap`) to handle multiple simultaneous requests
82/// from different processes while maintaining consistency. Resource and flow maps are
83/// shared across service instances using `Arc` for efficient cloning.
84///
85/// ## Generic Type Parameters
86///
87/// - `S`: Sequencer service for flow coordination and resource reservations
88/// - `P`: Provenance service for provenance tracking
89/// - `C`: Compliance service for policy evaluation and authorization decisions  
90/// - `M`: M2M client service for communication with remote middleware instances
91#[derive(Debug, Clone)]
92pub struct P2mApiService<S, P, C, M> {
93    /// Maps (process_id, file_descriptor) to (source_resource, destination_resource) pairs
94    resource_map: Arc<ResourceMap>,
95    /// Maps flow_id to (source_resource, destination_resource) pairs for active flows
96    flow_map: Arc<FlowMap>,
97    /// Service for managing flows sequencing
98    sequencer: S,
99    /// Service for tracking resources provenance
100    provenance: P,
101    /// Service for policy management and compliance checking
102    compliance: C,
103    /// Client service for Middleware-to-Middleware communication
104    m2m: M,
105    /// Whether to perform resource validation on incoming requests
106    enable_resource_validation: bool,
107}
108
109impl<S, P, C, M> P2mApiService<S, P, C, M> {
110    /// Creates a new P2M API service with the provided component services.
111    ///
112    /// Initializes empty resource and flow maps and stores references to the
113    /// core services needed for traceability operations. The service is ready
114    /// to handle process requests immediately after construction.
115    ///
116    /// # Arguments
117    /// * `sequencer` - Service for flow coordination and resource reservations
118    /// * `provenance` - Service for provenance tracking
119    /// * `compliance` - Service for policy evaluation and authorization decisions
120    /// * `m2m` - Client for communication with remote middleware instances
121    pub fn new(sequencer: S, provenance: P, compliance: C, m2m: M) -> Self {
122        Self {
123            resource_map: Arc::new(ResourceMap::new()),
124            flow_map: Arc::new(FlowMap::new()),
125            sequencer,
126            provenance,
127            compliance,
128            m2m,
129            enable_resource_validation: false,
130        }
131    }
132
133    /// Pre-enrolls resources for testing and simulation purposes.
134    ///
135    /// Creates mock enrollments for the specified number of processes, files, and streams
136    /// to support testing scenarios without requiring actual process interactions.
137    /// Should only be used in test environments or for system benchmarking.
138    ///
139    /// # Arguments
140    /// * `process_count` - Number of mock processes to enroll
141    /// * `per_process_file_count` - Number of files to enroll per process
142    /// * `per_process_stream_count` - Number of streams to enroll per process
143    ///
144    /// # Returns
145    /// The service instance with pre-enrolled mock resources
146    #[cfg(test)]
147    pub fn with_enrolled_resources(
148        self,
149        process_count: u32,
150        per_process_file_count: u32,
151        per_process_stream_count: u32,
152    ) -> Self {
153        // Pre-calculate all entries to avoid repeated allocations during insertion
154        let file_entries: Vec<_> = (0..process_count as i32)
155            .flat_map(|process_id| {
156                (3..(per_process_file_count + 3) as i32).map(move |file_id| {
157                    (
158                        (process_id, file_id),
159                        (
160                            Resource::new_process_mock(process_id),
161                            Resource::new_file(format!(
162                                "/file_{}",
163                                (process_id + file_id) % process_count as i32
164                            )),
165                        ),
166                    )
167                })
168            })
169            .collect();
170        let stream_entries: Vec<_> = (0..process_count as i32)
171            .flat_map(|process_id| {
172                ((per_process_file_count + 3) as i32
173                    ..(per_process_stream_count + per_process_file_count + 3) as i32)
174                    .map(move |stream_id| {
175                        (
176                            (process_id, stream_id),
177                            (
178                                Resource::new_process_mock(process_id),
179                                Resource::new_stream(
180                                    format!("127.0.0.1:{stream_id}",),
181                                    format!("127.0.0.2:{stream_id}",),
182                                ),
183                            ),
184                        )
185                    })
186            })
187            .collect();
188
189        // Batch insert all entries at once using DashMap's concurrent insert capabilities
190        for (key, value) in file_entries.into_iter().chain(stream_entries.into_iter()) {
191            self.resource_map.insert(key, value);
192        }
193        self
194    }
195
196    /// Enables or disables resource validation for incoming P2M requests.
197    ///
198    /// When validation is enabled, all incoming requests are validated for:
199    /// - Valid process IDs (must correspond to running processes)
200    /// - Valid stream addresses (must be well-formed and compatible)
201    ///
202    /// This method uses the same ResourceValidator logic as the Tower filter
203    /// but integrates it directly into the service to avoid complex Send/Sync
204    /// constraints with async runtimes.
205    ///
206    /// # Arguments
207    /// * `enable` - Whether to enable resource validation
208    ///
209    /// # Returns
210    /// Self with validation setting applied
211    pub fn with_resource_validation(mut self, enable: bool) -> Self {
212        self.enable_resource_validation = enable;
213        self
214    }
215
216    /// Validates a P2M request according to resource requirements.
217    ///
218    /// Applies the same validation rules as the ResourceValidator:
219    /// - `RemoteEnroll`: Validates both process and stream resources
220    /// - `LocalEnroll`, `IoRequest`: Validates process resources only  
221    /// - `IoReport`: Passes through without validation (grant ID is validated later)
222    ///
223    /// # Arguments
224    /// * `request` - The P2M request to validate
225    ///
226    /// # Returns
227    /// `Ok(())` if validation passes, `Err(TraceabilityError)` if validation fails
228    ///
229    /// # Errors
230    /// - `InvalidProcess`: When the process ID is not found or accessible
231    /// - `InvalidStream`: When socket addresses are malformed or incompatible
232    fn validate_request(request: &P2mRequest) -> Result<&P2mRequest, TraceabilityError> {
233        // Use the same validation logic as the ResourceValidator
234        match request {
235            P2mRequest::RemoteEnroll { pid, local_socket, peer_socket, .. } => {
236                if ResourceValidator.is_valid_process(*pid) {
237                    if ResourceValidator.is_valid_stream(local_socket, peer_socket) {
238                        Ok(request)
239                    } else {
240                        Err(TraceabilityError::InvalidStream(
241                            local_socket.clone(),
242                            peer_socket.clone(),
243                        ))
244                    }
245                } else {
246                    Err(TraceabilityError::InvalidProcess(*pid))
247                }
248            }
249            P2mRequest::LocalEnroll { pid, .. } | P2mRequest::IoRequest { pid, .. } => {
250                if ResourceValidator.is_valid_process(*pid) {
251                    Ok(request)
252                } else {
253                    Err(TraceabilityError::InvalidProcess(*pid))
254                }
255            }
256            P2mRequest::IoReport { .. } => Ok(request),
257        }
258    }
259
260    fn flow_id() -> u128 {
261        SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_nanos()
262    }
263}
264
265impl<S, P, C, M> Service<P2mRequest> for P2mApiService<S, P, C, M>
266where
267    S: Service<SequencerRequest, Response = SequencerResponse, Error = TraceabilityError>
268        + Clone
269        + Send
270        + 'static,
271    S::Future: Send,
272    P: Service<ProvenanceRequest, Response = ProvenanceResponse, Error = TraceabilityError>
273        + Clone
274        + Send
275        + NodeId
276        + 'static,
277    P::Future: Send,
278    C: Service<ComplianceRequest, Response = ComplianceResponse, Error = TraceabilityError>
279        + Clone
280        + Send
281        + 'static,
282    C::Future: Send,
283    M: Service<M2mRequest, Response = M2mResponse, Error = TraceabilityError>
284        + Clone
285        + Send
286        + 'static,
287    M::Future: Send,
288{
289    type Response = P2mResponse;
290    type Error = TraceabilityError;
291    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
292
293    fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
294        Poll::Ready(Ok(()))
295    }
296
297    fn call(&mut self, request: P2mRequest) -> Self::Future {
298        let resource_map = self.resource_map.clone();
299        let flow_map = self.flow_map.clone();
300        let mut sequencer = self.sequencer.clone();
301        let mut provenance = self.provenance.clone();
302        let mut compliance = self.compliance.clone();
303        let mut m2m = self.m2m.clone();
304        let enable_validation = self.enable_resource_validation;
305
306        Box::pin(async move {
307            // Perform resource validation if enabled
308            if enable_validation {
309                Self::validate_request(&request)?;
310            }
311
312            match request {
313                P2mRequest::LocalEnroll { pid, fd, path } => {
314                    info!(
315                        node_id = %provenance.node_id(),
316                        pid = %pid,
317                        fd = %fd,
318                        path = %path,
319                        "[p2m] LocalEnroll"
320                    );
321                    resource_map
322                        .insert((pid, fd), (Resource::new_process(pid), Resource::new_file(path)));
323                    Ok(P2mResponse::Ack)
324                }
325                P2mRequest::RemoteEnroll { pid, fd, local_socket, peer_socket } => {
326                    info!(
327                        node_id = %provenance.node_id(),
328                        pid = %pid,
329                        fd = %fd,
330                        local_socket = %local_socket,
331                        peer_socket = %peer_socket,
332                        "[p2m] RemoteEnroll"
333                    );
334                    resource_map.insert(
335                        (pid, fd),
336                        (
337                            Resource::new_process(pid),
338                            Resource::new_stream(local_socket, peer_socket),
339                        ),
340                    );
341                    Ok(P2mResponse::Ack)
342                }
343                P2mRequest::IoRequest { pid, fd, output } => {
344                    if let Some(resource) = resource_map.get(&(pid, fd)) {
345                        let (source, destination) = if output {
346                            (resource.0.to_owned(), resource.1.to_owned())
347                        } else {
348                            (resource.1.to_owned(), resource.0.to_owned())
349                        };
350                        info!(
351                            node_id = %provenance.node_id(),
352                            source = %source,
353                            destination = %destination,
354                            "[p2m] IoRequest"
355                        );
356                        match sequencer
357                            .call(SequencerRequest::ReserveFlow {
358                                source: source.clone(),
359                                destination: destination.clone(),
360                            })
361                            .await
362                        {
363                            Ok(SequencerResponse::FlowReserved) => {
364                                let localized_destination =
365                                    destination.clone().into_localized(provenance.node_id());
366                                let destination_policy = if localized_destination
367                                    .resource()
368                                    .is_stream()
369                                {
370                                    debug!(
371                                        node_id = %provenance.node_id(),
372                                        destination = %localized_destination,
373                                        "[p2m] Querying destination policy for remote stream"
374                                    );
375                                    match m2m
376                                        .ready()
377                                        .await?
378                                        .call(M2mRequest::GetDestinationPolicy(
379                                            localized_destination.clone(),
380                                        ))
381                                        .await
382                                    {
383                                        Ok(M2mResponse::DestinationPolicy(policy)) => Some(policy),
384                                        _ => None, // anyway, errors are handled later, but this may be improved
385                                    }
386                                } else {
387                                    None
388                                };
389                                let flow_id = match provenance
390                                    .call(ProvenanceRequest::GetReferences(source.clone()))
391                                    .await
392                                {
393                                    Ok(ProvenanceResponse::Provenance(references)) => {
394                                        let (local_references, remote_references): (
395                                            HashSet<_>,
396                                            HashSet<_>,
397                                        ) = references
398                                            .into_iter()
399                                            .partition(|r| *r.node_id() == provenance.node_id());
400                                        match compliance
401                                            .call(ComplianceRequest::EvalCompliance {
402                                                sources: local_references
403                                                    .iter()
404                                                    .map(|r| r.resource().to_owned())
405                                                    .collect(),
406                                                destination: localized_destination.clone(),
407                                                destination_policy: destination_policy.clone(),
408                                            })
409                                            .await
410                                        {
411                                            Ok(ComplianceResponse::Grant) => {
412                                                // Local compliance check passed. If we have no remote references, we can
413                                                // grant the flow, otherwise we need to check the compliance of the remote nodes.
414                                                // Destination policy is required for remote sources compliance checking.
415                                                if remote_references.is_empty() {
416                                                    debug!(
417                                                        node_id = %provenance.node_id(),
418                                                        "[p2m] Local compliance check passed, granting flow"
419                                                    );
420                                                    Ok(Self::flow_id())
421                                                } else {
422                                                    // It the destination is not a stream, so it is a local resource, we can get the policy from the compliance service
423                                                    // This could have been done earlier, but we do it here, to make this call only when it is really needed.
424                                                    let destination_policy = if let Some(policy) =
425                                                        destination_policy
426                                                    {
427                                                        policy
428                                                    } else if !destination.is_stream() {
429                                                        match compliance.call(ComplianceRequest::GetPolicy(destination.clone())).await {
430                                                            Ok(ComplianceResponse::Policy(policy)) => policy,
431                                                            _ => return Err(TraceabilityError::InternalTrace2eError),
432                                                        }
433                                                    } else {
434                                                        return Err(
435                                                            TraceabilityError::InternalTrace2eError,
436                                                        );
437                                                    };
438                                                    debug!(
439                                                        node_id = %provenance.node_id(),
440                                                        "[p2m] Querying remote sources compliance"
441                                                    );
442                                                    match m2m
443                                                        .ready()
444                                                        .await?
445                                                        .call(M2mRequest::CheckSourceCompliance {
446                                                            sources: remote_references,
447                                                            destination: (
448                                                                localized_destination,
449                                                                destination_policy,
450                                                            ),
451                                                        })
452                                                        .await
453                                                    {
454                                                        Ok(M2mResponse::Ack) => {
455                                                            // Remote sources compliance check passed
456                                                            // Flow can be granted, return the flow id
457                                                            Ok(Self::flow_id())
458                                                        }
459                                                        Err(e) => Err(e),
460                                                        _ => Err(
461                                                            TraceabilityError::InternalTrace2eError,
462                                                        ),
463                                                    }
464                                                }
465                                            }
466                                            Err(e) => Err(e),
467                                            _ => Err(TraceabilityError::InternalTrace2eError),
468                                        }
469                                    }
470                                    Err(e) => Err(e),
471                                    _ => Err(TraceabilityError::InternalTrace2eError),
472                                };
473                                match flow_id {
474                                    Ok(flow_id) => {
475                                        // Compliance check passed, flow can be granted, return the flow id
476                                        flow_map.insert(flow_id, (source, destination));
477                                        Ok(P2mResponse::Grant(flow_id))
478                                    }
479                                    Err(e) => {
480                                        debug!(
481                                            node_id = %provenance.node_id(),
482                                            error = ?e,
483                                            "[p2m] Compliance check failed, releasing flow"
484                                        );
485                                        // release the flow, and then forward the error
486                                        sequencer
487                                            .call(SequencerRequest::ReleaseFlow { destination })
488                                            .await?;
489                                        Err(e)
490                                    }
491                                }
492                            }
493                            _ => Err(TraceabilityError::InternalTrace2eError),
494                        }
495                    } else {
496                        Err(TraceabilityError::UndeclaredResource(pid, fd))
497                    }
498                }
499                P2mRequest::IoReport { grant_id, .. } => {
500                    if let Some((_, (source, destination))) = flow_map.remove(&grant_id) {
501                        info!(
502                            node_id = %provenance.node_id(),
503                            source = %source,
504                            destination = %destination,
505                            "[p2m] IoReport"
506                        );
507                        if let Some(remote_stream) = destination.try_into_localized_peer_stream() {
508                            match provenance
509                                .call(ProvenanceRequest::GetReferences(source.clone()))
510                                .await?
511                            {
512                                ProvenanceResponse::Provenance(references) => {
513                                    debug!(
514                                        remote_node_id = remote_stream.node_id(),
515                                        "[p2m] Updating remote provenance"
516                                    );
517                                    m2m.ready()
518                                        .await?
519                                        .call(M2mRequest::UpdateProvenance {
520                                            source_prov: references,
521                                            destination: remote_stream,
522                                        })
523                                        .await?;
524                                }
525                                _ => return Err(TraceabilityError::InternalTrace2eError),
526                            };
527                        } else {
528                            info!(
529                                node_id = %provenance.node_id(),
530                                "[p2m] Updating local provenance"
531                            );
532                            provenance
533                                .call(ProvenanceRequest::UpdateProvenance {
534                                    source,
535                                    destination: destination.clone(),
536                                })
537                                .await?;
538                        }
539
540                        sequencer.call(SequencerRequest::ReleaseFlow { destination }).await?;
541                        Ok(P2mResponse::Ack)
542                    } else {
543                        Err(TraceabilityError::NotFoundFlow(grant_id))
544                    }
545                }
546            }
547        })
548    }
549}
550
551#[cfg(test)]
552mod tests {
553    use tower::Service;
554
555    use super::*;
556    use crate::{
557        traceability::services::{
558            compliance::ComplianceService, provenance::ProvenanceService,
559            sequencer::SequencerService,
560        },
561        transport::nop::M2mNop,
562    };
563
564    #[tokio::test]
565    async fn unit_trace2e_service_request_response() {
566        crate::trace2e_tracing::init();
567        let mut p2m_service = P2mApiService::new(
568            SequencerService::default(),
569            ProvenanceService::default(),
570            ComplianceService::default(),
571            M2mNop,
572        );
573
574        assert_eq!(
575            p2m_service
576                .call(P2mRequest::LocalEnroll { pid: 1, fd: 3, path: "/tmp/test.txt".to_string() })
577                .await
578                .unwrap(),
579            P2mResponse::Ack
580        );
581        assert_eq!(
582            p2m_service
583                .call(P2mRequest::RemoteEnroll {
584                    pid: 1,
585                    fd: 3,
586                    local_socket: "127.0.0.1:8080".to_string(),
587                    peer_socket: "127.0.0.1:8081".to_string()
588                })
589                .await
590                .unwrap(),
591            P2mResponse::Ack
592        );
593
594        let P2mResponse::Grant(flow_id) =
595            p2m_service.call(P2mRequest::IoRequest { pid: 1, fd: 3, output: true }).await.unwrap()
596        else {
597            panic!("Expected P2mResponse::Grant");
598        };
599        assert_eq!(
600            p2m_service
601                .call(P2mRequest::IoReport { pid: 1, fd: 3, grant_id: flow_id, result: true })
602                .await
603                .unwrap(),
604            P2mResponse::Ack
605        );
606
607        let P2mResponse::Grant(flow_id) =
608            p2m_service.call(P2mRequest::IoRequest { pid: 1, fd: 3, output: false }).await.unwrap()
609        else {
610            panic!("Expected P2mResponse::Grant");
611        };
612        assert_eq!(
613            p2m_service
614                .call(P2mRequest::IoReport { pid: 1, fd: 3, grant_id: flow_id, result: true })
615                .await
616                .unwrap(),
617            P2mResponse::Ack
618        );
619    }
620
621    #[tokio::test]
622    async fn unit_trace2e_service_validated_resources() {
623        crate::trace2e_tracing::init();
624        let mut p2m_service = P2mApiService::new(
625            SequencerService::default(),
626            ProvenanceService::default(),
627            ComplianceService::default(),
628            M2mNop,
629        )
630        .with_resource_validation(true);
631
632        // Test with invalid process
633        // This request is supposed to be filtered out by the validator
634        assert_eq!(
635            p2m_service
636                .call(P2mRequest::LocalEnroll { pid: 0, fd: 3, path: "/tmp/test.txt".to_string() })
637                .await
638                .unwrap_err()
639                .to_string(),
640            "Traceability error, process not found (pid: 0)"
641        );
642
643        // Test successful process instantiation with validation
644        assert_eq!(
645            p2m_service
646                .call(P2mRequest::LocalEnroll {
647                    pid: std::process::id() as i32,
648                    fd: 3,
649                    path: "/tmp/test.txt".to_string()
650                })
651                .await
652                .unwrap(),
653            P2mResponse::Ack
654        );
655    }
656
657    #[tokio::test]
658    async fn unit_trace2e_service_io_invalid_request() {
659        crate::trace2e_tracing::init();
660        let mut p2m_service = P2mApiService::new(
661            SequencerService::default(),
662            ProvenanceService::default(),
663            ComplianceService::default(),
664            M2mNop,
665        )
666        .with_resource_validation(true);
667
668        // Neither process nor fd are enrolled
669        assert_eq!(
670            p2m_service
671                .call(P2mRequest::IoRequest { pid: std::process::id() as i32, fd: 3, output: true })
672                .await
673                .unwrap_err()
674                .to_string(),
675            format!(
676                "Traceability error, undeclared resource (pid: {}, fd: 3)",
677                std::process::id() as i32
678            )
679        );
680
681        p2m_service
682            .call(P2mRequest::LocalEnroll {
683                pid: std::process::id() as i32,
684                fd: 4,
685                path: "/tmp/test.txt".to_string(),
686            })
687            .await
688            .unwrap();
689
690        // Only process is enrolled
691        assert_eq!(
692            p2m_service
693                .call(P2mRequest::IoRequest { pid: std::process::id() as i32, fd: 3, output: true })
694                .await
695                .unwrap_err()
696                .to_string(),
697            format!(
698                "Traceability error, undeclared resource (pid: {}, fd: 3)",
699                std::process::id() as i32
700            )
701        );
702    }
703
704    #[tokio::test]
705    async fn unit_trace2e_service_io_invalid_report() {
706        crate::trace2e_tracing::init();
707        let mut p2m_service = P2mApiService::new(
708            SequencerService::default(),
709            ProvenanceService::default(),
710            ComplianceService::default(),
711            M2mNop,
712        )
713        .with_resource_validation(true);
714
715        // Invalid grant id
716        assert_eq!(
717            p2m_service
718                .call(P2mRequest::IoReport { pid: 1, fd: 3, grant_id: 0, result: true })
719                .await
720                .unwrap_err()
721                .to_string(),
722            "Traceability error, flow not found (id: 0)"
723        );
724    }
725
726    #[tokio::test]
727    async fn unit_trace2e_service_integrated_validation() {
728        crate::trace2e_tracing::init();
729
730        // Test P2M service with integrated validation enabled
731        let mut p2m_service_with_validation = P2mApiService::new(
732            SequencerService::default(),
733            ProvenanceService::default(),
734            ComplianceService::default(),
735            M2mNop,
736        )
737        .with_resource_validation(true);
738
739        // Test P2M service with validation disabled
740        let mut p2m_service_without_validation = P2mApiService::new(
741            SequencerService::default(),
742            ProvenanceService::default(),
743            ComplianceService::default(),
744            M2mNop,
745        )
746        .with_resource_validation(false);
747
748        // Test invalid process - should fail with validation enabled
749        assert_eq!(
750            p2m_service_with_validation
751                .call(P2mRequest::LocalEnroll { pid: 0, fd: 3, path: "/tmp/test.txt".to_string() })
752                .await
753                .unwrap_err()
754                .to_string(),
755            "Traceability error, process not found (pid: 0)"
756        );
757
758        // Test invalid process - should succeed with validation disabled
759        assert_eq!(
760            p2m_service_without_validation
761                .call(P2mRequest::LocalEnroll { pid: 0, fd: 3, path: "/tmp/test.txt".to_string() })
762                .await
763                .unwrap(),
764            P2mResponse::Ack
765        );
766
767        // Test valid process - should succeed with validation enabled
768        assert_eq!(
769            p2m_service_with_validation
770                .call(P2mRequest::LocalEnroll {
771                    pid: std::process::id() as i32,
772                    fd: 3,
773                    path: "/tmp/test.txt".to_string()
774                })
775                .await
776                .unwrap(),
777            P2mResponse::Ack
778        );
779
780        // Test valid process - should succeed with validation disabled
781        assert_eq!(
782            p2m_service_without_validation
783                .call(P2mRequest::LocalEnroll {
784                    pid: std::process::id() as i32,
785                    fd: 3,
786                    path: "/tmp/test.txt".to_string()
787                })
788                .await
789                .unwrap(),
790            P2mResponse::Ack
791        );
792    }
793}