trace2e_core/traceability/services/
consent.rs

1//! Consent service for managing consent for outgoing data flows of resources.
2use std::{future::Future, pin::Pin, sync::Arc, task::Poll};
3
4use dashmap::{DashMap, Entry};
5use tokio::{sync::broadcast, time::Duration};
6use tower::Service;
7use tracing::info;
8
9use crate::traceability::{
10    error::TraceabilityError,
11    infrastructure::naming::{LocalizedResource, Resource},
12};
13
/// Consent service request types.
///
/// API for the consent service, which manages user consent for data flow operations.
/// Each variant is answered by the matching [`ConsentResponse`] variant.
#[derive(Debug, PartialEq)]
pub enum ConsentRequest {
    /// Request consent for a data flow operation.
    ///
    /// If a decision is already recorded for the destination (or for one of its
    /// parents), it is returned immediately. Otherwise the owner of `source` is
    /// notified and the request waits for a decision. When nobody has taken
    /// ownership of `source`, the request is answered with a denial.
    ///
    /// Answered with [`ConsentResponse::Consent`].
    RequestConsent {
        /// Source resource providing data
        source: Resource,
        /// Destination resource receiving data
        destination: Destination,
    },
    /// Take ownership of a resource.
    ///
    /// The owner of the resource receives consent request notifications through the
    /// returned channel. Decisions are sent back with [`ConsentRequest::SetConsent`].
    ///
    /// Answered with [`ConsentResponse::Notifications`].
    TakeResourceOwnership(Resource),
    /// Set consent decision for a specific data flow operation.
    ///
    /// Records the decision for the `(source, destination)` pair and wakes up the
    /// requests currently waiting on that exact pair.
    ///
    /// Answered with [`ConsentResponse::Ack`].
    SetConsent {
        /// Source resource providing data
        source: Resource,
        /// Destination resource receiving data
        destination: Destination,
        /// Consent decision: true to grant, false to deny
        consent: bool,
    },
}
45
/// Consent service response types.
///
/// Responses from the consent service regarding consent decisions and pending requests.
#[derive(Debug)]
pub enum ConsentResponse {
    /// Consent granted (`true`) or denied (`false`) for a data flow.
    ///
    /// Answer to [`ConsentRequest::RequestConsent`].
    Consent(bool),
    /// Acknowledgment of successful consent decision update.
    ///
    /// Answer to [`ConsentRequest::SetConsent`].
    Ack,
    /// Notification channel for the resource.
    ///
    /// Answer to [`ConsentRequest::TakeResourceOwnership`]. The receiver yields the
    /// destination of each consent request that needs a decision for that resource.
    Notifications(broadcast::Receiver<Destination>),
}
58
/// Destination for consent requests with built-in hierarchical structure.
///
/// The hierarchy is encoded in the type itself:
/// - A `Resource` can have a `parent` destination (typically a `Node`)
/// - This creates a natural traversal from specific to broad scopes
///
/// Example: `Resource { resource: file, parent: Some(Node("node1")) }`
/// represents a file on node1, with node1 as the fallback consent scope.
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub enum Destination {
    /// A specific resource, optionally with a parent destination for hierarchy.
    ///
    /// `resource` is the resource receiving the data; `parent` is the broader scope
    /// consulted when no decision exists for the resource itself (`None` when the
    /// resource has no enclosing scope).
    Resource { resource: Resource, parent: Option<Box<Self>> },
    /// A node destination, identified by its node ID. It has no parent scope.
    Node(String),
}
74
75impl From<LocalizedResource> for Destination {
76    fn from(localized_resource: LocalizedResource) -> Self {
77        Self::Resource {
78            resource: localized_resource.resource().to_owned(),
79            parent: Some(Box::new(Self::Node(localized_resource.node_id().to_owned()))),
80        }
81    }
82}
83
84/// Parse a Destination from a string.
85/// Tries three formats in order:
86/// 1. LocalizedResource: "node_id@resource_spec"
87/// 2. Resource: "file:///path" or "stream://local::peer"
88/// 3. Node ID: a simple string without whitespaces
89impl TryFrom<&str> for Destination {
90    type Error = String;
91
92    fn try_from(s: &str) -> Result<Destination, Self::Error> {
93        let s = s.trim();
94
95        // Try parsing as LocalizedResource first (contains '@')
96        if s.contains('@')
97            && let Ok(lr) = LocalizedResource::try_from(s)
98        {
99            return Ok(Destination::Resource {
100                resource: lr.resource().clone(),
101                parent: Some(Box::new(Destination::Node(lr.node_id().clone()))),
102            });
103        }
104
105        // Try parsing as Resource (contains '://' or starts with specific patterns)
106        if s.contains("://")
107            && let Ok(resource) = Resource::try_from(s)
108        {
109            return Ok(Destination::Resource { resource, parent: None });
110        }
111
112        // Try as node ID (simple string without whitespaces)
113        if !s.contains(char::is_whitespace) && !s.is_empty() {
114            return Ok(Destination::Node(s.to_string()));
115        }
116
117        Err(format!(
118            "Failed to parse destination: '{}'. Expected one of: node_id, resource (file:// or stream://), or localized_resource (node_id@resource)",
119            s
120        ))
121    }
122}
123
124impl TryFrom<String> for Destination {
125    type Error = String;
126
127    fn try_from(s: String) -> Result<Destination, Self::Error> {
128        Destination::try_from(s.as_str())
129    }
130}
131
132impl Destination {
133    /// Create a new destination from optional node_id and resource.
134    pub fn new(node_id: Option<String>, resource: Option<Resource>) -> Self {
135        match (node_id, resource) {
136            (Some(node_id), Some(resource)) => {
137                Self::Resource { resource, parent: Some(Box::new(Self::Node(node_id))) }
138            }
139            (None, Some(resource)) => Self::Resource { resource, parent: None },
140            (Some(node_id), None) => Self::Node(node_id),
141            (None, None) => panic!("Cannot create Destination with no node_id and no resource"),
142        }
143    }
144
145    /// Iterator over the hierarchy from most specific to least specific.
146    ///
147    /// Example hierarchy traversal:
148    /// ```text
149    /// Resource { resource: file, parent: Some(Node("node1")) }
150    ///   -> yields: Resource(file), Node("node1")
151    /// ```
152    fn hierarchy(&self) -> impl Iterator<Item = &Destination> {
153        std::iter::successors(Some(self), |dest| match dest {
154            Destination::Resource { parent, .. } => parent.as_deref(),
155            Destination::Node(_) => None,
156        })
157    }
158}
/// Key of a consent decision: the source resource (`.0`) paired with the
/// destination scope (`.1`) the decision applies to.
///
/// Used to index both the recorded decisions and the channels on which pending
/// requests wait for a decision.
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
struct ConsentKey(Resource, Destination);
161
/// Service recording consent decisions and relaying consent requests to resource owners.
///
/// All maps are behind `Arc`, so clones of the service share the same state.
/// The derived `Default` yields a service with the timeout disabled (`timeout == 0`).
#[derive(Default, Debug, Clone)]
pub struct ConsentService {
    /// Maximum time, in milliseconds, a consent request waits for a decision.
    /// A value of 0 disables the timeout.
    timeout: u64,
    /// Unified store of consent states keyed by (source, destination)
    states: Arc<DashMap<ConsentKey, bool>>,
    /// Consent request notification channels, keyed by the source resource being owned
    notifications_channels: Arc<DashMap<Resource, broadcast::Sender<Destination>>>,
    /// Consent decision channels on which pending requests wait, keyed by (source, destination)
    decision_channels: Arc<DashMap<ConsentKey, broadcast::Sender<bool>>>,
}
172
173impl ConsentService {
174    /// Create a new `ConsentService` with the specified timeout.
175    ///
176    /// Timeout is disabled if set to 0.
177    pub fn new(timeout_ms: u64) -> Self {
178        Self {
179            timeout: timeout_ms,
180            states: Arc::new(DashMap::new()),
181            notifications_channels: Arc::new(DashMap::new()),
182            decision_channels: Arc::new(DashMap::new()),
183        }
184    }
185
186    /// Check for existing consent decisions in the hierarchy.
187    /// Returns the most specific consent decision if found.
188    ///
189    /// Traverses from most specific (resource) to least specific (node).
190    fn check_consent_hierarchy(
191        &self,
192        source: &Resource,
193        destination: &Destination,
194    ) -> Option<bool> {
195        destination.hierarchy().find_map(|dest| {
196            let key = ConsentKey(source.clone(), dest.clone());
197            self.states.get(&key).map(|v| *v)
198        })
199    }
200
201    /// Helper method to get or create a decision channel for a given key.
202    ///
203    /// This method ensures that a broadcast channel exists for the given ConsentKey,
204    /// creating one if necessary. Returns a receiver for subscribing to the decision.
205    fn get_or_create_decision_channel(&self, key: &ConsentKey) -> broadcast::Receiver<bool> {
206        let sender = self.decision_channels.get(key).map(|feed| feed.clone());
207
208        if let Some(sender) = sender {
209            sender.subscribe()
210        } else {
211            let (tx, rx) = broadcast::channel::<bool>(100);
212            self.decision_channels.insert(key.clone(), tx);
213            rx
214        }
215    }
216
217    /// Internal method to get consent
218    ///
219    /// Uses a dual subscription pattern when the destination has a parent node:
220    /// - Subscribes to both resource-level and node-level decision channels
221    /// - Uses tokio::select! to wait on whichever resolves first
222    ///
223    /// This eliminates the need for hierarchical scanning in set_consent
224    async fn get_consent(
225        &self,
226        source: Resource,
227        destination: Destination,
228    ) -> Result<bool, TraceabilityError> {
229        // Check hierarchy for existing decision (most specific first)
230        if let Some(consent) = self.check_consent_hierarchy(&source, &destination) {
231            return Ok(consent);
232        }
233
234        // No existing decision, proceed with request flow
235        let resource_key = ConsentKey(source.clone(), destination.clone());
236
237        // Extract node-level key if destination has a parent node
238        let node_key = match &destination {
239            Destination::Resource { parent: Some(parent), .. } => {
240                Some(ConsentKey(source.clone(), (**parent).clone()))
241            }
242            _ => None,
243        };
244
245        // Get notification sender
246        let notif_sender = match self.notifications_channels.get(&resource_key.0) {
247            Some(sender_ref) => sender_ref.clone(),
248            None => {
249                // No notifications feed, so nobody will ever know about this consent request
250                // and it will never be granted
251                return Ok(false);
252            }
253        };
254
255        // Subscribe to resource-level decision channel
256        let mut resource_rx = self.get_or_create_decision_channel(&resource_key);
257
258        // Subscribe to node-level decision channel (if applicable)
259        let node_rx = node_key.as_ref().map(|key| self.get_or_create_decision_channel(key));
260
261        // Send notification
262        notif_sender.send(destination).map_err(|_| TraceabilityError::InternalTrace2eError)?;
263
264        // Wait for either channel to resolve using tokio::select!
265        if let Some(mut node_rx) = node_rx {
266            if self.timeout > 0 {
267                tokio::time::timeout(Duration::from_millis(self.timeout), async {
268                    tokio::select! {
269                        result = resource_rx.recv() => result,
270                        result = node_rx.recv() => result,
271                    }
272                })
273                .await
274                .map_err(|_| TraceabilityError::ConsentRequestTimeout)?
275                .map_err(|_| TraceabilityError::InternalTrace2eError)
276            } else {
277                tokio::select! {
278                    result = resource_rx.recv() => result,
279                    result = node_rx.recv() => result,
280                }
281                .map_err(|_| TraceabilityError::InternalTrace2eError)
282            }
283        } else {
284            // No parent node, just wait on resource channel
285            if self.timeout > 0 {
286                tokio::time::timeout(Duration::from_millis(self.timeout), resource_rx.recv())
287                    .await
288                    .map_err(|_| TraceabilityError::ConsentRequestTimeout)?
289                    .map_err(|_| TraceabilityError::InternalTrace2eError)
290            } else {
291                resource_rx.recv().await.map_err(|_| TraceabilityError::InternalTrace2eError)
292            }
293        }
294    }
295
296    /// Internal method to set consent
297    ///
298    /// With the dual subscription pattern, subscribers to hierarchical destinations
299    /// are already listening to both their specific key and their parent's key,
300    /// so we only need to notify the exact key. No hierarchical scanning needed.
301    fn set_consent(&self, source: Resource, destination: Destination, consent: bool) {
302        let key = ConsentKey(source, destination);
303        // Insert the consent decision into the persistent state
304        self.states.insert(key.clone(), consent);
305
306        // Notify the exact key's channel
307        // Subscribers are already listening to both resource and node channels via dual subscription
308        if let Some((_, decision_feed)) = self.decision_channels.remove(&key) {
309            let _ = decision_feed.send(consent);
310        }
311    }
312
313    /// Internal method to subscribe to consent request notifications
314    fn take_resource_ownership(&self, resource: Resource) -> broadcast::Receiver<Destination> {
315        match self.notifications_channels.entry(resource.clone()) {
316            Entry::Occupied(entry) => entry.get().subscribe(),
317            Entry::Vacant(entry) => {
318                let (tx, rx) = broadcast::channel(100);
319                entry.insert(tx);
320                rx
321            }
322        }
323    }
324}
325
326impl Service<ConsentRequest> for ConsentService {
327    type Response = ConsentResponse;
328    type Error = TraceabilityError;
329    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
330
331    fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
332        Poll::Ready(Ok(()))
333    }
334
335    fn call(&mut self, request: ConsentRequest) -> Self::Future {
336        let this = self.clone();
337        Box::pin(async move {
338            match request {
339                ConsentRequest::RequestConsent { source, destination } => {
340                    info!(source = %source, destination = ?destination, "[consent] RequestConsent");
341                    this.get_consent(source, destination).await.map(ConsentResponse::Consent)
342                }
343                ConsentRequest::SetConsent { source, destination, consent } => {
344                    info!(consent = %consent, source = %source, destination = ?destination, "[consent] SetConsent");
345                    this.set_consent(source, destination, consent);
346                    Ok(ConsentResponse::Ack)
347                }
348                ConsentRequest::TakeResourceOwnership(resource) => {
349                    info!(resource = %resource, "[consent] TakeResourceOwnership");
350                    Ok(ConsentResponse::Notifications(this.take_resource_ownership(resource)))
351                }
352            }
353        })
354    }
355}
356
#[cfg(test)]
mod tests {
    use super::*;

