bookdata/arrow/
reader.rs

1//! Support for streaming objects from a Parquet file.
2use std::fs::File;
3use std::path::Path;
4use std::thread::spawn;
5
6use anyhow::Result;
7use crossbeam::channel::{bounded, Receiver, Sender};
8use indicatif::ProgressBar;
9use log::*;
10use parquet::file::reader::{ChunkReader, FileReader};
11use parquet::file::serialized_reader::SerializedFileReader;
12use parquet::record::RecordReader;
13
14use crate::util::logging::{item_progress, measure_and_send, meter_bar};
15
16/// Iterator over deserialized records from a Parquet file.
17pub struct RecordIter<R>
18where
19    R: Send + Sync + 'static,
20    Vec<R>: RecordReader<R>,
21{
22    remaining: usize,
23    channel: Receiver<Result<Vec<R>>>,
24    batch: Option<std::vec::IntoIter<R>>,
25}
26
27impl<R> RecordIter<R>
28where
29    R: Send + Sync + 'static,
30    Vec<R>: RecordReader<R>,
31{
32    pub fn remaining(&self) -> usize {
33        self.remaining
34    }
35}
36
37/// Scan a Parquet file in a background thread and deserialize records.
38pub fn scan_parquet_file<R, P>(path: P) -> Result<RecordIter<R>>
39where
40    P: AsRef<Path>,
41    R: Send + Sync + 'static,
42    Vec<R>: RecordReader<R>,
43{
44    let path = path.as_ref();
45    let reader = File::open(&path)?;
46    let reader = SerializedFileReader::new(reader)?;
47    let meta = reader.metadata().file_metadata();
48    let row_count = meta.num_rows();
49
50    info!(
51        "scanning {} with {} rows",
52        path.display(),
53        friendly::scalar(row_count)
54    );
55    let pb = item_progress(row_count, &format!("{}", path.display()));
56
57    // use a small bound since we're sending whole batches
58    let (send, receive) = bounded(5);
59    let p2 = path.to_path_buf();
60
61    spawn(move || {
62        let send = send;
63        if let Err(e) = scan_backend(reader, &send, &p2, pb) {
64            send.send(Err(e)).expect("failed to send error message");
65        }
66    });
67
68    debug!(
69        "{}: background thread spawned, returning iter",
70        path.display()
71    );
72    Ok(RecordIter {
73        remaining: row_count as usize,
74        channel: receive,
75        batch: None,
76    })
77}
78
79impl<R> Iterator for RecordIter<R>
80where
81    R: Send + Sync + 'static,
82    Vec<R>: RecordReader<R>,
83{
84    type Item = Result<R>;
85
86    fn next(&mut self) -> Option<Self::Item> {
87        loop {
88            // try to get another item from the current batch
89            let next = self.batch.as_mut().map(|i| i.next()).flatten();
90            if let Some(row) = next {
91                // we got something! return it
92                self.remaining -= 1;
93                return Some(Ok(row));
94            } else if let Ok(br) = self.channel.recv() {
95                // fetch a new batch and try again
96                match br {
97                    Ok(batch) => {
98                        self.batch = Some(batch.into_iter());
99                        // loop around and use the batch
100                    }
101                    Err(e) => {
102                        // error on the channel
103                        return Some(Err(e));
104                    }
105                }
106            } else {
107                // we're done
108                return None;
109            }
110        }
111    }
112}
113
114fn scan_backend<R, E>(
115    reader: SerializedFileReader<R>,
116    send: &Sender<Result<Vec<E>>>,
117    path: &Path,
118    pb: ProgressBar,
119) -> Result<()>
120where
121    R: ChunkReader + 'static,
122    E: Send + Sync + 'static,
123    Vec<E>: RecordReader<E>,
124{
125    debug!("{}: beginning batched read", path.display());
126    let meter = meter_bar(5, &format!("{} read buffer", path.display()));
127    let ngroups = reader.num_row_groups();
128    for gi in 0..ngroups {
129        let mut group = reader.get_row_group(gi)?;
130        let gmeta = group.metadata();
131        let glen = gmeta.num_rows();
132
133        trace!("{}: received batch of {} rows", path.display(), glen);
134        pb.inc(glen as u64);
135        let mut batch = Vec::with_capacity(glen as usize);
136        batch.read_from_row_group(group.as_mut(), glen as usize)?;
137        assert_eq!(batch.len(), glen as usize);
138        trace!("{}: decoded chunk, sending to consumer", path.display());
139        measure_and_send(&send, Ok(batch), &meter)?;
140    }
141    Ok(())
142}