trace2e_core/traceability/api/
p2m.rs

1//! Process-to-Middleware (P2M) API service implementation.
2//!
3//! This module provides the core service implementation for handling requests from application
4//! processes that need traceability tracking for their I/O operations. The P2M API is the
5//! primary interface through which applications integrate with the trace2e system.
6//!
7//! ## Service Architecture
8//!
9//! The `P2mApiService` acts as the central coordinator between four key services:
10//! - **Sequencer Service**: Manages flow ordering and resource reservations
11//! - **Provenance Service**: Tracks data provenance  
12//! - **Compliance Service**: Enforces policies and authorization decisions
13//! - **M2M Client**: Communicates with remote middleware for distributed flows
14//!
15//! ## Resource Management
16//!
17//! The service maintains two primary data structures:
//! - **Resource Map**: Associates each enrolled (process ID, file descriptor) pair with the
//!   process resource and the file or stream resource behind that descriptor
19//! - **Flow Map**: Tracks active flows by grant ID for operation completion reporting
20//!
21//! ## Operation Workflow
22//!
23//! 1. **Enrollment**: Processes register their files and streams before use
24//! 2. **Authorization**: Processes request permission for specific I/O operations
25//! 3. **Execution**: Middleware evaluates policies and grants/denies access
26//! 4. **Reporting**: Processes report completion status for audit trails
27//!
28//! ## Cross-Node Coordination
29//!
30//! For distributed flows involving remote resources, the service coordinates with
31//! remote middleware instances via the M2M API to ensure consistent policy
32//! enforcement and provenance tracking across the network.
33
34use std::{
35    collections::HashSet, future::Future, pin::Pin, sync::Arc, task::Poll, time::SystemTime,
36};
37
38use dashmap::DashMap;
39use tower::{Service, ServiceExt};
40use tracing::{debug, info};
41
42use crate::traceability::{
43    api::types::{
44        ComplianceRequest, ComplianceResponse, M2mRequest, M2mResponse, P2mRequest, P2mResponse,
45        ProvenanceRequest, ProvenanceResponse, SequencerRequest, SequencerResponse,
46    },
47    error::TraceabilityError,
48    infrastructure::{
49        naming::{NodeId, Resource},
50        validation::ResourceValidator,
51    },
52};
53
/// Enrollment registry keyed by `(process_id, file_descriptor)`.
///
/// `LocalEnroll` and `RemoteEnroll` store the process resource first and the file or stream
/// resource second. The flow direction is only decided when an `IoRequest` arrives: the first
/// element is the source when `output` is true, and the destination otherwise.
type ResourceMap = DashMap<(i32, i32), (Resource, Resource)>;
/// Granted flows awaiting their completion report.
///
/// Keyed by the flow id handed back in `P2mResponse::Grant`; the value is the
/// `(source_resource, destination_resource)` pair of that flow. The entry is removed when the
/// matching `IoReport` is processed.
type FlowMap = DashMap<u128, (Resource, Resource)>;
58
/// P2M (Process-to-Middleware) API Service.
///
/// Central orchestrator for process-initiated traceability operations. This service
/// manages the complete lifecycle of tracked I/O operations from initial resource
/// enrollment through final completion reporting.
///
/// ## Core Responsibilities
///
/// **Resource Enrollment**: Maintains a registry of process file descriptors and their
/// associated resources (files or network streams) for traceability tracking.
///
/// **Flow Authorization**: Coordinates with the compliance and sequencer services to
/// evaluate whether a requested I/O operation should be permitted under current policies.
///
/// **Distributed Coordination**: Communicates with remote middleware instances for
/// cross-node flows, ensuring consistent policy enforcement across the network.
///
/// **Provenance Tracking**: Updates provenance records when an I/O report is received for a
/// granted flow, to maintain audit trails for compliance and governance.
///
/// ## Concurrency and State Management
///
/// Uses concurrent data structures (`DashMap`) to handle simultaneous requests from
/// different processes. The resource and flow maps are wrapped in `Arc`, so cloning the
/// service yields another handle onto the same maps rather than a copy of their contents.
///
/// ## Generic Type Parameters
///
/// - `S`: Sequencer service for flow coordination and resource reservations
/// - `P`: Provenance service for provenance tracking
/// - `C`: Compliance service for policy evaluation and authorization decisions
/// - `M`: M2M client service for communication with remote middleware instances
#[derive(Debug, Clone)]
pub struct P2mApiService<S, P, C, M> {
    /// Maps (process_id, file_descriptor) to the enrolled resource pair. Enrollment requests
    /// store (process_resource, file_or_stream_resource); the direction of a flow is chosen
    /// per `IoRequest` from its `output` flag.
    resource_map: Arc<ResourceMap>,
    /// Maps flow_id to (source_resource, destination_resource) pairs for granted flows that
    /// have not been reported yet
    flow_map: Arc<FlowMap>,
    /// Service for managing flows sequencing
    sequencer: S,
    /// Service for tracking resources provenance
    provenance: P,
    /// Service for policy management and compliance checking
    compliance: C,
    /// Client service for Middleware-to-Middleware communication
    m2m: M,
    /// Whether to perform resource validation on incoming requests (`false` after `new`,
    /// changed through `with_resource_validation`)
    enable_resource_validation: bool,
}
108
109impl<S, P, C, M> P2mApiService<S, P, C, M> {
110    /// Creates a new P2M API service with the provided component services.
111    ///
112    /// Initializes empty resource and flow maps and stores references to the
113    /// core services needed for traceability operations. The service is ready
114    /// to handle process requests immediately after construction.
115    ///
116    /// # Arguments
117    /// * `sequencer` - Service for flow coordination and resource reservations
118    /// * `provenance` - Service for provenance tracking
119    /// * `compliance` - Service for policy evaluation and authorization decisions
120    /// * `m2m` - Client for communication with remote middleware instances
121    pub fn new(sequencer: S, provenance: P, compliance: C, m2m: M) -> Self {
122        Self {
123            resource_map: Arc::new(ResourceMap::new()),
124            flow_map: Arc::new(FlowMap::new()),
125            sequencer,
126            provenance,
127            compliance,
128            m2m,
129            enable_resource_validation: false,
130        }
131    }
132
    /// Registers a resource pair for `(pid, fd)` directly in the resource map.
    ///
    /// The pair is stored as given. A later `IoRequest` for the same `(pid, fd)` treats
    /// `source` as the flow source and `destination` as the flow destination when its
    /// `output` flag is true, and swaps the two otherwise.
    ///
    /// # Arguments
    /// * `pid` - Process identifier half of the map key
    /// * `fd` - File descriptor half of the map key
    /// * `source` - First resource of the stored pair
    /// * `destination` - Second resource of the stored pair
    ///
    /// # Returns
    /// Self with the pair inserted
    pub fn with_enrolled_resource(
        self,
        pid: i32,
        fd: i32,
        source: Resource,
        destination: Resource,
    ) -> Self {
        self.resource_map.insert((pid, fd), (source, destination));
        self
    }

    /// Enables or disables resource validation for incoming P2M requests.
    ///
    /// When validation is enabled, each request is first passed through `validate_request`,
    /// which checks:
    /// - the process ID of `LocalEnroll`, `RemoteEnroll` and `IoRequest` requests
    /// - the socket addresses of `RemoteEnroll` requests
    ///
    /// `IoReport` requests are not validated at this stage.
    ///
    /// This method uses the same ResourceValidator logic as the Tower filter
    /// but integrates it directly into the service to avoid complex Send/Sync
    /// constraints with async runtimes.
    ///
    /// # Arguments
    /// * `enable` - Whether to enable resource validation
    ///
    /// # Returns
    /// Self with validation setting applied
    pub fn with_resource_validation(mut self, enable: bool) -> Self {
        self.enable_resource_validation = enable;
        self
    }
163
164    /// Validates a P2M request according to resource requirements.
165    ///
166    /// Applies the same validation rules as the ResourceValidator:
167    /// - `RemoteEnroll`: Validates both process and stream resources
168    /// - `LocalEnroll`, `IoRequest`: Validates process resources only  
169    /// - `IoReport`: Passes through without validation (grant ID is validated later)
170    ///
171    /// # Arguments
172    /// * `request` - The P2M request to validate
173    ///
174    /// # Returns
175    /// `Ok(())` if validation passes, `Err(TraceabilityError)` if validation fails
176    ///
177    /// # Errors
178    /// - `InvalidProcess`: When the process ID is not found or accessible
179    /// - `InvalidStream`: When socket addresses are malformed or incompatible
180    fn validate_request(request: &P2mRequest) -> Result<&P2mRequest, TraceabilityError> {
181        // Use the same validation logic as the ResourceValidator
182        match request {
183            P2mRequest::RemoteEnroll { pid, local_socket, peer_socket, .. } => {
184                if ResourceValidator.is_valid_process(*pid) {
185                    if ResourceValidator.is_valid_stream(local_socket, peer_socket) {
186                        Ok(request)
187                    } else {
188                        Err(TraceabilityError::InvalidStream(
189                            local_socket.clone(),
190                            peer_socket.clone(),
191                        ))
192                    }
193                } else {
194                    Err(TraceabilityError::InvalidProcess(*pid))
195                }
196            }
197            P2mRequest::LocalEnroll { pid, .. } | P2mRequest::IoRequest { pid, .. } => {
198                if ResourceValidator.is_valid_process(*pid) {
199                    Ok(request)
200                } else {
201                    Err(TraceabilityError::InvalidProcess(*pid))
202                }
203            }
204            P2mRequest::IoReport { .. } => Ok(request),
205        }
206    }
207
208    fn flow_id() -> u128 {
209        SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_nanos()
210    }
211}
212
213impl<S, P, C, M> Service<P2mRequest> for P2mApiService<S, P, C, M>
214where
215    S: Service<SequencerRequest, Response = SequencerResponse, Error = TraceabilityError>
216        + Clone
217        + Send
218        + 'static,
219    S::Future: Send,
220    P: Service<ProvenanceRequest, Response = ProvenanceResponse, Error = TraceabilityError>
221        + Clone
222        + Send
223        + NodeId
224        + 'static,
225    P::Future: Send,
226    C: Service<ComplianceRequest, Response = ComplianceResponse, Error = TraceabilityError>
227        + Clone
228        + Send
229        + 'static,
230    C::Future: Send,
231    M: Service<M2mRequest, Response = M2mResponse, Error = TraceabilityError>
232        + Clone
233        + Send
234        + 'static,
235    M::Future: Send,
236{
237    type Response = P2mResponse;
238    type Error = TraceabilityError;
239    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
240
    /// Always reports the service as ready.
    ///
    /// No readiness of the inner services is checked here and the task context is ignored.
    fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }
