1use std::{
50 collections::{HashMap, HashSet},
51 future::Future,
52 pin::Pin,
53 sync::Arc,
54 task::Poll,
55};
56
57use dashmap::DashMap;
58use futures::future::try_join_all;
59use tokio_stream::{StreamExt, wrappers::BroadcastStream};
60use tonic::{Request, Response, Status, transport::Channel};
61use tower::Service;
62
63pub const DEFAULT_GRPC_PORT: u16 = 50051;
65
66pub mod proto {
68 tonic::include_proto!("trace2e");
69 pub mod primitives {
70 tonic::include_proto!("trace2e.primitives");
71 }
72 pub mod messages {
73 tonic::include_proto!("trace2e.messages");
74 }
75
76 pub const MIDDLEWARE_DESCRIPTOR_SET: &[u8] = include_bytes!("../../trace2e_descriptor.bin");
78}
79
80use crate::{
81 traceability::{
82 api::types::{M2mRequest, M2mResponse, O2mRequest, O2mResponse, P2mRequest, P2mResponse},
83 error::TraceabilityError,
84 infrastructure::naming::{Fd, File, LocalizedResource, Process, Resource, Stream},
85 services::{
86 compliance::{ConfidentialityPolicy, Policy},
87 consent::Destination,
88 },
89 },
90 transport::eval_remote_ip,
91};
92
93impl From<TraceabilityError> for Status {
95 fn from(error: TraceabilityError) -> Self {
96 Status::internal(error.to_string())
97 }
98}
99
100#[derive(Default, Clone)]
118pub struct M2mGrpc {
119 connected_remotes: Arc<DashMap<String, proto::m2m_client::M2mClient<Channel>>>,
121}
122
123impl M2mGrpc {
124 async fn connect_remote(
141 &self,
142 remote_ip: String,
143 ) -> Result<proto::m2m_client::M2mClient<Channel>, TraceabilityError> {
144 match proto::m2m_client::M2mClient::connect(format!("{remote_ip}:{DEFAULT_GRPC_PORT}"))
145 .await
146 {
147 Ok(client) => {
148 self.connected_remotes.insert(remote_ip, client.clone());
149 Ok(client)
150 }
151 Err(_) => Err(TraceabilityError::TransportFailedToContactRemote(remote_ip)),
152 }
153 }
154
155 async fn get_client(&self, remote_ip: String) -> Option<proto::m2m_client::M2mClient<Channel>> {
165 self.connected_remotes.get(&remote_ip).map(|c| c.to_owned())
166 }
167
168 async fn get_client_or_connect(
186 &self,
187 remote_ip: String,
188 ) -> Result<proto::m2m_client::M2mClient<Channel>, TraceabilityError> {
189 match self.get_client(remote_ip.clone()).await {
190 Some(client) => Ok(client),
191 None => self.connect_remote(remote_ip).await,
192 }
193 }
194}
195
196impl Service<M2mRequest> for M2mGrpc {
197 type Response = M2mResponse;
198 type Error = TraceabilityError;
199 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
200
201 fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
202 Poll::Ready(Ok(()))
203 }
204
205 fn call(&mut self, request: M2mRequest) -> Self::Future {
206 let this = self.clone();
207 Box::pin(async move {
208 match request.clone() {
209 M2mRequest::GetDestinationPolicy(destination) => {
210 let remote_ip = eval_remote_ip(request)?;
211 let mut client = this.get_client_or_connect(remote_ip.clone()).await?;
212
213 let proto_req = proto::messages::GetDestinationPolicy {
215 destination: Some(destination.into()),
216 };
217
218 let response = client
220 .m2m_destination_policy(Request::new(proto_req))
221 .await
222 .map_err(|_| TraceabilityError::TransportFailedToContactRemote(remote_ip))?
223 .into_inner();
224 Ok(M2mResponse::DestinationPolicy(
225 response.policy.map(|policy| policy.into()).unwrap_or_default(),
226 ))
227 }
228 M2mRequest::CheckSourceCompliance { sources, destination } => {
229 let sources_partition = sources.iter().fold(
231 HashMap::new(),
232 |mut partitions: HashMap<&String, HashSet<&LocalizedResource>>, lr| {
233 partitions.entry(lr.node_id()).or_default().insert(lr);
234 partitions
235 },
236 );
237
238 let futures = sources_partition
239 .into_iter()
240 .map(|(node_id, sources)| {
241 let this_clone = this.clone();
242 let dest_resource = destination.0.clone();
243 let dest_policy = destination.1.clone();
244 async move {
245 let mut client =
246 this_clone.get_client_or_connect(node_id.to_string()).await?;
247 client
248 .m2m_check_source_compliance(Request::new(
249 proto::messages::CheckSourceCompliance {
250 sources: sources
251 .iter()
252 .map(|r| (**r).clone().into())
253 .collect(),
254 destination: Some((dest_resource).into()),
255 destination_policy: Some((dest_policy).into()),
256 },
257 ))
258 .await
259 .map_err(|_| {
260 TraceabilityError::TransportFailedToContactRemote(
261 node_id.to_string(),
262 )
263 })
264 }
265 })
266 .collect::<Vec<_>>();
267
268 try_join_all(futures).await?;
270 Ok(M2mResponse::Ack)
271 }
272 M2mRequest::UpdateProvenance { source_prov, destination } => {
273 let remote_ip = eval_remote_ip(request)?;
274 let mut client = this.get_client_or_connect(remote_ip.clone()).await?;
275
276 let mut grouped: HashMap<String, Vec<proto::primitives::Resource>> =
278 HashMap::default();
279 for lr in source_prov {
280 grouped
281 .entry(lr.node_id().clone())
282 .or_default()
283 .push(lr.resource().clone().into());
284 }
285
286 let source_prov_proto: Vec<proto::primitives::References> = grouped
288 .into_iter()
289 .map(|(node, resources)| proto::primitives::References { node, resources })
290 .collect();
291
292 let proto_req = proto::messages::UpdateProvenance {
294 source_prov: source_prov_proto,
295 destination: Some(destination.into()),
296 };
297
298 client.m2m_update_provenance(Request::new(proto_req)).await.map_err(|_| {
300 TraceabilityError::TransportFailedToContactRemote(remote_ip)
301 })?;
302
303 Ok(M2mResponse::Ack)
304 }
305 M2mRequest::BroadcastDeletion(resource) => {
306 let remote_ip = eval_remote_ip(request)?;
307 let mut client = this.get_client_or_connect(remote_ip.clone()).await?;
308
309 let proto_req = proto::messages::BroadcastDeletionRequest {
311 resource: Some(resource.into()),
312 };
313
314 client.m2m_broadcast_deletion(Request::new(proto_req)).await.map_err(|_| {
316 TraceabilityError::TransportFailedToContactRemote(remote_ip)
317 })?;
318
319 Ok(M2mResponse::Ack)
320 }
321 }
322 })
323 }
324}
325
326pub struct P2mHandler<P2mApi> {
343 p2m: P2mApi,
345}
346
347impl<P2mApi> P2mHandler<P2mApi> {
348 pub fn new(p2m: P2mApi) -> Self {
354 Self { p2m }
355 }
356}
357
358#[tonic::async_trait]
365impl<P2mApi> proto::p2m_server::P2m for P2mHandler<P2mApi>
366where
367 P2mApi: Service<P2mRequest, Response = P2mResponse, Error = TraceabilityError>
368 + Clone
369 + Sync
370 + Send
371 + 'static,
372 P2mApi::Future: Send,
373{
374 async fn p2m_local_enroll(
379 &self,
380 request: Request<proto::messages::LocalCt>,
381 ) -> Result<Response<proto::messages::Ack>, Status> {
382 let req = request.into_inner();
383 let mut p2m = self.p2m.clone();
384 match p2m
385 .call(P2mRequest::LocalEnroll {
386 pid: req.process_id,
387 fd: req.file_descriptor,
388 path: req.path,
389 })
390 .await?
391 {
392 P2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
393 _ => Err(Status::internal("Internal traceability API error")),
394 }
395 }
396
397 async fn p2m_remote_enroll(
402 &self,
403 request: Request<proto::messages::RemoteCt>,
404 ) -> Result<Response<proto::messages::Ack>, Status> {
405 let req = request.into_inner();
406 let mut p2m = self.p2m.clone();
407 match p2m
408 .call(P2mRequest::RemoteEnroll {
409 pid: req.process_id,
410 fd: req.file_descriptor,
411 local_socket: req.local_socket,
412 peer_socket: req.peer_socket,
413 })
414 .await?
415 {
416 P2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
417 _ => Err(Status::internal("Internal traceability API error")),
418 }
419 }
420
421 async fn p2m_io_request(
426 &self,
427 request: Request<proto::messages::IoInfo>,
428 ) -> Result<Response<proto::messages::Grant>, Status> {
429 let req = request.into_inner();
430 let mut p2m = self.p2m.clone();
431 match p2m
432 .call(P2mRequest::IoRequest {
433 pid: req.process_id,
434 fd: req.file_descriptor,
435 output: req.flow == proto::primitives::Flow::Output as i32,
436 })
437 .await?
438 {
439 P2mResponse::Grant(id) => {
440 Ok(Response::new(proto::messages::Grant { id: id.to_string() }))
441 }
442 _ => Err(Status::internal("Internal traceability API error")),
443 }
444 }
445
446 async fn p2m_io_report(
451 &self,
452 request: Request<proto::messages::IoResult>,
453 ) -> Result<Response<proto::messages::Ack>, Status> {
454 let req = request.into_inner();
455 let mut p2m = self.p2m.clone();
456 match p2m
457 .call(P2mRequest::IoReport {
458 pid: req.process_id,
459 fd: req.file_descriptor,
460 grant_id: req.grant_id.parse::<u128>().unwrap_or_default(),
461 result: req.result,
462 })
463 .await?
464 {
465 P2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
466 _ => Err(Status::internal("Internal traceability API error")),
467 }
468 }
469}
470pub struct M2mHandler<M2mApi> {
471 m2m: M2mApi,
473}
474
475impl<M2mApi> M2mHandler<M2mApi> {
476 pub fn new(m2m: M2mApi) -> Self {
482 Self { m2m }
483 }
484}
485
486#[tonic::async_trait]
487impl<M2mApi> proto::m2m_server::M2m for M2mHandler<M2mApi>
488where
489 M2mApi: Service<M2mRequest, Response = M2mResponse, Error = TraceabilityError>
490 + Clone
491 + Sync
492 + Send
493 + 'static,
494 M2mApi::Future: Send,
495{
496 async fn m2m_destination_policy(
501 &self,
502 request: Request<proto::messages::GetDestinationPolicy>,
503 ) -> Result<Response<proto::messages::DestinationPolicy>, Status> {
504 let req = request.into_inner();
505 let mut m2m = self.m2m.clone();
506 match m2m.call(req.into()).await? {
507 M2mResponse::DestinationPolicy(policy) => Ok(Response::new(policy.into())),
508 _ => Err(Status::internal("Internal traceability API error")),
509 }
510 }
511
512 async fn m2m_check_source_compliance(
517 &self,
518 request: Request<proto::messages::CheckSourceCompliance>,
519 ) -> Result<Response<proto::messages::Ack>, Status> {
520 let req = request.into_inner();
521 let mut m2m = self.m2m.clone();
522 m2m.call(req.into()).await?;
523 Ok(Response::new(proto::messages::Ack {}))
524 }
525
526 async fn m2m_update_provenance(
532 &self,
533 request: Request<proto::messages::UpdateProvenance>,
534 ) -> Result<Response<proto::messages::Ack>, Status> {
535 let req = request.into_inner();
536 let mut m2m = self.m2m.clone();
537 match m2m.call(req.into()).await? {
538 M2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
539 _ => Err(Status::internal("Internal traceability API error")),
540 }
541 }
542
543 async fn m2m_broadcast_deletion(
547 &self,
548 request: Request<proto::messages::BroadcastDeletionRequest>,
549 ) -> Result<Response<proto::messages::Ack>, Status> {
550 let req = request.into_inner();
551 let mut m2m = self.m2m.clone();
552 match m2m.call(req.into()).await? {
553 M2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
554 _ => Err(Status::internal("Internal traceability API error")),
555 }
556 }
557}
558
559pub struct O2mHandler<O2mApi> {
569 o2m: O2mApi,
571}
572
573impl<O2mApi> O2mHandler<O2mApi> {
574 pub fn new(o2m: O2mApi) -> Self {
580 Self { o2m }
581 }
582}
583
584#[tonic::async_trait]
589impl<O2mApi> proto::o2m_server::O2m for O2mHandler<O2mApi>
590where
591 O2mApi: Service<O2mRequest, Response = O2mResponse, Error = TraceabilityError>
592 + Clone
593 + Sync
594 + Send
595 + 'static,
596 O2mApi::Future: Send,
597{
598 async fn o2m_get_policies(
602 &self,
603 request: Request<proto::messages::GetPoliciesRequest>,
604 ) -> Result<Response<proto::messages::GetPoliciesResponse>, Status> {
605 let req = request.into_inner();
606 let mut o2m = self.o2m.clone();
607 match o2m.call(req.into()).await? {
608 O2mResponse::Policies(policies) => Ok(Response::new(policies.into())),
609 _ => Err(Status::internal("Internal traceability API error")),
610 }
611 }
612
613 async fn o2m_set_policy(
617 &self,
618 request: Request<proto::messages::SetPolicyRequest>,
619 ) -> Result<Response<proto::messages::Ack>, Status> {
620 let req = request.into_inner();
621 let mut o2m = self.o2m.clone();
622 match o2m.call(req.into()).await? {
623 O2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
624 _ => Err(Status::internal("Internal traceability API error")),
625 }
626 }
627
628 async fn o2m_set_confidentiality(
632 &self,
633 request: Request<proto::messages::SetConfidentialityRequest>,
634 ) -> Result<Response<proto::messages::Ack>, Status> {
635 let req = request.into_inner();
636 let mut o2m = self.o2m.clone();
637 match o2m.call(req.into()).await? {
638 O2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
639 _ => Err(Status::internal("Internal traceability API error")),
640 }
641 }
642
643 async fn o2m_set_integrity(
647 &self,
648 request: Request<proto::messages::SetIntegrityRequest>,
649 ) -> Result<Response<proto::messages::Ack>, Status> {
650 let req = request.into_inner();
651 let mut o2m = self.o2m.clone();
652 match o2m.call(req.into()).await? {
653 O2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
654 _ => Err(Status::internal("Internal traceability API error")),
655 }
656 }
657
658 async fn o2m_set_deleted(
662 &self,
663 request: Request<proto::messages::SetDeletedRequest>,
664 ) -> Result<Response<proto::messages::Ack>, Status> {
665 let req = request.into_inner();
666 let mut o2m = self.o2m.clone();
667 match o2m.call(req.into()).await? {
668 O2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
669 _ => Err(Status::internal("Internal traceability API error")),
670 }
671 }
672
673 type O2MEnforceConsentStream = Pin<
674 Box<
675 dyn tokio_stream::Stream<Item = Result<proto::messages::ConsentNotification, Status>>
676 + Send,
677 >,
678 >;
679
680 async fn o2m_enforce_consent(
685 &self,
686 request: Request<proto::messages::EnforceConsentRequest>,
687 ) -> Result<Response<Self::O2MEnforceConsentStream>, Status> {
688 let req = request.into_inner();
689 let mut o2m = self.o2m.clone();
690 match o2m.call(req.into()).await? {
691 O2mResponse::Notifications(receiver) => {
692 let stream = BroadcastStream::new(receiver).map(|result| {
694 match result {
695 Ok(destination) => {
696 let consent_request =
698 format!("Consent request for destination: {:?}", destination);
699 Ok(proto::messages::ConsentNotification { consent_request })
700 }
701 Err(e) => {
702 Err(Status::internal(format!("Notification stream error: {}", e)))
703 }
704 }
705 });
706 Ok(Response::new(Box::pin(stream)))
707 }
708 _ => Err(Status::internal("Internal traceability API error")),
709 }
710 }
711
712 async fn o2m_set_consent_decision(
716 &self,
717 request: Request<proto::messages::SetConsentDecisionRequest>,
718 ) -> Result<Response<proto::messages::Ack>, Status> {
719 let req = request.into_inner();
720 let mut o2m = self.o2m.clone();
721 match o2m.call(req.into()).await? {
722 O2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
723 _ => Err(Status::internal("Internal traceability API error")),
724 }
725 }
726
727 async fn o2m_get_references(
731 &self,
732 request: Request<proto::messages::GetReferencesRequest>,
733 ) -> Result<Response<proto::messages::GetReferencesResponse>, Status> {
734 let req = request.into_inner();
735 let mut o2m = self.o2m.clone();
736 match o2m.call(req.into()).await? {
737 O2mResponse::References(references) => {
738 let mut grouped: HashMap<String, HashSet<Resource>> = HashMap::default();
740 for lr in references {
741 grouped.entry(lr.node_id().clone()).or_default().insert(lr.resource().clone());
742 }
743 Ok(Response::new(grouped.into()))
744 }
745 _ => Err(Status::internal("Internal traceability API error")),
746 }
747 }
748}
749
750impl From<proto::primitives::Resource> for Resource {
755 fn from(req: proto::primitives::Resource) -> Self {
756 match req.resource {
757 Some(proto::primitives::resource::Resource::Fd(fd)) => Resource::Fd(fd.into()),
758 Some(proto::primitives::resource::Resource::Process(process)) => {
759 Resource::Process(process.into())
760 }
761 None => Resource::None,
762 }
763 }
764}
765
766impl From<Resource> for proto::primitives::Resource {
768 fn from(resource: Resource) -> Self {
769 match resource {
770 Resource::Fd(fd) => proto::primitives::Resource {
771 resource: Some(proto::primitives::resource::Resource::Fd(fd.into())),
772 },
773 Resource::Process(process) => proto::primitives::Resource {
774 resource: Some(proto::primitives::resource::Resource::Process(process.into())),
775 },
776 Resource::None => proto::primitives::Resource { resource: None },
777 }
778 }
779}
780
781impl From<proto::primitives::Fd> for Fd {
782 fn from(proto_fd: proto::primitives::Fd) -> Self {
783 match proto_fd.fd {
784 Some(proto::primitives::fd::Fd::File(file)) => Fd::File(file.into()),
785 Some(proto::primitives::fd::Fd::Stream(stream)) => Fd::Stream(stream.into()),
786 None => Fd::File(File { path: String::new() }),
787 }
788 }
789}
790
791impl From<Fd> for proto::primitives::Fd {
792 fn from(fd: Fd) -> Self {
793 match fd {
794 Fd::File(file) => {
795 proto::primitives::Fd { fd: Some(proto::primitives::fd::Fd::File(file.into())) }
796 }
797 Fd::Stream(stream) => {
798 proto::primitives::Fd { fd: Some(proto::primitives::fd::Fd::Stream(stream.into())) }
799 }
800 }
801 }
802}
803
804impl From<proto::primitives::File> for File {
805 fn from(proto_file: proto::primitives::File) -> Self {
806 File { path: proto_file.path }
807 }
808}
809
810impl From<File> for proto::primitives::File {
811 fn from(file: File) -> Self {
812 proto::primitives::File { path: file.path }
813 }
814}
815
816impl From<proto::primitives::Stream> for Stream {
817 fn from(proto_stream: proto::primitives::Stream) -> Self {
818 Stream { local_socket: proto_stream.local_socket, peer_socket: proto_stream.peer_socket }
819 }
820}
821
822impl From<Stream> for proto::primitives::Stream {
823 fn from(stream: Stream) -> Self {
824 proto::primitives::Stream {
825 local_socket: stream.local_socket,
826 peer_socket: stream.peer_socket,
827 }
828 }
829}
830
831impl From<proto::primitives::Process> for Process {
832 fn from(proto_process: proto::primitives::Process) -> Self {
833 Process {
834 pid: proto_process.pid,
835 starttime: proto_process.starttime,
836 exe_path: proto_process.exe_path,
837 }
838 }
839}
840
841impl From<Process> for proto::primitives::Process {
842 fn from(process: Process) -> Self {
843 proto::primitives::Process {
844 pid: process.pid,
845 starttime: process.starttime,
846 exe_path: process.exe_path,
847 }
848 }
849}
850
851impl From<proto::primitives::LocalizedResource> for LocalizedResource {
855 fn from(proto_lr: proto::primitives::LocalizedResource) -> Self {
856 LocalizedResource::new(
857 proto_lr.node_id,
858 proto_lr.resource.map(|r| r.into()).unwrap_or_default(),
859 )
860 }
861}
862
863impl From<LocalizedResource> for proto::primitives::LocalizedResource {
865 fn from(lr: LocalizedResource) -> Self {
866 proto::primitives::LocalizedResource {
867 node_id: lr.node_id().clone(),
868 resource: Some(lr.resource().clone().into()),
869 }
870 }
871}
872
873impl From<proto::primitives::Destination> for Destination {
875 fn from(proto_dest: proto::primitives::Destination) -> Self {
876 match proto_dest.destination {
877 Some(proto::primitives::destination::Destination::Resource(lr_with_parent)) => {
878 if let Some(proto_lr) = lr_with_parent.resource {
879 let localized_resource: LocalizedResource = proto_lr.into();
880 let parent = lr_with_parent.parent.map(|p| Box::new((*p).into()));
881 Destination::Resource {
883 resource: localized_resource.resource().clone(),
884 parent,
885 }
886 } else {
887 Destination::Resource { resource: Resource::None, parent: None }
888 }
889 }
890 Some(proto::primitives::destination::Destination::Node(node_id)) => {
891 Destination::Node(node_id)
892 }
893 None => Destination::Node(String::new()),
894 }
895 }
896}
897
898impl From<Destination> for proto::primitives::Destination {
901 fn from(dest: Destination) -> Self {
902 destination_to_proto_node_variant(dest)
903 }
904}
905
906pub fn destination_to_proto_node_variant(dest: Destination) -> proto::primitives::Destination {
909 match dest {
910 Destination::Resource { resource, parent } => {
911 let (node_id, proto_parent) = match &parent {
915 Some(p) => {
916 match &(**p) {
917 Destination::Node(node) => {
918 (node.clone(), parent.map(|p| Box::new((*p).clone().into())))
919 }
920 _ => {
921 (String::new(), parent.map(|p| Box::new((*p).clone().into())))
923 }
924 }
925 }
926 None => {
927 (String::new(), None)
929 }
930 };
931
932 proto::primitives::Destination {
933 destination: Some(proto::primitives::destination::Destination::Resource(Box::new(
934 proto::primitives::LocalizedResourceWithParent {
935 resource: Some(LocalizedResource::new(node_id, resource).into()),
936 parent: proto_parent,
937 },
938 ))),
939 }
940 }
941 Destination::Node(node_id) => proto::primitives::Destination {
942 destination: Some(proto::primitives::destination::Destination::Node(node_id)),
943 },
944 }
945}
946
947impl From<Policy> for proto::primitives::Policy {
950 fn from(policy: Policy) -> Self {
951 proto::primitives::Policy {
952 confidentiality: match policy.is_confidential() {
953 false => proto::primitives::Confidentiality::Public as i32,
954 true => proto::primitives::Confidentiality::Secret as i32,
955 },
956 integrity: policy.get_integrity(),
957 deleted: policy.is_deleted(),
958 consent: policy.get_consent(),
959 }
960 }
961}
962
963impl From<proto::primitives::Policy> for Policy {
964 fn from(proto_policy: proto::primitives::Policy) -> Self {
965 Policy::new(
966 match proto_policy.confidentiality {
967 x if x == proto::primitives::Confidentiality::Secret as i32 => {
968 ConfidentialityPolicy::Secret
969 }
970 _ => ConfidentialityPolicy::Public,
971 },
972 proto_policy.integrity,
973 proto_policy.deleted.into(),
974 proto_policy.consent,
975 )
976 }
977}
978
979impl From<proto::primitives::MappedLocalizedPolicy> for (LocalizedResource, Policy) {
983 fn from(policy: proto::primitives::MappedLocalizedPolicy) -> Self {
984 (
985 policy.resource.map(|r| r.into()).unwrap_or_default(),
986 policy.policy.map(|p| p.into()).unwrap_or_default(),
987 )
988 }
989}
990
991impl From<proto::primitives::References> for (String, HashSet<Resource>) {
995 fn from(references: proto::primitives::References) -> Self {
996 (references.node, references.resources.into_iter().map(|r| r.into()).collect())
997 }
998}
999
1000impl From<proto::primitives::References> for HashSet<LocalizedResource> {
1002 fn from(references: proto::primitives::References) -> Self {
1003 references
1004 .resources
1005 .into_iter()
1006 .map(|r| LocalizedResource::new(references.node.clone(), r.into()))
1007 .collect()
1008 }
1009}
1010
1011impl From<(String, HashSet<Resource>)> for proto::primitives::References {
1013 fn from((node, resources): (String, HashSet<Resource>)) -> Self {
1014 proto::primitives::References {
1015 node,
1016 resources: resources.into_iter().map(|r| r.into()).collect(),
1017 }
1018 }
1019}
1020
1021impl From<proto::messages::GetDestinationPolicy> for M2mRequest {
1025 fn from(req: proto::messages::GetDestinationPolicy) -> Self {
1026 M2mRequest::GetDestinationPolicy(req.destination.map(|d| d.into()).unwrap_or_default())
1027 }
1028}
1029
1030impl From<proto::messages::UpdateProvenance> for M2mRequest {
1032 fn from(req: proto::messages::UpdateProvenance) -> Self {
1033 let source_prov: HashSet<LocalizedResource> = req
1034 .source_prov
1035 .into_iter()
1036 .flat_map(|refs: proto::primitives::References| {
1037 let node_id = refs.node.clone();
1038 refs.resources
1039 .into_iter()
1040 .map(move |r| LocalizedResource::new(node_id.clone(), r.into()))
1041 })
1042 .collect();
1043
1044 M2mRequest::UpdateProvenance {
1045 source_prov,
1046 destination: req.destination.map(|d| d.into()).unwrap_or_default(),
1047 }
1048 }
1049}
1050
1051impl From<proto::messages::BroadcastDeletionRequest> for M2mRequest {
1053 fn from(req: proto::messages::BroadcastDeletionRequest) -> Self {
1054 M2mRequest::BroadcastDeletion(req.resource.map(|r| r.into()).unwrap_or_default())
1055 }
1056}
1057
1058impl From<proto::messages::CheckSourceCompliance> for M2mRequest {
1060 fn from(req: proto::messages::CheckSourceCompliance) -> Self {
1061 let sources: HashSet<LocalizedResource> =
1062 req.sources.into_iter().map(|s| s.into()).collect();
1063 let destination: LocalizedResource = req.destination.map(|d| d.into()).unwrap_or_default();
1064 let destination_policy: Policy =
1065 req.destination_policy.map(|p| p.into()).unwrap_or_default();
1066 M2mRequest::CheckSourceCompliance {
1067 sources,
1068 destination: (destination, destination_policy),
1069 }
1070 }
1071}
1072
1073impl From<Policy> for proto::messages::DestinationPolicy {
1075 fn from(policy: Policy) -> Self {
1076 proto::messages::DestinationPolicy { policy: Some(policy.into()) }
1077 }
1078}
1079
1080impl From<proto::messages::GetPoliciesRequest> for O2mRequest {
1084 fn from(req: proto::messages::GetPoliciesRequest) -> Self {
1085 O2mRequest::GetPolicies(req.resources.into_iter().map(|r| r.into()).collect())
1086 }
1087}
1088
1089impl From<proto::messages::SetPolicyRequest> for O2mRequest {
1091 fn from(req: proto::messages::SetPolicyRequest) -> Self {
1092 O2mRequest::SetPolicy {
1093 resource: req.resource.map(|r| r.into()).unwrap_or_default(),
1094 policy: req.policy.map(|p| p.into()).unwrap_or_default(),
1095 }
1096 }
1097}
1098
1099impl From<proto::messages::SetConfidentialityRequest> for O2mRequest {
1101 fn from(req: proto::messages::SetConfidentialityRequest) -> Self {
1102 O2mRequest::SetConfidentiality {
1103 resource: req.resource.map(|r| r.into()).unwrap_or_default(),
1104 confidentiality: match req.confidentiality {
1105 x if x == proto::primitives::Confidentiality::Secret as i32 => {
1106 ConfidentialityPolicy::Secret
1107 }
1108 _ => ConfidentialityPolicy::Public,
1109 },
1110 }
1111 }
1112}
1113
1114impl From<proto::messages::SetIntegrityRequest> for O2mRequest {
1116 fn from(req: proto::messages::SetIntegrityRequest) -> Self {
1117 O2mRequest::SetIntegrity {
1118 resource: req.resource.map(|r| r.into()).unwrap_or_default(),
1119 integrity: req.integrity,
1120 }
1121 }
1122}
1123
1124impl From<proto::messages::SetDeletedRequest> for O2mRequest {
1126 fn from(req: proto::messages::SetDeletedRequest) -> Self {
1127 O2mRequest::SetDeleted(req.resource.map(|r| r.into()).unwrap_or_default())
1128 }
1129}
1130
1131impl From<proto::messages::EnforceConsentRequest> for O2mRequest {
1133 fn from(req: proto::messages::EnforceConsentRequest) -> Self {
1134 O2mRequest::EnforceConsent(req.resource.map(|r| r.into()).unwrap_or_default())
1135 }
1136}
1137
1138impl From<proto::messages::SetConsentDecisionRequest> for O2mRequest {
1140 fn from(req: proto::messages::SetConsentDecisionRequest) -> Self {
1141 O2mRequest::SetConsentDecision {
1142 source: req.source.map(|r| r.into()).unwrap_or_default(),
1143 destination: req
1144 .destination
1145 .map(|d| d.into())
1146 .unwrap_or_else(|| Destination::Node(String::new())),
1147 decision: req.decision,
1148 }
1149 }
1150}
1151
1152impl From<proto::messages::GetReferencesRequest> for O2mRequest {
1154 fn from(req: proto::messages::GetReferencesRequest) -> Self {
1155 O2mRequest::GetReferences(req.resource.map(|r| r.into()).unwrap_or_default())
1156 }
1157}
1158
1159impl From<HashMap<Resource, Policy>> for proto::messages::GetPoliciesResponse {
1163 fn from(policies: HashMap<Resource, Policy>) -> Self {
1164 proto::messages::GetPoliciesResponse {
1165 policies: policies
1166 .into_iter()
1167 .map(|(resource, policy)| proto::primitives::MappedLocalizedPolicy {
1168 resource: Some(LocalizedResource::new(String::new(), resource).into()),
1169 policy: Some(policy.into()),
1170 })
1171 .collect(),
1172 }
1173 }
1174}
1175
1176impl From<HashMap<LocalizedResource, Policy>> for proto::messages::GetPoliciesResponse {
1178 fn from(policies: HashMap<LocalizedResource, Policy>) -> Self {
1179 proto::messages::GetPoliciesResponse {
1180 policies: policies
1181 .into_iter()
1182 .map(|(resource, policy)| proto::primitives::MappedLocalizedPolicy {
1183 resource: Some(resource.into()),
1184 policy: Some(policy.into()),
1185 })
1186 .collect(),
1187 }
1188 }
1189}
1190
1191impl From<HashMap<String, HashSet<Resource>>> for proto::messages::GetReferencesResponse {
1193 fn from(references: HashMap<String, HashSet<Resource>>) -> Self {
1194 proto::messages::GetReferencesResponse {
1195 references: references
1196 .into_iter()
1197 .map(|(node, resources)| (node, resources).into())
1198 .collect(),
1199 }
1200 }
1201}