1use std::{collections::VecDeque, pin::Pin, sync::Arc, task::Poll};
2
3use dashmap::DashMap;
4use tokio::{join, sync::oneshot};
5use tower::Service;
6#[cfg(feature = "trace2e_tracing")]
7use tracing::{debug, info};
8
9use crate::traceability::{
10 api::{SequencerRequest, SequencerResponse},
11 error::TraceabilityError,
12 naming::Resource,
13};
14
15#[derive(Clone, Default)]
20pub struct SequencerService {
21 flows: Arc<DashMap<Resource, Resource>>,
22}
23
24impl SequencerService {
25 async fn make_flow(
28 &self,
29 source: Resource,
30 destination: Resource,
31 ) -> Result<SequencerResponse, TraceabilityError> {
32 let source_available = !self.flows.contains_key(&source);
34 let destination_available = !self.flows.contains_key(&destination)
36 && !self.flows.iter().any(|entry| entry.value() == &destination);
37
38 if source_available && destination_available {
40 self.flows.insert(destination, source);
41 Ok(SequencerResponse::FlowReserved)
42 } else if source_available {
43 Err(TraceabilityError::UnavailableDestination(destination))
44 } else if destination_available {
45 Err(TraceabilityError::UnavailableSource(source))
46 } else {
47 Err(TraceabilityError::UnavailableSourceAndDestination(source, destination))
48 }
49 }
50
51 async fn drop_flow(
54 &self,
55 destination: &Resource,
56 ) -> Result<SequencerResponse, TraceabilityError> {
57 if let Some((_, source)) = self.flows.remove(destination) {
58 if self.flows.iter().any(|entry| entry.value() == &source) {
59 Ok(SequencerResponse::FlowReleased {
62 source: None,
63 destination: Some(destination.to_owned()),
64 })
65 } else {
66 Ok(SequencerResponse::FlowReleased {
69 source: Some(source),
70 destination: Some(destination.to_owned()),
71 })
72 }
73 } else {
74 Ok(SequencerResponse::FlowReleased { source: None, destination: None })
76 }
77 }
78}
79
80impl Service<SequencerRequest> for SequencerService {
81 type Response = SequencerResponse;
82 type Error = TraceabilityError;
83 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
84
85 fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
86 Poll::Ready(Ok(()))
87 }
88
89 fn call(&mut self, request: SequencerRequest) -> Self::Future {
90 let this = self.clone();
91 Box::pin(async move {
92 match request {
93 SequencerRequest::ReserveFlow { source, destination } => {
94 #[cfg(feature = "trace2e_tracing")]
95 info!(
96 "[sequencer] ReserveFlow: source: {:?}, destination: {:?}",
97 source, destination
98 );
99 this.make_flow(source, destination).await
100 }
101 SequencerRequest::ReleaseFlow { destination } => {
102 #[cfg(feature = "trace2e_tracing")]
103 info!("[sequencer] ReleaseFlow: destination: {:?}", destination);
104 this.drop_flow(&destination).await
105 }
106 }
107 })
108 }
109}
110
111#[derive(Clone)]
115pub struct WaitingQueueService<T> {
116 inner: T,
117 waiting_queue: Arc<DashMap<Resource, VecDeque<oneshot::Sender<()>>>>,
118 max_retries: u32,
119}
120
121impl<T> WaitingQueueService<T> {
122 pub fn new(inner: T, max_retries: Option<u32>) -> Self {
123 Self {
124 inner,
125 waiting_queue: Arc::new(DashMap::new()),
126 max_retries: max_retries.unwrap_or_default(),
128 }
129 }
130
131 async fn join_waiting_queue(&self, resource: &Resource) -> oneshot::Receiver<()> {
132 let (tx, rx) = oneshot::channel();
133 if let Some(mut queue) = self.waiting_queue.get_mut(resource) {
134 queue.push_back(tx);
135 } else {
136 let mut queue = VecDeque::new();
137 queue.push_back(tx);
138 self.waiting_queue.insert(resource.to_owned(), queue);
139 }
140 rx
141 }
142
143 async fn notify_waiting_queue(&self, resource: &Option<Resource>) {
144 if let Some(resource) = resource
145 && let Some(mut queue) = self.waiting_queue.get_mut(resource)
146 && let Some(tx) = queue.pop_front()
147 {
148 tx.send(()).unwrap();
149 }
150 }
151}
152
153impl<T> Service<SequencerRequest> for WaitingQueueService<T>
154where
155 T: Service<SequencerRequest, Response = SequencerResponse, Error = TraceabilityError>
156 + Clone
157 + Sync
158 + Send
159 + 'static,
160 T::Future: Send,
161{
162 type Response = T::Response;
163 type Error = T::Error;
164 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
165
166 fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
167 self.inner.poll_ready(cx)
168 }
169
170 fn call(&mut self, req: SequencerRequest) -> Self::Future {
171 let mut inner = self.inner.clone();
172 let max_tries = self.max_retries + 1;
173 let this = self.clone();
174 Box::pin(async move {
175 for _ in 0..max_tries {
176 match inner.call(req.clone()).await {
177 Ok(SequencerResponse::FlowReserved) => {
178 #[cfg(feature = "trace2e_tracing")]
179 debug!("[sequencer] FlowReserved");
180 return Ok(SequencerResponse::FlowReserved);
181 }
182 Ok(SequencerResponse::FlowReleased { source, destination }) => {
183 join!(
184 this.notify_waiting_queue(&source),
185 this.notify_waiting_queue(&destination)
186 );
187 #[cfg(feature = "trace2e_tracing")]
188 debug!(
189 "[sequencer] FlowReleased: source: {:?}, destination: {:?}",
190 source, destination
191 );
192 return Ok(SequencerResponse::FlowReleased { source, destination });
193 }
194 Err(TraceabilityError::UnavailableSource(source)) => {
195 let rx = this.join_waiting_queue(&source).await;
196 #[cfg(feature = "trace2e_tracing")]
197 debug!("[sequencer] waiting source: {:?}", source);
198 let _ = rx.await;
199 }
200 Err(TraceabilityError::UnavailableDestination(destination)) => {
201 let rx = this.join_waiting_queue(&destination).await;
202 #[cfg(feature = "trace2e_tracing")]
203 debug!("[sequencer] waiting destination: {:?}", destination);
204 let _ = rx.await;
205 }
206 Err(TraceabilityError::UnavailableSourceAndDestination(
207 source,
208 destination,
209 )) => {
210 let rx1 = this.join_waiting_queue(&source).await;
211 let rx2 = this.join_waiting_queue(&destination).await;
212 #[cfg(feature = "trace2e_tracing")]
213 debug!(
214 "[sequencer] waiting source: {:?}, destination: {:?}",
215 source, destination
216 );
217 let (_, _) = join!(rx1, rx2);
218 }
219 Err(e) => return Err(e),
220 }
221 }
222 Err(TraceabilityError::ReachedMaxRetriesWaitingQueue)
223 })
224 }
225}
226
227#[cfg(test)]
228mod tests {
229 use std::time::Duration;
230
231 use tower::{ServiceBuilder, layer::layer_fn, timeout::TimeoutLayer};
232
233 use super::*;
234 use crate::traceability::naming::Resource;
235 #[tokio::test]
236 async fn unit_sequencer_impl_flow() {
237 #[cfg(feature = "trace2e_tracing")]
238 crate::trace2e_tracing::init();
239 let sequencer = SequencerService::default();
240 let process = Resource::new_process_mock(0);
241 let file = Resource::new_file("/tmp/test".to_string());
242 assert_eq!(
243 sequencer.make_flow(process.clone(), file.clone()).await,
244 Ok(SequencerResponse::FlowReserved)
245 );
246 assert_eq!(
247 sequencer.drop_flow(&file).await.unwrap(),
248 SequencerResponse::FlowReleased {
249 source: Some(process.clone()),
250 destination: Some(file.clone())
251 }
252 );
253 assert_eq!(
254 sequencer.make_flow(process.clone(), file.clone()).await,
255 Ok(SequencerResponse::FlowReserved)
256 );
257 assert_eq!(
258 sequencer.drop_flow(&file).await.unwrap(),
259 SequencerResponse::FlowReleased { source: Some(process), destination: Some(file) }
260 );
261 }
262
263 #[tokio::test]
264 async fn unit_sequencer_impl_flow_drop_already_dropped() {
265 #[cfg(feature = "trace2e_tracing")]
266 crate::trace2e_tracing::init();
267 let sequencer = SequencerService::default();
268 let process = Resource::new_process_mock(0);
269 let file = Resource::new_file("/tmp/test".to_string());
270 assert_eq!(
271 sequencer.make_flow(process.clone(), file.clone()).await,
272 Ok(SequencerResponse::FlowReserved)
273 );
274 assert_eq!(
275 sequencer.drop_flow(&file).await.unwrap(),
276 SequencerResponse::FlowReleased {
277 source: Some(process),
278 destination: Some(file.clone())
279 }
280 );
281 assert_eq!(
283 sequencer.drop_flow(&file).await.unwrap(),
284 SequencerResponse::FlowReleased { source: None, destination: None }
285 );
286 }
287
288 #[tokio::test]
289 async fn unit_sequencer_impl_flow_readers_drop() {
290 #[cfg(feature = "trace2e_tracing")]
291 crate::trace2e_tracing::init();
292 let sequencer = SequencerService::default();
293 let process = Resource::new_process_mock(0);
294 let file1 = Resource::new_file("/tmp/test1".to_string());
295 let file2 = Resource::new_file("/tmp/test2".to_string());
296 let file3 = Resource::new_file("/tmp/test3".to_string());
297 let file4 = Resource::new_file("/tmp/test4".to_string());
298 assert_eq!(
299 sequencer.make_flow(process.clone(), file1.clone()).await,
300 Ok(SequencerResponse::FlowReserved)
301 );
302 assert_eq!(
303 sequencer.make_flow(process.clone(), file2.clone()).await,
304 Ok(SequencerResponse::FlowReserved)
305 );
306 assert_eq!(
307 sequencer.make_flow(process.clone(), file3.clone()).await,
308 Ok(SequencerResponse::FlowReserved)
309 );
310
311 assert_eq!(
313 sequencer.make_flow(file4.clone(), process.clone()).await,
314 Err(TraceabilityError::UnavailableDestination(process.clone()))
315 );
316
317 assert_eq!(
319 sequencer.drop_flow(&file2).await.unwrap(),
320 SequencerResponse::FlowReleased { source: None, destination: Some(file2) }
321 );
322 assert_eq!(
323 sequencer.drop_flow(&file1).await.unwrap(),
324 SequencerResponse::FlowReleased { source: None, destination: Some(file1) }
325 );
326
327 assert_eq!(
329 sequencer.make_flow(file4.clone(), process.clone()).await,
330 Err(TraceabilityError::UnavailableDestination(process.clone()))
331 );
332
333 assert_eq!(
335 sequencer.drop_flow(&file3).await.unwrap(),
336 SequencerResponse::FlowReleased {
337 source: Some(process.clone()),
338 destination: Some(file3)
339 }
340 );
341
342 assert_eq!(sequencer.make_flow(file4, process).await, Ok(SequencerResponse::FlowReserved));
344 }
345
346 #[tokio::test]
347 async fn unit_sequencer_impl_flow_interference() {
348 #[cfg(feature = "trace2e_tracing")]
349 crate::trace2e_tracing::init();
350 let sequencer = SequencerService::default();
351 let process1 = Resource::new_process_mock(1);
352 let process2 = Resource::new_process_mock(2);
353 let file1 = Resource::new_file("/tmp/test1".to_string());
354 let file2 = Resource::new_file("/tmp/test2".to_string());
355
356 assert_eq!(
357 sequencer.make_flow(file1.clone(), process1.clone()).await,
358 Ok(SequencerResponse::FlowReserved)
359 );
360
361 assert_eq!(
364 sequencer.make_flow(file1.clone(), process1.clone()).await,
365 Err(TraceabilityError::UnavailableDestination(process1.clone()))
366 );
367
368 assert_eq!(
370 sequencer.make_flow(process2, file1.clone()).await,
371 Err(TraceabilityError::UnavailableDestination(file1.clone()))
372 );
373
374 assert_eq!(
376 sequencer.make_flow(process1.clone(), file2).await,
377 Err(TraceabilityError::UnavailableSource(process1.clone()))
378 );
379
380 assert_eq!(
382 sequencer.make_flow(process1.clone(), file1.clone()).await,
383 Err(TraceabilityError::UnavailableSourceAndDestination(process1, file1))
384 );
385 }
386
387 #[tokio::test]
388 async fn unit_sequencer_layer_flow() {
389 #[cfg(feature = "trace2e_tracing")]
390 crate::trace2e_tracing::init();
391 let mut sequencer = SequencerService::default();
392
393 let process = Resource::new_process_mock(0);
394 let file = Resource::new_file("/tmp/test".to_string());
395
396 assert_eq!(
397 sequencer
398 .call(SequencerRequest::ReserveFlow {
399 source: process.clone(),
400 destination: file.clone(),
401 })
402 .await
403 .unwrap(),
404 SequencerResponse::FlowReserved
405 );
406
407 assert_eq!(
408 sequencer
409 .call(SequencerRequest::ReleaseFlow { destination: file.clone() })
410 .await
411 .unwrap(),
412 SequencerResponse::FlowReleased { source: Some(process), destination: Some(file) }
413 );
414 }
415
416 #[tokio::test]
417 async fn unit_sequencer_layer_flow_interference() {
418 #[cfg(feature = "trace2e_tracing")]
419 crate::trace2e_tracing::init();
420 let mut sequencer = SequencerService::default();
421
422 let process = Resource::new_process_mock(0);
423 let file = Resource::new_file("/tmp/test".to_string());
424
425 assert_eq!(
426 sequencer
427 .call(SequencerRequest::ReserveFlow {
428 source: process.clone(),
429 destination: file.clone(),
430 })
431 .await
432 .unwrap(),
433 SequencerResponse::FlowReserved
434 );
435
436 assert_eq!(
437 sequencer
438 .call(SequencerRequest::ReserveFlow { source: process, destination: file.clone() })
439 .await
440 .unwrap_err(),
441 TraceabilityError::UnavailableDestination(file)
442 );
443 }
444
445 #[tokio::test]
446 async fn unit_sequencer_layer_flow_circular() {
447 #[cfg(feature = "trace2e_tracing")]
448 crate::trace2e_tracing::init();
449 let mut sequencer = SequencerService::default();
450
451 let process = Resource::new_process_mock(0);
452 let file = Resource::new_file("/tmp/test".to_string());
453
454 assert_eq!(
455 sequencer
456 .call(SequencerRequest::ReserveFlow {
457 source: process.clone(),
458 destination: file.clone(),
459 })
460 .await
461 .unwrap(),
462 SequencerResponse::FlowReserved
463 );
464
465 assert_eq!(
466 sequencer
467 .call(SequencerRequest::ReserveFlow {
468 source: file.clone(),
469 destination: process.clone(),
470 })
471 .await
472 .unwrap_err(),
473 TraceabilityError::UnavailableSourceAndDestination(file, process)
474 );
475 }
476
477 #[tokio::test]
478 async fn unit_sequencer_layer_flow_sequence() {
479 #[cfg(feature = "trace2e_tracing")]
480 crate::trace2e_tracing::init();
481 let mut sequencer = SequencerService::default();
482
483 let process = Resource::new_process_mock(0);
484 let file1 = Resource::new_file("/tmp/test1".to_string());
485 let file2 = Resource::new_file("/tmp/test2".to_string());
486
487 assert_eq!(
488 sequencer
489 .call(SequencerRequest::ReserveFlow {
490 source: file1.clone(),
491 destination: process.clone(),
492 })
493 .await
494 .unwrap(),
495 SequencerResponse::FlowReserved
496 );
497
498 assert_eq!(
499 sequencer
500 .call(SequencerRequest::ReleaseFlow { destination: process.clone() })
501 .await
502 .unwrap(),
503 SequencerResponse::FlowReleased {
504 source: Some(file1),
505 destination: Some(process.clone())
506 }
507 );
508
509 assert_eq!(
510 sequencer
511 .call(SequencerRequest::ReserveFlow {
512 source: file2.clone(),
513 destination: process.clone(),
514 })
515 .await
516 .unwrap(),
517 SequencerResponse::FlowReserved
518 );
519
520 assert_eq!(
521 sequencer
522 .call(SequencerRequest::ReleaseFlow { destination: process.clone() })
523 .await
524 .unwrap(),
525 SequencerResponse::FlowReleased { source: Some(file2), destination: Some(process) }
526 );
527 }
528
529 #[tokio::test]
530 async fn unit_sequencer_layer_flow_sequence_interference() {
531 #[cfg(feature = "trace2e_tracing")]
532 crate::trace2e_tracing::init();
533 let mut sequencer = SequencerService::default();
534
535 let process1 = Resource::new_process_mock(1);
536 let process2 = Resource::new_process_mock(2);
537 let file1 = Resource::new_file("/tmp/test1".to_string());
538 let file2 = Resource::new_file("/tmp/test2".to_string());
539
540 assert_eq!(
541 sequencer
542 .call(SequencerRequest::ReserveFlow {
543 source: file1.clone(),
544 destination: process1.clone(),
545 })
546 .await
547 .unwrap(),
548 SequencerResponse::FlowReserved
549 );
550
551 assert_eq!(
553 sequencer
554 .call(SequencerRequest::ReserveFlow {
555 source: file1.clone(),
556 destination: process1.clone(),
557 })
558 .await
559 .unwrap_err(),
560 TraceabilityError::UnavailableDestination(process1.clone())
561 );
562
563 assert_eq!(
565 sequencer
566 .call(SequencerRequest::ReserveFlow {
567 source: process2,
568 destination: file1.clone(),
569 })
570 .await
571 .unwrap_err(),
572 TraceabilityError::UnavailableDestination(file1.clone())
573 );
574
575 assert_eq!(
577 sequencer
578 .call(SequencerRequest::ReserveFlow {
579 source: process1.clone(),
580 destination: file2,
581 })
582 .await
583 .unwrap_err(),
584 TraceabilityError::UnavailableSource(process1.clone())
585 );
586
587 assert_eq!(
589 sequencer
590 .call(SequencerRequest::ReserveFlow {
591 source: process1.clone(),
592 destination: file1.clone(),
593 })
594 .await
595 .unwrap_err(),
596 TraceabilityError::UnavailableSourceAndDestination(process1, file1)
597 );
598 }
599 #[tokio::test]
600 async fn unit_sequencer_layer_flow_sequence_interference_multiple_share_releases() {
601 #[cfg(feature = "trace2e_tracing")]
602 crate::trace2e_tracing::init();
603 let mut sequencer = SequencerService::default();
604
605 let process = Resource::new_process_mock(0);
606 let file1 = Resource::new_file("/tmp/test1".to_string());
607 let file2 = Resource::new_file("/tmp/test2".to_string());
608 let file3 = Resource::new_file("/tmp/test3".to_string());
609
610 assert_eq!(
611 sequencer
612 .call(SequencerRequest::ReserveFlow {
613 source: process.clone(),
614 destination: file1.clone(),
615 })
616 .await
617 .unwrap(),
618 SequencerResponse::FlowReserved
619 );
620
621 assert_eq!(
622 sequencer
623 .call(SequencerRequest::ReserveFlow {
624 source: process.clone(),
625 destination: file2.clone(),
626 })
627 .await
628 .unwrap(),
629 SequencerResponse::FlowReserved
630 );
631
632 assert_eq!(
633 sequencer
634 .call(SequencerRequest::ReserveFlow {
635 source: process.clone(),
636 destination: file3.clone(),
637 })
638 .await
639 .unwrap(),
640 SequencerResponse::FlowReserved
641 );
642
643 assert_eq!(
644 sequencer
645 .call(SequencerRequest::ReleaseFlow { destination: file2.clone() })
646 .await
647 .unwrap(),
648 SequencerResponse::FlowReleased { source: None, destination: Some(file2) }
649 );
650 assert_eq!(
651 sequencer
652 .call(SequencerRequest::ReleaseFlow { destination: file3.clone() })
653 .await
654 .unwrap(),
655 SequencerResponse::FlowReleased { source: None, destination: Some(file3) }
656 );
657
658 assert_eq!(
659 sequencer
660 .call(SequencerRequest::ReleaseFlow { destination: file1.clone() })
661 .await
662 .unwrap(),
663 SequencerResponse::FlowReleased { source: Some(process), destination: Some(file1) }
664 );
665 }
666
667 #[tokio::test]
668 async fn unit_waiting_queue_layer_flow_interference() {
669 #[cfg(feature = "trace2e_tracing")]
670 crate::trace2e_tracing::init();
671 let mut sequencer = ServiceBuilder::new()
672 .layer(TimeoutLayer::new(Duration::from_millis(1)))
673 .layer(layer_fn(|inner| WaitingQueueService::new(inner, None)))
674 .service(SequencerService::default());
675
676 let process = Resource::new_process_mock(0);
677 let file = Resource::new_file("/tmp/test".to_string());
678
679 assert_eq!(
680 sequencer
681 .call(SequencerRequest::ReserveFlow {
682 source: process.clone(),
683 destination: file.clone(),
684 })
685 .await
686 .unwrap(),
687 SequencerResponse::FlowReserved
688 );
689
690 assert!(
691 sequencer
692 .call(SequencerRequest::ReserveFlow { source: process, destination: file })
693 .await
694 .is_err(),
695 );
696 }
697
698 #[tokio::test]
699 async fn unit_waiting_queue_layer_flow_circular() {
700 #[cfg(feature = "trace2e_tracing")]
701 crate::trace2e_tracing::init();
702 let mut sequencer = ServiceBuilder::new()
703 .layer(TimeoutLayer::new(Duration::from_millis(1)))
704 .layer(layer_fn(|inner| WaitingQueueService::new(inner, None)))
705 .service(SequencerService::default());
706
707 let process = Resource::new_process_mock(0);
708 let file = Resource::new_file("/tmp/test".to_string());
709
710 assert_eq!(
711 sequencer
712 .call(SequencerRequest::ReserveFlow {
713 source: process.clone(),
714 destination: file.clone(),
715 })
716 .await
717 .unwrap(),
718 SequencerResponse::FlowReserved
719 );
720
721 assert!(
722 sequencer
723 .call(SequencerRequest::ReserveFlow { source: file, destination: process })
724 .await
725 .is_err(),
726 );
727 }
728
729 #[tokio::test]
730 async fn unit_waiting_queue_layer_writers_interference_resolution() {
731 #[cfg(feature = "trace2e_tracing")]
732 crate::trace2e_tracing::init();
733 let mut sequencer = ServiceBuilder::new()
734 .layer(TimeoutLayer::new(Duration::from_millis(10)))
735 .layer(layer_fn(|inner| WaitingQueueService::new(inner, Some(1))))
736 .service(SequencerService::default());
737
738 let process = Resource::new_process_mock(0);
739 let file1 = Resource::new_file("/tmp/test1".to_string());
740 let file2 = Resource::new_file("/tmp/test2".to_string());
741
742 assert_eq!(
743 sequencer
744 .call(SequencerRequest::ReserveFlow {
745 source: file1.clone(),
746 destination: process.clone(),
747 })
748 .await
749 .unwrap(),
750 SequencerResponse::FlowReserved
751 );
752
753 let mut sequencer_clone = sequencer.clone();
754 let process_clone = process.clone();
755 let file2_clone = file2.clone();
756 let res = tokio::spawn(async move {
757 sequencer_clone
758 .call(SequencerRequest::ReserveFlow {
759 source: file2_clone,
760 destination: process_clone,
761 })
762 .await
763 });
764
765 tokio::time::sleep(Duration::from_millis(5)).await;
766 assert_eq!(
767 sequencer
768 .call(SequencerRequest::ReleaseFlow { destination: process.clone() })
769 .await
770 .unwrap(),
771 SequencerResponse::FlowReleased {
772 source: Some(file1),
773 destination: Some(process.clone())
774 }
775 );
776
777 assert_eq!(res.await.unwrap().unwrap(), SequencerResponse::FlowReserved);
778
779 assert_eq!(
780 sequencer
781 .call(SequencerRequest::ReleaseFlow { destination: process.clone() })
782 .await
783 .unwrap(),
784 SequencerResponse::FlowReleased { source: Some(file2), destination: Some(process) }
785 );
786 }
787 #[tokio::test]
788 async fn unit_waiting_queue_layer_writer_readers_interference_resolution() {
789 #[cfg(feature = "trace2e_tracing")]
790 crate::trace2e_tracing::init();
791 let mut sequencer = ServiceBuilder::new()
792 .layer(TimeoutLayer::new(Duration::from_millis(2)))
793 .layer(layer_fn(|inner| WaitingQueueService::new(inner, Some(1))))
794 .service(SequencerService::default());
795
796 let process1 = Resource::new_process_mock(0);
797 let process2 = Resource::new_process_mock(1);
798 let process3 = Resource::new_process_mock(2);
799 let process4 = Resource::new_process_mock(3);
800 let file = Resource::new_file("/tmp/test".to_string());
801
802 assert_eq!(
803 sequencer
804 .call(SequencerRequest::ReserveFlow {
805 source: file.clone(),
806 destination: process1.clone(),
807 })
808 .await
809 .unwrap(),
810 SequencerResponse::FlowReserved
811 );
812
813 assert_eq!(
814 sequencer
815 .call(SequencerRequest::ReserveFlow {
816 source: file.clone(),
817 destination: process2.clone(),
818 })
819 .await
820 .unwrap(),
821 SequencerResponse::FlowReserved
822 );
823
824 assert_eq!(
825 sequencer
826 .call(SequencerRequest::ReserveFlow {
827 source: file.clone(),
828 destination: process3.clone(),
829 })
830 .await
831 .unwrap(),
832 SequencerResponse::FlowReserved
833 );
834
835 let mut sequencer_clone = sequencer.clone();
836 let file_clone = file.clone();
837 let process4_clone = process4.clone();
838 let res = tokio::spawn(async move {
839 sequencer_clone
840 .call(SequencerRequest::ReserveFlow {
841 source: process4_clone,
842 destination: file_clone,
843 })
844 .await
845 });
846
847 tokio::time::sleep(Duration::from_millis(1)).await;
848 assert_eq!(
849 sequencer
850 .call(SequencerRequest::ReleaseFlow { destination: process1.clone() })
851 .await
852 .unwrap(),
853 SequencerResponse::FlowReleased {
854 source: None, destination: Some(process1)
856 }
857 );
858 assert_eq!(
859 sequencer
860 .call(SequencerRequest::ReleaseFlow { destination: process2.clone() })
861 .await
862 .unwrap(),
863 SequencerResponse::FlowReleased {
864 source: None, destination: Some(process2)
866 }
867 );
868 assert_eq!(
869 sequencer
870 .call(SequencerRequest::ReleaseFlow { destination: process3.clone() })
871 .await
872 .unwrap(),
873 SequencerResponse::FlowReleased {
874 source: Some(file.clone()),
876 destination: Some(process3)
877 }
878 );
879
880 assert_eq!(res.await.unwrap().unwrap(), SequencerResponse::FlowReserved);
882
883 assert_eq!(
884 sequencer
885 .call(SequencerRequest::ReleaseFlow { destination: file.clone() })
886 .await
887 .unwrap(),
888 SequencerResponse::FlowReleased { source: Some(process4), destination: Some(file) }
889 );
890 }
891
892 #[tokio::test]
893 async fn unit_waiting_queue_layer_reader_writer_interference_resolution() {
894 #[cfg(feature = "trace2e_tracing")]
895 crate::trace2e_tracing::init();
896 let mut sequencer = ServiceBuilder::new()
897 .layer(TimeoutLayer::new(Duration::from_millis(2)))
898 .layer(layer_fn(|inner| WaitingQueueService::new(inner, Some(1))))
899 .service(SequencerService::default());
900
901 let process = Resource::new_process_mock(0);
902 let file1 = Resource::new_file("/tmp/test1".to_string());
903 let file2 = Resource::new_file("/tmp/test2".to_string());
904
905 assert_eq!(
906 sequencer
907 .call(SequencerRequest::ReserveFlow {
908 source: file1.clone(),
909 destination: process.clone(),
910 })
911 .await
912 .unwrap(),
913 SequencerResponse::FlowReserved
914 );
915
916 let mut sequencer_clone = sequencer.clone();
917 let process_clone = process.clone();
918 let file2_clone = file2.clone();
919 let res = tokio::spawn(async move {
920 sequencer_clone
921 .call(SequencerRequest::ReserveFlow {
922 source: process_clone,
923 destination: file2_clone,
924 })
925 .await
926 });
927
928 tokio::time::sleep(Duration::from_millis(1)).await;
929 assert_eq!(
930 sequencer
931 .call(SequencerRequest::ReleaseFlow { destination: process.clone() })
932 .await
933 .unwrap(),
934 SequencerResponse::FlowReleased {
935 source: Some(file1),
936 destination: Some(process.clone())
937 }
938 );
939
940 assert_eq!(res.await.unwrap().unwrap(), SequencerResponse::FlowReserved);
941
942 assert_eq!(
943 sequencer
944 .call(SequencerRequest::ReleaseFlow { destination: file2.clone() })
945 .await
946 .unwrap(),
947 SequencerResponse::FlowReleased { source: Some(process), destination: Some(file2) }
948 );
949 }
950}