bookdata/arrow/writer.rs

use std::borrow::Cow;
use std::fs::{File, OpenOptions};
use std::io::Write;
use std::marker::PhantomData;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use anyhow::{anyhow, Result};
use arrow::record_batch::RecordBatch;
use log::*;
use parquet::arrow::ArrowWriter;
use parquet::basic::{Compression, ZstdLevel as PQZstdLevel};
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder, WriterVersion};
use parquet::file::writer::SerializedFileWriter;
use parquet::record::RecordWriter;
use parquet::schema::types::TypePtr;
use polars::io::parquet::{BatchedWriter, ZstdLevel};
use polars::prelude::{ArrowSchema, DataFrame, ParquetCompression, ParquetWriter};
use polars_arrow::array::Array as PArray;
use polars_arrow::chunk::Chunk as PChunk;
use polars_parquet::write::{
    transverse, CompressionOptions, Encoding, FileWriter, RowGroupIterator, Version, WriteOptions,
};

use crate::arrow::nonnull_schema;
use crate::io::object::{ObjectWriter, ThreadObjectWriter, UnchunkWriter};
use crate::io::DataSink;
use crate::util::logging::item_progress;

const BATCH_SIZE: usize = 1024 * 1024;
const ZSTD_LEVEL: i32 = 3;

/// Open a Parquet writer using BookData defaults.
fn open_plpq_writer<P: AsRef<Path>>(path: P, schema: ArrowSchema) -> Result<FileWriter<File>> {
    let compression = CompressionOptions::Zstd(None);
    let options = WriteOptions {
        write_statistics: true,
        version: Version::V2,
        compression,
        data_pagesize_limit: None,
    };

    info!("creating Parquet file {:?}", path.as_ref());
    let file = OpenOptions::new()
        .create(true)
        .truncate(true)
        .write(true)
        .open(path)?;
    let writer = FileWriter::try_new(file, schema, options)?;

    Ok(writer)
}

/// Open a Polars data frame Parquet writer using BookData defaults.
pub fn open_polars_writer<P: AsRef<Path>>(path: P) -> Result<ParquetWriter<File>> {
    info!("creating Parquet file {:?}", path.as_ref());
    let file = OpenOptions::new()
        .create(true)
        .truncate(true)
        .write(true)
        .open(path)?;
    let writer = ParquetWriter::new(file)
        .with_compression(ParquetCompression::Zstd(Some(ZstdLevel::try_new(
            ZSTD_LEVEL,
        )?)))
        .with_row_group_size(Some(BATCH_SIZE));

    Ok(writer)
}

/// Construct a Parquet [WriterProperties] builder with BookData defaults
/// (Parquet 2.0, ZSTD compression, dictionary encoding disabled).
pub fn parquet_writer_defaults() -> WriterPropertiesBuilder {
    WriterProperties::builder()
        .set_compression(Compression::ZSTD(
            PQZstdLevel::try_new(ZSTD_LEVEL).expect("invalid zstd level"),
        ))
        .set_writer_version(WriterVersion::PARQUET_2_0)
        .set_dictionary_enabled(false)
}
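
// Illustrative sketch (not part of the original module): because
// `parquet_writer_defaults` returns the builder rather than finished
// properties, callers can layer additional settings on top of the BookData
// defaults before building.  `set_created_by` is a standard
// `WriterPropertiesBuilder` method; the string and helper name are made up
// for this example.
#[allow(dead_code)]
fn example_writer_props() -> WriterProperties {
    parquet_writer_defaults()
        .set_created_by("bookdata example".to_string())
        .build()
}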

/// Open an Arrow Parquet writer using BookData defaults.
pub fn open_parquet_writer<P: AsRef<Path>>(
    path: P,
    schema: TypePtr,
) -> Result<SerializedFileWriter<File>> {
    info!("creating Parquet file {:?}", path.as_ref());
    let file = OpenOptions::new()
        .create(true)
        .truncate(true)
        .write(true)
        .open(path)?;
    let props = parquet_writer_defaults().build();
    let writer = SerializedFileWriter::new(file, schema, Arc::new(props))?;

    Ok(writer)
}

/// Save a data frame to a Parquet file.
pub fn save_df_parquet<P: AsRef<Path>>(df: DataFrame, path: P) -> Result<()> {
    let path = path.as_ref();
    debug!("writing file {}", path.display());
    debug!("{}: schema {:?}", path.display(), df.schema());
    let mut df = df;
    let writer = open_polars_writer(path)?;
    let size = writer
        .with_row_group_size(Some(BATCH_SIZE))
        .finish(&mut df)?;
    debug!("{}: wrote {}", path.display(), friendly::bytes(size));
    Ok(())
}
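
// Illustrative sketch (not part of the original module): saving a small data
// frame with `save_df_parquet`.  The column names, output path, and helper
// name are made up for this example; `polars::df!` is the standard Polars
// data frame constructor macro.
#[allow(dead_code)]
fn example_save_df() -> Result<()> {
    let df = polars::df!(
        "item" => &[1i64, 2, 3],
        "title" => &["a", "b", "c"]
    )?;
    save_df_parquet(df, "example-items.parquet")
}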

/// Save a data frame to a Parquet file without nulls in the schema.
pub fn save_df_parquet_nonnull<P: AsRef<Path>>(df: DataFrame, path: P) -> Result<()> {
    let path = path.as_ref();
    debug!("writing file {}", path.display());
    debug!("{}: initial schema {:?}", path.display(), df.schema());
    let schema = nonnull_schema(&df);
    debug!("{}: nonnull schema {:?}", path.display(), schema);
    let mut writer = open_plpq_writer(path, schema)?;
    let pb = item_progress(df.n_chunks(), "writing chunks");
    for chunk in df.iter_chunks(false) {
        writer.write_object(chunk)?;
        pb.tick();
    }
    let size = writer.end(None)?;
    debug!(
        "{}: wrote {} chunks in {}",
        path.display(),
        df.n_chunks(),
        friendly::bytes(size)
    );
    Ok(())
}

/// Parquet table writer.
///
/// A table writer is an [ObjectWriter] for structs implementing [TableRow] that writes
/// them out to a Parquet file.
pub struct TableWriter<R>
where
    R: Send + Sync + 'static,
    for<'a> &'a [R]: RecordWriter<R>,
{
    _phantom: PhantomData<R>,
    writer: Option<UnchunkWriter<R, ThreadObjectWriter<'static, Vec<R>>>>,
    out_path: Option<PathBuf>,
    row_count: usize,
}

impl<R> TableWriter<R>
where
    R: Send + Sync + 'static,
    for<'a> &'a [R]: RecordWriter<R>,
{
    /// Open a table writer for a path.
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
        let path = path.as_ref();

        // extract struct schema
        let empty: [R; 0] = [];
        let schema = (&empty as &[R]).schema()?;
        debug!("{}: opening for schema {:?}", path.display(), schema);

        let writer = open_parquet_writer(path, schema)?;
        let writer = ThreadObjectWriter::wrap(writer)
            .with_name(format!("write:{}", path.display()))
            .with_capacity(4)
            .spawn();
        let writer = UnchunkWriter::with_size(writer, BATCH_SIZE);
        let out_path = Some(path.to_path_buf());
        Ok(TableWriter {
            _phantom: PhantomData,
            writer: Some(writer),
            out_path,
            row_count: 0,
        })
    }
}
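
// Illustrative sketch (not part of the original module): the typical
// write-then-finish flow for a `TableWriter`, generic over any row type that
// satisfies the writer's bounds (e.g. a struct with a derived Parquet record
// writer).  The helper name is made up for this example.
#[allow(dead_code)]
fn example_write_table<R>(path: &Path, rows: Vec<R>) -> Result<usize>
where
    R: Send + Sync + 'static,
    for<'a> &'a [R]: RecordWriter<R>,
{
    let mut writer = TableWriter::open(path)?;
    for row in rows {
        writer.write_object(row)?;
    }
    // `finish` flushes the background writer thread, closes the Parquet file,
    // and reports the number of rows written.
    writer.finish()
}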

impl<R> TableWriter<R>
where
    R: Send + Sync + 'static,
    for<'a> &'a [R]: RecordWriter<R>,
{
    fn display_path(&self) -> Cow<'static, str> {
        if let Some(p) = &self.out_path {
            format!("{}", p.display()).into()
        } else {
            "<unknown>".into()
        }
    }
}

impl<R> DataSink for TableWriter<R>
where
    R: Send + Sync + 'static,
    for<'a> &'a [R]: RecordWriter<R>,
{
    fn output_files(&self) -> Vec<PathBuf> {
        match &self.out_path {
            None => Vec::new(),
            Some(p) => vec![p.clone()],
        }
    }
}

impl<R> ObjectWriter<R> for TableWriter<R>
where
    R: Send + Sync + 'static,
    for<'a> &'a [R]: RecordWriter<R>,
{
    fn write_object(&mut self, row: R) -> Result<()> {
        let w = self
            .writer
            .as_mut()
            .ok_or_else(|| anyhow!("writer is closed"))?;

        w.write_object(row)?;
        self.row_count += 1;
        Ok(())
    }

    fn finish(mut self) -> Result<usize> {
        if let Some(writer) = self.writer.take() {
            info!("closing Parquet writer for {}", self.display_path());
            writer.finish()?;
        } else {
            warn!("{}: writer already closed", self.display_path());
        }
        Ok(self.row_count)
    }
}

impl<R> Drop for TableWriter<R>
where
    R: Send + Sync + 'static,
    for<'a> &'a [R]: RecordWriter<R>,
{
    fn drop(&mut self) {
        // make sure we closed the writer
        if self.writer.is_some() {
            error!("{}: Parquet table writer not closed", self.display_path());
        }
    }
}

/// Implementation of object writer for Polars Arrow writers
impl<W> ObjectWriter<PChunk<Box<dyn PArray + 'static>>> for FileWriter<W>
where
    W: Write,
{
    fn write_object(&mut self, chunk: PChunk<Box<dyn PArray + 'static>>) -> Result<()> {
        let schema = self.schema();
        let encodings: Vec<_> = schema
            .fields
            .iter()
            .map(|f| transverse(&f.data_type, |_| Encoding::Plain))
            .collect();
        let options = self.options();
        let chunks = vec![Ok(chunk)];
        let groups = RowGroupIterator::try_new(chunks.into_iter(), &schema, options, encodings)?;
        for group in groups {
            self.write(group?)?;
        }
        Ok(())
    }

    fn finish(mut self) -> Result<usize> {
        self.end(None)?;
        Ok(0)
    }
}

impl<W> ObjectWriter<RecordBatch> for ArrowWriter<W>
where
    W: Write + Send,
{
    fn write_object(&mut self, batch: RecordBatch) -> Result<()> {
        self.write(&batch)?;
        Ok(())
    }

    fn finish(self) -> Result<usize> {
        let meta = self.close()?;
        Ok(meta.num_rows as usize)
    }
}
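
// Illustrative sketch (not part of the original module): writing a
// `RecordBatch` through the `ObjectWriter` adapter above.  The column name
// and helper name are made up for this example.
#[allow(dead_code)]
fn example_write_batch(path: &Path) -> Result<usize> {
    use arrow::array::{ArrayRef, Int64Array};

    let ids: ArrayRef = Arc::new(Int64Array::from(vec![1i64, 2, 3]));
    let batch = RecordBatch::try_from_iter(vec![("id", ids)])?;

    let file = File::create(path)?;
    let props = parquet_writer_defaults().build();
    let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props))?;
    writer.write_object(batch)?;
    // use the trait's consuming `finish`, fully qualified to distinguish it
    // from `ArrowWriter`'s own inherent methods; it reports the row count
    <ArrowWriter<File> as ObjectWriter<RecordBatch>>::finish(writer)
}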

impl<W, R> ObjectWriter<Vec<R>> for SerializedFileWriter<W>
where
    W: Write + Send,
    for<'a> &'a [R]: RecordWriter<R>,
{
    fn write_object(&mut self, object: Vec<R>) -> Result<()> {
        let slice = object.as_slice();
        let mut group = self.next_row_group()?;
        slice.write_to_row_group(&mut group)?;
        group.close()?;
        Ok(())
    }

    fn finish(self) -> Result<usize> {
        self.close()?;
        Ok(0)
    }
}

/// Implementation of object writer for Polars Parquet batched writer
impl<W> ObjectWriter<DataFrame> for BatchedWriter<W>
where
    W: Write,
{
    fn write_object(&mut self, df: DataFrame) -> Result<()> {
        self.write_batch(&df)?;
        Ok(())
    }

    fn finish(mut self) -> Result<usize> {
        let size = BatchedWriter::finish(&mut self)?;
        Ok(size as usize)
    }
}