1use std::fs::File;
3use std::path::Path;
4use std::thread::spawn;
5
6use anyhow::Result;
7use crossbeam::channel::{bounded, Receiver, Sender};
8use indicatif::ProgressBar;
9use log::*;
10use parquet::file::reader::{ChunkReader, FileReader};
11use parquet::file::serialized_reader::SerializedFileReader;
12use parquet::record::RecordReader;
13use polars::prelude::*;
14
15use crate::util::logging::{item_progress, measure_and_send, meter_bar};
16
17pub fn scan_df_parquet<P: AsRef<Path>>(file: P) -> Result<LazyFrame> {
19 let file = file.as_ref();
20 debug!("scanning file {}", file.display());
21 let df = LazyFrame::scan_parquet(file, ScanArgsParquet::default())?;
22 debug!("{}: schema {:?}", file.display(), df.schema()?);
23 Ok(df)
24}
25
26pub struct RecordIter<R>
28where
29 R: Send + Sync + 'static,
30 Vec<R>: RecordReader<R>,
31{
32 remaining: usize,
33 channel: Receiver<Result<Vec<R>>>,
34 batch: Option<std::vec::IntoIter<R>>,
35}
36
37impl<R> RecordIter<R>
38where
39 R: Send + Sync + 'static,
40 Vec<R>: RecordReader<R>,
41{
42 pub fn remaining(&self) -> usize {
43 self.remaining
44 }
45}
46
47pub fn scan_parquet_file<R, P>(path: P) -> Result<RecordIter<R>>
49where
50 P: AsRef<Path>,
51 R: Send + Sync + 'static,
52 Vec<R>: RecordReader<R>,
53{
54 let path = path.as_ref();
55 let reader = File::open(&path)?;
56 let reader = SerializedFileReader::new(reader)?;
57 let meta = reader.metadata().file_metadata();
58 let row_count = meta.num_rows();
59
60 info!(
61 "scanning {} with {} rows",
62 path.display(),
63 friendly::scalar(row_count)
64 );
65 let pb = item_progress(row_count, &format!("{}", path.display()));
66
67 let (send, receive) = bounded(5);
69 let p2 = path.to_path_buf();
70
71 spawn(move || {
72 let send = send;
73 if let Err(e) = scan_backend(reader, &send, &p2, pb) {
74 send.send(Err(e)).expect("failed to send error message");
75 }
76 });
77
78 debug!(
79 "{}: background thread spawned, returning iter",
80 path.display()
81 );
82 Ok(RecordIter {
83 remaining: row_count as usize,
84 channel: receive,
85 batch: None,
86 })
87}
88
89impl<R> Iterator for RecordIter<R>
90where
91 R: Send + Sync + 'static,
92 Vec<R>: RecordReader<R>,
93{
94 type Item = Result<R>;
95
96 fn next(&mut self) -> Option<Self::Item> {
97 loop {
98 let next = self.batch.as_mut().map(|i| i.next()).flatten();
100 if let Some(row) = next {
101 self.remaining -= 1;
103 return Some(Ok(row));
104 } else if let Ok(br) = self.channel.recv() {
105 match br {
107 Ok(batch) => {
108 self.batch = Some(batch.into_iter());
109 }
111 Err(e) => {
112 return Some(Err(e));
114 }
115 }
116 } else {
117 return None;
119 }
120 }
121 }
122}
123
124fn scan_backend<R, E>(
125 reader: SerializedFileReader<R>,
126 send: &Sender<Result<Vec<E>>>,
127 path: &Path,
128 pb: ProgressBar,
129) -> Result<()>
130where
131 R: ChunkReader + 'static,
132 E: Send + Sync + 'static,
133 Vec<E>: RecordReader<E>,
134{
135 debug!("{}: beginning batched read", path.display());
136 let meter = meter_bar(5, &format!("{} read buffer", path.display()));
137 let ngroups = reader.num_row_groups();
138 for gi in 0..ngroups {
139 let mut group = reader.get_row_group(gi)?;
140 let gmeta = group.metadata();
141 let glen = gmeta.num_rows();
142
143 trace!("{}: received batch of {} rows", path.display(), glen);
144 pb.inc(glen as u64);
145 let mut batch = Vec::with_capacity(glen as usize);
146 batch.read_from_row_group(group.as_mut(), glen as usize)?;
147 assert_eq!(batch.len(), glen as usize);
148 trace!("{}: decoded chunk, sending to consumer", path.display());
149 measure_and_send(&send, Ok(batch), &meter)?;
150 }
151 Ok(())
152}