bookdata/arrow/
reader.rs

1//! Support for streaming objects from a Parquet file.
2use std::fs::File;
3use std::path::Path;
4use std::thread::spawn;
5
6use anyhow::Result;
7use crossbeam::channel::{bounded, Receiver, Sender};
8use indicatif::ProgressBar;
9use log::*;
10use parquet::file::reader::{ChunkReader, FileReader};
11use parquet::file::serialized_reader::SerializedFileReader;
12use parquet::record::RecordReader;
13use polars::prelude::*;
14
15use crate::util::logging::{item_progress, measure_and_send, meter_bar};
16
17/// Scan a Parquet file into a data frame.
18pub fn scan_df_parquet<P: AsRef<Path>>(file: P) -> Result<LazyFrame> {
19    let file = file.as_ref();
20    debug!("scanning file {}", file.display());
21    let df = LazyFrame::scan_parquet(file, ScanArgsParquet::default())?;
22    debug!("{}: schema {:?}", file.display(), df.schema()?);
23    Ok(df)
24}
25
/// Iterator over deserialized records from a Parquet file.
///
/// Rows are handed out one at a time from the current batch; when that batch
/// is exhausted, the next batch (or an error) is received from `channel`.
pub struct RecordIter<R>
where
    R: Send + Sync + 'static,
    Vec<R>: RecordReader<R>,
{
    /// Number of rows not yet yielded; reduced by one for each row returned.
    remaining: usize,
    /// Receiving end of the channel delivering decoded batches or an error.
    channel: Receiver<Result<Vec<R>>>,
    /// Batch currently being drained; `None` until the first batch arrives.
    batch: Option<std::vec::IntoIter<R>>,
}
36
impl<R> RecordIter<R>
where
    R: Send + Sync + 'static,
    Vec<R>: RecordReader<R>,
{
    /// Number of rows that the iterator has not yet returned.
    ///
    /// This is a plain read of the internal counter; it does not touch the
    /// channel or block.
    pub fn remaining(&self) -> usize {
        self.remaining
    }
}
46
47/// Scan a Parquet file in a background thread and deserialize records.
48pub fn scan_parquet_file<R, P>(path: P) -> Result<RecordIter<R>>
49where
50    P: AsRef<Path>,
51    R: Send + Sync + 'static,
52    Vec<R>: RecordReader<R>,
53{
54    let path = path.as_ref();
55    let reader = File::open(&path)?;
56    let reader = SerializedFileReader::new(reader)?;
57    let meta = reader.metadata().file_metadata();
58    let row_count = meta.num_rows();
59
60    info!(
61        "scanning {} with {} rows",
62        path.display(),
63        friendly::scalar(row_count)
64    );
65    let pb = item_progress(row_count, &format!("{}", path.display()));
66
67    // use a small bound since we're sending whole batches
68    let (send, receive) = bounded(5);
69    let p2 = path.to_path_buf();
70
71    spawn(move || {
72        let send = send;
73        if let Err(e) = scan_backend(reader, &send, &p2, pb) {
74            send.send(Err(e)).expect("failed to send error message");
75        }
76    });
77
78    debug!(
79        "{}: background thread spawned, returning iter",
80        path.display()
81    );
82    Ok(RecordIter {
83        remaining: row_count as usize,
84        channel: receive,
85        batch: None,
86    })
87}
88
89impl<R> Iterator for RecordIter<R>
90where
91    R: Send + Sync + 'static,
92    Vec<R>: RecordReader<R>,
93{
94    type Item = Result<R>;
95
96    fn next(&mut self) -> Option<Self::Item> {
97        loop {
98            // try to get another item from the current batch
99            let next = self.batch.as_mut().map(|i| i.next()).flatten();
100            if let Some(row) = next {
101                // we got something! return it
102                self.remaining -= 1;
103                return Some(Ok(row));
104            } else if let Ok(br) = self.channel.recv() {
105                // fetch a new batch and try again
106                match br {
107                    Ok(batch) => {
108                        self.batch = Some(batch.into_iter());
109                        // loop around and use the batch
110                    }
111                    Err(e) => {
112                        // error on the channel
113                        return Some(Err(e));
114                    }
115                }
116            } else {
117                // we're done
118                return None;
119            }
120        }
121    }
122}
123
124fn scan_backend<R, E>(
125    reader: SerializedFileReader<R>,
126    send: &Sender<Result<Vec<E>>>,
127    path: &Path,
128    pb: ProgressBar,
129) -> Result<()>
130where
131    R: ChunkReader + 'static,
132    E: Send + Sync + 'static,
133    Vec<E>: RecordReader<E>,
134{
135    debug!("{}: beginning batched read", path.display());
136    let meter = meter_bar(5, &format!("{} read buffer", path.display()));
137    let ngroups = reader.num_row_groups();
138    for gi in 0..ngroups {
139        let mut group = reader.get_row_group(gi)?;
140        let gmeta = group.metadata();
141        let glen = gmeta.num_rows();
142
143        trace!("{}: received batch of {} rows", path.display(), glen);
144        pb.inc(glen as u64);
145        let mut batch = Vec::with_capacity(glen as usize);
146        batch.read_from_row_group(group.as_mut(), glen as usize)?;
147        assert_eq!(batch.len(), glen as usize);
148        trace!("{}: decoded chunk, sending to consumer", path.display());
149        measure_and_send(&send, Ok(batch), &meter)?;
150    }
151    Ok(())
152}