trace2e_core/traceability/p2m.rs
1//! Process-to-Middleware (P2M) API service implementation.
2//!
3//! This module provides the core service implementation for handling requests from application
4//! processes that need traceability tracking for their I/O operations. The P2M API is the
5//! primary interface through which applications integrate with the trace2e system.
6//!
7//! ## Service Architecture
8//!
9//! The `P2mApiService` acts as the central coordinator between four key services:
10//! - **Sequencer Service**: Manages flow ordering and resource reservations
11//! - **Provenance Service**: Tracks data provenance
12//! - **Compliance Service**: Enforces policies and authorization decisions
13//! - **M2M Client**: Communicates with remote middleware for distributed flows
14//!
15//! ## Resource Management
16//!
17//! The service maintains two primary data structures:
18//! - **Resource Map**: Associates process/file descriptor pairs with source/destination resources
19//! - **Flow Map**: Tracks active flows by grant ID for operation completion reporting
20//!
21//! ## Operation Workflow
22//!
23//! 1. **Enrollment**: Processes register their files and streams before use
24//! 2. **Authorization**: Processes request permission for specific I/O operations
25//! 3. **Execution**: Middleware evaluates policies and grants/denies access
26//! 4. **Reporting**: Processes report completion status for audit trails
27//!
28//! ## Cross-Node Coordination
29//!
30//! For distributed flows involving remote resources, the service coordinates with
31//! remote middleware instances via the M2M API to ensure consistent policy
32//! enforcement and provenance tracking across the network.
33
34use std::{
35 collections::HashMap, future::Future, pin::Pin, sync::Arc, task::Poll, time::SystemTime,
36};
37
38use dashmap::DashMap;
39use tower::{Service, ServiceExt};
40#[cfg(feature = "trace2e_tracing")]
41use tracing::{debug, info};
42
43use crate::traceability::{
44 api::{
45 ComplianceRequest, ComplianceResponse, M2mRequest, M2mResponse, P2mRequest, P2mResponse,
46 ProvenanceRequest, ProvenanceResponse, SequencerRequest, SequencerResponse,
47 },
48 error::TraceabilityError,
49 naming::{NodeId, Resource},
50 validation::ResourceValidator,
51};
52
/// Enrollment registry keyed by `(process_id, file_descriptor)`.
///
/// Each value is a `(process_resource, target_resource)` pair: the first element is the
/// enrolling process, the second is the file or stream reachable through that descriptor.
/// Which of the two acts as source or destination is decided per I/O request.
type ResourceMap = DashMap<(i32, i32), (Resource, Resource)>;
/// Granted flows keyed by flow id (the grant id handed back to the process).
///
/// Each value is the `(source_resource, destination_resource)` pair of the granted flow;
/// the entry is removed when the process reports the operation.
type FlowMap = DashMap<u128, (Resource, Resource)>;
57
/// P2M (Process-to-Middleware) API Service.
///
/// Entry point for process-initiated traceability operations. The service handles
/// the lifecycle of a tracked I/O operation: enrollment of a file descriptor,
/// authorization of a flow, and the final completion report.
///
/// ## Core Responsibilities
///
/// **Resource Enrollment**: Keeps a registry that maps a process file descriptor to the
/// resources (process plus file or network stream) involved in its I/O.
///
/// **Flow Authorization**: Reserves the flow with the sequencer, gathers the policies of
/// the source's provenance and asks the compliance service whether the flow is allowed.
///
/// **Distributed Coordination**: Uses the M2M client to fetch policies held by remote
/// nodes and to push provenance updates towards remote stream destinations.
///
/// **Provenance Tracking**: Updates provenance records when a process reports a
/// previously granted operation.
///
/// ## Concurrency and State Management
///
/// The resource and flow maps are `DashMap`s wrapped in `Arc`, so every clone of the
/// service (the type derives `Clone`) observes and mutates the same registries.
///
/// ## Generic Type Parameters
///
/// - `S`: Sequencer service for flow coordination and resource reservations
/// - `P`: Provenance service for provenance tracking
/// - `C`: Compliance service for policy evaluation and authorization decisions
/// - `M`: M2M client service for communication with remote middleware instances
#[derive(Debug, Clone)]
pub struct P2mApiService<S, P, C, M> {
    /// Enrollments: `(process_id, file_descriptor)` to `(process, file-or-stream)` resources
    resource_map: Arc<ResourceMap>,
    /// Granted flows: flow id to `(source_resource, destination_resource)`
    flow_map: Arc<FlowMap>,
    /// Service used to reserve and release flows
    sequencer: S,
    /// Service used to query and update resource provenance
    provenance: P,
    /// Service used to fetch and evaluate policies
    compliance: C,
    /// Client service for Middleware-to-Middleware communication
    m2m: M,
    /// Whether incoming requests are checked by `validate_request` before handling
    enable_resource_validation: bool,
}
107
108impl<S, P, C, M> P2mApiService<S, P, C, M> {
109 /// Creates a new P2M API service with the provided component services.
110 ///
111 /// Initializes empty resource and flow maps and stores references to the
112 /// core services needed for traceability operations. The service is ready
113 /// to handle process requests immediately after construction.
114 ///
115 /// # Arguments
116 /// * `sequencer` - Service for flow coordination and resource reservations
117 /// * `provenance` - Service for provenance tracking
118 /// * `compliance` - Service for policy evaluation and authorization decisions
119 /// * `m2m` - Client for communication with remote middleware instances
120 pub fn new(sequencer: S, provenance: P, compliance: C, m2m: M) -> Self {
121 Self {
122 resource_map: Arc::new(ResourceMap::new()),
123 flow_map: Arc::new(FlowMap::new()),
124 sequencer,
125 provenance,
126 compliance,
127 m2m,
128 enable_resource_validation: false,
129 }
130 }
131
132 /// Pre-enrolls resources for testing and simulation purposes.
133 ///
134 /// Creates mock enrollments for the specified number of processes, files, and streams
135 /// to support testing scenarios without requiring actual process interactions.
136 /// Should only be used in test environments or for system benchmarking.
137 ///
138 /// # Arguments
139 /// * `process_count` - Number of mock processes to enroll
140 /// * `per_process_file_count` - Number of files to enroll per process
141 /// * `per_process_stream_count` - Number of streams to enroll per process
142 ///
143 /// # Returns
144 /// The service instance with pre-enrolled mock resources
145 #[cfg(test)]
146 pub fn with_enrolled_resources(
147 self,
148 process_count: u32,
149 per_process_file_count: u32,
150 per_process_stream_count: u32,
151 ) -> Self {
152 // Pre-calculate all entries to avoid repeated allocations during insertion
153 let file_entries: Vec<_> = (0..process_count as i32)
154 .flat_map(|process_id| {
155 (3..(per_process_file_count + 3) as i32).map(move |file_id| {
156 (
157 (process_id, file_id),
158 (
159 Resource::new_process_mock(process_id),
160 Resource::new_file(format!(
161 "/file_{}",
162 (process_id + file_id) % process_count as i32
163 )),
164 ),
165 )
166 })
167 })
168 .collect();
169 let stream_entries: Vec<_> = (0..process_count as i32)
170 .flat_map(|process_id| {
171 ((per_process_file_count + 3) as i32
172 ..(per_process_stream_count + per_process_file_count + 3) as i32)
173 .map(move |stream_id| {
174 (
175 (process_id, stream_id),
176 (
177 Resource::new_process_mock(process_id),
178 Resource::new_stream(
179 format!("127.0.0.1:{stream_id}",),
180 format!("127.0.0.2:{stream_id}",),
181 ),
182 ),
183 )
184 })
185 })
186 .collect();
187
188 // Batch insert all entries at once using DashMap's concurrent insert capabilities
189 for (key, value) in file_entries.into_iter().chain(stream_entries.into_iter()) {
190 self.resource_map.insert(key, value);
191 }
192 self
193 }
194
195 /// Enables or disables resource validation for incoming P2M requests.
196 ///
197 /// When validation is enabled, all incoming requests are validated for:
198 /// - Valid process IDs (must correspond to running processes)
199 /// - Valid stream addresses (must be well-formed and compatible)
200 ///
201 /// This method uses the same ResourceValidator logic as the Tower filter
202 /// but integrates it directly into the service to avoid complex Send/Sync
203 /// constraints with async runtimes.
204 ///
205 /// # Arguments
206 /// * `enable` - Whether to enable resource validation
207 ///
208 /// # Returns
209 /// Self with validation setting applied
210 pub fn with_resource_validation(mut self, enable: bool) -> Self {
211 self.enable_resource_validation = enable;
212 self
213 }
214
215 /// Validates a P2M request according to resource requirements.
216 ///
217 /// Applies the same validation rules as the ResourceValidator:
218 /// - `RemoteEnroll`: Validates both process and stream resources
219 /// - `LocalEnroll`, `IoRequest`: Validates process resources only
220 /// - `IoReport`: Passes through without validation (grant ID is validated later)
221 ///
222 /// # Arguments
223 /// * `request` - The P2M request to validate
224 ///
225 /// # Returns
226 /// `Ok(())` if validation passes, `Err(TraceabilityError)` if validation fails
227 ///
228 /// # Errors
229 /// - `InvalidProcess`: When the process ID is not found or accessible
230 /// - `InvalidStream`: When socket addresses are malformed or incompatible
231 fn validate_request(request: &P2mRequest) -> Result<&P2mRequest, TraceabilityError> {
232 // Use the same validation logic as the ResourceValidator
233 match request {
234 P2mRequest::RemoteEnroll { pid, local_socket, peer_socket, .. } => {
235 if ResourceValidator.is_valid_process(*pid) {
236 if ResourceValidator.is_valid_stream(local_socket, peer_socket) {
237 Ok(request)
238 } else {
239 Err(TraceabilityError::InvalidStream(
240 local_socket.clone(),
241 peer_socket.clone(),
242 ))
243 }
244 } else {
245 Err(TraceabilityError::InvalidProcess(*pid))
246 }
247 }
248 P2mRequest::LocalEnroll { pid, .. } | P2mRequest::IoRequest { pid, .. } => {
249 if ResourceValidator.is_valid_process(*pid) {
250 Ok(request)
251 } else {
252 Err(TraceabilityError::InvalidProcess(*pid))
253 }
254 }
255 P2mRequest::IoReport { .. } => Ok(request),
256 }
257 }
258}
259
260impl<S, P, C, M> Service<P2mRequest> for P2mApiService<S, P, C, M>
261where
262 S: Service<SequencerRequest, Response = SequencerResponse, Error = TraceabilityError>
263 + Clone
264 + Send
265 + 'static,
266 S::Future: Send,
267 P: Service<ProvenanceRequest, Response = ProvenanceResponse, Error = TraceabilityError>
268 + Clone
269 + Send
270 + NodeId
271 + 'static,
272 P::Future: Send,
273 C: Service<ComplianceRequest, Response = ComplianceResponse, Error = TraceabilityError>
274 + Clone
275 + Send
276 + 'static,
277 C::Future: Send,
278 M: Service<M2mRequest, Response = M2mResponse, Error = TraceabilityError>
279 + Clone
280 + Send
281 + 'static,
282 M::Future: Send,
283{
284 type Response = P2mResponse;
285 type Error = TraceabilityError;
286 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
287
288 fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
289 Poll::Ready(Ok(()))
290 }
291
292 fn call(&mut self, request: P2mRequest) -> Self::Future {
293 let resource_map = self.resource_map.clone();
294 let flow_map = self.flow_map.clone();
295 let mut sequencer = self.sequencer.clone();
296 let mut provenance = self.provenance.clone();
297 let mut compliance = self.compliance.clone();
298 let mut m2m = self.m2m.clone();
299 let enable_validation = self.enable_resource_validation;
300
301 Box::pin(async move {
302 // Perform resource validation if enabled
303 if enable_validation {
304 Self::validate_request(&request)?;
305 }
306
307 match request {
308 P2mRequest::LocalEnroll { pid, fd, path } => {
309 #[cfg(feature = "trace2e_tracing")]
310 info!(
311 "[p2m-{}] LocalEnroll: pid: {}, fd: {}, path: {}",
312 provenance.node_id(),
313 pid,
314 fd,
315 path
316 );
317 resource_map
318 .insert((pid, fd), (Resource::new_process(pid), Resource::new_file(path)));
319 Ok(P2mResponse::Ack)
320 }
321 P2mRequest::RemoteEnroll { pid, fd, local_socket, peer_socket } => {
322 #[cfg(feature = "trace2e_tracing")]
323 info!(
324 "[p2m-{}] RemoteEnroll: pid: {}, fd: {}, local_socket: {}, peer_socket: {}",
325 provenance.node_id(),
326 pid,
327 fd,
328 local_socket,
329 peer_socket
330 );
331 resource_map.insert(
332 (pid, fd),
333 (
334 Resource::new_process(pid),
335 Resource::new_stream(local_socket, peer_socket),
336 ),
337 );
338 Ok(P2mResponse::Ack)
339 }
340 P2mRequest::IoRequest { pid, fd, output } => {
341 if let Some(resource) = resource_map.get(&(pid, fd)) {
342 let flow_id = match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)
343 {
344 Ok(n) => n.as_nanos(),
345 Err(_) => return Err(TraceabilityError::SystemTimeError),
346 };
347 let (source, destination) = if output {
348 (resource.0.to_owned(), resource.1.to_owned())
349 } else {
350 (resource.1.to_owned(), resource.0.to_owned())
351 };
352 #[cfg(feature = "trace2e_tracing")]
353 info!(
354 "[p2m-{}] IoRequest: source: {:?}, destination: {:?}",
355 provenance.node_id(),
356 source,
357 destination
358 );
359 match sequencer
360 .call(SequencerRequest::ReserveFlow {
361 source: source.clone(),
362 destination: destination.clone(),
363 })
364 .await
365 {
366 Ok(SequencerResponse::FlowReserved) => {
367 // If destination is a stream, query remote policy via m2m
368 // else if query local destination policy
369 // else return error
370
371 let source_policies = match provenance
372 .call(ProvenanceRequest::GetReferences(source.clone()))
373 .await?
374 {
375 ProvenanceResponse::Provenance(mut references) => {
376 #[cfg(feature = "trace2e_tracing")]
377 debug!(
378 "[p2m-{}] Aggregated resources: {:?}",
379 provenance.node_id(),
380 references
381 );
382 // Get local source policies
383 let mut source_policies = if let Some(local_references) =
384 references.remove(&provenance.node_id())
385 {
386 match compliance
387 .call(ComplianceRequest::GetPolicies(
388 local_references,
389 ))
390 .await?
391 {
392 ComplianceResponse::Policies(policies) => {
393 HashMap::from([(
394 provenance.node_id(),
395 policies,
396 )])
397 }
398 _ => {
399 return Err(
400 TraceabilityError::InternalTrace2eError,
401 );
402 }
403 }
404 } else {
405 HashMap::new()
406 };
407 // Get remote source policies
408 // Collect all futures without awaiting them
409 let mut tasks = Vec::new();
410 for (node_id, resources) in references {
411 let mut m2m_clone = m2m.clone();
412 let node_id_clone = node_id.clone();
413 let task = tokio::spawn(async move {
414 let result =
415 match m2m_clone.ready().await {
416 Ok(ready_service) => ready_service
417 .call(M2mRequest::GetSourceCompliance {
418 authority_ip: node_id_clone.clone(),
419 resources,
420 })
421 .await,
422 Err(e) => Err(e),
423 };
424 (node_id_clone, result)
425 });
426 tasks.push(task);
427 }
428
429 // Await all requests concurrently
430 for task in tasks {
431 let (node_id, result) = task.await.map_err(|_| {
432 TraceabilityError::InternalTrace2eError
433 })?;
434 match result? {
435 M2mResponse::SourceCompliance(policies) => {
436 source_policies.insert(node_id, policies);
437 }
438 _ => {
439 return Err(
440 TraceabilityError::InternalTrace2eError,
441 );
442 }
443 }
444 }
445 source_policies
446 }
447 _ => {
448 return Err(TraceabilityError::InternalTrace2eError);
449 }
450 };
451 match compliance
452 .call(ComplianceRequest::EvalPolicies {
453 source_policies,
454 destination: destination.clone(),
455 })
456 .await
457 {
458 Ok(ComplianceResponse::Grant) => {
459 flow_map.insert(flow_id, (source, destination));
460 Ok(P2mResponse::Grant(flow_id))
461 }
462 Err(TraceabilityError::DirectPolicyViolation) => {
463 // Release the flow if the policy is violated
464 #[cfg(feature = "trace2e_tracing")]
465 info!(
466 "[p2m-{}] Release flow: {:?} as it is not compliant",
467 provenance.node_id(),
468 flow_id
469 );
470 sequencer
471 .call(SequencerRequest::ReleaseFlow { destination })
472 .await?;
473 Err(TraceabilityError::DirectPolicyViolation)
474 }
475 _ => Err(TraceabilityError::InternalTrace2eError),
476 }
477 }
478 _ => Err(TraceabilityError::InternalTrace2eError),
479 }
480 } else {
481 Err(TraceabilityError::UndeclaredResource(pid, fd))
482 }
483 }
484 P2mRequest::IoReport { grant_id, .. } => {
485 if let Some((_, (source, destination))) = flow_map.remove(&grant_id) {
486 #[cfg(feature = "trace2e_tracing")]
487 info!(
488 "[p2m-{}] IoReport: source: {:?}, destination: {:?}",
489 provenance.node_id(),
490 source,
491 destination
492 );
493 if let Some(remote_stream) = destination.is_stream() {
494 match provenance
495 .call(ProvenanceRequest::GetReferences(source.clone()))
496 .await?
497 {
498 ProvenanceResponse::Provenance(references) => {
499 m2m.ready()
500 .await?
501 .call(M2mRequest::UpdateProvenance {
502 source_prov: references,
503 destination: remote_stream,
504 })
505 .await?;
506 }
507 _ => return Err(TraceabilityError::InternalTrace2eError),
508 };
509 }
510
511 provenance
512 .call(ProvenanceRequest::UpdateProvenance {
513 source,
514 destination: destination.clone(),
515 })
516 .await?;
517
518 sequencer.call(SequencerRequest::ReleaseFlow { destination }).await?;
519 Ok(P2mResponse::Ack)
520 } else {
521 Err(TraceabilityError::NotFoundFlow(grant_id))
522 }
523 }
524 }
525 })
526 }
527}
528
#[cfg(test)]
mod tests {
    use tower::Service;

