trace2e_core/traceability/core/
sequencer.rs

1use std::{collections::VecDeque, pin::Pin, sync::Arc, task::Poll};
2
3use dashmap::DashMap;
4use tokio::{join, sync::oneshot};
5use tower::Service;
6#[cfg(feature = "trace2e_tracing")]
7use tracing::{debug, info};
8
9use crate::traceability::{
10    api::{SequencerRequest, SequencerResponse},
11    error::TraceabilityError,
12    naming::Resource,
13};
14
/// Sequencer service for managing resources reservation
///
/// Every reserved flow is recorded in a shared map as `destination -> source`. A resource that
/// appears as a key is reserved as a writer (destination of a flow); a resource that appears as
/// a value is reserved as a reader (source of one or more flows).
///
/// Cloning the service is cheap: clones share the same underlying map through the `Arc`.
///
/// This service does not offer flow collision handling, it is the responsibility of the caller.
/// `WaitingQueueService` can be set up as a layer to handle collisions.
#[derive(Clone, Default)]
pub struct SequencerService {
    /// Active flows, keyed by destination with the flow's source as value.
    flows: Arc<DashMap<Resource, Resource>>,
}
23
24impl SequencerService {
25    /// Make a flow
26    /// Returns the availability state of the source and destination before the attempt
27    async fn make_flow(
28        &self,
29        source: Resource,
30        destination: Resource,
31    ) -> Result<SequencerResponse, TraceabilityError> {
32        // source is not already reserved by a writer
33        let source_available = !self.flows.contains_key(&source);
34        // destination is not already reserved by a reader or writer
35        let destination_available = !self.flows.contains_key(&destination)
36            && !self.flows.iter().any(|entry| entry.value() == &destination);
37
38        // if both are available, create a flow
39        if source_available && destination_available {
40            self.flows.insert(destination, source);
41            Ok(SequencerResponse::FlowReserved)
42        } else if source_available {
43            Err(TraceabilityError::UnavailableDestination(destination))
44        } else if destination_available {
45            Err(TraceabilityError::UnavailableSource(source))
46        } else {
47            Err(TraceabilityError::UnavailableSourceAndDestination(source, destination))
48        }
49    }
50
51    /// Drop a flow
52    /// Returns the SequencerResponse to the caller
53    async fn drop_flow(
54        &self,
55        destination: &Resource,
56    ) -> Result<SequencerResponse, TraceabilityError> {
57        if let Some((_, source)) = self.flows.remove(destination) {
58            if self.flows.iter().any(|entry| entry.value() == &source) {
59                // Partial release of the flow
60                // source is still reserved as reader, notify the waiting queue of the destination
61                Ok(SequencerResponse::FlowReleased {
62                    source: None,
63                    destination: Some(destination.to_owned()),
64                })
65            } else {
66                // Complete release of the flow
67                // both source and destination are available again, notify both waiting queues
68                Ok(SequencerResponse::FlowReleased {
69                    source: Some(source),
70                    destination: Some(destination.to_owned()),
71                })
72            }
73        } else {
74            // Destination is not reserved, nothing to do
75            Ok(SequencerResponse::FlowReleased { source: None, destination: None })
76        }
77    }
78}
79
80impl Service<SequencerRequest> for SequencerService {
81    type Response = SequencerResponse;
82    type Error = TraceabilityError;
83    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
84
85    fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
86        Poll::Ready(Ok(()))
87    }
88
89    fn call(&mut self, request: SequencerRequest) -> Self::Future {
90        let this = self.clone();
91        Box::pin(async move {
92            match request {
93                SequencerRequest::ReserveFlow { source, destination } => {
94                    #[cfg(feature = "trace2e_tracing")]
95                    info!(
96                        "[sequencer] ReserveFlow: source: {:?}, destination: {:?}",
97                        source, destination
98                    );
99                    this.make_flow(source, destination).await
100                }
101                SequencerRequest::ReleaseFlow { destination } => {
102                    #[cfg(feature = "trace2e_tracing")]
103                    info!("[sequencer] ReleaseFlow: destination: {:?}", destination);
104                    this.drop_flow(&destination).await
105                }
106            }
107        })
108    }
109}
110
/// Waiting queue service for managing resources reservation
///
/// This service is used to handle flow collisions.
///
/// It wraps an inner sequencer service. When a reservation is rejected because a resource is
/// unavailable, the request is parked on that resource's queue until a release of the resource
/// wakes it up, after which the reservation is attempted again.
#[derive(Clone)]
pub struct WaitingQueueService<T> {
    /// Wrapped service that actually reserves and releases flows.
    inner: T,
    /// Per-resource FIFO of wake-up channels, one sender per parked request.
    /// Shared between all clones of this service.
    waiting_queue: Arc<DashMap<Resource, VecDeque<oneshot::Sender<()>>>>,
    /// Number of additional attempts allowed after the first one.
    max_retries: u32,
}
120
121impl<T> WaitingQueueService<T> {
122    pub fn new(inner: T, max_retries: Option<u32>) -> Self {
123        Self {
124            inner,
125            waiting_queue: Arc::new(DashMap::new()),
126            // If None, so the waiting queue is not used
127            max_retries: max_retries.unwrap_or_default(),
128        }
129    }
130
131    async fn join_waiting_queue(&self, resource: &Resource) -> oneshot::Receiver<()> {
132        let (tx, rx) = oneshot::channel();
133        if let Some(mut queue) = self.waiting_queue.get_mut(resource) {
134            queue.push_back(tx);
135        } else {
136            let mut queue = VecDeque::new();
137            queue.push_back(tx);
138            self.waiting_queue.insert(resource.to_owned(), queue);
139        }
140        rx
141    }
142
143    async fn notify_waiting_queue(&self, resource: &Option<Resource>) {
144        if let Some(resource) = resource
145            && let Some(mut queue) = self.waiting_queue.get_mut(resource)
146            && let Some(tx) = queue.pop_front()
147        {
148            tx.send(()).unwrap();
149        }
150    }
151}
152
153impl<T> Service<SequencerRequest> for WaitingQueueService<T>
154where
155    T: Service<SequencerRequest, Response = SequencerResponse, Error = TraceabilityError>
156        + Clone
157        + Sync
158        + Send
159        + 'static,
160    T::Future: Send,
161{
162    type Response = T::Response;
163    type Error = T::Error;
164    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
165
166    fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
167        self.inner.poll_ready(cx)
168    }
169
170    fn call(&mut self, req: SequencerRequest) -> Self::Future {
171        let mut inner = self.inner.clone();
172        let max_tries = self.max_retries + 1;
173        let this = self.clone();
174        Box::pin(async move {
175            for _ in 0..max_tries {
176                match inner.call(req.clone()).await {
177                    Ok(SequencerResponse::FlowReserved) => {
178                        #[cfg(feature = "trace2e_tracing")]
179                        debug!("[sequencer] FlowReserved");
180                        return Ok(SequencerResponse::FlowReserved);
181                    }
182                    Ok(SequencerResponse::FlowReleased { source, destination }) => {
183                        join!(
184                            this.notify_waiting_queue(&source),
185                            this.notify_waiting_queue(&destination)
186                        );
187                        #[cfg(feature = "trace2e_tracing")]
188                        debug!(
189                            "[sequencer] FlowReleased: source: {:?}, destination: {:?}",
190                            source, destination
191                        );
192                        return Ok(SequencerResponse::FlowReleased { source, destination });
193                    }
194                    Err(TraceabilityError::UnavailableSource(source)) => {
195                        let rx = this.join_waiting_queue(&source).await;
196                        #[cfg(feature = "trace2e_tracing")]
197                        debug!("[sequencer] waiting source: {:?}", source);
198                        let _ = rx.await;
199                    }
200                    Err(TraceabilityError::UnavailableDestination(destination)) => {
201                        let rx = this.join_waiting_queue(&destination).await;
202                        #[cfg(feature = "trace2e_tracing")]
203                        debug!("[sequencer] waiting destination: {:?}", destination);
204                        let _ = rx.await;
205                    }
206                    Err(TraceabilityError::UnavailableSourceAndDestination(
207                        source,
208                        destination,
209                    )) => {
210                        let rx1 = this.join_waiting_queue(&source).await;
211                        let rx2 = this.join_waiting_queue(&destination).await;
212                        #[cfg(feature = "trace2e_tracing")]
213                        debug!(
214                            "[sequencer] waiting source: {:?}, destination: {:?}",
215                            source, destination
216                        );
217                        let (_, _) = join!(rx1, rx2);
218                    }
219                    Err(e) => return Err(e),
220                }
221            }
222            Err(TraceabilityError::ReachedMaxRetriesWaitingQueue)
223        })
224    }
225}
226
227#[cfg(test)]
228mod tests {
229    use std::time::Duration;
230
231    use tower::{ServiceBuilder, layer::layer_fn, timeout::TimeoutLayer};
232
233    use super::*;
234    use crate::traceability::naming::Resource;
235    #[tokio::test]
236    async fn unit_sequencer_impl_flow() {
237        #[cfg(feature = "trace2e_tracing")]
238        crate::trace2e_tracing::init();
239        let sequencer = SequencerService::default();
240        let process = Resource::new_process_mock(0);
241        let file = Resource::new_file("/tmp/test".to_string());
242        assert_eq!(
243            sequencer.make_flow(process.clone(), file.clone()).await,
244            Ok(SequencerResponse::FlowReserved)
245        );
246        assert_eq!(
247            sequencer.drop_flow(&file).await.unwrap(),
248            SequencerResponse::FlowReleased {
249                source: Some(process.clone()),
250                destination: Some(file.clone())
251            }
252        );
253        assert_eq!(
254            sequencer.make_flow(process.clone(), file.clone()).await,
255            Ok(SequencerResponse::FlowReserved)
256        );
257        assert_eq!(
258            sequencer.drop_flow(&file).await.unwrap(),
259            SequencerResponse::FlowReleased { source: Some(process), destination: Some(file) }
260        );
261    }
262
263    #[tokio::test]
264    async fn unit_sequencer_impl_flow_drop_already_dropped() {
265        #[cfg(feature = "trace2e_tracing")]
266        crate::trace2e_tracing::init();
267        let sequencer = SequencerService::default();
268        let process = Resource::new_process_mock(0);
269        let file = Resource::new_file("/tmp/test".to_string());
270        assert_eq!(
271            sequencer.make_flow(process.clone(), file.clone()).await,
272            Ok(SequencerResponse::FlowReserved)
273        );
274        assert_eq!(
275            sequencer.drop_flow(&file).await.unwrap(),
276            SequencerResponse::FlowReleased {
277                source: Some(process),
278                destination: Some(file.clone())
279            }
280        );
281        // Already dropped, source is still available so it return true again
282        assert_eq!(
283            sequencer.drop_flow(&file).await.unwrap(),
284            SequencerResponse::FlowReleased { source: None, destination: None }
285        );
286    }
287
288    #[tokio::test]
289    async fn unit_sequencer_impl_flow_readers_drop() {
290        #[cfg(feature = "trace2e_tracing")]
291        crate::trace2e_tracing::init();
292        let sequencer = SequencerService::default();
293        let process = Resource::new_process_mock(0);
294        let file1 = Resource::new_file("/tmp/test1".to_string());
295        let file2 = Resource::new_file("/tmp/test2".to_string());
296        let file3 = Resource::new_file("/tmp/test3".to_string());
297        let file4 = Resource::new_file("/tmp/test4".to_string());
298        assert_eq!(
299            sequencer.make_flow(process.clone(), file1.clone()).await,
300            Ok(SequencerResponse::FlowReserved)
301        );
302        assert_eq!(
303            sequencer.make_flow(process.clone(), file2.clone()).await,
304            Ok(SequencerResponse::FlowReserved)
305        );
306        assert_eq!(
307            sequencer.make_flow(process.clone(), file3.clone()).await,
308            Ok(SequencerResponse::FlowReserved)
309        );
310
311        // Must fail because process is already reserved 3 times as reader
312        assert_eq!(
313            sequencer.make_flow(file4.clone(), process.clone()).await,
314            Err(TraceabilityError::UnavailableDestination(process.clone()))
315        );
316
317        // Drop 2 reservations
318        assert_eq!(
319            sequencer.drop_flow(&file2).await.unwrap(),
320            SequencerResponse::FlowReleased { source: None, destination: Some(file2) }
321        );
322        assert_eq!(
323            sequencer.drop_flow(&file1).await.unwrap(),
324            SequencerResponse::FlowReleased { source: None, destination: Some(file1) }
325        );
326
327        // Must fail because process is still reserved once as reader
328        assert_eq!(
329            sequencer.make_flow(file4.clone(), process.clone()).await,
330            Err(TraceabilityError::UnavailableDestination(process.clone()))
331        );
332
333        // Drop last reservation
334        assert_eq!(
335            sequencer.drop_flow(&file3).await.unwrap(),
336            SequencerResponse::FlowReleased {
337                source: Some(process.clone()),
338                destination: Some(file3)
339            }
340        );
341
342        // Must succeed because process is not reserved as reader
343        assert_eq!(sequencer.make_flow(file4, process).await, Ok(SequencerResponse::FlowReserved));
344    }
345
346    #[tokio::test]
347    async fn unit_sequencer_impl_flow_interference() {
348        #[cfg(feature = "trace2e_tracing")]
349        crate::trace2e_tracing::init();
350        let sequencer = SequencerService::default();
351        let process1 = Resource::new_process_mock(1);
352        let process2 = Resource::new_process_mock(2);
353        let file1 = Resource::new_file("/tmp/test1".to_string());
354        let file2 = Resource::new_file("/tmp/test2".to_string());
355
356        assert_eq!(
357            sequencer.make_flow(file1.clone(), process1.clone()).await,
358            Ok(SequencerResponse::FlowReserved)
359        );
360
361        // Fails because try get write of write lock
362        // (this case may be released in the future, this flow already exists)
363        assert_eq!(
364            sequencer.make_flow(file1.clone(), process1.clone()).await,
365            Err(TraceabilityError::UnavailableDestination(process1.clone()))
366        );
367
368        // Fails because try get write on read lock
369        assert_eq!(
370            sequencer.make_flow(process2, file1.clone()).await,
371            Err(TraceabilityError::UnavailableDestination(file1.clone()))
372        );
373
374        // Fails because try get read on write lock
375        assert_eq!(
376            sequencer.make_flow(process1.clone(), file2).await,
377            Err(TraceabilityError::UnavailableSource(process1.clone()))
378        );
379
380        // Fails because circular flow (get read on write lock & get write on read lock)
381        assert_eq!(
382            sequencer.make_flow(process1.clone(), file1.clone()).await,
383            Err(TraceabilityError::UnavailableSourceAndDestination(process1, file1))
384        );
385    }
386
387    #[tokio::test]
388    async fn unit_sequencer_layer_flow() {
389        #[cfg(feature = "trace2e_tracing")]
390        crate::trace2e_tracing::init();
391        let mut sequencer = SequencerService::default();
392
393        let process = Resource::new_process_mock(0);
394        let file = Resource::new_file("/tmp/test".to_string());
395
396        assert_eq!(
397            sequencer
398                .call(SequencerRequest::ReserveFlow {
399                    source: process.clone(),
400                    destination: file.clone(),
401                })
402                .await
403                .unwrap(),
404            SequencerResponse::FlowReserved
405        );
406
407        assert_eq!(
408            sequencer
409                .call(SequencerRequest::ReleaseFlow { destination: file.clone() })
410                .await
411                .unwrap(),
412            SequencerResponse::FlowReleased { source: Some(process), destination: Some(file) }
413        );
414    }
415
416    #[tokio::test]
417    async fn unit_sequencer_layer_flow_interference() {
418        #[cfg(feature = "trace2e_tracing")]
419        crate::trace2e_tracing::init();
420        let mut sequencer = SequencerService::default();
421
422        let process = Resource::new_process_mock(0);
423        let file = Resource::new_file("/tmp/test".to_string());
424
425        assert_eq!(
426            sequencer
427                .call(SequencerRequest::ReserveFlow {
428                    source: process.clone(),
429                    destination: file.clone(),
430                })
431                .await
432                .unwrap(),
433            SequencerResponse::FlowReserved
434        );
435
436        assert_eq!(
437            sequencer
438                .call(SequencerRequest::ReserveFlow { source: process, destination: file.clone() })
439                .await
440                .unwrap_err(),
441            TraceabilityError::UnavailableDestination(file)
442        );
443    }
444
445    #[tokio::test]
446    async fn unit_sequencer_layer_flow_circular() {
447        #[cfg(feature = "trace2e_tracing")]
448        crate::trace2e_tracing::init();
449        let mut sequencer = SequencerService::default();
450
451        let process = Resource::new_process_mock(0);
452        let file = Resource::new_file("/tmp/test".to_string());
453
454        assert_eq!(
455            sequencer
456                .call(SequencerRequest::ReserveFlow {
457                    source: process.clone(),
458                    destination: file.clone(),
459                })
460                .await
461                .unwrap(),
462            SequencerResponse::FlowReserved
463        );
464
465        assert_eq!(
466            sequencer
467                .call(SequencerRequest::ReserveFlow {
468                    source: file.clone(),
469                    destination: process.clone(),
470                })
471                .await
472                .unwrap_err(),
473            TraceabilityError::UnavailableSourceAndDestination(file, process)
474        );
475    }
476
477    #[tokio::test]
478    async fn unit_sequencer_layer_flow_sequence() {
479        #[cfg(feature = "trace2e_tracing")]
480        crate::trace2e_tracing::init();
481        let mut sequencer = SequencerService::default();
482
483        let process = Resource::new_process_mock(0);
484        let file1 = Resource::new_file("/tmp/test1".to_string());
485        let file2 = Resource::new_file("/tmp/test2".to_string());
486
487        assert_eq!(
488            sequencer
489                .call(SequencerRequest::ReserveFlow {
490                    source: file1.clone(),
491                    destination: process.clone(),
492                })
493                .await
494                .unwrap(),
495            SequencerResponse::FlowReserved
496        );
497
498        assert_eq!(
499            sequencer
500                .call(SequencerRequest::ReleaseFlow { destination: process.clone() })
501                .await
502                .unwrap(),
503            SequencerResponse::FlowReleased {
504                source: Some(file1),
505                destination: Some(process.clone())
506            }
507        );
508
509        assert_eq!(
510            sequencer
511                .call(SequencerRequest::ReserveFlow {
512                    source: file2.clone(),
513                    destination: process.clone(),
514                })
515                .await
516                .unwrap(),
517            SequencerResponse::FlowReserved
518        );
519
520        assert_eq!(
521            sequencer
522                .call(SequencerRequest::ReleaseFlow { destination: process.clone() })
523                .await
524                .unwrap(),
525            SequencerResponse::FlowReleased { source: Some(file2), destination: Some(process) }
526        );
527    }
528
529    #[tokio::test]
530    async fn unit_sequencer_layer_flow_sequence_interference() {
531        #[cfg(feature = "trace2e_tracing")]
532        crate::trace2e_tracing::init();
533        let mut sequencer = SequencerService::default();
534
535        let process1 = Resource::new_process_mock(1);
536        let process2 = Resource::new_process_mock(2);
537        let file1 = Resource::new_file("/tmp/test1".to_string());
538        let file2 = Resource::new_file("/tmp/test2".to_string());
539
540        assert_eq!(
541            sequencer
542                .call(SequencerRequest::ReserveFlow {
543                    source: file1.clone(),
544                    destination: process1.clone(),
545                })
546                .await
547                .unwrap(),
548            SequencerResponse::FlowReserved
549        );
550
551        // Fails because try get write of write lock
552        assert_eq!(
553            sequencer
554                .call(SequencerRequest::ReserveFlow {
555                    source: file1.clone(),
556                    destination: process1.clone(),
557                })
558                .await
559                .unwrap_err(),
560            TraceabilityError::UnavailableDestination(process1.clone())
561        );
562
563        // Fails because try get write on read lock
564        assert_eq!(
565            sequencer
566                .call(SequencerRequest::ReserveFlow {
567                    source: process2,
568                    destination: file1.clone(),
569                })
570                .await
571                .unwrap_err(),
572            TraceabilityError::UnavailableDestination(file1.clone())
573        );
574
575        // Fails because try get read on write lock
576        assert_eq!(
577            sequencer
578                .call(SequencerRequest::ReserveFlow {
579                    source: process1.clone(),
580                    destination: file2,
581                })
582                .await
583                .unwrap_err(),
584            TraceabilityError::UnavailableSource(process1.clone())
585        );
586
587        // Fails because circular flow (get read on write lock & get write on read lock)
588        assert_eq!(
589            sequencer
590                .call(SequencerRequest::ReserveFlow {
591                    source: process1.clone(),
592                    destination: file1.clone(),
593                })
594                .await
595                .unwrap_err(),
596            TraceabilityError::UnavailableSourceAndDestination(process1, file1)
597        );
598    }
599    #[tokio::test]
600    async fn unit_sequencer_layer_flow_sequence_interference_multiple_share_releases() {
601        #[cfg(feature = "trace2e_tracing")]
602        crate::trace2e_tracing::init();
603        let mut sequencer = SequencerService::default();
604
605        let process = Resource::new_process_mock(0);
606        let file1 = Resource::new_file("/tmp/test1".to_string());
607        let file2 = Resource::new_file("/tmp/test2".to_string());
608        let file3 = Resource::new_file("/tmp/test3".to_string());
609
610        assert_eq!(
611            sequencer
612                .call(SequencerRequest::ReserveFlow {
613                    source: process.clone(),
614                    destination: file1.clone(),
615                })
616                .await
617                .unwrap(),
618            SequencerResponse::FlowReserved
619        );
620
621        assert_eq!(
622            sequencer
623                .call(SequencerRequest::ReserveFlow {
624                    source: process.clone(),
625                    destination: file2.clone(),
626                })
627                .await
628                .unwrap(),
629            SequencerResponse::FlowReserved
630        );
631
632        assert_eq!(
633            sequencer
634                .call(SequencerRequest::ReserveFlow {
635                    source: process.clone(),
636                    destination: file3.clone(),
637                })
638                .await
639                .unwrap(),
640            SequencerResponse::FlowReserved
641        );
642
643        assert_eq!(
644            sequencer
645                .call(SequencerRequest::ReleaseFlow { destination: file2.clone() })
646                .await
647                .unwrap(),
648            SequencerResponse::FlowReleased { source: None, destination: Some(file2) }
649        );
650        assert_eq!(
651            sequencer
652                .call(SequencerRequest::ReleaseFlow { destination: file3.clone() })
653                .await
654                .unwrap(),
655            SequencerResponse::FlowReleased { source: None, destination: Some(file3) }
656        );
657
658        assert_eq!(
659            sequencer
660                .call(SequencerRequest::ReleaseFlow { destination: file1.clone() })
661                .await
662                .unwrap(),
663            SequencerResponse::FlowReleased { source: Some(process), destination: Some(file1) }
664        );
665    }
666
667    #[tokio::test]
668    async fn unit_waiting_queue_layer_flow_interference() {
669        #[cfg(feature = "trace2e_tracing")]
670        crate::trace2e_tracing::init();
671        let mut sequencer = ServiceBuilder::new()
672            .layer(TimeoutLayer::new(Duration::from_millis(1)))
673            .layer(layer_fn(|inner| WaitingQueueService::new(inner, None)))
674            .service(SequencerService::default());
675
676        let process = Resource::new_process_mock(0);
677        let file = Resource::new_file("/tmp/test".to_string());
678
679        assert_eq!(
680            sequencer
681                .call(SequencerRequest::ReserveFlow {
682                    source: process.clone(),
683                    destination: file.clone(),
684                })
685                .await
686                .unwrap(),
687            SequencerResponse::FlowReserved
688        );
689
690        assert!(
691            sequencer
692                .call(SequencerRequest::ReserveFlow { source: process, destination: file })
693                .await
694                .is_err(),
695        );
696    }
697
698    #[tokio::test]
699    async fn unit_waiting_queue_layer_flow_circular() {
700        #[cfg(feature = "trace2e_tracing")]
701        crate::trace2e_tracing::init();
702        let mut sequencer = ServiceBuilder::new()
703            .layer(TimeoutLayer::new(Duration::from_millis(1)))
704            .layer(layer_fn(|inner| WaitingQueueService::new(inner, None)))
705            .service(SequencerService::default());
706
707        let process = Resource::new_process_mock(0);
708        let file = Resource::new_file("/tmp/test".to_string());
709
710        assert_eq!(
711            sequencer
712                .call(SequencerRequest::ReserveFlow {
713                    source: process.clone(),
714                    destination: file.clone(),
715                })
716                .await
717                .unwrap(),
718            SequencerResponse::FlowReserved
719        );
720
721        assert!(
722            sequencer
723                .call(SequencerRequest::ReserveFlow { source: file, destination: process })
724                .await
725                .is_err(),
726        );
727    }
728
729    #[tokio::test]
730    async fn unit_waiting_queue_layer_writers_interference_resolution() {
731        #[cfg(feature = "trace2e_tracing")]
732        crate::trace2e_tracing::init();
733        let mut sequencer = ServiceBuilder::new()
734            .layer(TimeoutLayer::new(Duration::from_millis(10)))
735            .layer(layer_fn(|inner| WaitingQueueService::new(inner, Some(1))))
736            .service(SequencerService::default());
737
738        let process = Resource::new_process_mock(0);
739        let file1 = Resource::new_file("/tmp/test1".to_string());
740        let file2 = Resource::new_file("/tmp/test2".to_string());
741
742        assert_eq!(
743            sequencer
744                .call(SequencerRequest::ReserveFlow {
745                    source: file1.clone(),
746                    destination: process.clone(),
747                })
748                .await
749                .unwrap(),
750            SequencerResponse::FlowReserved
751        );
752
753        let mut sequencer_clone = sequencer.clone();
754        let process_clone = process.clone();
755        let file2_clone = file2.clone();
756        let res = tokio::spawn(async move {
757            sequencer_clone
758                .call(SequencerRequest::ReserveFlow {
759                    source: file2_clone,
760                    destination: process_clone,
761                })
762                .await
763        });
764
765        tokio::time::sleep(Duration::from_millis(5)).await;
766        assert_eq!(
767            sequencer
768                .call(SequencerRequest::ReleaseFlow { destination: process.clone() })
769                .await
770                .unwrap(),
771            SequencerResponse::FlowReleased {
772                source: Some(file1),
773                destination: Some(process.clone())
774            }
775        );
776
777        assert_eq!(res.await.unwrap().unwrap(), SequencerResponse::FlowReserved);
778
779        assert_eq!(
780            sequencer
781                .call(SequencerRequest::ReleaseFlow { destination: process.clone() })
782                .await
783                .unwrap(),
784            SequencerResponse::FlowReleased { source: Some(file2), destination: Some(process) }
785        );
786    }
787    #[tokio::test]
788    async fn unit_waiting_queue_layer_writer_readers_interference_resolution() {
789        #[cfg(feature = "trace2e_tracing")]
790        crate::trace2e_tracing::init();
791        let mut sequencer = ServiceBuilder::new()
792            .layer(TimeoutLayer::new(Duration::from_millis(2)))
793            .layer(layer_fn(|inner| WaitingQueueService::new(inner, Some(1))))
794            .service(SequencerService::default());
795
796        let process1 = Resource::new_process_mock(0);
797        let process2 = Resource::new_process_mock(1);
798        let process3 = Resource::new_process_mock(2);
799        let process4 = Resource::new_process_mock(3);
800        let file = Resource::new_file("/tmp/test".to_string());
801
802        assert_eq!(
803            sequencer
804                .call(SequencerRequest::ReserveFlow {
805                    source: file.clone(),
806                    destination: process1.clone(),
807                })
808                .await
809                .unwrap(),
810            SequencerResponse::FlowReserved
811        );
812
813        assert_eq!(
814            sequencer
815                .call(SequencerRequest::ReserveFlow {
816                    source: file.clone(),
817                    destination: process2.clone(),
818                })
819                .await
820                .unwrap(),
821            SequencerResponse::FlowReserved
822        );
823
824        assert_eq!(
825            sequencer
826                .call(SequencerRequest::ReserveFlow {
827                    source: file.clone(),
828                    destination: process3.clone(),
829                })
830                .await
831                .unwrap(),
832            SequencerResponse::FlowReserved
833        );
834
835        let mut sequencer_clone = sequencer.clone();
836        let file_clone = file.clone();
837        let process4_clone = process4.clone();
838        let res = tokio::spawn(async move {
839            sequencer_clone
840                .call(SequencerRequest::ReserveFlow {
841                    source: process4_clone,
842                    destination: file_clone,
843                })
844                .await
845        });
846
847        tokio::time::sleep(Duration::from_millis(1)).await;
848        assert_eq!(
849            sequencer
850                .call(SequencerRequest::ReleaseFlow { destination: process1.clone() })
851                .await
852                .unwrap(),
853            SequencerResponse::FlowReleased {
854                source: None, // here None means that the source is still reserved as reader
855                destination: Some(process1)
856            }
857        );
858        assert_eq!(
859            sequencer
860                .call(SequencerRequest::ReleaseFlow { destination: process2.clone() })
861                .await
862                .unwrap(),
863            SequencerResponse::FlowReleased {
864                source: None, // here None means that the source is still reserved as reader
865                destination: Some(process2)
866            }
867        );
868        assert_eq!(
869            sequencer
870                .call(SequencerRequest::ReleaseFlow { destination: process3.clone() })
871                .await
872                .unwrap(),
873            SequencerResponse::FlowReleased {
874                // now source is available again
875                source: Some(file.clone()),
876                destination: Some(process3)
877            }
878        );
879
880        // the pending flow is automatically reserved
881        assert_eq!(res.await.unwrap().unwrap(), SequencerResponse::FlowReserved);
882
883        assert_eq!(
884            sequencer
885                .call(SequencerRequest::ReleaseFlow { destination: file.clone() })
886                .await
887                .unwrap(),
888            SequencerResponse::FlowReleased { source: Some(process4), destination: Some(file) }
889        );
890    }
891
892    #[tokio::test]
893    async fn unit_waiting_queue_layer_reader_writer_interference_resolution() {
894        #[cfg(feature = "trace2e_tracing")]
895        crate::trace2e_tracing::init();
896        let mut sequencer = ServiceBuilder::new()
897            .layer(TimeoutLayer::new(Duration::from_millis(2)))
898            .layer(layer_fn(|inner| WaitingQueueService::new(inner, Some(1))))
899            .service(SequencerService::default());
900
901        let process = Resource::new_process_mock(0);
902        let file1 = Resource::new_file("/tmp/test1".to_string());
903        let file2 = Resource::new_file("/tmp/test2".to_string());
904
905        assert_eq!(
906            sequencer
907                .call(SequencerRequest::ReserveFlow {
908                    source: file1.clone(),
909                    destination: process.clone(),
910                })
911                .await
912                .unwrap(),
913            SequencerResponse::FlowReserved
914        );
915
916        let mut sequencer_clone = sequencer.clone();
917        let process_clone = process.clone();
918        let file2_clone = file2.clone();
919        let res = tokio::spawn(async move {
920            sequencer_clone
921                .call(SequencerRequest::ReserveFlow {
922                    source: process_clone,
923                    destination: file2_clone,
924                })
925                .await
926        });
927
928        tokio::time::sleep(Duration::from_millis(1)).await;
929        assert_eq!(
930            sequencer
931                .call(SequencerRequest::ReleaseFlow { destination: process.clone() })
932                .await
933                .unwrap(),
934            SequencerResponse::FlowReleased {
935                source: Some(file1),
936                destination: Some(process.clone())
937            }
938        );
939
940        assert_eq!(res.await.unwrap().unwrap(), SequencerResponse::FlowReserved);
941
942        assert_eq!(
943            sequencer
944                .call(SequencerRequest::ReleaseFlow { destination: file2.clone() })
945                .await
946                .unwrap(),
947            SequencerResponse::FlowReleased { source: Some(process), destination: Some(file2) }
948        );
949    }
950}