1use std::{future::Future, pin::Pin, sync::Arc, task::Poll};
3
4use dashmap::{DashMap, Entry};
5use tokio::{sync::broadcast, time::Duration};
6use tower::Service;
7use tracing::info;
8
9use crate::traceability::{
10 error::TraceabilityError,
11 infrastructure::naming::{LocalizedResource, Resource},
12};
13
14#[derive(Debug, PartialEq)]
18pub enum ConsentRequest {
19 RequestConsent {
23 source: Resource,
25 destination: Destination,
27 },
28 TakeResourceOwnership(Resource),
33 SetConsent {
37 source: Resource,
39 destination: Destination,
41 consent: bool,
43 },
44}
45
46#[derive(Debug)]
50pub enum ConsentResponse {
51 Consent(bool),
53 Ack,
55 Notifications(broadcast::Receiver<Destination>),
57}
58
59#[derive(Debug, Clone, Eq, PartialEq, Hash)]
68pub enum Destination {
69 Resource { resource: Resource, parent: Option<Box<Self>> },
71 Node(String),
73}
74
75impl From<LocalizedResource> for Destination {
76 fn from(localized_resource: LocalizedResource) -> Self {
77 Self::Resource {
78 resource: localized_resource.resource().to_owned(),
79 parent: Some(Box::new(Self::Node(localized_resource.node_id().to_owned()))),
80 }
81 }
82}
83
84impl TryFrom<&str> for Destination {
90 type Error = String;
91
92 fn try_from(s: &str) -> Result<Destination, Self::Error> {
93 let s = s.trim();
94
95 if s.contains('@')
97 && let Ok(lr) = LocalizedResource::try_from(s)
98 {
99 return Ok(Destination::Resource {
100 resource: lr.resource().clone(),
101 parent: Some(Box::new(Destination::Node(lr.node_id().clone()))),
102 });
103 }
104
105 if s.contains("://")
107 && let Ok(resource) = Resource::try_from(s)
108 {
109 return Ok(Destination::Resource { resource, parent: None });
110 }
111
112 if !s.contains(char::is_whitespace) && !s.is_empty() {
114 return Ok(Destination::Node(s.to_string()));
115 }
116
117 Err(format!(
118 "Failed to parse destination: '{}'. Expected one of: node_id, resource (file:// or stream://), or localized_resource (node_id@resource)",
119 s
120 ))
121 }
122}
123
124impl TryFrom<String> for Destination {
125 type Error = String;
126
127 fn try_from(s: String) -> Result<Destination, Self::Error> {
128 Destination::try_from(s.as_str())
129 }
130}
131
132impl Destination {
133 pub fn new(node_id: Option<String>, resource: Option<Resource>) -> Self {
135 match (node_id, resource) {
136 (Some(node_id), Some(resource)) => {
137 Self::Resource { resource, parent: Some(Box::new(Self::Node(node_id))) }
138 }
139 (None, Some(resource)) => Self::Resource { resource, parent: None },
140 (Some(node_id), None) => Self::Node(node_id),
141 (None, None) => panic!("Cannot create Destination with no node_id and no resource"),
142 }
143 }
144
145 fn hierarchy(&self) -> impl Iterator<Item = &Destination> {
153 std::iter::successors(Some(self), |dest| match dest {
154 Destination::Resource { parent, .. } => parent.as_deref(),
155 Destination::Node(_) => None,
156 })
157 }
158}
159#[derive(Debug, Clone, Eq, PartialEq, Hash)]
160struct ConsentKey(Resource, Destination);
161
162#[derive(Default, Debug, Clone)]
163pub struct ConsentService {
164 timeout: u64,
165 states: Arc<DashMap<ConsentKey, bool>>,
167 notifications_channels: Arc<DashMap<Resource, broadcast::Sender<Destination>>>,
169 decision_channels: Arc<DashMap<ConsentKey, broadcast::Sender<bool>>>,
171}
172
173impl ConsentService {
174 pub fn new(timeout_ms: u64) -> Self {
178 Self {
179 timeout: timeout_ms,
180 states: Arc::new(DashMap::new()),
181 notifications_channels: Arc::new(DashMap::new()),
182 decision_channels: Arc::new(DashMap::new()),
183 }
184 }
185
186 fn check_consent_hierarchy(
191 &self,
192 source: &Resource,
193 destination: &Destination,
194 ) -> Option<bool> {
195 destination.hierarchy().find_map(|dest| {
196 let key = ConsentKey(source.clone(), dest.clone());
197 self.states.get(&key).map(|v| *v)
198 })
199 }
200
201 fn get_or_create_decision_channel(&self, key: &ConsentKey) -> broadcast::Receiver<bool> {
206 let sender = self.decision_channels.get(key).map(|feed| feed.clone());
207
208 if let Some(sender) = sender {
209 sender.subscribe()
210 } else {
211 let (tx, rx) = broadcast::channel::<bool>(100);
212 self.decision_channels.insert(key.clone(), tx);
213 rx
214 }
215 }
216
217 async fn get_consent(
225 &self,
226 source: Resource,
227 destination: Destination,
228 ) -> Result<bool, TraceabilityError> {
229 if let Some(consent) = self.check_consent_hierarchy(&source, &destination) {
231 return Ok(consent);
232 }
233
234 let resource_key = ConsentKey(source.clone(), destination.clone());
236
237 let node_key = match &destination {
239 Destination::Resource { parent: Some(parent), .. } => {
240 Some(ConsentKey(source.clone(), (**parent).clone()))
241 }
242 _ => None,
243 };
244
245 let notif_sender = match self.notifications_channels.get(&resource_key.0) {
247 Some(sender_ref) => sender_ref.clone(),
248 None => {
249 return Ok(false);
252 }
253 };
254
255 let mut resource_rx = self.get_or_create_decision_channel(&resource_key);
257
258 let node_rx = node_key.as_ref().map(|key| self.get_or_create_decision_channel(key));
260
261 notif_sender.send(destination).map_err(|_| TraceabilityError::InternalTrace2eError)?;
263
264 if let Some(mut node_rx) = node_rx {
266 if self.timeout > 0 {
267 tokio::time::timeout(Duration::from_millis(self.timeout), async {
268 tokio::select! {
269 result = resource_rx.recv() => result,
270 result = node_rx.recv() => result,
271 }
272 })
273 .await
274 .map_err(|_| TraceabilityError::ConsentRequestTimeout)?
275 .map_err(|_| TraceabilityError::InternalTrace2eError)
276 } else {
277 tokio::select! {
278 result = resource_rx.recv() => result,
279 result = node_rx.recv() => result,
280 }
281 .map_err(|_| TraceabilityError::InternalTrace2eError)
282 }
283 } else {
284 if self.timeout > 0 {
286 tokio::time::timeout(Duration::from_millis(self.timeout), resource_rx.recv())
287 .await
288 .map_err(|_| TraceabilityError::ConsentRequestTimeout)?
289 .map_err(|_| TraceabilityError::InternalTrace2eError)
290 } else {
291 resource_rx.recv().await.map_err(|_| TraceabilityError::InternalTrace2eError)
292 }
293 }
294 }
295
296 fn set_consent(&self, source: Resource, destination: Destination, consent: bool) {
302 let key = ConsentKey(source, destination);
303 self.states.insert(key.clone(), consent);
305
306 if let Some((_, decision_feed)) = self.decision_channels.remove(&key) {
309 let _ = decision_feed.send(consent);
310 }
311 }
312
313 fn take_resource_ownership(&self, resource: Resource) -> broadcast::Receiver<Destination> {
315 match self.notifications_channels.entry(resource.clone()) {
316 Entry::Occupied(entry) => entry.get().subscribe(),
317 Entry::Vacant(entry) => {
318 let (tx, rx) = broadcast::channel(100);
319 entry.insert(tx);
320 rx
321 }
322 }
323 }
324}
325
326impl Service<ConsentRequest> for ConsentService {
327 type Response = ConsentResponse;
328 type Error = TraceabilityError;
329 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
330
331 fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
332 Poll::Ready(Ok(()))
333 }
334
335 fn call(&mut self, request: ConsentRequest) -> Self::Future {
336 let this = self.clone();
337 Box::pin(async move {
338 match request {
339 ConsentRequest::RequestConsent { source, destination } => {
340 info!(source = %source, destination = ?destination, "[consent] RequestConsent");
341 this.get_consent(source, destination).await.map(ConsentResponse::Consent)
342 }
343 ConsentRequest::SetConsent { source, destination, consent } => {
344 info!(consent = %consent, source = %source, destination = ?destination, "[consent] SetConsent");
345 this.set_consent(source, destination, consent);
346 Ok(ConsentResponse::Ack)
347 }
348 ConsentRequest::TakeResourceOwnership(resource) => {
349 info!(resource = %resource, "[consent] TakeResourceOwnership");
350 Ok(ConsentResponse::Notifications(this.take_resource_ownership(resource)))
351 }
352 }
353 })
354 }
355}
356
#[cfg(test)]
mod tests {
    use super::*;

