bookdata/io/
background.rs

1use std::io::prelude::*;
2use std::io::{self, copy};
3use std::mem::drop;
4use std::thread::{spawn, JoinHandle};
5
6use anyhow::Result;
7use friendly::bytes;
8use log::*;
9use os_pipe::{pipe, PipeReader, PipeWriter};
10
11use crate::util::timing::Timer;
12
13/// Reader that reads from a background thread.
14pub struct ThreadRead {
15    read: PipeReader,
16    handle: Option<JoinHandle<io::Result<u64>>>,
17}
18
19/// Writer that writes to a background thread.
20pub struct ThreadWrite {
21    write: Option<PipeWriter>,
22    handle: Option<JoinHandle<io::Result<u64>>>,
23}
24
25impl ThreadRead {
26    /// Push a read into a background thread.
27    pub fn new<R: Read + Send + 'static>(chan: R) -> Result<ThreadRead> {
28        let (read, writer) = pipe()?;
29
30        let jh = spawn(move || {
31            let mut src = chan;
32            let mut dst = writer;
33            let timer = Timer::new();
34            let res = copy(&mut src, &mut dst);
35            if let Ok(size) = res {
36                info!("copied {} in {}", bytes(size), timer);
37            }
38            res
39        });
40
41        Ok(ThreadRead {
42            read,
43            handle: Some(jh),
44        })
45    }
46}
47
48impl Read for ThreadRead {
49    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
50        let size = self.read.read(buf)?;
51        if size == 0 {
52            // eof - make sure thread completed successfully
53            // this makes it so thread failure results in an error
54            // on the last call to `read`, instead of a panic when
55            // the reader is dropped.
56            if let Some(h) = self.handle.take() {
57                let res = h.join().expect("thread error");
58                let sz = res?;
59                debug!("thread copied {} bytes", sz);
60            }
61        }
62        Ok(size)
63    }
64}
65
66impl ThreadWrite {
67    /// Push a read into a background thread.
68    pub fn new<W: Write + Send + 'static>(chan: W) -> Result<ThreadWrite> {
69        let (read, writer) = pipe()?;
70
71        let jh = spawn(move || {
72            let mut src = read;
73            let mut dst = chan;
74            copy(&mut src, &mut dst)
75        });
76
77        Ok(ThreadWrite {
78            write: Some(writer),
79            handle: Some(jh),
80        })
81    }
82
83    fn do_close(&mut self) -> io::Result<u64> {
84        if let Some(mut write) = self.write.take() {
85            write.flush()?;
86            drop(write);
87        }
88        if let Some(h) = self.handle.take() {
89            let res = h.join().expect("thread error");
90            let sz = res?;
91            debug!("thread copied {} bytes", sz);
92            Ok(sz)
93        } else {
94            Err(io::ErrorKind::BrokenPipe.into())
95        }
96    }
97
98    /// Close and drop the thread writer.
99    #[allow(dead_code)]
100    pub fn close(mut self) -> io::Result<u64> {
101        self.do_close()
102    }
103}
104
105impl Drop for ThreadWrite {
106    fn drop(&mut self) {
107        if self.write.is_some() || self.handle.is_some() {
108            self.do_close().expect("unclosed background thread failed");
109        }
110    }
111}
112
113impl Write for ThreadWrite {
114    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
115        if let Some(ref mut write) = self.write {
116            write.write(buf)
117        } else {
118            Err(io::ErrorKind::BrokenPipe.into())
119        }
120    }
121
122    /// Flush the thread writer. This is not reliable.
123    fn flush(&mut self) -> io::Result<()> {
124        if let Some(ref mut write) = self.write {
125            write.flush()
126        } else {
127            Err(io::ErrorKind::BrokenPipe.into())
128        }
129    }
130}