    /// Without any owner for the source, a consent request is denied immediately.
    #[tokio::test]
    async fn test_consent_service_no_ownership() {
        crate::trace2e_tracing::init();
        let mut consent_service = ConsentService::new(0);
        let resource = Resource::new_process_mock(0);
        let destination = Resource::new_file("/tmp/test.txt".to_string());
        let request = ConsentRequest::RequestConsent {
            source: resource.clone(),
            destination: Destination::new(None, Some(destination.clone())),
        };
        let response = consent_service.call(request).await.unwrap();
        // No ownership, so no consent can be granted
        assert!(matches!(response, ConsentResponse::Consent(false)));
    }

    /// A pending request is notified to the owner and resolved by the owner's decision.
    #[tokio::test]
    async fn test_consent_service_with_ownership_with_decision_on_notification() {
        crate::trace2e_tracing::init();
        let mut consent_service = ConsentService::new(0);
        let resource = Resource::new_process_mock(0);
        let destination =
            Destination::new(None, Some(Resource::new_file("/tmp/test.txt".to_string())));
        let request = ConsentRequest::TakeResourceOwnership(resource.clone());
        let ownership_response = consent_service.call(request).await.unwrap();
        let ConsentResponse::Notifications(mut notifications_feed) = ownership_response else {
            panic!("Expected Notifications");
        };
        let resource_clone = resource.clone();
        let destination_clone = destination.clone();
        let mut consent_service_clone = consent_service.clone();
        let requester = tokio::task::spawn(async move {
            assert!(matches!(
                consent_service_clone
                    .call(ConsentRequest::RequestConsent {
                        source: resource_clone,
                        destination: destination_clone,
                    })
                    .await
                    .unwrap(),
                ConsentResponse::Consent(true)
            ));
        });

        assert_eq!(notifications_feed.recv().await.unwrap(), destination.clone());
        consent_service
            .call(ConsentRequest::SetConsent { source: resource, destination, consent: true })
            .await
            .unwrap();

        // Join the requester so that a failed assertion inside the task fails the test
        requester.await.expect("consent requester task failed");
    }