    // Without any registered owner, a consent request is denied straight away.
    #[tokio::test]
    async fn test_consent_service_no_ownership() {
        crate::trace2e_tracing::init();
        let mut consent_service = ConsentService::new(0);
        let resource = Resource::new_process_mock(0);
        let destination = Resource::new_file("/tmp/test.txt".to_string());
        let request = ConsentRequest::RequestConsent {
            source: resource,
            destination: Destination::new(None, Some(destination)),
        };
        let response = consent_service.call(request).await.unwrap();
        assert!(matches!(response, ConsentResponse::Consent(false)));
    }

    // A pending request is notified to the owner and resolved by the owner's decision.
    #[tokio::test]
    async fn test_consent_service_with_ownership_with_decision_on_notification() {
        crate::trace2e_tracing::init();
        let mut consent_service = ConsentService::new(0);
        let resource = Resource::new_process_mock(0);
        let destination =
            Destination::new(None, Some(Resource::new_file("/tmp/test.txt".to_string())));
        let request = ConsentRequest::TakeResourceOwnership(resource.clone());
        let ownership_response = consent_service.call(request).await.unwrap();
        let ConsentResponse::Notifications(mut notifications_feed) = ownership_response else {
            panic!("Expected Notifications");
        };

        let resource_clone = resource.clone();
        let destination_clone = destination.clone();
        let mut consent_service_clone = consent_service.clone();
        let pending_request = tokio::task::spawn(async move {
            assert!(matches!(
                consent_service_clone
                    .call(ConsentRequest::RequestConsent {
                        source: resource_clone,
                        destination: destination_clone,
                    })
                    .await
                    .unwrap(),
                ConsentResponse::Consent(true)
            ));
        });

        assert_eq!(notifications_feed.recv().await.unwrap(), destination);
        consent_service
            .call(ConsentRequest::SetConsent { source: resource, destination, consent: true })
            .await
            .unwrap();

        // Join the task: a dropped handle would detach it, and a failing assertion
        // inside it would never fail this test.
        pending_request.await.expect("pending consent request task failed");
    }

