bookdata/cli/filter_marc.rs

use std::borrow::Cow;
use std::fs::File;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use arrow::array::{StringBuilder, UInt32Builder};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use friendly::scalar;
use parquet::arrow::ArrowWriter;

use crate::arrow::scan_parquet_file;
use crate::arrow::writer::parquet_writer_defaults;
use crate::io::object::{ThreadObjectWriter, UnchunkWriter};
use crate::marc::flat_fields::FieldRecord;
use crate::prelude::*;

/// Number of field records to collect into each output record batch.
const BATCH_SIZE: usize = 1024 * 1024;

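/// Filter MARC field records.
///
/// A hypothetical invocation (the tag, subfield, and file names are
/// illustrative, and the `bookdata` binary name is assumed):
///
/// ```text
/// bookdata filter-marc -t 245 -f a --trim --lower -n title -o titles.parquet fields.parquet
/// ```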
#[derive(Args, Debug)]
#[command(name = "filter-marc")]
pub struct FilterMARC {
    #[command(flatten)]
    filter: FilterSpec,

    #[command(flatten)]
    output: OutputSpec,

    /// Input Parquet file of flattened MARC field records.
    #[arg(name = "FIELD_FILE")]
    field_file: PathBuf,
}

/// How to select and transform field values.
#[derive(Args, Debug, Clone)]
struct FilterSpec {
    /// Only include fields with this tag.
    #[arg(short = 't', long = "tag", name = "TAG")]
    tag: Option<i16>,

    /// Only include values from this subfield.
    #[arg(short = 'f', long = "subfield", name = "CODE")]
    subfield: Option<char>,

    /// Trim whitespace from field values.
    #[arg(short = 'T', long = "trim")]
    trim: bool,

    /// Convert field values to lowercase.
    #[arg(short = 'L', long = "lower")]
    lower: bool,
}

/// Where and how to write the filtered records.
#[derive(Args, Debug, Clone)]
struct OutputSpec {
    /// Name for the value column in the output file.
    #[arg(short = 'n', long = "name", name = "FIELD")]
    content_name: Option<String>,

    /// Output file path.
    #[arg(short = 'o', long = "output", name = "FILE")]
    file: PathBuf,
}

impl FilterSpec {
    /// Check whether a field record passes this filter.
    fn matches(&self, rec: &FieldRecord) -> bool {
        if let Some(t) = &self.tag {
            if rec.tag != *t {
                return false;
            }
        }

        if let Some(sf) = &self.subfield {
            if rec.sf_code != (*sf as u8) {
                return false;
            }
        }

        true
    }

    /// Apply the configured transformations to a field value, borrowing the
    /// input unchanged when no transformation is enabled.
    fn transform<'a>(&self, value: &'a str) -> Cow<'a, str> {
        let content: Cow<'a, str> = if self.trim {
            value.trim().into()
        } else {
            value.into()
        };

        let content: Cow<'a, str> = if self.lower {
            content.to_lowercase().into()
        } else {
            content
        };

        content
    }
}

/// Writer that converts chunks of field records into Arrow record batches
/// and forwards them to an underlying batch writer.
struct FilterOutput<W: ObjectWriter<RecordBatch>> {
    schema: Arc<Schema>,
    writer: W,
}

impl<W: ObjectWriter<RecordBatch>> ObjectWriter<Vec<FieldRecord>> for FilterOutput<W> {
    fn write_object(&mut self, object: Vec<FieldRecord>) -> Result<()> {
        let size = object.len();

        // Pre-size the builders, assuming roughly 10 bytes of content per value.
        let mut id_col = UInt32Builder::with_capacity(size);
        let mut val_col = StringBuilder::with_capacity(size, size * 10);

        for rec in object {
            id_col.append_value(rec.rec_id);
            val_col.append_value(rec.contents);
        }

        let id_col = id_col.finish();
        let val_col = val_col.finish();
        let batch = RecordBatch::try_new(
            self.schema.clone(),
            vec![Arc::new(id_col), Arc::new(val_col)],
        )?;

        self.writer.write_object(batch)?;
        Ok(())
    }

    fn finish(self) -> Result<usize> {
        self.writer.finish()
    }
}
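
// The chunks arriving at `write_object` are sized upstream: `open_output`
// (below) wraps this writer in an UnchunkWriter that collects individual
// records into vectors of BATCH_SIZE.  This note describes how this file
// wires the writers together; the UnchunkWriter internals are not shown here.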

/// Scan the MARC field records in `path`, writing the fields that match
/// `filter` to `out`.  Returns the number of records read and written.
fn scan_records(
    path: &Path,
    filter: &FilterSpec,
    mut out: impl ObjectWriter<FieldRecord> + Send,
) -> Result<(usize, usize)> {
    info!("scanning MARC field records in {:?}", path);
    let scanner = scan_parquet_file(path)?;

    let mut nr = 0;
    let mut nw = 0;
    for rec in scanner {
        nr += 1;
        let mut rec: FieldRecord = rec?;
        if filter.matches(&rec) {
            nw += 1;
            rec.contents = filter.transform(rec.contents.as_str()).into();
            out.write_object(rec)?;
        }
    }
    debug!("finished scanning Parquet file");
    out.finish()?;
    Ok((nr, nw))
}

/// Open the output writer specified by `out`.  The Parquet file is written
/// on a background thread, with individual records collected into batches of
/// `BATCH_SIZE` before conversion.
fn open_output(out: &OutputSpec) -> Result<impl ObjectWriter<FieldRecord> + Send> {
    info!("writing output to {:?}", out.file);
    let out_name = out.content_name.clone().unwrap_or_else(|| "content".into());
    let schema = Schema::new(vec![
        Field::new("rec_id", DataType::UInt32, false),
        Field::new(&out_name, DataType::Utf8, false),
    ]);
    let schema = Arc::new(schema);

    let file = File::options()
        .create(true)
        .truncate(true)
        .write(true)
        .open(&out.file)?;

    // Open the Parquet writer on a background thread, with dictionary
    // encoding enabled for the value column.
    let writer = ThreadObjectWriter::bg_open(move || {
        let props = parquet_writer_defaults().set_column_dictionary_enabled(out_name.into(), true);
        let writer = ArrowWriter::try_new(file, schema.clone(), Some(props.build()))?;
        Ok(FilterOutput { schema, writer })
    })
    .spawn();
    // Collect individual records into BATCH_SIZE chunks for the batch writer.
    let writer = UnchunkWriter::with_size(writer, BATCH_SIZE);

    Ok(writer)
}

impl Command for FilterMARC {
    fn exec(&self) -> Result<()> {
        let out = open_output(&self.output)?;
        let (nr, nw) = scan_records(self.field_file.as_path(), &self.filter, out)?;

        info!("wrote {} out of {} records", scalar(nw), scalar(nr));

        Ok(())
    }
}
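
// Minimal illustrative checks for FilterSpec::transform; they exercise only
// the pure string transforms above, which need no MARC data or I/O.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn transform_applies_trim_and_lower() {
        let spec = FilterSpec {
            tag: None,
            subfield: None,
            trim: true,
            lower: true,
        };
        assert_eq!(spec.transform("  Tolkien, J. R. R.  "), "tolkien, j. r. r.");
    }

    #[test]
    fn transform_borrows_unchanged_input() {
        let spec = FilterSpec {
            tag: None,
            subfield: None,
            trim: false,
            lower: false,
        };
        // With no transformations enabled, the value passes through borrowed.
        assert!(matches!(spec.transform("Tolkien"), Cow::Borrowed("Tolkien")));
    }
}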