1use std::{
2 borrow::Cow,
3 thread::{spawn, JoinHandle, Scope, ScopedJoinHandle},
4};
5
6use anyhow::Result;
7use crossbeam::channel::{bounded, Receiver, Sender};
8use indicatif::ProgressBar;
9
10use crate::util::logging::{measure_and_recv, measure_and_send, meter_bar};
11
12use super::ObjectWriter;
13
14enum WorkHandle<'scope> {
15 Static(JoinHandle<Result<usize>>),
16 Scoped(ScopedJoinHandle<'scope, Result<usize>>),
17}
18
19fn ferry<T, W>(recv: Receiver<T>, writer: W, pb: ProgressBar) -> Result<usize>
20where
21 T: Send + Sync + 'static,
22 W: ObjectWriter<T>,
23{
24 let mut writer = writer; while let Some(obj) = measure_and_recv(&recv, &pb) {
27 writer.write_object(obj)?;
28 }
29
30 pb.finish_and_clear();
31 writer.finish()
32}
33
34pub struct ThreadObjectWriterBuilder<W, F: Send + FnOnce() -> Result<W>> {
35 thunk: F,
36 name: String,
37 capacity: usize,
38}
39
40pub struct ThreadObjectWriter<'scope, T>
42where
43 T: Send + Sync + 'static,
44{
45 sender: Sender<T>,
46 handle: WorkHandle<'scope>,
47 meter: ProgressBar,
48}
49
50impl<'scope, T> ThreadObjectWriter<'scope, T>
51where
52 T: Send + Sync + 'scope,
53{
54 pub fn wrap<W>(writer: W) -> ThreadObjectWriterBuilder<W, impl Send + FnOnce() -> Result<W>>
55 where
56 W: ObjectWriter<T> + Send + Sync + 'scope,
57 {
58 ThreadObjectWriterBuilder {
59 thunk: move || Ok(writer),
60 name: "unnamed".into(),
61 capacity: 100,
62 }
63 }
64
65 pub fn bg_open<W, F>(thunk: F) -> ThreadObjectWriterBuilder<W, F>
66 where
67 W: ObjectWriter<T> + 'scope,
68 F: Send + FnOnce() -> Result<W>,
69 {
70 ThreadObjectWriterBuilder {
71 thunk,
72 name: "unnamed".into(),
73 capacity: 100,
74 }
75 }
76}
77
78impl<W, F: Send + FnOnce() -> Result<W>> ThreadObjectWriterBuilder<W, F> {
79 pub fn with_capacity(self, cap: usize) -> Self {
81 ThreadObjectWriterBuilder {
82 capacity: cap,
83 ..self
84 }
85 }
86
87 pub fn with_name<S: Into<Cow<'static, str>>>(self, name: S) -> Self {
89 let name: Cow<'static, str> = name.into();
90 ThreadObjectWriterBuilder {
91 name: name.to_string(),
92 ..self
93 }
94 }
95
96 pub fn spawn_scoped<'scope, 'env, T>(
98 self,
99 scope: &'scope Scope<'scope, 'env>,
100 ) -> ThreadObjectWriter<'scope, T>
101 where
102 W: ObjectWriter<T> + 'scope,
103 F: 'scope,
104 T: Send + Sync + 'scope,
105 {
106 let (sender, receiver) = bounded(self.capacity);
107 let pb = meter_bar(self.capacity, &format!("{} buffer", self.name));
108
109 let rpb = pb.clone();
110 let thunk = self.thunk;
111 let h = scope.spawn(move || ferry(receiver, thunk()?, rpb));
112
113 ThreadObjectWriter {
114 meter: pb,
115 sender,
116 handle: WorkHandle::Scoped(h),
117 }
118 }
119
120 pub fn spawn<T>(self) -> ThreadObjectWriter<'static, T>
122 where
123 W: ObjectWriter<T> + 'static,
124 F: 'static,
125 T: Send + Sync + 'static,
126 {
127 let (sender, receiver) = bounded(self.capacity);
128 let pb = meter_bar(self.capacity, &format!("{} buffer", self.name));
129
130 let rpb = pb.clone();
131 let thunk = self.thunk;
132 let h = spawn(move || ferry(receiver, thunk()?, rpb));
133
134 ThreadObjectWriter {
135 meter: pb,
136 sender,
137 handle: WorkHandle::Static(h),
138 }
139 }
140}
141
142impl<'scope, T: Send + Sync + 'scope> ThreadObjectWriter<'scope, T> {
143 pub fn satellite<'a>(&'a self) -> ThreadWriterSatellite<'a, 'scope, T>
172 where
173 'scope: 'a,
174 {
175 ThreadWriterSatellite::create(self)
176 }
177}
178
179impl<'scope, T: Send + Sync + 'static> ObjectWriter<T> for ThreadObjectWriter<'scope, T> {
180 fn write_object(&mut self, object: T) -> Result<()> {
181 measure_and_send(&self.sender, object, &self.meter)?;
182 Ok(())
183 }
184
185 fn finish(self) -> Result<usize> {
186 drop(self.sender);
188 let res = match self.handle {
190 WorkHandle::Static(h) => h.join().map_err(std::panic::resume_unwind)?,
191 WorkHandle::Scoped(h) => h.join().map_err(std::panic::resume_unwind)?,
192 };
193 res
194 }
195}
196
197#[derive(Clone)]
203pub struct ThreadWriterSatellite<'a, 'scope, T>
204where
205 T: Send + Sync + 'static,
206 'scope: 'a,
207{
208 delegate: &'a ThreadObjectWriter<'scope, T>,
209 sender: Sender<T>,
210}
211
212impl<'a, 'scope, T> ThreadWriterSatellite<'a, 'scope, T>
213where
214 T: Send + Sync + 'static,
215 'scope: 'a,
216{
217 fn create(delegate: &'a ThreadObjectWriter<'scope, T>) -> ThreadWriterSatellite<'a, 'scope, T> {
219 ThreadWriterSatellite {
220 delegate,
221 sender: delegate.sender.clone(),
222 }
223 }
224}
225
226impl<'a, 'scope, T> ObjectWriter<T> for ThreadWriterSatellite<'a, 'scope, T>
227where
228 T: Send + Sync + 'static,
229 'scope: 'a,
230{
231 fn write_object(&mut self, object: T) -> Result<()> {
232 measure_and_send(&self.sender, object, &self.delegate.meter)?;
233 Ok(())
234 }
235
236 fn finish(self) -> Result<usize> {
237 Ok(0)
239 }
240}