trace2e_core/traceability/
p2m.rs

1//! Process-to-Middleware (P2M) API service implementation.
2//!
3//! This module provides the core service implementation for handling requests from application
4//! processes that need traceability tracking for their I/O operations. The P2M API is the
5//! primary interface through which applications integrate with the trace2e system.
6//!
7//! ## Service Architecture
8//!
9//! The `P2mApiService` acts as the central coordinator between four key services:
10//! - **Sequencer Service**: Manages flow ordering and resource reservations
11//! - **Provenance Service**: Tracks data provenance  
12//! - **Compliance Service**: Enforces policies and authorization decisions
13//! - **M2M Client**: Communicates with remote middleware for distributed flows
14//!
15//! ## Resource Management
16//!
17//! The service maintains two primary data structures:
18//! - **Resource Map**: Associates process/file descriptor pairs with source/destination resources
19//! - **Flow Map**: Tracks active flows by grant ID for operation completion reporting
20//!
21//! ## Operation Workflow
22//!
23//! 1. **Enrollment**: Processes register their files and streams before use
24//! 2. **Authorization**: Processes request permission for specific I/O operations
25//! 3. **Execution**: Middleware evaluates policies and grants/denies access
26//! 4. **Reporting**: Processes report completion status for audit trails
27//!
28//! ## Cross-Node Coordination
29//!
30//! For distributed flows involving remote resources, the service coordinates with
31//! remote middleware instances via the M2M API to ensure consistent policy
32//! enforcement and provenance tracking across the network.
33
34use std::{
35    collections::HashMap, future::Future, pin::Pin, sync::Arc, task::Poll, time::SystemTime,
36};
37
38use dashmap::DashMap;
39use tower::{Service, ServiceExt};
40#[cfg(feature = "trace2e_tracing")]
41use tracing::{debug, info};
42
43use crate::traceability::{
44    api::{
45        ComplianceRequest, ComplianceResponse, M2mRequest, M2mResponse, P2mRequest, P2mResponse,
46        ProvenanceRequest, ProvenanceResponse, SequencerRequest, SequencerResponse,
47    },
48    error::TraceabilityError,
49    naming::{NodeId, Resource},
50    validation::ResourceValidator,
51};
52
/// Enrollment registry: maps a `(process_id, file_descriptor)` key to the
/// `(process_resource, enrolled_resource)` pair recorded at enrollment time.
///
/// Which element acts as the source and which as the destination is decided per
/// I/O request from its `output` flag.
type ResourceMap = DashMap<(i32, i32), (Resource, Resource)>;
/// Granted flows awaiting their report: maps the flow (grant) id to the
/// `(source_resource, destination_resource)` pair of that flow.
type FlowMap = DashMap<u128, (Resource, Resource)>;
57
58/// P2M (Process-to-Middleware) API Service.
59///
60/// Central orchestrator for process-initiated traceability operations. This service
61/// manages the complete lifecycle of tracked I/O operations from initial resource
62/// enrollment through final completion reporting.
63///
64/// ## Core Responsibilities
65///
66/// **Resource Enrollment**: Maintains registry of process file descriptors and their
67/// associated resources (files or network streams) for traceability tracking.
68///
69/// **Flow Authorization**: Coordinates with compliance and sequencer services to
70/// evaluate whether requested I/O operations should be permitted based on current policies.
71///
72/// **Distributed Coordination**: Communicates with remote middleware instances for
73/// cross-node flows, ensuring consistent policy enforcement across the network.
74///
75/// **Provenance Tracking**: Updates provenance records following successful operations
76/// to maintain complete audit trails for compliance and governance.
77///
78/// ## Concurrency and State Management
79///
80/// Uses concurrent data structures (`DashMap`) to handle multiple simultaneous requests
81/// from different processes while maintaining consistency. Resource and flow maps are
82/// shared across service instances using `Arc` for efficient cloning.
83///
84/// ## Generic Type Parameters
85///
86/// - `S`: Sequencer service for flow coordination and resource reservations
87/// - `P`: Provenance service for provenance tracking
88/// - `C`: Compliance service for policy evaluation and authorization decisions  
89/// - `M`: M2M client service for communication with remote middleware instances
90#[derive(Debug, Clone)]
91pub struct P2mApiService<S, P, C, M> {
92    /// Maps (process_id, file_descriptor) to (source_resource, destination_resource) pairs
93    resource_map: Arc<ResourceMap>,
94    /// Maps flow_id to (source_resource, destination_resource) pairs for active flows
95    flow_map: Arc<FlowMap>,
96    /// Service for managing flows sequencing
97    sequencer: S,
98    /// Service for tracking resources provenance
99    provenance: P,
100    /// Service for policy management and compliance checking
101    compliance: C,
102    /// Client service for Middleware-to-Middleware communication
103    m2m: M,
104    /// Whether to perform resource validation on incoming requests
105    enable_resource_validation: bool,
106}
107
108impl<S, P, C, M> P2mApiService<S, P, C, M> {
109    /// Creates a new P2M API service with the provided component services.
110    ///
111    /// Initializes empty resource and flow maps and stores references to the
112    /// core services needed for traceability operations. The service is ready
113    /// to handle process requests immediately after construction.
114    ///
115    /// # Arguments
116    /// * `sequencer` - Service for flow coordination and resource reservations
117    /// * `provenance` - Service for provenance tracking
118    /// * `compliance` - Service for policy evaluation and authorization decisions
119    /// * `m2m` - Client for communication with remote middleware instances
120    pub fn new(sequencer: S, provenance: P, compliance: C, m2m: M) -> Self {
121        Self {
122            resource_map: Arc::new(ResourceMap::new()),
123            flow_map: Arc::new(FlowMap::new()),
124            sequencer,
125            provenance,
126            compliance,
127            m2m,
128            enable_resource_validation: false,
129        }
130    }
131
132    /// Pre-enrolls resources for testing and simulation purposes.
133    ///
134    /// Creates mock enrollments for the specified number of processes, files, and streams
135    /// to support testing scenarios without requiring actual process interactions.
136    /// Should only be used in test environments or for system benchmarking.
137    ///
138    /// # Arguments
139    /// * `process_count` - Number of mock processes to enroll
140    /// * `per_process_file_count` - Number of files to enroll per process
141    /// * `per_process_stream_count` - Number of streams to enroll per process
142    ///
143    /// # Returns
144    /// The service instance with pre-enrolled mock resources
145    #[cfg(test)]
146    pub fn with_enrolled_resources(
147        self,
148        process_count: u32,
149        per_process_file_count: u32,
150        per_process_stream_count: u32,
151    ) -> Self {
152        // Pre-calculate all entries to avoid repeated allocations during insertion
153        let file_entries: Vec<_> = (0..process_count as i32)
154            .flat_map(|process_id| {
155                (3..(per_process_file_count + 3) as i32).map(move |file_id| {
156                    (
157                        (process_id, file_id),
158                        (
159                            Resource::new_process_mock(process_id),
160                            Resource::new_file(format!(
161                                "/file_{}",
162                                (process_id + file_id) % process_count as i32
163                            )),
164                        ),
165                    )
166                })
167            })
168            .collect();
169        let stream_entries: Vec<_> = (0..process_count as i32)
170            .flat_map(|process_id| {
171                ((per_process_file_count + 3) as i32
172                    ..(per_process_stream_count + per_process_file_count + 3) as i32)
173                    .map(move |stream_id| {
174                        (
175                            (process_id, stream_id),
176                            (
177                                Resource::new_process_mock(process_id),
178                                Resource::new_stream(
179                                    format!("127.0.0.1:{stream_id}",),
180                                    format!("127.0.0.2:{stream_id}",),
181                                ),
182                            ),
183                        )
184                    })
185            })
186            .collect();
187
188        // Batch insert all entries at once using DashMap's concurrent insert capabilities
189        for (key, value) in file_entries.into_iter().chain(stream_entries.into_iter()) {
190            self.resource_map.insert(key, value);
191        }
192        self
193    }
194
195    /// Enables or disables resource validation for incoming P2M requests.
196    ///
197    /// When validation is enabled, all incoming requests are validated for:
198    /// - Valid process IDs (must correspond to running processes)
199    /// - Valid stream addresses (must be well-formed and compatible)
200    ///
201    /// This method uses the same ResourceValidator logic as the Tower filter
202    /// but integrates it directly into the service to avoid complex Send/Sync
203    /// constraints with async runtimes.
204    ///
205    /// # Arguments
206    /// * `enable` - Whether to enable resource validation
207    ///
208    /// # Returns
209    /// Self with validation setting applied
210    pub fn with_resource_validation(mut self, enable: bool) -> Self {
211        self.enable_resource_validation = enable;
212        self
213    }
214
215    /// Validates a P2M request according to resource requirements.
216    ///
217    /// Applies the same validation rules as the ResourceValidator:
218    /// - `RemoteEnroll`: Validates both process and stream resources
219    /// - `LocalEnroll`, `IoRequest`: Validates process resources only  
220    /// - `IoReport`: Passes through without validation (grant ID is validated later)
221    ///
222    /// # Arguments
223    /// * `request` - The P2M request to validate
224    ///
225    /// # Returns
226    /// `Ok(())` if validation passes, `Err(TraceabilityError)` if validation fails
227    ///
228    /// # Errors
229    /// - `InvalidProcess`: When the process ID is not found or accessible
230    /// - `InvalidStream`: When socket addresses are malformed or incompatible
231    fn validate_request(request: &P2mRequest) -> Result<&P2mRequest, TraceabilityError> {
232        // Use the same validation logic as the ResourceValidator
233        match request {
234            P2mRequest::RemoteEnroll { pid, local_socket, peer_socket, .. } => {
235                if ResourceValidator.is_valid_process(*pid) {
236                    if ResourceValidator.is_valid_stream(local_socket, peer_socket) {
237                        Ok(request)
238                    } else {
239                        Err(TraceabilityError::InvalidStream(
240                            local_socket.clone(),
241                            peer_socket.clone(),
242                        ))
243                    }
244                } else {
245                    Err(TraceabilityError::InvalidProcess(*pid))
246                }
247            }
248            P2mRequest::LocalEnroll { pid, .. } | P2mRequest::IoRequest { pid, .. } => {
249                if ResourceValidator.is_valid_process(*pid) {
250                    Ok(request)
251                } else {
252                    Err(TraceabilityError::InvalidProcess(*pid))
253                }
254            }
255            P2mRequest::IoReport { .. } => Ok(request),
256        }
257    }
258}
259
260impl<S, P, C, M> Service<P2mRequest> for P2mApiService<S, P, C, M>
261where
262    S: Service<SequencerRequest, Response = SequencerResponse, Error = TraceabilityError>
263        + Clone
264        + Send
265        + 'static,
266    S::Future: Send,
267    P: Service<ProvenanceRequest, Response = ProvenanceResponse, Error = TraceabilityError>
268        + Clone
269        + Send
270        + NodeId
271        + 'static,
272    P::Future: Send,
273    C: Service<ComplianceRequest, Response = ComplianceResponse, Error = TraceabilityError>
274        + Clone
275        + Send
276        + 'static,
277    C::Future: Send,
278    M: Service<M2mRequest, Response = M2mResponse, Error = TraceabilityError>
279        + Clone
280        + Send
281        + 'static,
282    M::Future: Send,
283{
284    type Response = P2mResponse;
285    type Error = TraceabilityError;
286    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
287
288    fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
289        Poll::Ready(Ok(()))
290    }
291
292    fn call(&mut self, request: P2mRequest) -> Self::Future {
293        let resource_map = self.resource_map.clone();
294        let flow_map = self.flow_map.clone();
295        let mut sequencer = self.sequencer.clone();
296        let mut provenance = self.provenance.clone();
297        let mut compliance = self.compliance.clone();
298        let mut m2m = self.m2m.clone();
299        let enable_validation = self.enable_resource_validation;
300
301        Box::pin(async move {
302            // Perform resource validation if enabled
303            if enable_validation {
304                Self::validate_request(&request)?;
305            }
306
307            match request {
308                P2mRequest::LocalEnroll { pid, fd, path } => {
309                    #[cfg(feature = "trace2e_tracing")]
310                    info!(
311                        "[p2m-{}] LocalEnroll: pid: {}, fd: {}, path: {}",
312                        provenance.node_id(),
313                        pid,
314                        fd,
315                        path
316                    );
317                    resource_map
318                        .insert((pid, fd), (Resource::new_process(pid), Resource::new_file(path)));
319                    Ok(P2mResponse::Ack)
320                }
321                P2mRequest::RemoteEnroll { pid, fd, local_socket, peer_socket } => {
322                    #[cfg(feature = "trace2e_tracing")]
323                    info!(
324                        "[p2m-{}] RemoteEnroll: pid: {}, fd: {}, local_socket: {}, peer_socket: {}",
325                        provenance.node_id(),
326                        pid,
327                        fd,
328                        local_socket,
329                        peer_socket
330                    );
331                    resource_map.insert(
332                        (pid, fd),
333                        (
334                            Resource::new_process(pid),
335                            Resource::new_stream(local_socket, peer_socket),
336                        ),
337                    );
338                    Ok(P2mResponse::Ack)
339                }
340                P2mRequest::IoRequest { pid, fd, output } => {
341                    if let Some(resource) = resource_map.get(&(pid, fd)) {
342                        let flow_id = match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)
343                        {
344                            Ok(n) => n.as_nanos(),
345                            Err(_) => return Err(TraceabilityError::SystemTimeError),
346                        };
347                        let (source, destination) = if output {
348                            (resource.0.to_owned(), resource.1.to_owned())
349                        } else {
350                            (resource.1.to_owned(), resource.0.to_owned())
351                        };
352                        #[cfg(feature = "trace2e_tracing")]
353                        info!(
354                            "[p2m-{}] IoRequest: source: {:?}, destination: {:?}",
355                            provenance.node_id(),
356                            source,
357                            destination
358                        );
359                        match sequencer
360                            .call(SequencerRequest::ReserveFlow {
361                                source: source.clone(),
362                                destination: destination.clone(),
363                            })
364                            .await
365                        {
366                            Ok(SequencerResponse::FlowReserved) => {
367                                // If destination is a stream, query remote policy via m2m
368                                // else if query local destination policy
369                                // else return error
370
371                                let source_policies = match provenance
372                                    .call(ProvenanceRequest::GetReferences(source.clone()))
373                                    .await?
374                                {
375                                    ProvenanceResponse::Provenance(mut references) => {
376                                        #[cfg(feature = "trace2e_tracing")]
377                                        debug!(
378                                            "[p2m-{}] Aggregated resources: {:?}",
379                                            provenance.node_id(),
380                                            references
381                                        );
382                                        // Get local source policies
383                                        let mut source_policies = if let Some(local_references) =
384                                            references.remove(&provenance.node_id())
385                                        {
386                                            match compliance
387                                                .call(ComplianceRequest::GetPolicies(
388                                                    local_references,
389                                                ))
390                                                .await?
391                                            {
392                                                ComplianceResponse::Policies(policies) => {
393                                                    HashMap::from([(
394                                                        provenance.node_id(),
395                                                        policies,
396                                                    )])
397                                                }
398                                                _ => {
399                                                    return Err(
400                                                        TraceabilityError::InternalTrace2eError,
401                                                    );
402                                                }
403                                            }
404                                        } else {
405                                            HashMap::new()
406                                        };
407                                        // Get remote source policies
408                                        // Collect all futures without awaiting them
409                                        let mut tasks = Vec::new();
410                                        for (node_id, resources) in references {
411                                            let mut m2m_clone = m2m.clone();
412                                            let node_id_clone = node_id.clone();
413                                            let task = tokio::spawn(async move {
414                                                let result =
415                                                    match m2m_clone.ready().await {
416                                                        Ok(ready_service) => ready_service
417                                                            .call(M2mRequest::GetSourceCompliance {
418                                                                authority_ip: node_id_clone.clone(),
419                                                                resources,
420                                                            })
421                                                            .await,
422                                                        Err(e) => Err(e),
423                                                    };
424                                                (node_id_clone, result)
425                                            });
426                                            tasks.push(task);
427                                        }
428
429                                        // Await all requests concurrently
430                                        for task in tasks {
431                                            let (node_id, result) = task.await.map_err(|_| {
432                                                TraceabilityError::InternalTrace2eError
433                                            })?;
434                                            match result? {
435                                                M2mResponse::SourceCompliance(policies) => {
436                                                    source_policies.insert(node_id, policies);
437                                                }
438                                                _ => {
439                                                    return Err(
440                                                        TraceabilityError::InternalTrace2eError,
441                                                    );
442                                                }
443                                            }
444                                        }
445                                        source_policies
446                                    }
447                                    _ => {
448                                        return Err(TraceabilityError::InternalTrace2eError);
449                                    }
450                                };
451                                match compliance
452                                    .call(ComplianceRequest::EvalPolicies {
453                                        source_policies,
454                                        destination: destination.clone(),
455                                    })
456                                    .await
457                                {
458                                    Ok(ComplianceResponse::Grant) => {
459                                        flow_map.insert(flow_id, (source, destination));
460                                        Ok(P2mResponse::Grant(flow_id))
461                                    }
462                                    Err(TraceabilityError::DirectPolicyViolation) => {
463                                        // Release the flow if the policy is violated
464                                        #[cfg(feature = "trace2e_tracing")]
465                                        info!(
466                                            "[p2m-{}] Release flow: {:?} as it is not compliant",
467                                            provenance.node_id(),
468                                            flow_id
469                                        );
470                                        sequencer
471                                            .call(SequencerRequest::ReleaseFlow { destination })
472                                            .await?;
473                                        Err(TraceabilityError::DirectPolicyViolation)
474                                    }
475                                    _ => Err(TraceabilityError::InternalTrace2eError),
476                                }
477                            }
478                            _ => Err(TraceabilityError::InternalTrace2eError),
479                        }
480                    } else {
481                        Err(TraceabilityError::UndeclaredResource(pid, fd))
482                    }
483                }
484                P2mRequest::IoReport { grant_id, .. } => {
485                    if let Some((_, (source, destination))) = flow_map.remove(&grant_id) {
486                        #[cfg(feature = "trace2e_tracing")]
487                        info!(
488                            "[p2m-{}] IoReport: source: {:?}, destination: {:?}",
489                            provenance.node_id(),
490                            source,
491                            destination
492                        );
493                        if let Some(remote_stream) = destination.is_stream() {
494                            match provenance
495                                .call(ProvenanceRequest::GetReferences(source.clone()))
496                                .await?
497                            {
498                                ProvenanceResponse::Provenance(references) => {
499                                    m2m.ready()
500                                        .await?
501                                        .call(M2mRequest::UpdateProvenance {
502                                            source_prov: references,
503                                            destination: remote_stream,
504                                        })
505                                        .await?;
506                                }
507                                _ => return Err(TraceabilityError::InternalTrace2eError),
508                            };
509                        }
510
511                        provenance
512                            .call(ProvenanceRequest::UpdateProvenance {
513                                source,
514                                destination: destination.clone(),
515                            })
516                            .await?;
517
518                        sequencer.call(SequencerRequest::ReleaseFlow { destination }).await?;
519                        Ok(P2mResponse::Ack)
520                    } else {
521                        Err(TraceabilityError::NotFoundFlow(grant_id))
522                    }
523                }
524            }
525        })
526    }
527}
528
#[cfg(test)]
mod tests {
    use tower::Service;

