bookdata/cli/
index_names.rs

1//! Index names from authority records.
2use std::collections::{HashMap, HashSet};
3use std::fs::File;
4use std::path::{Path, PathBuf};
5use std::thread::{spawn, JoinHandle};
6
7use crossbeam::channel::{bounded, Receiver, Sender};
8
9use csv;
10use flate2::write::GzEncoder;
11use parquet_derive::ParquetRecordWriter;
12use serde::Serialize;
13
14use rayon::prelude::*;
15
16use crate::arrow::*;
17use crate::cleaning::names::*;
18use crate::io::background::ThreadWrite;
19use crate::io::object::ThreadObjectWriter;
20use crate::marc::flat_fields::FieldRecord;
21use crate::prelude::*;
22use crate::util::logging::item_progress;
23
// Command-line arguments for the `index-names` subcommand.
//
// NOTE(review): the `///` comments below are deliberately left unchanged; the
// derive presumably turns them into user-visible help text — confirm before
// rewording them.
#[derive(Args, Debug)]
#[command(name = "index-names")]
/// Clean and index author names from authority records.
pub struct IndexNames {
    /// MARC authority field file to scan for names.
    // Optional in the type, but `exec` returns an error when it is absent,
    // since it is currently the only name source.
    #[arg(long = "marc-authorities", name = "FILE")]
    marc_authorities: Option<PathBuf>,

    /// Index output Parquet file.
    // `write_index` also writes a CSV copy beside this file, with the
    // extension replaced by `csv.gz`.
    #[arg(name = "OUTFILE")]
    outfile: PathBuf,
}
36
/// In-memory name index: maps each name variant to the set of record ids it
/// was found in.
type NameIndex = HashMap<String, HashSet<u32>>;
38
/// One output row of the name index: a single (record id, name) pairing.
///
/// `write_index` sends every entry to both the Parquet table and the CSV copy,
/// which is why the type is `Clone`.
#[derive(ParquetRecordWriter, Serialize, Clone)]
struct IndexEntry {
    // Id of the record the name was found in.
    rec_id: u32,
    // Name variant, exactly as stored as a key of the index.
    name: String,
    // Result of applying `clean_name` to `name`.
    clean_name: String,
}
45
46fn scan_authority_names(
47    path: &Path,
48    send: Sender<(String, u32)>,
49) -> Result<JoinHandle<Result<usize>>> {
50    info!("reading names from authority fields in {:?}", path);
51    let scanner = scan_parquet_file(path)?;
52
53    Ok(spawn(move || {
54        let scanner = scanner;
55        let pb = item_progress(scanner.remaining() as u64, "fields");
56        let mut n = 0;
57        for rec in pb.wrap_iter(scanner) {
58            let rec: FieldRecord = rec?;
59            if rec.tag == 700 && rec.sf_code == b'a' {
60                send.send((rec.contents, rec.rec_id))?;
61                n += 1;
62            }
63        }
64        debug!("finished scanning parquet");
65        Ok(n)
66    }))
67}
68
69fn process_names(recv: Receiver<(String, u32)>) -> Result<NameIndex> {
70    let mut index = NameIndex::new();
71
72    // process results and add to list
73    for (src, rec_id) in recv {
74        for name in name_variants(&src)? {
75            index.entry(name).or_default().insert(rec_id);
76        }
77    }
78
79    info!("index {} names", index.len());
80    Ok(index)
81}
82
83fn write_index(index: NameIndex, path: &Path) -> Result<()> {
84    info!("sorting {} names", index.len());
85    debug!("copying names");
86    let mut names = Vec::with_capacity(index.len());
87    names.extend(index.keys().map(|s| s.as_str()));
88    debug!("sorting names");
89    names.par_sort_unstable();
90
91    info!("writing deduplicated names to {}", path.to_string_lossy());
92    let mut writer = TableWriter::open(&path)?;
93
94    let mut csv_fn = PathBuf::from(path);
95    csv_fn.set_extension("csv.gz");
96    info!("writing CSV version to {:?}", csv_fn);
97    let out = File::create(&csv_fn)?;
98    let out = GzEncoder::new(out, flate2::Compression::fast());
99    let out = ThreadWrite::new(out)?;
100    // let out = Encoder::new(out, 2)?.auto_finish();
101    let csvw = csv::Writer::from_writer(out);
102    let mut csvout = ThreadObjectWriter::<IndexEntry>::wrap(csvw)
103        .with_name("csv output")
104        .spawn();
105
106    let pb = item_progress(names.len(), "names");
107
108    for name in pb.wrap_iter(names.into_iter()) {
109        let mut ids: Vec<u32> = index.get(name).unwrap().iter().map(|i| *i).collect();
110        ids.sort_unstable();
111        for rec_id in ids {
112            let e = IndexEntry {
113                rec_id,
114                name: name.to_string(),
115                clean_name: clean_name(&name),
116            };
117            csvout.write_object(e.clone())?;
118            writer.write_object(e)?;
119        }
120    }
121
122    writer.finish_objects()?;
123    csvout.finish_objects()?;
124    Ok(())
125}
126
127impl Command for IndexNames {
128    fn exec(&self) -> Result<()> {
129        let (send, recv) = bounded(4096);
130        let h = if let Some(ref path) = self.marc_authorities {
131            scan_authority_names(path.as_path(), send)?
132        } else {
133            return Err(anyhow!("no name source specified"));
134        };
135
136        let names = process_names(recv)?;
137        let nr = h.join().expect("thread join error")?;
138        info!("scanned {} name records", nr);
139
140        write_index(names, &self.outfile)?;
141
142        Ok(())
143    }
144}