bookdata/cli/
filter_marc.rs

1//! Command to filter MARC output.
2use std::fs::File;
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5
6use arrow::array::StringBuilder;
7use arrow::array::UInt32Builder;
8use arrow::datatypes::{DataType, Field, Schema};
9use arrow::record_batch::RecordBatch;
10use friendly::scalar;
11use parquet::arrow::ArrowWriter;
12
13use crate::arrow::scan_parquet_file;
14use crate::arrow::writer::parquet_writer_defaults;
15use crate::io::object::{ThreadObjectWriter, UnchunkWriter};
16use crate::marc::flat_fields::FieldRecord;
17use crate::prelude::*;
18
/// Number of records to accumulate before emitting an Arrow record batch.
const BATCH_SIZE: usize = 1024 * 1024;
20
/// Filter a MARC field file to only contain certain results.
#[derive(Args, Debug)]
#[command(name = "filter-marc")]
pub struct FilterMARC {
    /// Filtering options: which tags/subfields to keep and content transforms.
    #[command(flatten)]
    filter: FilterSpec,

    /// Output options: destination file and content column naming.
    #[command(flatten)]
    output: OutputSpec,

    /// Input file of MARC field data.
    #[arg(name = "FIELD_FILE")]
    field_file: PathBuf,
}
35
/// Options for filtering MARC records.
///
/// Unset filters match everything; set filters must match exactly.
#[derive(Args, Debug, Clone)]
struct FilterSpec {
    /// Specify the tag to filter to.
    #[arg(short = 't', long = "tag", name = "TAG")]
    tag: Option<i16>,

    /// Specify the subfield to filter to.
    #[arg(short = 'f', long = "subfield", name = "CODE")]
    subfield: Option<char>,

    /// Trim the contents before emitting.
    #[arg(short = 'T', long = "trim")]
    trim: bool,

    /// Lowercase the contents before emitting.
    #[arg(short = 'L', long = "lower")]
    lower: bool,
}
55
/// Options for output.
#[derive(Args, Debug, Clone)]
struct OutputSpec {
    /// Rename the content field (defaults to "content" when unset).
    #[arg(short = 'n', long = "name", name = "FIELD")]
    content_name: Option<String>,

    /// Output file for filtered MARC fields.
    #[arg(short = 'o', long = "output", name = "FILE")]
    file: PathBuf,
}
67
68impl FilterSpec {
69    fn matches(&self, rec: &FieldRecord) -> bool {
70        if let Some(t) = &self.tag {
71            if rec.tag != *t {
72                return false;
73            }
74        }
75
76        if let Some(sf) = &self.subfield {
77            if rec.sf_code != (*sf as u8) {
78                return false;
79            }
80        }
81
82        true
83    }
84
85    fn transform<'a>(&self, value: &'a str) -> Cow<'a, str> {
86        let content: Cow<'a, str> = if self.trim {
87            value.trim().into()
88        } else {
89            value.into()
90        };
91
92        let content: Cow<'a, str> = if self.lower {
93            content.to_lowercase().into()
94        } else {
95            content
96        };
97
98        content
99    }
100}
101
/// Adapter that converts chunks of field records into Arrow record batches
/// and forwards them to an underlying batch writer.
struct FilterOutput<W: ObjectWriter<RecordBatch>> {
    /// Schema of the emitted batches (rec_id column + content column).
    schema: Arc<Schema>,
    /// Downstream writer receiving the assembled record batches.
    writer: W,
}
106
107impl<W: ObjectWriter<RecordBatch>> ObjectWriter<Vec<FieldRecord>> for FilterOutput<W> {
108    fn write_object(&mut self, object: Vec<FieldRecord>) -> Result<()> {
109        let size = object.len();
110
111        let mut id_col = UInt32Builder::with_capacity(size);
112        let mut val_col = StringBuilder::with_capacity(size, size * 10);
113
114        for rec in object {
115            id_col.append_value(rec.rec_id);
116            val_col.append_value(rec.contents);
117        }
118
119        let id_col = id_col.finish();
120        let val_col = val_col.finish();
121        let batch = RecordBatch::try_new(
122            self.schema.clone(),
123            vec![Arc::new(id_col), Arc::new(val_col)],
124        )?;
125
126        self.writer.write_object(batch)?;
127        Ok(())
128    }
129
130    fn finish(self) -> Result<usize> {
131        self.writer.finish()
132    }
133}
134
135/// Scan MARC records from a file.
136///
137/// Failes quickly if there is an error opening the file; errors reading the file are
138/// from the thread and are availabl when it is joined.
139fn scan_records(
140    path: &Path,
141    filter: &FilterSpec,
142    out: impl ObjectWriter<FieldRecord> + Send,
143) -> Result<(usize, usize)> {
144    info!("reading names from authority fields in {:?}", path);
145    let scanner = scan_parquet_file(path)?;
146    let mut out = out;
147
148    let scanner = scanner;
149    let mut nr = 0;
150    let mut nw = 0;
151    for rec in scanner {
152        nr += 1;
153        let mut rec: FieldRecord = rec?;
154        if filter.matches(&rec) {
155            nw += 1;
156            rec.contents = filter.transform(rec.contents.as_str()).into();
157            out.write_object(rec)?;
158        }
159    }
160    debug!("finished scanning parquet");
161    out.finish()?;
162    Ok((nr, nw))
163}
164
165/// Create an output for the records.
166fn open_output(out: &OutputSpec) -> Result<impl ObjectWriter<FieldRecord> + Send> {
167    info!("writing output to {:?}", out.file);
168    let out_name = out
169        .content_name
170        .as_ref()
171        .map(|s| s.clone())
172        .unwrap_or("content".into());
173    let schema = Schema::new(vec![
174        Field::new("rec_id", DataType::UInt32, false),
175        Field::new(&out_name, DataType::Utf8, false),
176    ]);
177    let schema = Arc::new(schema);
178
179    // we'll open the file early, so bg open failures are only in Parquet.
180    let file = File::options()
181        .create(true)
182        .truncate(true)
183        .write(true)
184        .open(&out.file)?;
185
186    let writer = ThreadObjectWriter::bg_open(move || {
187        let props = parquet_writer_defaults().set_column_dictionary_enabled(out_name.into(), true);
188        let writer = ArrowWriter::try_new(file, schema.clone(), Some(props.build()))?;
189        Ok(FilterOutput { schema, writer })
190    })
191    .spawn();
192    let writer = UnchunkWriter::with_size(writer, BATCH_SIZE);
193
194    Ok(writer)
195}
196
impl Command for FilterMARC {
    /// Run the filter: open the (threaded) output, then stream all field
    /// records through the filter, reporting read/write counts at the end.
    fn exec(&self) -> Result<()> {
        let out = open_output(&self.output)?;
        let (nr, nw) = scan_records(self.field_file.as_path(), &self.filter, out)?;

        info!("wrote {} out of {} records", scalar(nw), scalar(nr));

        Ok(())
    }
}