1use std::{
50 collections::{HashMap, HashSet},
51 future::Future,
52 pin::Pin,
53 sync::Arc,
54 task::Poll,
55};
56
57use dashmap::DashMap;
58use futures::future::try_join_all;
59use tokio_stream::{StreamExt, wrappers::BroadcastStream};
60use tonic::{Request, Response, Status, transport::Channel};
61use tower::Service;
62use tracing::info;
63
64pub const DEFAULT_GRPC_PORT: u16 = 50051;
66
67pub mod proto {
69 tonic::include_proto!("trace2e");
70 pub mod primitives {
71 tonic::include_proto!("trace2e.primitives");
72 }
73 pub mod messages {
74 tonic::include_proto!("trace2e.messages");
75 }
76
77 pub const MIDDLEWARE_DESCRIPTOR_SET: &[u8] = include_bytes!("../../trace2e_descriptor.bin");
79}
80
81use crate::{
82 traceability::{
83 api::types::{M2mRequest, M2mResponse, O2mRequest, O2mResponse, P2mRequest, P2mResponse},
84 error::TraceabilityError,
85 infrastructure::naming::{
86 DisplayableResource, Fd, File, LocalizedResource, Process, Resource, Stream,
87 },
88 services::{
89 compliance::{ConfidentialityPolicy, Policy},
90 consent::Destination,
91 },
92 },
93 transport::eval_remote_ip,
94};
95
96impl From<TraceabilityError> for Status {
98 fn from(error: TraceabilityError) -> Self {
99 Status::internal(error.to_string())
100 }
101}
102
103#[derive(Default, Clone)]
121pub struct M2mGrpc {
122 connected_remotes: Arc<DashMap<String, proto::m2m_client::M2mClient<Channel>>>,
124}
125
126impl M2mGrpc {
127 async fn connect_remote(
144 &self,
145 remote_ip: String,
146 ) -> Result<proto::m2m_client::M2mClient<Channel>, TraceabilityError> {
147 match proto::m2m_client::M2mClient::connect(format!(
148 "http://{remote_ip}:{DEFAULT_GRPC_PORT}"
149 ))
150 .await
151 {
152 Ok(client) => {
153 self.connected_remotes.insert(remote_ip, client.clone());
154 Ok(client)
155 }
156 Err(_) => Err(TraceabilityError::TransportFailedToContactRemote(remote_ip)),
157 }
158 }
159
160 async fn get_client(&self, remote_ip: String) -> Option<proto::m2m_client::M2mClient<Channel>> {
170 self.connected_remotes.get(&remote_ip).map(|c| c.to_owned())
171 }
172
173 async fn get_client_or_connect(
191 &self,
192 remote_ip: String,
193 ) -> Result<proto::m2m_client::M2mClient<Channel>, TraceabilityError> {
194 match self.get_client(remote_ip.clone()).await {
195 Some(client) => Ok(client),
196 None => self.connect_remote(remote_ip).await,
197 }
198 }
199}
200
201impl Service<M2mRequest> for M2mGrpc {
202 type Response = M2mResponse;
203 type Error = TraceabilityError;
204 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
205
206 fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
207 Poll::Ready(Ok(()))
208 }
209
210 fn call(&mut self, request: M2mRequest) -> Self::Future {
211 let this = self.clone();
212 Box::pin(async move {
213 match request.clone() {
214 M2mRequest::GetDestinationPolicy(destination) => {
215 info!(
216 destination = %destination,
217 "[gRPC-client] GetDestinationPolicy"
218 );
219 let remote_ip = eval_remote_ip(request)?;
220 let mut client = this.get_client_or_connect(remote_ip.clone()).await?;
221
222 let proto_req = proto::messages::GetDestinationPolicy {
224 destination: Some(destination.into()),
225 };
226
227 let response = client
229 .m2m_destination_policy(Request::new(proto_req))
230 .await
231 .map_err(|_| TraceabilityError::TransportFailedToContactRemote(remote_ip))?
232 .into_inner();
233 Ok(M2mResponse::DestinationPolicy(
234 response.policy.map(|policy| policy.into()).unwrap_or_default(),
235 ))
236 }
237 M2mRequest::CheckSourceCompliance { sources, destination } => {
238 info!(
239 sources = %DisplayableResource::from(&sources),
240 destination = %destination.0,
241 destination_policy = ?destination.1,
242 "[gRPC-client] CheckSourceCompliance"
243 );
244 let sources_partition = sources.iter().fold(
246 HashMap::new(),
247 |mut partitions: HashMap<&String, HashSet<&LocalizedResource>>, lr| {
248 partitions.entry(lr.node_id()).or_default().insert(lr);
249 partitions
250 },
251 );
252
253 let futures = sources_partition
254 .into_iter()
255 .map(|(node_id, sources)| {
256 let this_clone = this.clone();
257 let dest_resource = destination.0.clone();
258 let dest_policy = destination.1.clone();
259 async move {
260 let mut client =
261 this_clone.get_client_or_connect(node_id.to_string()).await?;
262 client
263 .m2m_check_source_compliance(Request::new(
264 proto::messages::CheckSourceCompliance {
265 sources: sources
266 .iter()
267 .map(|r| (**r).clone().into())
268 .collect(),
269 destination: Some((dest_resource).into()),
270 destination_policy: Some((dest_policy).into()),
271 },
272 ))
273 .await
274 .map_err(|_| {
275 TraceabilityError::TransportFailedToContactRemote(
276 node_id.to_string(),
277 )
278 })
279 }
280 })
281 .collect::<Vec<_>>();
282
283 try_join_all(futures).await?;
285 Ok(M2mResponse::Ack)
286 }
287 M2mRequest::UpdateProvenance { source_prov, destination } => {
288 info!(
289 source_prov = %DisplayableResource::from(&source_prov),
290 destination = %destination,
291 "[gRPC-client] UpdateProvenance"
292 );
293 let remote_ip = eval_remote_ip(request)?;
294 let mut client = this.get_client_or_connect(remote_ip.clone()).await?;
295
296 let mut grouped: HashMap<String, Vec<proto::primitives::Resource>> =
298 HashMap::default();
299 for lr in source_prov {
300 grouped
301 .entry(lr.node_id().clone())
302 .or_default()
303 .push(lr.resource().clone().into());
304 }
305
306 let source_prov_proto: Vec<proto::primitives::References> = grouped
308 .into_iter()
309 .map(|(node, resources)| proto::primitives::References { node, resources })
310 .collect();
311
312 let proto_req = proto::messages::UpdateProvenance {
314 source_prov: source_prov_proto,
315 destination: Some(destination.into()),
316 };
317
318 client.m2m_update_provenance(Request::new(proto_req)).await.map_err(|_| {
320 TraceabilityError::TransportFailedToContactRemote(remote_ip)
321 })?;
322
323 Ok(M2mResponse::Ack)
324 }
325 M2mRequest::BroadcastDeletion(resource) => {
326 info!(
327 resource = %resource,
328 "[gRPC-client] BroadcastDeletion"
329 );
330 let remote_ip = eval_remote_ip(request)?;
331 let mut client = this.get_client_or_connect(remote_ip.clone()).await?;
332
333 let proto_req = proto::messages::BroadcastDeletionRequest {
335 resource: Some(resource.into()),
336 };
337
338 client.m2m_broadcast_deletion(Request::new(proto_req)).await.map_err(|_| {
340 TraceabilityError::TransportFailedToContactRemote(remote_ip)
341 })?;
342
343 Ok(M2mResponse::Ack)
344 }
345 }
346 })
347 }
348}
349
350pub struct P2mHandler<P2mApi> {
367 p2m: P2mApi,
369}
370
371impl<P2mApi> P2mHandler<P2mApi> {
372 pub fn new(p2m: P2mApi) -> Self {
378 Self { p2m }
379 }
380}
381
382#[tonic::async_trait]
389impl<P2mApi> proto::p2m_server::P2m for P2mHandler<P2mApi>
390where
391 P2mApi: Service<P2mRequest, Response = P2mResponse, Error = TraceabilityError>
392 + Clone
393 + Sync
394 + Send
395 + 'static,
396 P2mApi::Future: Send,
397{
398 async fn p2m_local_enroll(
403 &self,
404 request: Request<proto::messages::LocalCt>,
405 ) -> Result<Response<proto::messages::Ack>, Status> {
406 let req = request.into_inner();
407 let mut p2m = self.p2m.clone();
408 match p2m
409 .call(P2mRequest::LocalEnroll {
410 pid: req.process_id,
411 fd: req.file_descriptor,
412 path: req.path,
413 })
414 .await?
415 {
416 P2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
417 _ => Err(Status::internal("Internal traceability API error")),
418 }
419 }
420
421 async fn p2m_remote_enroll(
426 &self,
427 request: Request<proto::messages::RemoteCt>,
428 ) -> Result<Response<proto::messages::Ack>, Status> {
429 let req = request.into_inner();
430 let mut p2m = self.p2m.clone();
431 match p2m
432 .call(P2mRequest::RemoteEnroll {
433 pid: req.process_id,
434 fd: req.file_descriptor,
435 local_socket: req.local_socket,
436 peer_socket: req.peer_socket,
437 })
438 .await?
439 {
440 P2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
441 _ => Err(Status::internal("Internal traceability API error")),
442 }
443 }
444
445 async fn p2m_io_request(
450 &self,
451 request: Request<proto::messages::IoInfo>,
452 ) -> Result<Response<proto::messages::Grant>, Status> {
453 let req = request.into_inner();
454 let mut p2m = self.p2m.clone();
455 match p2m
456 .call(P2mRequest::IoRequest {
457 pid: req.process_id,
458 fd: req.file_descriptor,
459 output: req.flow == proto::primitives::Flow::Output as i32,
460 })
461 .await?
462 {
463 P2mResponse::Grant(id) => {
464 Ok(Response::new(proto::messages::Grant { id: id.to_string() }))
465 }
466 _ => Err(Status::internal("Internal traceability API error")),
467 }
468 }
469
470 async fn p2m_io_report(
475 &self,
476 request: Request<proto::messages::IoResult>,
477 ) -> Result<Response<proto::messages::Ack>, Status> {
478 let req = request.into_inner();
479 let mut p2m = self.p2m.clone();
480 match p2m
481 .call(P2mRequest::IoReport {
482 pid: req.process_id,
483 fd: req.file_descriptor,
484 grant_id: req.grant_id.parse::<u128>().unwrap_or_default(),
485 result: req.result,
486 })
487 .await?
488 {
489 P2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
490 _ => Err(Status::internal("Internal traceability API error")),
491 }
492 }
493}
494pub struct M2mHandler<M2mApi> {
495 m2m: M2mApi,
497}
498
499impl<M2mApi> M2mHandler<M2mApi> {
500 pub fn new(m2m: M2mApi) -> Self {
506 Self { m2m }
507 }
508}
509
510#[tonic::async_trait]
511impl<M2mApi> proto::m2m_server::M2m for M2mHandler<M2mApi>
512where
513 M2mApi: Service<M2mRequest, Response = M2mResponse, Error = TraceabilityError>
514 + Clone
515 + Sync
516 + Send
517 + 'static,
518 M2mApi::Future: Send,
519{
520 async fn m2m_destination_policy(
525 &self,
526 request: Request<proto::messages::GetDestinationPolicy>,
527 ) -> Result<Response<proto::messages::DestinationPolicy>, Status> {
528 info!("[gRPC-server] m2m_destination_policy");
529 let req = request.into_inner();
530 let mut m2m = self.m2m.clone();
531 match m2m.call(req.into()).await? {
532 M2mResponse::DestinationPolicy(policy) => Ok(Response::new(policy.into())),
533 _ => Err(Status::internal("Internal traceability API error")),
534 }
535 }
536
537 async fn m2m_check_source_compliance(
542 &self,
543 request: Request<proto::messages::CheckSourceCompliance>,
544 ) -> Result<Response<proto::messages::Ack>, Status> {
545 info!("[gRPC-server] m2m_check_source_compliance");
546 let req = request.into_inner();
547 let mut m2m = self.m2m.clone();
548 m2m.call(req.into()).await?;
549 Ok(Response::new(proto::messages::Ack {}))
550 }
551
552 async fn m2m_update_provenance(
558 &self,
559 request: Request<proto::messages::UpdateProvenance>,
560 ) -> Result<Response<proto::messages::Ack>, Status> {
561 info!("[gRPC-server] m2m_update_provenance");
562 let req = request.into_inner();
563 let mut m2m = self.m2m.clone();
564 match m2m.call(req.into()).await? {
565 M2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
566 _ => Err(Status::internal("Internal traceability API error")),
567 }
568 }
569
570 async fn m2m_broadcast_deletion(
574 &self,
575 request: Request<proto::messages::BroadcastDeletionRequest>,
576 ) -> Result<Response<proto::messages::Ack>, Status> {
577 info!("[gRPC-server] m2m_broadcast_deletion");
578 let req = request.into_inner();
579 let mut m2m = self.m2m.clone();
580 match m2m.call(req.into()).await? {
581 M2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
582 _ => Err(Status::internal("Internal traceability API error")),
583 }
584 }
585}
586
587pub struct O2mHandler<O2mApi> {
597 o2m: O2mApi,
599}
600
601impl<O2mApi> O2mHandler<O2mApi> {
602 pub fn new(o2m: O2mApi) -> Self {
608 Self { o2m }
609 }
610}
611
612#[tonic::async_trait]
617impl<O2mApi> proto::o2m_server::O2m for O2mHandler<O2mApi>
618where
619 O2mApi: Service<O2mRequest, Response = O2mResponse, Error = TraceabilityError>
620 + Clone
621 + Sync
622 + Send
623 + 'static,
624 O2mApi::Future: Send,
625{
626 async fn o2m_get_policies(
630 &self,
631 request: Request<proto::messages::GetPoliciesRequest>,
632 ) -> Result<Response<proto::messages::GetPoliciesResponse>, Status> {
633 let req = request.into_inner();
634 let mut o2m = self.o2m.clone();
635 match o2m.call(req.into()).await? {
636 O2mResponse::Policies(policies) => Ok(Response::new(policies.into())),
637 _ => Err(Status::internal("Internal traceability API error")),
638 }
639 }
640
641 async fn o2m_set_policy(
645 &self,
646 request: Request<proto::messages::SetPolicyRequest>,
647 ) -> Result<Response<proto::messages::Ack>, Status> {
648 let req = request.into_inner();
649 let mut o2m = self.o2m.clone();
650 match o2m.call(req.into()).await? {
651 O2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
652 _ => Err(Status::internal("Internal traceability API error")),
653 }
654 }
655
656 async fn o2m_set_confidentiality(
660 &self,
661 request: Request<proto::messages::SetConfidentialityRequest>,
662 ) -> Result<Response<proto::messages::Ack>, Status> {
663 let req = request.into_inner();
664 let mut o2m = self.o2m.clone();
665 match o2m.call(req.into()).await? {
666 O2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
667 _ => Err(Status::internal("Internal traceability API error")),
668 }
669 }
670
671 async fn o2m_set_integrity(
675 &self,
676 request: Request<proto::messages::SetIntegrityRequest>,
677 ) -> Result<Response<proto::messages::Ack>, Status> {
678 let req = request.into_inner();
679 let mut o2m = self.o2m.clone();
680 match o2m.call(req.into()).await? {
681 O2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
682 _ => Err(Status::internal("Internal traceability API error")),
683 }
684 }
685
686 async fn o2m_set_deleted(
690 &self,
691 request: Request<proto::messages::SetDeletedRequest>,
692 ) -> Result<Response<proto::messages::Ack>, Status> {
693 let req = request.into_inner();
694 let mut o2m = self.o2m.clone();
695 match o2m.call(req.into()).await? {
696 O2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
697 _ => Err(Status::internal("Internal traceability API error")),
698 }
699 }
700
701 type O2MEnforceConsentStream = Pin<
702 Box<
703 dyn tokio_stream::Stream<Item = Result<proto::messages::ConsentNotification, Status>>
704 + Send,
705 >,
706 >;
707
708 async fn o2m_enforce_consent(
713 &self,
714 request: Request<proto::messages::EnforceConsentRequest>,
715 ) -> Result<Response<Self::O2MEnforceConsentStream>, Status> {
716 let req = request.into_inner();
717 let mut o2m = self.o2m.clone();
718 match o2m.call(req.into()).await? {
719 O2mResponse::Notifications(receiver) => {
720 let stream = BroadcastStream::new(receiver).map(|result| {
722 match result {
723 Ok(destination) => {
724 let consent_request =
726 format!("Consent request for destination: {:?}", destination);
727 Ok(proto::messages::ConsentNotification { consent_request })
728 }
729 Err(e) => {
730 Err(Status::internal(format!("Notification stream error: {}", e)))
731 }
732 }
733 });
734 Ok(Response::new(Box::pin(stream)))
735 }
736 _ => Err(Status::internal("Internal traceability API error")),
737 }
738 }
739
740 async fn o2m_set_consent_decision(
744 &self,
745 request: Request<proto::messages::SetConsentDecisionRequest>,
746 ) -> Result<Response<proto::messages::Ack>, Status> {
747 let req = request.into_inner();
748 let mut o2m = self.o2m.clone();
749 match o2m.call(req.into()).await? {
750 O2mResponse::Ack => Ok(Response::new(proto::messages::Ack {})),
751 _ => Err(Status::internal("Internal traceability API error")),
752 }
753 }
754
755 async fn o2m_get_references(
759 &self,
760 request: Request<proto::messages::GetReferencesRequest>,
761 ) -> Result<Response<proto::messages::GetReferencesResponse>, Status> {
762 let req = request.into_inner();
763 let mut o2m = self.o2m.clone();
764 match o2m.call(req.into()).await? {
765 O2mResponse::References(references) => {
766 let mut grouped: HashMap<String, HashSet<Resource>> = HashMap::default();
768 for lr in references {
769 grouped.entry(lr.node_id().clone()).or_default().insert(lr.resource().clone());
770 }
771 Ok(Response::new(grouped.into()))
772 }
773 _ => Err(Status::internal("Internal traceability API error")),
774 }
775 }
776}
777
778impl From<proto::primitives::Resource> for Resource {
783 fn from(req: proto::primitives::Resource) -> Self {
784 match req.resource {
785 Some(proto::primitives::resource::Resource::Fd(fd)) => Resource::Fd(fd.into()),
786 Some(proto::primitives::resource::Resource::Process(process)) => {
787 Resource::Process(process.into())
788 }
789 None => Resource::None,
790 }
791 }
792}
793
794impl From<Resource> for proto::primitives::Resource {
796 fn from(resource: Resource) -> Self {
797 match resource {
798 Resource::Fd(fd) => proto::primitives::Resource {
799 resource: Some(proto::primitives::resource::Resource::Fd(fd.into())),
800 },
801 Resource::Process(process) => proto::primitives::Resource {
802 resource: Some(proto::primitives::resource::Resource::Process(process.into())),
803 },
804 Resource::None => proto::primitives::Resource { resource: None },
805 }
806 }
807}
808
809impl From<proto::primitives::Fd> for Fd {
810 fn from(proto_fd: proto::primitives::Fd) -> Self {
811 match proto_fd.fd {
812 Some(proto::primitives::fd::Fd::File(file)) => Fd::File(file.into()),
813 Some(proto::primitives::fd::Fd::Stream(stream)) => Fd::Stream(stream.into()),
814 None => Fd::File(File { path: String::new() }),
815 }
816 }
817}
818
819impl From<Fd> for proto::primitives::Fd {
820 fn from(fd: Fd) -> Self {
821 match fd {
822 Fd::File(file) => {
823 proto::primitives::Fd { fd: Some(proto::primitives::fd::Fd::File(file.into())) }
824 }
825 Fd::Stream(stream) => {
826 proto::primitives::Fd { fd: Some(proto::primitives::fd::Fd::Stream(stream.into())) }
827 }
828 }
829 }
830}
831
832impl From<proto::primitives::File> for File {
833 fn from(proto_file: proto::primitives::File) -> Self {
834 File { path: proto_file.path }
835 }
836}
837
838impl From<File> for proto::primitives::File {
839 fn from(file: File) -> Self {
840 proto::primitives::File { path: file.path }
841 }
842}
843
844impl From<proto::primitives::Stream> for Stream {
845 fn from(proto_stream: proto::primitives::Stream) -> Self {
846 Stream { local_socket: proto_stream.local_socket, peer_socket: proto_stream.peer_socket }
847 }
848}
849
850impl From<Stream> for proto::primitives::Stream {
851 fn from(stream: Stream) -> Self {
852 proto::primitives::Stream {
853 local_socket: stream.local_socket,
854 peer_socket: stream.peer_socket,
855 }
856 }
857}
858
859impl From<proto::primitives::Process> for Process {
860 fn from(proto_process: proto::primitives::Process) -> Self {
861 Process {
862 pid: proto_process.pid,
863 starttime: proto_process.starttime,
864 exe_path: proto_process.exe_path,
865 }
866 }
867}
868
869impl From<Process> for proto::primitives::Process {
870 fn from(process: Process) -> Self {
871 proto::primitives::Process {
872 pid: process.pid,
873 starttime: process.starttime,
874 exe_path: process.exe_path,
875 }
876 }
877}
878
879impl From<proto::primitives::LocalizedResource> for LocalizedResource {
883 fn from(proto_lr: proto::primitives::LocalizedResource) -> Self {
884 LocalizedResource::new(
885 proto_lr.node_id,
886 proto_lr.resource.map(|r| r.into()).unwrap_or_default(),
887 )
888 }
889}
890
891impl From<LocalizedResource> for proto::primitives::LocalizedResource {
893 fn from(lr: LocalizedResource) -> Self {
894 proto::primitives::LocalizedResource {
895 node_id: lr.node_id().clone(),
896 resource: Some(lr.resource().clone().into()),
897 }
898 }
899}
900
901impl From<proto::primitives::Destination> for Destination {
903 fn from(proto_dest: proto::primitives::Destination) -> Self {
904 match proto_dest.destination {
905 Some(proto::primitives::destination::Destination::Resource(lr_with_parent)) => {
906 if let Some(proto_lr) = lr_with_parent.resource {
907 let localized_resource: LocalizedResource = proto_lr.into();
908 let parent = lr_with_parent.parent.map(|p| Box::new((*p).into()));
909 Destination::Resource {
911 resource: localized_resource.resource().clone(),
912 parent,
913 }
914 } else {
915 Destination::Resource { resource: Resource::None, parent: None }
916 }
917 }
918 Some(proto::primitives::destination::Destination::Node(node_id)) => {
919 Destination::Node(node_id)
920 }
921 None => Destination::Node(String::new()),
922 }
923 }
924}
925
926impl From<Destination> for proto::primitives::Destination {
929 fn from(dest: Destination) -> Self {
930 destination_to_proto_node_variant(dest)
931 }
932}
933
934pub fn destination_to_proto_node_variant(dest: Destination) -> proto::primitives::Destination {
937 match dest {
938 Destination::Resource { resource, parent } => {
939 let (node_id, proto_parent) = match &parent {
943 Some(p) => {
944 match &(**p) {
945 Destination::Node(node) => {
946 (node.clone(), parent.map(|p| Box::new((*p).clone().into())))
947 }
948 _ => {
949 (String::new(), parent.map(|p| Box::new((*p).clone().into())))
951 }
952 }
953 }
954 None => {
955 (String::new(), None)
957 }
958 };
959
960 proto::primitives::Destination {
961 destination: Some(proto::primitives::destination::Destination::Resource(Box::new(
962 proto::primitives::LocalizedResourceWithParent {
963 resource: Some(LocalizedResource::new(node_id, resource).into()),
964 parent: proto_parent,
965 },
966 ))),
967 }
968 }
969 Destination::Node(node_id) => proto::primitives::Destination {
970 destination: Some(proto::primitives::destination::Destination::Node(node_id)),
971 },
972 }
973}
974
975impl From<Policy> for proto::primitives::Policy {
978 fn from(policy: Policy) -> Self {
979 proto::primitives::Policy {
980 confidentiality: match policy.is_confidential() {
981 false => proto::primitives::Confidentiality::Public as i32,
982 true => proto::primitives::Confidentiality::Secret as i32,
983 },
984 integrity: policy.get_integrity(),
985 deleted: policy.is_deleted(),
986 consent: policy.get_consent(),
987 }
988 }
989}
990
991impl From<proto::primitives::Policy> for Policy {
992 fn from(proto_policy: proto::primitives::Policy) -> Self {
993 Policy::new(
994 match proto_policy.confidentiality {
995 x if x == proto::primitives::Confidentiality::Secret as i32 => {
996 ConfidentialityPolicy::Secret
997 }
998 _ => ConfidentialityPolicy::Public,
999 },
1000 proto_policy.integrity,
1001 proto_policy.deleted.into(),
1002 proto_policy.consent,
1003 )
1004 }
1005}
1006
1007impl From<proto::primitives::MappedLocalizedPolicy> for (LocalizedResource, Policy) {
1011 fn from(policy: proto::primitives::MappedLocalizedPolicy) -> Self {
1012 (
1013 policy.resource.map(|r| r.into()).unwrap_or_default(),
1014 policy.policy.map(|p| p.into()).unwrap_or_default(),
1015 )
1016 }
1017}
1018
1019impl From<proto::primitives::References> for (String, HashSet<Resource>) {
1023 fn from(references: proto::primitives::References) -> Self {
1024 (references.node, references.resources.into_iter().map(|r| r.into()).collect())
1025 }
1026}
1027
1028impl From<proto::primitives::References> for HashSet<LocalizedResource> {
1030 fn from(references: proto::primitives::References) -> Self {
1031 references
1032 .resources
1033 .into_iter()
1034 .map(|r| LocalizedResource::new(references.node.clone(), r.into()))
1035 .collect()
1036 }
1037}
1038
1039impl From<(String, HashSet<Resource>)> for proto::primitives::References {
1041 fn from((node, resources): (String, HashSet<Resource>)) -> Self {
1042 proto::primitives::References {
1043 node,
1044 resources: resources.into_iter().map(|r| r.into()).collect(),
1045 }
1046 }
1047}
1048
1049impl From<proto::messages::GetDestinationPolicy> for M2mRequest {
1053 fn from(req: proto::messages::GetDestinationPolicy) -> Self {
1054 M2mRequest::GetDestinationPolicy(req.destination.map(|d| d.into()).unwrap_or_default())
1055 }
1056}
1057
1058impl From<proto::messages::UpdateProvenance> for M2mRequest {
1060 fn from(req: proto::messages::UpdateProvenance) -> Self {
1061 let source_prov: HashSet<LocalizedResource> = req
1062 .source_prov
1063 .into_iter()
1064 .flat_map(|refs: proto::primitives::References| {
1065 let node_id = refs.node.clone();
1066 refs.resources
1067 .into_iter()
1068 .map(move |r| LocalizedResource::new(node_id.clone(), r.into()))
1069 })
1070 .collect();
1071
1072 M2mRequest::UpdateProvenance {
1073 source_prov,
1074 destination: req.destination.map(|d| d.into()).unwrap_or_default(),
1075 }
1076 }
1077}
1078
1079impl From<proto::messages::BroadcastDeletionRequest> for M2mRequest {
1081 fn from(req: proto::messages::BroadcastDeletionRequest) -> Self {
1082 M2mRequest::BroadcastDeletion(req.resource.map(|r| r.into()).unwrap_or_default())
1083 }
1084}
1085
1086impl From<proto::messages::CheckSourceCompliance> for M2mRequest {
1088 fn from(req: proto::messages::CheckSourceCompliance) -> Self {
1089 let sources: HashSet<LocalizedResource> =
1090 req.sources.into_iter().map(|s| s.into()).collect();
1091 let destination: LocalizedResource = req.destination.map(|d| d.into()).unwrap_or_default();
1092 let destination_policy: Policy =
1093 req.destination_policy.map(|p| p.into()).unwrap_or_default();
1094 M2mRequest::CheckSourceCompliance {
1095 sources,
1096 destination: (destination, destination_policy),
1097 }
1098 }
1099}
1100
1101impl From<Policy> for proto::messages::DestinationPolicy {
1103 fn from(policy: Policy) -> Self {
1104 proto::messages::DestinationPolicy { policy: Some(policy.into()) }
1105 }
1106}
1107
1108impl From<proto::messages::GetPoliciesRequest> for O2mRequest {
1112 fn from(req: proto::messages::GetPoliciesRequest) -> Self {
1113 O2mRequest::GetPolicies(req.resources.into_iter().map(|r| r.into()).collect())
1114 }
1115}
1116
1117impl From<proto::messages::SetPolicyRequest> for O2mRequest {
1119 fn from(req: proto::messages::SetPolicyRequest) -> Self {
1120 O2mRequest::SetPolicy {
1121 resource: req.resource.map(|r| r.into()).unwrap_or_default(),
1122 policy: req.policy.map(|p| p.into()).unwrap_or_default(),
1123 }
1124 }
1125}
1126
1127impl From<proto::messages::SetConfidentialityRequest> for O2mRequest {
1129 fn from(req: proto::messages::SetConfidentialityRequest) -> Self {
1130 O2mRequest::SetConfidentiality {
1131 resource: req.resource.map(|r| r.into()).unwrap_or_default(),
1132 confidentiality: match req.confidentiality {
1133 x if x == proto::primitives::Confidentiality::Secret as i32 => {
1134 ConfidentialityPolicy::Secret
1135 }
1136 _ => ConfidentialityPolicy::Public,
1137 },
1138 }
1139 }
1140}
1141
1142impl From<proto::messages::SetIntegrityRequest> for O2mRequest {
1144 fn from(req: proto::messages::SetIntegrityRequest) -> Self {
1145 O2mRequest::SetIntegrity {
1146 resource: req.resource.map(|r| r.into()).unwrap_or_default(),
1147 integrity: req.integrity,
1148 }
1149 }
1150}
1151
1152impl From<proto::messages::SetDeletedRequest> for O2mRequest {
1154 fn from(req: proto::messages::SetDeletedRequest) -> Self {
1155 O2mRequest::SetDeleted(req.resource.map(|r| r.into()).unwrap_or_default())
1156 }
1157}
1158
1159impl From<proto::messages::EnforceConsentRequest> for O2mRequest {
1161 fn from(req: proto::messages::EnforceConsentRequest) -> Self {
1162 O2mRequest::EnforceConsent(req.resource.map(|r| r.into()).unwrap_or_default())
1163 }
1164}
1165
1166impl From<proto::messages::SetConsentDecisionRequest> for O2mRequest {
1168 fn from(req: proto::messages::SetConsentDecisionRequest) -> Self {
1169 O2mRequest::SetConsentDecision {
1170 source: req.source.map(|r| r.into()).unwrap_or_default(),
1171 destination: req
1172 .destination
1173 .map(|d| d.into())
1174 .unwrap_or_else(|| Destination::Node(String::new())),
1175 decision: req.decision,
1176 }
1177 }
1178}
1179
1180impl From<proto::messages::GetReferencesRequest> for O2mRequest {
1182 fn from(req: proto::messages::GetReferencesRequest) -> Self {
1183 O2mRequest::GetReferences(req.resource.map(|r| r.into()).unwrap_or_default())
1184 }
1185}
1186
1187impl From<HashMap<Resource, Policy>> for proto::messages::GetPoliciesResponse {
1191 fn from(policies: HashMap<Resource, Policy>) -> Self {
1192 proto::messages::GetPoliciesResponse {
1193 policies: policies
1194 .into_iter()
1195 .map(|(resource, policy)| proto::primitives::MappedLocalizedPolicy {
1196 resource: Some(LocalizedResource::new(String::new(), resource).into()),
1197 policy: Some(policy.into()),
1198 })
1199 .collect(),
1200 }
1201 }
1202}
1203
1204impl From<HashMap<LocalizedResource, Policy>> for proto::messages::GetPoliciesResponse {
1206 fn from(policies: HashMap<LocalizedResource, Policy>) -> Self {
1207 proto::messages::GetPoliciesResponse {
1208 policies: policies
1209 .into_iter()
1210 .map(|(resource, policy)| proto::primitives::MappedLocalizedPolicy {
1211 resource: Some(resource.into()),
1212 policy: Some(policy.into()),
1213 })
1214 .collect(),
1215 }
1216 }
1217}
1218
1219impl From<HashMap<String, HashSet<Resource>>> for proto::messages::GetReferencesResponse {
1221 fn from(references: HashMap<String, HashSet<Resource>>) -> Self {
1222 proto::messages::GetReferencesResponse {
1223 references: references
1224 .into_iter()
1225 .map(|(node, resources)| (node, resources).into())
1226 .collect(),
1227 }
1228 }
1229}