1use std::error::Error;
2use std::io::prelude::*;
3use std::io::Lines;
4use std::marker::PhantomData;
5use std::path::Path;
6use std::str::FromStr;
7
8use anyhow::Result;
9use indicatif::ProgressBar;
10use log::*;
11use serde::de::DeserializeOwned;
12
13use super::compress::open_gzin_progress;
14use super::ObjectWriter;
15
/// Line-oriented reader that turns a buffered text stream into records.
///
/// Consuming methods on this type (`lines`, `records`, `json_records`,
/// `process_json`) take ownership of the underlying reader.
pub struct LineProcessor {
    /// Boxed buffered source the lines are pulled from; the trait object is
    /// `'static` (the default bound for `Box<dyn Trait>`), spelled out here.
    reader: Box<dyn BufRead + 'static>,
}
20
21pub struct Records<R> {
22 lines: Lines<Box<dyn BufRead>>,
23 phantom: PhantomData<R>,
24}
25
26impl<R: FromStr> Iterator for Records<R>
27where
28 R::Err: 'static + Error + Send + Sync,
29{
30 type Item = Result<R>;
31
32 fn next(&mut self) -> Option<Self::Item> {
33 self.lines.next().map(|l| {
34 let line = l?;
35 let rec = line.parse()?;
36 Ok(rec)
37 })
38 }
39}
40
41pub struct JSONRecords<R> {
42 lines: Lines<Box<dyn BufRead>>,
43 phantom: PhantomData<R>,
44}
45
46impl<R: DeserializeOwned> Iterator for JSONRecords<R> {
47 type Item = Result<R>;
48
49 fn next(&mut self) -> Option<Self::Item> {
50 self.lines.next().map(|l| {
51 let line = l?;
52 let rec = serde_json::from_str(&line)?;
53 Ok(rec)
54 })
55 }
56}
57
58impl LineProcessor {
59 pub fn open_gzip(path: &Path, pb: ProgressBar) -> Result<LineProcessor> {
61 let read = open_gzin_progress(path, pb)?;
62 Ok(LineProcessor {
63 reader: Box::new(read),
64 })
65 }
66
67 #[allow(dead_code)]
69 pub fn lines(self) -> Lines<Box<dyn BufRead>> {
70 self.reader.lines()
71 }
72
73 pub fn records<R: FromStr>(self) -> Records<R> {
75 Records {
76 lines: self.reader.lines(),
77 phantom: PhantomData,
78 }
79 }
80
81 pub fn json_records<R: DeserializeOwned>(self) -> JSONRecords<R> {
83 JSONRecords {
84 lines: self.reader.lines(),
85 phantom: PhantomData,
86 }
87 }
88
89 pub fn process_json<W, R>(self, writer: &mut W) -> Result<usize>
95 where
96 R: DeserializeOwned,
97 W: ObjectWriter<R>,
98 {
99 let mut line_no = 0;
100 for line in self.json_records() {
101 line_no += 1;
102 let obj: R = line.map_err(|e| {
103 error!("error parsing line {}: {:?}", line_no, e);
104 e
105 })?;
106 writer.write_object(obj).map_err(|e| {
107 error!("error writing line {}: {:?}", line_no, e);
108 e
109 })?;
110 }
111
112 debug!("read {} lines", line_no);
113
114 Ok(line_no)
115 }
116}