1use std::{
2 borrow::Cow,
3 thread::{spawn, JoinHandle, Scope, ScopedJoinHandle},
4};
5
6use anyhow::Result;
7use crossbeam::channel::{bounded, Receiver, Sender};
8use indicatif::ProgressBar;
9
10use crate::util::logging::{measure_and_recv, measure_and_send, meter_bar};
11
12use super::ObjectWriter;
13
14enum WorkHandle<'scope> {
15 Static(JoinHandle<Result<usize>>),
16 Scoped(ScopedJoinHandle<'scope, Result<usize>>),
17}
18
19fn ferry<T, W>(recv: Receiver<T>, writer: W, pb: ProgressBar) -> Result<usize>
20where
21 T: Send + Sync + 'static,
22 W: ObjectWriter<T>,
23{
24 let mut writer = writer; while let Some(obj) = measure_and_recv(&recv, &pb) {
27 writer.write_object(obj)?;
28 }
29
30 pb.finish_and_clear();
31 writer.finish_objects()
32}
33
34pub struct ThreadObjectWriterBuilder<W, F: Send + FnOnce() -> Result<W>> {
35 thunk: F,
36 name: String,
37 capacity: usize,
38}
39
40pub struct ThreadObjectWriter<'scope, T>
42where
43 T: Send + Sync + 'static,
44{
45 sender: Sender<T>,
46 handle: WorkHandle<'scope>,
47 meter: ProgressBar,
48}
49
50impl<'scope, T> ThreadObjectWriter<'scope, T>
51where
52 T: Send + Sync + 'scope,
53{
54 pub fn wrap<W>(writer: W) -> ThreadObjectWriterBuilder<W, impl Send + FnOnce() -> Result<W>>
55 where
56 W: ObjectWriter<T> + Send + Sync + 'scope,
57 {
58 ThreadObjectWriterBuilder {
59 thunk: move || Ok(writer),
60 name: "unnamed".into(),
61 capacity: 100,
62 }
63 }
64}
65
66impl<W, F: Send + FnOnce() -> Result<W>> ThreadObjectWriterBuilder<W, F> {
67 pub fn with_capacity(self, cap: usize) -> Self {
69 ThreadObjectWriterBuilder {
70 capacity: cap,
71 ..self
72 }
73 }
74
75 pub fn with_name<S: Into<Cow<'static, str>>>(self, name: S) -> Self {
77 let name: Cow<'static, str> = name.into();
78 ThreadObjectWriterBuilder {
79 name: name.to_string(),
80 ..self
81 }
82 }
83
84 pub fn spawn_scoped<'scope, 'env, T>(
86 self,
87 scope: &'scope Scope<'scope, 'env>,
88 ) -> ThreadObjectWriter<'scope, T>
89 where
90 W: ObjectWriter<T> + 'scope,
91 F: 'scope,
92 T: Send + Sync + 'scope,
93 {
94 let (sender, receiver) = bounded(self.capacity);
95 let pb = meter_bar(self.capacity, &format!("{} buffer", self.name));
96
97 let rpb = pb.clone();
98 let thunk = self.thunk;
99 let h = scope.spawn(move || ferry(receiver, thunk()?, rpb));
100
101 ThreadObjectWriter {
102 meter: pb,
103 sender,
104 handle: WorkHandle::Scoped(h),
105 }
106 }
107
108 pub fn spawn<T>(self) -> ThreadObjectWriter<'static, T>
110 where
111 W: ObjectWriter<T> + 'static,
112 F: 'static,
113 T: Send + Sync + 'static,
114 {
115 let (sender, receiver) = bounded(self.capacity);
116 let pb = meter_bar(self.capacity, &format!("{} buffer", self.name));
117
118 let rpb = pb.clone();
119 let thunk = self.thunk;
120 let h = spawn(move || ferry(receiver, thunk()?, rpb));
121
122 ThreadObjectWriter {
123 meter: pb,
124 sender,
125 handle: WorkHandle::Static(h),
126 }
127 }
128}
129
130impl<'scope, T: Send + Sync + 'scope> ThreadObjectWriter<'scope, T> {
131 pub fn satellite<'a>(&'a self) -> ThreadWriterSatellite<'a, 'scope, T>
160 where
161 'scope: 'a,
162 {
163 ThreadWriterSatellite::create(self)
164 }
165}
166
167impl<'scope, T: Send + Sync + 'static> ObjectWriter<T> for ThreadObjectWriter<'scope, T> {
168 fn write_object(&mut self, object: T) -> Result<()> {
169 measure_and_send(&self.sender, object, &self.meter)?;
170 Ok(())
171 }
172
173 fn finish_objects(self) -> Result<usize> {
174 drop(self.sender);
176 let res = match self.handle {
178 WorkHandle::Static(h) => h.join().map_err(std::panic::resume_unwind)?,
179 WorkHandle::Scoped(h) => h.join().map_err(std::panic::resume_unwind)?,
180 };
181 res
182 }
183}
184
185#[derive(Clone)]
191pub struct ThreadWriterSatellite<'a, 'scope, T>
192where
193 T: Send + Sync + 'static,
194 'scope: 'a,
195{
196 delegate: &'a ThreadObjectWriter<'scope, T>,
197 sender: Sender<T>,
198}
199
200impl<'a, 'scope, T> ThreadWriterSatellite<'a, 'scope, T>
201where
202 T: Send + Sync + 'static,
203 'scope: 'a,
204{
205 fn create(delegate: &'a ThreadObjectWriter<'scope, T>) -> ThreadWriterSatellite<'a, 'scope, T> {
207 ThreadWriterSatellite {
208 delegate,
209 sender: delegate.sender.clone(),
210 }
211 }
212}
213
214impl<'a, 'scope, T> ObjectWriter<T> for ThreadWriterSatellite<'a, 'scope, T>
215where
216 T: Send + Sync + 'static,
217 'scope: 'a,
218{
219 fn write_object(&mut self, object: T) -> Result<()> {
220 measure_and_send(&self.sender, object, &self.delegate.meter)?;
221 Ok(())
222 }
223
224 fn finish_objects(self) -> Result<usize> {
225 Ok(0)
227 }
228}