bookdata/io/object/
thread.rs

1use std::{
2    borrow::Cow,
3    thread::{spawn, JoinHandle, Scope, ScopedJoinHandle},
4};
5
6use anyhow::Result;
7use crossbeam::channel::{bounded, Receiver, Sender};
8use indicatif::ProgressBar;
9
10use crate::util::logging::{measure_and_recv, measure_and_send, meter_bar};
11
12use super::ObjectWriter;
13
14enum WorkHandle<'scope> {
15    Static(JoinHandle<Result<usize>>),
16    Scoped(ScopedJoinHandle<'scope, Result<usize>>),
17}
18
19fn ferry<T, W>(recv: Receiver<T>, writer: W, pb: ProgressBar) -> Result<usize>
20where
21    T: Send + Sync + 'static,
22    W: ObjectWriter<T>,
23{
24    let mut writer = writer; // move writer into thread
25
26    while let Some(obj) = measure_and_recv(&recv, &pb) {
27        writer.write_object(obj)?;
28    }
29
30    pb.finish_and_clear();
31    writer.finish_objects()
32}
33
34pub struct ThreadObjectWriterBuilder<W, F: Send + FnOnce() -> Result<W>> {
35    thunk: F,
36    name: String,
37    capacity: usize,
38}
39
40/// Write objects in a background thread.
41pub struct ThreadObjectWriter<'scope, T>
42where
43    T: Send + Sync + 'static,
44{
45    sender: Sender<T>,
46    handle: WorkHandle<'scope>,
47    meter: ProgressBar,
48}
49
50impl<'scope, T> ThreadObjectWriter<'scope, T>
51where
52    T: Send + Sync + 'scope,
53{
54    pub fn wrap<W>(writer: W) -> ThreadObjectWriterBuilder<W, impl Send + FnOnce() -> Result<W>>
55    where
56        W: ObjectWriter<T> + Send + Sync + 'scope,
57    {
58        ThreadObjectWriterBuilder {
59            thunk: move || Ok(writer),
60            name: "unnamed".into(),
61            capacity: 100,
62        }
63    }
64}
65
66impl<W, F: Send + FnOnce() -> Result<W>> ThreadObjectWriterBuilder<W, F> {
67    /// Set the channel capacity for this thread writer.  Defaults to 100.
68    pub fn with_capacity(self, cap: usize) -> Self {
69        ThreadObjectWriterBuilder {
70            capacity: cap,
71            ..self
72        }
73    }
74
75    /// Set a name for this thread writer for debugging.
76    pub fn with_name<S: Into<Cow<'static, str>>>(self, name: S) -> Self {
77        let name: Cow<'static, str> = name.into();
78        ThreadObjectWriterBuilder {
79            name: name.to_string(),
80            ..self
81        }
82    }
83
84    /// Spawn the thread writer.
85    pub fn spawn_scoped<'scope, 'env, T>(
86        self,
87        scope: &'scope Scope<'scope, 'env>,
88    ) -> ThreadObjectWriter<'scope, T>
89    where
90        W: ObjectWriter<T> + 'scope,
91        F: 'scope,
92        T: Send + Sync + 'scope,
93    {
94        let (sender, receiver) = bounded(self.capacity);
95        let pb = meter_bar(self.capacity, &format!("{} buffer", self.name));
96
97        let rpb = pb.clone();
98        let thunk = self.thunk;
99        let h = scope.spawn(move || ferry(receiver, thunk()?, rpb));
100
101        ThreadObjectWriter {
102            meter: pb,
103            sender,
104            handle: WorkHandle::Scoped(h),
105        }
106    }
107
108    /// Spawn the thread writer.
109    pub fn spawn<T>(self) -> ThreadObjectWriter<'static, T>
110    where
111        W: ObjectWriter<T> + 'static,
112        F: 'static,
113        T: Send + Sync + 'static,
114    {
115        let (sender, receiver) = bounded(self.capacity);
116        let pb = meter_bar(self.capacity, &format!("{} buffer", self.name));
117
118        let rpb = pb.clone();
119        let thunk = self.thunk;
120        let h = spawn(move || ferry(receiver, thunk()?, rpb));
121
122        ThreadObjectWriter {
123            meter: pb,
124            sender,
125            handle: WorkHandle::Static(h),
126        }
127    }
128}
129
130impl<'scope, T: Send + Sync + 'scope> ThreadObjectWriter<'scope, T> {
131    /// Create a satellite writer that writes to the same backend as this
132    /// writer.
133    ///
134    /// Satellites can be used to enable multiple data-generating threads to
135    /// write to the same thread writer, turning it into a multi-producer,
136    /// single-consumer writing pipeline.  Satellite writers should be finished,
137    /// and closing them does not finish the original thread writer (it still
138    /// needs to have [ObjectWriter::finish] called, typically after all
139    /// satellites are done, but it calling [ObjectWriter::finish] while
140    /// satellites are still active will wait until the satellites have finished
141    /// and closed their connections to the consumer thread).
142    ///
143    /// Satellites hold a reference to the original thread writer, to discourage
144    /// keeping them alive after the thread writer has been finished.  They
145    /// work best with [std::thread::scope]:
146    ///
147    /// ```rust,ignore
148    /// let writer = ThreadWriter::new(writer);
149    /// scope(|s| {
150    ///     for i in 0..NTHREADS {
151    ///         let out = writer.satellite();
152    ///         s.spawn(move || {
153    ///             // process and write to out
154    ///             out.finish().expect("closing writer failed");
155    ///         })
156    ///     }
157    /// })
158    /// ```
159    pub fn satellite<'a>(&'a self) -> ThreadWriterSatellite<'a, 'scope, T>
160    where
161        'scope: 'a,
162    {
163        ThreadWriterSatellite::create(self)
164    }
165}
166
167impl<'scope, T: Send + Sync + 'static> ObjectWriter<T> for ThreadObjectWriter<'scope, T> {
168    fn write_object(&mut self, object: T) -> Result<()> {
169        measure_and_send(&self.sender, object, &self.meter)?;
170        Ok(())
171    }
172
173    fn finish_objects(self) -> Result<usize> {
174        // dropping the sender will cause the consumer to hang up.
175        drop(self.sender);
176        // wait for the consumer to finish before we consider the writer closed.
177        let res = match self.handle {
178            WorkHandle::Static(h) => h.join().map_err(std::panic::resume_unwind)?,
179            WorkHandle::Scoped(h) => h.join().map_err(std::panic::resume_unwind)?,
180        };
181        res
182    }
183}
184
185/// Child writer to allow objects to be written to a threaded writer from multiple
186/// threads, allowing parallel feeding of data.  It takes a **reference** to the
187/// original thread writer, so it needs to be used with [std::thread::scope]. The
188/// original writer still needs to be closed, and this design ensures that the
189/// original cannot be closed until all the children are finished.
190#[derive(Clone)]
191pub struct ThreadWriterSatellite<'a, 'scope, T>
192where
193    T: Send + Sync + 'static,
194    'scope: 'a,
195{
196    delegate: &'a ThreadObjectWriter<'scope, T>,
197    sender: Sender<T>,
198}
199
200impl<'a, 'scope, T> ThreadWriterSatellite<'a, 'scope, T>
201where
202    T: Send + Sync + 'static,
203    'scope: 'a,
204{
205    /// Create a new thread child writer.
206    fn create(delegate: &'a ThreadObjectWriter<'scope, T>) -> ThreadWriterSatellite<'a, 'scope, T> {
207        ThreadWriterSatellite {
208            delegate,
209            sender: delegate.sender.clone(),
210        }
211    }
212}
213
214impl<'a, 'scope, T> ObjectWriter<T> for ThreadWriterSatellite<'a, 'scope, T>
215where
216    T: Send + Sync + 'static,
217    'scope: 'a,
218{
219    fn write_object(&mut self, object: T) -> Result<()> {
220        measure_and_send(&self.sender, object, &self.delegate.meter)?;
221        Ok(())
222    }
223
224    fn finish_objects(self) -> Result<usize> {
225        // do nothing, dropping the writer will close the sender
226        Ok(0)
227    }
228}