bookdata/io/
lines.rs

1use std::error::Error;
2use std::io::prelude::*;
3use std::io::Lines;
4use std::marker::PhantomData;
5use std::path::Path;
6use std::str::FromStr;
7
8use anyhow::Result;
9use indicatif::ProgressBar;
10use log::*;
11use serde::de::DeserializeOwned;
12
13use super::compress::open_gzin_progress;
14use super::ObjectWriter;
15
/// Line-oriented file reader that bundles buffering, decompression, and parsing.
///
/// Build one with [`LineProcessor::open_gzip`], then consume it as raw lines,
/// [`FromStr`]-parsed records, or JSON records.
pub struct LineProcessor {
    /// Buffered byte source that lines are read from (the decompressed stream when
    /// constructed via `open_gzip`).
    reader: Box<dyn BufRead>,
}
20
/// Iterator over the lines of a [`LineProcessor`], each parsed with [`FromStr`].
///
/// Produced by `LineProcessor::records`. Read failures and parse failures are both
/// yielded as `Err` items. The iterator is lazy: nothing is read until it is consumed.
#[must_use = "iterators are lazy and do nothing unless consumed"]
pub struct Records<R> {
    /// Underlying line iterator over the (buffered) input.
    lines: Lines<Box<dyn BufRead>>,
    /// Records the target record type without storing a value of it.
    phantom: PhantomData<R>,
}
25
26impl<R: FromStr> Iterator for Records<R>
27where
28    R::Err: 'static + Error + Send + Sync,
29{
30    type Item = Result<R>;
31
32    fn next(&mut self) -> Option<Self::Item> {
33        self.lines.next().map(|l| {
34            let line = l?;
35            let rec = line.parse()?;
36            Ok(rec)
37        })
38    }
39}
40
/// Iterator over the lines of a [`LineProcessor`], each deserialized as a JSON value.
///
/// Produced by `LineProcessor::json_records`. Every line is expected to hold one
/// complete JSON document. Read failures and deserialization failures are yielded as
/// `Err` items. The iterator is lazy: nothing is read until it is consumed.
#[must_use = "iterators are lazy and do nothing unless consumed"]
pub struct JSONRecords<R> {
    /// Underlying line iterator over the (buffered) input.
    lines: Lines<Box<dyn BufRead>>,
    /// Records the target record type without storing a value of it.
    phantom: PhantomData<R>,
}
45
46impl<R: DeserializeOwned> Iterator for JSONRecords<R> {
47    type Item = Result<R>;
48
49    fn next(&mut self) -> Option<Self::Item> {
50        self.lines.next().map(|l| {
51            let line = l?;
52            let rec = serde_json::from_str(&line)?;
53            Ok(rec)
54        })
55    }
56}
57
58impl LineProcessor {
59    /// Open a line processor from a gzipped source.
60    pub fn open_gzip(path: &Path, pb: ProgressBar) -> Result<LineProcessor> {
61        let read = open_gzin_progress(path, pb)?;
62        Ok(LineProcessor {
63            reader: Box::new(read),
64        })
65    }
66
67    /// Get the lines as strings.
68    #[allow(dead_code)]
69    pub fn lines(self) -> Lines<Box<dyn BufRead>> {
70        self.reader.lines()
71    }
72
73    /// Get the lines as records.
74    pub fn records<R: FromStr>(self) -> Records<R> {
75        Records {
76            lines: self.reader.lines(),
77            phantom: PhantomData,
78        }
79    }
80
81    /// Get the lines as JSON records.
82    pub fn json_records<R: DeserializeOwned>(self) -> JSONRecords<R> {
83        JSONRecords {
84            lines: self.reader.lines(),
85            phantom: PhantomData,
86        }
87    }
88
89    /// Process JSON rows into an object writer.
90    ///
91    /// This parses each line of the data set, deserializes it with JSON, and passes the resulting object
92    /// to the specified [ObjectWriter].  It produces useful error messages with line numbers when there
93    /// is a failure. It does **not** call [ObjectWriter::finish] when it is done - the caller needs to do that.
94    pub fn process_json<W, R>(self, writer: &mut W) -> Result<usize>
95    where
96        R: DeserializeOwned,
97        W: ObjectWriter<R>,
98    {
99        let mut line_no = 0;
100        for line in self.json_records() {
101            line_no += 1;
102            let obj: R = line.map_err(|e| {
103                error!("error parsing line {}: {:?}", line_no, e);
104                e
105            })?;
106            writer.write_object(obj).map_err(|e| {
107                error!("error writing line {}: {:?}", line_no, e);
108                e
109            })?;
110        }
111
112        debug!("read {} lines", line_no);
113
114        Ok(line_no)
115    }
116}