1use std::{future::Future, pin::Pin, sync::Arc, task::Poll};
3
4use dashmap::{DashMap, Entry};
5use tokio::{sync::broadcast, time::Duration};
6use tower::Service;
7use tracing::info;
8
9use crate::traceability::{
10 error::TraceabilityError,
11 infrastructure::naming::{LocalizedResource, Resource},
12};
13
14#[derive(Debug, PartialEq)]
18pub enum ConsentRequest {
19 RequestConsent {
23 source: Resource,
25 destination: Destination,
27 },
28 TakeResourceOwnership(Resource),
33 SetConsent {
37 source: Resource,
39 destination: Destination,
41 consent: bool,
43 },
44}
45
46#[derive(Debug)]
50pub enum ConsentResponse {
51 Consent(bool),
53 Ack,
55 Notifications(broadcast::Receiver<Destination>),
57}
58
59#[derive(Debug, Clone, Eq, PartialEq, Hash)]
68pub enum Destination {
69 Resource { resource: Resource, parent: Option<Box<Self>> },
71 Node(String),
73}
74
75impl From<LocalizedResource> for Destination {
76 fn from(localized_resource: LocalizedResource) -> Self {
77 Self::Resource {
78 resource: localized_resource.resource().to_owned(),
79 parent: Some(Box::new(Self::Node(localized_resource.node_id().to_owned()))),
80 }
81 }
82}
83
84impl TryFrom<&str> for Destination {
90 type Error = String;
91
92 fn try_from(s: &str) -> Result<Destination, Self::Error> {
93 let s = s.trim();
94
95 if s.contains('@')
97 && let Ok(lr) = LocalizedResource::try_from(s)
98 {
99 return Ok(Destination::Resource {
100 resource: lr.resource().clone(),
101 parent: Some(Box::new(Destination::Node(lr.node_id().clone()))),
102 });
103 }
104
105 if s.contains("://")
107 && let Ok(resource) = Resource::try_from(s)
108 {
109 return Ok(Destination::Resource { resource, parent: None });
110 }
111
112 if !s.contains(char::is_whitespace) && !s.is_empty() {
114 return Ok(Destination::Node(s.to_string()));
115 }
116
117 Err(format!(
118 "Failed to parse destination: '{}'. Expected one of: node_id, resource (file:// or stream://), or localized_resource (node_id@resource)",
119 s
120 ))
121 }
122}
123
124impl TryFrom<String> for Destination {
125 type Error = String;
126
127 fn try_from(s: String) -> Result<Destination, Self::Error> {
128 Destination::try_from(s.as_str())
129 }
130}
131
132impl Destination {
133 pub fn new(node_id: Option<String>, resource: Option<Resource>) -> Self {
135 match (node_id, resource) {
136 (Some(node_id), Some(resource)) => {
137 Self::Resource { resource, parent: Some(Box::new(Self::Node(node_id))) }
138 }
139 (None, Some(resource)) => Self::Resource { resource, parent: None },
140 (Some(node_id), None) => Self::Node(node_id),
141 (None, None) => panic!("Cannot create Destination with no node_id and no resource"),
142 }
143 }
144
145 fn hierarchy(&self) -> impl Iterator<Item = &Destination> {
153 std::iter::successors(Some(self), |dest| match dest {
154 Destination::Resource { parent, .. } => parent.as_deref(),
155 Destination::Node(_) => None,
156 })
157 }
158}
159#[derive(Debug, Clone, Eq, PartialEq, Hash)]
160struct ConsentKey(Resource, Destination);
161
162#[derive(Default, Debug, Clone)]
163pub struct ConsentService {
164 timeout: u64,
165 states: Arc<DashMap<ConsentKey, bool>>,
167 notifications_channels: Arc<DashMap<Resource, broadcast::Sender<Destination>>>,
169 decision_channels: Arc<DashMap<ConsentKey, broadcast::Sender<bool>>>,
171}
172
173impl ConsentService {
174 pub fn new(timeout_ms: u64) -> Self {
178 Self {
179 timeout: timeout_ms,
180 states: Arc::new(DashMap::new()),
181 notifications_channels: Arc::new(DashMap::new()),
182 decision_channels: Arc::new(DashMap::new()),
183 }
184 }
185
186 fn check_consent_hierarchy(
191 &self,
192 source: &Resource,
193 destination: &Destination,
194 ) -> Option<bool> {
195 destination.hierarchy().find_map(|dest| {
196 let key = ConsentKey(source.clone(), dest.clone());
197 self.states.get(&key).map(|v| *v)
198 })
199 }
200
201 async fn get_consent(
203 &self,
204 source: Resource,
205 destination: Destination,
206 ) -> Result<bool, TraceabilityError> {
207 if let Some(consent) = self.check_consent_hierarchy(&source, &destination) {
209 return Ok(consent);
210 }
211
212 let key = ConsentKey(source, destination);
214
215 let notif_sender = match self.notifications_channels.get(&key.0) {
218 Some(sender_ref) => sender_ref.clone(),
219 None => {
220 return Ok(false);
223 }
224 };
225 let decision_sender = self.decision_channels.get(&key).map(|feed| feed.clone());
230 let mut decision_rx = if let Some(decision_sender) = decision_sender {
233 notif_sender
235 .send(key.1.clone())
236 .map_err(|_| TraceabilityError::InternalTrace2eError)?;
237 decision_sender.subscribe() } else {
239 let (tx, rx) = broadcast::channel::<bool>(100);
241 self.decision_channels.insert(key.clone(), tx);
242 notif_sender.send(key.1).map_err(|_| TraceabilityError::InternalTrace2eError)?;
243 rx
244 };
245
246 if self.timeout > 0 {
248 tokio::time::timeout(Duration::from_millis(self.timeout), decision_rx.recv())
249 .await
250 .map_err(|_| TraceabilityError::ConsentRequestTimeout)?
251 .map_err(|_| TraceabilityError::InternalTrace2eError)
252 } else {
253 decision_rx.recv().await.map_err(|_| TraceabilityError::InternalTrace2eError)
254 }
255 }
256
257 fn set_consent(&self, source: Resource, destination: Destination, consent: bool) {
259 let key = ConsentKey(source.clone(), destination.clone());
260 self.states.insert(key.clone(), consent);
262
263 if let Some((_, decision_feed)) = self.decision_channels.remove(&key) {
270 let _ = decision_feed.send(consent);
271 }
272
273 let mut keys_to_remove = Vec::new();
277 for entry in self.decision_channels.iter() {
278 let other_key = entry.key();
279 if other_key.0 == source {
282 let other_dest = &other_key.1;
284
285 let mut current: Option<&Destination> = Some(other_dest);
287 while let Some(dest) = current {
288 if dest == &destination {
289 keys_to_remove.push(other_key.clone());
291 break;
292 }
293 current = match dest {
295 Destination::Resource { parent, .. } => parent.as_deref(),
296 Destination::Node(_) => None,
297 };
298 }
299 }
300 }
301
302 for key_to_remove in keys_to_remove {
304 if let Some((_, decision_feed)) = self.decision_channels.remove(&key_to_remove) {
305 let _ = decision_feed.send(consent);
306 }
307 }
308 }
309
310 fn take_resource_ownership(&self, resource: Resource) -> broadcast::Receiver<Destination> {
312 match self.notifications_channels.entry(resource.clone()) {
313 Entry::Occupied(entry) => entry.get().subscribe(),
314 Entry::Vacant(entry) => {
315 let (tx, rx) = broadcast::channel(100);
316 entry.insert(tx);
317 rx
318 }
319 }
320 }
321}
322
323impl Service<ConsentRequest> for ConsentService {
324 type Response = ConsentResponse;
325 type Error = TraceabilityError;
326 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
327
328 fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
329 Poll::Ready(Ok(()))
330 }
331
332 fn call(&mut self, request: ConsentRequest) -> Self::Future {
333 let this = self.clone();
334 Box::pin(async move {
335 match request {
336 ConsentRequest::RequestConsent { source, destination } => {
337 info!(source = %source, destination = ?destination, "[consent] RequestConsent");
338 this.get_consent(source, destination).await.map(ConsentResponse::Consent)
339 }
340 ConsentRequest::SetConsent { source, destination, consent } => {
341 info!(consent = %consent, source = %source, destination = ?destination, "[consent] SetConsent");
342 this.set_consent(source, destination, consent);
343 Ok(ConsentResponse::Ack)
344 }
345 ConsentRequest::TakeResourceOwnership(resource) => {
346 info!(resource = %resource, "[consent] TakeResourceOwnership");
347 Ok(ConsentResponse::Notifications(this.take_resource_ownership(resource)))
348 }
349 }
350 })
351 }
352}
353
354#[cfg(test)]
355mod tests {
356 use super::*;
357
358 #[tokio::test]
359 async fn test_consent_service_no_ownership() {
360 crate::trace2e_tracing::init();
361 let mut consent_service = ConsentService::new(0);
362 let resource = Resource::new_process_mock(0);
363 let destination = Resource::new_file("/tmp/test.txt".to_string());
364 let request = ConsentRequest::RequestConsent {
365 source: resource.clone(),
366 destination: Destination::new(None, Some(destination.clone())),
367 };
368 let response = consent_service.call(request).await.unwrap();
369 assert!(matches!(response, ConsentResponse::Consent(false)));
371 }
372
373 #[tokio::test]
374 async fn test_consent_service_with_ownership_with_decision_on_notification() {
375 crate::trace2e_tracing::init();
376 let mut consent_service = ConsentService::new(0);
377 let resource = Resource::new_process_mock(0);
378 let destination =
379 Destination::new(None, Some(Resource::new_file("/tmp/test.txt".to_string())));
380 let request = ConsentRequest::TakeResourceOwnership(resource.clone());
381 let ownership_response = consent_service.call(request).await.unwrap();
382 let ConsentResponse::Notifications(mut notifications_feed) = ownership_response else {
383 panic!("Expected Notifications");
384 };
385 let resource_clone = resource.clone();
386 let destination_clone = destination.clone();
387 let mut consent_service_clone = consent_service.clone();
388 tokio::task::spawn(async move {
389 assert!(matches!(
390 consent_service_clone
391 .call(ConsentRequest::RequestConsent {
392 source: resource_clone,
393 destination: destination_clone,
394 })
395 .await
396 .unwrap(),
397 ConsentResponse::Consent(true)
398 ));
399 });
400
401 assert_eq!(notifications_feed.recv().await.unwrap(), destination.clone());
402 consent_service
403 .call(ConsentRequest::SetConsent { source: resource, destination, consent: true })
404 .await
405 .unwrap();
406 }
407
408 #[tokio::test]
409 async fn test_consent_service_with_ownership_with_decision_timeout() {
410 crate::trace2e_tracing::init();
411 let mut consent_service = ConsentService::new(1);
412 let resource = Resource::new_process_mock(0);
413 let destination =
414 Destination::new(None, Some(Resource::new_file("/tmp/test.txt".to_string())));
415 let request = ConsentRequest::TakeResourceOwnership(resource.clone());
416 let ownership_response = consent_service.call(request).await.unwrap();
417 let ConsentResponse::Notifications(mut notifications_feed) = ownership_response else {
418 panic!("Expected Notifications");
419 };
420
421 let resource_clone = resource.clone();
423 let destination_clone = destination.clone();
424 let mut consent_service_clone = consent_service.clone();
425 tokio::task::spawn(async move {
426 assert!(matches!(
427 consent_service_clone
428 .call(ConsentRequest::RequestConsent {
429 source: resource_clone,
430 destination: destination_clone,
431 })
432 .await
433 .unwrap_err(),
434 TraceabilityError::ConsentRequestTimeout
435 ));
436 });
437
438 tokio::time::sleep(Duration::from_millis(2)).await;
439 assert_eq!(notifications_feed.recv().await.unwrap(), destination.clone());
440 consent_service
441 .call(ConsentRequest::SetConsent {
442 source: resource.clone(),
443 destination: destination.clone(),
444 consent: true,
445 })
446 .await
447 .unwrap();
448
449 assert!(matches!(
451 consent_service
452 .call(ConsentRequest::RequestConsent { source: resource, destination })
453 .await
454 .unwrap(),
455 ConsentResponse::Consent(true)
456 ));
457 }
458
459 #[tokio::test]
460 async fn test_hierarchical_consent_resource_overrides_node() {
461 crate::trace2e_tracing::init();
462 let mut consent_service = ConsentService::new(0);
463 let source = Resource::new_process_mock(0);
464 let node_id = "node1".to_string();
465 let resource = Resource::new_file("/tmp/test.txt".to_string());
466
467 consent_service
469 .call(ConsentRequest::SetConsent {
470 source: source.clone(),
471 destination: Destination::Node(node_id.clone()),
472 consent: false,
473 })
474 .await
475 .unwrap();
476
477 consent_service
479 .call(ConsentRequest::SetConsent {
480 source: source.clone(),
481 destination: Destination::new(Some(node_id.clone()), Some(resource.clone())),
482 consent: true,
483 })
484 .await
485 .unwrap();
486
487 let response = consent_service
489 .call(ConsentRequest::RequestConsent {
490 source: source.clone(),
491 destination: Destination::new(Some(node_id), Some(resource)),
492 })
493 .await
494 .unwrap();
495
496 assert!(matches!(response, ConsentResponse::Consent(true)));
497 }
498
499 #[tokio::test]
500 async fn test_hierarchical_consent_node_level_fallback() {
501 crate::trace2e_tracing::init();
502 let mut consent_service = ConsentService::new(0);
503 let source = Resource::new_process_mock(0);
504 let node_id = "node1".to_string();
505 let resource = Resource::new_file("/tmp/test.txt".to_string());
506
507 consent_service
509 .call(ConsentRequest::SetConsent {
510 source: source.clone(),
511 destination: Destination::Node(node_id.clone()),
512 consent: true,
513 })
514 .await
515 .unwrap();
516
517 let response = consent_service
519 .call(ConsentRequest::RequestConsent {
520 source: source.clone(),
521 destination: Destination::new(Some(node_id), Some(resource)),
522 })
523 .await
524 .unwrap();
525
526 assert!(matches!(response, ConsentResponse::Consent(true)));
527 }
528
529 #[tokio::test]
530 async fn test_hierarchical_consent_most_specific_wins() {
531 crate::trace2e_tracing::init();
532 let mut consent_service = ConsentService::new(0);
533 let source = Resource::new_process_mock(0);
534 let node_id = "node1".to_string();
535 let resource1 = Resource::new_file("/tmp/allowed.txt".to_string());
536 let resource2 = Resource::new_file("/tmp/denied.txt".to_string());
537
538 consent_service
540 .call(ConsentRequest::SetConsent {
541 source: source.clone(),
542 destination: Destination::Node(node_id.clone()),
543 consent: true,
544 })
545 .await
546 .unwrap();
547
548 consent_service
550 .call(ConsentRequest::SetConsent {
551 source: source.clone(),
552 destination: Destination::new(Some(node_id.clone()), Some(resource2.clone())),
553 consent: false,
554 })
555 .await
556 .unwrap();
557
558 let response1 = consent_service
560 .call(ConsentRequest::RequestConsent {
561 source: source.clone(),
562 destination: Destination::new(Some(node_id.clone()), Some(resource1)),
563 })
564 .await
565 .unwrap();
566 assert!(matches!(response1, ConsentResponse::Consent(true)));
567
568 let response2 = consent_service
570 .call(ConsentRequest::RequestConsent {
571 source,
572 destination: Destination::new(Some(node_id), Some(resource2)),
573 })
574 .await
575 .unwrap();
576 assert!(matches!(response2, ConsentResponse::Consent(false)));
577 }
578}