    // A request times out when no decision arrives in time; a decision recorded
    // afterwards is served from the stored state.
    #[tokio::test]
    async fn test_consent_service_with_ownership_with_decision_timeout() {
        crate::trace2e_tracing::init();
        let mut consent_service = ConsentService::new(1);
        let resource = Resource::new_process_mock(0);
        let destination =
            Destination::new(None, Some(Resource::new_file("/tmp/test.txt".to_string())));
        let request = ConsentRequest::TakeResourceOwnership(resource.clone());
        let ownership_response = consent_service.call(request).await.unwrap();
        let ConsentResponse::Notifications(mut notifications_feed) = ownership_response else {
            panic!("Expected Notifications");
        };

        let resource_clone = resource.clone();
        let destination_clone = destination.clone();
        let mut consent_service_clone = consent_service.clone();
        let timed_out_request = tokio::task::spawn(async move {
            assert!(matches!(
                consent_service_clone
                    .call(ConsentRequest::RequestConsent {
                        source: resource_clone,
                        destination: destination_clone,
                    })
                    .await
                    .unwrap_err(),
                TraceabilityError::ConsentRequestTimeout
            ));
        });

        // Waiting for the task, rather than sleeping, both checks its assertion and
        // guarantees the timeout has elapsed before the decision is recorded below.
        timed_out_request.await.expect("timed-out consent request task failed");

        // The notification was still emitted before the request timed out.
        assert_eq!(notifications_feed.recv().await.unwrap(), destination);
        consent_service
            .call(ConsentRequest::SetConsent {
                source: resource.clone(),
                destination: destination.clone(),
                consent: true,
            })
            .await
            .unwrap();

        assert!(matches!(
            consent_service
                .call(ConsentRequest::RequestConsent { source: resource, destination })
                .await
                .unwrap(),
            ConsentResponse::Consent(true)
        ));
    }

