1use std::{
50 collections::{HashMap, HashSet},
51 future::Future,
52 pin::Pin,
53 sync::Arc,
54 task::Poll,
55};
56
57use dashmap::DashMap;
58use futures::future::try_join_all;
59use tokio_stream::{StreamExt, wrappers::BroadcastStream};
60use tonic::{Request, Response, Status, transport::Channel};
61use tower::Service;
62use tracing::info;
63
64pub const DEFAULT_GRPC_PORT: u16 = 50051;
66
67pub mod proto {
69 tonic::include_proto!("trace2e");
70 pub mod primitives {
71 tonic::include_proto!("trace2e.primitives");
72 }
73 pub mod messages {
74 tonic::include_proto!("trace2e.messages");
75 }
76
77 pub const MIDDLEWARE_DESCRIPTOR_SET: &[u8] = include_bytes!("../../trace2e_descriptor.bin");
79}
80
81use crate::{
82 traceability::{
83 api::types::{M2mRequest, M2mResponse, O2mRequest, O2mResponse, P2mRequest, P2mResponse},
84 error::TraceabilityError,
85 infrastructure::naming::{
86 DisplayableResource, Fd, File, LocalizedResource, Process, Resource, Stream,
87 },
88 services::{
89 compliance::{ConfidentialityPolicy, Policy},
90 consent::Destination,
91 },
92 },
93 transport::eval_remote_ip,
94};
95
96impl From<TraceabilityError> for Status {
98 fn from(error: TraceabilityError) -> Self {
99 Status::internal(error.to_string())
100 }
101}
102
103#[derive(Default, Clone)]
121pub struct M2mGrpc {
122 connected_remotes: Arc<DashMap<String, proto::m2m_client::M2mClient<Channel>>>,
124 mock_mode: bool,
126}
127
128impl M2mGrpc {
129 pub fn mock() -> Self {
130 Self { mock_mode: true, connected_remotes: Arc::new(DashMap::new()) }
131 }
132
133 async fn connect_remote(
150 &self,
151 remote_ip: String,
152 ) -> Result<proto::m2m_client::M2mClient<Channel>, TraceabilityError> {
153 match proto::m2m_client::M2mClient::connect(format!(
154 "http://{remote_ip}:{DEFAULT_GRPC_PORT}"
155 ))
156 .await
157 {
158 Ok(client) => {
159 self.connected_remotes.insert(remote_ip, client.clone());
160 Ok(client)
161 }
162 Err(_) => Err(TraceabilityError::TransportFailedToContactRemote(remote_ip)),
163 }
164 }
165
166 async fn get_client(&self, remote_ip: String) -> Option<proto::m2m_client::M2mClient<Channel>> {
176 self.connected_remotes.get(&remote_ip).map(|c| c.to_owned())
177 }
178
179 async fn get_client_or_connect(
197 &self,
198 remote_ip: String,
199 ) -> Result<proto::m2m_client::M2mClient<Channel>, TraceabilityError> {
200 let ip = if self.mock_mode { "127.0.0.1".to_string() } else { remote_ip };
202 match self.get_client(ip.clone()).await {
203 Some(client) => Ok(client),
204 None => self.connect_remote(ip).await,
205 }
206 }
207}
208
209impl Service<M2mRequest> for M2mGrpc {
210 type Response = M2mResponse;
211 type Error = TraceabilityError;
212 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
213
214 fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
215 Poll::Ready(Ok(()))
216 }
217
218 fn call(&mut self, request: M2mRequest) -> Self::Future {
219 let this = self.clone();
220 Box::pin(async move {
221 match request.clone() {
222 M2mRequest::GetDestinationPolicy(destination) => {
223 info!(
224 destination = %destination,
225 "[gRPC-client] GetDestinationPolicy"
226 );
227 let remote_ip = eval_remote_ip(request)?;
228 let mut client = this.get_client_or_connect(remote_ip.clone()).await?;
229
230 let proto_req = proto::messages::GetDestinationPolicy {
232 destination: Some(destination.into()),
233 };
234
235 let response = client
237 .m2m_destination_policy(Request::new(proto_req))
238 .await
239 .map_err(|_| TraceabilityError::TransportFailedToContactRemote(remote_ip))?
240 .into_inner();
241 Ok(M2mResponse::DestinationPolicy(
242 response.policy.map(|policy| policy.into()).unwrap_or_default(),
243 ))
244 }
245 M2mRequest::CheckSourceCompliance { sources, destination } => {
246 info!(
247 sources = %DisplayableResource::from(&sources),
248 destination = %destination.0,
249 destination_policy = ?destination.1,
250 "[gRPC-client] CheckSourceCompliance"
251 );
252 let sources_partition = sources.iter().fold(
254 HashMap::new(),
255 |mut partitions: HashMap<&String, HashSet<&LocalizedResource>>, lr| {
256 partitions.entry(lr.node_id()).or_default().insert(lr);
257 partitions
258 },
259 );
260
261 let futures = sources_partition
262 .into_iter()
263 .map(|(node_id, sources)| {
264 let this_clone = this.clone();
265 let dest_resource = destination.0.clone();
266 let dest_policy = destination.1.clone();
267 async move {
268 let mut client =
269 this_clone.get_client_or_connect(node_id.to_string()).await?;
270 client
271 .m2m_check_source_compliance(Request::new(
272 proto::messages::CheckSourceCompliance {
273 sources: sources
274 .iter()
275 .map(|r| (**r).clone().into())
276 .collect(),
277 destination: Some((dest_resource).into()),
278 destination_policy: Some((dest_policy).into()),
279 },
280 ))
281 .await
282 .map_err(|_| {
283 TraceabilityError::TransportFailedToContactRemote(
284 node_id.to_string(),
285 )
286 })
287 }
288 })
289 .collect::<Vec<_>>();
290
291 try_join_all(futures).await?;
293 Ok(M2mResponse::Ack)
294 }
295 M2mRequest::UpdateProvenance { source_prov, destination } => {
296 info!(
297 source_prov = %DisplayableResource::from(&source_prov),
298 destination = %destination,
299 "[gRPC-client] UpdateProvenance"
300 );
301 let remote_ip = eval_remote_ip(request)?;
302 let mut client = this.get_client_or_connect(remote_ip.clone()).await?;
303
304 let mut grouped: HashMap<String, Vec<proto::primitives::Resource>> =
306 HashMap::default();
307 for lr in source_prov {
308 grouped
309 .entry(lr.node_id().clone())
310 .or_default()
311 .push(lr.resource().clone().into());
312 }
313
314 let source_prov_proto: Vec<proto::primitives::References> = grouped
316 .into_iter()
317 .map(|(node, resources)| proto::primitives::References { node, resources })
318 .collect();
319
320 let proto_req = proto::messages::UpdateProvenance {
322 source_prov: source_prov_proto,
323 destination: Some(destination.into()),
324 };
325
326 client.m2m_update_provenance(Request::new(proto_req)).await.map_err(|_| {
328 TraceabilityError::TransportFailedToContactRemote(remote_ip)
329 })?;
330
331 Ok(M2mResponse::Ack)
332 }
333 M2mRequest::BroadcastDeletion(resource) => {
334 info!(
335 resource = %resource,
336 "[gRPC-client] BroadcastDeletion"
337 );
338 let remote_ip = eval_remote_ip(request)?;
339 let mut client = this.get_client_or_connect(remote_ip.clone()).await?;
340
341 let proto_req = proto::messages::BroadcastDeletionRequest {
343 resource: Some(resource.into()),
344 };
345
346 client.m2m_broadcast_deletion(Request::new(proto_req)).await.map_err(|_| {
348 TraceabilityError::TransportFailedToContactRemote(remote_ip)
349 })?;
350
351 Ok(M2mResponse::Ack)
352 }
353 }
354 })
355 }
356}
357
358pub struct P2mHandler<P2mApi> {
375 p2m: P2mApi,
377}
378
379impl<P2mApi> P2mHandler<P2mApi> {
380 pub fn new(p2m: P2mApi) -> Self {
386 Self { p2m }
387 }
388}
389
390#[tonic::async_trait]
397impl<P2mApi> proto::p2m_server::P2m for P2mHandler<P2mApi>
398where
399 P2mApi: Service<P2mRequest, Response = P2mResponse, Error = TraceabilityError>
400 + Clone
401 + Sync
402 + Send
403 + 'static,
404 P2mApi::Future: Send,
405{
406 async fn p2m_local_enroll(
411 &self,
412 request: Request<proto::messages::LocalCt>,
413 ) -> Result<Response<proto::messages::Ack>, Status> {
414 let req = request.into_inner();
415 let mut p2m = self.p2m.clone();
416 match p2m
417 .call(P2mRequest::LocalEnroll {
418 pid: req.process_id,
419 fd: req.file_descriptor,
420 path: req.path,
421 })
422 .await?
423 {
424 P2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
425 _ => Err(Status::internal("Internal traceability API error")),
426 }
427 }
428
429 async fn p2m_remote_enroll(
434 &self,
435 request: Request<proto::messages::RemoteCt>,
436 ) -> Result<Response<proto::messages::Ack>, Status> {
437 let req = request.into_inner();
438 let mut p2m = self.p2m.clone();
439 match p2m
440 .call(P2mRequest::RemoteEnroll {
441 pid: req.process_id,
442 fd: req.file_descriptor,
443 local_socket: req.local_socket,
444 peer_socket: req.peer_socket,
445 })
446 .await?
447 {
448 P2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
449 _ => Err(Status::internal("Internal traceability API error")),
450 }
451 }
452
453 async fn p2m_io_request(
458 &self,
459 request: Request<proto::messages::IoInfo>,
460 ) -> Result<Response<proto::messages::Grant>, Status> {
461 let req = request.into_inner();
462 let mut p2m = self.p2m.clone();
463 match p2m
464 .call(P2mRequest::IoRequest {
465 pid: req.process_id,
466 fd: req.file_descriptor,
467 output: req.flow == proto::primitives::Flow::Output as i32,
468 })
469 .await?
470 {
471 P2mResponse::Grant(id) => {
472 Ok(Response::new(proto::messages::Grant { id: id.to_string() }))
473 }
474 _ => Err(Status::internal("Internal traceability API error")),
475 }
476 }
477
478 async fn p2m_io_report(
483 &self,
484 request: Request<proto::messages::IoResult>,
485 ) -> Result<Response<proto::messages::Ack>, Status> {
486 let req = request.into_inner();
487 let mut p2m = self.p2m.clone();
488 match p2m
489 .call(P2mRequest::IoReport {
490 pid: req.process_id,
491 fd: req.file_descriptor,
492 grant_id: req.grant_id.parse::<u128>().unwrap_or_default(),
493 result: req.result,
494 })
495 .await?
496 {
497 P2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
498 _ => Err(Status::internal("Internal traceability API error")),
499 }
500 }
501}
502pub struct M2mHandler<M2mApi> {
503 m2m: M2mApi,
505}
506
507impl<M2mApi> M2mHandler<M2mApi> {
508 pub fn new(m2m: M2mApi) -> Self {
514 Self { m2m }
515 }
516}
517
518#[tonic::async_trait]
519impl<M2mApi> proto::m2m_server::M2m for M2mHandler<M2mApi>
520where
521 M2mApi: Service<M2mRequest, Response = M2mResponse, Error = TraceabilityError>
522 + Clone
523 + Sync
524 + Send
525 + 'static,
526 M2mApi::Future: Send,
527{
528 async fn m2m_destination_policy(
533 &self,
534 request: Request<proto::messages::GetDestinationPolicy>,
535 ) -> Result<Response<proto::messages::DestinationPolicy>, Status> {
536 info!("[gRPC-server] m2m_destination_policy");
537 let req = request.into_inner();
538 let mut m2m = self.m2m.clone();
539 match m2m.call(req.into()).await? {
540 M2mResponse::DestinationPolicy(policy) => Ok(Response::new(policy.into())),
541 _ => Err(Status::internal("Internal traceability API error")),
542 }
543 }
544
545 async fn m2m_check_source_compliance(
550 &self,
551 request: Request<proto::messages::CheckSourceCompliance>,
552 ) -> Result<Response<proto::messages::Ack>, Status> {
553 info!("[gRPC-server] m2m_check_source_compliance");
554 let req = request.into_inner();
555 let mut m2m = self.m2m.clone();
556 m2m.call(req.into()).await?;
557 Ok(Response::new(proto::messages::Ack {}))
558 }
559
560 async fn m2m_update_provenance(
566 &self,
567 request: Request<proto::messages::UpdateProvenance>,
568 ) -> Result<Response<proto::messages::Ack>, Status> {
569 info!("[gRPC-server] m2m_update_provenance");
570 let req = request.into_inner();
571 let mut m2m = self.m2m.clone();
572 match m2m.call(req.into()).await? {
573 M2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
574 _ => Err(Status::internal("Internal traceability API error")),
575 }
576 }
577
578 async fn m2m_broadcast_deletion(
582 &self,
583 request: Request<proto::messages::BroadcastDeletionRequest>,
584 ) -> Result<Response<proto::messages::Ack>, Status> {
585 info!("[gRPC-server] m2m_broadcast_deletion");
586 let req = request.into_inner();
587 let mut m2m = self.m2m.clone();
588 match m2m.call(req.into()).await? {
589 M2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
590 _ => Err(Status::internal("Internal traceability API error")),
591 }
592 }
593}
594
595pub struct O2mHandler<O2mApi> {
605 o2m: O2mApi,
607}
608
609impl<O2mApi> O2mHandler<O2mApi> {
610 pub fn new(o2m: O2mApi) -> Self {
616 Self { o2m }
617 }
618}
619
620#[tonic::async_trait]
625#[allow(clippy::result_large_err)]
626impl<O2mApi> proto::o2m_server::O2m for O2mHandler<O2mApi>
627where
628 O2mApi: Service<O2mRequest, Response = O2mResponse, Error = TraceabilityError>
629 + Clone
630 + Sync
631 + Send
632 + 'static,
633 O2mApi::Future: Send,
634{
635 async fn o2m_get_policies(
639 &self,
640 request: Request<proto::messages::GetPoliciesRequest>,
641 ) -> Result<Response<proto::messages::GetPoliciesResponse>, Status> {
642 let req = request.into_inner();
643 let mut o2m = self.o2m.clone();
644 match o2m.call(req.into()).await? {
645 O2mResponse::Policies(policies) => Ok(Response::new(policies.into())),
646 _ => Err(Status::internal("Internal traceability API error")),
647 }
648 }
649
650 async fn o2m_set_policy(
654 &self,
655 request: Request<proto::messages::SetPolicyRequest>,
656 ) -> Result<Response<proto::messages::Ack>, Status> {
657 let req = request.into_inner();
658 let mut o2m = self.o2m.clone();
659 match o2m.call(req.into()).await? {
660 O2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
661 _ => Err(Status::internal("Internal traceability API error")),
662 }
663 }
664
665 async fn o2m_set_confidentiality(
669 &self,
670 request: Request<proto::messages::SetConfidentialityRequest>,
671 ) -> Result<Response<proto::messages::Ack>, Status> {
672 let req = request.into_inner();
673 let mut o2m = self.o2m.clone();
674 match o2m.call(req.into()).await? {
675 O2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
676 _ => Err(Status::internal("Internal traceability API error")),
677 }
678 }
679
680 async fn o2m_set_integrity(
684 &self,
685 request: Request<proto::messages::SetIntegrityRequest>,
686 ) -> Result<Response<proto::messages::Ack>, Status> {
687 let req = request.into_inner();
688 let mut o2m = self.o2m.clone();
689 match o2m.call(req.into()).await? {
690 O2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
691 _ => Err(Status::internal("Internal traceability API error")),
692 }
693 }
694
695 async fn o2m_set_deleted(
699 &self,
700 request: Request<proto::messages::SetDeletedRequest>,
701 ) -> Result<Response<proto::messages::Ack>, Status> {
702 let req = request.into_inner();
703 let mut o2m = self.o2m.clone();
704 match o2m.call(req.into()).await? {
705 O2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
706 _ => Err(Status::internal("Internal traceability API error")),
707 }
708 }
709
710 type O2MEnforceConsentStream = Pin<
711 Box<
712 dyn tokio_stream::Stream<Item = Result<proto::messages::ConsentNotification, Status>>
713 + Send,
714 >,
715 >;
716
717 async fn o2m_enforce_consent(
722 &self,
723 request: Request<proto::messages::EnforceConsentRequest>,
724 ) -> Result<Response<Self::O2MEnforceConsentStream>, Status> {
725 let req = request.into_inner();
726 let mut o2m = self.o2m.clone();
727 match o2m.call(req.into()).await? {
728 O2mResponse::Notifications(receiver) => {
729 let stream = BroadcastStream::new(receiver).map(|result| {
731 match result {
732 Ok(destination) => {
733 let consent_request =
735 format!("Consent request for destination: {:?}", destination);
736 Ok(proto::messages::ConsentNotification { consent_request })
737 }
738 Err(e) => {
739 Err(Status::internal(format!("Notification stream error: {}", e)))
740 }
741 }
742 });
743 Ok(Response::new(Box::pin(stream)))
744 }
745 _ => Err(Status::internal("Internal traceability API error")),
746 }
747 }
748
749 async fn o2m_set_consent_decision(
753 &self,
754 request: Request<proto::messages::SetConsentDecisionRequest>,
755 ) -> Result<Response<proto::messages::Ack>, Status> {
756 let req = request.into_inner();
757 let mut o2m = self.o2m.clone();
758 match o2m.call(req.into()).await? {
759 O2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
760 _ => Err(Status::internal("Internal traceability API error")),
761 }
762 }
763
764 async fn o2m_get_references(
768 &self,
769 request: Request<proto::messages::GetReferencesRequest>,
770 ) -> Result<Response<proto::messages::GetReferencesResponse>, Status> {
771 let req = request.into_inner();
772 let mut o2m = self.o2m.clone();
773 match o2m.call(req.into()).await? {
774 O2mResponse::References(references) => {
775 let mut grouped: HashMap<String, HashSet<Resource>> = HashMap::default();
777 for lr in references {
778 grouped.entry(lr.node_id().clone()).or_default().insert(lr.resource().clone());
779 }
780 Ok(Response::new(grouped.into()))
781 }
782 _ => Err(Status::internal("Internal traceability API error")),
783 }
784 }
785}
786
787impl From<proto::primitives::Resource> for Resource {
792 fn from(req: proto::primitives::Resource) -> Self {
793 match req.resource {
794 Some(proto::primitives::resource::Resource::Fd(fd)) => Resource::Fd(fd.into()),
795 Some(proto::primitives::resource::Resource::Process(process)) => {
796 Resource::Process(process.into())
797 }
798 None => Resource::None,
799 }
800 }
801}
802
803impl From<Resource> for proto::primitives::Resource {
805 fn from(resource: Resource) -> Self {
806 match resource {
807 Resource::Fd(fd) => proto::primitives::Resource {
808 resource: Some(proto::primitives::resource::Resource::Fd(fd.into())),
809 },
810 Resource::Process(process) => proto::primitives::Resource {
811 resource: Some(proto::primitives::resource::Resource::Process(process.into())),
812 },
813 Resource::None => proto::primitives::Resource { resource: None },
814 }
815 }
816}
817
818impl From<proto::primitives::Fd> for Fd {
819 fn from(proto_fd: proto::primitives::Fd) -> Self {
820 match proto_fd.fd {
821 Some(proto::primitives::fd::Fd::File(file)) => Fd::File(file.into()),
822 Some(proto::primitives::fd::Fd::Stream(stream)) => Fd::Stream(stream.into()),
823 None => Fd::File(File { path: String::new() }),
824 }
825 }
826}
827
828impl From<Fd> for proto::primitives::Fd {
829 fn from(fd: Fd) -> Self {
830 match fd {
831 Fd::File(file) => {
832 proto::primitives::Fd { fd: Some(proto::primitives::fd::Fd::File(file.into())) }
833 }
834 Fd::Stream(stream) => {
835 proto::primitives::Fd { fd: Some(proto::primitives::fd::Fd::Stream(stream.into())) }
836 }
837 }
838 }
839}
840
841impl From<proto::primitives::File> for File {
842 fn from(proto_file: proto::primitives::File) -> Self {
843 File { path: proto_file.path }
844 }
845}
846
847impl From<File> for proto::primitives::File {
848 fn from(file: File) -> Self {
849 proto::primitives::File { path: file.path }
850 }
851}
852
853impl From<proto::primitives::Stream> for Stream {
854 fn from(proto_stream: proto::primitives::Stream) -> Self {
855 Stream { local_socket: proto_stream.local_socket, peer_socket: proto_stream.peer_socket }
856 }
857}
858
859impl From<Stream> for proto::primitives::Stream {
860 fn from(stream: Stream) -> Self {
861 proto::primitives::Stream {
862 local_socket: stream.local_socket,
863 peer_socket: stream.peer_socket,
864 }
865 }
866}
867
868impl From<proto::primitives::Process> for Process {
869 fn from(proto_process: proto::primitives::Process) -> Self {
870 Process {
871 pid: proto_process.pid,
872 starttime: proto_process.starttime,
873 exe_path: proto_process.exe_path,
874 }
875 }
876}
877
878impl From<Process> for proto::primitives::Process {
879 fn from(process: Process) -> Self {
880 proto::primitives::Process {
881 pid: process.pid,
882 starttime: process.starttime,
883 exe_path: process.exe_path,
884 }
885 }
886}
887
888impl From<proto::primitives::LocalizedResource> for LocalizedResource {
892 fn from(proto_lr: proto::primitives::LocalizedResource) -> Self {
893 LocalizedResource::new(
894 proto_lr.node_id,
895 proto_lr.resource.map(|r| r.into()).unwrap_or_default(),
896 )
897 }
898}
899
900impl From<LocalizedResource> for proto::primitives::LocalizedResource {
902 fn from(lr: LocalizedResource) -> Self {
903 proto::primitives::LocalizedResource {
904 node_id: lr.node_id().clone(),
905 resource: Some(lr.resource().clone().into()),
906 }
907 }
908}
909
910impl From<proto::primitives::Destination> for Destination {
912 fn from(proto_dest: proto::primitives::Destination) -> Self {
913 match proto_dest.destination {
914 Some(proto::primitives::destination::Destination::Resource(lr_with_parent)) => {
915 if let Some(proto_lr) = lr_with_parent.resource {
916 let localized_resource: LocalizedResource = proto_lr.into();
917 let parent = lr_with_parent.parent.map(|p| Box::new((*p).into()));
918 Destination::Resource {
920 resource: localized_resource.resource().clone(),
921 parent,
922 }
923 } else {
924 Destination::Resource { resource: Resource::None, parent: None }
925 }
926 }
927 Some(proto::primitives::destination::Destination::Node(node_id)) => {
928 Destination::Node(node_id)
929 }
930 None => Destination::Node(String::new()),
931 }
932 }
933}
934
935impl From<Destination> for proto::primitives::Destination {
938 fn from(dest: Destination) -> Self {
939 destination_to_proto_node_variant(dest)
940 }
941}
942
943pub fn destination_to_proto_node_variant(dest: Destination) -> proto::primitives::Destination {
946 match dest {
947 Destination::Resource { resource, parent } => {
948 let (node_id, proto_parent) = match &parent {
952 Some(p) => {
953 match &(**p) {
954 Destination::Node(node) => {
955 (node.clone(), parent.map(|p| Box::new((*p).clone().into())))
956 }
957 _ => {
958 (String::new(), parent.map(|p| Box::new((*p).clone().into())))
960 }
961 }
962 }
963 None => {
964 (String::new(), None)
966 }
967 };
968
969 proto::primitives::Destination {
970 destination: Some(proto::primitives::destination::Destination::Resource(Box::new(
971 proto::primitives::LocalizedResourceWithParent {
972 resource: Some(LocalizedResource::new(node_id, resource).into()),
973 parent: proto_parent,
974 },
975 ))),
976 }
977 }
978 Destination::Node(node_id) => proto::primitives::Destination {
979 destination: Some(proto::primitives::destination::Destination::Node(node_id)),
980 },
981 }
982}
983
984impl From<Policy> for proto::primitives::Policy {
987 fn from(policy: Policy) -> Self {
988 proto::primitives::Policy {
989 confidentiality: match policy.is_confidential() {
990 false => proto::primitives::Confidentiality::Public as i32,
991 true => proto::primitives::Confidentiality::Secret as i32,
992 },
993 integrity: policy.get_integrity(),
994 deleted: policy.is_deleted(),
995 consent: policy.get_consent(),
996 }
997 }
998}
999
1000impl From<proto::primitives::Policy> for Policy {
1001 fn from(proto_policy: proto::primitives::Policy) -> Self {
1002 Policy::new(
1003 match proto_policy.confidentiality {
1004 x if x == proto::primitives::Confidentiality::Secret as i32 => {
1005 ConfidentialityPolicy::Secret
1006 }
1007 _ => ConfidentialityPolicy::Public,
1008 },
1009 proto_policy.integrity,
1010 proto_policy.deleted.into(),
1011 proto_policy.consent,
1012 )
1013 }
1014}
1015
1016impl From<proto::primitives::MappedLocalizedPolicy> for (LocalizedResource, Policy) {
1020 fn from(policy: proto::primitives::MappedLocalizedPolicy) -> Self {
1021 (
1022 policy.resource.map(|r| r.into()).unwrap_or_default(),
1023 policy.policy.map(|p| p.into()).unwrap_or_default(),
1024 )
1025 }
1026}
1027
1028impl From<proto::primitives::References> for (String, HashSet<Resource>) {
1032 fn from(references: proto::primitives::References) -> Self {
1033 (references.node, references.resources.into_iter().map(|r| r.into()).collect())
1034 }
1035}
1036
1037impl From<proto::primitives::References> for HashSet<LocalizedResource> {
1039 fn from(references: proto::primitives::References) -> Self {
1040 references
1041 .resources
1042 .into_iter()
1043 .map(|r| LocalizedResource::new(references.node.clone(), r.into()))
1044 .collect()
1045 }
1046}
1047
1048impl From<(String, HashSet<Resource>)> for proto::primitives::References {
1050 fn from((node, resources): (String, HashSet<Resource>)) -> Self {
1051 proto::primitives::References {
1052 node,
1053 resources: resources.into_iter().map(|r| r.into()).collect(),
1054 }
1055 }
1056}
1057
1058impl From<proto::messages::GetDestinationPolicy> for M2mRequest {
1062 fn from(req: proto::messages::GetDestinationPolicy) -> Self {
1063 M2mRequest::GetDestinationPolicy(req.destination.map(|d| d.into()).unwrap_or_default())
1064 }
1065}
1066
1067impl From<proto::messages::UpdateProvenance> for M2mRequest {
1069 fn from(req: proto::messages::UpdateProvenance) -> Self {
1070 let source_prov: HashSet<LocalizedResource> = req
1071 .source_prov
1072 .into_iter()
1073 .flat_map(|refs: proto::primitives::References| {
1074 let node_id = refs.node.clone();
1075 refs.resources
1076 .into_iter()
1077 .map(move |r| LocalizedResource::new(node_id.clone(), r.into()))
1078 })
1079 .collect();
1080
1081 M2mRequest::UpdateProvenance {
1082 source_prov,
1083 destination: req.destination.map(|d| d.into()).unwrap_or_default(),
1084 }
1085 }
1086}
1087
1088impl From<proto::messages::BroadcastDeletionRequest> for M2mRequest {
1090 fn from(req: proto::messages::BroadcastDeletionRequest) -> Self {
1091 M2mRequest::BroadcastDeletion(req.resource.map(|r| r.into()).unwrap_or_default())
1092 }
1093}
1094
1095impl From<proto::messages::CheckSourceCompliance> for M2mRequest {
1097 fn from(req: proto::messages::CheckSourceCompliance) -> Self {
1098 let sources: HashSet<LocalizedResource> =
1099 req.sources.into_iter().map(|s| s.into()).collect();
1100 let destination: LocalizedResource = req.destination.map(|d| d.into()).unwrap_or_default();
1101 let destination_policy: Policy =
1102 req.destination_policy.map(|p| p.into()).unwrap_or_default();
1103 M2mRequest::CheckSourceCompliance {
1104 sources,
1105 destination: (destination, destination_policy),
1106 }
1107 }
1108}
1109
1110impl From<Policy> for proto::messages::DestinationPolicy {
1112 fn from(policy: Policy) -> Self {
1113 proto::messages::DestinationPolicy { policy: Some(policy.into()) }
1114 }
1115}
1116
1117impl From<proto::messages::GetPoliciesRequest> for O2mRequest {
1121 fn from(req: proto::messages::GetPoliciesRequest) -> Self {
1122 O2mRequest::GetPolicies(req.resources.into_iter().map(|r| r.into()).collect())
1123 }
1124}
1125
1126impl From<proto::messages::SetPolicyRequest> for O2mRequest {
1128 fn from(req: proto::messages::SetPolicyRequest) -> Self {
1129 O2mRequest::SetPolicy {
1130 resource: req.resource.map(|r| r.into()).unwrap_or_default(),
1131 policy: req.policy.map(|p| p.into()).unwrap_or_default(),
1132 }
1133 }
1134}
1135
1136impl From<proto::messages::SetConfidentialityRequest> for O2mRequest {
1138 fn from(req: proto::messages::SetConfidentialityRequest) -> Self {
1139 O2mRequest::SetConfidentiality {
1140 resource: req.resource.map(|r| r.into()).unwrap_or_default(),
1141 confidentiality: match req.confidentiality {
1142 x if x == proto::primitives::Confidentiality::Secret as i32 => {
1143 ConfidentialityPolicy::Secret
1144 }
1145 _ => ConfidentialityPolicy::Public,
1146 },
1147 }
1148 }
1149}
1150
1151impl From<proto::messages::SetIntegrityRequest> for O2mRequest {
1153 fn from(req: proto::messages::SetIntegrityRequest) -> Self {
1154 O2mRequest::SetIntegrity {
1155 resource: req.resource.map(|r| r.into()).unwrap_or_default(),
1156 integrity: req.integrity,
1157 }
1158 }
1159}
1160
1161impl From<proto::messages::SetDeletedRequest> for O2mRequest {
1163 fn from(req: proto::messages::SetDeletedRequest) -> Self {
1164 O2mRequest::SetDeleted(req.resource.map(|r| r.into()).unwrap_or_default())
1165 }
1166}
1167
1168impl From<proto::messages::EnforceConsentRequest> for O2mRequest {
1170 fn from(req: proto::messages::EnforceConsentRequest) -> Self {
1171 O2mRequest::EnforceConsent(req.resource.map(|r| r.into()).unwrap_or_default())
1172 }
1173}
1174
1175impl From<proto::messages::SetConsentDecisionRequest> for O2mRequest {
1177 fn from(req: proto::messages::SetConsentDecisionRequest) -> Self {
1178 O2mRequest::SetConsentDecision {
1179 source: req.source.map(|r| r.into()).unwrap_or_default(),
1180 destination: req
1181 .destination
1182 .map(|d| d.into())
1183 .unwrap_or_else(|| Destination::Node(String::new())),
1184 decision: req.decision,
1185 }
1186 }
1187}
1188
1189impl From<proto::messages::GetReferencesRequest> for O2mRequest {
1191 fn from(req: proto::messages::GetReferencesRequest) -> Self {
1192 O2mRequest::GetReferences(req.resource.map(|r| r.into()).unwrap_or_default())
1193 }
1194}
1195
1196impl From<HashMap<Resource, Policy>> for proto::messages::GetPoliciesResponse {
1200 fn from(policies: HashMap<Resource, Policy>) -> Self {
1201 proto::messages::GetPoliciesResponse {
1202 policies: policies
1203 .into_iter()
1204 .map(|(resource, policy)| proto::primitives::MappedLocalizedPolicy {
1205 resource: Some(LocalizedResource::new(String::new(), resource).into()),
1206 policy: Some(policy.into()),
1207 })
1208 .collect(),
1209 }
1210 }
1211}
1212
1213impl From<HashMap<LocalizedResource, Policy>> for proto::messages::GetPoliciesResponse {
1215 fn from(policies: HashMap<LocalizedResource, Policy>) -> Self {
1216 proto::messages::GetPoliciesResponse {
1217 policies: policies
1218 .into_iter()
1219 .map(|(resource, policy)| proto::primitives::MappedLocalizedPolicy {
1220 resource: Some(resource.into()),
1221 policy: Some(policy.into()),
1222 })
1223 .collect(),
1224 }
1225 }
1226}
1227
1228impl From<HashMap<String, HashSet<Resource>>> for proto::messages::GetReferencesResponse {
1230 fn from(references: HashMap<String, HashSet<Resource>>) -> Self {
1231 proto::messages::GetReferencesResponse {
1232 references: references
1233 .into_iter()
1234 .map(|(node, resources)| (node, resources).into())
1235 .collect(),
1236 }
1237 }
1238}