trace2e_core/traceability/api/p2m.rs
1//! Process-to-Middleware (P2M) API service implementation.
2//!
3//! This module provides the core service implementation for handling requests from application
4//! processes that need traceability tracking for their I/O operations. The P2M API is the
5//! primary interface through which applications integrate with the trace2e system.
6//!
7//! ## Service Architecture
8//!
9//! The `P2mApiService` acts as the central coordinator between four key services:
10//! - **Sequencer Service**: Manages flow ordering and resource reservations
11//! - **Provenance Service**: Tracks data provenance
12//! - **Compliance Service**: Enforces policies and authorization decisions
13//! - **M2M Client**: Communicates with remote middleware for distributed flows
14//!
15//! ## Resource Management
16//!
17//! The service maintains two primary data structures:
18//! - **Resource Map**: Associates process/file descriptor pairs with source/destination resources
19//! - **Flow Map**: Tracks active flows by grant ID for operation completion reporting
20//!
21//! ## Operation Workflow
22//!
23//! 1. **Enrollment**: Processes register their files and streams before use
24//! 2. **Authorization**: Processes request permission for specific I/O operations
25//! 3. **Execution**: Middleware evaluates policies and grants/denies access
26//! 4. **Reporting**: Processes report completion status for audit trails
27//!
28//! ## Cross-Node Coordination
29//!
30//! For distributed flows involving remote resources, the service coordinates with
31//! remote middleware instances via the M2M API to ensure consistent policy
32//! enforcement and provenance tracking across the network.
33
34use std::{
35 collections::HashSet, future::Future, pin::Pin, sync::Arc, task::Poll, time::SystemTime,
36};
37
38use dashmap::DashMap;
39use tower::{Service, ServiceExt};
40use tracing::{debug, info};
41
42use crate::traceability::{
43 api::types::{
44 ComplianceRequest, ComplianceResponse, M2mRequest, M2mResponse, P2mRequest, P2mResponse,
45 ProvenanceRequest, ProvenanceResponse, SequencerRequest, SequencerResponse,
46 },
47 error::TraceabilityError,
48 infrastructure::{
49 naming::{NodeId, Resource},
50 validation::ResourceValidator,
51 },
52};
53
54/// Maps (process_id, file_descriptor) to (source_resource, destination_resource) pairs
55type ResourceMap = DashMap<(i32, i32), (Resource, Resource)>;
56/// Maps flow_id to (source_resource, destination_resource) pairs for active flows
57type FlowMap = DashMap<u128, (Resource, Resource)>;
58
59/// P2M (Process-to-Middleware) API Service.
60///
61/// Central orchestrator for process-initiated traceability operations. This service
62/// manages the complete lifecycle of tracked I/O operations from initial resource
63/// enrollment through final completion reporting.
64///
65/// ## Core Responsibilities
66///
67/// **Resource Enrollment**: Maintains registry of process file descriptors and their
68/// associated resources (files or network streams) for traceability tracking.
69///
70/// **Flow Authorization**: Coordinates with compliance and sequencer services to
71/// evaluate whether requested I/O operations should be permitted based on current policies.
72///
73/// **Distributed Coordination**: Communicates with remote middleware instances for
74/// cross-node flows, ensuring consistent policy enforcement across the network.
75///
76/// **Provenance Tracking**: Updates provenance records following successful operations
77/// to maintain complete audit trails for compliance and governance.
78///
79/// ## Concurrency and State Management
80///
81/// Uses concurrent data structures (`DashMap`) to handle multiple simultaneous requests
82/// from different processes while maintaining consistency. Resource and flow maps are
83/// shared across service instances using `Arc` for efficient cloning.
84///
85/// ## Generic Type Parameters
86///
87/// - `S`: Sequencer service for flow coordination and resource reservations
88/// - `P`: Provenance service for provenance tracking
89/// - `C`: Compliance service for policy evaluation and authorization decisions
90/// - `M`: M2M client service for communication with remote middleware instances
91#[derive(Debug, Clone)]
92pub struct P2mApiService<S, P, C, M> {
93 /// Maps (process_id, file_descriptor) to (source_resource, destination_resource) pairs
94 resource_map: Arc<ResourceMap>,
95 /// Maps flow_id to (source_resource, destination_resource) pairs for active flows
96 flow_map: Arc<FlowMap>,
97 /// Service for managing flows sequencing
98 sequencer: S,
99 /// Service for tracking resources provenance
100 provenance: P,
101 /// Service for policy management and compliance checking
102 compliance: C,
103 /// Client service for Middleware-to-Middleware communication
104 m2m: M,
105 /// Whether to perform resource validation on incoming requests
106 enable_resource_validation: bool,
107}
108
109impl<S, P, C, M> P2mApiService<S, P, C, M> {
110 /// Creates a new P2M API service with the provided component services.
111 ///
112 /// Initializes empty resource and flow maps and stores references to the
113 /// core services needed for traceability operations. The service is ready
114 /// to handle process requests immediately after construction.
115 ///
116 /// # Arguments
117 /// * `sequencer` - Service for flow coordination and resource reservations
118 /// * `provenance` - Service for provenance tracking
119 /// * `compliance` - Service for policy evaluation and authorization decisions
120 /// * `m2m` - Client for communication with remote middleware instances
121 pub fn new(sequencer: S, provenance: P, compliance: C, m2m: M) -> Self {
122 Self {
123 resource_map: Arc::new(ResourceMap::new()),
124 flow_map: Arc::new(FlowMap::new()),
125 sequencer,
126 provenance,
127 compliance,
128 m2m,
129 enable_resource_validation: false,
130 }
131 }
132
133 /// Pre-enrolls resources for testing and simulation purposes.
134 ///
135 /// Creates mock enrollments for the specified number of processes, files, and streams
136 /// to support testing scenarios without requiring actual process interactions.
137 /// Should only be used in test environments or for system benchmarking.
138 ///
139 /// # Arguments
140 /// * `process_count` - Number of mock processes to enroll
141 /// * `per_process_file_count` - Number of files to enroll per process
142 /// * `per_process_stream_count` - Number of streams to enroll per process
143 ///
144 /// # Returns
145 /// The service instance with pre-enrolled mock resources
146 #[cfg(test)]
147 pub fn with_enrolled_resources(
148 self,
149 process_count: u32,
150 per_process_file_count: u32,
151 per_process_stream_count: u32,
152 ) -> Self {
153 // Pre-calculate all entries to avoid repeated allocations during insertion
154 let file_entries: Vec<_> = (0..process_count as i32)
155 .flat_map(|process_id| {
156 (3..(per_process_file_count + 3) as i32).map(move |file_id| {
157 (
158 (process_id, file_id),
159 (
160 Resource::new_process_mock(process_id),
161 Resource::new_file(format!(
162 "/file_{}",
163 (process_id + file_id) % process_count as i32
164 )),
165 ),
166 )
167 })
168 })
169 .collect();
170 let stream_entries: Vec<_> = (0..process_count as i32)
171 .flat_map(|process_id| {
172 ((per_process_file_count + 3) as i32
173 ..(per_process_stream_count + per_process_file_count + 3) as i32)
174 .map(move |stream_id| {
175 (
176 (process_id, stream_id),
177 (
178 Resource::new_process_mock(process_id),
179 Resource::new_stream(
180 format!("127.0.0.1:{stream_id}",),
181 format!("127.0.0.2:{stream_id}",),
182 ),
183 ),
184 )
185 })
186 })
187 .collect();
188
189 // Batch insert all entries at once using DashMap's concurrent insert capabilities
190 for (key, value) in file_entries.into_iter().chain(stream_entries.into_iter()) {
191 self.resource_map.insert(key, value);
192 }
193 self
194 }
195
196 /// Enables or disables resource validation for incoming P2M requests.
197 ///
198 /// When validation is enabled, all incoming requests are validated for:
199 /// - Valid process IDs (must correspond to running processes)
200 /// - Valid stream addresses (must be well-formed and compatible)
201 ///
202 /// This method uses the same ResourceValidator logic as the Tower filter
203 /// but integrates it directly into the service to avoid complex Send/Sync
204 /// constraints with async runtimes.
205 ///
206 /// # Arguments
207 /// * `enable` - Whether to enable resource validation
208 ///
209 /// # Returns
210 /// Self with validation setting applied
211 pub fn with_resource_validation(mut self, enable: bool) -> Self {
212 self.enable_resource_validation = enable;
213 self
214 }
215
216 /// Validates a P2M request according to resource requirements.
217 ///
218 /// Applies the same validation rules as the ResourceValidator:
219 /// - `RemoteEnroll`: Validates both process and stream resources
220 /// - `LocalEnroll`, `IoRequest`: Validates process resources only
221 /// - `IoReport`: Passes through without validation (grant ID is validated later)
222 ///
223 /// # Arguments
224 /// * `request` - The P2M request to validate
225 ///
226 /// # Returns
227 /// `Ok(())` if validation passes, `Err(TraceabilityError)` if validation fails
228 ///
229 /// # Errors
230 /// - `InvalidProcess`: When the process ID is not found or accessible
231 /// - `InvalidStream`: When socket addresses are malformed or incompatible
232 fn validate_request(request: &P2mRequest) -> Result<&P2mRequest, TraceabilityError> {
233 // Use the same validation logic as the ResourceValidator
234 match request {
235 P2mRequest::RemoteEnroll { pid, local_socket, peer_socket, .. } => {
236 if ResourceValidator.is_valid_process(*pid) {
237 if ResourceValidator.is_valid_stream(local_socket, peer_socket) {
238 Ok(request)
239 } else {
240 Err(TraceabilityError::InvalidStream(
241 local_socket.clone(),
242 peer_socket.clone(),
243 ))
244 }
245 } else {
246 Err(TraceabilityError::InvalidProcess(*pid))
247 }
248 }
249 P2mRequest::LocalEnroll { pid, .. } | P2mRequest::IoRequest { pid, .. } => {
250 if ResourceValidator.is_valid_process(*pid) {
251 Ok(request)
252 } else {
253 Err(TraceabilityError::InvalidProcess(*pid))
254 }
255 }
256 P2mRequest::IoReport { .. } => Ok(request),
257 }
258 }
259
260 fn flow_id() -> u128 {
261 SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_nanos()
262 }
263}
264
265impl<S, P, C, M> Service<P2mRequest> for P2mApiService<S, P, C, M>
266where
267 S: Service<SequencerRequest, Response = SequencerResponse, Error = TraceabilityError>
268 + Clone
269 + Send
270 + 'static,
271 S::Future: Send,
272 P: Service<ProvenanceRequest, Response = ProvenanceResponse, Error = TraceabilityError>
273 + Clone
274 + Send
275 + NodeId
276 + 'static,
277 P::Future: Send,
278 C: Service<ComplianceRequest, Response = ComplianceResponse, Error = TraceabilityError>
279 + Clone
280 + Send
281 + 'static,
282 C::Future: Send,
283 M: Service<M2mRequest, Response = M2mResponse, Error = TraceabilityError>
284 + Clone
285 + Send
286 + 'static,
287 M::Future: Send,
288{
289 type Response = P2mResponse;
290 type Error = TraceabilityError;
291 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
292
293 fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
294 Poll::Ready(Ok(()))
295 }
296
297 fn call(&mut self, request: P2mRequest) -> Self::Future {
298 let resource_map = self.resource_map.clone();
299 let flow_map = self.flow_map.clone();
300 let mut sequencer = self.sequencer.clone();
301 let mut provenance = self.provenance.clone();
302 let mut compliance = self.compliance.clone();
303 let mut m2m = self.m2m.clone();
304 let enable_validation = self.enable_resource_validation;
305
306 Box::pin(async move {
307 // Perform resource validation if enabled
308 if enable_validation {
309 Self::validate_request(&request)?;
310 }
311
312 match request {
313 P2mRequest::LocalEnroll { pid, fd, path } => {
314 info!(
315 node_id = %provenance.node_id(),
316 pid = %pid,
317 fd = %fd,
318 path = %path,
319 "[p2m] LocalEnroll"
320 );
321 resource_map
322 .insert((pid, fd), (Resource::new_process(pid), Resource::new_file(path)));
323 Ok(P2mResponse::Ack)
324 }
325 P2mRequest::RemoteEnroll { pid, fd, local_socket, peer_socket } => {
326 info!(
327 node_id = %provenance.node_id(),
328 pid = %pid,
329 fd = %fd,
330 local_socket = %local_socket,
331 peer_socket = %peer_socket,
332 "[p2m] RemoteEnroll"
333 );
334 resource_map.insert(
335 (pid, fd),
336 (
337 Resource::new_process(pid),
338 Resource::new_stream(local_socket, peer_socket),
339 ),
340 );
341 Ok(P2mResponse::Ack)
342 }
343 P2mRequest::IoRequest { pid, fd, output } => {
344 if let Some(resource) = resource_map.get(&(pid, fd)) {
345 let (source, destination) = if output {
346 (resource.0.to_owned(), resource.1.to_owned())
347 } else {
348 (resource.1.to_owned(), resource.0.to_owned())
349 };
350 info!(
351 node_id = %provenance.node_id(),
352 source = %source,
353 destination = %destination,
354 "[p2m] IoRequest"
355 );
356 match sequencer
357 .call(SequencerRequest::ReserveFlow {
358 source: source.clone(),
359 destination: destination.clone(),
360 })
361 .await
362 {
363 Ok(SequencerResponse::FlowReserved) => {
364 let localized_destination =
365 destination.clone().into_localized(provenance.node_id());
366 let destination_policy = if localized_destination
367 .resource()
368 .is_stream()
369 {
370 debug!(
371 node_id = %provenance.node_id(),
372 destination = %localized_destination,
373 "[p2m] Querying destination policy for remote stream"
374 );
375 match m2m
376 .ready()
377 .await?
378 .call(M2mRequest::GetDestinationPolicy(
379 localized_destination.clone(),
380 ))
381 .await
382 {
383 Ok(M2mResponse::DestinationPolicy(policy)) => Some(policy),
384 _ => None, // anyway, errors are handled later, but this may be improved
385 }
386 } else {
387 None
388 };
389 let flow_id = match provenance
390 .call(ProvenanceRequest::GetReferences(source.clone()))
391 .await
392 {
393 Ok(ProvenanceResponse::Provenance(references)) => {
394 let (local_references, remote_references): (
395 HashSet<_>,
396 HashSet<_>,
397 ) = references
398 .into_iter()
399 .partition(|r| *r.node_id() == provenance.node_id());
400 match compliance
401 .call(ComplianceRequest::EvalCompliance {
402 sources: local_references
403 .iter()
404 .map(|r| r.resource().to_owned())
405 .collect(),
406 destination: localized_destination.clone(),
407 destination_policy: destination_policy.clone(),
408 })
409 .await
410 {
411 Ok(ComplianceResponse::Grant) => {
412 // Local compliance check passed. If we have no remote references, we can
413 // grant the flow, otherwise we need to check the compliance of the remote nodes.
414 // Destination policy is required for remote sources compliance checking.
415 if remote_references.is_empty() {
416 debug!(
417 node_id = %provenance.node_id(),
418 "[p2m] Local compliance check passed, granting flow"
419 );
420 Ok(Self::flow_id())
421 } else {
422 // It the destination is not a stream, so it is a local resource, we can get the policy from the compliance service
423 // This could have been done earlier, but we do it here, to make this call only when it is really needed.
424 let destination_policy = if let Some(policy) =
425 destination_policy
426 {
427 policy
428 } else if !destination.is_stream() {
429 match compliance.call(ComplianceRequest::GetPolicy(destination.clone())).await {
430 Ok(ComplianceResponse::Policy(policy)) => policy,
431 _ => return Err(TraceabilityError::InternalTrace2eError),
432 }
433 } else {
434 return Err(
435 TraceabilityError::InternalTrace2eError,
436 );
437 };
438 debug!(
439 node_id = %provenance.node_id(),
440 "[p2m] Querying remote sources compliance"
441 );
442 match m2m
443 .ready()
444 .await?
445 .call(M2mRequest::CheckSourceCompliance {
446 sources: remote_references,
447 destination: (
448 localized_destination,
449 destination_policy,
450 ),
451 })
452 .await
453 {
454 Ok(M2mResponse::Ack) => {
455 // Remote sources compliance check passed
456 // Flow can be granted, return the flow id
457 Ok(Self::flow_id())
458 }
459 Err(e) => Err(e),
460 _ => Err(
461 TraceabilityError::InternalTrace2eError,
462 ),
463 }
464 }
465 }
466 Err(e) => Err(e),
467 _ => Err(TraceabilityError::InternalTrace2eError),
468 }
469 }
470 Err(e) => Err(e),
471 _ => Err(TraceabilityError::InternalTrace2eError),
472 };
473 match flow_id {
474 Ok(flow_id) => {
475 // Compliance check passed, flow can be granted, return the flow id
476 flow_map.insert(flow_id, (source, destination));
477 Ok(P2mResponse::Grant(flow_id))
478 }
479 Err(e) => {
480 debug!(
481 node_id = %provenance.node_id(),
482 error = ?e,
483 "[p2m] Compliance check failed, releasing flow"
484 );
485 // release the flow, and then forward the error
486 sequencer
487 .call(SequencerRequest::ReleaseFlow { destination })
488 .await?;
489 Err(e)
490 }
491 }
492 }
493 _ => Err(TraceabilityError::InternalTrace2eError),
494 }
495 } else {
496 Err(TraceabilityError::UndeclaredResource(pid, fd))
497 }
498 }
499 P2mRequest::IoReport { grant_id, .. } => {
500 if let Some((_, (source, destination))) = flow_map.remove(&grant_id) {
501 info!(
502 node_id = %provenance.node_id(),
503 source = %source,
504 destination = %destination,
505 "[p2m] IoReport"
506 );
507 if let Some(remote_stream) = destination.try_into_localized_peer_stream() {
508 match provenance
509 .call(ProvenanceRequest::GetReferences(source.clone()))
510 .await?
511 {
512 ProvenanceResponse::Provenance(references) => {
513 m2m.ready()
514 .await?
515 .call(M2mRequest::UpdateProvenance {
516 source_prov: references,
517 destination: remote_stream,
518 })
519 .await?;
520 }
521 _ => return Err(TraceabilityError::InternalTrace2eError),
522 };
523 }
524
525 provenance
526 .call(ProvenanceRequest::UpdateProvenance {
527 source,
528 destination: destination.clone(),
529 })
530 .await?;
531
532 sequencer.call(SequencerRequest::ReleaseFlow { destination }).await?;
533 Ok(P2mResponse::Ack)
534 } else {
535 Err(TraceabilityError::NotFoundFlow(grant_id))
536 }
537 }
538 }
539 })
540 }
541}
542
543#[cfg(test)]
544mod tests {
545 use tower::Service;
546
547 use super::*;
548 use crate::{
549 traceability::services::{
550 compliance::ComplianceService, provenance::ProvenanceService,
551 sequencer::SequencerService,
552 },
553 transport::nop::M2mNop,
554 };
555
556 #[tokio::test]
557 async fn unit_trace2e_service_request_response() {
558 crate::trace2e_tracing::init();
559 let mut p2m_service = P2mApiService::new(
560 SequencerService::default(),
561 ProvenanceService::default(),
562 ComplianceService::default(),
563 M2mNop,
564 );
565
566 assert_eq!(
567 p2m_service
568 .call(P2mRequest::LocalEnroll { pid: 1, fd: 3, path: "/tmp/test.txt".to_string() })
569 .await
570 .unwrap(),
571 P2mResponse::Ack
572 );
573 assert_eq!(
574 p2m_service
575 .call(P2mRequest::RemoteEnroll {
576 pid: 1,
577 fd: 3,
578 local_socket: "127.0.0.1:8080".to_string(),
579 peer_socket: "127.0.0.1:8081".to_string()
580 })
581 .await
582 .unwrap(),
583 P2mResponse::Ack
584 );
585
586 let P2mResponse::Grant(flow_id) =
587 p2m_service.call(P2mRequest::IoRequest { pid: 1, fd: 3, output: true }).await.unwrap()
588 else {
589 panic!("Expected P2mResponse::Grant");
590 };
591 assert_eq!(
592 p2m_service
593 .call(P2mRequest::IoReport { pid: 1, fd: 3, grant_id: flow_id, result: true })
594 .await
595 .unwrap(),
596 P2mResponse::Ack
597 );
598
599 let P2mResponse::Grant(flow_id) =
600 p2m_service.call(P2mRequest::IoRequest { pid: 1, fd: 3, output: false }).await.unwrap()
601 else {
602 panic!("Expected P2mResponse::Grant");
603 };
604 assert_eq!(
605 p2m_service
606 .call(P2mRequest::IoReport { pid: 1, fd: 3, grant_id: flow_id, result: true })
607 .await
608 .unwrap(),
609 P2mResponse::Ack
610 );
611 }
612
613 #[tokio::test]
614 async fn unit_trace2e_service_validated_resources() {
615 crate::trace2e_tracing::init();
616 let mut p2m_service = P2mApiService::new(
617 SequencerService::default(),
618 ProvenanceService::default(),
619 ComplianceService::default(),
620 M2mNop,
621 )
622 .with_resource_validation(true);
623
624 // Test with invalid process
625 // This request is supposed to be filtered out by the validator
626 assert_eq!(
627 p2m_service
628 .call(P2mRequest::LocalEnroll { pid: 0, fd: 3, path: "/tmp/test.txt".to_string() })
629 .await
630 .unwrap_err()
631 .to_string(),
632 "Traceability error, process not found (pid: 0)"
633 );
634
635 // Test successful process instantiation with validation
636 assert_eq!(
637 p2m_service
638 .call(P2mRequest::LocalEnroll {
639 pid: std::process::id() as i32,
640 fd: 3,
641 path: "/tmp/test.txt".to_string()
642 })
643 .await
644 .unwrap(),
645 P2mResponse::Ack
646 );
647 }
648
649 #[tokio::test]
650 async fn unit_trace2e_service_io_invalid_request() {
651 crate::trace2e_tracing::init();
652 let mut p2m_service = P2mApiService::new(
653 SequencerService::default(),
654 ProvenanceService::default(),
655 ComplianceService::default(),
656 M2mNop,
657 )
658 .with_resource_validation(true);
659
660 // Neither process nor fd are enrolled
661 assert_eq!(
662 p2m_service
663 .call(P2mRequest::IoRequest { pid: std::process::id() as i32, fd: 3, output: true })
664 .await
665 .unwrap_err()
666 .to_string(),
667 format!(
668 "Traceability error, undeclared resource (pid: {}, fd: 3)",
669 std::process::id() as i32
670 )
671 );
672
673 p2m_service
674 .call(P2mRequest::LocalEnroll {
675 pid: std::process::id() as i32,
676 fd: 4,
677 path: "/tmp/test.txt".to_string(),
678 })
679 .await
680 .unwrap();
681
682 // Only process is enrolled
683 assert_eq!(
684 p2m_service
685 .call(P2mRequest::IoRequest { pid: std::process::id() as i32, fd: 3, output: true })
686 .await
687 .unwrap_err()
688 .to_string(),
689 format!(
690 "Traceability error, undeclared resource (pid: {}, fd: 3)",
691 std::process::id() as i32
692 )
693 );
694 }
695
696 #[tokio::test]
697 async fn unit_trace2e_service_io_invalid_report() {
698 crate::trace2e_tracing::init();
699 let mut p2m_service = P2mApiService::new(
700 SequencerService::default(),
701 ProvenanceService::default(),
702 ComplianceService::default(),
703 M2mNop,
704 )
705 .with_resource_validation(true);
706
707 // Invalid grant id
708 assert_eq!(
709 p2m_service
710 .call(P2mRequest::IoReport { pid: 1, fd: 3, grant_id: 0, result: true })
711 .await
712 .unwrap_err()
713 .to_string(),
714 "Traceability error, flow not found (id: 0)"
715 );
716 }
717
718 #[tokio::test]
719 async fn unit_trace2e_service_integrated_validation() {
720 crate::trace2e_tracing::init();
721
722 // Test P2M service with integrated validation enabled
723 let mut p2m_service_with_validation = P2mApiService::new(
724 SequencerService::default(),
725 ProvenanceService::default(),
726 ComplianceService::default(),
727 M2mNop,
728 )
729 .with_resource_validation(true);
730
731 // Test P2M service with validation disabled
732 let mut p2m_service_without_validation = P2mApiService::new(
733 SequencerService::default(),
734 ProvenanceService::default(),
735 ComplianceService::default(),
736 M2mNop,
737 )
738 .with_resource_validation(false);
739
740 // Test invalid process - should fail with validation enabled
741 assert_eq!(
742 p2m_service_with_validation
743 .call(P2mRequest::LocalEnroll { pid: 0, fd: 3, path: "/tmp/test.txt".to_string() })
744 .await
745 .unwrap_err()
746 .to_string(),
747 "Traceability error, process not found (pid: 0)"
748 );
749
750 // Test invalid process - should succeed with validation disabled
751 assert_eq!(
752 p2m_service_without_validation
753 .call(P2mRequest::LocalEnroll { pid: 0, fd: 3, path: "/tmp/test.txt".to_string() })
754 .await
755 .unwrap(),
756 P2mResponse::Ack
757 );
758
759 // Test valid process - should succeed with validation enabled
760 assert_eq!(
761 p2m_service_with_validation
762 .call(P2mRequest::LocalEnroll {
763 pid: std::process::id() as i32,
764 fd: 3,
765 path: "/tmp/test.txt".to_string()
766 })
767 .await
768 .unwrap(),
769 P2mResponse::Ack
770 );
771
772 // Test valid process - should succeed with validation disabled
773 assert_eq!(
774 p2m_service_without_validation
775 .call(P2mRequest::LocalEnroll {
776 pid: std::process::id() as i32,
777 fd: 3,
778 path: "/tmp/test.txt".to_string()
779 })
780 .await
781 .unwrap(),
782 P2mResponse::Ack
783 );
784 }
785}