trace2e_core/transport/
grpc.rs

1//! # gRPC Transport Implementation
2//!
3//! This module provides the gRPC-based transport layer for distributed traceability
4//! operations in the trace2e framework. It implements both client and server
5//! functionality for machine-to-machine (M2M) communication using Protocol Buffers
6//! and the Tonic gRPC framework.
7//!
8//! ## Components
9//!
10//! - **M2mGrpc**: Client service for making outbound gRPC calls to remote middleware
11//! - **P2mHandler**: Server implementation that routes incoming gRPC requests for P2M operations
12//! - **M2mHandler**: Server implementation that routes incoming gRPC requests for M2M operations
13//! - **O2mHandler**: Server implementation that routes incoming gRPC requests for O2M operations
14//! - **Protocol Buffer Conversions**: Type conversions between internal and protobuf types
15//!
16//! ## Connection Management
17//!
18//! The gRPC client maintains a cache of connected remote clients to avoid
19//! repeated connection overhead. Connections are established on-demand and
20//! reused for subsequent requests to the same remote endpoint.
21//!
22//! ## Service Operations
23//!
24//! ### Process-to-Middleware (P2M)
25//! - Local process enrollment (file descriptors)
26//! - Remote process enrollment (network connections)
27//! - I/O request authorization
28//! - I/O operation reporting
29//!
30//! ### Machine-to-Machine (M2M)
31//! - Destination compliance policy retrieval
32//! - Source compliance policy retrieval
33//! - Provenance information updates
34//!
35//! ### Operator-to-Middleware (O2M)
36//! - Policy management
37//! - Confidentiality management
38//! - Integrity management
39//! - Deletion management
40//! - Consent management
41//! - Provenance information retrieval
42//!
43//! ## Protocol Buffer Integration
44//!
45//! The module includes comprehensive type conversions between the internal
46//! trace2e types and their Protocol Buffer representations, ensuring seamless
47//! serialization and deserialization across network boundaries.
48
49use std::{
50    collections::{HashMap, HashSet},
51    future::Future,
52    pin::Pin,
53    sync::Arc,
54    task::Poll,
55};
56
57use dashmap::DashMap;
58use futures::future::try_join_all;
59use tokio_stream::{StreamExt, wrappers::BroadcastStream};
60use tonic::{Request, Response, Status, transport::Channel};
61use tower::Service;
62use tracing::info;
63
/// Default port for gRPC communication between trace2e middleware instances.
///
/// The M2M client ([`M2mGrpc`]) uses this port when it builds the
/// `http://{remote_ip}:{port}` endpoint of a remote middleware.
pub const DEFAULT_GRPC_PORT: u16 = 50051;
66
67/// Protocol Buffer definitions and descriptor sets for the trace2e gRPC service.
68pub mod proto {
69    tonic::include_proto!("trace2e");
70    pub mod primitives {
71        tonic::include_proto!("trace2e.primitives");
72    }
73    pub mod messages {
74        tonic::include_proto!("trace2e.messages");
75    }
76
77    /// Pre-compiled Protocol Buffer descriptor set for service reflection.
78    pub const MIDDLEWARE_DESCRIPTOR_SET: &[u8] = include_bytes!("../../trace2e_descriptor.bin");
79}
80
81use crate::{
82    traceability::{
83        api::types::{M2mRequest, M2mResponse, O2mRequest, O2mResponse, P2mRequest, P2mResponse},
84        error::TraceabilityError,
85        infrastructure::naming::{
86            DisplayableResource, Fd, File, LocalizedResource, Process, Resource, Stream,
87        },
88        services::{
89            compliance::{ConfidentialityPolicy, Policy},
90            consent::Destination,
91        },
92    },
93    transport::eval_remote_ip,
94};
95
96/// Converts traceability errors to gRPC Status codes for wire transmission.
97impl From<TraceabilityError> for Status {
98    fn from(error: TraceabilityError) -> Self {
99        Status::internal(error.to_string())
100    }
101}
102
103/// gRPC client service for machine-to-machine communication.
104///
105/// `M2mGrpc` provides the client-side implementation for making gRPC calls to
106/// remote trace2e middleware instances. It manages connection pooling and
107/// handles the translation between internal M2M requests and gRPC protocol.
108///
109/// ## Connection Management
110///
111/// The service maintains a thread-safe cache of connected clients to avoid
112/// connection overhead. Connections are established lazily when first needed
113/// and reused for subsequent requests to the same remote endpoint.
114///
115/// ## Request Routing
116///
117/// The service automatically determines the target remote IP address from
118/// the request payload and routes the call to the appropriate endpoint.
119/// Network failures are reported as transport errors.
120#[derive(Default, Clone)]
121pub struct M2mGrpc {
122    /// Cache of established gRPC client connections indexed by remote IP address.
123    connected_remotes: Arc<DashMap<String, proto::m2m_client::M2mClient<Channel>>>,
124    /// Mock mode flag for local integation testing
125    mock_mode: bool,
126}
127
128impl M2mGrpc {
129    pub fn mock() -> Self {
130        Self { mock_mode: true, connected_remotes: Arc::new(DashMap::new()) }
131    }
132
133    /// Establishes a new gRPC connection to a remote middleware instance.
134    ///
135    /// Creates a new client connection to the specified remote IP address
136    /// using the default gRPC port. The connection is cached for future use.
137    ///
138    /// # Arguments
139    ///
140    /// * `remote_ip` - The IP address of the remote middleware instance
141    ///
142    /// # Returns
143    ///
144    /// A connected gRPC client, or an error if connection fails.
145    ///
146    /// # Errors
147    ///
148    /// Returns `TransportFailedToContactRemote` if the connection cannot be established.
149    async fn connect_remote(
150        &self,
151        remote_ip: String,
152    ) -> Result<proto::m2m_client::M2mClient<Channel>, TraceabilityError> {
153        match proto::m2m_client::M2mClient::connect(format!(
154            "http://{remote_ip}:{DEFAULT_GRPC_PORT}"
155        ))
156        .await
157        {
158            Ok(client) => {
159                self.connected_remotes.insert(remote_ip, client.clone());
160                Ok(client)
161            }
162            Err(_) => Err(TraceabilityError::TransportFailedToContactRemote(remote_ip)),
163        }
164    }
165
166    /// Retrieves an existing cached gRPC client for the specified remote IP.
167    ///
168    /// # Arguments
169    ///
170    /// * `remote_ip` - The IP address to look up in the connection cache
171    ///
172    /// # Returns
173    ///
174    /// An existing client connection if available, None otherwise.
175    async fn get_client(&self, remote_ip: String) -> Option<proto::m2m_client::M2mClient<Channel>> {
176        self.connected_remotes.get(&remote_ip).map(|c| c.to_owned())
177    }
178
179    /// Retrieves an existing client or establishes a new connection if needed.
180    ///
181    /// This method first checks the connection cache and returns an existing
182    /// client if available. If no cached connection exists, it establishes
183    /// a new connection and caches it for future use.
184    ///
185    /// # Arguments
186    ///
187    /// * `remote_ip` - The IP address of the target middleware instance
188    ///
189    /// # Returns
190    ///
191    /// A ready-to-use gRPC client connection.
192    ///
193    /// # Errors
194    ///
195    /// Returns `TransportFailedToContactRemote` if connection establishment fails.
196    async fn get_client_or_connect(
197        &self,
198        remote_ip: String,
199    ) -> Result<proto::m2m_client::M2mClient<Channel>, TraceabilityError> {
200        // if in mock mode, override remote_ip to localhost for testing
201        let ip = if self.mock_mode { "127.0.0.1".to_string() } else { remote_ip };
202        match self.get_client(ip.clone()).await {
203            Some(client) => Ok(client),
204            None => self.connect_remote(ip).await,
205        }
206    }
207}
208
209impl Service<M2mRequest> for M2mGrpc {
210    type Response = M2mResponse;
211    type Error = TraceabilityError;
212    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
213
214    fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
215        Poll::Ready(Ok(()))
216    }
217
218    fn call(&mut self, request: M2mRequest) -> Self::Future {
219        let this = self.clone();
220        Box::pin(async move {
221            match request.clone() {
222                M2mRequest::GetDestinationPolicy(destination) => {
223                    info!(
224                        destination = %destination,
225                        "[gRPC-client] GetDestinationPolicy"
226                    );
227                    let remote_ip = eval_remote_ip(request)?;
228                    let mut client = this.get_client_or_connect(remote_ip.clone()).await?;
229
230                    // Create the protobuf request
231                    let proto_req = proto::messages::GetDestinationPolicy {
232                        destination: Some(destination.into()),
233                    };
234
235                    // Make the gRPC call
236                    let response = client
237                        .m2m_destination_policy(Request::new(proto_req))
238                        .await
239                        .map_err(|_| TraceabilityError::TransportFailedToContactRemote(remote_ip))?
240                        .into_inner();
241                    Ok(M2mResponse::DestinationPolicy(
242                        response.policy.map(|policy| policy.into()).unwrap_or_default(),
243                    ))
244                }
245                M2mRequest::CheckSourceCompliance { sources, destination } => {
246                    info!(
247                        sources = %DisplayableResource::from(&sources),
248                        destination = %destination.0,
249                        destination_policy = ?destination.1,
250                        "[gRPC-client] CheckSourceCompliance"
251                    );
252                    // Create sources partition by node_id
253                    let sources_partition = sources.iter().fold(
254                        HashMap::new(),
255                        |mut partitions: HashMap<&String, HashSet<&LocalizedResource>>, lr| {
256                            partitions.entry(lr.node_id()).or_default().insert(lr);
257                            partitions
258                        },
259                    );
260
261                    let futures = sources_partition
262                        .into_iter()
263                        .map(|(node_id, sources)| {
264                            let this_clone = this.clone();
265                            let dest_resource = destination.0.clone();
266                            let dest_policy = destination.1.clone();
267                            async move {
268                                let mut client =
269                                    this_clone.get_client_or_connect(node_id.to_string()).await?;
270                                client
271                                    .m2m_check_source_compliance(Request::new(
272                                        proto::messages::CheckSourceCompliance {
273                                            sources: sources
274                                                .iter()
275                                                .map(|r| (**r).clone().into())
276                                                .collect(),
277                                            destination: Some((dest_resource).into()),
278                                            destination_policy: Some((dest_policy).into()),
279                                        },
280                                    ))
281                                    .await
282                                    .map_err(|_| {
283                                        TraceabilityError::TransportFailedToContactRemote(
284                                            node_id.to_string(),
285                                        )
286                                    })
287                            }
288                        })
289                        .collect::<Vec<_>>();
290
291                    // Collect all results and return error if any failed
292                    try_join_all(futures).await?;
293                    Ok(M2mResponse::Ack)
294                }
295                M2mRequest::UpdateProvenance { source_prov, destination } => {
296                    info!(
297                        source_prov = %DisplayableResource::from(&source_prov),
298                        destination = %destination,
299                        "[gRPC-client] UpdateProvenance"
300                    );
301                    let remote_ip = eval_remote_ip(request)?;
302                    let mut client = this.get_client_or_connect(remote_ip.clone()).await?;
303
304                    // Group LocalizedResources by node_id
305                    let mut grouped: HashMap<String, Vec<proto::primitives::Resource>> =
306                        HashMap::default();
307                    for lr in source_prov {
308                        grouped
309                            .entry(lr.node_id().clone())
310                            .or_default()
311                            .push(lr.resource().clone().into());
312                    }
313
314                    // Convert to Vec<References>
315                    let source_prov_proto: Vec<proto::primitives::References> = grouped
316                        .into_iter()
317                        .map(|(node, resources)| proto::primitives::References { node, resources })
318                        .collect();
319
320                    // Create the protobuf request
321                    let proto_req = proto::messages::UpdateProvenance {
322                        source_prov: source_prov_proto,
323                        destination: Some(destination.into()),
324                    };
325
326                    // Make the gRPC call
327                    client.m2m_update_provenance(Request::new(proto_req)).await.map_err(|_| {
328                        TraceabilityError::TransportFailedToContactRemote(remote_ip)
329                    })?;
330
331                    Ok(M2mResponse::Ack)
332                }
333                M2mRequest::BroadcastDeletion(resource) => {
334                    info!(
335                        resource = %resource,
336                        "[gRPC-client] BroadcastDeletion"
337                    );
338                    let remote_ip = eval_remote_ip(request)?;
339                    let mut client = this.get_client_or_connect(remote_ip.clone()).await?;
340
341                    // Create the protobuf request
342                    let proto_req = proto::messages::BroadcastDeletionRequest {
343                        resource: Some(resource.into()),
344                    };
345
346                    // Make the gRPC call
347                    client.m2m_broadcast_deletion(Request::new(proto_req)).await.map_err(|_| {
348                        TraceabilityError::TransportFailedToContactRemote(remote_ip)
349                    })?;
350
351                    Ok(M2mResponse::Ack)
352                }
353            }
354        })
355    }
356}
357
/// gRPC server handler for process-to-middleware (P2M) operations.
///
/// `P2mHandler` wraps the service that processes P2M requests. Its gRPC trait
/// implementation translates incoming Protocol Buffer requests to the internal
/// API types, calls the wrapped service, and converts the responses back to
/// Protocol Buffer format.
///
/// ## Type Parameters
///
/// * `P2mApi` - Service handling process-to-middleware requests
pub struct P2mHandler<P2mApi> {
    /// Process-to-middleware service handler.
    p2m: P2mApi,
}