244
245    fn call(&mut self, request: P2mRequest) -> Self::Future {
246        let resource_map = self.resource_map.clone();
247        let flow_map = self.flow_map.clone();
248        let mut sequencer = self.sequencer.clone();
249        let mut provenance = self.provenance.clone();
250        let mut compliance = self.compliance.clone();
251        let mut m2m = self.m2m.clone();
252        let enable_validation = self.enable_resource_validation;
253
254        Box::pin(async move {
255            // Perform resource validation if enabled
256            if enable_validation {
257                Self::validate_request(&request)?;
258            }
259
260            match request {
261                P2mRequest::LocalEnroll { pid, fd, path } => {
262                    info!(
263                        node_id = %provenance.node_id(),
264                        pid = %pid,
265                        fd = %fd,
266                        path = %path,
267                        "[p2m] LocalEnroll"
268                    );
269                    resource_map
270                        .insert((pid, fd), (Resource::new_process(pid), Resource::new_file(path)));
271                    Ok(P2mResponse::Ack)
272                }
273                P2mRequest::RemoteEnroll { pid, fd, local_socket, peer_socket } => {
274                    info!(
275                        node_id = %provenance.node_id(),
276                        pid = %pid,
277                        fd = %fd,
278                        local_socket = %local_socket,
279                        peer_socket = %peer_socket,
280                        "[p2m] RemoteEnroll"
281                    );
282                    resource_map.insert(
283                        (pid, fd),
284                        (
285                            Resource::new_process(pid),
286                            Resource::new_stream(local_socket, peer_socket),
287                        ),
288                    );
289                    Ok(P2mResponse::Ack)
290                }
291                P2mRequest::IoRequest { pid, fd, output } => {
292                    if let Some(resource) = resource_map.get(&(pid, fd)) {
293                        let (source, destination) = if output {
294                            (resource.0.to_owned(), resource.1.to_owned())
295                        } else {
296                            (resource.1.to_owned(), resource.0.to_owned())
297                        };
298                        info!(
299                            node_id = %provenance.node_id(),
300                            source = %source,
301                            destination = %destination,
302                            "[p2m] IoRequest"
303                        );
304                        match sequencer
305                            .call(SequencerRequest::ReserveFlow {
306                                source: source.clone(),
307                                destination: destination.clone(),
308                            })
309                            .await
310                        {
311                            Ok(SequencerResponse::FlowReserved) => {
312                                let localized_destination =
313                                    destination.clone().into_localized(provenance.node_id());
314                                let destination_policy = if localized_destination
315                                    .resource()
316                                    .is_stream()
317                                {
318                                    debug!(
319                                        node_id = %provenance.node_id(),
320                                        destination = %localized_destination,
321                                        "[p2m] Querying destination policy for remote stream"
322                                    );
323                                    match m2m
324                                        .ready()
325                                        .await?
326                                        .call(M2mRequest::GetDestinationPolicy(
327                                            localized_destination.clone(),
328                                        ))
329                                        .await
330                                    {
331                                        Ok(M2mResponse::DestinationPolicy(policy)) => Some(policy),
332                                        _ => None, // anyway, errors are handled later, but this may be improved
333                                    }
334                                } else {
335                                    None
336                                };
337                                let flow_id = match provenance
338                                    .call(ProvenanceRequest::GetReferences(source.clone()))
339                                    .await
340                                {
341                                    Ok(ProvenanceResponse::Provenance(references)) => {
342                                        let (local_references, remote_references): (
343                                            HashSet<_>,
344                                            HashSet<_>,
345                                        ) = references
346                                            .into_iter()
347                                            .partition(|r| *r.node_id() == provenance.node_id());
348                                        match compliance
349                                            .call(ComplianceRequest::EvalCompliance {
350                                                sources: local_references
351                                                    .iter()
352                                                    .map(|r| r.resource().to_owned())
353                                                    .collect(),
354                                                destination: localized_destination.clone(),
355                                                destination_policy: destination_policy.clone(),
356                                            })
357                                            .await
358                                        {
359                                            Ok(ComplianceResponse::Grant) => {
360                                                // Local compliance check passed. If we have no remote references, we can
361                                                // grant the flow, otherwise we need to check the compliance of the remote nodes.
362                                                // Destination policy is required for remote sources compliance checking.
363                                                if remote_references.is_empty() {
364                                                    debug!(
365                                                        node_id = %provenance.node_id(),
366                                                        "[p2m] Local compliance check passed, granting flow"
367                                                    );
368                                                    Ok(Self::flow_id())
369                                                } else {
370                                                    // It the destination is not a stream, so it is a local resource, we can get the policy from the compliance service
371                                                    // This could have been done earlier, but we do it here, to make this call only when it is really needed.
372                                                    let destination_policy = if let Some(policy) =
373                                                        destination_policy
374                                                    {
375                                                        policy
376                                                    } else if !destination.is_stream() {
377                                                        match compliance.call(ComplianceRequest::GetPolicy(destination.clone())).await {
378                                                            Ok(ComplianceResponse::Policy(policy)) => policy,
379                                                            _ => return Err(TraceabilityError::InternalTrace2eError),
380                                                        }
381                                                    } else {
382                                                        return Err(
383                                                            TraceabilityError::InternalTrace2eError,
384                                                        );
385                                                    };
386                                                    debug!(
387                                                        node_id = %provenance.node_id(),
388                                                        "[p2m] Querying remote sources compliance"
389                                                    );
390                                                    match m2m
391                                                        .ready()
392                                                        .await?
393                                                        .call(M2mRequest::CheckSourceCompliance {
394                                                            sources: remote_references,
395                                                            destination: (
396                                                                localized_destination,
397                                                                destination_policy,
398                                                            ),
399                                                        })
400                                                        .await
401                                                    {
402                                                        Ok(M2mResponse::Ack) => {
403                                                            // Remote sources compliance check passed
404                                                            // Flow can be granted, return the flow id
405                                                            Ok(Self::flow_id())
406                                                        }
407                                                        Err(e) => Err(e),
408                                                        _ => Err(
409                                                            TraceabilityError::InternalTrace2eError,
410                                                        ),
411                                                    }
412                                                }
413                                            }
414                                            Err(e) => Err(e),
415                                            _ => Err(TraceabilityError::InternalTrace2eError),
416                                        }
417                                    }
418                                    Err(e) => Err(e),
419                                    _ => Err(TraceabilityError::InternalTrace2eError),
420                                };
421                                match flow_id {
422                                    Ok(flow_id) => {
423                                        // Compliance check passed, flow can be granted, return the flow id
424                                        flow_map.insert(flow_id, (source, destination));
425                                        Ok(P2mResponse::Grant(flow_id))
426                                    }
427                                    Err(e) => {
428                                        debug!(
429                                            node_id = %provenance.node_id(),
430                                            error = ?e,
431                                            "[p2m] Compliance check failed, releasing flow"
432                                        );
433                                        // release the flow, and then forward the error
434                                        sequencer
435                                            .call(SequencerRequest::ReleaseFlow { destination })
436                                            .await?;
437                                        Err(e)
438                                    }
439                                }
440                            }
441                            _ => Err(TraceabilityError::InternalTrace2eError),
442                        }
443                    } else {
444                        Err(TraceabilityError::UndeclaredResource(pid, fd))
445                    }
446                }
447                P2mRequest::IoReport { grant_id, .. } => {
448                    if let Some((_, (source, destination))) = flow_map.remove(&grant_id) {
449                        info!(
450                            node_id = %provenance.node_id(),
451                            source = %source,
452                            destination = %destination,
453                            "[p2m] IoReport"
454                        );
455                        if let Some(remote_stream) = destination.try_into_localized_peer_stream() {
456                            match provenance
457                                .call(ProvenanceRequest::GetReferences(source.clone()))
458                                .await?
459                            {
460                                ProvenanceResponse::Provenance(references) => {
461                                    debug!(
462                                        remote_node_id = remote_stream.node_id(),
463                                        "[p2m] Updating remote provenance"
464                                    );
465                                    m2m.ready()
466                                        .await?
467                                        .call(M2mRequest::UpdateProvenance {
468                                            source_prov: references,
469                                            destination: remote_stream,
470                                        })
471                                        .await?;
472                                }
473                                _ => return Err(TraceabilityError::InternalTrace2eError),
474                            };
475                        } else {
476                            info!(
477                                node_id = %provenance.node_id(),
478                                "[p2m] Updating local provenance"
479                            );
480                            provenance
481                                .call(ProvenanceRequest::UpdateProvenance {
482                                    source,
483                                    destination: destination.clone(),
484                                })
485                                .await?;
486                        }
487
488                        sequencer.call(SequencerRequest::ReleaseFlow { destination }).await?;
489                        Ok(P2mResponse::Ack)
490                    } else {
491                        Err(TraceabilityError::NotFoundFlow(grant_id))
492                    }
493                }
494            }
495        })
496    }
497}
498
499#[cfg(test)]
500mod tests {
501    use tower::Service;
502
503    use super::*;
504    use crate::{
505        traceability::services::{
506            compliance::ComplianceService, provenance::ProvenanceService,
507            sequencer::SequencerService,
508        },
509        transport::nop::M2mNop,
510    };
511
512    #[tokio::test]
513    async fn unit_trace2e_service_request_response() {
514        crate::trace2e_tracing::init();
515        let mut p2m_service = P2mApiService::new(
516            SequencerService::default(),
517            ProvenanceService::default(),
518            ComplianceService::default(),
519            M2mNop,
520        );
521
522        assert_eq!(
523            p2m_service
524                .call(P2mRequest::LocalEnroll { pid: 1, fd: 3, path: "/tmp/test.txt".to_string() })
525                .await
526                .unwrap(),
527            P2mResponse::Ack
528        );
529        assert_eq!(
530            p2m_service
531                .call(P2mRequest::RemoteEnroll {
532                    pid: 1,
533                    fd: 3,
534                    local_socket: "127.0.0.1:8080".to_string(),
535                    peer_socket: "127.0.0.1:8081".to_string()
536                })
537                .await
538                .unwrap(),
539            P2mResponse::Ack
540        );
541
542        let P2mResponse::Grant(flow_id) =
543            p2m_service.call(P2mRequest::IoRequest { pid: 1, fd: 3, output: true }).await.unwrap()
544        else {
545            panic!("Expected P2mResponse::Grant");
546        };
547        assert_eq!(
548            p2m_service
549                .call(P2mRequest::IoReport { pid: 1, fd: 3, grant_id: flow_id, result: true })
550                .await
551                .unwrap(),
552            P2mResponse::Ack
553        );
554
555        let P2mResponse::Grant(flow_id) =
556            p2m_service.call(P2mRequest::IoRequest { pid: 1, fd: 3, output: false }).await.unwrap()
557        else {
558            panic!("Expected P2mResponse::Grant");
559        };
560        assert_eq!(
561            p2m_service
562                .call(P2mRequest::IoReport { pid: 1, fd: 3, grant_id: flow_id, result: true })
563                .await
564                .unwrap(),
565            P2mResponse::Ack
566        );
567    }
568
569    #[tokio::test]
570    async fn unit_trace2e_service_validated_resources() {
571        crate::trace2e_tracing::init();
572        let mut p2m_service = P2mApiService::new(
573            SequencerService::default(),
574            ProvenanceService::default(),
575            ComplianceService::default(),
576            M2mNop,
577        )
578        .with_resource_validation(true);
579
580        // Test with invalid process
581        // This request is supposed to be filtered out by the validator
582        assert_eq!(
583            p2m_service
584                .call(P2mRequest::LocalEnroll { pid: 0, fd: 3, path: "/tmp/test.txt".to_string() })
585                .await
586                .unwrap_err()
587                .to_string(),
588            "Traceability error, process not found (pid: 0)"
589        );
590
591        // Test successful process instantiation with validation
592        assert_eq!(
593            p2m_service
594                .call(P2mRequest::LocalEnroll {
595                    pid: std::process::id() as i32,
596                    fd: 3,
597                    path: "/tmp/test.txt".to_string()
598                })
599                .await
600                .unwrap(),
601            P2mResponse::Ack
602        );
603    }
604
605    #[tokio::test]
606    async fn unit_trace2e_service_io_invalid_request() {
607        crate::trace2e_tracing::init();
608        let mut p2m_service = P2mApiService::new(
609            SequencerService::default(),
610            ProvenanceService::default(),
611            ComplianceService::default(),
612            M2mNop,
613        )
614        .with_resource_validation(true);
615
616        // Neither process nor fd are enrolled
617        assert_eq!(
618            p2m_service
619                .call(P2mRequest::IoRequest { pid: std::process::id() as i32, fd: 3, output: true })
620                .await
621                .unwrap_err()
622                .to_string(),
623            format!(
624                "Traceability error, undeclared resource (pid: {}, fd: 3)",
625                std::process::id() as i32
626            )
627        );
628
629        p2m_service
630            .call(P2mRequest::LocalEnroll {
631                pid: std::process::id() as i32,
632                fd: 4,
633                path: "/tmp/test.txt".to_string(),
634            })
635            .await
636            .unwrap();
637
638        // Only process is enrolled
639        assert_eq!(
640            p2m_service
641                .call(P2mRequest::IoRequest { pid: std::process::id() as i32, fd: 3, output: true })
642                .await
643                .unwrap_err()
644                .to_string(),
645            format!(
646                "Traceability error, undeclared resource (pid: {}, fd: 3)",
647                std::process::id() as i32
648            )
649        );
650    }
651
652    #[tokio::test]
653    async fn unit_trace2e_service_io_invalid_report() {
654        crate::trace2e_tracing::init();
655        let mut p2m_service = P2mApiService::new(
656            SequencerService::default(),
657            ProvenanceService::default(),
658            ComplianceService::default(),
659            M2mNop,
660        )
661        .with_resource_validation(true);
662
663        // Invalid grant id
664        assert_eq!(
665            p2m_service
666                .call(P2mRequest::IoReport { pid: 1, fd: 3, grant_id: 0, result: true })
667                .await
668                .unwrap_err()
669                .to_string(),
670            "Traceability error, flow not found (id: 0)"
671        );
672    }
673
674    #[tokio::test]
675    async fn unit_trace2e_service_integrated_validation() {
676        crate::trace2e_tracing::init();
677
678        // Test P2M service with integrated validation enabled
679        let mut p2m_service_with_validation = P2mApiService::new(
680            SequencerService::default(),
681            ProvenanceService::default(),
682            ComplianceService::default(),
683            M2mNop,
684        )
685        .with_resource_validation(true);
686
687        // Test P2M service with validation disabled
688        let mut p2m_service_without_validation = P2mApiService::new(
689            SequencerService::default(),
690            ProvenanceService::default(),
691            ComplianceService::default(),
692            M2mNop,
693        )
694        .with_resource_validation(false);
695
696        // Test invalid process - should fail with validation enabled
697        assert_eq!(
698            p2m_service_with_validation
699                .call(P2mRequest::LocalEnroll { pid: 0, fd: 3, path: "/tmp/test.txt".to_string() })
700                .await
701                .unwrap_err()
702                .to_string(),
703            "Traceability error, process not found (pid: 0)"
704        );
705
706        // Test invalid process - should succeed with validation disabled
707        assert_eq!(
708            p2m_service_without_validation
709                .call(P2mRequest::LocalEnroll { pid: 0, fd: 3, path: "/tmp/test.txt".to_string() })
710                .await
711                .unwrap(),
712            P2mResponse::Ack
713        );
714
715        // Test valid process - should succeed with validation enabled
716        assert_eq!(
717            p2m_service_with_validation
718                .call(P2mRequest::LocalEnroll {
719                    pid: std::process::id() as i32,
720                    fd: 3,
721                    path: "/tmp/test.txt".to_string()
722                })
723                .await
724                .unwrap(),
725            P2mResponse::Ack
726        );
727
728        // Test valid process - should succeed with validation disabled
729        assert_eq!(
730            p2m_service_without_validation
731                .call(P2mRequest::LocalEnroll {
732                    pid: std::process::id() as i32,
733                    fd: 3,
734                    path: "/tmp/test.txt".to_string()
735                })
736                .await
737                .unwrap(),
738            P2mResponse::Ack
739        );
740    }
741}