use std::borrow::Cow;
use std::fs::{File, OpenOptions};
use std::io::Write;
use std::marker::PhantomData;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use anyhow::{anyhow, Result};
use arrow::record_batch::RecordBatch;
use log::*;
use parquet::arrow::ArrowWriter;
use parquet::basic::{Compression, ZstdLevel as PQZstdLevel};
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder, WriterVersion};
use parquet::file::writer::SerializedFileWriter;
use parquet::record::RecordWriter;
use parquet::schema::types::TypePtr;

use crate::io::object::{ObjectWriter, ThreadObjectWriter, UnchunkWriter};

/// Number of rows to buffer before handing a batch to the Parquet writer.
const BATCH_SIZE: usize = 1024 * 1024;
/// Default ZSTD compression level for Parquet files.
const ZSTD_LEVEL: i32 = 3;

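/// Create a builder initialized with this module's default Parquet writer
/// properties: ZSTD compression, the Parquet 2.0 format version, and
/// dictionary encoding disabled.
///
/// A minimal usage sketch; the level-9 compression override is illustrative,
/// not something this module does:
///
/// ```ignore
/// let props = parquet_writer_defaults()
///     .set_compression(Compression::ZSTD(
///         PQZstdLevel::try_new(9).expect("invalid zstd level"),
///     ))
///     .build();
/// ```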
pub fn parquet_writer_defaults() -> WriterPropertiesBuilder {
    WriterProperties::builder()
        .set_compression(Compression::ZSTD(
            PQZstdLevel::try_new(ZSTD_LEVEL).expect("invalid zstd level"),
        ))
        .set_writer_version(WriterVersion::PARQUET_2_0)
        .set_dictionary_enabled(false)
}

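/// Open a [`SerializedFileWriter`] on `path` with the default writer
/// properties, truncating any existing file.
///
/// A sketch of a hypothetical caller, building the schema from a Parquet
/// message-type string (`parse_message_type` is from
/// `parquet::schema::parser`):
///
/// ```ignore
/// let schema = parse_message_type("message demo { required int64 id; }")?;
/// let writer = open_parquet_writer("demo.parquet", Arc::new(schema))?;
/// ```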
pub fn open_parquet_writer<P: AsRef<Path>>(
    path: P,
    schema: TypePtr,
) -> Result<SerializedFileWriter<File>> {
    info!("creating Parquet file {:?}", path.as_ref());
    let file = OpenOptions::new()
        .create(true)
        .truncate(true)
        .write(true)
        .open(path)?;
    let props = parquet_writer_defaults().build();
    let writer = SerializedFileWriter::new(file, schema, Arc::new(props))?;

    Ok(writer)
}

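/// Writer for a Parquet table whose rows are Rust structs.
///
/// Rows are buffered into batches of [`BATCH_SIZE`] and handed to a Parquet
/// writer running on a background thread. A sketch of intended use, assuming
/// a row type implementing [`RecordWriter`] (for example via the
/// `parquet_derive` crate's `ParquetRecordWriter` derive):
///
/// ```ignore
/// #[derive(ParquetRecordWriter)]
/// struct RatingRow {
///     user: i64,
///     item: i64,
///     rating: f32,
/// }
///
/// let mut writer = TableWriter::<RatingRow>::open("ratings.parquet")?;
/// writer.write_object(RatingRow { user: 42, item: 7, rating: 3.5 })?;
/// let rows = writer.finish_objects()?;
/// ```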
pub struct TableWriter<R>
where
    R: Send + Sync + 'static,
    for<'a> &'a [R]: RecordWriter<R>,
{
    _phantom: PhantomData<R>,
    /// The chunking background writer; [`None`] once the table is closed.
    writer: Option<UnchunkWriter<R, ThreadObjectWriter<'static, Vec<R>>>>,
    /// The output path, retained for log messages.
    out_path: Option<PathBuf>,
    row_count: usize,
}

impl<R> TableWriter<R>
where
    R: Send + Sync + 'static,
    for<'a> &'a [R]: RecordWriter<R>,
{
    /// Open a table writer, deriving the Parquet schema from the row type.
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
        let path = path.as_ref();

        // derive the schema from an empty slice of rows
        let empty: [R; 0] = [];
        let schema = (&empty as &[R]).schema()?;
        debug!("{}: opening for schema {:?}", path.display(), schema);

        // batches of rows are shipped to the Parquet writer on a background thread
        let writer = open_parquet_writer(path, schema)?;
        let writer = ThreadObjectWriter::wrap(writer)
            .with_name(format!("write:{}", path.display()))
            .with_capacity(4)
            .spawn();
        let writer = UnchunkWriter::with_size(writer, BATCH_SIZE);
        let out_path = Some(path.to_path_buf());
        Ok(TableWriter {
            _phantom: PhantomData,
            writer: Some(writer),
            out_path,
            row_count: 0,
        })
    }
}

impl<R> TableWriter<R>
where
    R: Send + Sync + 'static,
    for<'a> &'a [R]: RecordWriter<R>,
{
    /// Render the output path for log messages.
    fn display_path(&self) -> Cow<'static, str> {
        if let Some(p) = &self.out_path {
            format!("{}", p.display()).into()
        } else {
            "<unknown>".into()
        }
    }
}

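/// Single-row writes: each row is counted here, batched by the inner
/// [`UnchunkWriter`], and written out as row groups on the writer thread.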
impl<R> ObjectWriter<R> for TableWriter<R>
where
    R: Send + Sync + 'static,
    for<'a> &'a [R]: RecordWriter<R>,
{
    fn write_object(&mut self, row: R) -> Result<()> {
        let w = self
            .writer
            .as_mut()
            .ok_or_else(|| anyhow!("writer is closed"))?;

        w.write_object(row)?;
        self.row_count += 1;
        Ok(())
    }

    fn finish_objects(mut self) -> Result<usize> {
        if let Some(writer) = self.writer.take() {
            info!("closing Parquet writer for {}", self.display_path());
            writer.finish_objects()?;
        } else {
            warn!("{}: writer already closed", self.display_path());
        }
        Ok(self.row_count)
    }
}

impl<R> Drop for TableWriter<R>
where
    R: Send + Sync + 'static,
    for<'a> &'a [R]: RecordWriter<R>,
{
    fn drop(&mut self) {
        // dropping without finish_objects() likely leaves the Parquet file
        // incomplete, so log loudly
        if self.writer.is_some() {
            error!("{}: Parquet table writer not closed", self.display_path());
        }
    }
}

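/// [`ArrowWriter`] can be used directly as an [`ObjectWriter`] over Arrow
/// record batches. A sketch of a hypothetical caller; `schema` (an Arrow
/// [`arrow::datatypes::SchemaRef`]) and `batches` are assumed to come from
/// elsewhere:
///
/// ```ignore
/// let file = File::create("batches.parquet")?;
/// let mut writer = ArrowWriter::try_new(file, schema, None)?;
/// for batch in batches {
///     writer.write_object(batch)?;
/// }
/// let rows = writer.finish_objects()?;
/// ```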
impl<W> ObjectWriter<RecordBatch> for ArrowWriter<W>
where
    W: Write + Send,
{
    fn write_object(&mut self, batch: RecordBatch) -> Result<()> {
        self.write(&batch)?;
        Ok(())
    }

    fn finish_objects(self) -> Result<usize> {
        let meta = self.close()?;
        Ok(meta.file_metadata().num_rows() as usize)
    }
}

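/// Batched writes to a raw Parquet file: each [`Vec`] of rows is written as
/// one row group. This is the writer that [`TableWriter`]'s background thread
/// drives, one [`BATCH_SIZE`] chunk at a time.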
impl<W, R> ObjectWriter<Vec<R>> for SerializedFileWriter<W>
where
    W: Write + Send,
    for<'a> &'a [R]: RecordWriter<R>,
{
    fn write_object(&mut self, object: Vec<R>) -> Result<()> {
        // each vector of rows becomes one Parquet row group
        let slice = object.as_slice();
        let mut group = self.next_row_group()?;
        slice.write_to_row_group(&mut group)?;
        group.close()?;
        Ok(())
    }

    fn finish_objects(self) -> Result<usize> {
        // row counts are tracked by the wrapping TableWriter, so none is reported here
        self.close()?;
        Ok(0)
    }
}