    use super::*;
    use crate::{
        traceability::core::{
            compliance::ComplianceService, provenance::ProvenanceService,
            sequencer::SequencerService,
        },
        transport::nop::M2mNop,
    };

    /// Enrolls a descriptor, then runs one granted-and-reported round in each direction.
    #[tokio::test]
    async fn unit_trace2e_service_request_response() {
        #[cfg(feature = "trace2e_tracing")]
        crate::trace2e_tracing::init();
        let mut service = P2mApiService::new(
            SequencerService::default(),
            ProvenanceService::default(),
            ComplianceService::default(),
            M2mNop,
        );

        let local_enroll =
            P2mRequest::LocalEnroll { pid: 1, fd: 3, path: "/tmp/test.txt".to_string() };
        assert_eq!(service.call(local_enroll).await.unwrap(), P2mResponse::Ack);

        // Same (pid, fd) key: the stream enrollment replaces the file enrollment.
        let remote_enroll = P2mRequest::RemoteEnroll {
            pid: 1,
            fd: 3,
            local_socket: "127.0.0.1:8080".to_string(),
            peer_socket: "127.0.0.1:8081".to_string(),
        };
        assert_eq!(service.call(remote_enroll).await.unwrap(), P2mResponse::Ack);

        // First an output (process -> stream), then an input (stream -> process).
        for output in [true, false] {
            let granted =
                service.call(P2mRequest::IoRequest { pid: 1, fd: 3, output }).await.unwrap();
            let P2mResponse::Grant(flow_id) = granted else {
                panic!("Expected P2mResponse::Grant");
            };

            let report = P2mRequest::IoReport { pid: 1, fd: 3, grant_id: flow_id, result: true };
            assert_eq!(service.call(report).await.unwrap(), P2mResponse::Ack);
        }
    }