    use super::*;
    use crate::{
        traceability::core::{
            compliance::ComplianceService, provenance::ProvenanceService,
            sequencer::SequencerService,
        },
        transport::nop::M2mNop,
    };

    /// Builds a P2M service on top of default core services and the no-op M2M client.
    macro_rules! default_p2m_service {
        () => {
            P2mApiService::new(
                SequencerService::default(),
                ProvenanceService::default(),
                ComplianceService::default(),
                M2mNop,
            )
        };
    }

    /// Pid of the test process itself: a pid that is guaranteed to exist.
    fn current_pid() -> i32 {
        std::process::id() as i32
    }

    /// `LocalEnroll` request for `/tmp/test.txt` on the given pid and fd.
    fn enroll_test_file(pid: i32, fd: i32) -> P2mRequest {
        P2mRequest::LocalEnroll { pid, fd, path: "/tmp/test.txt".to_string() }
    }

    /// Enrollment, then a granted and reported flow in each direction.
    #[tokio::test]
    async fn unit_trace2e_service_request_response() {
        #[cfg(feature = "trace2e_tracing")]
        crate::trace2e_tracing::init();
        let mut p2m_service = default_p2m_service!();

        let local_enrollment = p2m_service.call(enroll_test_file(1, 3)).await.unwrap();
        assert_eq!(local_enrollment, P2mResponse::Ack);

        let remote_enrollment = p2m_service
            .call(P2mRequest::RemoteEnroll {
                pid: 1,
                fd: 3,
                local_socket: "127.0.0.1:8080".to_string(),
                peer_socket: "127.0.0.1:8081".to_string(),
            })
            .await
            .unwrap();
        assert_eq!(remote_enrollment, P2mResponse::Ack);

        // Output flow first, then input flow, each one granted and reported.
        for output in [true, false] {
            let granted =
                p2m_service.call(P2mRequest::IoRequest { pid: 1, fd: 3, output }).await.unwrap();
            let P2mResponse::Grant(flow_id) = granted else {
                panic!("Expected P2mResponse::Grant");
            };
            let reported = p2m_service
                .call(P2mRequest::IoReport { pid: 1, fd: 3, grant_id: flow_id, result: true })
                .await
                .unwrap();
            assert_eq!(reported, P2mResponse::Ack);
        }
    }

