trace2e_core/traceability/services/
consent.rs

1//! Consent service for managing consent for outgoing data flows of resources.
2use std::{future::Future, pin::Pin, sync::Arc, task::Poll};
3
4use dashmap::{DashMap, Entry};
5use tokio::{sync::broadcast, time::Duration};
6use tower::Service;
7use tracing::info;
8
9use crate::traceability::{
10    error::TraceabilityError,
11    infrastructure::naming::{LocalizedResource, Resource},
12};
13
14/// Consent service request types.
15///
16/// API for the consent service, which manages user consent for data flow operations.
17#[derive(Debug, PartialEq)]
18pub enum ConsentRequest {
19    /// Request consent for a data flow operation.
20    ///
21    /// Requests consent from the resource owner for a data flow operation.
22    RequestConsent {
23        /// Source resource providing data
24        source: Resource,
25        /// Destination resource receiving data
26        destination: Destination,
27    },
28    /// Take ownership of a resource.
29    ///
30    /// The owner of the resource will be able to receive consent request notifications
31    /// and send back decisions for the resource through the returned channels.
32    TakeResourceOwnership(Resource),
33    /// Set consent decision for a specific data flow operation.
34    ///
35    /// Updates the consent status for a pending data flow operation.
36    SetConsent {
37        /// Source resource providing data
38        source: Resource,
39        /// Destination resource receiving data
40        destination: Destination,
41        /// Consent decision: true to grant, false to deny
42        consent: bool,
43    },
44}
45
46/// Consent service response types.
47///
48/// Responses from the consent service regarding consent decisions and pending requests.
49#[derive(Debug)]
50pub enum ConsentResponse {
51    /// Consent granted or denied for a data flow.
52    Consent(bool),
53    /// Acknowledgment of successful consent decision update.
54    Ack,
55    /// Notification channel for the resource.
56    Notifications(broadcast::Receiver<Destination>),
57}
58
59/// Destination for consent requests with built-in hierarchical structure.
60///
61/// The hierarchy is encoded in the type itself:
62/// - A `Resource` can have a `parent` destination (typically a `Node`)
63/// - This creates a natural traversal from specific to broad scopes
64///
65/// Example: `Resource { resource: file, parent: Some(Node("node1")) }`
66/// represents a file on node1, with node1 as the fallback consent scope.
67#[derive(Debug, Clone, Eq, PartialEq, Hash)]
68pub enum Destination {
69    /// A specific resource, optionally with a parent destination for hierarchy.
70    Resource { resource: Resource, parent: Option<Box<Self>> },
71    /// A node destination.
72    Node(String),
73}
74
75impl From<LocalizedResource> for Destination {
76    fn from(localized_resource: LocalizedResource) -> Self {
77        Self::Resource {
78            resource: localized_resource.resource().to_owned(),
79            parent: Some(Box::new(Self::Node(localized_resource.node_id().to_owned()))),
80        }
81    }
82}
83
84/// Parse a Destination from a string.
85/// Tries three formats in order:
86/// 1. LocalizedResource: "node_id@resource_spec"
87/// 2. Resource: "file:///path" or "stream://local::peer"
88/// 3. Node ID: a simple string without whitespaces
89impl TryFrom<&str> for Destination {
90    type Error = String;
91
92    fn try_from(s: &str) -> Result<Destination, Self::Error> {
93        let s = s.trim();
94
95        // Try parsing as LocalizedResource first (contains '@')
96        if s.contains('@')
97            && let Ok(lr) = LocalizedResource::try_from(s)
98        {
99            return Ok(Destination::Resource {
100                resource: lr.resource().clone(),
101                parent: Some(Box::new(Destination::Node(lr.node_id().clone()))),
102            });
103        }
104
105        // Try parsing as Resource (contains '://' or starts with specific patterns)
106        if s.contains("://")
107            && let Ok(resource) = Resource::try_from(s)
108        {
109            return Ok(Destination::Resource { resource, parent: None });
110        }
111
112        // Try as node ID (simple string without whitespaces)
113        if !s.contains(char::is_whitespace) && !s.is_empty() {
114            return Ok(Destination::Node(s.to_string()));
115        }
116
117        Err(format!(
118            "Failed to parse destination: '{}'. Expected one of: node_id, resource (file:// or stream://), or localized_resource (node_id@resource)",
119            s
120        ))
121    }
122}
123
124impl TryFrom<String> for Destination {
125    type Error = String;
126
127    fn try_from(s: String) -> Result<Destination, Self::Error> {
128        Destination::try_from(s.as_str())
129    }
130}
131
132impl Destination {
133    /// Create a new destination from optional node_id and resource.
134    pub fn new(node_id: Option<String>, resource: Option<Resource>) -> Self {
135        match (node_id, resource) {
136            (Some(node_id), Some(resource)) => {
137                Self::Resource { resource, parent: Some(Box::new(Self::Node(node_id))) }
138            }
139            (None, Some(resource)) => Self::Resource { resource, parent: None },
140            (Some(node_id), None) => Self::Node(node_id),
141            (None, None) => panic!("Cannot create Destination with no node_id and no resource"),
142        }
143    }
144
145    /// Iterator over the hierarchy from most specific to least specific.
146    ///
147    /// Example hierarchy traversal:
148    /// ```text
149    /// Resource { resource: file, parent: Some(Node("node1")) }
150    ///   -> yields: Resource(file), Node("node1")
151    /// ```
152    fn hierarchy(&self) -> impl Iterator<Item = &Destination> {
153        std::iter::successors(Some(self), |dest| match dest {
154            Destination::Resource { parent, .. } => parent.as_deref(),
155            Destination::Node(_) => None,
156        })
157    }
158}
159#[derive(Debug, Clone, Eq, PartialEq, Hash)]
160struct ConsentKey(Resource, Destination);
161
162#[derive(Default, Debug, Clone)]
163pub struct ConsentService {
164    timeout: u64,
165    /// Unified store of consent states keyed by (source, node_id, destination)
166    states: Arc<DashMap<ConsentKey, bool>>,
167    /// Consent request notification channels
168    notifications_channels: Arc<DashMap<Resource, broadcast::Sender<Destination>>>,
169    /// Consent decision channels
170    decision_channels: Arc<DashMap<ConsentKey, broadcast::Sender<bool>>>,
171}
172
173impl ConsentService {
174    /// Create a new `ConsentService` with the specified timeout.
175    ///
176    /// Timeout is disabled if set to 0.
177    pub fn new(timeout_ms: u64) -> Self {
178        Self {
179            timeout: timeout_ms,
180            states: Arc::new(DashMap::new()),
181            notifications_channels: Arc::new(DashMap::new()),
182            decision_channels: Arc::new(DashMap::new()),
183        }
184    }
185
186    /// Check for existing consent decisions in the hierarchy.
187    /// Returns the most specific consent decision if found.
188    ///
189    /// Traverses from most specific (resource) to least specific (node).
190    fn check_consent_hierarchy(
191        &self,
192        source: &Resource,
193        destination: &Destination,
194    ) -> Option<bool> {
195        destination.hierarchy().find_map(|dest| {
196            let key = ConsentKey(source.clone(), dest.clone());
197            self.states.get(&key).map(|v| *v)
198        })
199    }
200
201    /// Internal method to get consent
202    async fn get_consent(
203        &self,
204        source: Resource,
205        destination: Destination,
206    ) -> Result<bool, TraceabilityError> {
207        // Check hierarchy for existing decision (most specific first)
208        if let Some(consent) = self.check_consent_hierarchy(&source, &destination) {
209            return Ok(consent);
210        }
211
212        // No existing decision, proceed with request flow
213        let key = ConsentKey(source, destination);
214
215        // CRITICAL: Clone the broadcast sender OUTSIDE the DashMap guard
216        // to avoid holding a lock during async recv() operations
217        let notif_sender = match self.notifications_channels.get(&key.0) {
218            Some(sender_ref) => sender_ref.clone(),
219            None => {
220                // No notifications feed, so nobody will ever know about this consent request
221                // and it will never be granted
222                return Ok(false);
223            }
224        };
225        // The sender is now cloned and the guard is dropped above
226
227        // Send consent request notification and get decision receiver
228        // CRITICAL: Clone the decision sender OUTSIDE the DashMap guard to avoid lock issues
229        let decision_sender = self.decision_channels.get(&key).map(|feed| feed.clone());
230        // Guard is now dropped
231
232        let mut decision_rx = if let Some(decision_sender) = decision_sender {
233            // Existing decision channel - subscribe to it
234            notif_sender
235                .send(key.1.clone())
236                .map_err(|_| TraceabilityError::InternalTrace2eError)?;
237            decision_sender.subscribe() // ✅ No DashMap lock held
238        } else {
239            // First request for this source→destination - create new decision channel
240            let (tx, rx) = broadcast::channel::<bool>(100);
241            self.decision_channels.insert(key.clone(), tx);
242            notif_sender.send(key.1).map_err(|_| TraceabilityError::InternalTrace2eError)?;
243            rx
244        };
245
246        // Handle timeout - no DashMap locks held here
247        if self.timeout > 0 {
248            tokio::time::timeout(Duration::from_millis(self.timeout), decision_rx.recv())
249                .await
250                .map_err(|_| TraceabilityError::ConsentRequestTimeout)?
251                .map_err(|_| TraceabilityError::InternalTrace2eError)
252        } else {
253            decision_rx.recv().await.map_err(|_| TraceabilityError::InternalTrace2eError)
254        }
255    }
256
257    /// Internal method to set consent
258    fn set_consent(&self, source: Resource, destination: Destination, consent: bool) {
259        let key = ConsentKey(source.clone(), destination.clone());
260        // Insert the consent decision into the persistent state
261        self.states.insert(key.clone(), consent);
262
263        // Always attempt to notify any waiting decision receivers
264        // This includes:
265        // 1. Exact key match (specific resource request)
266        // 2. Hierarchical matches (parent destinations like nodes)
267
268        // First, try exact match
269        if let Some((_, decision_feed)) = self.decision_channels.remove(&key) {
270            let _ = decision_feed.send(consent);
271        }
272
273        // Then, try to notify any child destinations that might be waiting
274        // For example, if we set consent at Node level, notify all Resource-level requests
275        // with that node as parent
276        let mut keys_to_remove = Vec::new();
277        for entry in self.decision_channels.iter() {
278            let other_key = entry.key();
279            // Check if this other_key matches our hierarchy
280            // It matches if: same source AND other_key's destination has our destination as parent
281            if other_key.0 == source {
282                // Check if setting destination satisfies the hierarchy requirement for other_key
283                let other_dest = &other_key.1;
284
285                // Build the hierarchy of other_dest and check if our destination is in it
286                let mut current: Option<&Destination> = Some(other_dest);
287                while let Some(dest) = current {
288                    if dest == &destination {
289                        // Found a match! This waiting request should be notified
290                        keys_to_remove.push(other_key.clone());
291                        break;
292                    }
293                    // Move to parent in hierarchy
294                    current = match dest {
295                        Destination::Resource { parent, .. } => parent.as_deref(),
296                        Destination::Node(_) => None,
297                    };
298                }
299            }
300        }
301
302        // Remove and notify all matching keys
303        for key_to_remove in keys_to_remove {
304            if let Some((_, decision_feed)) = self.decision_channels.remove(&key_to_remove) {
305                let _ = decision_feed.send(consent);
306            }
307        }
308    }
309
310    /// Internal method to subscribe to consent request notifications
311    fn take_resource_ownership(&self, resource: Resource) -> broadcast::Receiver<Destination> {
312        match self.notifications_channels.entry(resource.clone()) {
313            Entry::Occupied(entry) => entry.get().subscribe(),
314            Entry::Vacant(entry) => {
315                let (tx, rx) = broadcast::channel(100);
316                entry.insert(tx);
317                rx
318            }
319        }
320    }
321}
322
323impl Service<ConsentRequest> for ConsentService {
324    type Response = ConsentResponse;
325    type Error = TraceabilityError;
326    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
327
328    fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
329        Poll::Ready(Ok(()))
330    }
331
332    fn call(&mut self, request: ConsentRequest) -> Self::Future {
333        let this = self.clone();
334        Box::pin(async move {
335            match request {
336                ConsentRequest::RequestConsent { source, destination } => {
337                    info!(source = %source, destination = ?destination, "[consent] RequestConsent");
338                    this.get_consent(source, destination).await.map(ConsentResponse::Consent)
339                }
340                ConsentRequest::SetConsent { source, destination, consent } => {
341                    info!(consent = %consent, source = %source, destination = ?destination, "[consent] SetConsent");
342                    this.set_consent(source, destination, consent);
343                    Ok(ConsentResponse::Ack)
344                }
345                ConsentRequest::TakeResourceOwnership(resource) => {
346                    info!(resource = %resource, "[consent] TakeResourceOwnership");
347                    Ok(ConsentResponse::Notifications(this.take_resource_ownership(resource)))
348                }
349            }
350        })
351    }
352}
353
354#[cfg(test)]
355mod tests {
356    use super::*;
357
358    #[tokio::test]
359    async fn test_consent_service_no_ownership() {
360        crate::trace2e_tracing::init();
361        let mut consent_service = ConsentService::new(0);
362        let resource = Resource::new_process_mock(0);
363        let destination = Resource::new_file("/tmp/test.txt".to_string());
364        let request = ConsentRequest::RequestConsent {
365            source: resource.clone(),
366            destination: Destination::new(None, Some(destination.clone())),
367        };
368        let response = consent_service.call(request).await.unwrap();
369        // No ownership, so no consent can be granted
370        assert!(matches!(response, ConsentResponse::Consent(false)));
371    }
372
373    #[tokio::test]
374    async fn test_consent_service_with_ownership_with_decision_on_notification() {
375        crate::trace2e_tracing::init();
376        let mut consent_service = ConsentService::new(0);
377        let resource = Resource::new_process_mock(0);
378        let destination =
379            Destination::new(None, Some(Resource::new_file("/tmp/test.txt".to_string())));
380        let request = ConsentRequest::TakeResourceOwnership(resource.clone());
381        let ownership_response = consent_service.call(request).await.unwrap();
382        let ConsentResponse::Notifications(mut notifications_feed) = ownership_response else {
383            panic!("Expected Notifications");
384        };
385        let resource_clone = resource.clone();
386        let destination_clone = destination.clone();
387        let mut consent_service_clone = consent_service.clone();
388        tokio::task::spawn(async move {
389            assert!(matches!(
390                consent_service_clone
391                    .call(ConsentRequest::RequestConsent {
392                        source: resource_clone,
393                        destination: destination_clone,
394                    })
395                    .await
396                    .unwrap(),
397                ConsentResponse::Consent(true)
398            ));
399        });
400
401        assert_eq!(notifications_feed.recv().await.unwrap(), destination.clone());
402        consent_service
403            .call(ConsentRequest::SetConsent { source: resource, destination, consent: true })
404            .await
405            .unwrap();
406    }
407
408    #[tokio::test]
409    async fn test_consent_service_with_ownership_with_decision_timeout() {
410        crate::trace2e_tracing::init();
411        let mut consent_service = ConsentService::new(1);
412        let resource = Resource::new_process_mock(0);
413        let destination =
414            Destination::new(None, Some(Resource::new_file("/tmp/test.txt".to_string())));
415        let request = ConsentRequest::TakeResourceOwnership(resource.clone());
416        let ownership_response = consent_service.call(request).await.unwrap();
417        let ConsentResponse::Notifications(mut notifications_feed) = ownership_response else {
418            panic!("Expected Notifications");
419        };
420
421        // Spawn a task to check the consent request timeout before the decision is sent
422        let resource_clone = resource.clone();
423        let destination_clone = destination.clone();
424        let mut consent_service_clone = consent_service.clone();
425        tokio::task::spawn(async move {
426            assert!(matches!(
427                consent_service_clone
428                    .call(ConsentRequest::RequestConsent {
429                        source: resource_clone,
430                        destination: destination_clone,
431                    })
432                    .await
433                    .unwrap_err(),
434                TraceabilityError::ConsentRequestTimeout
435            ));
436        });
437
438        tokio::time::sleep(Duration::from_millis(2)).await;
439        assert_eq!(notifications_feed.recv().await.unwrap(), destination.clone());
440        consent_service
441            .call(ConsentRequest::SetConsent {
442                source: resource.clone(),
443                destination: destination.clone(),
444                consent: true,
445            })
446            .await
447            .unwrap();
448
449        // Check the consent request timeout after the decision is sent
450        assert!(matches!(
451            consent_service
452                .call(ConsentRequest::RequestConsent { source: resource, destination })
453                .await
454                .unwrap(),
455            ConsentResponse::Consent(true)
456        ));
457    }
458
459    #[tokio::test]
460    async fn test_hierarchical_consent_resource_overrides_node() {
461        crate::trace2e_tracing::init();
462        let mut consent_service = ConsentService::new(0);
463        let source = Resource::new_process_mock(0);
464        let node_id = "node1".to_string();
465        let resource = Resource::new_file("/tmp/test.txt".to_string());
466
467        // Set node-level consent to false
468        consent_service
469            .call(ConsentRequest::SetConsent {
470                source: source.clone(),
471                destination: Destination::Node(node_id.clone()),
472                consent: false,
473            })
474            .await
475            .unwrap();
476
477        // Set resource-level consent to true (should override node-level)
478        consent_service
479            .call(ConsentRequest::SetConsent {
480                source: source.clone(),
481                destination: Destination::new(Some(node_id.clone()), Some(resource.clone())),
482                consent: true,
483            })
484            .await
485            .unwrap();
486
487        // Request consent for the resource - should get true (resource-level overrides node-level)
488        let response = consent_service
489            .call(ConsentRequest::RequestConsent {
490                source: source.clone(),
491                destination: Destination::new(Some(node_id), Some(resource)),
492            })
493            .await
494            .unwrap();
495
496        assert!(matches!(response, ConsentResponse::Consent(true)));
497    }
498
499    #[tokio::test]
500    async fn test_hierarchical_consent_node_level_fallback() {
501        crate::trace2e_tracing::init();
502        let mut consent_service = ConsentService::new(0);
503        let source = Resource::new_process_mock(0);
504        let node_id = "node1".to_string();
505        let resource = Resource::new_file("/tmp/test.txt".to_string());
506
507        // Set only node-level consent to true
508        consent_service
509            .call(ConsentRequest::SetConsent {
510                source: source.clone(),
511                destination: Destination::Node(node_id.clone()),
512                consent: true,
513            })
514            .await
515            .unwrap();
516
517        // Request consent for a resource on that node - should fall back to node-level
518        let response = consent_service
519            .call(ConsentRequest::RequestConsent {
520                source: source.clone(),
521                destination: Destination::new(Some(node_id), Some(resource)),
522            })
523            .await
524            .unwrap();
525
526        assert!(matches!(response, ConsentResponse::Consent(true)));
527    }
528
529    #[tokio::test]
530    async fn test_hierarchical_consent_most_specific_wins() {
531        crate::trace2e_tracing::init();
532        let mut consent_service = ConsentService::new(0);
533        let source = Resource::new_process_mock(0);
534        let node_id = "node1".to_string();
535        let resource1 = Resource::new_file("/tmp/allowed.txt".to_string());
536        let resource2 = Resource::new_file("/tmp/denied.txt".to_string());
537
538        // Set node-level consent to true (permissive default)
539        consent_service
540            .call(ConsentRequest::SetConsent {
541                source: source.clone(),
542                destination: Destination::Node(node_id.clone()),
543                consent: true,
544            })
545            .await
546            .unwrap();
547
548        // Set resource-level consent to false for specific resource (more specific, should win)
549        consent_service
550            .call(ConsentRequest::SetConsent {
551                source: source.clone(),
552                destination: Destination::new(Some(node_id.clone()), Some(resource2.clone())),
553                consent: false,
554            })
555            .await
556            .unwrap();
557
558        // Resource1 should inherit node-level consent (true)
559        let response1 = consent_service
560            .call(ConsentRequest::RequestConsent {
561                source: source.clone(),
562                destination: Destination::new(Some(node_id.clone()), Some(resource1)),
563            })
564            .await
565            .unwrap();
566        assert!(matches!(response1, ConsentResponse::Consent(true)));
567
568        // Resource2 should use its specific consent (false), overriding node-level
569        let response2 = consent_service
570            .call(ConsentRequest::RequestConsent {
571                source,
572                destination: Destination::new(Some(node_id), Some(resource2)),
573            })
574            .await
575            .unwrap();
576        assert!(matches!(response2, ConsentResponse::Consent(false)));
577    }
578}