    /// With validation enabled, pid 0 is rejected while the test's own pid is accepted.
    #[tokio::test]
    async fn unit_trace2e_service_validated_resources() {
        #[cfg(feature = "trace2e_tracing")]
        crate::trace2e_tracing::init();
        let mut service = P2mApiService::new(
            SequencerService::default(),
            ProvenanceService::default(),
            ComplianceService::default(),
            M2mNop,
        )
        .with_resource_validation(true);

        // Invalid process: the validator is expected to filter this request out
        let rejected = service
            .call(P2mRequest::LocalEnroll { pid: 0, fd: 3, path: "/tmp/test.txt".to_string() })
            .await
            .unwrap_err();
        assert_eq!(rejected.to_string(), "Traceability error, process not found (pid: 0)");

        // Valid process: enrollment goes through
        let own_pid = std::process::id() as i32;
        let accepted = service
            .call(P2mRequest::LocalEnroll {
                pid: own_pid,
                fd: 3,
                path: "/tmp/test.txt".to_string(),
            })
            .await
            .unwrap();
        assert_eq!(accepted, P2mResponse::Ack);
    }

    /// An I/O request on a descriptor that was never enrolled is refused.
    #[tokio::test]
    async fn unit_trace2e_service_io_invalid_request() {
        #[cfg(feature = "trace2e_tracing")]
        crate::trace2e_tracing::init();
        let mut service = P2mApiService::new(
            SequencerService::default(),
            ProvenanceService::default(),
            ComplianceService::default(),
            M2mNop,
        )
        .with_resource_validation(true);

        let own_pid = std::process::id() as i32;

        // Neither process nor fd are enrolled
        let nothing_enrolled = service
            .call(P2mRequest::IoRequest { pid: own_pid, fd: 3, output: true })
            .await
            .unwrap_err();
        assert_eq!(
            nothing_enrolled.to_string(),
            format!("Traceability error, undeclared resource (pid: {}, fd: 3)", own_pid)
        );

        // Enroll the process on another descriptor
        service
            .call(P2mRequest::LocalEnroll {
                pid: own_pid,
                fd: 4,
                path: "/tmp/test.txt".to_string(),
            })
            .await
            .unwrap();

        // Only process is enrolled
        let other_fd_enrolled = service
            .call(P2mRequest::IoRequest { pid: own_pid, fd: 3, output: true })
            .await
            .unwrap_err();
        assert_eq!(
            other_fd_enrolled.to_string(),
            format!("Traceability error, undeclared resource (pid: {}, fd: 3)", own_pid)
        );
    }

