1use std::{collections::VecDeque, pin::Pin, sync::Arc, task::Poll};
6
7use dashmap::DashMap;
8use tokio::{join, sync::oneshot};
9use tower::Service;
10
11use tracing::{debug, info};
12
13use crate::traceability::{
14 api::types::{SequencerRequest, SequencerResponse},
15 error::TraceabilityError,
16 infrastructure::naming::Resource,
17};
18
19#[derive(Clone, Default)]
24pub struct SequencerService {
25 flows: Arc<DashMap<Resource, Resource>>,
26}
27
28impl SequencerService {
29 async fn make_flow(
32 &self,
33 source: Resource,
34 destination: Resource,
35 ) -> Result<SequencerResponse, TraceabilityError> {
36 let source_available = !self.flows.contains_key(&source);
38 let destination_available = !self.flows.contains_key(&destination)
40 && !self.flows.iter().any(|entry| entry.value() == &destination);
41
42 if source_available && destination_available {
44 self.flows.insert(destination, source);
45 Ok(SequencerResponse::FlowReserved)
46 } else if source_available {
47 Err(TraceabilityError::UnavailableDestination(destination))
48 } else if destination_available {
49 Err(TraceabilityError::UnavailableSource(source))
50 } else {
51 Err(TraceabilityError::UnavailableSourceAndDestination(source, destination))
52 }
53 }
54
55 async fn drop_flow(
58 &self,
59 destination: &Resource,
60 ) -> Result<SequencerResponse, TraceabilityError> {
61 if let Some((_, source)) = self.flows.remove(destination) {
62 if self.flows.iter().any(|entry| entry.value() == &source) {
63 Ok(SequencerResponse::FlowReleased {
66 source: None,
67 destination: Some(destination.to_owned()),
68 })
69 } else {
70 Ok(SequencerResponse::FlowReleased {
73 source: Some(source),
74 destination: Some(destination.to_owned()),
75 })
76 }
77 } else {
78 Ok(SequencerResponse::FlowReleased { source: None, destination: None })
80 }
81 }
82}
83
84impl Service<SequencerRequest> for SequencerService {
85 type Response = SequencerResponse;
86 type Error = TraceabilityError;
87 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
88
89 fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
90 Poll::Ready(Ok(()))
91 }
92
93 fn call(&mut self, request: SequencerRequest) -> Self::Future {
94 let this = self.clone();
95 Box::pin(async move {
96 match request {
97 SequencerRequest::ReserveFlow { source, destination } => {
98 info!(source = %source, destination = %destination, "[sequencer] ReserveFlow");
99 this.make_flow(source, destination).await
100 }
101 SequencerRequest::ReleaseFlow { destination } => {
102 info!(destination = %destination, "[sequencer] ReleaseFlow");
103 this.drop_flow(&destination).await
104 }
105 }
106 })
107 }
108}
109
110#[derive(Clone)]
114pub struct WaitingQueueService<T> {
115 inner: T,
116 waiting_queue: Arc<DashMap<Resource, VecDeque<oneshot::Sender<()>>>>,
117 max_retries: u32,
118}
119
120impl<T> WaitingQueueService<T> {
121 pub fn new(inner: T, max_retries: Option<u32>) -> Self {
122 Self {
123 inner,
124 waiting_queue: Arc::new(DashMap::new()),
125 max_retries: max_retries.unwrap_or_default(),
127 }
128 }
129
130 async fn join_waiting_queue(&self, resource: &Resource) -> oneshot::Receiver<()> {
131 let (tx, rx) = oneshot::channel();
132 if let Some(mut queue) = self.waiting_queue.get_mut(resource) {
133 queue.push_back(tx);
134 } else {
135 let mut queue = VecDeque::new();
136 queue.push_back(tx);
137 self.waiting_queue.insert(resource.to_owned(), queue);
138 }
139 rx
140 }
141
142 async fn notify_waiting_queue(&self, resource: &Option<Resource>) {
143 if let Some(resource) = resource
144 && let Some(mut queue) = self.waiting_queue.get_mut(resource)
145 && let Some(tx) = queue.pop_front()
146 {
147 tx.send(()).unwrap();
148 }
149 }
150}
151
152impl<T> Service<SequencerRequest> for WaitingQueueService<T>
153where
154 T: Service<SequencerRequest, Response = SequencerResponse, Error = TraceabilityError>
155 + Clone
156 + Sync
157 + Send
158 + 'static,
159 T::Future: Send,
160{
161 type Response = T::Response;
162 type Error = T::Error;
163 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
164
165 fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
166 self.inner.poll_ready(cx)
167 }
168
169 fn call(&mut self, req: SequencerRequest) -> Self::Future {
170 let mut inner = self.inner.clone();
171 let max_tries = self.max_retries + 1;
172 let this = self.clone();
173 Box::pin(async move {
174 for _ in 0..max_tries {
175 match inner.call(req.clone()).await {
176 Ok(SequencerResponse::FlowReserved) => {
177 debug!("[sequencer] FlowReserved");
178 return Ok(SequencerResponse::FlowReserved);
179 }
180 Ok(SequencerResponse::FlowReleased { source, destination }) => {
181 join!(
182 this.notify_waiting_queue(&source),
183 this.notify_waiting_queue(&destination)
184 );
185 debug!(
186 source = ?source,
187 destination = ?destination,
188 "[sequencer] FlowReleased"
189 );
190 return Ok(SequencerResponse::FlowReleased { source, destination });
191 }
192 Err(TraceabilityError::UnavailableSource(source)) => {
193 let rx = this.join_waiting_queue(&source).await;
194 debug!(source = %source, "[sequencer] waiting source");
195 let _ = rx.await;
196 }
197 Err(TraceabilityError::UnavailableDestination(destination)) => {
198 let rx = this.join_waiting_queue(&destination).await;
199 debug!(destination = %destination, "[sequencer] waiting destination");
200 let _ = rx.await;
201 }
202 Err(TraceabilityError::UnavailableSourceAndDestination(
203 source,
204 destination,
205 )) => {
206 let rx1 = this.join_waiting_queue(&source).await;
207 let rx2 = this.join_waiting_queue(&destination).await;
208 debug!(
209 source = %source,
210 destination = %destination,
211 "[sequencer] waiting"
212 );
213 let (_, _) = join!(rx1, rx2);
214 }
215 Err(e) => return Err(e),
216 }
217 }
218 Err(TraceabilityError::ReachedMaxRetriesWaitingQueue)
219 })
220 }
221}
222
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use tower::{ServiceBuilder, layer::layer_fn, timeout::TimeoutLayer};

    use super::*;
    use crate::traceability::infrastructure::naming::Resource;

    /// Builds a `ReserveFlow` request from borrowed endpoints.
    fn reserve(source: &Resource, destination: &Resource) -> SequencerRequest {
        SequencerRequest::ReserveFlow { source: source.clone(), destination: destination.clone() }
    }

    /// Builds a `ReleaseFlow` request for `destination`.
    fn release(destination: &Resource) -> SequencerRequest {
        SequencerRequest::ReleaseFlow { destination: destination.clone() }
    }

    /// Builds the `FlowReleased` response expected by an assertion.
    fn released(source: Option<&Resource>, destination: Option<&Resource>) -> SequencerResponse {
        SequencerResponse::FlowReleased {
            source: source.cloned(),
            destination: destination.cloned(),
        }
    }

    /// Shorthand for a file resource located at `path`.
    fn new_file(path: &str) -> Resource {
        Resource::new_file(path.to_string())
    }

    // ---------------------------------------------------------------------------------
    // `SequencerService` inherent methods
    // ---------------------------------------------------------------------------------

    /// A flow can be reserved, released, then reserved and released again.
    #[tokio::test]
    async fn unit_sequencer_impl_flow() {
        crate::trace2e_tracing::init();
        let sequencer = SequencerService::default();
        let process = Resource::new_process_mock(0);
        let file = new_file("/tmp/test");

        for _ in 0..2 {
            assert_eq!(
                sequencer.make_flow(process.clone(), file.clone()).await,
                Ok(SequencerResponse::FlowReserved)
            );
            assert_eq!(
                sequencer.drop_flow(&file).await.unwrap(),
                released(Some(&process), Some(&file))
            );
        }
    }

    /// Releasing a destination that has no flow left reports nothing released.
    #[tokio::test]
    async fn unit_sequencer_impl_flow_drop_already_dropped() {
        crate::trace2e_tracing::init();
        let sequencer = SequencerService::default();
        let process = Resource::new_process_mock(0);
        let file = new_file("/tmp/test");

        assert_eq!(
            sequencer.make_flow(process.clone(), file.clone()).await,
            Ok(SequencerResponse::FlowReserved)
        );
        assert_eq!(
            sequencer.drop_flow(&file).await.unwrap(),
            released(Some(&process), Some(&file))
        );
        // Second release of the same destination: nothing is registered any more.
        assert_eq!(sequencer.drop_flow(&file).await.unwrap(), released(None, None));
    }

    /// A resource read by several flows only becomes writable once the last one is gone.
    #[tokio::test]
    async fn unit_sequencer_impl_flow_readers_drop() {
        crate::trace2e_tracing::init();
        let sequencer = SequencerService::default();
        let process = Resource::new_process_mock(0);
        let file1 = new_file("/tmp/test1");
        let file2 = new_file("/tmp/test2");
        let file3 = new_file("/tmp/test3");
        let file4 = new_file("/tmp/test4");

        // The process is the source of three flows at the same time.
        for target in [&file1, &file2, &file3] {
            assert_eq!(
                sequencer.make_flow(process.clone(), target.clone()).await,
                Ok(SequencerResponse::FlowReserved)
            );
        }

        // Writing to the process is refused while it is being read.
        assert_eq!(
            sequencer.make_flow(file4.clone(), process.clone()).await,
            Err(TraceabilityError::UnavailableDestination(process.clone()))
        );

        // Two of the three flows end: the source is still in use, so it is not reported.
        assert_eq!(sequencer.drop_flow(&file2).await.unwrap(), released(None, Some(&file2)));
        assert_eq!(sequencer.drop_flow(&file1).await.unwrap(), released(None, Some(&file1)));

        // One flow is left, the process is still not writable.
        assert_eq!(
            sequencer.make_flow(file4.clone(), process.clone()).await,
            Err(TraceabilityError::UnavailableDestination(process.clone()))
        );

        // The last flow ends: the source is reported as released.
        assert_eq!(
            sequencer.drop_flow(&file3).await.unwrap(),
            released(Some(&process), Some(&file3))
        );

        assert_eq!(sequencer.make_flow(file4, process).await, Ok(SequencerResponse::FlowReserved));
    }

    /// Each kind of conflict with a reserved flow yields its dedicated error.
    #[tokio::test]
    async fn unit_sequencer_impl_flow_interference() {
        crate::trace2e_tracing::init();
        let sequencer = SequencerService::default();
        let process1 = Resource::new_process_mock(1);
        let process2 = Resource::new_process_mock(2);
        let file1 = new_file("/tmp/test1");
        let file2 = new_file("/tmp/test2");

        assert_eq!(
            sequencer.make_flow(file1.clone(), process1.clone()).await,
            Ok(SequencerResponse::FlowReserved)
        );

        // (source, destination, expected refusal) while `file1 -> process1` is reserved.
        let conflicts: [(Resource, Resource, Result<SequencerResponse, TraceabilityError>); 4] = [
            // Same flow again: the destination is already written to.
            (
                file1.clone(),
                process1.clone(),
                Err(TraceabilityError::UnavailableDestination(process1.clone())),
            ),
            // Writing to a resource that is being read.
            (
                process2,
                file1.clone(),
                Err(TraceabilityError::UnavailableDestination(file1.clone())),
            ),
            // Reading from a resource that is being written to.
            (
                process1.clone(),
                file2,
                Err(TraceabilityError::UnavailableSource(process1.clone())),
            ),
            // Reverse flow: both ends are busy.
            (
                process1.clone(),
                file1.clone(),
                Err(TraceabilityError::UnavailableSourceAndDestination(process1, file1)),
            ),
        ];
        for (source, destination, expected) in conflicts {
            assert_eq!(sequencer.make_flow(source, destination).await, expected);
        }
    }

    // ---------------------------------------------------------------------------------
    // `SequencerService` through its `Service` implementation
    // ---------------------------------------------------------------------------------

    /// Reserve then release through `Service::call`.
    #[tokio::test]
    async fn unit_sequencer_layer_flow() {
        crate::trace2e_tracing::init();
        let mut sequencer = SequencerService::default();

        let process = Resource::new_process_mock(0);
        let file = new_file("/tmp/test");

        assert_eq!(
            sequencer.call(reserve(&process, &file)).await.unwrap(),
            SequencerResponse::FlowReserved
        );
        assert_eq!(
            sequencer.call(release(&file)).await.unwrap(),
            released(Some(&process), Some(&file))
        );
    }

    /// Reserving the same flow twice is refused on the destination.
    #[tokio::test]
    async fn unit_sequencer_layer_flow_interference() {
        crate::trace2e_tracing::init();
        let mut sequencer = SequencerService::default();

        let process = Resource::new_process_mock(0);
        let file = new_file("/tmp/test");

        assert_eq!(
            sequencer.call(reserve(&process, &file)).await.unwrap(),
            SequencerResponse::FlowReserved
        );
        assert_eq!(
            sequencer.call(reserve(&process, &file)).await.unwrap_err(),
            TraceabilityError::UnavailableDestination(file)
        );
    }

    /// Reserving the reverse of a reserved flow is refused on both ends.
    #[tokio::test]
    async fn unit_sequencer_layer_flow_circular() {
        crate::trace2e_tracing::init();
        let mut sequencer = SequencerService::default();

        let process = Resource::new_process_mock(0);
        let file = new_file("/tmp/test");

        assert_eq!(
            sequencer.call(reserve(&process, &file)).await.unwrap(),
            SequencerResponse::FlowReserved
        );
        assert_eq!(
            sequencer.call(reserve(&file, &process)).await.unwrap_err(),
            TraceabilityError::UnavailableSourceAndDestination(file, process)
        );
    }

    /// Two flows towards the same destination succeed when run one after the other.
    #[tokio::test]
    async fn unit_sequencer_layer_flow_sequence() {
        crate::trace2e_tracing::init();
        let mut sequencer = SequencerService::default();

        let process = Resource::new_process_mock(0);
        let file1 = new_file("/tmp/test1");
        let file2 = new_file("/tmp/test2");

        for source in [&file1, &file2] {
            assert_eq!(
                sequencer.call(reserve(source, &process)).await.unwrap(),
                SequencerResponse::FlowReserved
            );
            assert_eq!(
                sequencer.call(release(&process)).await.unwrap(),
                released(Some(source), Some(&process))
            );
        }
    }

    /// Same conflicts as `unit_sequencer_impl_flow_interference`, through `Service::call`.
    #[tokio::test]
    async fn unit_sequencer_layer_flow_sequence_interference() {
        crate::trace2e_tracing::init();
        let mut sequencer = SequencerService::default();

        let process1 = Resource::new_process_mock(1);
        let process2 = Resource::new_process_mock(2);
        let file1 = new_file("/tmp/test1");
        let file2 = new_file("/tmp/test2");

        assert_eq!(
            sequencer.call(reserve(&file1, &process1)).await.unwrap(),
            SequencerResponse::FlowReserved
        );

        // (request, expected refusal) while `file1 -> process1` is reserved.
        let conflicts = [
            // Same flow again.
            (
                reserve(&file1, &process1),
                TraceabilityError::UnavailableDestination(process1.clone()),
            ),
            // Writing to a resource that is being read.
            (reserve(&process2, &file1), TraceabilityError::UnavailableDestination(file1.clone())),
            // Reading from a resource that is being written to.
            (reserve(&process1, &file2), TraceabilityError::UnavailableSource(process1.clone())),
            // Reverse flow: both ends are busy.
            (
                reserve(&process1, &file1),
                TraceabilityError::UnavailableSourceAndDestination(
                    process1.clone(),
                    file1.clone(),
                ),
            ),
        ];
        for (request, expected) in conflicts {
            assert_eq!(sequencer.call(request).await.unwrap_err(), expected);
        }
    }

    /// A shared source is only reported as released with the last flow reading from it.
    #[tokio::test]
    async fn unit_sequencer_layer_flow_sequence_interference_multiple_share_releases() {
        crate::trace2e_tracing::init();
        let mut sequencer = SequencerService::default();

        let process = Resource::new_process_mock(0);
        let file1 = new_file("/tmp/test1");
        let file2 = new_file("/tmp/test2");
        let file3 = new_file("/tmp/test3");

        for target in [&file1, &file2, &file3] {
            assert_eq!(
                sequencer.call(reserve(&process, target)).await.unwrap(),
                SequencerResponse::FlowReserved
            );
        }

        // While other flows still read from the process, only the destination is freed.
        for target in [&file2, &file3] {
            assert_eq!(
                sequencer.call(release(target)).await.unwrap(),
                released(None, Some(target))
            );
        }

        assert_eq!(
            sequencer.call(release(&file1)).await.unwrap(),
            released(Some(&process), Some(&file1))
        );
    }

    // ---------------------------------------------------------------------------------
    // `WaitingQueueService` stacked under a timeout
    //
    // NOTE(review): these scenarios depend on real-time margins of a few milliseconds;
    // confirm they are stable on loaded CI machines.
    // ---------------------------------------------------------------------------------

    /// A conflicting reservation that nobody releases ends in an error.
    #[tokio::test]
    async fn unit_waiting_queue_layer_flow_interference() {
        crate::trace2e_tracing::init();
        let mut sequencer = ServiceBuilder::new()
            .layer(TimeoutLayer::new(Duration::from_millis(1)))
            .layer(layer_fn(|inner| WaitingQueueService::new(inner, None)))
            .service(SequencerService::default());

        let process = Resource::new_process_mock(0);
        let file = new_file("/tmp/test");

        assert_eq!(
            sequencer.call(reserve(&process, &file)).await.unwrap(),
            SequencerResponse::FlowReserved
        );

        let second_attempt = sequencer.call(reserve(&process, &file)).await;
        assert!(second_attempt.is_err(),);
    }

    /// A reverse reservation that nobody releases ends in an error.
    #[tokio::test]
    async fn unit_waiting_queue_layer_flow_circular() {
        crate::trace2e_tracing::init();
        let mut sequencer = ServiceBuilder::new()
            .layer(TimeoutLayer::new(Duration::from_millis(1)))
            .layer(layer_fn(|inner| WaitingQueueService::new(inner, None)))
            .service(SequencerService::default());

        let process = Resource::new_process_mock(0);
        let file = new_file("/tmp/test");

        assert_eq!(
            sequencer.call(reserve(&process, &file)).await.unwrap(),
            SequencerResponse::FlowReserved
        );

        let reverse_attempt = sequencer.call(reserve(&file, &process)).await;
        assert!(reverse_attempt.is_err(),);
    }

    /// A second writer to the same destination succeeds once the first flow is released.
    #[tokio::test]
    async fn unit_waiting_queue_layer_writers_interference_resolution() {
        crate::trace2e_tracing::init();
        let mut sequencer = ServiceBuilder::new()
            .layer(TimeoutLayer::new(Duration::from_millis(10)))
            .layer(layer_fn(|inner| WaitingQueueService::new(inner, Some(1))))
            .service(SequencerService::default());

        let process = Resource::new_process_mock(0);
        let file1 = new_file("/tmp/test1");
        let file2 = new_file("/tmp/test2");

        assert_eq!(
            sequencer.call(reserve(&file1, &process)).await.unwrap(),
            SequencerResponse::FlowReserved
        );

        // Concurrent writer to the busy destination, issued from its own task.
        let mut background = sequencer.clone();
        let pending = reserve(&file2, &process);
        let waiter = tokio::spawn(async move { background.call(pending).await });

        // Give the background task time to issue its request before releasing.
        tokio::time::sleep(Duration::from_millis(5)).await;
        assert_eq!(
            sequencer.call(release(&process)).await.unwrap(),
            released(Some(&file1), Some(&process))
        );

        assert_eq!(waiter.await.unwrap().unwrap(), SequencerResponse::FlowReserved);

        assert_eq!(
            sequencer.call(release(&process)).await.unwrap(),
            released(Some(&file2), Some(&process))
        );
    }

    /// A writer to a resource read by three flows succeeds once all three are released.
    #[tokio::test]
    async fn unit_waiting_queue_layer_writer_readers_interference_resolution() {
        crate::trace2e_tracing::init();
        let mut sequencer = ServiceBuilder::new()
            .layer(TimeoutLayer::new(Duration::from_millis(2)))
            .layer(layer_fn(|inner| WaitingQueueService::new(inner, Some(1))))
            .service(SequencerService::default());

        let process1 = Resource::new_process_mock(0);
        let process2 = Resource::new_process_mock(1);
        let process3 = Resource::new_process_mock(2);
        let process4 = Resource::new_process_mock(3);
        let file = new_file("/tmp/test");

        // Three flows read from the file.
        for reader in [&process1, &process2, &process3] {
            assert_eq!(
                sequencer.call(reserve(&file, reader)).await.unwrap(),
                SequencerResponse::FlowReserved
            );
        }

        // Concurrent writer to the file, issued from its own task.
        let mut background = sequencer.clone();
        let pending = reserve(&process4, &file);
        let waiter = tokio::spawn(async move { background.call(pending).await });

        tokio::time::sleep(Duration::from_millis(1)).await;

        // The first two releases leave the file in use, so it is not reported.
        for reader in [&process1, &process2] {
            assert_eq!(
                sequencer.call(release(reader)).await.unwrap(),
                released(None, Some(reader))
            );
        }
        // The last release frees the file.
        assert_eq!(
            sequencer.call(release(&process3)).await.unwrap(),
            released(Some(&file), Some(&process3))
        );

        assert_eq!(waiter.await.unwrap().unwrap(), SequencerResponse::FlowReserved);

        assert_eq!(
            sequencer.call(release(&file)).await.unwrap(),
            released(Some(&process4), Some(&file))
        );
    }

    /// A reader of a resource being written to succeeds once the write is released.
    #[tokio::test]
    async fn unit_waiting_queue_layer_reader_writer_interference_resolution() {
        crate::trace2e_tracing::init();
        let mut sequencer = ServiceBuilder::new()
            .layer(TimeoutLayer::new(Duration::from_millis(2)))
            .layer(layer_fn(|inner| WaitingQueueService::new(inner, Some(1))))
            .service(SequencerService::default());

        let process = Resource::new_process_mock(0);
        let file1 = new_file("/tmp/test1");
        let file2 = new_file("/tmp/test2");

        assert_eq!(
            sequencer.call(reserve(&file1, &process)).await.unwrap(),
            SequencerResponse::FlowReserved
        );

        // Concurrent flow reading from the process, issued from its own task.
        let mut background = sequencer.clone();
        let pending = reserve(&process, &file2);
        let waiter = tokio::spawn(async move { background.call(pending).await });

        tokio::time::sleep(Duration::from_millis(1)).await;
        assert_eq!(
            sequencer.call(release(&process)).await.unwrap(),
            released(Some(&file1), Some(&process))
        );

        assert_eq!(waiter.await.unwrap().unwrap(), SequencerResponse::FlowReserved);

        assert_eq!(
            sequencer.call(release(&file2)).await.unwrap(),
            released(Some(&process), Some(&file2))
        );
    }
}