    // A resource-level grant takes precedence over a node-level denial.
    #[tokio::test]
    async fn test_hierarchical_consent_resource_overrides_node() {
        crate::trace2e_tracing::init();
        let mut consent_service = ConsentService::new(0);
        let source = Resource::new_process_mock(0);
        let node_id = "node1".to_string();
        let resource = Resource::new_file("/tmp/test.txt".to_string());

        consent_service
            .call(ConsentRequest::SetConsent {
                source: source.clone(),
                destination: Destination::Node(node_id.clone()),
                consent: false,
            })
            .await
            .unwrap();

        consent_service
            .call(ConsentRequest::SetConsent {
                source: source.clone(),
                destination: Destination::new(Some(node_id.clone()), Some(resource.clone())),
                consent: true,
            })
            .await
            .unwrap();

        let response = consent_service
            .call(ConsentRequest::RequestConsent {
                source,
                destination: Destination::new(Some(node_id), Some(resource)),
            })
            .await
            .unwrap();

        assert!(matches!(response, ConsentResponse::Consent(true)));
    }

    // Without a resource-level decision, the node-level decision applies.
    #[tokio::test]
    async fn test_hierarchical_consent_node_level_fallback() {
        crate::trace2e_tracing::init();
        let mut consent_service = ConsentService::new(0);
        let source = Resource::new_process_mock(0);
        let node_id = "node1".to_string();
        let resource = Resource::new_file("/tmp/test.txt".to_string());

        consent_service
            .call(ConsentRequest::SetConsent {
                source: source.clone(),
                destination: Destination::Node(node_id.clone()),
                consent: true,
            })
            .await
            .unwrap();

        let response = consent_service
            .call(ConsentRequest::RequestConsent {
                source,
                destination: Destination::new(Some(node_id), Some(resource)),
            })
            .await
            .unwrap();

        assert!(matches!(response, ConsentResponse::Consent(true)));
    }

    // Each request is answered by the most specific decision available for it.
    #[tokio::test]
    async fn test_hierarchical_consent_most_specific_wins() {
        crate::trace2e_tracing::init();
        let mut consent_service = ConsentService::new(0);
        let source = Resource::new_process_mock(0);
        let node_id = "node1".to_string();
        let resource1 = Resource::new_file("/tmp/allowed.txt".to_string());
        let resource2 = Resource::new_file("/tmp/denied.txt".to_string());

        consent_service
            .call(ConsentRequest::SetConsent {
                source: source.clone(),
                destination: Destination::Node(node_id.clone()),
                consent: true,
            })
            .await
            .unwrap();

        consent_service
            .call(ConsentRequest::SetConsent {
                source: source.clone(),
                destination: Destination::new(Some(node_id.clone()), Some(resource2.clone())),
                consent: false,
            })
            .await
            .unwrap();

        // Only the node-level grant covers the first resource.
        let response1 = consent_service
            .call(ConsentRequest::RequestConsent {
                source: source.clone(),
                destination: Destination::new(Some(node_id.clone()), Some(resource1)),
            })
            .await
            .unwrap();
        assert!(matches!(response1, ConsentResponse::Consent(true)));

        // The second resource has its own denial, which wins over the node-level grant.
        let response2 = consent_service
            .call(ConsentRequest::RequestConsent {
                source,
                destination: Destination::new(Some(node_id), Some(resource2)),
            })
            .await
            .unwrap();
        assert!(matches!(response2, ConsentResponse::Consent(false)));
    }
}