trace2e_core/transport/
grpc.rs

1//! # gRPC Transport Implementation
2//!
3//! This module provides the gRPC-based transport layer for distributed traceability
4//! operations in the trace2e framework. It implements both client and server
5//! functionality for machine-to-machine (M2M) communication using Protocol Buffers
6//! and the Tonic gRPC framework.
7//!
8//! ## Components
9//!
10//! - **M2mGrpc**: Client service for making outbound gRPC calls to remote middleware
11//! - **P2mHandler**: Server implementation that routes incoming gRPC requests for P2M operations
12//! - **M2mHandler**: Server implementation that routes incoming gRPC requests for M2M operations
13//! - **O2mHandler**: Server implementation that routes incoming gRPC requests for O2M operations
14//! - **Protocol Buffer Conversions**: Type conversions between internal and protobuf types
15//!
16//! ## Connection Management
17//!
18//! The gRPC client maintains a cache of connected remote clients to avoid
19//! repeated connection overhead. Connections are established on-demand and
20//! reused for subsequent requests to the same remote endpoint.
21//!
22//! ## Service Operations
23//!
24//! ### Process-to-Middleware (P2M)
25//! - Local process enrollment (file descriptors)
26//! - Remote process enrollment (network connections)
27//! - I/O request authorization
28//! - I/O operation reporting
29//!
30//! ### Machine-to-Machine (M2M)
31//! - Destination compliance policy retrieval
32//! - Source compliance policy retrieval
33//! - Provenance information updates
34//!
35//! ### Operator-to-Middleware (O2M)
36//! - Policy management
37//! - Confidentiality management
38//! - Integrity management
39//! - Deletion management
40//! - Consent management
41//! - Provenance information retrieval
42//!
43//! ## Protocol Buffer Integration
44//!
45//! The module includes comprehensive type conversions between the internal
46//! trace2e types and their Protocol Buffer representations, ensuring seamless
47//! serialization and deserialization across network boundaries.
48
49use std::{
50    collections::{HashMap, HashSet},
51    future::Future,
52    pin::Pin,
53    sync::Arc,
54    task::Poll,
55};
56
57use dashmap::DashMap;
58use futures::future::try_join_all;
59use tokio_stream::{StreamExt, wrappers::BroadcastStream};
60use tonic::{Request, Response, Status, transport::Channel};
61use tower::Service;
62use tracing::info;
63
64/// Default port for gRPC communication between trace2e middleware instances.
65pub const DEFAULT_GRPC_PORT: u16 = 50051;
66
67/// Protocol Buffer definitions and descriptor sets for the trace2e gRPC service.
68pub mod proto {
69    tonic::include_proto!("trace2e");
70    pub mod primitives {
71        tonic::include_proto!("trace2e.primitives");
72    }
73    pub mod messages {
74        tonic::include_proto!("trace2e.messages");
75    }
76
77    /// Pre-compiled Protocol Buffer descriptor set for service reflection.
78    pub const MIDDLEWARE_DESCRIPTOR_SET: &[u8] = include_bytes!("../../trace2e_descriptor.bin");
79}
80
81use crate::{
82    traceability::{
83        api::types::{M2mRequest, M2mResponse, O2mRequest, O2mResponse, P2mRequest, P2mResponse},
84        error::TraceabilityError,
85        infrastructure::naming::{
86            DisplayableResource, Fd, File, LocalizedResource, Process, Resource, Stream,
87        },
88        services::{
89            compliance::{ConfidentialityPolicy, Policy},
90            consent::Destination,
91        },
92    },
93    transport::eval_remote_ip,
94};
95
96/// Converts traceability errors to gRPC Status codes for wire transmission.
97impl From<TraceabilityError> for Status {
98    fn from(error: TraceabilityError) -> Self {
99        Status::internal(error.to_string())
100    }
101}
102
103/// gRPC client service for machine-to-machine communication.
104///
105/// `M2mGrpc` provides the client-side implementation for making gRPC calls to
106/// remote trace2e middleware instances. It manages connection pooling and
107/// handles the translation between internal M2M requests and gRPC protocol.
108///
109/// ## Connection Management
110///
111/// The service maintains a thread-safe cache of connected clients to avoid
112/// connection overhead. Connections are established lazily when first needed
113/// and reused for subsequent requests to the same remote endpoint.
114///
115/// ## Request Routing
116///
117/// The service automatically determines the target remote IP address from
118/// the request payload and routes the call to the appropriate endpoint.
119/// Network failures are reported as transport errors.
120#[derive(Default, Clone)]
121pub struct M2mGrpc {
122    /// Cache of established gRPC client connections indexed by remote IP address.
123    connected_remotes: Arc<DashMap<String, proto::m2m_client::M2mClient<Channel>>>,
124}
125
126impl M2mGrpc {
127    /// Establishes a new gRPC connection to a remote middleware instance.
128    ///
129    /// Creates a new client connection to the specified remote IP address
130    /// using the default gRPC port. The connection is cached for future use.
131    ///
132    /// # Arguments
133    ///
134    /// * `remote_ip` - The IP address of the remote middleware instance
135    ///
136    /// # Returns
137    ///
138    /// A connected gRPC client, or an error if connection fails.
139    ///
140    /// # Errors
141    ///
142    /// Returns `TransportFailedToContactRemote` if the connection cannot be established.
143    async fn connect_remote(
144        &self,
145        remote_ip: String,
146    ) -> Result<proto::m2m_client::M2mClient<Channel>, TraceabilityError> {
147        match proto::m2m_client::M2mClient::connect(format!(
148            "http://{remote_ip}:{DEFAULT_GRPC_PORT}"
149        ))
150        .await
151        {
152            Ok(client) => {
153                self.connected_remotes.insert(remote_ip, client.clone());
154                Ok(client)
155            }
156            Err(_) => Err(TraceabilityError::TransportFailedToContactRemote(remote_ip)),
157        }
158    }
159
160    /// Retrieves an existing cached gRPC client for the specified remote IP.
161    ///
162    /// # Arguments
163    ///
164    /// * `remote_ip` - The IP address to look up in the connection cache
165    ///
166    /// # Returns
167    ///
168    /// An existing client connection if available, None otherwise.
169    async fn get_client(&self, remote_ip: String) -> Option<proto::m2m_client::M2mClient<Channel>> {
170        self.connected_remotes.get(&remote_ip).map(|c| c.to_owned())
171    }
172
173    /// Retrieves an existing client or establishes a new connection if needed.
174    ///
175    /// This method first checks the connection cache and returns an existing
176    /// client if available. If no cached connection exists, it establishes
177    /// a new connection and caches it for future use.
178    ///
179    /// # Arguments
180    ///
181    /// * `remote_ip` - The IP address of the target middleware instance
182    ///
183    /// # Returns
184    ///
185    /// A ready-to-use gRPC client connection.
186    ///
187    /// # Errors
188    ///
189    /// Returns `TransportFailedToContactRemote` if connection establishment fails.
190    async fn get_client_or_connect(
191        &self,
192        remote_ip: String,
193    ) -> Result<proto::m2m_client::M2mClient<Channel>, TraceabilityError> {
194        match self.get_client(remote_ip.clone()).await {
195            Some(client) => Ok(client),
196            None => self.connect_remote(remote_ip).await,
197        }
198    }
199}
200
201impl Service<M2mRequest> for M2mGrpc {
202    type Response = M2mResponse;
203    type Error = TraceabilityError;
204    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
205
206    fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
207        Poll::Ready(Ok(()))
208    }
209
210    fn call(&mut self, request: M2mRequest) -> Self::Future {
211        let this = self.clone();
212        Box::pin(async move {
213            match request.clone() {
214                M2mRequest::GetDestinationPolicy(destination) => {
215                    info!(
216                        destination = %destination,
217                        "[gRPC-client] GetDestinationPolicy"
218                    );
219                    let remote_ip = eval_remote_ip(request)?;
220                    let mut client = this.get_client_or_connect(remote_ip.clone()).await?;
221
222                    // Create the protobuf request
223                    let proto_req = proto::messages::GetDestinationPolicy {
224                        destination: Some(destination.into()),
225                    };
226
227                    // Make the gRPC call
228                    let response = client
229                        .m2m_destination_policy(Request::new(proto_req))
230                        .await
231                        .map_err(|_| TraceabilityError::TransportFailedToContactRemote(remote_ip))?
232                        .into_inner();
233                    Ok(M2mResponse::DestinationPolicy(
234                        response.policy.map(|policy| policy.into()).unwrap_or_default(),
235                    ))
236                }
237                M2mRequest::CheckSourceCompliance { sources, destination } => {
238                    info!(
239                        sources = %DisplayableResource::from(&sources),
240                        destination = %destination.0,
241                        destination_policy = ?destination.1,
242                        "[gRPC-client] CheckSourceCompliance"
243                    );
244                    // Create sources partition by node_id
245                    let sources_partition = sources.iter().fold(
246                        HashMap::new(),
247                        |mut partitions: HashMap<&String, HashSet<&LocalizedResource>>, lr| {
248                            partitions.entry(lr.node_id()).or_default().insert(lr);
249                            partitions
250                        },
251                    );
252
253                    let futures = sources_partition
254                        .into_iter()
255                        .map(|(node_id, sources)| {
256                            let this_clone = this.clone();
257                            let dest_resource = destination.0.clone();
258                            let dest_policy = destination.1.clone();
259                            async move {
260                                let mut client =
261                                    this_clone.get_client_or_connect(node_id.to_string()).await?;
262                                client
263                                    .m2m_check_source_compliance(Request::new(
264                                        proto::messages::CheckSourceCompliance {
265                                            sources: sources
266                                                .iter()
267                                                .map(|r| (**r).clone().into())
268                                                .collect(),
269                                            destination: Some((dest_resource).into()),
270                                            destination_policy: Some((dest_policy).into()),
271                                        },
272                                    ))
273                                    .await
274                                    .map_err(|_| {
275                                        TraceabilityError::TransportFailedToContactRemote(
276                                            node_id.to_string(),
277                                        )
278                                    })
279                            }
280                        })
281                        .collect::<Vec<_>>();
282
283                    // Collect all results and return error if any failed
284                    try_join_all(futures).await?;
285                    Ok(M2mResponse::Ack)
286                }
287                M2mRequest::UpdateProvenance { source_prov, destination } => {
288                    info!(
289                        source_prov = %DisplayableResource::from(&source_prov),
290                        destination = %destination,
291                        "[gRPC-client] UpdateProvenance"
292                    );
293                    let remote_ip = eval_remote_ip(request)?;
294                    let mut client = this.get_client_or_connect(remote_ip.clone()).await?;
295
296                    // Group LocalizedResources by node_id
297                    let mut grouped: HashMap<String, Vec<proto::primitives::Resource>> =
298                        HashMap::default();
299                    for lr in source_prov {
300                        grouped
301                            .entry(lr.node_id().clone())
302                            .or_default()
303                            .push(lr.resource().clone().into());
304                    }
305
306                    // Convert to Vec<References>
307                    let source_prov_proto: Vec<proto::primitives::References> = grouped
308                        .into_iter()
309                        .map(|(node, resources)| proto::primitives::References { node, resources })
310                        .collect();
311
312                    // Create the protobuf request
313                    let proto_req = proto::messages::UpdateProvenance {
314                        source_prov: source_prov_proto,
315                        destination: Some(destination.into()),
316                    };
317
318                    // Make the gRPC call
319                    client.m2m_update_provenance(Request::new(proto_req)).await.map_err(|_| {
320                        TraceabilityError::TransportFailedToContactRemote(remote_ip)
321                    })?;
322
323                    Ok(M2mResponse::Ack)
324                }
325                M2mRequest::BroadcastDeletion(resource) => {
326                    info!(
327                        resource = %resource,
328                        "[gRPC-client] BroadcastDeletion"
329                    );
330                    let remote_ip = eval_remote_ip(request)?;
331                    let mut client = this.get_client_or_connect(remote_ip.clone()).await?;
332
333                    // Create the protobuf request
334                    let proto_req = proto::messages::BroadcastDeletionRequest {
335                        resource: Some(resource.into()),
336                    };
337
338                    // Make the gRPC call
339                    client.m2m_broadcast_deletion(Request::new(proto_req)).await.map_err(|_| {
340                        TraceabilityError::TransportFailedToContactRemote(remote_ip)
341                    })?;
342
343                    Ok(M2mResponse::Ack)
344                }
345            }
346        })
347    }
348}
349
350/// gRPC server router that handles incoming requests and routes them to appropriate services.
351///
352/// `Trace2eRouter` implements the gRPC server-side logic by accepting incoming
353/// requests and routing them to the appropriate process-to-middleware (P2M) or
354/// machine-to-machine (M2M) service handlers.
355///
356/// ## Type Parameters
357///
358/// * `P2mApi` - Service handling process-to-middleware requests
359/// * `M2mApi` - Service handling machine-to-machine requests
360///
361/// ## Request Routing
362///
363/// The router translates incoming Protocol Buffer requests to internal API
364/// types, calls the appropriate service, and converts responses back to
365/// Protocol Buffer format for transmission.
366pub struct P2mHandler<P2mApi> {
367    /// Process-to-middleware service handler.
368    p2m: P2mApi,
369}
370
371impl<P2mApi> P2mHandler<P2mApi> {
372    /// Creates a new router with the specified service handlers.
373    ///
374    /// # Arguments
375    ///
376    /// * `p2m` - Service for handling process-to-middleware requests
377    pub fn new(p2m: P2mApi) -> Self {
378        Self { p2m }
379    }
380}
381
382/// Implementation of the trace2e gRPC service protocol.
383///
384/// This implementation provides the server-side handlers for all gRPC endpoints
385/// defined in the trace2e protocol. It handles both P2M (process-to-middleware)
386/// and M2M (machine-to-machine) operations by delegating to the appropriate
387/// internal service handlers.
388#[tonic::async_trait]
389impl<P2mApi> proto::p2m_server::P2m for P2mHandler<P2mApi>
390where
391    P2mApi: Service<P2mRequest, Response = P2mResponse, Error = TraceabilityError>
392        + Clone
393        + Sync
394        + Send
395        + 'static,
396    P2mApi::Future: Send,
397{
398    /// Handles local process enrollment requests.
399    ///
400    /// Registers a local file descriptor with the middleware for tracking.
401    /// This is called when a process opens a local file or resource.
402    async fn p2m_local_enroll(
403        &self,
404        request: Request<proto::messages::LocalCt>,
405    ) -> Result<Response<proto::messages::Ack>, Status> {
406        let req = request.into_inner();
407        let mut p2m = self.p2m.clone();
408        match p2m
409            .call(P2mRequest::LocalEnroll {
410                pid: req.process_id,
411                fd: req.file_descriptor,
412                path: req.path,
413            })
414            .await?
415        {
416            P2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
417            _ => Err(Status::internal("Internal traceability API error")),
418        }
419    }
420
421    /// Handles remote process enrollment requests.
422    ///
423    /// Registers a network connection (socket) with the middleware for tracking.
424    /// This is called when a process establishes a network connection.
425    async fn p2m_remote_enroll(
426        &self,
427        request: Request<proto::messages::RemoteCt>,
428    ) -> Result<Response<proto::messages::Ack>, Status> {
429        let req = request.into_inner();
430        let mut p2m = self.p2m.clone();
431        match p2m
432            .call(P2mRequest::RemoteEnroll {
433                pid: req.process_id,
434                fd: req.file_descriptor,
435                local_socket: req.local_socket,
436                peer_socket: req.peer_socket,
437            })
438            .await?
439        {
440            P2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
441            _ => Err(Status::internal("Internal traceability API error")),
442        }
443    }
444
445    /// Handles I/O authorization requests from processes.
446    ///
447    /// Evaluates whether a process is authorized to perform an I/O operation
448    /// on a specific file descriptor. Returns a grant ID if authorized.
449    async fn p2m_io_request(
450        &self,
451        request: Request<proto::messages::IoInfo>,
452    ) -> Result<Response<proto::messages::Grant>, Status> {
453        let req = request.into_inner();
454        let mut p2m = self.p2m.clone();
455        match p2m
456            .call(P2mRequest::IoRequest {
457                pid: req.process_id,
458                fd: req.file_descriptor,
459                output: req.flow == proto::primitives::Flow::Output as i32,
460            })
461            .await?
462        {
463            P2mResponse::Grant(id) => {
464                Ok(Response::new(proto::messages::Grant { id: id.to_string() }))
465            }
466            _ => Err(Status::internal("Internal traceability API error")),
467        }
468    }
469
470    /// Handles I/O operation completion reports from processes.
471    ///
472    /// Records the completion and result of an I/O operation that was
473    /// previously authorized. This completes the audit trail for the operation.
474    async fn p2m_io_report(
475        &self,
476        request: Request<proto::messages::IoResult>,
477    ) -> Result<Response<proto::messages::Ack>, Status> {
478        let req = request.into_inner();
479        let mut p2m = self.p2m.clone();
480        match p2m
481            .call(P2mRequest::IoReport {
482                pid: req.process_id,
483                fd: req.file_descriptor,
484                grant_id: req.grant_id.parse::<u128>().unwrap_or_default(),
485                result: req.result,
486            })
487            .await?
488        {
489            P2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
490            _ => Err(Status::internal("Internal traceability API error")),
491        }
492    }
493}
494pub struct M2mHandler<M2mApi> {
495    /// Machine-to-machine service handler.
496    m2m: M2mApi,
497}
498
499impl<M2mApi> M2mHandler<M2mApi> {
500    /// Creates a new router with the specified service handlers.
501    ///
502    /// # Arguments
503    ///
504    /// * `m2m` - Service for handling machine-to-machine requests
505    pub fn new(m2m: M2mApi) -> Self {
506        Self { m2m }
507    }
508}
509
510#[tonic::async_trait]
511impl<M2mApi> proto::m2m_server::M2m for M2mHandler<M2mApi>
512where
513    M2mApi: Service<M2mRequest, Response = M2mResponse, Error = TraceabilityError>
514        + Clone
515        + Sync
516        + Send
517        + 'static,
518    M2mApi::Future: Send,
519{
520    /// Handles destination policy requests from remote middleware.
521    ///
522    /// Returns the compliance policy for a destination resource to enable
523    /// remote middleware instances to evaluate flow authorization.
524    async fn m2m_destination_policy(
525        &self,
526        request: Request<proto::messages::GetDestinationPolicy>,
527    ) -> Result<Response<proto::messages::DestinationPolicy>, Status> {
528        info!("[gRPC-server] m2m_destination_policy");
529        let req = request.into_inner();
530        let mut m2m = self.m2m.clone();
531        match m2m.call(req.into()).await? {
532            M2mResponse::DestinationPolicy(policy) => Ok(Response::new(policy.into())),
533            _ => Err(Status::internal("Internal traceability API error")),
534        }
535    }
536
537    /// Handles source compliance checking requests from remote middleware.
538    ///
539    /// Checks compliance policies for a set of source resources to enable
540    /// distributed flow evaluation across multiple middleware instances.
541    async fn m2m_check_source_compliance(
542        &self,
543        request: Request<proto::messages::CheckSourceCompliance>,
544    ) -> Result<Response<proto::messages::Ack>, Status> {
545        info!("[gRPC-server] m2m_check_source_compliance");
546        let req = request.into_inner();
547        let mut m2m = self.m2m.clone();
548        m2m.call(req.into()).await?;
549        Ok(Response::new(proto::messages::Ack {}))
550    }
551
552    /// Handles provenance update requests from remote middleware.
553    ///
554    /// Updates the provenance information for a destination resource based
555    /// on data flows from remote sources. This maintains the audit trail
556    /// across distributed operations.
557    async fn m2m_update_provenance(
558        &self,
559        request: Request<proto::messages::UpdateProvenance>,
560    ) -> Result<Response<proto::messages::Ack>, Status> {
561        info!("[gRPC-server] m2m_update_provenance");
562        let req = request.into_inner();
563        let mut m2m = self.m2m.clone();
564        match m2m.call(req.into()).await? {
565            M2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
566            _ => Err(Status::internal("Internal traceability API error")),
567        }
568    }
569
570    /// Handles deletion broadcast requests from remote middleware.
571    ///
572    /// Broadcasts the deletion of a resource to all middleware instances.
573    async fn m2m_broadcast_deletion(
574        &self,
575        request: Request<proto::messages::BroadcastDeletionRequest>,
576    ) -> Result<Response<proto::messages::Ack>, Status> {
577        info!("[gRPC-server] m2m_broadcast_deletion");
578        let req = request.into_inner();
579        let mut m2m = self.m2m.clone();
580        match m2m.call(req.into()).await? {
581            M2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
582            _ => Err(Status::internal("Internal traceability API error")),
583        }
584    }
585}
586
587/// gRPC server handler for operator-to-middleware operations.
588///
589/// `O2mHandler` implements the gRPC server-side logic for administrative
590/// operations including policy management, provenance queries, and consent
591/// management by operators and compliance officers.
592///
593/// ## Type Parameters
594///
595/// * `O2mApi` - Service handling operator-to-middleware requests
596pub struct O2mHandler<O2mApi> {
597    /// Operator-to-middleware service handler.
598    o2m: O2mApi,
599}
600
601impl<O2mApi> O2mHandler<O2mApi> {
602    /// Creates a new O2M handler with the specified service handler.
603    ///
604    /// # Arguments
605    ///
606    /// * `o2m` - Service for handling operator-to-middleware requests
607    pub fn new(o2m: O2mApi) -> Self {
608        Self { o2m }
609    }
610}
611
612/// Implementation of the O2M gRPC service protocol.
613///
614/// This implementation provides the server-side handlers for all operator-facing
615/// endpoints including policy configuration and provenance analysis.
616#[tonic::async_trait]
617impl<O2mApi> proto::o2m_server::O2m for O2mHandler<O2mApi>
618where
619    O2mApi: Service<O2mRequest, Response = O2mResponse, Error = TraceabilityError>
620        + Clone
621        + Sync
622        + Send
623        + 'static,
624    O2mApi::Future: Send,
625{
626    /// Handles policy retrieval requests from operators.
627    ///
628    /// Returns the current compliance policies for the specified set of resources.
629    async fn o2m_get_policies(
630        &self,
631        request: Request<proto::messages::GetPoliciesRequest>,
632    ) -> Result<Response<proto::messages::GetPoliciesResponse>, Status> {
633        let req = request.into_inner();
634        let mut o2m = self.o2m.clone();
635        match o2m.call(req.into()).await? {
636            O2mResponse::Policies(policies) => Ok(Response::new(policies.into())),
637            _ => Err(Status::internal("Internal traceability API error")),
638        }
639    }
640
641    /// Handles policy update requests from operators.
642    ///
643    /// Sets a complete compliance policy for a specific resource.
644    async fn o2m_set_policy(
645        &self,
646        request: Request<proto::messages::SetPolicyRequest>,
647    ) -> Result<Response<proto::messages::Ack>, Status> {
648        let req = request.into_inner();
649        let mut o2m = self.o2m.clone();
650        match o2m.call(req.into()).await? {
651            O2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
652            _ => Err(Status::internal("Internal traceability API error")),
653        }
654    }
655
656    /// Handles confidentiality setting requests from operators.
657    ///
658    /// Updates the confidentiality policy for a specific resource.
659    async fn o2m_set_confidentiality(
660        &self,
661        request: Request<proto::messages::SetConfidentialityRequest>,
662    ) -> Result<Response<proto::messages::Ack>, Status> {
663        let req = request.into_inner();
664        let mut o2m = self.o2m.clone();
665        match o2m.call(req.into()).await? {
666            O2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
667            _ => Err(Status::internal("Internal traceability API error")),
668        }
669    }
670
671    /// Handles integrity setting requests from operators.
672    ///
673    /// Updates the integrity level for a specific resource.
674    async fn o2m_set_integrity(
675        &self,
676        request: Request<proto::messages::SetIntegrityRequest>,
677    ) -> Result<Response<proto::messages::Ack>, Status> {
678        let req = request.into_inner();
679        let mut o2m = self.o2m.clone();
680        match o2m.call(req.into()).await? {
681            O2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
682            _ => Err(Status::internal("Internal traceability API error")),
683        }
684    }
685
686    /// Handles deletion marking requests from operators.
687    ///
688    /// Marks a resource as deleted for compliance tracking purposes.
689    async fn o2m_set_deleted(
690        &self,
691        request: Request<proto::messages::SetDeletedRequest>,
692    ) -> Result<Response<proto::messages::Ack>, Status> {
693        let req = request.into_inner();
694        let mut o2m = self.o2m.clone();
695        match o2m.call(req.into()).await? {
696            O2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
697            _ => Err(Status::internal("Internal traceability API error")),
698        }
699    }
700
701    type O2MEnforceConsentStream = Pin<
702        Box<
703            dyn tokio_stream::Stream<Item = Result<proto::messages::ConsentNotification, Status>>
704                + Send,
705        >,
706    >;
707
708    /// Handles consent enforcement requests from operators.
709    ///
710    /// Enables consent enforcement for a resource and returns a stream of
711    /// consent request notifications that can be monitored by the operator.
712    async fn o2m_enforce_consent(
713        &self,
714        request: Request<proto::messages::EnforceConsentRequest>,
715    ) -> Result<Response<Self::O2MEnforceConsentStream>, Status> {
716        let req = request.into_inner();
717        let mut o2m = self.o2m.clone();
718        match o2m.call(req.into()).await? {
719            O2mResponse::Notifications(receiver) => {
720                // Convert the broadcast receiver into a stream of consent notifications
721                let stream = BroadcastStream::new(receiver).map(|result| {
722                    match result {
723                        Ok(destination) => {
724                            // Format destination as human-readable consent request message
725                            let consent_request =
726                                format!("Consent request for destination: {:?}", destination);
727                            Ok(proto::messages::ConsentNotification { consent_request })
728                        }
729                        Err(e) => {
730                            Err(Status::internal(format!("Notification stream error: {}", e)))
731                        }
732                    }
733                });
734                Ok(Response::new(Box::pin(stream)))
735            }
736            _ => Err(Status::internal("Internal traceability API error")),
737        }
738    }
739
740    /// Handles consent decision requests from operators.
741    ///
742    /// Sets the consent decision for a specific data flow operation.
743    async fn o2m_set_consent_decision(
744        &self,
745        request: Request<proto::messages::SetConsentDecisionRequest>,
746    ) -> Result<Response<proto::messages::Ack>, Status> {
747        let req = request.into_inner();
748        let mut o2m = self.o2m.clone();
749        match o2m.call(req.into()).await? {
750            O2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
751            _ => Err(Status::internal("Internal traceability API error")),
752        }
753    }
754
755    /// Handles provenance query requests from operators.
756    ///
757    /// Returns the complete provenance lineage for a specific resource.
758    async fn o2m_get_references(
759        &self,
760        request: Request<proto::messages::GetReferencesRequest>,
761    ) -> Result<Response<proto::messages::GetReferencesResponse>, Status> {
762        let req = request.into_inner();
763        let mut o2m = self.o2m.clone();
764        match o2m.call(req.into()).await? {
765            O2mResponse::References(references) => {
766                // Group LocalizedResources by node_id
767                let mut grouped: HashMap<String, HashSet<Resource>> = HashMap::default();
768                for lr in references {
769                    grouped.entry(lr.node_id().clone()).or_default().insert(lr.resource().clone());
770                }
771                Ok(Response::new(grouped.into()))
772            }
773            _ => Err(Status::internal("Internal traceability API error")),
774        }
775    }
776}
777
778// ========== Protocol Buffer Type Conversions ==========
779// Fundamental type conversions for serialization/deserialization
780
781/// Converts Protocol Buffer Resource to internal Resource type.
782impl From<proto::primitives::Resource> for Resource {
783    fn from(req: proto::primitives::Resource) -> Self {
784        match req.resource {
785            Some(proto::primitives::resource::Resource::Fd(fd)) => Resource::Fd(fd.into()),
786            Some(proto::primitives::resource::Resource::Process(process)) => {
787                Resource::Process(process.into())
788            }
789            None => Resource::None,
790        }
791    }
792}
793
794/// Converts internal Resource to Protocol Buffer Resource type.
795impl From<Resource> for proto::primitives::Resource {
796    fn from(resource: Resource) -> Self {
797        match resource {
798            Resource::Fd(fd) => proto::primitives::Resource {
799                resource: Some(proto::primitives::resource::Resource::Fd(fd.into())),
800            },
801            Resource::Process(process) => proto::primitives::Resource {
802                resource: Some(proto::primitives::resource::Resource::Process(process.into())),
803            },
804            Resource::None => proto::primitives::Resource { resource: None },
805        }
806    }
807}
808
809impl From<proto::primitives::Fd> for Fd {
810    fn from(proto_fd: proto::primitives::Fd) -> Self {
811        match proto_fd.fd {
812            Some(proto::primitives::fd::Fd::File(file)) => Fd::File(file.into()),
813            Some(proto::primitives::fd::Fd::Stream(stream)) => Fd::Stream(stream.into()),
814            None => Fd::File(File { path: String::new() }),
815        }
816    }
817}
818
819impl From<Fd> for proto::primitives::Fd {
820    fn from(fd: Fd) -> Self {
821        match fd {
822            Fd::File(file) => {
823                proto::primitives::Fd { fd: Some(proto::primitives::fd::Fd::File(file.into())) }
824            }
825            Fd::Stream(stream) => {
826                proto::primitives::Fd { fd: Some(proto::primitives::fd::Fd::Stream(stream.into())) }
827            }
828        }
829    }
830}
831
832impl From<proto::primitives::File> for File {
833    fn from(proto_file: proto::primitives::File) -> Self {
834        File { path: proto_file.path }
835    }
836}
837
838impl From<File> for proto::primitives::File {
839    fn from(file: File) -> Self {
840        proto::primitives::File { path: file.path }
841    }
842}
843
844impl From<proto::primitives::Stream> for Stream {
845    fn from(proto_stream: proto::primitives::Stream) -> Self {
846        Stream { local_socket: proto_stream.local_socket, peer_socket: proto_stream.peer_socket }
847    }
848}
849
850impl From<Stream> for proto::primitives::Stream {
851    fn from(stream: Stream) -> Self {
852        proto::primitives::Stream {
853            local_socket: stream.local_socket,
854            peer_socket: stream.peer_socket,
855        }
856    }
857}
858
859impl From<proto::primitives::Process> for Process {
860    fn from(proto_process: proto::primitives::Process) -> Self {
861        Process {
862            pid: proto_process.pid,
863            starttime: proto_process.starttime,
864            exe_path: proto_process.exe_path,
865        }
866    }
867}
868
869impl From<Process> for proto::primitives::Process {
870    fn from(process: Process) -> Self {
871        proto::primitives::Process {
872            pid: process.pid,
873            starttime: process.starttime,
874            exe_path: process.exe_path,
875        }
876    }
877}
878
879// ========== LocalizedResource and Destination Conversions ==========
880
881/// Converts Protocol Buffer LocalizedResource to internal LocalizedResource type.
882impl From<proto::primitives::LocalizedResource> for LocalizedResource {
883    fn from(proto_lr: proto::primitives::LocalizedResource) -> Self {
884        LocalizedResource::new(
885            proto_lr.node_id,
886            proto_lr.resource.map(|r| r.into()).unwrap_or_default(),
887        )
888    }
889}
890
891/// Converts internal LocalizedResource to Protocol Buffer LocalizedResource type.
892impl From<LocalizedResource> for proto::primitives::LocalizedResource {
893    fn from(lr: LocalizedResource) -> Self {
894        proto::primitives::LocalizedResource {
895            node_id: lr.node_id().clone(),
896            resource: Some(lr.resource().clone().into()),
897        }
898    }
899}
900
901/// Converts Protocol Buffer Destination to internal Destination type.
902impl From<proto::primitives::Destination> for Destination {
903    fn from(proto_dest: proto::primitives::Destination) -> Self {
904        match proto_dest.destination {
905            Some(proto::primitives::destination::Destination::Resource(lr_with_parent)) => {
906                if let Some(proto_lr) = lr_with_parent.resource {
907                    let localized_resource: LocalizedResource = proto_lr.into();
908                    let parent = lr_with_parent.parent.map(|p| Box::new((*p).into()));
909                    // Preserve the LocalizedResource as a Resource with node_id in the parent
910                    Destination::Resource {
911                        resource: localized_resource.resource().clone(),
912                        parent,
913                    }
914                } else {
915                    Destination::Resource { resource: Resource::None, parent: None }
916                }
917            }
918            Some(proto::primitives::destination::Destination::Node(node_id)) => {
919                Destination::Node(node_id)
920            }
921            None => Destination::Node(String::new()),
922        }
923    }
924}
925
926/// Converts internal Destination to Protocol Buffer Destination type.
927/// Preserves LocalizedResource context by reconstructing the hierarchical structure.
928impl From<Destination> for proto::primitives::Destination {
929    fn from(dest: Destination) -> Self {
930        destination_to_proto_node_variant(dest)
931    }
932}
933
934/// Helper function that performs the actual Destination -> proto conversion logic.
935/// This can be reused by other modules that need the same conversion.
936pub fn destination_to_proto_node_variant(dest: Destination) -> proto::primitives::Destination {
937    match dest {
938        Destination::Resource { resource, parent } => {
939            // When converting back to proto, we reconstruct the LocalizedResource
940            // If parent is None, we use empty string for node_id (local)
941            // If parent is Some(Node(...)), we extract the node_id
942            let (node_id, proto_parent) = match &parent {
943                Some(p) => {
944                    match &(**p) {
945                        Destination::Node(node) => {
946                            (node.clone(), parent.map(|p| Box::new((*p).clone().into())))
947                        }
948                        _ => {
949                            // If parent is another Resource, preserve it as-is
950                            (String::new(), parent.map(|p| Box::new((*p).clone().into())))
951                        }
952                    }
953                }
954                None => {
955                    // No parent, local resource
956                    (String::new(), None)
957                }
958            };
959
960            proto::primitives::Destination {
961                destination: Some(proto::primitives::destination::Destination::Resource(Box::new(
962                    proto::primitives::LocalizedResourceWithParent {
963                        resource: Some(LocalizedResource::new(node_id, resource).into()),
964                        parent: proto_parent,
965                    },
966                ))),
967            }
968        }
969        Destination::Node(node_id) => proto::primitives::Destination {
970            destination: Some(proto::primitives::destination::Destination::Node(node_id)),
971        },
972    }
973}
974
975// ========== Policy Conversions ==========
976
977impl From<Policy> for proto::primitives::Policy {
978    fn from(policy: Policy) -> Self {
979        proto::primitives::Policy {
980            confidentiality: match policy.is_confidential() {
981                false => proto::primitives::Confidentiality::Public as i32,
982                true => proto::primitives::Confidentiality::Secret as i32,
983            },
984            integrity: policy.get_integrity(),
985            deleted: policy.is_deleted(),
986            consent: policy.get_consent(),
987        }
988    }
989}
990
991impl From<proto::primitives::Policy> for Policy {
992    fn from(proto_policy: proto::primitives::Policy) -> Self {
993        Policy::new(
994            match proto_policy.confidentiality {
995                x if x == proto::primitives::Confidentiality::Secret as i32 => {
996                    ConfidentialityPolicy::Secret
997                }
998                _ => ConfidentialityPolicy::Public,
999            },
1000            proto_policy.integrity,
1001            proto_policy.deleted.into(),
1002            proto_policy.consent,
1003        )
1004    }
1005}
1006
1007// ========== Resource-Policy Mapping Conversions ==========
1008
1009/// Converts Protocol Buffer MappedLocalizedPolicy to internal tuple.
1010impl From<proto::primitives::MappedLocalizedPolicy> for (LocalizedResource, Policy) {
1011    fn from(policy: proto::primitives::MappedLocalizedPolicy) -> Self {
1012        (
1013            policy.resource.map(|r| r.into()).unwrap_or_default(),
1014            policy.policy.map(|p| p.into()).unwrap_or_default(),
1015        )
1016    }
1017}
1018
1019// ========== Provenance References Conversions ==========
1020
1021/// Converts Protocol Buffer References to internal node-resources tuple.
1022impl From<proto::primitives::References> for (String, HashSet<Resource>) {
1023    fn from(references: proto::primitives::References) -> Self {
1024        (references.node, references.resources.into_iter().map(|r| r.into()).collect())
1025    }
1026}
1027
1028/// Converts Protocol Buffer References to LocalizedResource set.
1029impl From<proto::primitives::References> for HashSet<LocalizedResource> {
1030    fn from(references: proto::primitives::References) -> Self {
1031        references
1032            .resources
1033            .into_iter()
1034            .map(|r| LocalizedResource::new(references.node.clone(), r.into()))
1035            .collect()
1036    }
1037}
1038
1039/// Converts internal node-resources tuple to Protocol Buffer References.
1040impl From<(String, HashSet<Resource>)> for proto::primitives::References {
1041    fn from((node, resources): (String, HashSet<Resource>)) -> Self {
1042        proto::primitives::References {
1043            node,
1044            resources: resources.into_iter().map(|r| r.into()).collect(),
1045        }
1046    }
1047}
1048
1049// ========== M2M Protocol Buffer Conversions ==========
1050
1051/// Converts Protocol Buffer GetDestinationPolicy request to internal M2M request.
1052impl From<proto::messages::GetDestinationPolicy> for M2mRequest {
1053    fn from(req: proto::messages::GetDestinationPolicy) -> Self {
1054        M2mRequest::GetDestinationPolicy(req.destination.map(|d| d.into()).unwrap_or_default())
1055    }
1056}
1057
1058/// Converts Protocol Buffer UpdateProvenance request to internal M2M request.
1059impl From<proto::messages::UpdateProvenance> for M2mRequest {
1060    fn from(req: proto::messages::UpdateProvenance) -> Self {
1061        let source_prov: HashSet<LocalizedResource> = req
1062            .source_prov
1063            .into_iter()
1064            .flat_map(|refs: proto::primitives::References| {
1065                let node_id = refs.node.clone();
1066                refs.resources
1067                    .into_iter()
1068                    .map(move |r| LocalizedResource::new(node_id.clone(), r.into()))
1069            })
1070            .collect();
1071
1072        M2mRequest::UpdateProvenance {
1073            source_prov,
1074            destination: req.destination.map(|d| d.into()).unwrap_or_default(),
1075        }
1076    }
1077}
1078
1079/// Converts Protocol Buffer BroadcastDeletionRequest to internal M2M request.
1080impl From<proto::messages::BroadcastDeletionRequest> for M2mRequest {
1081    fn from(req: proto::messages::BroadcastDeletionRequest) -> Self {
1082        M2mRequest::BroadcastDeletion(req.resource.map(|r| r.into()).unwrap_or_default())
1083    }
1084}
1085
1086/// Converts Protocol Buffer CheckSourceCompliance request to internal M2M request.
1087impl From<proto::messages::CheckSourceCompliance> for M2mRequest {
1088    fn from(req: proto::messages::CheckSourceCompliance) -> Self {
1089        let sources: HashSet<LocalizedResource> =
1090            req.sources.into_iter().map(|s| s.into()).collect();
1091        let destination: LocalizedResource = req.destination.map(|d| d.into()).unwrap_or_default();
1092        let destination_policy: Policy =
1093            req.destination_policy.map(|p| p.into()).unwrap_or_default();
1094        M2mRequest::CheckSourceCompliance {
1095            sources,
1096            destination: (destination, destination_policy),
1097        }
1098    }
1099}
1100
1101/// Converts internal M2M DestinationPolicy response to Protocol Buffer response.
1102impl From<Policy> for proto::messages::DestinationPolicy {
1103    fn from(policy: Policy) -> Self {
1104        proto::messages::DestinationPolicy { policy: Some(policy.into()) }
1105    }
1106}
1107
1108// ========== O2M Protocol Buffer Conversions ==========
1109
1110/// Converts Protocol Buffer GetPoliciesRequest to internal O2M request.
1111impl From<proto::messages::GetPoliciesRequest> for O2mRequest {
1112    fn from(req: proto::messages::GetPoliciesRequest) -> Self {
1113        O2mRequest::GetPolicies(req.resources.into_iter().map(|r| r.into()).collect())
1114    }
1115}
1116
1117/// Converts Protocol Buffer SetPolicyRequest to internal O2M request.
1118impl From<proto::messages::SetPolicyRequest> for O2mRequest {
1119    fn from(req: proto::messages::SetPolicyRequest) -> Self {
1120        O2mRequest::SetPolicy {
1121            resource: req.resource.map(|r| r.into()).unwrap_or_default(),
1122            policy: req.policy.map(|p| p.into()).unwrap_or_default(),
1123        }
1124    }
1125}
1126
1127/// Converts Protocol Buffer SetConfidentialityRequest to internal O2M request.
1128impl From<proto::messages::SetConfidentialityRequest> for O2mRequest {
1129    fn from(req: proto::messages::SetConfidentialityRequest) -> Self {
1130        O2mRequest::SetConfidentiality {
1131            resource: req.resource.map(|r| r.into()).unwrap_or_default(),
1132            confidentiality: match req.confidentiality {
1133                x if x == proto::primitives::Confidentiality::Secret as i32 => {
1134                    ConfidentialityPolicy::Secret
1135                }
1136                _ => ConfidentialityPolicy::Public,
1137            },
1138        }
1139    }
1140}
1141
1142/// Converts Protocol Buffer SetIntegrityRequest to internal O2M request.
1143impl From<proto::messages::SetIntegrityRequest> for O2mRequest {
1144    fn from(req: proto::messages::SetIntegrityRequest) -> Self {
1145        O2mRequest::SetIntegrity {
1146            resource: req.resource.map(|r| r.into()).unwrap_or_default(),
1147            integrity: req.integrity,
1148        }
1149    }
1150}
1151
1152/// Converts Protocol Buffer SetDeletedRequest to internal O2M request.
1153impl From<proto::messages::SetDeletedRequest> for O2mRequest {
1154    fn from(req: proto::messages::SetDeletedRequest) -> Self {
1155        O2mRequest::SetDeleted(req.resource.map(|r| r.into()).unwrap_or_default())
1156    }
1157}
1158
1159/// Converts Protocol Buffer EnforceConsentRequest to internal O2M request.
1160impl From<proto::messages::EnforceConsentRequest> for O2mRequest {
1161    fn from(req: proto::messages::EnforceConsentRequest) -> Self {
1162        O2mRequest::EnforceConsent(req.resource.map(|r| r.into()).unwrap_or_default())
1163    }
1164}
1165
1166/// Converts Protocol Buffer SetConsentDecisionRequest to internal O2M request.
1167impl From<proto::messages::SetConsentDecisionRequest> for O2mRequest {
1168    fn from(req: proto::messages::SetConsentDecisionRequest) -> Self {
1169        O2mRequest::SetConsentDecision {
1170            source: req.source.map(|r| r.into()).unwrap_or_default(),
1171            destination: req
1172                .destination
1173                .map(|d| d.into())
1174                .unwrap_or_else(|| Destination::Node(String::new())),
1175            decision: req.decision,
1176        }
1177    }
1178}
1179
1180/// Converts Protocol Buffer GetReferencesRequest to internal O2M request.
1181impl From<proto::messages::GetReferencesRequest> for O2mRequest {
1182    fn from(req: proto::messages::GetReferencesRequest) -> Self {
1183        O2mRequest::GetReferences(req.resource.map(|r| r.into()).unwrap_or_default())
1184    }
1185}
1186
1187// ========== O2M Response Conversions ==========
1188
1189/// Converts internal resource-policy map to Protocol Buffer GetPoliciesResponse.
1190impl From<HashMap<Resource, Policy>> for proto::messages::GetPoliciesResponse {
1191    fn from(policies: HashMap<Resource, Policy>) -> Self {
1192        proto::messages::GetPoliciesResponse {
1193            policies: policies
1194                .into_iter()
1195                .map(|(resource, policy)| proto::primitives::MappedLocalizedPolicy {
1196                    resource: Some(LocalizedResource::new(String::new(), resource).into()),
1197                    policy: Some(policy.into()),
1198                })
1199                .collect(),
1200        }
1201    }
1202}
1203
1204/// Converts internal LocalizedResource-policy map to Protocol Buffer GetPoliciesResponse.
1205impl From<HashMap<LocalizedResource, Policy>> for proto::messages::GetPoliciesResponse {
1206    fn from(policies: HashMap<LocalizedResource, Policy>) -> Self {
1207        proto::messages::GetPoliciesResponse {
1208            policies: policies
1209                .into_iter()
1210                .map(|(resource, policy)| proto::primitives::MappedLocalizedPolicy {
1211                    resource: Some(resource.into()),
1212                    policy: Some(policy.into()),
1213                })
1214                .collect(),
1215        }
1216    }
1217}
1218
1219/// Converts internal provenance references to Protocol Buffer GetReferencesResponse.
1220impl From<HashMap<String, HashSet<Resource>>> for proto::messages::GetReferencesResponse {
1221    fn from(references: HashMap<String, HashSet<Resource>>) -> Self {
1222        proto::messages::GetReferencesResponse {
1223            references: references
1224                .into_iter()
1225                .map(|(node, resources)| (node, resources).into())
1226                .collect(),
1227        }
1228    }
1229}