    /// A report carrying an unknown grant id is refused.
    #[tokio::test]
    async fn unit_trace2e_service_io_invalid_report() {
        #[cfg(feature = "trace2e_tracing")]
        crate::trace2e_tracing::init();
        let mut service = P2mApiService::new(
            SequencerService::default(),
            ProvenanceService::default(),
            ComplianceService::default(),
            M2mNop,
        )
        .with_resource_validation(true);

        // Invalid grant id
        let unknown_grant = service
            .call(P2mRequest::IoReport { pid: 1, fd: 3, grant_id: 0, result: true })
            .await
            .unwrap_err();
        assert_eq!(unknown_grant.to_string(), "Traceability error, flow not found (id: 0)");
    }

    /// Compares a validating and a non-validating service on the same enrollments.
    #[tokio::test]
    async fn unit_trace2e_service_integrated_validation() {
        #[cfg(feature = "trace2e_tracing")]
        crate::trace2e_tracing::init();

        // P2M service with integrated validation enabled
        let mut validating = P2mApiService::new(
            SequencerService::default(),
            ProvenanceService::default(),
            ComplianceService::default(),
            M2mNop,
        )
        .with_resource_validation(true);

        // P2M service with validation disabled
        let mut permissive = P2mApiService::new(
            SequencerService::default(),
            ProvenanceService::default(),
            ComplianceService::default(),
            M2mNop,
        )
        .with_resource_validation(false);

        // Invalid process - should fail with validation enabled
        let rejected = validating
            .call(P2mRequest::LocalEnroll { pid: 0, fd: 3, path: "/tmp/test.txt".to_string() })
            .await
            .unwrap_err();
        assert_eq!(rejected.to_string(), "Traceability error, process not found (pid: 0)");

        // Invalid process - should succeed with validation disabled
        let unchecked = permissive
            .call(P2mRequest::LocalEnroll { pid: 0, fd: 3, path: "/tmp/test.txt".to_string() })
            .await
            .unwrap();
        assert_eq!(unchecked, P2mResponse::Ack);

        let own_pid = std::process::id() as i32;

        // Valid process - should succeed with validation enabled
        let checked_valid = validating
            .call(P2mRequest::LocalEnroll {
                pid: own_pid,
                fd: 3,
                path: "/tmp/test.txt".to_string(),
            })
            .await
            .unwrap();
        assert_eq!(checked_valid, P2mResponse::Ack);

        // Valid process - should succeed with validation disabled
        let unchecked_valid = permissive
            .call(P2mRequest::LocalEnroll {
                pid: own_pid,
                fd: 3,
                path: "/tmp/test.txt".to_string(),
            })
            .await
            .unwrap();
        assert_eq!(unchecked_valid, P2mResponse::Ack);
    }
}