    /// A request times out when no decision arrives; a later decision is still recorded.
    #[tokio::test]
    async fn test_consent_service_with_ownership_with_decision_timeout() {
        crate::trace2e_tracing::init();
        let mut consent_service = ConsentService::new(1);
        let resource = Resource::new_process_mock(0);
        let destination =
            Destination::new(None, Some(Resource::new_file("/tmp/test.txt".to_string())));
        let request = ConsentRequest::TakeResourceOwnership(resource.clone());
        let ownership_response = consent_service.call(request).await.unwrap();
        let ConsentResponse::Notifications(mut notifications_feed) = ownership_response else {
            panic!("Expected Notifications");
        };

        // Spawn a task to check the consent request timeout before the decision is sent
        let resource_clone = resource.clone();
        let destination_clone = destination.clone();
        let mut consent_service_clone = consent_service.clone();
        let requester = tokio::task::spawn(async move {
            assert!(matches!(
                consent_service_clone
                    .call(ConsentRequest::RequestConsent {
                        source: resource_clone,
                        destination: destination_clone,
                    })
                    .await
                    .unwrap_err(),
                TraceabilityError::ConsentRequestTimeout
            ));
        });

        // Wait until the request has actually timed out: this both checks the
        // assertion inside the task and guarantees the decision is sent afterwards.
        requester.await.expect("consent requester task failed");

        assert_eq!(notifications_feed.recv().await.unwrap(), destination.clone());
        consent_service
            .call(ConsentRequest::SetConsent {
                source: resource.clone(),
                destination: destination.clone(),
                consent: true,
            })
            .await
            .unwrap();

        // Check the consent request after the decision is sent
        assert!(matches!(
            consent_service
                .call(ConsentRequest::RequestConsent { source: resource, destination })
                .await
                .unwrap(),
            ConsentResponse::Consent(true)
        ));
    }