    /// With validation on, an unknown pid is rejected and the test's own pid is accepted.
    #[tokio::test]
    async fn unit_trace2e_service_validated_resources() {
        #[cfg(feature = "trace2e_tracing")]
        crate::trace2e_tracing::init();
        let mut p2m_service = default_p2m_service!().with_resource_validation(true);

        // Invalid process: the validator is expected to filter this request out
        let rejection = p2m_service.call(enroll_test_file(0, 3)).await.unwrap_err();
        assert_eq!(rejection.to_string(), "Traceability error, process not found (pid: 0)");

        // Existing process: enrollment goes through
        let enrollment = p2m_service.call(enroll_test_file(current_pid(), 3)).await.unwrap();
        assert_eq!(enrollment, P2mResponse::Ack);
    }

    /// An `IoRequest` on a descriptor that was never enrolled is refused.
    #[tokio::test]
    async fn unit_trace2e_service_io_invalid_request() {
        #[cfg(feature = "trace2e_tracing")]
        crate::trace2e_tracing::init();
        let mut p2m_service = default_p2m_service!().with_resource_validation(true);

        let pid = current_pid();
        let expected_message =
            format!("Traceability error, undeclared resource (pid: {}, fd: 3)", pid);

        // Neither process nor fd are enrolled
        let undeclared = p2m_service
            .call(P2mRequest::IoRequest { pid, fd: 3, output: true })
            .await
            .unwrap_err();
        assert_eq!(undeclared.to_string(), expected_message);

        p2m_service.call(enroll_test_file(pid, 4)).await.unwrap();

        // Only process is enrolled (on another fd)
        let still_undeclared = p2m_service
            .call(P2mRequest::IoRequest { pid, fd: 3, output: true })
            .await
            .unwrap_err();
        assert_eq!(still_undeclared.to_string(), expected_message);
    }

