bookdata/io/
background.rs1use std::io::prelude::*;
2use std::io::{self, copy};
3use std::mem::drop;
4use std::thread::{spawn, JoinHandle};
5
6use anyhow::Result;
7use friendly::bytes;
8use log::*;
9use os_pipe::{pipe, PipeReader, PipeWriter};
10
11use crate::util::timing::Timer;
12
13pub struct ThreadRead {
15 read: PipeReader,
16 handle: Option<JoinHandle<io::Result<u64>>>,
17}
18
19pub struct ThreadWrite {
21 write: Option<PipeWriter>,
22 handle: Option<JoinHandle<io::Result<u64>>>,
23}
24
25impl ThreadRead {
26 pub fn new<R: Read + Send + 'static>(chan: R) -> Result<ThreadRead> {
28 let (read, writer) = pipe()?;
29
30 let jh = spawn(move || {
31 let mut src = chan;
32 let mut dst = writer;
33 let timer = Timer::new();
34 let res = copy(&mut src, &mut dst);
35 if let Ok(size) = res {
36 info!("copied {} in {}", bytes(size), timer);
37 }
38 res
39 });
40
41 Ok(ThreadRead {
42 read,
43 handle: Some(jh),
44 })
45 }
46}
47
48impl Read for ThreadRead {
49 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
50 let size = self.read.read(buf)?;
51 if size == 0 {
52 if let Some(h) = self.handle.take() {
57 let res = h.join().expect("thread error");
58 let sz = res?;
59 debug!("thread copied {} bytes", sz);
60 }
61 }
62 Ok(size)
63 }
64}
65
66impl ThreadWrite {
67 pub fn new<W: Write + Send + 'static>(chan: W) -> Result<ThreadWrite> {
69 let (read, writer) = pipe()?;
70
71 let jh = spawn(move || {
72 let mut src = read;
73 let mut dst = chan;
74 copy(&mut src, &mut dst)
75 });
76
77 Ok(ThreadWrite {
78 write: Some(writer),
79 handle: Some(jh),
80 })
81 }
82
83 fn do_close(&mut self) -> io::Result<u64> {
84 if let Some(mut write) = self.write.take() {
85 write.flush()?;
86 drop(write);
87 }
88 if let Some(h) = self.handle.take() {
89 let res = h.join().expect("thread error");
90 let sz = res?;
91 debug!("thread copied {} bytes", sz);
92 Ok(sz)
93 } else {
94 Err(io::ErrorKind::BrokenPipe.into())
95 }
96 }
97
98 #[allow(dead_code)]
100 pub fn close(mut self) -> io::Result<u64> {
101 self.do_close()
102 }
103}
104
105impl Drop for ThreadWrite {
106 fn drop(&mut self) {
107 if self.write.is_some() || self.handle.is_some() {
108 self.do_close().expect("unclosed background thread failed");
109 }
110 }
111}
112
113impl Write for ThreadWrite {
114 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
115 if let Some(ref mut write) = self.write {
116 write.write(buf)
117 } else {
118 Err(io::ErrorKind::BrokenPipe.into())
119 }
120 }
121
122 fn flush(&mut self) -> io::Result<()> {
124 if let Some(ref mut write) = self.write {
125 write.flush()
126 } else {
127 Err(io::ErrorKind::BrokenPipe.into())
128 }
129 }
130}