bookdata/io/object/
thread.rs

1use std::{
2    borrow::Cow,
3    thread::{spawn, JoinHandle, Scope, ScopedJoinHandle},
4};
5
6use anyhow::Result;
7use crossbeam::channel::{bounded, Receiver, Sender};
8use indicatif::ProgressBar;
9
10use crate::util::logging::{measure_and_recv, measure_and_send, meter_bar};
11
12use super::ObjectWriter;
13
14enum WorkHandle<'scope> {
15    Static(JoinHandle<Result<usize>>),
16    Scoped(ScopedJoinHandle<'scope, Result<usize>>),
17}
18
19fn ferry<T, W>(recv: Receiver<T>, writer: W, pb: ProgressBar) -> Result<usize>
20where
21    T: Send + Sync + 'static,
22    W: ObjectWriter<T>,
23{
24    let mut writer = writer; // move writer into thread
25
26    while let Some(obj) = measure_and_recv(&recv, &pb) {
27        writer.write_object(obj)?;
28    }
29
30    pb.finish_and_clear();
31    writer.finish()
32}
33
34pub struct ThreadObjectWriterBuilder<W, F: Send + FnOnce() -> Result<W>> {
35    thunk: F,
36    name: String,
37    capacity: usize,
38}
39
40/// Write objects in a background thread.
41pub struct ThreadObjectWriter<'scope, T>
42where
43    T: Send + Sync + 'static,
44{
45    sender: Sender<T>,
46    handle: WorkHandle<'scope>,
47    meter: ProgressBar,
48}
49
50impl<'scope, T> ThreadObjectWriter<'scope, T>
51where
52    T: Send + Sync + 'scope,
53{
54    pub fn wrap<W>(writer: W) -> ThreadObjectWriterBuilder<W, impl Send + FnOnce() -> Result<W>>
55    where
56        W: ObjectWriter<T> + Send + Sync + 'scope,
57    {
58        ThreadObjectWriterBuilder {
59            thunk: move || Ok(writer),
60            name: "unnamed".into(),
61            capacity: 100,
62        }
63    }
64
65    pub fn bg_open<W, F>(thunk: F) -> ThreadObjectWriterBuilder<W, F>
66    where
67        W: ObjectWriter<T> + 'scope,
68        F: Send + FnOnce() -> Result<W>,
69    {
70        ThreadObjectWriterBuilder {
71            thunk,
72            name: "unnamed".into(),
73            capacity: 100,
74        }
75    }
76}
77
78impl<W, F: Send + FnOnce() -> Result<W>> ThreadObjectWriterBuilder<W, F> {
79    /// Set the channel capacity for this thread writer.  Defaults to 100.
80    pub fn with_capacity(self, cap: usize) -> Self {
81        ThreadObjectWriterBuilder {
82            capacity: cap,
83            ..self
84        }
85    }
86
87    /// Set a name for this thread writer for debugging.
88    pub fn with_name<S: Into<Cow<'static, str>>>(self, name: S) -> Self {
89        let name: Cow<'static, str> = name.into();
90        ThreadObjectWriterBuilder {
91            name: name.to_string(),
92            ..self
93        }
94    }
95
96    /// Spawn the thread writer.
97    pub fn spawn_scoped<'scope, 'env, T>(
98        self,
99        scope: &'scope Scope<'scope, 'env>,
100    ) -> ThreadObjectWriter<'scope, T>
101    where
102        W: ObjectWriter<T> + 'scope,
103        F: 'scope,
104        T: Send + Sync + 'scope,
105    {
106        let (sender, receiver) = bounded(self.capacity);
107        let pb = meter_bar(self.capacity, &format!("{} buffer", self.name));
108
109        let rpb = pb.clone();
110        let thunk = self.thunk;
111        let h = scope.spawn(move || ferry(receiver, thunk()?, rpb));
112
113        ThreadObjectWriter {
114            meter: pb,
115            sender,
116            handle: WorkHandle::Scoped(h),
117        }
118    }
119
120    /// Spawn the thread writer.
121    pub fn spawn<T>(self) -> ThreadObjectWriter<'static, T>
122    where
123        W: ObjectWriter<T> + 'static,
124        F: 'static,
125        T: Send + Sync + 'static,
126    {
127        let (sender, receiver) = bounded(self.capacity);
128        let pb = meter_bar(self.capacity, &format!("{} buffer", self.name));
129
130        let rpb = pb.clone();
131        let thunk = self.thunk;
132        let h = spawn(move || ferry(receiver, thunk()?, rpb));
133
134        ThreadObjectWriter {
135            meter: pb,
136            sender,
137            handle: WorkHandle::Static(h),
138        }
139    }
140}
141
142impl<'scope, T: Send + Sync + 'scope> ThreadObjectWriter<'scope, T> {
143    /// Create a satellite writer that writes to the same backend as this
144    /// writer.
145    ///
146    /// Satellites can be used to enable multiple data-generating threads to
147    /// write to the same thread writer, turning it into a multi-producer,
148    /// single-consumer writing pipeline.  Satellite writers should be finished,
149    /// and closing them does not finish the original thread writer (it still
150    /// needs to have [ObjectWriter::finish] called, typically after all
151    /// satellites are done, but it calling [ObjectWriter::finish] while
152    /// satellites are still active will wait until the satellites have finished
153    /// and closed their connections to the consumer thread).
154    ///
155    /// Satellites hold a reference to the original thread writer, to discourage
156    /// keeping them alive after the thread writer has been finished.  They
157    /// work best with [std::thread::scope]:
158    ///
159    /// ```rust,ignore
160    /// let writer = ThreadWriter::new(writer);
161    /// scope(|s| {
162    ///     for i in 0..NTHREADS {
163    ///         let out = writer.satellite();
164    ///         s.spawn(move || {
165    ///             // process and write to out
166    ///             out.finish().expect("closing writer failed");
167    ///         })
168    ///     }
169    /// })
170    /// ```
171    pub fn satellite<'a>(&'a self) -> ThreadWriterSatellite<'a, 'scope, T>
172    where
173        'scope: 'a,
174    {
175        ThreadWriterSatellite::create(self)
176    }
177}
178
179impl<'scope, T: Send + Sync + 'static> ObjectWriter<T> for ThreadObjectWriter<'scope, T> {
180    fn write_object(&mut self, object: T) -> Result<()> {
181        measure_and_send(&self.sender, object, &self.meter)?;
182        Ok(())
183    }
184
185    fn finish(self) -> Result<usize> {
186        // dropping the sender will cause the consumer to hang up.
187        drop(self.sender);
188        // wait for the consumer to finish before we consider the writer closed.
189        let res = match self.handle {
190            WorkHandle::Static(h) => h.join().map_err(std::panic::resume_unwind)?,
191            WorkHandle::Scoped(h) => h.join().map_err(std::panic::resume_unwind)?,
192        };
193        res
194    }
195}
196
197/// Child writer to allow objects to be written to a threaded writer from multiple
198/// threads, allowing parallel feeding of data.  It takes a **reference** to the
199/// original thread writer, so it needs to be used with [std::thread::scope]. The
200/// original writer still needs to be closed, and this design ensures that the
201/// original cannot be closed until all the children are finished.
202#[derive(Clone)]
203pub struct ThreadWriterSatellite<'a, 'scope, T>
204where
205    T: Send + Sync + 'static,
206    'scope: 'a,
207{
208    delegate: &'a ThreadObjectWriter<'scope, T>,
209    sender: Sender<T>,
210}
211
212impl<'a, 'scope, T> ThreadWriterSatellite<'a, 'scope, T>
213where
214    T: Send + Sync + 'static,
215    'scope: 'a,
216{
217    /// Create a new thread child writer.
218    fn create(delegate: &'a ThreadObjectWriter<'scope, T>) -> ThreadWriterSatellite<'a, 'scope, T> {
219        ThreadWriterSatellite {
220            delegate,
221            sender: delegate.sender.clone(),
222        }
223    }
224}
225
226impl<'a, 'scope, T> ObjectWriter<T> for ThreadWriterSatellite<'a, 'scope, T>
227where
228    T: Send + Sync + 'static,
229    'scope: 'a,
230{
231    fn write_object(&mut self, object: T) -> Result<()> {
232        measure_and_send(&self.sender, object, &self.delegate.meter)?;
233        Ok(())
234    }
235
236    fn finish(self) -> Result<usize> {
237        // do nothing, dropping the writer will close the sender
238        Ok(0)
239    }
240}