    /// A resource-level decision takes precedence over the decision of its node.
    #[tokio::test]
    async fn test_hierarchical_consent_resource_overrides_node() {
        crate::trace2e_tracing::init();
        let mut consent_service = ConsentService::new(0);
        let source = Resource::new_process_mock(0);
        let node_id = "node1".to_string();
        let resource = Resource::new_file("/tmp/test.txt".to_string());

        // Set node-level consent to false
        consent_service
            .call(ConsentRequest::SetConsent {
                source: source.clone(),
                destination: Destination::Node(node_id.clone()),
                consent: false,
            })
            .await
            .unwrap();

        // Set resource-level consent to true (should override node-level)
        consent_service
            .call(ConsentRequest::SetConsent {
                source: source.clone(),
                destination: Destination::new(Some(node_id.clone()), Some(resource.clone())),
                consent: true,
            })
            .await
            .unwrap();

        // Request consent for the resource - should get true (resource-level overrides node-level)
        let response = consent_service
            .call(ConsentRequest::RequestConsent {
                source: source.clone(),
                destination: Destination::new(Some(node_id), Some(resource)),
            })
            .await
            .unwrap();

        assert!(matches!(response, ConsentResponse::Consent(true)));
    }

    /// Without a resource-level decision, the node-level decision applies.
    #[tokio::test]
    async fn test_hierarchical_consent_node_level_fallback() {
        crate::trace2e_tracing::init();
        let mut consent_service = ConsentService::new(0);
        let source = Resource::new_process_mock(0);
        let node_id = "node1".to_string();
        let resource = Resource::new_file("/tmp/test.txt".to_string());

        // Set only node-level consent to true
        consent_service
            .call(ConsentRequest::SetConsent {
                source: source.clone(),
                destination: Destination::Node(node_id.clone()),
                consent: true,
            })
            .await
            .unwrap();

        // Request consent for a resource on that node - should fall back to node-level
        let response = consent_service
            .call(ConsentRequest::RequestConsent {
                source: source.clone(),
                destination: Destination::new(Some(node_id), Some(resource)),
            })
            .await
            .unwrap();

        assert!(matches!(response, ConsentResponse::Consent(true)));
    }

