trace2e_core/traceability/services/provenance.rs

//! Provenance service for tracking resource references across nodes.
//!
//! Provides synchronous helpers to read and update provenance, exposed through an async
//! `tower::Service` implementation.
4use std::{collections::HashSet, pin::Pin, sync::Arc, task::Poll};
5
6use dashmap::DashMap;
7use tower::Service;
8
9use crate::traceability::infrastructure::naming::DisplayableResource;
10use tracing::info;
11
12use crate::traceability::{
13    api::types::{ProvenanceRequest, ProvenanceResponse},
14    error::TraceabilityError,
15    infrastructure::naming::{LocalizedResource, NodeId, Resource},
16};
17
/// Map from each resource to the set of localized resources recorded as its provenance.
type ProvenanceMap = DashMap<Resource, HashSet<LocalizedResource>>;

/// Provenance service for tracking resource provenance.
///
/// The provenance map is held behind an [`Arc`], so clones of the service share and
/// update the same map. `Default` yields a service with an empty `node_id`.
#[derive(Debug, Default, Clone)]
pub struct ProvenanceService {
    /// Identifier of the local node; non-stream resources are localized on this node
    /// when their initial provenance is built.
    node_id: String,
    /// Recorded provenance for each resource, shared between all clones of the service.
    provenance: Arc<ProvenanceMap>,
}
26
27impl ProvenanceService {
28    pub fn new(node_id: String) -> Self {
29        Self { node_id, provenance: Arc::new(DashMap::new()) }
30    }
31
32    fn init_provenance(&self, resource: &Resource) -> HashSet<LocalizedResource> {
33        if !resource.is_stream() {
34            HashSet::from([LocalizedResource::new(self.node_id.clone(), resource.to_owned())])
35        } else {
36            HashSet::new()
37        }
38    }
39
40    /// Get the provenance of a resource
41    ///
42    /// This function returns a map of node IDs to the provenance of the resource for that node.
43    /// If the resource is found, it initializes the provenance for the resource.
44    fn get_prov(&self, resource: &Resource) -> HashSet<LocalizedResource> {
45        if let Some(prov) = self.provenance.get(resource) {
46            prov.to_owned()
47        } else {
48            self.init_provenance(resource)
49        }
50    }
51
52    /// Update the provenance of the destination with the source
53    ///
54    /// Note that this function does not guarantee sequential consistency,
55    /// this is the role of the sequencer.
56    fn update(&mut self, source: &Resource, destination: &Resource) -> ProvenanceResponse {
57        // Update the provenance of the destination with the source provenance
58        self.update_raw(self.get_prov(source), destination)
59    }
60
61    /// Update the provenance of the destination with the raw source provenance
62    ///
63    /// Note that this function does not guarantee sequential consistency,
64    /// this is the role of the sequencer.
65    fn update_raw(
66        &mut self,
67        source_prov: HashSet<LocalizedResource>,
68        destination: &Resource,
69    ) -> ProvenanceResponse {
70        let mut destination_prov = self.get_prov(destination);
71        if source_prov.is_subset(&destination_prov) {
72            info!(
73                "[provenance-raw] Provenance not updated: source_prov is subset of destination_prov"
74            );
75            ProvenanceResponse::ProvenanceNotUpdated
76        } else {
77            destination_prov.extend(source_prov);
78            info!(
79                destination_prov = %DisplayableResource::from(&destination_prov),
80                "[provenance-raw] Provenance updated"
81            );
82            self.provenance.insert(destination.to_owned(), destination_prov);
83            ProvenanceResponse::ProvenanceUpdated
84        }
85    }
86}
87
88impl NodeId for ProvenanceService {
89    fn node_id(&self) -> String {
90        self.node_id.to_owned()
91    }
92}
93
94impl Service<ProvenanceRequest> for ProvenanceService {
95    type Response = ProvenanceResponse;
96    type Error = TraceabilityError;
97    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
98
99    fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
100        Poll::Ready(Ok(()))
101    }
102
103    fn call(&mut self, request: ProvenanceRequest) -> Self::Future {
104        let mut this = self.clone();
105        Box::pin(async move {
106            match request {
107                ProvenanceRequest::GetReferences(resource) => {
108                    info!(node_id = %this.node_id, resource = %resource, "[provenance] GetReferences");
109                    Ok(ProvenanceResponse::Provenance(this.get_prov(&resource)))
110                }
111                ProvenanceRequest::UpdateProvenance { source, destination } => {
112                    info!(
113                        node_id = %this.node_id,
114                        source = %source,
115                        destination = %destination,
116                        "[provenance] UpdateProvenance"
117                    );
118                    Ok(this.update(&source, &destination))
119                }
120                ProvenanceRequest::UpdateProvenanceRaw { source_prov, destination } => {
121                    info!(
122                        node_id = %this.node_id,
123                        source_prov = %DisplayableResource::from(&source_prov),
124                        destination = %destination,
125                        "[provenance] UpdateProvenanceRaw"
126                    );
127                    Ok(this.update_raw(source_prov, &destination))
128                }
129            }
130        })
131    }
132}
133
#[cfg(test)]
mod tests {
    use super::*;