    /// An `IoReport` carrying an unknown grant id is refused.
    #[tokio::test]
    async fn unit_trace2e_service_io_invalid_report() {
        #[cfg(feature = "trace2e_tracing")]
        crate::trace2e_tracing::init();
        let mut p2m_service = default_p2m_service!().with_resource_validation(true);

        // Invalid grant id
        let unknown_flow = p2m_service
            .call(P2mRequest::IoReport { pid: 1, fd: 3, grant_id: 0, result: true })
            .await
            .unwrap_err();
        assert_eq!(unknown_flow.to_string(), "Traceability error, flow not found (id: 0)");
    }

    /// Same enrollments against a validating and a non-validating service.
    #[tokio::test]
    async fn unit_trace2e_service_integrated_validation() {
        #[cfg(feature = "trace2e_tracing")]
        crate::trace2e_tracing::init();

        // One service with integrated validation enabled, one with it disabled
        let mut p2m_service_with_validation =
            default_p2m_service!().with_resource_validation(true);
        let mut p2m_service_without_validation =
            default_p2m_service!().with_resource_validation(false);

        // Invalid process - should fail with validation enabled
        let rejection =
            p2m_service_with_validation.call(enroll_test_file(0, 3)).await.unwrap_err();
        assert_eq!(rejection.to_string(), "Traceability error, process not found (pid: 0)");

        // Invalid process - should succeed with validation disabled
        let unchecked =
            p2m_service_without_validation.call(enroll_test_file(0, 3)).await.unwrap();
        assert_eq!(unchecked, P2mResponse::Ack);

        // Valid process - should succeed with validation enabled
        let validated =
            p2m_service_with_validation.call(enroll_test_file(current_pid(), 3)).await.unwrap();
        assert_eq!(validated, P2mResponse::Ack);

        // Valid process - should succeed with validation disabled
        let unvalidated = p2m_service_without_validation
            .call(enroll_test_file(current_pid(), 3))
            .await
            .unwrap();
        assert_eq!(unvalidated, P2mResponse::Ack);
    }
}