impl<P2mApi> P2mHandler<P2mApi> {
    /// Creates a new P2M handler around the given service.
    ///
    /// # Arguments
    ///
    /// * `p2m` - Service for handling process-to-middleware requests
    pub fn new(p2m: P2mApi) -> Self {
        P2mHandler { p2m }
    }
}
389
390/// Implementation of the trace2e gRPC service protocol.
391///
392/// This implementation provides the server-side handlers for all gRPC endpoints
393/// defined in the trace2e protocol. It handles both P2M (process-to-middleware)
394/// and M2M (machine-to-machine) operations by delegating to the appropriate
395/// internal service handlers.
396#[tonic::async_trait]
397impl<P2mApi> proto::p2m_server::P2m for P2mHandler<P2mApi>
398where
399    P2mApi: Service<P2mRequest, Response = P2mResponse, Error = TraceabilityError>
400        + Clone
401        + Sync
402        + Send
403        + 'static,
404    P2mApi::Future: Send,
405{
406    /// Handles local process enrollment requests.
407    ///
408    /// Registers a local file descriptor with the middleware for tracking.
409    /// This is called when a process opens a local file or resource.
410    async fn p2m_local_enroll(
411        &self,
412        request: Request<proto::messages::LocalCt>,
413    ) -> Result<Response<proto::messages::Ack>, Status> {
414        let req = request.into_inner();
415        let mut p2m = self.p2m.clone();
416        match p2m
417            .call(P2mRequest::LocalEnroll {
418                pid: req.process_id,
419                fd: req.file_descriptor,
420                path: req.path,
421            })
422            .await?
423        {
424            P2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
425            _ => Err(Status::internal("Internal traceability API error")),
426        }
427    }
428
429    /// Handles remote process enrollment requests.
430    ///
431    /// Registers a network connection (socket) with the middleware for tracking.
432    /// This is called when a process establishes a network connection.
433    async fn p2m_remote_enroll(
434        &self,
435        request: Request<proto::messages::RemoteCt>,
436    ) -> Result<Response<proto::messages::Ack>, Status> {
437        let req = request.into_inner();
438        let mut p2m = self.p2m.clone();
439        match p2m
440            .call(P2mRequest::RemoteEnroll {
441                pid: req.process_id,
442                fd: req.file_descriptor,
443                local_socket: req.local_socket,
444                peer_socket: req.peer_socket,
445            })
446            .await?
447        {
448            P2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
449            _ => Err(Status::internal("Internal traceability API error")),
450        }
451    }
452
453    /// Handles I/O authorization requests from processes.
454    ///
455    /// Evaluates whether a process is authorized to perform an I/O operation
456    /// on a specific file descriptor. Returns a grant ID if authorized.
457    async fn p2m_io_request(
458        &self,
459        request: Request<proto::messages::IoInfo>,
460    ) -> Result<Response<proto::messages::Grant>, Status> {
461        let req = request.into_inner();
462        let mut p2m = self.p2m.clone();
463        match p2m
464            .call(P2mRequest::IoRequest {
465                pid: req.process_id,
466                fd: req.file_descriptor,
467                output: req.flow == proto::primitives::Flow::Output as i32,
468            })
469            .await?
470        {
471            P2mResponse::Grant(id) => {
472                Ok(Response::new(proto::messages::Grant { id: id.to_string() }))
473            }
474            _ => Err(Status::internal("Internal traceability API error")),
475        }
476    }
477
478    /// Handles I/O operation completion reports from processes.
479    ///
480    /// Records the completion and result of an I/O operation that was
481    /// previously authorized. This completes the audit trail for the operation.
482    async fn p2m_io_report(
483        &self,
484        request: Request<proto::messages::IoResult>,
485    ) -> Result<Response<proto::messages::Ack>, Status> {
486        let req = request.into_inner();
487        let mut p2m = self.p2m.clone();
488        match p2m
489            .call(P2mRequest::IoReport {
490                pid: req.process_id,
491                fd: req.file_descriptor,
492                grant_id: req.grant_id.parse::<u128>().unwrap_or_default(),
493                result: req.result,
494            })
495            .await?
496        {
497            P2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
498            _ => Err(Status::internal("Internal traceability API error")),
499        }
500    }
501}
/// gRPC server handler for machine-to-machine (M2M) operations.
///
/// `M2mHandler` wraps the service that processes M2M requests; its gRPC trait
/// implementation forwards every incoming request to that service.
///
/// ## Type Parameters
///
/// * `M2mApi` - Service handling machine-to-machine requests
pub struct M2mHandler<M2mApi> {
    /// Machine-to-machine service handler.
    m2m: M2mApi,
}