    /// Initializes tracing and returns a fresh service together with a mock process and
    /// a file, both localized on the service's node.
    fn setup() -> (ProvenanceService, LocalizedResource, LocalizedResource) {
        crate::trace2e_tracing::init();
        let provenance = ProvenanceService::default();
        let process = LocalizedResource::new(provenance.node_id(), Resource::new_process_mock(0));
        let file = LocalizedResource::new(
            provenance.node_id(),
            Resource::new_file("/tmp/test".to_string()),
        );
        (provenance, process, file)
    }

    #[test]
    fn unit_provenance_update_simple() {
        let (mut provenance, process, file) = setup();

        assert_eq!(
            provenance.update(file.resource(), process.resource()),
            ProvenanceResponse::ProvenanceUpdated
        );
        // Check that the process is now derived from the file
        assert_eq!(provenance.get_prov(process.resource()), HashSet::from([file, process]));
    }

    #[test]
    fn unit_provenance_update_circular() {
        let (mut provenance, process, file) = setup();

        assert_eq!(
            provenance.update(process.resource(), file.resource()),
            ProvenanceResponse::ProvenanceUpdated
        );
        assert_eq!(
            provenance.update(file.resource(), process.resource()),
            ProvenanceResponse::ProvenanceUpdated
        );

        // Check the proper handling of circular dependencies: both resources end up
        // with exactly the same, complete provenance.
        let expected = HashSet::from([file.clone(), process.clone()]);
        assert_eq!(provenance.get_prov(file.resource()), expected);
        assert_eq!(provenance.get_prov(process.resource()), expected);
    }

    #[test]
    fn unit_provenance_update_raw() {
        let (mut provenance, process, file) = setup();

        // An empty source provenance never changes the destination.
        assert_eq!(
            provenance.update_raw(HashSet::new(), process.resource()),
            ProvenanceResponse::ProvenanceNotUpdated
        );
        assert_eq!(provenance.get_prov(process.resource()), HashSet::from([process.clone()]));

        assert_eq!(
            provenance.update_raw(HashSet::from([file.clone()]), process.resource()),
            ProvenanceResponse::ProvenanceUpdated
        );
        // Merging the same provenance again is a no-op.
        assert_eq!(
            provenance.update_raw(HashSet::from([file.clone()]), process.resource()),
            ProvenanceResponse::ProvenanceNotUpdated
        );
        assert_eq!(provenance.get_prov(process.resource()), HashSet::from([file, process]));
    }

    #[tokio::test]
    async fn unit_provenance_service_flow_simple() {
        let (mut provenance, process, file) = setup();

        assert_eq!(
            provenance
                .call(ProvenanceRequest::GetReferences(process.resource().clone()))
                .await
                .unwrap(),
            ProvenanceResponse::Provenance(HashSet::from([process.clone()]))
        );

        assert_eq!(
            provenance
                .call(ProvenanceRequest::UpdateProvenance {
                    source: file.resource().clone(),
                    destination: process.resource().clone(),
                })
                .await
                .unwrap(),
            ProvenanceResponse::ProvenanceUpdated
        );

        assert_eq!(
            provenance
                .call(ProvenanceRequest::UpdateProvenance {
                    source: file.resource().clone(),
                    destination: process.resource().clone(),
                })
                .await
                .unwrap(),
            ProvenanceResponse::ProvenanceNotUpdated
        );

        assert_eq!(
            provenance
                .call(ProvenanceRequest::GetReferences(process.resource().clone()))
                .await
                .unwrap(),
            ProvenanceResponse::Provenance(HashSet::from([file, process]))
        );
    }

    #[tokio::test]
    async fn unit_provenance_service_flow_raw() {
        let (mut provenance, process, file) = setup();

        assert_eq!(
            provenance
                .call(ProvenanceRequest::UpdateProvenanceRaw {
                    source_prov: HashSet::new(),
                    destination: process.resource().clone(),
                })
                .await
                .unwrap(),
            ProvenanceResponse::ProvenanceNotUpdated
        );

        assert_eq!(
            provenance
                .call(ProvenanceRequest::UpdateProvenanceRaw {
                    source_prov: HashSet::from([file.clone()]),
                    destination: process.resource().clone(),
                })
                .await
                .unwrap(),
            ProvenanceResponse::ProvenanceUpdated
        );

        assert_eq!(
            provenance
                .call(ProvenanceRequest::GetReferences(process.resource().clone()))
                .await
                .unwrap(),
            ProvenanceResponse::Provenance(HashSet::from([file, process]))
        );
    }
}