trace2e_core/traceability/services/
provenance.rs1use std::{collections::HashSet, pin::Pin, sync::Arc, task::Poll};
5
6use dashmap::DashMap;
7use tower::Service;
8
9use crate::traceability::infrastructure::naming::DisplayableResource;
10use tracing::info;
11
12use crate::traceability::{
13 api::types::{ProvenanceRequest, ProvenanceResponse},
14 error::TraceabilityError,
15 infrastructure::naming::{LocalizedResource, NodeId, Resource},
16};
17
18type ProvenanceMap = DashMap<Resource, HashSet<LocalizedResource>>;
19
20#[derive(Debug, Default, Clone)]
22pub struct ProvenanceService {
23 node_id: String,
24 provenance: Arc<ProvenanceMap>,
25}
26
27impl ProvenanceService {
28 pub fn new(node_id: String) -> Self {
29 Self { node_id, provenance: Arc::new(DashMap::new()) }
30 }
31
32 fn init_provenance(&self, resource: &Resource) -> HashSet<LocalizedResource> {
33 if !resource.is_stream() {
34 HashSet::from([LocalizedResource::new(self.node_id.clone(), resource.to_owned())])
35 } else {
36 HashSet::new()
37 }
38 }
39
40 fn get_prov(&self, resource: &Resource) -> HashSet<LocalizedResource> {
45 if let Some(prov) = self.provenance.get(resource) {
46 prov.to_owned()
47 } else {
48 self.init_provenance(resource)
49 }
50 }
51
52 fn update(&mut self, source: &Resource, destination: &Resource) -> ProvenanceResponse {
57 self.update_raw(self.get_prov(source), destination)
59 }
60
61 fn update_raw(
66 &mut self,
67 source_prov: HashSet<LocalizedResource>,
68 destination: &Resource,
69 ) -> ProvenanceResponse {
70 let mut destination_prov = self.get_prov(destination);
71 if source_prov.is_subset(&destination_prov) {
72 info!(
73 "[provenance-raw] Provenance not updated: source_prov is subset of destination_prov"
74 );
75 ProvenanceResponse::ProvenanceNotUpdated
76 } else {
77 destination_prov.extend(source_prov);
78 info!(
79 destination_prov = %DisplayableResource::from(&destination_prov),
80 "[provenance-raw] Provenance updated"
81 );
82 self.provenance.insert(destination.to_owned(), destination_prov);
83 ProvenanceResponse::ProvenanceUpdated
84 }
85 }
86}
87
88impl NodeId for ProvenanceService {
89 fn node_id(&self) -> String {
90 self.node_id.to_owned()
91 }
92}
93
94impl Service<ProvenanceRequest> for ProvenanceService {
95 type Response = ProvenanceResponse;
96 type Error = TraceabilityError;
97 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
98
99 fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
100 Poll::Ready(Ok(()))
101 }
102
103 fn call(&mut self, request: ProvenanceRequest) -> Self::Future {
104 let mut this = self.clone();
105 Box::pin(async move {
106 match request {
107 ProvenanceRequest::GetReferences(resource) => {
108 info!(node_id = %this.node_id, resource = %resource, "[provenance] GetReferences");
109 Ok(ProvenanceResponse::Provenance(this.get_prov(&resource)))
110 }
111 ProvenanceRequest::UpdateProvenance { source, destination } => {
112 info!(
113 node_id = %this.node_id,
114 source = %source,
115 destination = %destination,
116 "[provenance] UpdateProvenance"
117 );
118 Ok(this.update(&source, &destination))
119 }
120 ProvenanceRequest::UpdateProvenanceRaw { source_prov, destination } => {
121 info!(
122 node_id = %this.node_id,
123 source_prov = %DisplayableResource::from(&source_prov),
124 destination = %destination,
125 "[provenance] UpdateProvenanceRaw"
126 );
127 Ok(this.update_raw(source_prov, &destination))
128 }
129 }
130 })
131 }
132}
133
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the `(process, file)` pair used by every test, localized on the
    /// node of `service`.
    fn fixtures(service: &ProvenanceService) -> (LocalizedResource, LocalizedResource) {
        let process = LocalizedResource::new(service.node_id(), Resource::new_process_mock(0));
        let file = LocalizedResource::new(
            service.node_id(),
            Resource::new_file("/tmp/test".to_string()),
        );
        (process, file)
    }

    /// A single file -> process flow adds the file to the process provenance.
    #[test]
    fn unit_provenance_update_simple() {
        crate::trace2e_tracing::init();
        let mut provenance = ProvenanceService::default();
        let (process, file) = fixtures(&provenance);

        let outcome = provenance.update(file.resource(), process.resource());
        assert_eq!(outcome, ProvenanceResponse::ProvenanceUpdated);

        let process_prov = provenance.get_prov(process.resource());
        assert_eq!(process_prov, HashSet::from([file, process]));
    }

    /// Flows in both directions leave both resources with the same provenance.
    #[test]
    fn unit_provenance_update_circular() {
        crate::trace2e_tracing::init();
        let mut provenance = ProvenanceService::default();
        let (process, file) = fixtures(&provenance);

        let forward = provenance.update(process.resource(), file.resource());
        assert_eq!(forward, ProvenanceResponse::ProvenanceUpdated);

        let backward = provenance.update(file.resource(), process.resource());
        assert_eq!(backward, ProvenanceResponse::ProvenanceUpdated);

        let file_prov = provenance.get_prov(file.resource());
        let process_prov = provenance.get_prov(process.resource());
        assert_eq!(file_prov, process_prov);
    }

    /// Exercises the `Service` interface: query, update, repeated update, query.
    #[tokio::test]
    async fn unit_provenance_service_flow_simple() {
        crate::trace2e_tracing::init();
        let mut provenance = ProvenanceService::default();
        let (process, file) = fixtures(&provenance);

        // Before any flow, the process only references itself.
        let initial = provenance
            .call(ProvenanceRequest::GetReferences(process.resource().clone()))
            .await
            .unwrap();
        assert_eq!(initial, ProvenanceResponse::Provenance(HashSet::from([process.clone()])));

        // The first file -> process flow changes the process provenance.
        let first = provenance
            .call(ProvenanceRequest::UpdateProvenance {
                source: file.resource().clone(),
                destination: process.resource().clone(),
            })
            .await
            .unwrap();
        assert_eq!(first, ProvenanceResponse::ProvenanceUpdated);

        // Repeating the same flow brings nothing new.
        let second = provenance
            .call(ProvenanceRequest::UpdateProvenance {
                source: file.resource().clone(),
                destination: process.resource().clone(),
            })
            .await
            .unwrap();
        assert_eq!(second, ProvenanceResponse::ProvenanceNotUpdated);

        let final_refs = provenance
            .call(ProvenanceRequest::GetReferences(process.resource().clone()))
            .await
            .unwrap();
        assert_eq!(final_refs, ProvenanceResponse::Provenance(HashSet::from([file, process])));
    }
}