bookdata/cli/
pqinfo.rs

1//! Extract basic information from a Parquet file.
2use std::fmt::Debug;
3use std::fs::File;
4use std::io::{stdout, Read, Seek, Write};
5use std::mem::drop;
6
7use polars_arrow::datatypes::Field;
8use serde::Serialize;
9
10use friendly::{bytes, scalar};
11use polars_parquet::read::schema::parquet_to_arrow_schema;
12use polars_parquet::read::*;
13
14use crate::prelude::*;
15
16use super::Command;
17
18/// Extract basic information from a Parquet file.
19#[derive(Args, Debug)]
20#[command(name = "collect-isbns")]
21pub struct PQInfo {
22    /// Check the length by decoding the file.
23    #[arg(long = "check-length")]
24    check_length: bool,
25
26    /// Path to the output JSON file.
27    #[arg(short = 'o', long = "output")]
28    out_file: Option<PathBuf>,
29
30    /// Path to the Parquet file.
31    #[arg(name = "FILE")]
32    source_file: PathBuf,
33}
34
/// Summary of a Parquet file, serialized as the JSON document written to the
/// `--output` file.
///
/// Field names and their declaration order determine the keys and key order
/// of the emitted JSON, so treat them as part of the output format.
#[derive(Debug, Serialize)]
struct InfoStruct {
    /// Total row count taken from the Parquet file metadata.
    row_count: usize,
    /// Length of the file as reported by the filesystem metadata.
    file_size: u64,
    /// One entry per field of the Arrow schema decoded from the file.
    fields: Vec<FieldStruct>,
}
41
/// Serializable description of a single schema field.
///
/// Field names and their declaration order determine the keys and key order
/// of the emitted JSON, so treat them as part of the output format.
#[derive(Debug, Serialize)]
struct FieldStruct {
    /// Name of the field.
    name: String,
    /// `Debug` rendering of the field's Arrow data type.
    data_type: String,
    /// Whether the field is marked as nullable in the schema.
    nullable: bool,
}
48
49impl From<&Field> for FieldStruct {
50    fn from(f: &Field) -> Self {
51        FieldStruct {
52            name: f.name.clone(),
53            data_type: format!("{:?}", f.data_type),
54            nullable: f.is_nullable,
55        }
56    }
57}
58
59fn check_length<R: Read + Seek>(reader: &mut FileReader<R>, expected: usize) -> Result<()> {
60    let mut len = 0;
61    for chunk in reader {
62        let chunk = chunk?;
63        let clen = chunk.len();
64        let arrays = chunk.into_arrays();
65        let types = arrays.iter().map(|a| a.data_type()).collect::<Vec<_>>();
66        debug!("chunk length: {}", clen);
67        debug!("chunk types: {:?}", types);
68        len += clen;
69    }
70
71    if len != expected {
72        warn!("expected {} rows but decoded {}", expected, len);
73    }
74
75    Ok(())
76}
77
78impl Command for PQInfo {
79    fn exec(&self) -> Result<()> {
80        info!("reading {:?}", self.source_file);
81
82        let mut pqf = File::open(&self.source_file)?;
83        let fmeta = pqf.metadata()?;
84        info!("file size: {}", bytes(fmeta.len()));
85
86        let meta = read_metadata(&mut pqf)?;
87
88        info!("row count: {}", scalar(meta.num_rows));
89        info!("row groups: {}", meta.row_groups.len());
90        let rc2: usize = meta.row_groups.iter().map(|rg| rg.num_rows()).sum();
91        if rc2 != meta.num_rows {
92            warn!("row group total {} != file total {}", rc2, meta.num_rows);
93        }
94
95        info!("decoding schema");
96        let schema = meta.schema();
97        let fields = parquet_to_arrow_schema(schema.fields());
98
99        let out = stdout();
100        let mut ol = out.lock();
101        for field in &fields {
102            writeln!(&mut ol, "{:?}", field)?;
103        }
104        drop(ol);
105
106        if let Some(ref file) = self.out_file {
107            let mut out = File::create(file)?;
108            let info = InfoStruct {
109                row_count: meta.num_rows,
110                file_size: fmeta.len() as u64,
111                fields: fields.iter().map(|f| f.into()).collect(),
112            };
113
114            serde_json::to_writer_pretty(&mut out, &info)?;
115        }
116
117        if self.check_length {
118            let schema = infer_schema(&meta)?;
119            let mut reader = FileReader::new(pqf, meta.row_groups, schema, None, None, None);
120            check_length(&mut reader, meta.num_rows)?;
121        }
122
123        Ok(())
124    }
125}