impl<M2mApi> M2mHandler<M2mApi> {
    /// Creates a new M2M handler around the given service.
    ///
    /// # Arguments
    ///
    /// * `m2m` - Service for handling machine-to-machine requests
    pub fn new(m2m: M2mApi) -> Self {
        M2mHandler { m2m }
    }
}
517
518#[tonic::async_trait]
519impl<M2mApi> proto::m2m_server::M2m for M2mHandler<M2mApi>
520where
521    M2mApi: Service<M2mRequest, Response = M2mResponse, Error = TraceabilityError>
522        + Clone
523        + Sync
524        + Send
525        + 'static,
526    M2mApi::Future: Send,
527{
528    /// Handles destination policy requests from remote middleware.
529    ///
530    /// Returns the compliance policy for a destination resource to enable
531    /// remote middleware instances to evaluate flow authorization.
532    async fn m2m_destination_policy(
533        &self,
534        request: Request<proto::messages::GetDestinationPolicy>,
535    ) -> Result<Response<proto::messages::DestinationPolicy>, Status> {
536        info!("[gRPC-server] m2m_destination_policy");
537        let req = request.into_inner();
538        let mut m2m = self.m2m.clone();
539        match m2m.call(req.into()).await? {
540            M2mResponse::DestinationPolicy(policy) => Ok(Response::new(policy.into())),
541            _ => Err(Status::internal("Internal traceability API error")),
542        }
543    }
544
545    /// Handles source compliance checking requests from remote middleware.
546    ///
547    /// Checks compliance policies for a set of source resources to enable
548    /// distributed flow evaluation across multiple middleware instances.
549    async fn m2m_check_source_compliance(
550        &self,
551        request: Request<proto::messages::CheckSourceCompliance>,
552    ) -> Result<Response<proto::messages::Ack>, Status> {
553        info!("[gRPC-server] m2m_check_source_compliance");
554        let req = request.into_inner();
555        let mut m2m = self.m2m.clone();
556        m2m.call(req.into()).await?;
557        Ok(Response::new(proto::messages::Ack {}))
558    }
559
560    /// Handles provenance update requests from remote middleware.
561    ///
562    /// Updates the provenance information for a destination resource based
563    /// on data flows from remote sources. This maintains the audit trail
564    /// across distributed operations.
565    async fn m2m_update_provenance(
566        &self,
567        request: Request<proto::messages::UpdateProvenance>,
568    ) -> Result<Response<proto::messages::Ack>, Status> {
569        info!("[gRPC-server] m2m_update_provenance");
570        let req = request.into_inner();
571        let mut m2m = self.m2m.clone();
572        match m2m.call(req.into()).await? {
573            M2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
574            _ => Err(Status::internal("Internal traceability API error")),
575        }
576    }
577
578    /// Handles deletion broadcast requests from remote middleware.
579    ///
580    /// Broadcasts the deletion of a resource to all middleware instances.
581    async fn m2m_broadcast_deletion(
582        &self,
583        request: Request<proto::messages::BroadcastDeletionRequest>,
584    ) -> Result<Response<proto::messages::Ack>, Status> {
585        info!("[gRPC-server] m2m_broadcast_deletion");
586        let req = request.into_inner();
587        let mut m2m = self.m2m.clone();
588        match m2m.call(req.into()).await? {
589            M2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
590            _ => Err(Status::internal("Internal traceability API error")),
591        }
592    }
593}
594
/// gRPC server handler for operator-to-middleware (O2M) operations.
///
/// `O2mHandler` wraps the service that processes operator requests such as
/// policy management, consent management and provenance queries; its gRPC
/// trait implementation forwards every incoming request to that service.
///
/// ## Type Parameters
///
/// * `O2mApi` - Service handling operator-to-middleware requests
pub struct O2mHandler<O2mApi> {
    /// Operator-to-middleware service handler.
    o2m: O2mApi,
}

