trace2e_core/traceability/api/
o2m.rs1use std::{future::Future, pin::Pin, task::Poll};
35
36use tower::Service;
37use tracing::info;
38
39use crate::traceability::{
40 api::{
41 M2mRequest, M2mResponse,
42 types::{
43 ComplianceRequest, ComplianceResponse, O2mRequest, O2mResponse, ProvenanceRequest,
44 ProvenanceResponse,
45 },
46 },
47 error::TraceabilityError,
48 infrastructure::naming::{LocalizedResource, NodeId},
49 services::consent::{ConsentRequest, ConsentResponse},
50};
51
52#[derive(Debug, Clone)]
58pub struct O2mApiService<P, C, Consent, M> {
59 provenance: P,
61 compliance: C,
63 consent: Consent,
65 m2m: M,
67}
68
69impl<P, C, Consent, M> O2mApiService<P, C, Consent, M> {
70 pub fn new(provenance: P, compliance: C, consent: Consent, m2m: M) -> Self {
72 Self { provenance, compliance, consent, m2m }
73 }
74}
75
76impl<P, C, Consent, M> Service<O2mRequest> for O2mApiService<P, C, Consent, M>
77where
78 P: Service<ProvenanceRequest, Response = ProvenanceResponse, Error = TraceabilityError>
79 + Clone
80 + Send
81 + NodeId
82 + 'static,
83 P::Future: Send,
84 C: Service<ComplianceRequest, Response = ComplianceResponse, Error = TraceabilityError>
85 + Clone
86 + Send
87 + 'static,
88 C::Future: Send,
89 Consent: Service<ConsentRequest, Response = ConsentResponse, Error = TraceabilityError>
90 + Clone
91 + Send
92 + 'static,
93 Consent::Future: Send,
94 M: Service<M2mRequest, Response = M2mResponse, Error = TraceabilityError>
95 + Clone
96 + Send
97 + 'static,
98 M::Future: Send,
99{
100 type Response = O2mResponse;
101 type Error = TraceabilityError;
102 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
103
104 fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
105 Poll::Ready(Ok(()))
106 }
107
108 fn call(&mut self, request: O2mRequest) -> Self::Future {
109 let mut provenance = self.provenance.clone();
110 let mut compliance = self.compliance.clone();
111 let mut consent = self.consent.clone();
112 let mut m2m = self.m2m.clone();
113 Box::pin(async move {
114 match request {
115 O2mRequest::GetPolicies(resources) => {
116 info!(node_id = %provenance.node_id(), resources = ?resources, "[o2m] GetPolicies");
117 match compliance.call(ComplianceRequest::GetPolicies(resources)).await? {
118 ComplianceResponse::Policies(policies) => {
119 Ok(O2mResponse::Policies(policies))
120 }
121 _ => Err(TraceabilityError::InternalTrace2eError),
122 }
123 }
124 O2mRequest::SetPolicy { resource, policy } => {
125 info!(
126 node_id = %provenance.node_id(),
127 resource = %resource,
128 policy = ?policy,
129 "[o2m] SetPolicy"
130 );
131 match compliance.call(ComplianceRequest::SetPolicy { resource, policy }).await?
132 {
133 ComplianceResponse::PolicyUpdated => Ok(O2mResponse::Ack),
134 _ => Err(TraceabilityError::InternalTrace2eError),
135 }
136 }
137 O2mRequest::SetConfidentiality { resource, confidentiality } => {
138 info!(
139 node_id = %provenance.node_id(),
140 resource = %resource,
141 confidentiality = ?confidentiality,
142 "[o2m] SetConfidentiality"
143 );
144 match compliance
145 .call(ComplianceRequest::SetConfidentiality { resource, confidentiality })
146 .await?
147 {
148 ComplianceResponse::PolicyUpdated => Ok(O2mResponse::Ack),
149 _ => Err(TraceabilityError::InternalTrace2eError),
150 }
151 }
152 O2mRequest::SetIntegrity { resource, integrity } => {
153 info!(
154 node_id = %provenance.node_id(),
155 resource = %resource,
156 integrity = ?integrity,
157 "[o2m] SetIntegrity"
158 );
159 match compliance
160 .call(ComplianceRequest::SetIntegrity { resource, integrity })
161 .await?
162 {
163 ComplianceResponse::PolicyUpdated => Ok(O2mResponse::Ack),
164 _ => Err(TraceabilityError::InternalTrace2eError),
165 }
166 }
167 O2mRequest::SetDeleted(resource) => {
168 info!(node_id = %provenance.node_id(), resource = %resource, "[o2m] SetDeleted");
169 match compliance.call(ComplianceRequest::SetDeleted(resource)).await? {
170 ComplianceResponse::PolicyUpdated => Ok(O2mResponse::Ack),
171 _ => Err(TraceabilityError::InternalTrace2eError),
172 }
173 }
174 O2mRequest::BroadcastDeletion(resource) => {
175 info!(
176 node_id = %provenance.node_id(),
177 resource = %resource,
178 "[o2m] BroadcastDeletion"
179 );
180 let localized_resource =
181 LocalizedResource::new(provenance.node_id().clone(), resource);
182 match m2m.call(M2mRequest::BroadcastDeletion(localized_resource)).await? {
183 M2mResponse::Ack => Ok(O2mResponse::Ack),
184 _ => Err(TraceabilityError::InternalTrace2eError),
185 }
186 }
187 O2mRequest::EnforceConsent(resource) => {
188 info!(node_id = %provenance.node_id(), resource = %resource, "[o2m] EnforceConsent");
189 let notifications = match consent
190 .call(ConsentRequest::TakeResourceOwnership(resource.clone()))
191 .await?
192 {
193 ConsentResponse::Notifications(notifications) => notifications,
194 _ => return Err(TraceabilityError::InternalTrace2eError),
195 };
196 match compliance
197 .call(ComplianceRequest::EnforceConsent { resource, consent: true })
198 .await?
199 {
200 ComplianceResponse::PolicyUpdated => {
201 Ok(O2mResponse::Notifications(notifications))
202 }
203 _ => Err(TraceabilityError::InternalTrace2eError),
204 }
205 }
206 O2mRequest::GetReferences(resource) => {
207 info!(node_id = %provenance.node_id(), resource = %resource, "[o2m] GetReferences");
208 match provenance.call(ProvenanceRequest::GetReferences(resource)).await? {
209 ProvenanceResponse::Provenance(references) => {
210 Ok(O2mResponse::References(references))
211 }
212 _ => Err(TraceabilityError::InternalTrace2eError),
213 }
214 }
215 O2mRequest::SetConsentDecision { source, destination, decision } => {
216 info!(
217 "[o2m-{}] SetConsentDecision: source: {}, destination: {:?}, decision: {:?}",
218 provenance.node_id(),
219 source,
220 destination,
221 decision
222 );
223 match consent
224 .call(ConsentRequest::SetConsent { source, destination, consent: decision })
225 .await?
226 {
227 ConsentResponse::Ack => Ok(O2mResponse::Ack),
228 _ => Err(TraceabilityError::InternalTrace2eError),
229 }
230 }
231 }
232 })
233 }
234}