    /// Resources on the same node resolve independently: the most specific decision wins.
    #[tokio::test]
    async fn test_hierarchical_consent_most_specific_wins() {
        crate::trace2e_tracing::init();
        let mut consent_service = ConsentService::new(0);
        let source = Resource::new_process_mock(0);
        let node_id = "node1".to_string();
        let resource1 = Resource::new_file("/tmp/allowed.txt".to_string());
        let resource2 = Resource::new_file("/tmp/denied.txt".to_string());

        // Set node-level consent to true (permissive default)
        consent_service
            .call(ConsentRequest::SetConsent {
                source: source.clone(),
                destination: Destination::Node(node_id.clone()),
                consent: true,
            })
            .await
            .unwrap();

        // Set resource-level consent to false for specific resource (more specific, should win)
        consent_service
            .call(ConsentRequest::SetConsent {
                source: source.clone(),
                destination: Destination::new(Some(node_id.clone()), Some(resource2.clone())),
                consent: false,
            })
            .await
            .unwrap();

        // Resource1 should inherit node-level consent (true)
        let response1 = consent_service
            .call(ConsentRequest::RequestConsent {
                source: source.clone(),
                destination: Destination::new(Some(node_id.clone()), Some(resource1)),
            })
            .await
            .unwrap();
        assert!(matches!(response1, ConsentResponse::Consent(true)));

        // Resource2 should use its specific consent (false), overriding node-level
        let response2 = consent_service
            .call(ConsentRequest::RequestConsent {
                source,
                destination: Destination::new(Some(node_id), Some(resource2)),
            })
            .await
            .unwrap();
        assert!(matches!(response2, ConsentResponse::Consent(false)));
    }
}