1use std::fs::File;
3use std::path::Path;
4use std::thread::spawn;
5
6use anyhow::Result;
7use crossbeam::channel::{bounded, Receiver, Sender};
8use indicatif::ProgressBar;
9use log::*;
10use parquet::file::reader::{ChunkReader, FileReader};
11use parquet::file::serialized_reader::SerializedFileReader;
12use parquet::record::RecordReader;
13
14use crate::util::logging::{item_progress, measure_and_send, meter_bar};
15
16pub struct RecordIter<R>
18where
19 R: Send + Sync + 'static,
20 Vec<R>: RecordReader<R>,
21{
22 remaining: usize,
23 channel: Receiver<Result<Vec<R>>>,
24 batch: Option<std::vec::IntoIter<R>>,
25}
26
27impl<R> RecordIter<R>
28where
29 R: Send + Sync + 'static,
30 Vec<R>: RecordReader<R>,
31{
32 pub fn remaining(&self) -> usize {
33 self.remaining
34 }
35}
36
37pub fn scan_parquet_file<R, P>(path: P) -> Result<RecordIter<R>>
39where
40 P: AsRef<Path>,
41 R: Send + Sync + 'static,
42 Vec<R>: RecordReader<R>,
43{
44 let path = path.as_ref();
45 let reader = File::open(&path)?;
46 let reader = SerializedFileReader::new(reader)?;
47 let meta = reader.metadata().file_metadata();
48 let row_count = meta.num_rows();
49
50 info!(
51 "scanning {} with {} rows",
52 path.display(),
53 friendly::scalar(row_count)
54 );
55 let pb = item_progress(row_count, &format!("{}", path.display()));
56
57 let (send, receive) = bounded(5);
59 let p2 = path.to_path_buf();
60
61 spawn(move || {
62 let send = send;
63 if let Err(e) = scan_backend(reader, &send, &p2, pb) {
64 send.send(Err(e)).expect("failed to send error message");
65 }
66 });
67
68 debug!(
69 "{}: background thread spawned, returning iter",
70 path.display()
71 );
72 Ok(RecordIter {
73 remaining: row_count as usize,
74 channel: receive,
75 batch: None,
76 })
77}
78
79impl<R> Iterator for RecordIter<R>
80where
81 R: Send + Sync + 'static,
82 Vec<R>: RecordReader<R>,
83{
84 type Item = Result<R>;
85
86 fn next(&mut self) -> Option<Self::Item> {
87 loop {
88 let next = self.batch.as_mut().map(|i| i.next()).flatten();
90 if let Some(row) = next {
91 self.remaining -= 1;
93 return Some(Ok(row));
94 } else if let Ok(br) = self.channel.recv() {
95 match br {
97 Ok(batch) => {
98 self.batch = Some(batch.into_iter());
99 }
101 Err(e) => {
102 return Some(Err(e));
104 }
105 }
106 } else {
107 return None;
109 }
110 }
111 }
112}
113
114fn scan_backend<R, E>(
115 reader: SerializedFileReader<R>,
116 send: &Sender<Result<Vec<E>>>,
117 path: &Path,
118 pb: ProgressBar,
119) -> Result<()>
120where
121 R: ChunkReader + 'static,
122 E: Send + Sync + 'static,
123 Vec<E>: RecordReader<E>,
124{
125 debug!("{}: beginning batched read", path.display());
126 let meter = meter_bar(5, &format!("{} read buffer", path.display()));
127 let ngroups = reader.num_row_groups();
128 for gi in 0..ngroups {
129 let mut group = reader.get_row_group(gi)?;
130 let gmeta = group.metadata();
131 let glen = gmeta.num_rows();
132
133 trace!("{}: received batch of {} rows", path.display(), glen);
134 pb.inc(glen as u64);
135 let mut batch = Vec::with_capacity(glen as usize);
136 batch.read_from_row_group(group.as_mut(), glen as usize)?;
137 assert_eq!(batch.len(), glen as usize);
138 trace!("{}: decoded chunk, sending to consumer", path.display());
139 measure_and_send(&send, Ok(batch), &meter)?;
140 }
141 Ok(())
142}