trace2e_core/traceability/services/
sequencer.rs

1//! Sequencer service for reserving resources to avoid conflicts while processing flows.
2//!
3//! Includes an optional waiting queue layer and a tower::Service implementation.
4
5use std::{collections::VecDeque, pin::Pin, sync::Arc, task::Poll};
6
7use dashmap::DashMap;
8use tokio::{join, sync::oneshot};
9use tower::Service;
10
11use tracing::{debug, info};
12
13use crate::traceability::{
14    api::types::{SequencerRequest, SequencerResponse},
15    error::TraceabilityError,
16    infrastructure::naming::Resource,
17};
18
19/// Sequencer service for managing resources reservation
20///
21/// This service does not offer flow collision handling, it is the responsibility of the caller.
22/// `WaitingQueueService` can be set up as a layer to handle collisions.
23#[derive(Clone, Default)]
24pub struct SequencerService {
25    flows: Arc<DashMap<Resource, Resource>>,
26}
27
28impl SequencerService {
29    /// Make a flow
30    /// Returns the availability state of the source and destination before the attempt
31    async fn make_flow(
32        &self,
33        source: Resource,
34        destination: Resource,
35    ) -> Result<SequencerResponse, TraceabilityError> {
36        // source is not already reserved by a writer
37        let source_available = !self.flows.contains_key(&source);
38        // destination is not already reserved by a reader or writer
39        let destination_available = !self.flows.contains_key(&destination)
40            && !self.flows.iter().any(|entry| entry.value() == &destination);
41
42        // if both are available, create a flow
43        if source_available && destination_available {
44            self.flows.insert(destination, source);
45            Ok(SequencerResponse::FlowReserved)
46        } else if source_available {
47            Err(TraceabilityError::UnavailableDestination(destination))
48        } else if destination_available {
49            Err(TraceabilityError::UnavailableSource(source))
50        } else {
51            Err(TraceabilityError::UnavailableSourceAndDestination(source, destination))
52        }
53    }
54
55    /// Drop a flow
56    /// Returns the SequencerResponse to the caller
57    async fn drop_flow(
58        &self,
59        destination: &Resource,
60    ) -> Result<SequencerResponse, TraceabilityError> {
61        if let Some((_, source)) = self.flows.remove(destination) {
62            if self.flows.iter().any(|entry| entry.value() == &source) {
63                // Partial release of the flow
64                // source is still reserved as reader, notify the waiting queue of the destination
65                Ok(SequencerResponse::FlowReleased {
66                    source: None,
67                    destination: Some(destination.to_owned()),
68                })
69            } else {
70                // Complete release of the flow
71                // both source and destination are available again, notify both waiting queues
72                Ok(SequencerResponse::FlowReleased {
73                    source: Some(source),
74                    destination: Some(destination.to_owned()),
75                })
76            }
77        } else {
78            // Destination is not reserved, nothing to do
79            Ok(SequencerResponse::FlowReleased { source: None, destination: None })
80        }
81    }
82}
83
84impl Service<SequencerRequest> for SequencerService {
85    type Response = SequencerResponse;
86    type Error = TraceabilityError;
87    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
88
89    fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
90        Poll::Ready(Ok(()))
91    }
92
93    fn call(&mut self, request: SequencerRequest) -> Self::Future {
94        let this = self.clone();
95        Box::pin(async move {
96            match request {
97                SequencerRequest::ReserveFlow { source, destination } => {
98                    info!(source = %source, destination = %destination, "[sequencer] ReserveFlow");
99                    this.make_flow(source, destination).await
100                }
101                SequencerRequest::ReleaseFlow { destination } => {
102                    info!(destination = %destination, "[sequencer] ReleaseFlow");
103                    this.drop_flow(&destination).await
104                }
105            }
106        })
107    }
108}
109
110/// Waiting queue service for managing resources reservation
111///
112/// This service is used to handle flow collisions.
113#[derive(Clone)]
114pub struct WaitingQueueService<T> {
115    inner: T,
116    waiting_queue: Arc<DashMap<Resource, VecDeque<oneshot::Sender<()>>>>,
117    max_retries: u32,
118}
119
120impl<T> WaitingQueueService<T> {
121    pub fn new(inner: T, max_retries: Option<u32>) -> Self {
122        Self {
123            inner,
124            waiting_queue: Arc::new(DashMap::new()),
125            // If None, so the waiting queue is not used
126            max_retries: max_retries.unwrap_or_default(),
127        }
128    }
129
130    async fn join_waiting_queue(&self, resource: &Resource) -> oneshot::Receiver<()> {
131        let (tx, rx) = oneshot::channel();
132        if let Some(mut queue) = self.waiting_queue.get_mut(resource) {
133            queue.push_back(tx);
134        } else {
135            let mut queue = VecDeque::new();
136            queue.push_back(tx);
137            self.waiting_queue.insert(resource.to_owned(), queue);
138        }
139        rx
140    }
141
142    async fn notify_waiting_queue(&self, resource: &Option<Resource>) {
143        if let Some(resource) = resource
144            && let Some(mut queue) = self.waiting_queue.get_mut(resource)
145            && let Some(tx) = queue.pop_front()
146        {
147            tx.send(()).unwrap();
148        }
149    }
150}
151
152impl<T> Service<SequencerRequest> for WaitingQueueService<T>
153where
154    T: Service<SequencerRequest, Response = SequencerResponse, Error = TraceabilityError>
155        + Clone
156        + Sync
157        + Send
158        + 'static,
159    T::Future: Send,
160{
161    type Response = T::Response;
162    type Error = T::Error;
163    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
164
165    fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
166        self.inner.poll_ready(cx)
167    }
168
169    fn call(&mut self, req: SequencerRequest) -> Self::Future {
170        let mut inner = self.inner.clone();
171        let max_tries = self.max_retries + 1;
172        let this = self.clone();
173        Box::pin(async move {
174            for _ in 0..max_tries {
175                match inner.call(req.clone()).await {
176                    Ok(SequencerResponse::FlowReserved) => {
177                        debug!("[sequencer] FlowReserved");
178                        return Ok(SequencerResponse::FlowReserved);
179                    }
180                    Ok(SequencerResponse::FlowReleased { source, destination }) => {
181                        join!(
182                            this.notify_waiting_queue(&source),
183                            this.notify_waiting_queue(&destination)
184                        );
185                        debug!(
186                            source = ?source,
187                            destination = ?destination,
188                            "[sequencer] FlowReleased"
189                        );
190                        return Ok(SequencerResponse::FlowReleased { source, destination });
191                    }
192                    Err(TraceabilityError::UnavailableSource(source)) => {
193                        let rx = this.join_waiting_queue(&source).await;
194                        debug!(source = %source, "[sequencer] waiting source");
195                        let _ = rx.await;
196                    }
197                    Err(TraceabilityError::UnavailableDestination(destination)) => {
198                        let rx = this.join_waiting_queue(&destination).await;
199                        debug!(destination = %destination, "[sequencer] waiting destination");
200                        let _ = rx.await;
201                    }
202                    Err(TraceabilityError::UnavailableSourceAndDestination(
203                        source,
204                        destination,
205                    )) => {
206                        let rx1 = this.join_waiting_queue(&source).await;
207                        let rx2 = this.join_waiting_queue(&destination).await;
208                        debug!(
209                            source = %source,
210                            destination = %destination,
211                            "[sequencer] waiting"
212                        );
213                        let (_, _) = join!(rx1, rx2);
214                    }
215                    Err(e) => return Err(e),
216                }
217            }
218            Err(TraceabilityError::ReachedMaxRetriesWaitingQueue)
219        })
220    }
221}
222
223#[cfg(test)]
224mod tests {
225    use std::time::Duration;
226
227    use tower::{ServiceBuilder, layer::layer_fn, timeout::TimeoutLayer};
228
229    use super::*;
230    use crate::traceability::infrastructure::naming::Resource;
231    #[tokio::test]
232    async fn unit_sequencer_impl_flow() {
233        crate::trace2e_tracing::init();
234        let sequencer = SequencerService::default();
235        let process = Resource::new_process_mock(0);
236        let file = Resource::new_file("/tmp/test".to_string());
237        assert_eq!(
238            sequencer.make_flow(process.clone(), file.clone()).await,
239            Ok(SequencerResponse::FlowReserved)
240        );
241        assert_eq!(
242            sequencer.drop_flow(&file).await.unwrap(),
243            SequencerResponse::FlowReleased {
244                source: Some(process.clone()),
245                destination: Some(file.clone())
246            }
247        );
248        assert_eq!(
249            sequencer.make_flow(process.clone(), file.clone()).await,
250            Ok(SequencerResponse::FlowReserved)
251        );
252        assert_eq!(
253            sequencer.drop_flow(&file).await.unwrap(),
254            SequencerResponse::FlowReleased { source: Some(process), destination: Some(file) }
255        );
256    }
257
258    #[tokio::test]
259    async fn unit_sequencer_impl_flow_drop_already_dropped() {
260        crate::trace2e_tracing::init();
261        let sequencer = SequencerService::default();
262        let process = Resource::new_process_mock(0);
263        let file = Resource::new_file("/tmp/test".to_string());
264        assert_eq!(
265            sequencer.make_flow(process.clone(), file.clone()).await,
266            Ok(SequencerResponse::FlowReserved)
267        );
268        assert_eq!(
269            sequencer.drop_flow(&file).await.unwrap(),
270            SequencerResponse::FlowReleased {
271                source: Some(process),
272                destination: Some(file.clone())
273            }
274        );
275        // Already dropped, source is still available so it return true again
276        assert_eq!(
277            sequencer.drop_flow(&file).await.unwrap(),
278            SequencerResponse::FlowReleased { source: None, destination: None }
279        );
280    }
281
282    #[tokio::test]
283    async fn unit_sequencer_impl_flow_readers_drop() {
284        crate::trace2e_tracing::init();
285        let sequencer = SequencerService::default();
286        let process = Resource::new_process_mock(0);
287        let file1 = Resource::new_file("/tmp/test1".to_string());
288        let file2 = Resource::new_file("/tmp/test2".to_string());
289        let file3 = Resource::new_file("/tmp/test3".to_string());
290        let file4 = Resource::new_file("/tmp/test4".to_string());
291        assert_eq!(
292            sequencer.make_flow(process.clone(), file1.clone()).await,
293            Ok(SequencerResponse::FlowReserved)
294        );
295        assert_eq!(
296            sequencer.make_flow(process.clone(), file2.clone()).await,
297            Ok(SequencerResponse::FlowReserved)
298        );
299        assert_eq!(
300            sequencer.make_flow(process.clone(), file3.clone()).await,
301            Ok(SequencerResponse::FlowReserved)
302        );
303
304        // Must fail because process is already reserved 3 times as reader
305        assert_eq!(
306            sequencer.make_flow(file4.clone(), process.clone()).await,
307            Err(TraceabilityError::UnavailableDestination(process.clone()))
308        );
309
310        // Drop 2 reservations
311        assert_eq!(
312            sequencer.drop_flow(&file2).await.unwrap(),
313            SequencerResponse::FlowReleased { source: None, destination: Some(file2) }
314        );
315        assert_eq!(
316            sequencer.drop_flow(&file1).await.unwrap(),
317            SequencerResponse::FlowReleased { source: None, destination: Some(file1) }
318        );
319
320        // Must fail because process is still reserved once as reader
321        assert_eq!(
322            sequencer.make_flow(file4.clone(), process.clone()).await,
323            Err(TraceabilityError::UnavailableDestination(process.clone()))
324        );
325
326        // Drop last reservation
327        assert_eq!(
328            sequencer.drop_flow(&file3).await.unwrap(),
329            SequencerResponse::FlowReleased {
330                source: Some(process.clone()),
331                destination: Some(file3)
332            }
333        );
334
335        // Must succeed because process is not reserved as reader
336        assert_eq!(sequencer.make_flow(file4, process).await, Ok(SequencerResponse::FlowReserved));
337    }
338
339    #[tokio::test]
340    async fn unit_sequencer_impl_flow_interference() {
341        crate::trace2e_tracing::init();
342        let sequencer = SequencerService::default();
343        let process1 = Resource::new_process_mock(1);
344        let process2 = Resource::new_process_mock(2);
345        let file1 = Resource::new_file("/tmp/test1".to_string());
346        let file2 = Resource::new_file("/tmp/test2".to_string());
347
348        assert_eq!(
349            sequencer.make_flow(file1.clone(), process1.clone()).await,
350            Ok(SequencerResponse::FlowReserved)
351        );
352
353        // Fails because try get write of write lock
354        // (this case may be released in the future, this flow already exists)
355        assert_eq!(
356            sequencer.make_flow(file1.clone(), process1.clone()).await,
357            Err(TraceabilityError::UnavailableDestination(process1.clone()))
358        );
359
360        // Fails because try get write on read lock
361        assert_eq!(
362            sequencer.make_flow(process2, file1.clone()).await,
363            Err(TraceabilityError::UnavailableDestination(file1.clone()))
364        );
365
366        // Fails because try get read on write lock
367        assert_eq!(
368            sequencer.make_flow(process1.clone(), file2).await,
369            Err(TraceabilityError::UnavailableSource(process1.clone()))
370        );
371
372        // Fails because circular flow (get read on write lock & get write on read lock)
373        assert_eq!(
374            sequencer.make_flow(process1.clone(), file1.clone()).await,
375            Err(TraceabilityError::UnavailableSourceAndDestination(process1, file1))
376        );
377    }
378
379    #[tokio::test]
380    async fn unit_sequencer_layer_flow() {
381        crate::trace2e_tracing::init();
382        let mut sequencer = SequencerService::default();
383
384        let process = Resource::new_process_mock(0);
385        let file = Resource::new_file("/tmp/test".to_string());
386
387        assert_eq!(
388            sequencer
389                .call(SequencerRequest::ReserveFlow {
390                    source: process.clone(),
391                    destination: file.clone(),
392                })
393                .await
394                .unwrap(),
395            SequencerResponse::FlowReserved
396        );
397
398        assert_eq!(
399            sequencer
400                .call(SequencerRequest::ReleaseFlow { destination: file.clone() })
401                .await
402                .unwrap(),
403            SequencerResponse::FlowReleased { source: Some(process), destination: Some(file) }
404        );
405    }
406
407    #[tokio::test]
408    async fn unit_sequencer_layer_flow_interference() {
409        crate::trace2e_tracing::init();
410        let mut sequencer = SequencerService::default();
411
412        let process = Resource::new_process_mock(0);
413        let file = Resource::new_file("/tmp/test".to_string());
414
415        assert_eq!(
416            sequencer
417                .call(SequencerRequest::ReserveFlow {
418                    source: process.clone(),
419                    destination: file.clone(),
420                })
421                .await
422                .unwrap(),
423            SequencerResponse::FlowReserved
424        );
425
426        assert_eq!(
427            sequencer
428                .call(SequencerRequest::ReserveFlow { source: process, destination: file.clone() })
429                .await
430                .unwrap_err(),
431            TraceabilityError::UnavailableDestination(file)
432        );
433    }
434
435    #[tokio::test]
436    async fn unit_sequencer_layer_flow_circular() {
437        crate::trace2e_tracing::init();
438        let mut sequencer = SequencerService::default();
439
440        let process = Resource::new_process_mock(0);
441        let file = Resource::new_file("/tmp/test".to_string());
442
443        assert_eq!(
444            sequencer
445                .call(SequencerRequest::ReserveFlow {
446                    source: process.clone(),
447                    destination: file.clone(),
448                })
449                .await
450                .unwrap(),
451            SequencerResponse::FlowReserved
452        );
453
454        assert_eq!(
455            sequencer
456                .call(SequencerRequest::ReserveFlow {
457                    source: file.clone(),
458                    destination: process.clone(),
459                })
460                .await
461                .unwrap_err(),
462            TraceabilityError::UnavailableSourceAndDestination(file, process)
463        );
464    }
465
466    #[tokio::test]
467    async fn unit_sequencer_layer_flow_sequence() {
468        crate::trace2e_tracing::init();
469        let mut sequencer = SequencerService::default();
470
471        let process = Resource::new_process_mock(0);
472        let file1 = Resource::new_file("/tmp/test1".to_string());
473        let file2 = Resource::new_file("/tmp/test2".to_string());
474
475        assert_eq!(
476            sequencer
477                .call(SequencerRequest::ReserveFlow {
478                    source: file1.clone(),
479                    destination: process.clone(),
480                })
481                .await
482                .unwrap(),
483            SequencerResponse::FlowReserved
484        );
485
486        assert_eq!(
487            sequencer
488                .call(SequencerRequest::ReleaseFlow { destination: process.clone() })
489                .await
490                .unwrap(),
491            SequencerResponse::FlowReleased {
492                source: Some(file1),
493                destination: Some(process.clone())
494            }
495        );
496
497        assert_eq!(
498            sequencer
499                .call(SequencerRequest::ReserveFlow {
500                    source: file2.clone(),
501                    destination: process.clone(),
502                })
503                .await
504                .unwrap(),
505            SequencerResponse::FlowReserved
506        );
507
508        assert_eq!(
509            sequencer
510                .call(SequencerRequest::ReleaseFlow { destination: process.clone() })
511                .await
512                .unwrap(),
513            SequencerResponse::FlowReleased { source: Some(file2), destination: Some(process) }
514        );
515    }
516
517    #[tokio::test]
518    async fn unit_sequencer_layer_flow_sequence_interference() {
519        crate::trace2e_tracing::init();
520        let mut sequencer = SequencerService::default();
521
522        let process1 = Resource::new_process_mock(1);
523        let process2 = Resource::new_process_mock(2);
524        let file1 = Resource::new_file("/tmp/test1".to_string());
525        let file2 = Resource::new_file("/tmp/test2".to_string());
526
527        assert_eq!(
528            sequencer
529                .call(SequencerRequest::ReserveFlow {
530                    source: file1.clone(),
531                    destination: process1.clone(),
532                })
533                .await
534                .unwrap(),
535            SequencerResponse::FlowReserved
536        );
537
538        // Fails because try get write of write lock
539        assert_eq!(
540            sequencer
541                .call(SequencerRequest::ReserveFlow {
542                    source: file1.clone(),
543                    destination: process1.clone(),
544                })
545                .await
546                .unwrap_err(),
547            TraceabilityError::UnavailableDestination(process1.clone())
548        );
549
550        // Fails because try get write on read lock
551        assert_eq!(
552            sequencer
553                .call(SequencerRequest::ReserveFlow {
554                    source: process2,
555                    destination: file1.clone(),
556                })
557                .await
558                .unwrap_err(),
559            TraceabilityError::UnavailableDestination(file1.clone())
560        );
561
562        // Fails because try get read on write lock
563        assert_eq!(
564            sequencer
565                .call(SequencerRequest::ReserveFlow {
566                    source: process1.clone(),
567                    destination: file2,
568                })
569                .await
570                .unwrap_err(),
571            TraceabilityError::UnavailableSource(process1.clone())
572        );
573
574        // Fails because circular flow (get read on write lock & get write on read lock)
575        assert_eq!(
576            sequencer
577                .call(SequencerRequest::ReserveFlow {
578                    source: process1.clone(),
579                    destination: file1.clone(),
580                })
581                .await
582                .unwrap_err(),
583            TraceabilityError::UnavailableSourceAndDestination(process1, file1)
584        );
585    }
586    #[tokio::test]
587    async fn unit_sequencer_layer_flow_sequence_interference_multiple_share_releases() {
588        crate::trace2e_tracing::init();
589        let mut sequencer = SequencerService::default();
590
591        let process = Resource::new_process_mock(0);
592        let file1 = Resource::new_file("/tmp/test1".to_string());
593        let file2 = Resource::new_file("/tmp/test2".to_string());
594        let file3 = Resource::new_file("/tmp/test3".to_string());
595
596        assert_eq!(
597            sequencer
598                .call(SequencerRequest::ReserveFlow {
599                    source: process.clone(),
600                    destination: file1.clone(),
601                })
602                .await
603                .unwrap(),
604            SequencerResponse::FlowReserved
605        );
606
607        assert_eq!(
608            sequencer
609                .call(SequencerRequest::ReserveFlow {
610                    source: process.clone(),
611                    destination: file2.clone(),
612                })
613                .await
614                .unwrap(),
615            SequencerResponse::FlowReserved
616        );
617
618        assert_eq!(
619            sequencer
620                .call(SequencerRequest::ReserveFlow {
621                    source: process.clone(),
622                    destination: file3.clone(),
623                })
624                .await
625                .unwrap(),
626            SequencerResponse::FlowReserved
627        );
628
629        assert_eq!(
630            sequencer
631                .call(SequencerRequest::ReleaseFlow { destination: file2.clone() })
632                .await
633                .unwrap(),
634            SequencerResponse::FlowReleased { source: None, destination: Some(file2) }
635        );
636        assert_eq!(
637            sequencer
638                .call(SequencerRequest::ReleaseFlow { destination: file3.clone() })
639                .await
640                .unwrap(),
641            SequencerResponse::FlowReleased { source: None, destination: Some(file3) }
642        );
643
644        assert_eq!(
645            sequencer
646                .call(SequencerRequest::ReleaseFlow { destination: file1.clone() })
647                .await
648                .unwrap(),
649            SequencerResponse::FlowReleased { source: Some(process), destination: Some(file1) }
650        );
651    }
652
653    #[tokio::test]
654    async fn unit_waiting_queue_layer_flow_interference() {
655        crate::trace2e_tracing::init();
656        let mut sequencer = ServiceBuilder::new()
657            .layer(TimeoutLayer::new(Duration::from_millis(1)))
658            .layer(layer_fn(|inner| WaitingQueueService::new(inner, None)))
659            .service(SequencerService::default());
660
661        let process = Resource::new_process_mock(0);
662        let file = Resource::new_file("/tmp/test".to_string());
663
664        assert_eq!(
665            sequencer
666                .call(SequencerRequest::ReserveFlow {
667                    source: process.clone(),
668                    destination: file.clone(),
669                })
670                .await
671                .unwrap(),
672            SequencerResponse::FlowReserved
673        );
674
675        assert!(
676            sequencer
677                .call(SequencerRequest::ReserveFlow { source: process, destination: file })
678                .await
679                .is_err(),
680        );
681    }
682
683    #[tokio::test]
684    async fn unit_waiting_queue_layer_flow_circular() {
685        crate::trace2e_tracing::init();
686        let mut sequencer = ServiceBuilder::new()
687            .layer(TimeoutLayer::new(Duration::from_millis(1)))
688            .layer(layer_fn(|inner| WaitingQueueService::new(inner, None)))
689            .service(SequencerService::default());
690
691        let process = Resource::new_process_mock(0);
692        let file = Resource::new_file("/tmp/test".to_string());
693
694        assert_eq!(
695            sequencer
696                .call(SequencerRequest::ReserveFlow {
697                    source: process.clone(),
698                    destination: file.clone(),
699                })
700                .await
701                .unwrap(),
702            SequencerResponse::FlowReserved
703        );
704
705        assert!(
706            sequencer
707                .call(SequencerRequest::ReserveFlow { source: file, destination: process })
708                .await
709                .is_err(),
710        );
711    }
712
713    #[tokio::test]
714    async fn unit_waiting_queue_layer_writers_interference_resolution() {
715        crate::trace2e_tracing::init();
716        let mut sequencer = ServiceBuilder::new()
717            .layer(TimeoutLayer::new(Duration::from_millis(10)))
718            .layer(layer_fn(|inner| WaitingQueueService::new(inner, Some(1))))
719            .service(SequencerService::default());
720
721        let process = Resource::new_process_mock(0);
722        let file1 = Resource::new_file("/tmp/test1".to_string());
723        let file2 = Resource::new_file("/tmp/test2".to_string());
724
725        assert_eq!(
726            sequencer
727                .call(SequencerRequest::ReserveFlow {
728                    source: file1.clone(),
729                    destination: process.clone(),
730                })
731                .await
732                .unwrap(),
733            SequencerResponse::FlowReserved
734        );
735
736        let mut sequencer_clone = sequencer.clone();
737        let process_clone = process.clone();
738        let file2_clone = file2.clone();
739        let res = tokio::spawn(async move {
740            sequencer_clone
741                .call(SequencerRequest::ReserveFlow {
742                    source: file2_clone,
743                    destination: process_clone,
744                })
745                .await
746        });
747
748        tokio::time::sleep(Duration::from_millis(5)).await;
749        assert_eq!(
750            sequencer
751                .call(SequencerRequest::ReleaseFlow { destination: process.clone() })
752                .await
753                .unwrap(),
754            SequencerResponse::FlowReleased {
755                source: Some(file1),
756                destination: Some(process.clone())
757            }
758        );
759
760        assert_eq!(res.await.unwrap().unwrap(), SequencerResponse::FlowReserved);
761
762        assert_eq!(
763            sequencer
764                .call(SequencerRequest::ReleaseFlow { destination: process.clone() })
765                .await
766                .unwrap(),
767            SequencerResponse::FlowReleased { source: Some(file2), destination: Some(process) }
768        );
769    }
770    #[tokio::test]
771    async fn unit_waiting_queue_layer_writer_readers_interference_resolution() {
772        crate::trace2e_tracing::init();
773        let mut sequencer = ServiceBuilder::new()
774            .layer(TimeoutLayer::new(Duration::from_millis(2)))
775            .layer(layer_fn(|inner| WaitingQueueService::new(inner, Some(1))))
776            .service(SequencerService::default());
777
778        let process1 = Resource::new_process_mock(0);
779        let process2 = Resource::new_process_mock(1);
780        let process3 = Resource::new_process_mock(2);
781        let process4 = Resource::new_process_mock(3);
782        let file = Resource::new_file("/tmp/test".to_string());
783
784        assert_eq!(
785            sequencer
786                .call(SequencerRequest::ReserveFlow {
787                    source: file.clone(),
788                    destination: process1.clone(),
789                })
790                .await
791                .unwrap(),
792            SequencerResponse::FlowReserved
793        );
794
795        assert_eq!(
796            sequencer
797                .call(SequencerRequest::ReserveFlow {
798                    source: file.clone(),
799                    destination: process2.clone(),
800                })
801                .await
802                .unwrap(),
803            SequencerResponse::FlowReserved
804        );
805
806        assert_eq!(
807            sequencer
808                .call(SequencerRequest::ReserveFlow {
809                    source: file.clone(),
810                    destination: process3.clone(),
811                })
812                .await
813                .unwrap(),
814            SequencerResponse::FlowReserved
815        );
816
817        let mut sequencer_clone = sequencer.clone();
818        let file_clone = file.clone();
819        let process4_clone = process4.clone();
820        let res = tokio::spawn(async move {
821            sequencer_clone
822                .call(SequencerRequest::ReserveFlow {
823                    source: process4_clone,
824                    destination: file_clone,
825                })
826                .await
827        });
828
829        tokio::time::sleep(Duration::from_millis(1)).await;
830        assert_eq!(
831            sequencer
832                .call(SequencerRequest::ReleaseFlow { destination: process1.clone() })
833                .await
834                .unwrap(),
835            SequencerResponse::FlowReleased {
836                source: None, // here None means that the source is still reserved as reader
837                destination: Some(process1)
838            }
839        );
840        assert_eq!(
841            sequencer
842                .call(SequencerRequest::ReleaseFlow { destination: process2.clone() })
843                .await
844                .unwrap(),
845            SequencerResponse::FlowReleased {
846                source: None, // here None means that the source is still reserved as reader
847                destination: Some(process2)
848            }
849        );
850        assert_eq!(
851            sequencer
852                .call(SequencerRequest::ReleaseFlow { destination: process3.clone() })
853                .await
854                .unwrap(),
855            SequencerResponse::FlowReleased {
856                // now source is available again
857                source: Some(file.clone()),
858                destination: Some(process3)
859            }
860        );
861
862        // the pending flow is automatically reserved
863        assert_eq!(res.await.unwrap().unwrap(), SequencerResponse::FlowReserved);
864
865        assert_eq!(
866            sequencer
867                .call(SequencerRequest::ReleaseFlow { destination: file.clone() })
868                .await
869                .unwrap(),
870            SequencerResponse::FlowReleased { source: Some(process4), destination: Some(file) }
871        );
872    }
873
874    #[tokio::test]
875    async fn unit_waiting_queue_layer_reader_writer_interference_resolution() {
876        crate::trace2e_tracing::init();
877        let mut sequencer = ServiceBuilder::new()
878            .layer(TimeoutLayer::new(Duration::from_millis(2)))
879            .layer(layer_fn(|inner| WaitingQueueService::new(inner, Some(1))))
880            .service(SequencerService::default());
881
882        let process = Resource::new_process_mock(0);
883        let file1 = Resource::new_file("/tmp/test1".to_string());
884        let file2 = Resource::new_file("/tmp/test2".to_string());
885
886        assert_eq!(
887            sequencer
888                .call(SequencerRequest::ReserveFlow {
889                    source: file1.clone(),
890                    destination: process.clone(),
891                })
892                .await
893                .unwrap(),
894            SequencerResponse::FlowReserved
895        );
896
897        let mut sequencer_clone = sequencer.clone();
898        let process_clone = process.clone();
899        let file2_clone = file2.clone();
900        let res = tokio::spawn(async move {
901            sequencer_clone
902                .call(SequencerRequest::ReserveFlow {
903                    source: process_clone,
904                    destination: file2_clone,
905                })
906                .await
907        });
908
909        tokio::time::sleep(Duration::from_millis(1)).await;
910        assert_eq!(
911            sequencer
912                .call(SequencerRequest::ReleaseFlow { destination: process.clone() })
913                .await
914                .unwrap(),
915            SequencerResponse::FlowReleased {
916                source: Some(file1),
917                destination: Some(process.clone())
918            }
919        );
920
921        assert_eq!(res.await.unwrap().unwrap(), SequencerResponse::FlowReserved);
922
923        assert_eq!(
924            sequencer
925                .call(SequencerRequest::ReleaseFlow { destination: file2.clone() })
926                .await
927                .unwrap(),
928            SequencerResponse::FlowReleased { source: Some(process), destination: Some(file2) }
929        );
930    }
931}