bookdata/arrow/writer.rs

use std::borrow::Cow;
use std::fs::{File, OpenOptions};
use std::io::Write;
use std::marker::PhantomData;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use anyhow::{anyhow, Result};
use arrow::record_batch::RecordBatch;
use log::*;
use parquet::arrow::ArrowWriter;
use parquet::basic::{Compression, ZstdLevel as PQZstdLevel};
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder, WriterVersion};
use parquet::file::writer::SerializedFileWriter;
use parquet::record::RecordWriter;
use parquet::schema::types::TypePtr;

use crate::io::object::{ObjectWriter, ThreadObjectWriter, UnchunkWriter};

/// Number of rows to collect into each record batch / Parquet row group.
const BATCH_SIZE: usize = 1024 * 1024;
/// ZSTD compression level for output Parquet files.
const ZSTD_LEVEL: i32 = 3;
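
/// Construct Parquet writer properties with the BookData defaults: ZSTD
/// compression, the Parquet 2.0 format version, and dictionary encoding
/// disabled.
///
/// The returned builder can be extended before building; a sketch (the
/// batch-size setting here is illustrative, not a BookData default):
///
/// ```ignore
/// let props = parquet_writer_defaults()
///     .set_write_batch_size(8192)
///     .build();
/// ```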
pub fn parquet_writer_defaults() -> WriterPropertiesBuilder {
    WriterProperties::builder()
        .set_compression(Compression::ZSTD(
            PQZstdLevel::try_new(ZSTD_LEVEL).expect("invalid zstd level"),
        ))
        .set_writer_version(WriterVersion::PARQUET_2_0)
        .set_dictionary_enabled(false)
}

/// Open a Parquet file writer using the BookData defaults.
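///
/// A usage sketch (the message type here is hypothetical):
///
/// ```ignore
/// let schema = parquet::schema::parser::parse_message_type(
///     "message item { required int32 id; }",
/// )?;
/// let mut writer = open_parquet_writer("items.parquet", Arc::new(schema))?;
/// // ... write row groups, then close ...
/// writer.close()?;
/// ```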
pub fn open_parquet_writer<P: AsRef<Path>>(
    path: P,
    schema: TypePtr,
) -> Result<SerializedFileWriter<File>> {
    info!("creating Parquet file {:?}", path.as_ref());
    let file = OpenOptions::new()
        .create(true)
        .truncate(true)
        .write(true)
        .open(path)?;
    let props = parquet_writer_defaults().build();
    let writer = SerializedFileWriter::new(file, schema, Arc::new(props))?;

    Ok(writer)
}

/// Parquet table writer.
///
/// A table writer is an [ObjectWriter] for structs implementing [TableRow]; it
/// writes them out to a Parquet file.
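///
/// A usage sketch (`MyRow` is a hypothetical [TableRow] struct):
///
/// ```ignore
/// let mut writer = TableWriter::open("rows.parquet")?;
/// writer.write_object(MyRow { id: 1 })?;
/// let n = writer.finish_objects()?;
/// info!("wrote {} rows", n);
/// ```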
pub struct TableWriter<R>
where
    R: Send + Sync + 'static,
    for<'a> &'a [R]: RecordWriter<R>,
{
    _phantom: PhantomData<R>,
    /// The writer pipeline: rows are batched, then written on a background thread.
    writer: Option<UnchunkWriter<R, ThreadObjectWriter<'static, Vec<R>>>>,
    /// The output path (for log messages).
    out_path: Option<PathBuf>,
    /// The number of rows written so far.
    row_count: usize,
}

impl<R> TableWriter<R>
where
    R: Send + Sync + 'static,
    for<'a> &'a [R]: RecordWriter<R>,
{
    /// Open a table writer for a path.
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
        let path = path.as_ref();

        // extract the struct's Parquet schema from an empty row slice
        let empty: [R; 0] = [];
        let schema = (&empty as &[R]).schema()?;
        debug!("{}: opening for schema {:?}", path.display(), schema);

        // open the file and move the Parquet writer to a background thread
        let writer = open_parquet_writer(path, schema)?;
        let writer = ThreadObjectWriter::wrap(writer)
            .with_name(format!("write:{}", path.display()))
            .with_capacity(4)
            .spawn();
        // collect individual rows into batches for the background writer
        let writer = UnchunkWriter::with_size(writer, BATCH_SIZE);
        let out_path = Some(path.to_path_buf());
        Ok(TableWriter {
            _phantom: PhantomData,
            writer: Some(writer),
            out_path,
            row_count: 0,
        })
    }
}

impl<R> TableWriter<R>
where
    R: Send + Sync + 'static,
    for<'a> &'a [R]: RecordWriter<R>,
{
    /// Render the output path for log messages.
    fn display_path(&self) -> Cow<'static, str> {
        if let Some(p) = &self.out_path {
            format!("{}", p.display()).into()
        } else {
            "<unknown>".into()
        }
    }
}

impl<R> ObjectWriter<R> for TableWriter<R>
where
    R: Send + Sync + 'static,
    for<'a> &'a [R]: RecordWriter<R>,
{
    fn write_object(&mut self, row: R) -> Result<()> {
        let w = self
            .writer
            .as_mut()
            .ok_or_else(|| anyhow!("writer is closed"))?;

        w.write_object(row)?;
        self.row_count += 1;
        Ok(())
    }

    fn finish_objects(mut self) -> Result<usize> {
        if let Some(writer) = self.writer.take() {
            info!("closing Parquet writer for {}", self.display_path());
            writer.finish_objects()?;
        } else {
            warn!("{}: writer already closed", self.display_path());
        }
        Ok(self.row_count)
    }
}

impl<R> Drop for TableWriter<R>
where
    R: Send + Sync + 'static,
    for<'a> &'a [R]: RecordWriter<R>,
{
    fn drop(&mut self) {
        // make sure we closed the writer
        if self.writer.is_some() {
            error!("{}: Parquet table writer not closed", self.display_path());
        }
    }
}

/// Write Arrow [RecordBatch]es to a Parquet [ArrowWriter].
impl<W> ObjectWriter<RecordBatch> for ArrowWriter<W>
where
    W: Write + Send,
{
    fn write_object(&mut self, batch: RecordBatch) -> Result<()> {
        self.write(&batch)?;
        Ok(())
    }

    fn finish_objects(self) -> Result<usize> {
        // close() returns the file metadata, which records the total row count
        let meta = self.close()?;
        Ok(meta.num_rows as usize)
    }
}
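
/// Write vectors of rows to a Parquet [SerializedFileWriter], one row group
/// per vector.  A usage sketch (`MyRow` is a hypothetical [TableRow] struct):
///
/// ```ignore
/// let mut writer = open_parquet_writer("rows.parquet", schema)?;
/// writer.write_object(vec![MyRow { id: 1 }, MyRow { id: 2 }])?;
/// writer.finish_objects()?;
/// ```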
impl<W, R> ObjectWriter<Vec<R>> for SerializedFileWriter<W>
where
    W: Write + Send,
    for<'a> &'a [R]: RecordWriter<R>,
{
    fn write_object(&mut self, object: Vec<R>) -> Result<()> {
        // each vector of rows becomes a single Parquet row group
        let slice = object.as_slice();
        let mut group = self.next_row_group()?;
        slice.write_to_row_group(&mut group)?;
        group.close()?;
        Ok(())
    }

    fn finish_objects(self) -> Result<usize> {
        self.close()?;
        // row counts are tracked by the caller (e.g. TableWriter), not here
        Ok(0)
    }
}
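
// Sanity-check sketches for the defaults and the RecordBatch ObjectWriter
// impl above; the field names and values are arbitrary, chosen only for
// illustration.
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{ArrayRef, Int32Array};
    use arrow::datatypes::{DataType, Field, Schema};

    #[test]
    fn defaults_use_parquet_v2() {
        let props = parquet_writer_defaults().build();
        assert_eq!(props.writer_version(), WriterVersion::PARQUET_2_0);
    }

    #[test]
    fn write_batch_to_buffer() -> Result<()> {
        // write one record batch to an in-memory buffer and check the row count
        let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
        let col: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
        let batch = RecordBatch::try_new(schema.clone(), vec![col])?;

        let mut writer = ArrowWriter::try_new(Vec::<u8>::new(), schema, None)?;
        writer.write_object(batch)?;
        let rows = writer.finish_objects()?;
        assert_eq!(rows, 3);
        Ok(())
    }
}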