// bookdata/cli/index_names.rs
use std::collections::{HashMap, HashSet};
use std::fs::File;
use std::path::{Path, PathBuf};
use std::thread::{spawn, JoinHandle};

use crossbeam::channel::{bounded, Receiver, Sender};

use csv;
use flate2::write::GzEncoder;
use parquet_derive::ParquetRecordWriter;
use serde::Serialize;

use rayon::prelude::*;

use crate::arrow::*;
use crate::cleaning::names::*;
use crate::io::background::ThreadWrite;
use crate::io::object::ThreadObjectWriter;
use crate::marc::flat_fields::FieldRecord;
use crate::prelude::*;
use crate::util::logging::item_progress;

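/// Index names from MARC authority records.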
#[derive(Args, Debug)]
#[command(name = "index-names")]
pub struct IndexNames {
    /// MARC authority field file to read names from.
    #[arg(long = "marc-authorities", name = "FILE")]
    marc_authorities: Option<PathBuf>,

    /// Output file for the name index.
    #[arg(name = "OUTFILE")]
    outfile: PathBuf,
}

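/// Mapping from a name to the set of record IDs that use it.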
type NameIndex = HashMap<String, HashSet<u32>>;

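/// One output row: a record ID plus the raw and cleaned forms of a name.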
#[derive(ParquetRecordWriter, Serialize, Clone)]
struct IndexEntry {
    rec_id: u32,
    name: String,
    clean_name: String,
}

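/// Spawn a thread that scans the authority field file at `path` and sends
/// name strings with their record IDs over `send`.  The returned handle
/// yields the number of names sent.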
fn scan_authority_names(
    path: &Path,
    send: Sender<(String, u32)>,
) -> Result<JoinHandle<Result<usize>>> {
    info!("reading names from authority fields in {:?}", path);
    let scanner = scan_parquet_file(path)?;

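    // Scan on a background thread: keep subfield 'a' of field 700 and send
    // each name with its record ID to the indexing thread.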
    Ok(spawn(move || {
        let scanner = scanner;
        let pb = item_progress(scanner.remaining() as u64, "fields");
        let mut n = 0;
        for rec in pb.wrap_iter(scanner) {
            let rec: FieldRecord = rec?;
            if rec.tag == 700 && rec.sf_code == b'a' {
                send.send((rec.contents, rec.rec_id))?;
                n += 1;
            }
        }
        debug!("finished scanning parquet");
        Ok(n)
    }))
}

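/// Receive names from the scanner and build the in-memory name index.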
fn process_names(recv: Receiver<(String, u32)>) -> Result<NameIndex> {
    let mut index = NameIndex::new();

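    // A source name can expand to several variants; index every variant back
    // to the originating record.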
    for (src, rec_id) in recv {
        for name in name_variants(&src)? {
            index.entry(name).or_default().insert(rec_id);
        }
    }

    info!("indexed {} names", index.len());
    Ok(index)
}

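/// Write the sorted name index to the output file, plus a gzipped CSV copy.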
fn write_index(index: NameIndex, path: &Path) -> Result<()> {
    info!("sorting {} names", index.len());
    debug!("copying names");
    let mut names = Vec::with_capacity(index.len());
    names.extend(index.keys().map(|s| s.as_str()));
    debug!("sorting names");
    names.par_sort_unstable();

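    // Main index output at the requested path.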
    info!("writing deduplicated names to {}", path.to_string_lossy());
    let mut writer = TableWriter::open(&path)?;

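    // Also write a gzipped CSV copy alongside the main output, compressing
    // and serializing on background threads.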
    let mut csv_fn = PathBuf::from(path);
    csv_fn.set_extension("csv.gz");
    info!("writing CSV version to {:?}", csv_fn);
    let out = File::create(&csv_fn)?;
    let out = GzEncoder::new(out, flate2::Compression::fast());
    let out = ThreadWrite::new(out)?;
    let csvw = csv::Writer::from_writer(out);
    let mut csvout = ThreadObjectWriter::<IndexEntry>::wrap(csvw)
        .with_name("csv output")
        .spawn();

    let pb = item_progress(names.len(), "names");

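    // Emit rows in sorted name order; sort each name's record IDs for
    // deterministic output.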
    for name in pb.wrap_iter(names.into_iter()) {
        let mut ids: Vec<u32> = index.get(name).unwrap().iter().copied().collect();
        ids.sort_unstable();
        for rec_id in ids {
            let e = IndexEntry {
                rec_id,
                name: name.to_string(),
                clean_name: clean_name(&name),
            };
            csvout.write_object(e.clone())?;
            writer.write_object(e)?;
        }
    }

    writer.finish_objects()?;
    csvout.finish_objects()?;
    Ok(())
}

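// CLI entry point: connect the authority scanner to the indexer and write the
// results.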
impl Command for IndexNames {
    fn exec(&self) -> Result<()> {
        let (send, recv) = bounded(4096);
        let h = if let Some(ref path) = self.marc_authorities {
            scan_authority_names(path.as_path(), send)?
        } else {
            return Err(anyhow!("no name source specified"));
        };

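        // Build the index on this thread while the scanner runs in the
        // background, then check the scanner's result.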
        let names = process_names(recv)?;
        let nr = h.join().expect("thread join error")?;
        info!("scanned {} name records", nr);

        write_index(names, &self.outfile)?;

        Ok(())
    }
}