use std::borrow::Cow;
use std::fs::{File, OpenOptions};
use std::io::Write;
use std::marker::PhantomData;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use anyhow::{anyhow, Result};
use arrow::record_batch::RecordBatch;
use log::*;
use parquet::arrow::ArrowWriter;
use parquet::basic::{Compression, ZstdLevel as PQZstdLevel};
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder, WriterVersion};
use parquet::file::writer::SerializedFileWriter;
use parquet::record::RecordWriter;
use parquet::schema::types::TypePtr;
use polars::io::parquet::{BatchedWriter, ZstdLevel};
use polars::prelude::{ArrowSchema, DataFrame, ParquetCompression, ParquetWriter};
use polars_arrow::array::Array as PArray;
use polars_arrow::chunk::Chunk as PChunk;
use polars_parquet::write::{
    transverse, CompressionOptions, Encoding, FileWriter, RowGroupIterator, Version, WriteOptions,
};

use crate::arrow::nonnull_schema;
use crate::io::object::{ObjectWriter, ThreadObjectWriter, UnchunkWriter};
use crate::io::DataSink;
use crate::util::logging::item_progress;

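// Shared defaults for the Parquet writers in this module.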
const BATCH_SIZE: usize = 1024 * 1024;
const ZSTD_LEVEL: i32 = 3;

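/// Open a polars-parquet [`FileWriter`] writing zstd-compressed Parquet V2 with statistics.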
fn open_plpq_writer<P: AsRef<Path>>(path: P, schema: ArrowSchema) -> Result<FileWriter<File>> {
    let compression = CompressionOptions::Zstd(None);
    let options = WriteOptions {
        write_statistics: true,
        version: Version::V2,
        compression,
        data_pagesize_limit: None,
    };

    info!("creating Parquet file {:?}", path.as_ref());
    let file = OpenOptions::new()
        .create(true)
        .truncate(true)
        .write(true)
        .open(path)?;
    let writer = FileWriter::try_new(file, schema, options)?;

    Ok(writer)
}

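/// Open a Polars [`ParquetWriter`] with the module's default compression and row group size.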
pub fn open_polars_writer<P: AsRef<Path>>(path: P) -> Result<ParquetWriter<File>> {
    info!("creating Parquet file {:?}", path.as_ref());
    let file = OpenOptions::new()
        .create(true)
        .truncate(true)
        .write(true)
        .open(path)?;
    let writer = ParquetWriter::new(file)
        .with_compression(ParquetCompression::Zstd(Some(ZstdLevel::try_new(
            ZSTD_LEVEL,
        )?)))
        .with_row_group_size(Some(BATCH_SIZE));

    Ok(writer)
}

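/// Default writer properties for the `parquet` crate: zstd compression, Parquet 2.0, dictionaries disabled.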
pub fn parquet_writer_defaults() -> WriterPropertiesBuilder {
    WriterProperties::builder()
        .set_compression(Compression::ZSTD(
            PQZstdLevel::try_new(ZSTD_LEVEL).expect("invalid zstd level"),
        ))
        .set_writer_version(WriterVersion::PARQUET_2_0)
        .set_dictionary_enabled(false)
}

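/// Open a low-level [`SerializedFileWriter`] for `schema` with the default writer properties.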
pub fn open_parquet_writer<P: AsRef<Path>>(
    path: P,
    schema: TypePtr,
) -> Result<SerializedFileWriter<File>> {
    info!("creating Parquet file {:?}", path.as_ref());
    let file = OpenOptions::new()
        .create(true)
        .truncate(true)
        .write(true)
        .open(path)?;
    let props = parquet_writer_defaults().build();
    let writer = SerializedFileWriter::new(file, schema, Arc::new(props))?;

    Ok(writer)
}

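/// Save a [`DataFrame`] to a Parquet file using the Polars writer.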
pub fn save_df_parquet<P: AsRef<Path>>(df: DataFrame, path: P) -> Result<()> {
    let path = path.as_ref();
    debug!("writing file {}", path.display());
    debug!("{}: schema {:?}", path.display(), df.schema());
    let mut df = df;
    let writer = open_polars_writer(path)?;
    let size = writer
        .with_row_group_size(Some(BATCH_SIZE))
        .finish(&mut df)?;
    debug!("{}: wrote {}", path.display(), friendly::bytes(size));
    Ok(())
}

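/// Save a [`DataFrame`] to Parquet with its schema converted to non-nullable columns.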
pub fn save_df_parquet_nonnull<P: AsRef<Path>>(df: DataFrame, path: P) -> Result<()> {
    let path = path.as_ref();
    debug!("writing file {}", path.display());
    debug!("{}: initial schema {:?}", path.display(), df.schema());
    let schema = nonnull_schema(&df);
    debug!("{}: nonnull schema {:?}", path.display(), schema);
    let mut writer = open_plpq_writer(path, schema)?;
    let pb = item_progress(df.n_chunks(), "writing chunks");
    for chunk in df.iter_chunks(false) {
        writer.write_object(chunk)?;
        pb.tick();
    }
    let size = writer.end(None)?;
    debug!(
        "{}: wrote {} chunks in {}",
        path.display(),
        df.n_chunks(),
        friendly::bytes(size)
    );
    Ok(())
}

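/// Parquet writer for structs implementing [`RecordWriter`], batching rows into row groups
/// and writing them on a background thread.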
pub struct TableWriter<R>
where
    R: Send + Sync + 'static,
    for<'a> &'a [R]: RecordWriter<R>,
{
    _phantom: PhantomData<R>,
    writer: Option<UnchunkWriter<R, ThreadObjectWriter<'static, Vec<R>>>>,
    out_path: Option<PathBuf>,
    row_count: usize,
}

impl<R> TableWriter<R>
where
    R: Send + Sync + 'static,
    for<'a> &'a [R]: RecordWriter<R>,
{
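    /// Open a table writer for `path`, deriving the Parquet schema from the row type `R`.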
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
        let path = path.as_ref();

        let empty: [R; 0] = [];
        let schema = (&empty as &[R]).schema()?;
        debug!("{}: opening for schema {:?}", path.display(), schema);

        let writer = open_parquet_writer(path, schema)?;
        let writer = ThreadObjectWriter::wrap(writer)
            .with_name(format!("write:{}", path.display()))
            .with_capacity(4)
            .spawn();
        let writer = UnchunkWriter::with_size(writer, BATCH_SIZE);
        let out_path = Some(path.to_path_buf());
        Ok(TableWriter {
            _phantom: PhantomData,
            writer: Some(writer),
            out_path,
            row_count: 0,
        })
    }
}

impl<R> TableWriter<R>
where
    R: Send + Sync + 'static,
    for<'a> &'a [R]: RecordWriter<R>,
{
    fn display_path(&self) -> Cow<'static, str> {
        if let Some(p) = &self.out_path {
            format!("{}", p.display()).into()
        } else {
            "<unknown>".into()
        }
    }
}

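// Report this writer's output file for data-sink tracking.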
impl<R> DataSink for TableWriter<R>
where
    R: Send + Sync + 'static,
    for<'a> &'a [R]: RecordWriter<R>,
{
    fn output_files(&self) -> Vec<PathBuf> {
        match &self.out_path {
            None => Vec::new(),
            Some(p) => vec![p.clone()],
        }
    }
}

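// Row-by-row writing interface; rows are forwarded to the buffered background writer.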
impl<R> ObjectWriter<R> for TableWriter<R>
where
    R: Send + Sync + 'static,
    for<'a> &'a [R]: RecordWriter<R>,
{
    fn write_object(&mut self, row: R) -> Result<()> {
        let w = self
            .writer
            .as_mut()
            .ok_or_else(|| anyhow!("writer is closed"))?;

        w.write_object(row)?;
        self.row_count += 1;
        Ok(())
    }

    fn finish(mut self) -> Result<usize> {
        if let Some(writer) = self.writer.take() {
            info!("closing Parquet writer for {}", self.display_path());
            writer.finish()?;
        } else {
            warn!("{}: writer already closed", self.display_path());
        }
        Ok(self.row_count)
    }
}

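// Log an error if the writer is dropped without `finish` being called.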
impl<R> Drop for TableWriter<R>
where
    R: Send + Sync + 'static,
    for<'a> &'a [R]: RecordWriter<R>,
{
    fn drop(&mut self) {
        if self.writer.is_some() {
            error!("{}: Parquet table writer not closed", self.display_path());
        }
    }
}

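// Write each polars-arrow chunk as a single row group using plain encoding.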
impl<W> ObjectWriter<PChunk<Box<dyn PArray + 'static>>> for FileWriter<W>
where
    W: Write,
{
    fn write_object(&mut self, chunk: PChunk<Box<dyn PArray + 'static>>) -> Result<()> {
        let schema = self.schema();
        let encodings: Vec<_> = schema
            .fields
            .iter()
            .map(|f| transverse(&f.data_type, |_| Encoding::Plain))
            .collect();
        let options = self.options();
        let chunks = vec![Ok(chunk)];
        let groups = RowGroupIterator::try_new(chunks.into_iter(), &schema, options, encodings)?;
        for group in groups {
            self.write(group?)?;
        }
        Ok(())
    }

    fn finish(mut self) -> Result<usize> {
        self.end(None)?;
        Ok(0)
    }
}

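// Arrow [`RecordBatch`] support; `finish` reports the number of rows written.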
impl<W> ObjectWriter<RecordBatch> for ArrowWriter<W>
where
    W: Write + Send,
{
    fn write_object(&mut self, batch: RecordBatch) -> Result<()> {
        self.write(&batch)?;
        Ok(())
    }

    fn finish(self) -> Result<usize> {
        let meta = self.close()?;
        Ok(meta.num_rows as usize)
    }
}

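// Each `Vec<R>` of records is written as one row group in the Parquet file.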
impl<W, R> ObjectWriter<Vec<R>> for SerializedFileWriter<W>
where
    W: Write + Send,
    for<'a> &'a [R]: RecordWriter<R>,
{
    fn write_object(&mut self, object: Vec<R>) -> Result<()> {
        let slice = object.as_slice();
        let mut group = self.next_row_group()?;
        slice.write_to_row_group(&mut group)?;
        group.close()?;
        Ok(())
    }

    fn finish(self) -> Result<usize> {
        self.close()?;
        Ok(0)
    }
}

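// Polars batched writer support; each DataFrame is written as a batch.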
impl<W> ObjectWriter<DataFrame> for BatchedWriter<W>
where
    W: Write,
{
    fn write_object(&mut self, df: DataFrame) -> Result<()> {
        self.write_batch(&df)?;
        Ok(())
    }

    fn finish(mut self) -> Result<usize> {
        let size = BatchedWriter::finish(&mut self)?;
        Ok(size as usize)
    }
}