trace2e_core/transport/
grpc.rs

1//! # gRPC Transport Implementation
2//!
3//! This module provides the gRPC-based transport layer for distributed traceability
4//! operations in the trace2e framework. It implements both client and server
5//! functionality for machine-to-machine (M2M) communication using Protocol Buffers
6//! and the Tonic gRPC framework.
7//!
8//! ## Components
9//!
10//! - **M2mGrpc**: Client service for making outbound gRPC calls to remote middleware
11//! - **P2mHandler**: Server implementation that routes incoming gRPC requests for P2M operations
12//! - **M2mHandler**: Server implementation that routes incoming gRPC requests for M2M operations
13//! - **O2mHandler**: Server implementation that routes incoming gRPC requests for O2M operations
14//! - **Protocol Buffer Conversions**: Type conversions between internal and protobuf types
15//!
16//! ## Connection Management
17//!
18//! The gRPC client maintains a cache of connected remote clients to avoid
19//! repeated connection overhead. Connections are established on-demand and
20//! reused for subsequent requests to the same remote endpoint.
21//!
22//! ## Service Operations
23//!
24//! ### Process-to-Middleware (P2M)
25//! - Local process enrollment (file descriptors)
26//! - Remote process enrollment (network connections)
27//! - I/O request authorization
28//! - I/O operation reporting
29//!
30//! ### Machine-to-Machine (M2M)
31//! - Destination compliance policy retrieval
32//! - Source compliance policy retrieval
33//! - Provenance information updates
34//!
35//! ### Operator-to-Middleware (O2M)
36//! - Policy management
37//! - Confidentiality management
38//! - Integrity management
39//! - Deletion management
40//! - Consent management
41//! - Provenance information retrieval
42//!
43//! ## Protocol Buffer Integration
44//!
45//! The module includes comprehensive type conversions between the internal
46//! trace2e types and their Protocol Buffer representations, ensuring seamless
47//! serialization and deserialization across network boundaries.
48
49use std::{
50    collections::{HashMap, HashSet},
51    future::Future,
52    pin::Pin,
53    sync::Arc,
54    task::Poll,
55};
56
57use dashmap::DashMap;
58use futures::future::try_join_all;
59use tokio_stream::{StreamExt, wrappers::BroadcastStream};
60use tonic::{Request, Response, Status, transport::Channel};
61use tower::Service;
62
63/// Default port for gRPC communication between trace2e middleware instances.
64pub const DEFAULT_GRPC_PORT: u16 = 50051;
65
66/// Protocol Buffer definitions and descriptor sets for the trace2e gRPC service.
67pub mod proto {
68    tonic::include_proto!("trace2e");
69    pub mod primitives {
70        tonic::include_proto!("trace2e.primitives");
71    }
72    pub mod messages {
73        tonic::include_proto!("trace2e.messages");
74    }
75
76    /// Pre-compiled Protocol Buffer descriptor set for service reflection.
77    pub const MIDDLEWARE_DESCRIPTOR_SET: &[u8] = include_bytes!("../../trace2e_descriptor.bin");
78}
79
80use crate::{
81    traceability::{
82        api::types::{M2mRequest, M2mResponse, O2mRequest, O2mResponse, P2mRequest, P2mResponse},
83        error::TraceabilityError,
84        infrastructure::naming::{Fd, File, LocalizedResource, Process, Resource, Stream},
85        services::{
86            compliance::{ConfidentialityPolicy, Policy},
87            consent::Destination,
88        },
89    },
90    transport::eval_remote_ip,
91};
92
93/// Converts traceability errors to gRPC Status codes for wire transmission.
94impl From<TraceabilityError> for Status {
95    fn from(error: TraceabilityError) -> Self {
96        Status::internal(error.to_string())
97    }
98}
99
100/// gRPC client service for machine-to-machine communication.
101///
102/// `M2mGrpc` provides the client-side implementation for making gRPC calls to
103/// remote trace2e middleware instances. It manages connection pooling and
104/// handles the translation between internal M2M requests and gRPC protocol.
105///
106/// ## Connection Management
107///
108/// The service maintains a thread-safe cache of connected clients to avoid
109/// connection overhead. Connections are established lazily when first needed
110/// and reused for subsequent requests to the same remote endpoint.
111///
112/// ## Request Routing
113///
114/// The service automatically determines the target remote IP address from
115/// the request payload and routes the call to the appropriate endpoint.
116/// Network failures are reported as transport errors.
117#[derive(Default, Clone)]
118pub struct M2mGrpc {
119    /// Cache of established gRPC client connections indexed by remote IP address.
120    connected_remotes: Arc<DashMap<String, proto::m2m_client::M2mClient<Channel>>>,
121}
122
123impl M2mGrpc {
124    /// Establishes a new gRPC connection to a remote middleware instance.
125    ///
126    /// Creates a new client connection to the specified remote IP address
127    /// using the default gRPC port. The connection is cached for future use.
128    ///
129    /// # Arguments
130    ///
131    /// * `remote_ip` - The IP address of the remote middleware instance
132    ///
133    /// # Returns
134    ///
135    /// A connected gRPC client, or an error if connection fails.
136    ///
137    /// # Errors
138    ///
139    /// Returns `TransportFailedToContactRemote` if the connection cannot be established.
140    async fn connect_remote(
141        &self,
142        remote_ip: String,
143    ) -> Result<proto::m2m_client::M2mClient<Channel>, TraceabilityError> {
144        match proto::m2m_client::M2mClient::connect(format!("{remote_ip}:{DEFAULT_GRPC_PORT}"))
145            .await
146        {
147            Ok(client) => {
148                self.connected_remotes.insert(remote_ip, client.clone());
149                Ok(client)
150            }
151            Err(_) => Err(TraceabilityError::TransportFailedToContactRemote(remote_ip)),
152        }
153    }
154
155    /// Retrieves an existing cached gRPC client for the specified remote IP.
156    ///
157    /// # Arguments
158    ///
159    /// * `remote_ip` - The IP address to look up in the connection cache
160    ///
161    /// # Returns
162    ///
163    /// An existing client connection if available, None otherwise.
164    async fn get_client(&self, remote_ip: String) -> Option<proto::m2m_client::M2mClient<Channel>> {
165        self.connected_remotes.get(&remote_ip).map(|c| c.to_owned())
166    }
167
168    /// Retrieves an existing client or establishes a new connection if needed.
169    ///
170    /// This method first checks the connection cache and returns an existing
171    /// client if available. If no cached connection exists, it establishes
172    /// a new connection and caches it for future use.
173    ///
174    /// # Arguments
175    ///
176    /// * `remote_ip` - The IP address of the target middleware instance
177    ///
178    /// # Returns
179    ///
180    /// A ready-to-use gRPC client connection.
181    ///
182    /// # Errors
183    ///
184    /// Returns `TransportFailedToContactRemote` if connection establishment fails.
185    async fn get_client_or_connect(
186        &self,
187        remote_ip: String,
188    ) -> Result<proto::m2m_client::M2mClient<Channel>, TraceabilityError> {
189        match self.get_client(remote_ip.clone()).await {
190            Some(client) => Ok(client),
191            None => self.connect_remote(remote_ip).await,
192        }
193    }
194}
195
196impl Service<M2mRequest> for M2mGrpc {
197    type Response = M2mResponse;
198    type Error = TraceabilityError;
199    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
200
201    fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
202        Poll::Ready(Ok(()))
203    }
204
205    fn call(&mut self, request: M2mRequest) -> Self::Future {
206        let this = self.clone();
207        Box::pin(async move {
208            match request.clone() {
209                M2mRequest::GetDestinationPolicy(destination) => {
210                    let remote_ip = eval_remote_ip(request)?;
211                    let mut client = this.get_client_or_connect(remote_ip.clone()).await?;
212
213                    // Create the protobuf request
214                    let proto_req = proto::messages::GetDestinationPolicy {
215                        destination: Some(destination.into()),
216                    };
217
218                    // Make the gRPC call
219                    let response = client
220                        .m2m_destination_policy(Request::new(proto_req))
221                        .await
222                        .map_err(|_| TraceabilityError::TransportFailedToContactRemote(remote_ip))?
223                        .into_inner();
224                    Ok(M2mResponse::DestinationPolicy(
225                        response.policy.map(|policy| policy.into()).unwrap_or_default(),
226                    ))
227                }
228                M2mRequest::CheckSourceCompliance { sources, destination } => {
229                    // Create sources partition by node_id
230                    let sources_partition = sources.iter().fold(
231                        HashMap::new(),
232                        |mut partitions: HashMap<&String, HashSet<&LocalizedResource>>, lr| {
233                            partitions.entry(lr.node_id()).or_default().insert(lr);
234                            partitions
235                        },
236                    );
237
238                    let futures = sources_partition
239                        .into_iter()
240                        .map(|(node_id, sources)| {
241                            let this_clone = this.clone();
242                            let dest_resource = destination.0.clone();
243                            let dest_policy = destination.1.clone();
244                            async move {
245                                let mut client =
246                                    this_clone.get_client_or_connect(node_id.to_string()).await?;
247                                client
248                                    .m2m_check_source_compliance(Request::new(
249                                        proto::messages::CheckSourceCompliance {
250                                            sources: sources
251                                                .iter()
252                                                .map(|r| (**r).clone().into())
253                                                .collect(),
254                                            destination: Some((dest_resource).into()),
255                                            destination_policy: Some((dest_policy).into()),
256                                        },
257                                    ))
258                                    .await
259                                    .map_err(|_| {
260                                        TraceabilityError::TransportFailedToContactRemote(
261                                            node_id.to_string(),
262                                        )
263                                    })
264                            }
265                        })
266                        .collect::<Vec<_>>();
267
268                    // Collect all results and return error if any failed
269                    try_join_all(futures).await?;
270                    Ok(M2mResponse::Ack)
271                }
272                M2mRequest::UpdateProvenance { source_prov, destination } => {
273                    let remote_ip = eval_remote_ip(request)?;
274                    let mut client = this.get_client_or_connect(remote_ip.clone()).await?;
275
276                    // Group LocalizedResources by node_id
277                    let mut grouped: HashMap<String, Vec<proto::primitives::Resource>> =
278                        HashMap::default();
279                    for lr in source_prov {
280                        grouped
281                            .entry(lr.node_id().clone())
282                            .or_default()
283                            .push(lr.resource().clone().into());
284                    }
285
286                    // Convert to Vec<References>
287                    let source_prov_proto: Vec<proto::primitives::References> = grouped
288                        .into_iter()
289                        .map(|(node, resources)| proto::primitives::References { node, resources })
290                        .collect();
291
292                    // Create the protobuf request
293                    let proto_req = proto::messages::UpdateProvenance {
294                        source_prov: source_prov_proto,
295                        destination: Some(destination.into()),
296                    };
297
298                    // Make the gRPC call
299                    client.m2m_update_provenance(Request::new(proto_req)).await.map_err(|_| {
300                        TraceabilityError::TransportFailedToContactRemote(remote_ip)
301                    })?;
302
303                    Ok(M2mResponse::Ack)
304                }
305                M2mRequest::BroadcastDeletion(resource) => {
306                    let remote_ip = eval_remote_ip(request)?;
307                    let mut client = this.get_client_or_connect(remote_ip.clone()).await?;
308
309                    // Create the protobuf request
310                    let proto_req = proto::messages::BroadcastDeletionRequest {
311                        resource: Some(resource.into()),
312                    };
313
314                    // Make the gRPC call
315                    client.m2m_broadcast_deletion(Request::new(proto_req)).await.map_err(|_| {
316                        TraceabilityError::TransportFailedToContactRemote(remote_ip)
317                    })?;
318
319                    Ok(M2mResponse::Ack)
320                }
321            }
322        })
323    }
324}
325
326/// gRPC server router that handles incoming requests and routes them to appropriate services.
327///
328/// `Trace2eRouter` implements the gRPC server-side logic by accepting incoming
329/// requests and routing them to the appropriate process-to-middleware (P2M) or
330/// machine-to-machine (M2M) service handlers.
331///
332/// ## Type Parameters
333///
334/// * `P2mApi` - Service handling process-to-middleware requests
335/// * `M2mApi` - Service handling machine-to-machine requests
336///
337/// ## Request Routing
338///
339/// The router translates incoming Protocol Buffer requests to internal API
340/// types, calls the appropriate service, and converts responses back to
341/// Protocol Buffer format for transmission.
342pub struct P2mHandler<P2mApi> {
343    /// Process-to-middleware service handler.
344    p2m: P2mApi,
345}
346
347impl<P2mApi> P2mHandler<P2mApi> {
348    /// Creates a new router with the specified service handlers.
349    ///
350    /// # Arguments
351    ///
352    /// * `p2m` - Service for handling process-to-middleware requests
353    pub fn new(p2m: P2mApi) -> Self {
354        Self { p2m }
355    }
356}
357
358/// Implementation of the trace2e gRPC service protocol.
359///
360/// This implementation provides the server-side handlers for all gRPC endpoints
361/// defined in the trace2e protocol. It handles both P2M (process-to-middleware)
362/// and M2M (machine-to-machine) operations by delegating to the appropriate
363/// internal service handlers.
364#[tonic::async_trait]
365impl<P2mApi> proto::p2m_server::P2m for P2mHandler<P2mApi>
366where
367    P2mApi: Service<P2mRequest, Response = P2mResponse, Error = TraceabilityError>
368        + Clone
369        + Sync
370        + Send
371        + 'static,
372    P2mApi::Future: Send,
373{
374    /// Handles local process enrollment requests.
375    ///
376    /// Registers a local file descriptor with the middleware for tracking.
377    /// This is called when a process opens a local file or resource.
378    async fn p2m_local_enroll(
379        &self,
380        request: Request<proto::messages::LocalCt>,
381    ) -> Result<Response<proto::messages::Ack>, Status> {
382        let req = request.into_inner();
383        let mut p2m = self.p2m.clone();
384        match p2m
385            .call(P2mRequest::LocalEnroll {
386                pid: req.process_id,
387                fd: req.file_descriptor,
388                path: req.path,
389            })
390            .await?
391        {
392            P2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
393            _ => Err(Status::internal("Internal traceability API error")),
394        }
395    }
396
397    /// Handles remote process enrollment requests.
398    ///
399    /// Registers a network connection (socket) with the middleware for tracking.
400    /// This is called when a process establishes a network connection.
401    async fn p2m_remote_enroll(
402        &self,
403        request: Request<proto::messages::RemoteCt>,
404    ) -> Result<Response<proto::messages::Ack>, Status> {
405        let req = request.into_inner();
406        let mut p2m = self.p2m.clone();
407        match p2m
408            .call(P2mRequest::RemoteEnroll {
409                pid: req.process_id,
410                fd: req.file_descriptor,
411                local_socket: req.local_socket,
412                peer_socket: req.peer_socket,
413            })
414            .await?
415        {
416            P2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
417            _ => Err(Status::internal("Internal traceability API error")),
418        }
419    }
420
421    /// Handles I/O authorization requests from processes.
422    ///
423    /// Evaluates whether a process is authorized to perform an I/O operation
424    /// on a specific file descriptor. Returns a grant ID if authorized.
425    async fn p2m_io_request(
426        &self,
427        request: Request<proto::messages::IoInfo>,
428    ) -> Result<Response<proto::messages::Grant>, Status> {
429        let req = request.into_inner();
430        let mut p2m = self.p2m.clone();
431        match p2m
432            .call(P2mRequest::IoRequest {
433                pid: req.process_id,
434                fd: req.file_descriptor,
435                output: req.flow == proto::primitives::Flow::Output as i32,
436            })
437            .await?
438        {
439            P2mResponse::Grant(id) => {
440                Ok(Response::new(proto::messages::Grant { id: id.to_string() }))
441            }
442            _ => Err(Status::internal("Internal traceability API error")),
443        }
444    }
445
446    /// Handles I/O operation completion reports from processes.
447    ///
448    /// Records the completion and result of an I/O operation that was
449    /// previously authorized. This completes the audit trail for the operation.
450    async fn p2m_io_report(
451        &self,
452        request: Request<proto::messages::IoResult>,
453    ) -> Result<Response<proto::messages::Ack>, Status> {
454        let req = request.into_inner();
455        let mut p2m = self.p2m.clone();
456        match p2m
457            .call(P2mRequest::IoReport {
458                pid: req.process_id,
459                fd: req.file_descriptor,
460                grant_id: req.grant_id.parse::<u128>().unwrap_or_default(),
461                result: req.result,
462            })
463            .await?
464        {
465            P2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
466            _ => Err(Status::internal("Internal traceability API error")),
467        }
468    }
469}
470pub struct M2mHandler<M2mApi> {
471    /// Machine-to-machine service handler.
472    m2m: M2mApi,
473}
474
475impl<M2mApi> M2mHandler<M2mApi> {
476    /// Creates a new router with the specified service handlers.
477    ///
478    /// # Arguments
479    ///
480    /// * `m2m` - Service for handling machine-to-machine requests
481    pub fn new(m2m: M2mApi) -> Self {
482        Self { m2m }
483    }
484}
485
486#[tonic::async_trait]
487impl<M2mApi> proto::m2m_server::M2m for M2mHandler<M2mApi>
488where
489    M2mApi: Service<M2mRequest, Response = M2mResponse, Error = TraceabilityError>
490        + Clone
491        + Sync
492        + Send
493        + 'static,
494    M2mApi::Future: Send,
495{
496    /// Handles destination policy requests from remote middleware.
497    ///
498    /// Returns the compliance policy for a destination resource to enable
499    /// remote middleware instances to evaluate flow authorization.
500    async fn m2m_destination_policy(
501        &self,
502        request: Request<proto::messages::GetDestinationPolicy>,
503    ) -> Result<Response<proto::messages::DestinationPolicy>, Status> {
504        let req = request.into_inner();
505        let mut m2m = self.m2m.clone();
506        match m2m.call(req.into()).await? {
507            M2mResponse::DestinationPolicy(policy) => Ok(Response::new(policy.into())),
508            _ => Err(Status::internal("Internal traceability API error")),
509        }
510    }
511
512    /// Handles source compliance checking requests from remote middleware.
513    ///
514    /// Checks compliance policies for a set of source resources to enable
515    /// distributed flow evaluation across multiple middleware instances.
516    async fn m2m_check_source_compliance(
517        &self,
518        request: Request<proto::messages::CheckSourceCompliance>,
519    ) -> Result<Response<proto::messages::Ack>, Status> {
520        let req = request.into_inner();
521        let mut m2m = self.m2m.clone();
522        m2m.call(req.into()).await?;
523        Ok(Response::new(proto::messages::Ack {}))
524    }
525
526    /// Handles provenance update requests from remote middleware.
527    ///
528    /// Updates the provenance information for a destination resource based
529    /// on data flows from remote sources. This maintains the audit trail
530    /// across distributed operations.
531    async fn m2m_update_provenance(
532        &self,
533        request: Request<proto::messages::UpdateProvenance>,
534    ) -> Result<Response<proto::messages::Ack>, Status> {
535        let req = request.into_inner();
536        let mut m2m = self.m2m.clone();
537        match m2m.call(req.into()).await? {
538            M2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
539            _ => Err(Status::internal("Internal traceability API error")),
540        }
541    }
542
543    /// Handles deletion broadcast requests from remote middleware.
544    ///
545    /// Broadcasts the deletion of a resource to all middleware instances.
546    async fn m2m_broadcast_deletion(
547        &self,
548        request: Request<proto::messages::BroadcastDeletionRequest>,
549    ) -> Result<Response<proto::messages::Ack>, Status> {
550        let req = request.into_inner();
551        let mut m2m = self.m2m.clone();
552        match m2m.call(req.into()).await? {
553            M2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
554            _ => Err(Status::internal("Internal traceability API error")),
555        }
556    }
557}
558
559/// gRPC server handler for operator-to-middleware operations.
560///
561/// `O2mHandler` implements the gRPC server-side logic for administrative
562/// operations including policy management, provenance queries, and consent
563/// management by operators and compliance officers.
564///
565/// ## Type Parameters
566///
567/// * `O2mApi` - Service handling operator-to-middleware requests
568pub struct O2mHandler<O2mApi> {
569    /// Operator-to-middleware service handler.
570    o2m: O2mApi,
571}
572
573impl<O2mApi> O2mHandler<O2mApi> {
574    /// Creates a new O2M handler with the specified service handler.
575    ///
576    /// # Arguments
577    ///
578    /// * `o2m` - Service for handling operator-to-middleware requests
579    pub fn new(o2m: O2mApi) -> Self {
580        Self { o2m }
581    }
582}
583
584/// Implementation of the O2M gRPC service protocol.
585///
586/// This implementation provides the server-side handlers for all operator-facing
587/// endpoints including policy configuration and provenance analysis.
588#[tonic::async_trait]
589impl<O2mApi> proto::o2m_server::O2m for O2mHandler<O2mApi>
590where
591    O2mApi: Service<O2mRequest, Response = O2mResponse, Error = TraceabilityError>
592        + Clone
593        + Sync
594        + Send
595        + 'static,
596    O2mApi::Future: Send,
597{
598    /// Handles policy retrieval requests from operators.
599    ///
600    /// Returns the current compliance policies for the specified set of resources.
601    async fn o2m_get_policies(
602        &self,
603        request: Request<proto::messages::GetPoliciesRequest>,
604    ) -> Result<Response<proto::messages::GetPoliciesResponse>, Status> {
605        let req = request.into_inner();
606        let mut o2m = self.o2m.clone();
607        match o2m.call(req.into()).await? {
608            O2mResponse::Policies(policies) => Ok(Response::new(policies.into())),
609            _ => Err(Status::internal("Internal traceability API error")),
610        }
611    }
612
613    /// Handles policy update requests from operators.
614    ///
615    /// Sets a complete compliance policy for a specific resource.
616    async fn o2m_set_policy(
617        &self,
618        request: Request<proto::messages::SetPolicyRequest>,
619    ) -> Result<Response<proto::messages::Ack>, Status> {
620        let req = request.into_inner();
621        let mut o2m = self.o2m.clone();
622        match o2m.call(req.into()).await? {
623            O2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
624            _ => Err(Status::internal("Internal traceability API error")),
625        }
626    }
627
628    /// Handles confidentiality setting requests from operators.
629    ///
630    /// Updates the confidentiality policy for a specific resource.
631    async fn o2m_set_confidentiality(
632        &self,
633        request: Request<proto::messages::SetConfidentialityRequest>,
634    ) -> Result<Response<proto::messages::Ack>, Status> {
635        let req = request.into_inner();
636        let mut o2m = self.o2m.clone();
637        match o2m.call(req.into()).await? {
638            O2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
639            _ => Err(Status::internal("Internal traceability API error")),
640        }
641    }
642
643    /// Handles integrity setting requests from operators.
644    ///
645    /// Updates the integrity level for a specific resource.
646    async fn o2m_set_integrity(
647        &self,
648        request: Request<proto::messages::SetIntegrityRequest>,
649    ) -> Result<Response<proto::messages::Ack>, Status> {
650        let req = request.into_inner();
651        let mut o2m = self.o2m.clone();
652        match o2m.call(req.into()).await? {
653            O2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
654            _ => Err(Status::internal("Internal traceability API error")),
655        }
656    }
657
658    /// Handles deletion marking requests from operators.
659    ///
660    /// Marks a resource as deleted for compliance tracking purposes.
661    async fn o2m_set_deleted(
662        &self,
663        request: Request<proto::messages::SetDeletedRequest>,
664    ) -> Result<Response<proto::messages::Ack>, Status> {
665        let req = request.into_inner();
666        let mut o2m = self.o2m.clone();
667        match o2m.call(req.into()).await? {
668            O2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
669            _ => Err(Status::internal("Internal traceability API error")),
670        }
671    }
672
673    type O2MEnforceConsentStream = Pin<
674        Box<
675            dyn tokio_stream::Stream<Item = Result<proto::messages::ConsentNotification, Status>>
676                + Send,
677        >,
678    >;
679
680    /// Handles consent enforcement requests from operators.
681    ///
682    /// Enables consent enforcement for a resource and returns a stream of
683    /// consent request notifications that can be monitored by the operator.
684    async fn o2m_enforce_consent(
685        &self,
686        request: Request<proto::messages::EnforceConsentRequest>,
687    ) -> Result<Response<Self::O2MEnforceConsentStream>, Status> {
688        let req = request.into_inner();
689        let mut o2m = self.o2m.clone();
690        match o2m.call(req.into()).await? {
691            O2mResponse::Notifications(receiver) => {
692                // Convert the broadcast receiver into a stream of consent notifications
693                let stream = BroadcastStream::new(receiver).map(|result| {
694                    match result {
695                        Ok(destination) => {
696                            // Format destination as human-readable consent request message
697                            let consent_request =
698                                format!("Consent request for destination: {:?}", destination);
699                            Ok(proto::messages::ConsentNotification { consent_request })
700                        }
701                        Err(e) => {
702                            Err(Status::internal(format!("Notification stream error: {}", e)))
703                        }
704                    }
705                });
706                Ok(Response::new(Box::pin(stream)))
707            }
708            _ => Err(Status::internal("Internal traceability API error")),
709        }
710    }
711
712    /// Handles consent decision requests from operators.
713    ///
714    /// Sets the consent decision for a specific data flow operation.
715    async fn o2m_set_consent_decision(
716        &self,
717        request: Request<proto::messages::SetConsentDecisionRequest>,
718    ) -> Result<Response<proto::messages::Ack>, Status> {
719        let req = request.into_inner();
720        let mut o2m = self.o2m.clone();
721        match o2m.call(req.into()).await? {
722            O2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
723            _ => Err(Status::internal("Internal traceability API error")),
724        }
725    }
726
727    /// Handles provenance query requests from operators.
728    ///
729    /// Returns the complete provenance lineage for a specific resource.
730    async fn o2m_get_references(
731        &self,
732        request: Request<proto::messages::GetReferencesRequest>,
733    ) -> Result<Response<proto::messages::GetReferencesResponse>, Status> {
734        let req = request.into_inner();
735        let mut o2m = self.o2m.clone();
736        match o2m.call(req.into()).await? {
737            O2mResponse::References(references) => {
738                // Group LocalizedResources by node_id
739                let mut grouped: HashMap<String, HashSet<Resource>> = HashMap::default();
740                for lr in references {
741                    grouped.entry(lr.node_id().clone()).or_default().insert(lr.resource().clone());
742                }
743                Ok(Response::new(grouped.into()))
744            }
745            _ => Err(Status::internal("Internal traceability API error")),
746        }
747    }
748}
749
750// ========== Protocol Buffer Type Conversions ==========
751// Fundamental type conversions for serialization/deserialization
752
753/// Converts Protocol Buffer Resource to internal Resource type.
754impl From<proto::primitives::Resource> for Resource {
755    fn from(req: proto::primitives::Resource) -> Self {
756        match req.resource {
757            Some(proto::primitives::resource::Resource::Fd(fd)) => Resource::Fd(fd.into()),
758            Some(proto::primitives::resource::Resource::Process(process)) => {
759                Resource::Process(process.into())
760            }
761            None => Resource::None,
762        }
763    }
764}
765
766/// Converts internal Resource to Protocol Buffer Resource type.
767impl From<Resource> for proto::primitives::Resource {
768    fn from(resource: Resource) -> Self {
769        match resource {
770            Resource::Fd(fd) => proto::primitives::Resource {
771                resource: Some(proto::primitives::resource::Resource::Fd(fd.into())),
772            },
773            Resource::Process(process) => proto::primitives::Resource {
774                resource: Some(proto::primitives::resource::Resource::Process(process.into())),
775            },
776            Resource::None => proto::primitives::Resource { resource: None },
777        }
778    }
779}
780
781impl From<proto::primitives::Fd> for Fd {
782    fn from(proto_fd: proto::primitives::Fd) -> Self {
783        match proto_fd.fd {
784            Some(proto::primitives::fd::Fd::File(file)) => Fd::File(file.into()),
785            Some(proto::primitives::fd::Fd::Stream(stream)) => Fd::Stream(stream.into()),
786            None => Fd::File(File { path: String::new() }),
787        }
788    }
789}
790
791impl From<Fd> for proto::primitives::Fd {
792    fn from(fd: Fd) -> Self {
793        match fd {
794            Fd::File(file) => {
795                proto::primitives::Fd { fd: Some(proto::primitives::fd::Fd::File(file.into())) }
796            }
797            Fd::Stream(stream) => {
798                proto::primitives::Fd { fd: Some(proto::primitives::fd::Fd::Stream(stream.into())) }
799            }
800        }
801    }
802}
803
804impl From<proto::primitives::File> for File {
805    fn from(proto_file: proto::primitives::File) -> Self {
806        File { path: proto_file.path }
807    }
808}
809
810impl From<File> for proto::primitives::File {
811    fn from(file: File) -> Self {
812        proto::primitives::File { path: file.path }
813    }
814}
815
816impl From<proto::primitives::Stream> for Stream {
817    fn from(proto_stream: proto::primitives::Stream) -> Self {
818        Stream { local_socket: proto_stream.local_socket, peer_socket: proto_stream.peer_socket }
819    }
820}
821
822impl From<Stream> for proto::primitives::Stream {
823    fn from(stream: Stream) -> Self {
824        proto::primitives::Stream {
825            local_socket: stream.local_socket,
826            peer_socket: stream.peer_socket,
827        }
828    }
829}
830
831impl From<proto::primitives::Process> for Process {
832    fn from(proto_process: proto::primitives::Process) -> Self {
833        Process {
834            pid: proto_process.pid,
835            starttime: proto_process.starttime,
836            exe_path: proto_process.exe_path,
837        }
838    }
839}
840
841impl From<Process> for proto::primitives::Process {
842    fn from(process: Process) -> Self {
843        proto::primitives::Process {
844            pid: process.pid,
845            starttime: process.starttime,
846            exe_path: process.exe_path,
847        }
848    }
849}
850
851// ========== LocalizedResource and Destination Conversions ==========
852
853/// Converts Protocol Buffer LocalizedResource to internal LocalizedResource type.
854impl From<proto::primitives::LocalizedResource> for LocalizedResource {
855    fn from(proto_lr: proto::primitives::LocalizedResource) -> Self {
856        LocalizedResource::new(
857            proto_lr.node_id,
858            proto_lr.resource.map(|r| r.into()).unwrap_or_default(),
859        )
860    }
861}
862
863/// Converts internal LocalizedResource to Protocol Buffer LocalizedResource type.
864impl From<LocalizedResource> for proto::primitives::LocalizedResource {
865    fn from(lr: LocalizedResource) -> Self {
866        proto::primitives::LocalizedResource {
867            node_id: lr.node_id().clone(),
868            resource: Some(lr.resource().clone().into()),
869        }
870    }
871}
872
873/// Converts Protocol Buffer Destination to internal Destination type.
874impl From<proto::primitives::Destination> for Destination {
875    fn from(proto_dest: proto::primitives::Destination) -> Self {
876        match proto_dest.destination {
877            Some(proto::primitives::destination::Destination::Resource(lr_with_parent)) => {
878                if let Some(proto_lr) = lr_with_parent.resource {
879                    let localized_resource: LocalizedResource = proto_lr.into();
880                    let parent = lr_with_parent.parent.map(|p| Box::new((*p).into()));
881                    // Preserve the LocalizedResource as a Resource with node_id in the parent
882                    Destination::Resource {
883                        resource: localized_resource.resource().clone(),
884                        parent,
885                    }
886                } else {
887                    Destination::Resource { resource: Resource::None, parent: None }
888                }
889            }
890            Some(proto::primitives::destination::Destination::Node(node_id)) => {
891                Destination::Node(node_id)
892            }
893            None => Destination::Node(String::new()),
894        }
895    }
896}
897
898/// Converts internal Destination to Protocol Buffer Destination type.
899/// Preserves LocalizedResource context by reconstructing the hierarchical structure.
900impl From<Destination> for proto::primitives::Destination {
901    fn from(dest: Destination) -> Self {
902        destination_to_proto_node_variant(dest)
903    }
904}
905
906/// Helper function that performs the actual Destination -> proto conversion logic.
907/// This can be reused by other modules that need the same conversion.
908pub fn destination_to_proto_node_variant(dest: Destination) -> proto::primitives::Destination {
909    match dest {
910        Destination::Resource { resource, parent } => {
911            // When converting back to proto, we reconstruct the LocalizedResource
912            // If parent is None, we use empty string for node_id (local)
913            // If parent is Some(Node(...)), we extract the node_id
914            let (node_id, proto_parent) = match &parent {
915                Some(p) => {
916                    match &(**p) {
917                        Destination::Node(node) => {
918                            (node.clone(), parent.map(|p| Box::new((*p).clone().into())))
919                        }
920                        _ => {
921                            // If parent is another Resource, preserve it as-is
922                            (String::new(), parent.map(|p| Box::new((*p).clone().into())))
923                        }
924                    }
925                }
926                None => {
927                    // No parent, local resource
928                    (String::new(), None)
929                }
930            };
931
932            proto::primitives::Destination {
933                destination: Some(proto::primitives::destination::Destination::Resource(Box::new(
934                    proto::primitives::LocalizedResourceWithParent {
935                        resource: Some(LocalizedResource::new(node_id, resource).into()),
936                        parent: proto_parent,
937                    },
938                ))),
939            }
940        }
941        Destination::Node(node_id) => proto::primitives::Destination {
942            destination: Some(proto::primitives::destination::Destination::Node(node_id)),
943        },
944    }
945}
946
947// ========== Policy Conversions ==========
948
949impl From<Policy> for proto::primitives::Policy {
950    fn from(policy: Policy) -> Self {
951        proto::primitives::Policy {
952            confidentiality: match policy.is_confidential() {
953                false => proto::primitives::Confidentiality::Public as i32,
954                true => proto::primitives::Confidentiality::Secret as i32,
955            },
956            integrity: policy.get_integrity(),
957            deleted: policy.is_deleted(),
958            consent: policy.get_consent(),
959        }
960    }
961}
962
963impl From<proto::primitives::Policy> for Policy {
964    fn from(proto_policy: proto::primitives::Policy) -> Self {
965        Policy::new(
966            match proto_policy.confidentiality {
967                x if x == proto::primitives::Confidentiality::Secret as i32 => {
968                    ConfidentialityPolicy::Secret
969                }
970                _ => ConfidentialityPolicy::Public,
971            },
972            proto_policy.integrity,
973            proto_policy.deleted.into(),
974            proto_policy.consent,
975        )
976    }
977}
978
979// ========== Resource-Policy Mapping Conversions ==========
980
981/// Converts Protocol Buffer MappedLocalizedPolicy to internal tuple.
982impl From<proto::primitives::MappedLocalizedPolicy> for (LocalizedResource, Policy) {
983    fn from(policy: proto::primitives::MappedLocalizedPolicy) -> Self {
984        (
985            policy.resource.map(|r| r.into()).unwrap_or_default(),
986            policy.policy.map(|p| p.into()).unwrap_or_default(),
987        )
988    }
989}
990
991// ========== Provenance References Conversions ==========
992
993/// Converts Protocol Buffer References to internal node-resources tuple.
994impl From<proto::primitives::References> for (String, HashSet<Resource>) {
995    fn from(references: proto::primitives::References) -> Self {
996        (references.node, references.resources.into_iter().map(|r| r.into()).collect())
997    }
998}
999
1000/// Converts Protocol Buffer References to LocalizedResource set.
1001impl From<proto::primitives::References> for HashSet<LocalizedResource> {
1002    fn from(references: proto::primitives::References) -> Self {
1003        references
1004            .resources
1005            .into_iter()
1006            .map(|r| LocalizedResource::new(references.node.clone(), r.into()))
1007            .collect()
1008    }
1009}
1010
1011/// Converts internal node-resources tuple to Protocol Buffer References.
1012impl From<(String, HashSet<Resource>)> for proto::primitives::References {
1013    fn from((node, resources): (String, HashSet<Resource>)) -> Self {
1014        proto::primitives::References {
1015            node,
1016            resources: resources.into_iter().map(|r| r.into()).collect(),
1017        }
1018    }
1019}
1020
1021// ========== M2M Protocol Buffer Conversions ==========
1022
1023/// Converts Protocol Buffer GetDestinationPolicy request to internal M2M request.
1024impl From<proto::messages::GetDestinationPolicy> for M2mRequest {
1025    fn from(req: proto::messages::GetDestinationPolicy) -> Self {
1026        M2mRequest::GetDestinationPolicy(req.destination.map(|d| d.into()).unwrap_or_default())
1027    }
1028}
1029
1030/// Converts Protocol Buffer UpdateProvenance request to internal M2M request.
1031impl From<proto::messages::UpdateProvenance> for M2mRequest {
1032    fn from(req: proto::messages::UpdateProvenance) -> Self {
1033        let source_prov: HashSet<LocalizedResource> = req
1034            .source_prov
1035            .into_iter()
1036            .flat_map(|refs: proto::primitives::References| {
1037                let node_id = refs.node.clone();
1038                refs.resources
1039                    .into_iter()
1040                    .map(move |r| LocalizedResource::new(node_id.clone(), r.into()))
1041            })
1042            .collect();
1043
1044        M2mRequest::UpdateProvenance {
1045            source_prov,
1046            destination: req.destination.map(|d| d.into()).unwrap_or_default(),
1047        }
1048    }
1049}
1050
1051/// Converts Protocol Buffer BroadcastDeletionRequest to internal M2M request.
1052impl From<proto::messages::BroadcastDeletionRequest> for M2mRequest {
1053    fn from(req: proto::messages::BroadcastDeletionRequest) -> Self {
1054        M2mRequest::BroadcastDeletion(req.resource.map(|r| r.into()).unwrap_or_default())
1055    }
1056}
1057
1058/// Converts Protocol Buffer CheckSourceCompliance request to internal M2M request.
1059impl From<proto::messages::CheckSourceCompliance> for M2mRequest {
1060    fn from(req: proto::messages::CheckSourceCompliance) -> Self {
1061        let sources: HashSet<LocalizedResource> =
1062            req.sources.into_iter().map(|s| s.into()).collect();
1063        let destination: LocalizedResource = req.destination.map(|d| d.into()).unwrap_or_default();
1064        let destination_policy: Policy =
1065            req.destination_policy.map(|p| p.into()).unwrap_or_default();
1066        M2mRequest::CheckSourceCompliance {
1067            sources,
1068            destination: (destination, destination_policy),
1069        }
1070    }
1071}
1072
1073/// Converts internal M2M DestinationPolicy response to Protocol Buffer response.
1074impl From<Policy> for proto::messages::DestinationPolicy {
1075    fn from(policy: Policy) -> Self {
1076        proto::messages::DestinationPolicy { policy: Some(policy.into()) }
1077    }
1078}
1079
1080// ========== O2M Protocol Buffer Conversions ==========
1081
1082/// Converts Protocol Buffer GetPoliciesRequest to internal O2M request.
1083impl From<proto::messages::GetPoliciesRequest> for O2mRequest {
1084    fn from(req: proto::messages::GetPoliciesRequest) -> Self {
1085        O2mRequest::GetPolicies(req.resources.into_iter().map(|r| r.into()).collect())
1086    }
1087}
1088
1089/// Converts Protocol Buffer SetPolicyRequest to internal O2M request.
1090impl From<proto::messages::SetPolicyRequest> for O2mRequest {
1091    fn from(req: proto::messages::SetPolicyRequest) -> Self {
1092        O2mRequest::SetPolicy {
1093            resource: req.resource.map(|r| r.into()).unwrap_or_default(),
1094            policy: req.policy.map(|p| p.into()).unwrap_or_default(),
1095        }
1096    }
1097}
1098
1099/// Converts Protocol Buffer SetConfidentialityRequest to internal O2M request.
1100impl From<proto::messages::SetConfidentialityRequest> for O2mRequest {
1101    fn from(req: proto::messages::SetConfidentialityRequest) -> Self {
1102        O2mRequest::SetConfidentiality {
1103            resource: req.resource.map(|r| r.into()).unwrap_or_default(),
1104            confidentiality: match req.confidentiality {
1105                x if x == proto::primitives::Confidentiality::Secret as i32 => {
1106                    ConfidentialityPolicy::Secret
1107                }
1108                _ => ConfidentialityPolicy::Public,
1109            },
1110        }
1111    }
1112}
1113
1114/// Converts Protocol Buffer SetIntegrityRequest to internal O2M request.
1115impl From<proto::messages::SetIntegrityRequest> for O2mRequest {
1116    fn from(req: proto::messages::SetIntegrityRequest) -> Self {
1117        O2mRequest::SetIntegrity {
1118            resource: req.resource.map(|r| r.into()).unwrap_or_default(),
1119            integrity: req.integrity,
1120        }
1121    }
1122}
1123
1124/// Converts Protocol Buffer SetDeletedRequest to internal O2M request.
1125impl From<proto::messages::SetDeletedRequest> for O2mRequest {
1126    fn from(req: proto::messages::SetDeletedRequest) -> Self {
1127        O2mRequest::SetDeleted(req.resource.map(|r| r.into()).unwrap_or_default())
1128    }
1129}
1130
1131/// Converts Protocol Buffer EnforceConsentRequest to internal O2M request.
1132impl From<proto::messages::EnforceConsentRequest> for O2mRequest {
1133    fn from(req: proto::messages::EnforceConsentRequest) -> Self {
1134        O2mRequest::EnforceConsent(req.resource.map(|r| r.into()).unwrap_or_default())
1135    }
1136}
1137
1138/// Converts Protocol Buffer SetConsentDecisionRequest to internal O2M request.
1139impl From<proto::messages::SetConsentDecisionRequest> for O2mRequest {
1140    fn from(req: proto::messages::SetConsentDecisionRequest) -> Self {
1141        O2mRequest::SetConsentDecision {
1142            source: req.source.map(|r| r.into()).unwrap_or_default(),
1143            destination: req
1144                .destination
1145                .map(|d| d.into())
1146                .unwrap_or_else(|| Destination::Node(String::new())),
1147            decision: req.decision,
1148        }
1149    }
1150}
1151
1152/// Converts Protocol Buffer GetReferencesRequest to internal O2M request.
1153impl From<proto::messages::GetReferencesRequest> for O2mRequest {
1154    fn from(req: proto::messages::GetReferencesRequest) -> Self {
1155        O2mRequest::GetReferences(req.resource.map(|r| r.into()).unwrap_or_default())
1156    }
1157}
1158
1159// ========== O2M Response Conversions ==========
1160
1161/// Converts internal resource-policy map to Protocol Buffer GetPoliciesResponse.
1162impl From<HashMap<Resource, Policy>> for proto::messages::GetPoliciesResponse {
1163    fn from(policies: HashMap<Resource, Policy>) -> Self {
1164        proto::messages::GetPoliciesResponse {
1165            policies: policies
1166                .into_iter()
1167                .map(|(resource, policy)| proto::primitives::MappedLocalizedPolicy {
1168                    resource: Some(LocalizedResource::new(String::new(), resource).into()),
1169                    policy: Some(policy.into()),
1170                })
1171                .collect(),
1172        }
1173    }
1174}
1175
1176/// Converts internal LocalizedResource-policy map to Protocol Buffer GetPoliciesResponse.
1177impl From<HashMap<LocalizedResource, Policy>> for proto::messages::GetPoliciesResponse {
1178    fn from(policies: HashMap<LocalizedResource, Policy>) -> Self {
1179        proto::messages::GetPoliciesResponse {
1180            policies: policies
1181                .into_iter()
1182                .map(|(resource, policy)| proto::primitives::MappedLocalizedPolicy {
1183                    resource: Some(resource.into()),
1184                    policy: Some(policy.into()),
1185                })
1186                .collect(),
1187        }
1188    }
1189}
1190
1191/// Converts internal provenance references to Protocol Buffer GetReferencesResponse.
1192impl From<HashMap<String, HashSet<Resource>>> for proto::messages::GetReferencesResponse {
1193    fn from(references: HashMap<String, HashSet<Resource>>) -> Self {
1194        proto::messages::GetReferencesResponse {
1195            references: references
1196                .into_iter()
1197                .map(|(node, resources)| (node, resources).into())
1198                .collect(),
1199        }
1200    }
1201}