impl<O2mApi> O2mHandler<O2mApi> {
    /// Creates a new O2M handler around the given service.
    ///
    /// # Arguments
    ///
    /// * `o2m` - Service for handling operator-to-middleware requests
    pub fn new(o2m: O2mApi) -> Self {
        O2mHandler { o2m }
    }
}
619
620/// Implementation of the O2M gRPC service protocol.
621///
622/// This implementation provides the server-side handlers for all operator-facing
623/// endpoints including policy configuration and provenance analysis.
624#[tonic::async_trait]
625#[allow(clippy::result_large_err)]
626impl<O2mApi> proto::o2m_server::O2m for O2mHandler<O2mApi>
627where
628    O2mApi: Service<O2mRequest, Response = O2mResponse, Error = TraceabilityError>
629        + Clone
630        + Sync
631        + Send
632        + 'static,
633    O2mApi::Future: Send,
634{
635    /// Handles policy retrieval requests from operators.
636    ///
637    /// Returns the current compliance policies for the specified set of resources.
638    async fn o2m_get_policies(
639        &self,
640        request: Request<proto::messages::GetPoliciesRequest>,
641    ) -> Result<Response<proto::messages::GetPoliciesResponse>, Status> {
642        let req = request.into_inner();
643        let mut o2m = self.o2m.clone();
644        match o2m.call(req.into()).await? {
645            O2mResponse::Policies(policies) => Ok(Response::new(policies.into())),
646            _ => Err(Status::internal("Internal traceability API error")),
647        }
648    }
649
650    /// Handles policy update requests from operators.
651    ///
652    /// Sets a complete compliance policy for a specific resource.
653    async fn o2m_set_policy(
654        &self,
655        request: Request<proto::messages::SetPolicyRequest>,
656    ) -> Result<Response<proto::messages::Ack>, Status> {
657        let req = request.into_inner();
658        let mut o2m = self.o2m.clone();
659        match o2m.call(req.into()).await? {
660            O2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
661            _ => Err(Status::internal("Internal traceability API error")),
662        }
663    }
664
665    /// Handles confidentiality setting requests from operators.
666    ///
667    /// Updates the confidentiality policy for a specific resource.
668    async fn o2m_set_confidentiality(
669        &self,
670        request: Request<proto::messages::SetConfidentialityRequest>,
671    ) -> Result<Response<proto::messages::Ack>, Status> {
672        let req = request.into_inner();
673        let mut o2m = self.o2m.clone();
674        match o2m.call(req.into()).await? {
675            O2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
676            _ => Err(Status::internal("Internal traceability API error")),
677        }
678    }
679
680    /// Handles integrity setting requests from operators.
681    ///
682    /// Updates the integrity level for a specific resource.
683    async fn o2m_set_integrity(
684        &self,
685        request: Request<proto::messages::SetIntegrityRequest>,
686    ) -> Result<Response<proto::messages::Ack>, Status> {
687        let req = request.into_inner();
688        let mut o2m = self.o2m.clone();
689        match o2m.call(req.into()).await? {
690            O2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
691            _ => Err(Status::internal("Internal traceability API error")),
692        }
693    }
694
695    /// Handles deletion marking requests from operators.
696    ///
697    /// Marks a resource as deleted for compliance tracking purposes.
698    async fn o2m_set_deleted(
699        &self,
700        request: Request<proto::messages::SetDeletedRequest>,
701    ) -> Result<Response<proto::messages::Ack>, Status> {
702        let req = request.into_inner();
703        let mut o2m = self.o2m.clone();
704        match o2m.call(req.into()).await? {
705            O2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
706            _ => Err(Status::internal("Internal traceability API error")),
707        }
708    }
709
710    type O2MEnforceConsentStream = Pin<
711        Box<
712            dyn tokio_stream::Stream<Item = Result<proto::messages::ConsentNotification, Status>>
713                + Send,
714        >,
715    >;
716
717    /// Handles consent enforcement requests from operators.
718    ///
719    /// Enables consent enforcement for a resource and returns a stream of
720    /// consent request notifications that can be monitored by the operator.
721    async fn o2m_enforce_consent(
722        &self,
723        request: Request<proto::messages::EnforceConsentRequest>,
724    ) -> Result<Response<Self::O2MEnforceConsentStream>, Status> {
725        let req = request.into_inner();
726        let mut o2m = self.o2m.clone();
727        match o2m.call(req.into()).await? {
728            O2mResponse::Notifications(receiver) => {
729                // Convert the broadcast receiver into a stream of consent notifications
730                let stream = BroadcastStream::new(receiver).map(|result| {
731                    match result {
732                        Ok(destination) => {
733                            // Format destination as human-readable consent request message
734                            let consent_request =
735                                format!("Consent request for destination: {:?}", destination);
736                            Ok(proto::messages::ConsentNotification { consent_request })
737                        }
738                        Err(e) => {
739                            Err(Status::internal(format!("Notification stream error: {}", e)))
740                        }
741                    }
742                });
743                Ok(Response::new(Box::pin(stream)))
744            }
745            _ => Err(Status::internal("Internal traceability API error")),
746        }
747    }
748
749    /// Handles consent decision requests from operators.
750    ///
751    /// Sets the consent decision for a specific data flow operation.
752    async fn o2m_set_consent_decision(
753        &self,
754        request: Request<proto::messages::SetConsentDecisionRequest>,
755    ) -> Result<Response<proto::messages::Ack>, Status> {
756        let req = request.into_inner();
757        let mut o2m = self.o2m.clone();
758        match o2m.call(req.into()).await? {
759            O2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
760            _ => Err(Status::internal("Internal traceability API error")),
761        }
762    }
763
764    /// Handles provenance query requests from operators.
765    ///
766    /// Returns the complete provenance lineage for a specific resource.
767    async fn o2m_get_references(
768        &self,
769        request: Request<proto::messages::GetReferencesRequest>,
770    ) -> Result<Response<proto::messages::GetReferencesResponse>, Status> {
771        let req = request.into_inner();
772        let mut o2m = self.o2m.clone();
773        match o2m.call(req.into()).await? {
774            O2mResponse::References(references) => {
775                // Group LocalizedResources by node_id
776                let mut grouped: HashMap<String, HashSet<Resource>> = HashMap::default();
777                for lr in references {
778                    grouped.entry(lr.node_id().clone()).or_default().insert(lr.resource().clone());
779                }
780                Ok(Response::new(grouped.into()))
781            }
782            _ => Err(Status::internal("Internal traceability API error")),
783        }
784    }
785}
786
787// ========== Protocol Buffer Type Conversions ==========
788// Fundamental type conversions for serialization/deserialization
789
790/// Converts Protocol Buffer Resource to internal Resource type.
791impl From<proto::primitives::Resource> for Resource {
792    fn from(req: proto::primitives::Resource) -> Self {
793        match req.resource {
794            Some(proto::primitives::resource::Resource::Fd(fd)) => Resource::Fd(fd.into()),
795            Some(proto::primitives::resource::Resource::Process(process)) => {
796                Resource::Process(process.into())
797            }
798            None => Resource::None,
799        }
800    }
801}
802
803/// Converts internal Resource to Protocol Buffer Resource type.
804impl From<Resource> for proto::primitives::Resource {
805    fn from(resource: Resource) -> Self {
806        match resource {
807            Resource::Fd(fd) => proto::primitives::Resource {
808                resource: Some(proto::primitives::resource::Resource::Fd(fd.into())),
809            },
810            Resource::Process(process) => proto::primitives::Resource {
811                resource: Some(proto::primitives::resource::Resource::Process(process.into())),
812            },
813            Resource::None => proto::primitives::Resource { resource: None },
814        }
815    }
816}
817
818impl From<proto::primitives::Fd> for Fd {
819    fn from(proto_fd: proto::primitives::Fd) -> Self {
820        match proto_fd.fd {
821            Some(proto::primitives::fd::Fd::File(file)) => Fd::File(file.into()),
822            Some(proto::primitives::fd::Fd::Stream(stream)) => Fd::Stream(stream.into()),
823            None => Fd::File(File { path: String::new() }),
824        }
825    }
826}
827
828impl From<Fd> for proto::primitives::Fd {
829    fn from(fd: Fd) -> Self {
830        match fd {
831            Fd::File(file) => {
832                proto::primitives::Fd { fd: Some(proto::primitives::fd::Fd::File(file.into())) }
833            }
834            Fd::Stream(stream) => {
835                proto::primitives::Fd { fd: Some(proto::primitives::fd::Fd::Stream(stream.into())) }
836            }
837        }
838    }
839}
840
841impl From<proto::primitives::File> for File {
842    fn from(proto_file: proto::primitives::File) -> Self {
843        File { path: proto_file.path }
844    }
845}
846
847impl From<File> for proto::primitives::File {
848    fn from(file: File) -> Self {
849        proto::primitives::File { path: file.path }
850    }
851}
852
853impl From<proto::primitives::Stream> for Stream {
854    fn from(proto_stream: proto::primitives::Stream) -> Self {
855        Stream { local_socket: proto_stream.local_socket, peer_socket: proto_stream.peer_socket }
856    }
857}
858
859impl From<Stream> for proto::primitives::Stream {
860    fn from(stream: Stream) -> Self {
861        proto::primitives::Stream {
862            local_socket: stream.local_socket,
863            peer_socket: stream.peer_socket,
864        }
865    }
866}
867
868impl From<proto::primitives::Process> for Process {
869    fn from(proto_process: proto::primitives::Process) -> Self {
870        Process {
871            pid: proto_process.pid,
872            starttime: proto_process.starttime,
873            exe_path: proto_process.exe_path,
874        }
875    }
876}
877
878impl From<Process> for proto::primitives::Process {
879    fn from(process: Process) -> Self {
880        proto::primitives::Process {
881            pid: process.pid,
882            starttime: process.starttime,
883            exe_path: process.exe_path,
884        }
885    }
886}
887
888// ========== LocalizedResource and Destination Conversions ==========
889
890/// Converts Protocol Buffer LocalizedResource to internal LocalizedResource type.
891impl From<proto::primitives::LocalizedResource> for LocalizedResource {
892    fn from(proto_lr: proto::primitives::LocalizedResource) -> Self {
893        LocalizedResource::new(
894            proto_lr.node_id,
895            proto_lr.resource.map(|r| r.into()).unwrap_or_default(),
896        )
897    }
898}
899
900/// Converts internal LocalizedResource to Protocol Buffer LocalizedResource type.
901impl From<LocalizedResource> for proto::primitives::LocalizedResource {
902    fn from(lr: LocalizedResource) -> Self {
903        proto::primitives::LocalizedResource {
904            node_id: lr.node_id().clone(),
905            resource: Some(lr.resource().clone().into()),
906        }
907    }
908}
909
910/// Converts Protocol Buffer Destination to internal Destination type.
911impl From<proto::primitives::Destination> for Destination {
912    fn from(proto_dest: proto::primitives::Destination) -> Self {
913        match proto_dest.destination {
914            Some(proto::primitives::destination::Destination::Resource(lr_with_parent)) => {
915                if let Some(proto_lr) = lr_with_parent.resource {
916                    let localized_resource: LocalizedResource = proto_lr.into();
917                    let parent = lr_with_parent.parent.map(|p| Box::new((*p).into()));
918                    // Preserve the LocalizedResource as a Resource with node_id in the parent
919                    Destination::Resource {
920                        resource: localized_resource.resource().clone(),
921                        parent,
922                    }
923                } else {
924                    Destination::Resource { resource: Resource::None, parent: None }
925                }
926            }
927            Some(proto::primitives::destination::Destination::Node(node_id)) => {
928                Destination::Node(node_id)
929            }
930            None => Destination::Node(String::new()),
931        }
932    }
933}
934
935/// Converts internal Destination to Protocol Buffer Destination type.
936/// Preserves LocalizedResource context by reconstructing the hierarchical structure.
937impl From<Destination> for proto::primitives::Destination {
938    fn from(dest: Destination) -> Self {
939        destination_to_proto_node_variant(dest)
940    }
941}
942
943/// Helper function that performs the actual Destination -> proto conversion logic.
944/// This can be reused by other modules that need the same conversion.
945pub fn destination_to_proto_node_variant(dest: Destination) -> proto::primitives::Destination {
946    match dest {
947        Destination::Resource { resource, parent } => {
948            // When converting back to proto, we reconstruct the LocalizedResource
949            // If parent is None, we use empty string for node_id (local)
950            // If parent is Some(Node(...)), we extract the node_id
951            let (node_id, proto_parent) = match &parent {
952                Some(p) => {
953                    match &(**p) {
954                        Destination::Node(node) => {
955                            (node.clone(), parent.map(|p| Box::new((*p).clone().into())))
956                        }
957                        _ => {
958                            // If parent is another Resource, preserve it as-is
959                            (String::new(), parent.map(|p| Box::new((*p).clone().into())))
960                        }
961                    }
962                }
963                None => {
964                    // No parent, local resource
965                    (String::new(), None)
966                }
967            };
968
969            proto::primitives::Destination {
970                destination: Some(proto::primitives::destination::Destination::Resource(Box::new(
971                    proto::primitives::LocalizedResourceWithParent {
972                        resource: Some(LocalizedResource::new(node_id, resource).into()),
973                        parent: proto_parent,
974                    },
975                ))),
976            }
977        }
978        Destination::Node(node_id) => proto::primitives::Destination {
979            destination: Some(proto::primitives::destination::Destination::Node(node_id)),
980        },
981    }
982}
983
984// ========== Policy Conversions ==========
985
986impl From<Policy> for proto::primitives::Policy {
987    fn from(policy: Policy) -> Self {
988        proto::primitives::Policy {
989            confidentiality: match policy.is_confidential() {
990                false => proto::primitives::Confidentiality::Public as i32,
991                true => proto::primitives::Confidentiality::Secret as i32,
992            },
993            integrity: policy.get_integrity(),
994            deleted: policy.is_deleted(),
995            consent: policy.get_consent(),
996        }
997    }
998}
999
1000impl From<proto::primitives::Policy> for Policy {
1001    fn from(proto_policy: proto::primitives::Policy) -> Self {
1002        Policy::new(
1003            match proto_policy.confidentiality {
1004                x if x == proto::primitives::Confidentiality::Secret as i32 => {
1005                    ConfidentialityPolicy::Secret
1006                }
1007                _ => ConfidentialityPolicy::Public,
1008            },
1009            proto_policy.integrity,
1010            proto_policy.deleted.into(),
1011            proto_policy.consent,
1012        )
1013    }
1014}
1015
1016// ========== Resource-Policy Mapping Conversions ==========
1017
1018/// Converts Protocol Buffer MappedLocalizedPolicy to internal tuple.
1019impl From<proto::primitives::MappedLocalizedPolicy> for (LocalizedResource, Policy) {
1020    fn from(policy: proto::primitives::MappedLocalizedPolicy) -> Self {
1021        (
1022            policy.resource.map(|r| r.into()).unwrap_or_default(),
1023            policy.policy.map(|p| p.into()).unwrap_or_default(),
1024        )
1025    }
1026}
1027
1028// ========== Provenance References Conversions ==========
1029
1030/// Converts Protocol Buffer References to internal node-resources tuple.
1031impl From<proto::primitives::References> for (String, HashSet<Resource>) {
1032    fn from(references: proto::primitives::References) -> Self {
1033        (references.node, references.resources.into_iter().map(|r| r.into()).collect())
1034    }
1035}
1036
1037/// Converts Protocol Buffer References to LocalizedResource set.
1038impl From<proto::primitives::References> for HashSet<LocalizedResource> {
1039    fn from(references: proto::primitives::References) -> Self {
1040        references
1041            .resources
1042            .into_iter()
1043            .map(|r| LocalizedResource::new(references.node.clone(), r.into()))
1044            .collect()
1045    }
1046}
1047
1048/// Converts internal node-resources tuple to Protocol Buffer References.
1049impl From<(String, HashSet<Resource>)> for proto::primitives::References {
1050    fn from((node, resources): (String, HashSet<Resource>)) -> Self {
1051        proto::primitives::References {
1052            node,
1053            resources: resources.into_iter().map(|r| r.into()).collect(),
1054        }
1055    }
1056}
1057
1058// ========== M2M Protocol Buffer Conversions ==========
1059
1060/// Converts Protocol Buffer GetDestinationPolicy request to internal M2M request.
1061impl From<proto::messages::GetDestinationPolicy> for M2mRequest {
1062    fn from(req: proto::messages::GetDestinationPolicy) -> Self {
1063        M2mRequest::GetDestinationPolicy(req.destination.map(|d| d.into()).unwrap_or_default())
1064    }
1065}
1066
1067/// Converts Protocol Buffer UpdateProvenance request to internal M2M request.
1068impl From<proto::messages::UpdateProvenance> for M2mRequest {
1069    fn from(req: proto::messages::UpdateProvenance) -> Self {
1070        let source_prov: HashSet<LocalizedResource> = req
1071            .source_prov
1072            .into_iter()
1073            .flat_map(|refs: proto::primitives::References| {
1074                let node_id = refs.node.clone();
1075                refs.resources
1076                    .into_iter()
1077                    .map(move |r| LocalizedResource::new(node_id.clone(), r.into()))
1078            })
1079            .collect();
1080
1081        M2mRequest::UpdateProvenance {
1082            source_prov,
1083            destination: req.destination.map(|d| d.into()).unwrap_or_default(),
1084        }
1085    }
1086}
1087
1088/// Converts Protocol Buffer BroadcastDeletionRequest to internal M2M request.
1089impl From<proto::messages::BroadcastDeletionRequest> for M2mRequest {
1090    fn from(req: proto::messages::BroadcastDeletionRequest) -> Self {
1091        M2mRequest::BroadcastDeletion(req.resource.map(|r| r.into()).unwrap_or_default())
1092    }
1093}
1094
1095/// Converts Protocol Buffer CheckSourceCompliance request to internal M2M request.
1096impl From<proto::messages::CheckSourceCompliance> for M2mRequest {
1097    fn from(req: proto::messages::CheckSourceCompliance) -> Self {
1098        let sources: HashSet<LocalizedResource> =
1099            req.sources.into_iter().map(|s| s.into()).collect();
1100        let destination: LocalizedResource = req.destination.map(|d| d.into()).unwrap_or_default();
1101        let destination_policy: Policy =
1102            req.destination_policy.map(|p| p.into()).unwrap_or_default();
1103        M2mRequest::CheckSourceCompliance {
1104            sources,
1105            destination: (destination, destination_policy),
1106        }
1107    }
1108}
1109
1110/// Converts internal M2M DestinationPolicy response to Protocol Buffer response.
1111impl From<Policy> for proto::messages::DestinationPolicy {
1112    fn from(policy: Policy) -> Self {
1113        proto::messages::DestinationPolicy { policy: Some(policy.into()) }
1114    }
1115}
1116
1117// ========== O2M Protocol Buffer Conversions ==========
1118
1119/// Converts Protocol Buffer GetPoliciesRequest to internal O2M request.
1120impl From<proto::messages::GetPoliciesRequest> for O2mRequest {
1121    fn from(req: proto::messages::GetPoliciesRequest) -> Self {
1122        O2mRequest::GetPolicies(req.resources.into_iter().map(|r| r.into()).collect())
1123    }
1124}
1125
1126/// Converts Protocol Buffer SetPolicyRequest to internal O2M request.
1127impl From<proto::messages::SetPolicyRequest> for O2mRequest {
1128    fn from(req: proto::messages::SetPolicyRequest) -> Self {
1129        O2mRequest::SetPolicy {
1130            resource: req.resource.map(|r| r.into()).unwrap_or_default(),
1131            policy: req.policy.map(|p| p.into()).unwrap_or_default(),
1132        }
1133    }
1134}
1135
1136/// Converts Protocol Buffer SetConfidentialityRequest to internal O2M request.
1137impl From<proto::messages::SetConfidentialityRequest> for O2mRequest {
1138    fn from(req: proto::messages::SetConfidentialityRequest) -> Self {
1139        O2mRequest::SetConfidentiality {
1140            resource: req.resource.map(|r| r.into()).unwrap_or_default(),
1141            confidentiality: match req.confidentiality {
1142                x if x == proto::primitives::Confidentiality::Secret as i32 => {
1143                    ConfidentialityPolicy::Secret
1144                }
1145                _ => ConfidentialityPolicy::Public,
1146            },
1147        }
1148    }
1149}
1150
1151/// Converts Protocol Buffer SetIntegrityRequest to internal O2M request.
1152impl From<proto::messages::SetIntegrityRequest> for O2mRequest {
1153    fn from(req: proto::messages::SetIntegrityRequest) -> Self {
1154        O2mRequest::SetIntegrity {
1155            resource: req.resource.map(|r| r.into()).unwrap_or_default(),
1156            integrity: req.integrity,
1157        }
1158    }
1159}
1160
1161/// Converts Protocol Buffer SetDeletedRequest to internal O2M request.
1162impl From<proto::messages::SetDeletedRequest> for O2mRequest {
1163    fn from(req: proto::messages::SetDeletedRequest) -> Self {
1164        O2mRequest::SetDeleted(req.resource.map(|r| r.into()).unwrap_or_default())
1165    }
1166}
1167
1168/// Converts Protocol Buffer EnforceConsentRequest to internal O2M request.
1169impl From<proto::messages::EnforceConsentRequest> for O2mRequest {
1170    fn from(req: proto::messages::EnforceConsentRequest) -> Self {
1171        O2mRequest::EnforceConsent(req.resource.map(|r| r.into()).unwrap_or_default())
1172    }
1173}
1174
1175/// Converts Protocol Buffer SetConsentDecisionRequest to internal O2M request.
1176impl From<proto::messages::SetConsentDecisionRequest> for O2mRequest {
1177    fn from(req: proto::messages::SetConsentDecisionRequest) -> Self {
1178        O2mRequest::SetConsentDecision {
1179            source: req.source.map(|r| r.into()).unwrap_or_default(),
1180            destination: req
1181                .destination
1182                .map(|d| d.into())
1183                .unwrap_or_else(|| Destination::Node(String::new())),
1184            decision: req.decision,
1185        }
1186    }
1187}
1188
1189/// Converts Protocol Buffer GetReferencesRequest to internal O2M request.
1190impl From<proto::messages::GetReferencesRequest> for O2mRequest {
1191    fn from(req: proto::messages::GetReferencesRequest) -> Self {
1192        O2mRequest::GetReferences(req.resource.map(|r| r.into()).unwrap_or_default())
1193    }
1194}
1195
1196// ========== O2M Response Conversions ==========
1197
1198/// Converts internal resource-policy map to Protocol Buffer GetPoliciesResponse.
1199impl From<HashMap<Resource, Policy>> for proto::messages::GetPoliciesResponse {
1200    fn from(policies: HashMap<Resource, Policy>) -> Self {
1201        proto::messages::GetPoliciesResponse {
1202            policies: policies
1203                .into_iter()
1204                .map(|(resource, policy)| proto::primitives::MappedLocalizedPolicy {
1205                    resource: Some(LocalizedResource::new(String::new(), resource).into()),
1206                    policy: Some(policy.into()),
1207                })
1208                .collect(),
1209        }
1210    }
1211}
1212
1213/// Converts internal LocalizedResource-policy map to Protocol Buffer GetPoliciesResponse.
1214impl From<HashMap<LocalizedResource, Policy>> for proto::messages::GetPoliciesResponse {
1215    fn from(policies: HashMap<LocalizedResource, Policy>) -> Self {
1216        proto::messages::GetPoliciesResponse {
1217            policies: policies
1218                .into_iter()
1219                .map(|(resource, policy)| proto::primitives::MappedLocalizedPolicy {
1220                    resource: Some(resource.into()),
1221                    policy: Some(policy.into()),
1222                })
1223                .collect(),
1224        }
1225    }
1226}
1227
1228/// Converts internal provenance references to Protocol Buffer GetReferencesResponse.
1229impl From<HashMap<String, HashSet<Resource>>> for proto::messages::GetReferencesResponse {
1230    fn from(references: HashMap<String, HashSet<Resource>>) -> Self {
1231        proto::messages::GetReferencesResponse {
1232            references: references
1233                .into_iter()
1234                .map(|(node, resources)| (node, resources).into())
1235                .collect(),
1236        }
1237    }
1238}