bookdata/cli/
index_names.rs

1//! Index names from authority records.
2use std::collections::{HashMap, HashSet};
3use std::fs::File;
4use std::path::{Path, PathBuf};
5use std::thread::{spawn, JoinHandle};
6
7use crossbeam::channel::{bounded, Receiver, Sender};
8
9use csv;
10use flate2::write::GzEncoder;
11use parquet_derive::ParquetRecordWriter;
12use serde::Serialize;
13
14use rayon::prelude::*;
15
16use crate::arrow::*;
17use crate::cleaning::names::*;
18use crate::io::background::ThreadWrite;
19use crate::io::object::ThreadObjectWriter;
20use crate::marc::flat_fields::FieldRecord;
21use crate::prelude::*;
22use crate::util::logging::item_progress;
23
24#[derive(Args, Debug)]
25#[command(name = "index-names")]
26/// Clean and index author names from authority records.
27pub struct IndexNames {
28    /// MARC authority field file to scan for names.
29    #[arg(long = "marc-authorities", name = "FILE")]
30    marc_authorities: Option<PathBuf>,
31
32    /// Index output Parquet file.
33    #[arg(name = "OUTFILE")]
34    outfile: PathBuf,
35}
36
37type NameIndex = HashMap<String, HashSet<u32>>;
38
39#[derive(ParquetRecordWriter, Serialize, Clone)]
40struct IndexEntry {
41    rec_id: u32,
42    name: String,
43}
44
45fn scan_authority_names(
46    path: &Path,
47    send: Sender<(String, u32)>,
48) -> Result<JoinHandle<Result<usize>>> {
49    info!("reading names from authority fields in {:?}", path);
50    let scanner = scan_parquet_file(path)?;
51
52    Ok(spawn(move || {
53        let scanner = scanner;
54        let pb = item_progress(scanner.remaining() as u64, "fields");
55        let mut n = 0;
56        for rec in pb.wrap_iter(scanner) {
57            let rec: FieldRecord = rec?;
58            if rec.tag == 700 && rec.sf_code == b'a' {
59                send.send((rec.contents, rec.rec_id))?;
60                n += 1;
61            }
62        }
63        debug!("finished scanning parquet");
64        Ok(n)
65    }))
66}
67
68fn process_names(recv: Receiver<(String, u32)>) -> Result<NameIndex> {
69    let mut index = NameIndex::new();
70
71    // process results and add to list
72    for (src, rec_id) in recv {
73        for name in name_variants(&src)? {
74            index.entry(name).or_default().insert(rec_id);
75        }
76    }
77
78    info!("index {} names", index.len());
79    Ok(index)
80}
81
82fn write_index(index: NameIndex, path: &Path) -> Result<()> {
83    info!("sorting {} names", index.len());
84    debug!("copying names");
85    let mut names = Vec::with_capacity(index.len());
86    names.extend(index.keys().map(|s| s.as_str()));
87    debug!("sorting names");
88    names.par_sort_unstable();
89
90    info!("writing deduplicated names to {}", path.to_string_lossy());
91    let mut writer = TableWriter::open(&path)?;
92
93    let mut csv_fn = PathBuf::from(path);
94    csv_fn.set_extension("csv.gz");
95    info!("writing CSV version to {:?}", csv_fn);
96    let out = File::create(&csv_fn)?;
97    let out = GzEncoder::new(out, flate2::Compression::fast());
98    let out = ThreadWrite::new(out)?;
99    // let out = Encoder::new(out, 2)?.auto_finish();
100    let csvw = csv::Writer::from_writer(out);
101    let mut csvout = ThreadObjectWriter::<IndexEntry>::wrap(csvw)
102        .with_name("csv output")
103        .spawn();
104
105    let pb = item_progress(names.len(), "names");
106
107    for name in pb.wrap_iter(names.into_iter()) {
108        let mut ids: Vec<u32> = index.get(name).unwrap().iter().map(|i| *i).collect();
109        ids.sort_unstable();
110        for rec_id in ids {
111            let e = IndexEntry {
112                rec_id,
113                name: name.to_string(),
114            };
115            csvout.write_object(e.clone())?;
116            writer.write_object(e)?;
117        }
118    }
119
120    writer.finish()?;
121    csvout.finish()?;
122    Ok(())
123}
124
125impl Command for IndexNames {
126    fn exec(&self) -> Result<()> {
127        let (send, recv) = bounded(4096);
128        let h = if let Some(ref path) = self.marc_authorities {
129            scan_authority_names(path.as_path(), send)?
130        } else {
131            return Err(anyhow!("no name source specified"));
132        };
133
134        let names = process_names(recv)?;
135        let nr = h.join().expect("thread join error")?;
136        info!("scanned {} name records", nr);
137
138        write_index(names, &self.outfile)?;
139
140        Ok(())
141    }
142}