trace2e_core/traceability/api/p2m.rs
1//! Process-to-Middleware (P2M) API service implementation.
2//!
3//! This module provides the core service implementation for handling requests from application
4//! processes that need traceability tracking for their I/O operations. The P2M API is the
5//! primary interface through which applications integrate with the trace2e system.
6//!
7//! ## Service Architecture
8//!
9//! The `P2mApiService` acts as the central coordinator between four key services:
10//! - **Sequencer Service**: Manages flow ordering and resource reservations
11//! - **Provenance Service**: Tracks data provenance
12//! - **Compliance Service**: Enforces policies and authorization decisions
13//! - **M2M Client**: Communicates with remote middleware for distributed flows
14//!
15//! ## Resource Management
16//!
17//! The service maintains two primary data structures:
18//! - **Resource Map**: Associates process/file descriptor pairs with source/destination resources
19//! - **Flow Map**: Tracks active flows by grant ID for operation completion reporting
20//!
21//! ## Operation Workflow
22//!
23//! 1. **Enrollment**: Processes register their files and streams before use
24//! 2. **Authorization**: Processes request permission for specific I/O operations
25//! 3. **Execution**: Middleware evaluates policies and grants/denies access
26//! 4. **Reporting**: Processes report completion status for audit trails
27//!
28//! ## Cross-Node Coordination
29//!
30//! For distributed flows involving remote resources, the service coordinates with
31//! remote middleware instances via the M2M API to ensure consistent policy
32//! enforcement and provenance tracking across the network.
33
34use std::{
35 collections::HashSet, future::Future, pin::Pin, sync::Arc, task::Poll, time::SystemTime,
36};
37
38use dashmap::DashMap;
39use tower::{Service, ServiceExt};
40use tracing::{debug, info};
41
42use crate::traceability::{
43 api::types::{
44 ComplianceRequest, ComplianceResponse, M2mRequest, M2mResponse, P2mRequest, P2mResponse,
45 ProvenanceRequest, ProvenanceResponse, SequencerRequest, SequencerResponse,
46 },
47 error::TraceabilityError,
48 infrastructure::{
49 naming::{NodeId, Resource},
50 validation::ResourceValidator,
51 },
52};
53
54/// Maps (process_id, file_descriptor) to (source_resource, destination_resource) pairs
55type ResourceMap = DashMap<(i32, i32), (Resource, Resource)>;
56/// Maps flow_id to (source_resource, destination_resource) pairs for active flows
57type FlowMap = DashMap<u128, (Resource, Resource)>;
58
59/// P2M (Process-to-Middleware) API Service.
60///
61/// Central orchestrator for process-initiated traceability operations. This service
62/// manages the complete lifecycle of tracked I/O operations from initial resource
63/// enrollment through final completion reporting.
64///
65/// ## Core Responsibilities
66///
67/// **Resource Enrollment**: Maintains registry of process file descriptors and their
68/// associated resources (files or network streams) for traceability tracking.
69///
70/// **Flow Authorization**: Coordinates with compliance and sequencer services to
71/// evaluate whether requested I/O operations should be permitted based on current policies.
72///
73/// **Distributed Coordination**: Communicates with remote middleware instances for
74/// cross-node flows, ensuring consistent policy enforcement across the network.
75///
76/// **Provenance Tracking**: Updates provenance records following successful operations
77/// to maintain complete audit trails for compliance and governance.
78///
79/// ## Concurrency and State Management
80///
81/// Uses concurrent data structures (`DashMap`) to handle multiple simultaneous requests
82/// from different processes while maintaining consistency. Resource and flow maps are
83/// shared across service instances using `Arc` for efficient cloning.
84///
85/// ## Generic Type Parameters
86///
87/// - `S`: Sequencer service for flow coordination and resource reservations
88/// - `P`: Provenance service for provenance tracking
89/// - `C`: Compliance service for policy evaluation and authorization decisions
90/// - `M`: M2M client service for communication with remote middleware instances
91#[derive(Debug, Clone)]
92pub struct P2mApiService<S, P, C, M> {
93 /// Maps (process_id, file_descriptor) to (source_resource, destination_resource) pairs
94 resource_map: Arc<ResourceMap>,
95 /// Maps flow_id to (source_resource, destination_resource) pairs for active flows
96 flow_map: Arc<FlowMap>,
97 /// Service for managing flows sequencing
98 sequencer: S,
99 /// Service for tracking resources provenance
100 provenance: P,
101 /// Service for policy management and compliance checking
102 compliance: C,
103 /// Client service for Middleware-to-Middleware communication
104 m2m: M,
105 /// Whether to perform resource validation on incoming requests
106 enable_resource_validation: bool,
107}
108
109impl<S, P, C, M> P2mApiService<S, P, C, M> {
110 /// Creates a new P2M API service with the provided component services.
111 ///
112 /// Initializes empty resource and flow maps and stores references to the
113 /// core services needed for traceability operations. The service is ready
114 /// to handle process requests immediately after construction.
115 ///
116 /// # Arguments
117 /// * `sequencer` - Service for flow coordination and resource reservations
118 /// * `provenance` - Service for provenance tracking
119 /// * `compliance` - Service for policy evaluation and authorization decisions
120 /// * `m2m` - Client for communication with remote middleware instances
121 pub fn new(sequencer: S, provenance: P, compliance: C, m2m: M) -> Self {
122 Self {
123 resource_map: Arc::new(ResourceMap::new()),
124 flow_map: Arc::new(FlowMap::new()),
125 sequencer,
126 provenance,
127 compliance,
128 m2m,
129 enable_resource_validation: false,
130 }
131 }
132
133 /// Pre-enrolls resources for testing and simulation purposes.
134 ///
135 /// Creates mock enrollments for the specified number of processes, files, and streams
136 /// to support testing scenarios without requiring actual process interactions.
137 /// Should only be used in test environments or for system benchmarking.
138 ///
139 /// # Arguments
140 /// * `process_count` - Number of mock processes to enroll
141 /// * `per_process_file_count` - Number of files to enroll per process
142 /// * `per_process_stream_count` - Number of streams to enroll per process
143 ///
144 /// # Returns
145 /// The service instance with pre-enrolled mock resources
146 #[cfg(test)]
147 pub fn with_enrolled_resources(
148 self,
149 process_count: u32,
150 per_process_file_count: u32,
151 per_process_stream_count: u32,
152 ) -> Self {
153 // Pre-calculate all entries to avoid repeated allocations during insertion
154 let file_entries: Vec<_> = (0..process_count as i32)
155 .flat_map(|process_id| {
156 (3..(per_process_file_count + 3) as i32).map(move |file_id| {
157 (
158 (process_id, file_id),
159 (
160 Resource::new_process_mock(process_id),
161 Resource::new_file(format!(
162 "/file_{}",
163 (process_id + file_id) % process_count as i32
164 )),
165 ),
166 )
167 })
168 })
169 .collect();
170 let stream_entries: Vec<_> = (0..process_count as i32)
171 .flat_map(|process_id| {
172 ((per_process_file_count + 3) as i32
173 ..(per_process_stream_count + per_process_file_count + 3) as i32)
174 .map(move |stream_id| {
175 (
176 (process_id, stream_id),
177 (
178 Resource::new_process_mock(process_id),
179 Resource::new_stream(
180 format!("127.0.0.1:{stream_id}",),
181 format!("127.0.0.2:{stream_id}",),
182 ),
183 ),
184 )
185 })
186 })
187 .collect();
188
189 // Batch insert all entries at once using DashMap's concurrent insert capabilities
190 for (key, value) in file_entries.into_iter().chain(stream_entries.into_iter()) {
191 self.resource_map.insert(key, value);
192 }
193 self
194 }
195
196 /// Enables or disables resource validation for incoming P2M requests.
197 ///
198 /// When validation is enabled, all incoming requests are validated for:
199 /// - Valid process IDs (must correspond to running processes)
200 /// - Valid stream addresses (must be well-formed and compatible)
201 ///
202 /// This method uses the same ResourceValidator logic as the Tower filter
203 /// but integrates it directly into the service to avoid complex Send/Sync
204 /// constraints with async runtimes.
205 ///
206 /// # Arguments
207 /// * `enable` - Whether to enable resource validation
208 ///
209 /// # Returns
210 /// Self with validation setting applied
211 pub fn with_resource_validation(mut self, enable: bool) -> Self {
212 self.enable_resource_validation = enable;
213 self
214 }
215
216 /// Validates a P2M request according to resource requirements.
217 ///
218 /// Applies the same validation rules as the ResourceValidator:
219 /// - `RemoteEnroll`: Validates both process and stream resources
220 /// - `LocalEnroll`, `IoRequest`: Validates process resources only
221 /// - `IoReport`: Passes through without validation (grant ID is validated later)
222 ///
223 /// # Arguments
224 /// * `request` - The P2M request to validate
225 ///
226 /// # Returns
227 /// `Ok(())` if validation passes, `Err(TraceabilityError)` if validation fails
228 ///
229 /// # Errors
230 /// - `InvalidProcess`: When the process ID is not found or accessible
231 /// - `InvalidStream`: When socket addresses are malformed or incompatible
232 fn validate_request(request: &P2mRequest) -> Result<&P2mRequest, TraceabilityError> {
233 // Use the same validation logic as the ResourceValidator
234 match request {
235 P2mRequest::RemoteEnroll { pid, local_socket, peer_socket, .. } => {
236 if ResourceValidator.is_valid_process(*pid) {
237 if ResourceValidator.is_valid_stream(local_socket, peer_socket) {
238 Ok(request)
239 } else {
240 Err(TraceabilityError::InvalidStream(
241 local_socket.clone(),
242 peer_socket.clone(),
243 ))
244 }
245 } else {
246 Err(TraceabilityError::InvalidProcess(*pid))
247 }
248 }
249 P2mRequest::LocalEnroll { pid, .. } | P2mRequest::IoRequest { pid, .. } => {
250 if ResourceValidator.is_valid_process(*pid) {
251 Ok(request)
252 } else {
253 Err(TraceabilityError::InvalidProcess(*pid))
254 }
255 }
256 P2mRequest::IoReport { .. } => Ok(request),
257 }
258 }
259
260 fn flow_id() -> u128 {
261 SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_nanos()
262 }
263}
264
265impl<S, P, C, M> Service<P2mRequest> for P2mApiService<S, P, C, M>
266where
267 S: Service<SequencerRequest, Response = SequencerResponse, Error = TraceabilityError>
268 + Clone
269 + Send
270 + 'static,
271 S::Future: Send,
272 P: Service<ProvenanceRequest, Response = ProvenanceResponse, Error = TraceabilityError>
273 + Clone
274 + Send
275 + NodeId
276 + 'static,
277 P::Future: Send,
278 C: Service<ComplianceRequest, Response = ComplianceResponse, Error = TraceabilityError>
279 + Clone
280 + Send
281 + 'static,
282 C::Future: Send,
283 M: Service<M2mRequest, Response = M2mResponse, Error = TraceabilityError>
284 + Clone
285 + Send
286 + 'static,
287 M::Future: Send,
288{
289 type Response = P2mResponse;
290 type Error = TraceabilityError;
291 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
292
293 fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
294 Poll::Ready(Ok(()))
295 }
296
297 fn call(&mut self, request: P2mRequest) -> Self::Future {
298 let resource_map = self.resource_map.clone();
299 let flow_map = self.flow_map.clone();
300 let mut sequencer = self.sequencer.clone();
301 let mut provenance = self.provenance.clone();
302 let mut compliance = self.compliance.clone();
303 let mut m2m = self.m2m.clone();
304 let enable_validation = self.enable_resource_validation;
305
306 Box::pin(async move {
307 // Perform resource validation if enabled
308 if enable_validation {
309 Self::validate_request(&request)?;
310 }
311
312 match request {
313 P2mRequest::LocalEnroll { pid, fd, path } => {
314 info!(
315 node_id = %provenance.node_id(),
316 pid = %pid,
317 fd = %fd,
318 path = %path,
319 "[p2m] LocalEnroll"
320 );
321 resource_map
322 .insert((pid, fd), (Resource::new_process(pid), Resource::new_file(path)));
323 Ok(P2mResponse::Ack)
324 }
325 P2mRequest::RemoteEnroll { pid, fd, local_socket, peer_socket } => {
326 info!(
327 node_id = %provenance.node_id(),
328 pid = %pid,
329 fd = %fd,
330 local_socket = %local_socket,
331 peer_socket = %peer_socket,
332 "[p2m] RemoteEnroll"
333 );
334 resource_map.insert(
335 (pid, fd),
336 (
337 Resource::new_process(pid),
338 Resource::new_stream(local_socket, peer_socket),
339 ),
340 );
341 Ok(P2mResponse::Ack)
342 }
343 P2mRequest::IoRequest { pid, fd, output } => {
344 if let Some(resource) = resource_map.get(&(pid, fd)) {
345 let (source, destination) = if output {
346 (resource.0.to_owned(), resource.1.to_owned())
347 } else {
348 (resource.1.to_owned(), resource.0.to_owned())
349 };
350 info!(
351 node_id = %provenance.node_id(),
352 source = %source,
353 destination = %destination,
354 "[p2m] IoRequest"
355 );
356 match sequencer
357 .call(SequencerRequest::ReserveFlow {
358 source: source.clone(),
359 destination: destination.clone(),
360 })
361 .await
362 {
363 Ok(SequencerResponse::FlowReserved) => {
364 let localized_destination =
365 destination.clone().into_localized(provenance.node_id());
366 let destination_policy = if localized_destination
367 .resource()
368 .is_stream()
369 {
370 debug!(
371 node_id = %provenance.node_id(),
372 destination = %localized_destination,
373 "[p2m] Querying destination policy for remote stream"
374 );
375 match m2m
376 .ready()
377 .await?
378 .call(M2mRequest::GetDestinationPolicy(
379 localized_destination.clone(),
380 ))
381 .await
382 {
383 Ok(M2mResponse::DestinationPolicy(policy)) => Some(policy),
384 _ => None, // anyway, errors are handled later, but this may be improved
385 }
386 } else {
387 None
388 };
389 let flow_id = match provenance
390 .call(ProvenanceRequest::GetReferences(source.clone()))
391 .await
392 {
393 Ok(ProvenanceResponse::Provenance(references)) => {
394 let (local_references, remote_references): (
395 HashSet<_>,
396 HashSet<_>,
397 ) = references
398 .into_iter()
399 .partition(|r| *r.node_id() == provenance.node_id());
400 match compliance
401 .call(ComplianceRequest::EvalCompliance {
402 sources: local_references
403 .iter()
404 .map(|r| r.resource().to_owned())
405 .collect(),
406 destination: localized_destination.clone(),
407 destination_policy: destination_policy.clone(),
408 })
409 .await
410 {
411 Ok(ComplianceResponse::Grant) => {
412 // Local compliance check passed. If we have no remote references, we can
413 // grant the flow, otherwise we need to check the compliance of the remote nodes.
414 // Destination policy is required for remote sources compliance checking.
415 if remote_references.is_empty() {
416 debug!(
417 node_id = %provenance.node_id(),
418 "[p2m] Local compliance check passed, granting flow"
419 );
420 Ok(Self::flow_id())
421 } else {
422 // It the destination is not a stream, so it is a local resource, we can get the policy from the compliance service
423 // This could have been done earlier, but we do it here, to make this call only when it is really needed.
424 let destination_policy = if let Some(policy) =
425 destination_policy
426 {
427 policy
428 } else if !destination.is_stream() {
429 match compliance.call(ComplianceRequest::GetPolicy(destination.clone())).await {
430 Ok(ComplianceResponse::Policy(policy)) => policy,
431 _ => return Err(TraceabilityError::InternalTrace2eError),
432 }
433 } else {
434 return Err(
435 TraceabilityError::InternalTrace2eError,
436 );
437 };
438 debug!(
439 node_id = %provenance.node_id(),
440 "[p2m] Querying remote sources compliance"
441 );
442 match m2m
443 .ready()
444 .await?
445 .call(M2mRequest::CheckSourceCompliance {
446 sources: remote_references,
447 destination: (
448 localized_destination,
449 destination_policy,
450 ),
451 })
452 .await
453 {
454 Ok(M2mResponse::Ack) => {
455 // Remote sources compliance check passed
456 // Flow can be granted, return the flow id
457 Ok(Self::flow_id())
458 }
459 Err(e) => Err(e),
460 _ => Err(
461 TraceabilityError::InternalTrace2eError,
462 ),
463 }
464 }
465 }
466 Err(e) => Err(e),
467 _ => Err(TraceabilityError::InternalTrace2eError),
468 }
469 }
470 Err(e) => Err(e),
471 _ => Err(TraceabilityError::InternalTrace2eError),
472 };
473 match flow_id {
474 Ok(flow_id) => {
475 // Compliance check passed, flow can be granted, return the flow id
476 flow_map.insert(flow_id, (source, destination));
477 Ok(P2mResponse::Grant(flow_id))
478 }
479 Err(e) => {
480 debug!(
481 node_id = %provenance.node_id(),
482 error = ?e,
483 "[p2m] Compliance check failed, releasing flow"
484 );
485 // release the flow, and then forward the error
486 sequencer
487 .call(SequencerRequest::ReleaseFlow { destination })
488 .await?;
489 Err(e)
490 }
491 }
492 }
493 _ => Err(TraceabilityError::InternalTrace2eError),
494 }
495 } else {
496 Err(TraceabilityError::UndeclaredResource(pid, fd))
497 }
498 }
499 P2mRequest::IoReport { grant_id, .. } => {
500 if let Some((_, (source, destination))) = flow_map.remove(&grant_id) {
501 info!(
502 node_id = %provenance.node_id(),
503 source = %source,
504 destination = %destination,
505 "[p2m] IoReport"
506 );
507 if let Some(remote_stream) = destination.try_into_localized_peer_stream() {
508 match provenance
509 .call(ProvenanceRequest::GetReferences(source.clone()))
510 .await?
511 {
512 ProvenanceResponse::Provenance(references) => {
513 debug!(
514 remote_node_id = remote_stream.node_id(),
515 "[p2m] Updating remote provenance"
516 );
517 m2m.ready()
518 .await?
519 .call(M2mRequest::UpdateProvenance {
520 source_prov: references,
521 destination: remote_stream,
522 })
523 .await?;
524 }
525 _ => return Err(TraceabilityError::InternalTrace2eError),
526 };
527 } else {
528 info!(
529 node_id = %provenance.node_id(),
530 "[p2m] Updating local provenance"
531 );
532 provenance
533 .call(ProvenanceRequest::UpdateProvenance {
534 source,
535 destination: destination.clone(),
536 })
537 .await?;
538 }
539
540 sequencer.call(SequencerRequest::ReleaseFlow { destination }).await?;
541 Ok(P2mResponse::Ack)
542 } else {
543 Err(TraceabilityError::NotFoundFlow(grant_id))
544 }
545 }
546 }
547 })
548 }
549}
550
551#[cfg(test)]
552mod tests {
553 use tower::Service;
554
555 use super::*;
556 use crate::{
557 traceability::services::{
558 compliance::ComplianceService, provenance::ProvenanceService,
559 sequencer::SequencerService,
560 },
561 transport::nop::M2mNop,
562 };
563
564 #[tokio::test]
565 async fn unit_trace2e_service_request_response() {
566 crate::trace2e_tracing::init();
567 let mut p2m_service = P2mApiService::new(
568 SequencerService::default(),
569 ProvenanceService::default(),
570 ComplianceService::default(),
571 M2mNop,
572 );
573
574 assert_eq!(
575 p2m_service
576 .call(P2mRequest::LocalEnroll { pid: 1, fd: 3, path: "/tmp/test.txt".to_string() })
577 .await
578 .unwrap(),
579 P2mResponse::Ack
580 );
581 assert_eq!(
582 p2m_service
583 .call(P2mRequest::RemoteEnroll {
584 pid: 1,
585 fd: 3,
586 local_socket: "127.0.0.1:8080".to_string(),
587 peer_socket: "127.0.0.1:8081".to_string()
588 })
589 .await
590 .unwrap(),
591 P2mResponse::Ack
592 );
593
594 let P2mResponse::Grant(flow_id) =
595 p2m_service.call(P2mRequest::IoRequest { pid: 1, fd: 3, output: true }).await.unwrap()
596 else {
597 panic!("Expected P2mResponse::Grant");
598 };
599 assert_eq!(
600 p2m_service
601 .call(P2mRequest::IoReport { pid: 1, fd: 3, grant_id: flow_id, result: true })
602 .await
603 .unwrap(),
604 P2mResponse::Ack
605 );
606
607 let P2mResponse::Grant(flow_id) =
608 p2m_service.call(P2mRequest::IoRequest { pid: 1, fd: 3, output: false }).await.unwrap()
609 else {
610 panic!("Expected P2mResponse::Grant");
611 };
612 assert_eq!(
613 p2m_service
614 .call(P2mRequest::IoReport { pid: 1, fd: 3, grant_id: flow_id, result: true })
615 .await
616 .unwrap(),
617 P2mResponse::Ack
618 );
619 }
620
621 #[tokio::test]
622 async fn unit_trace2e_service_validated_resources() {
623 crate::trace2e_tracing::init();
624 let mut p2m_service = P2mApiService::new(
625 SequencerService::default(),
626 ProvenanceService::default(),
627 ComplianceService::default(),
628 M2mNop,
629 )
630 .with_resource_validation(true);
631
632 // Test with invalid process
633 // This request is supposed to be filtered out by the validator
634 assert_eq!(
635 p2m_service
636 .call(P2mRequest::LocalEnroll { pid: 0, fd: 3, path: "/tmp/test.txt".to_string() })
637 .await
638 .unwrap_err()
639 .to_string(),
640 "Traceability error, process not found (pid: 0)"
641 );
642
643 // Test successful process instantiation with validation
644 assert_eq!(
645 p2m_service
646 .call(P2mRequest::LocalEnroll {
647 pid: std::process::id() as i32,
648 fd: 3,
649 path: "/tmp/test.txt".to_string()
650 })
651 .await
652 .unwrap(),
653 P2mResponse::Ack
654 );
655 }
656
657 #[tokio::test]
658 async fn unit_trace2e_service_io_invalid_request() {
659 crate::trace2e_tracing::init();
660 let mut p2m_service = P2mApiService::new(
661 SequencerService::default(),
662 ProvenanceService::default(),
663 ComplianceService::default(),
664 M2mNop,
665 )
666 .with_resource_validation(true);
667
668 // Neither process nor fd are enrolled
669 assert_eq!(
670 p2m_service
671 .call(P2mRequest::IoRequest { pid: std::process::id() as i32, fd: 3, output: true })
672 .await
673 .unwrap_err()
674 .to_string(),
675 format!(
676 "Traceability error, undeclared resource (pid: {}, fd: 3)",
677 std::process::id() as i32
678 )
679 );
680
681 p2m_service
682 .call(P2mRequest::LocalEnroll {
683 pid: std::process::id() as i32,
684 fd: 4,
685 path: "/tmp/test.txt".to_string(),
686 })
687 .await
688 .unwrap();
689
690 // Only process is enrolled
691 assert_eq!(
692 p2m_service
693 .call(P2mRequest::IoRequest { pid: std::process::id() as i32, fd: 3, output: true })
694 .await
695 .unwrap_err()
696 .to_string(),
697 format!(
698 "Traceability error, undeclared resource (pid: {}, fd: 3)",
699 std::process::id() as i32
700 )
701 );
702 }
703
704 #[tokio::test]
705 async fn unit_trace2e_service_io_invalid_report() {
706 crate::trace2e_tracing::init();
707 let mut p2m_service = P2mApiService::new(
708 SequencerService::default(),
709 ProvenanceService::default(),
710 ComplianceService::default(),
711 M2mNop,
712 )
713 .with_resource_validation(true);
714
715 // Invalid grant id
716 assert_eq!(
717 p2m_service
718 .call(P2mRequest::IoReport { pid: 1, fd: 3, grant_id: 0, result: true })
719 .await
720 .unwrap_err()
721 .to_string(),
722 "Traceability error, flow not found (id: 0)"
723 );
724 }
725
726 #[tokio::test]
727 async fn unit_trace2e_service_integrated_validation() {
728 crate::trace2e_tracing::init();
729
730 // Test P2M service with integrated validation enabled
731 let mut p2m_service_with_validation = P2mApiService::new(
732 SequencerService::default(),
733 ProvenanceService::default(),
734 ComplianceService::default(),
735 M2mNop,
736 )
737 .with_resource_validation(true);
738
739 // Test P2M service with validation disabled
740 let mut p2m_service_without_validation = P2mApiService::new(
741 SequencerService::default(),
742 ProvenanceService::default(),
743 ComplianceService::default(),
744 M2mNop,
745 )
746 .with_resource_validation(false);
747
748 // Test invalid process - should fail with validation enabled
749 assert_eq!(
750 p2m_service_with_validation
751 .call(P2mRequest::LocalEnroll { pid: 0, fd: 3, path: "/tmp/test.txt".to_string() })
752 .await
753 .unwrap_err()
754 .to_string(),
755 "Traceability error, process not found (pid: 0)"
756 );
757
758 // Test invalid process - should succeed with validation disabled
759 assert_eq!(
760 p2m_service_without_validation
761 .call(P2mRequest::LocalEnroll { pid: 0, fd: 3, path: "/tmp/test.txt".to_string() })
762 .await
763 .unwrap(),
764 P2mResponse::Ack
765 );
766
767 // Test valid process - should succeed with validation enabled
768 assert_eq!(
769 p2m_service_with_validation
770 .call(P2mRequest::LocalEnroll {
771 pid: std::process::id() as i32,
772 fd: 3,
773 path: "/tmp/test.txt".to_string()
774 })
775 .await
776 .unwrap(),
777 P2mResponse::Ack
778 );
779
780 // Test valid process - should succeed with validation disabled
781 assert_eq!(
782 p2m_service_without_validation
783 .call(P2mRequest::LocalEnroll {
784 pid: std::process::id() as i32,
785 fd: 3,
786 path: "/tmp/test.txt".to_string()
787 })
788 .await
789 .unwrap(),
790 P2mResponse::Ack
791 );
792 }
793}