1use std::fmt::Debug;
3use std::fs::File;
4use std::io::{stdout, Read, Seek, Write};
5use std::mem::drop;
6
7use polars_arrow::datatypes::Field;
8use serde::Serialize;
9
10use friendly::{bytes, scalar};
11use polars_parquet::read::schema::parquet_to_arrow_schema;
12use polars_parquet::read::*;
13
14use crate::prelude::*;
15
16use super::Command;
17
18#[derive(Args, Debug)]
20#[command(name = "collect-isbns")]
21pub struct PQInfo {
22 #[arg(long = "check-length")]
24 check_length: bool,
25
26 #[arg(short = 'o', long = "output")]
28 out_file: Option<PathBuf>,
29
30 #[arg(name = "FILE")]
32 source_file: PathBuf,
33}
34
/// Summary of a Parquet file, serialized as JSON when an output file is requested.
#[derive(Debug, Serialize)]
struct InfoStruct {
    /// Number of rows reported by the Parquet file metadata.
    row_count: usize,
    /// Size of the file as reported by the file system metadata.
    file_size: u64,
    /// One entry per field of the schema decoded from the Parquet metadata.
    fields: Vec<FieldStruct>,
}
41
/// Serializable description of a single schema field.
#[derive(Debug, Serialize)]
struct FieldStruct {
    /// Field name, copied from the source field.
    name: String,
    /// `Debug` rendering of the field's data type.
    data_type: String,
    /// Whether the source field is marked as nullable.
    nullable: bool,
}
48
49impl From<&Field> for FieldStruct {
50 fn from(f: &Field) -> Self {
51 FieldStruct {
52 name: f.name.clone(),
53 data_type: format!("{:?}", f.data_type),
54 nullable: f.is_nullable,
55 }
56 }
57}
58
59fn check_length<R: Read + Seek>(reader: &mut FileReader<R>, expected: usize) -> Result<()> {
60 let mut len = 0;
61 for chunk in reader {
62 let chunk = chunk?;
63 let clen = chunk.len();
64 let arrays = chunk.into_arrays();
65 let types = arrays.iter().map(|a| a.data_type()).collect::<Vec<_>>();
66 debug!("chunk length: {}", clen);
67 debug!("chunk types: {:?}", types);
68 len += clen;
69 }
70
71 if len != expected {
72 warn!("expected {} rows but decoded {}", expected, len);
73 }
74
75 Ok(())
76}
77
78impl Command for PQInfo {
79 fn exec(&self) -> Result<()> {
80 info!("reading {:?}", self.source_file);
81
82 let mut pqf = File::open(&self.source_file)?;
83 let fmeta = pqf.metadata()?;
84 info!("file size: {}", bytes(fmeta.len()));
85
86 let meta = read_metadata(&mut pqf)?;
87
88 info!("row count: {}", scalar(meta.num_rows));
89 info!("row groups: {}", meta.row_groups.len());
90 let rc2: usize = meta.row_groups.iter().map(|rg| rg.num_rows()).sum();
91 if rc2 != meta.num_rows {
92 warn!("row group total {} != file total {}", rc2, meta.num_rows);
93 }
94
95 info!("decoding schema");
96 let schema = meta.schema();
97 let fields = parquet_to_arrow_schema(schema.fields());
98
99 let out = stdout();
100 let mut ol = out.lock();
101 for field in &fields {
102 writeln!(&mut ol, "{:?}", field)?;
103 }
104 drop(ol);
105
106 if let Some(ref file) = self.out_file {
107 let mut out = File::create(file)?;
108 let info = InfoStruct {
109 row_count: meta.num_rows,
110 file_size: fmeta.len() as u64,
111 fields: fields.iter().map(|f| f.into()).collect(),
112 };
113
114 serde_json::to_writer_pretty(&mut out, &info)?;
115 }
116
117 if self.check_length {
118 let schema = infer_schema(&meta)?;
119 let mut reader = FileReader::new(pqf, meta.row_groups, schema, None, None, None);
120 check_length(&mut reader, meta.num_rows)?;
121 }
122
123 Ok(())
124 }
125}