bookdata/cli/
index_names.rs1use std::collections::{HashMap, HashSet};
3use std::fs::File;
4use std::path::{Path, PathBuf};
5use std::thread::{spawn, JoinHandle};
6
7use crossbeam::channel::{bounded, Receiver, Sender};
8
9use csv;
10use flate2::write::GzEncoder;
11use parquet_derive::ParquetRecordWriter;
12use serde::Serialize;
13
14use rayon::prelude::*;
15
16use crate::arrow::*;
17use crate::cleaning::names::*;
18use crate::io::background::ThreadWrite;
19use crate::io::object::ThreadObjectWriter;
20use crate::marc::flat_fields::FieldRecord;
21use crate::prelude::*;
22use crate::util::logging::item_progress;
23
24#[derive(Args, Debug)]
25#[command(name = "index-names")]
26pub struct IndexNames {
28 #[arg(long = "marc-authorities", name = "FILE")]
30 marc_authorities: Option<PathBuf>,
31
32 #[arg(name = "OUTFILE")]
34 outfile: PathBuf,
35}
36
/// Map from each name variant to the set of record IDs whose names produced it.
type NameIndex = HashMap<String, HashSet<u32>>;
38
/// One output row of the name index: a single (record ID, name variant) pair.
///
/// The same rows are written to both the table output and the CSV copy.
// NOTE(review): field order presumably fixes the column order in both the
// Parquet and CSV outputs — keep it stable unless downstream readers change too.
#[derive(ParquetRecordWriter, Serialize, Clone)]
struct IndexEntry {
    /// ID of the record the name was read from.
    rec_id: u32,
    /// One name variant derived from that record's name.
    name: String,
}
44
45fn scan_authority_names(
46 path: &Path,
47 send: Sender<(String, u32)>,
48) -> Result<JoinHandle<Result<usize>>> {
49 info!("reading names from authority fields in {:?}", path);
50 let scanner = scan_parquet_file(path)?;
51
52 Ok(spawn(move || {
53 let scanner = scanner;
54 let pb = item_progress(scanner.remaining() as u64, "fields");
55 let mut n = 0;
56 for rec in pb.wrap_iter(scanner) {
57 let rec: FieldRecord = rec?;
58 if rec.tag == 700 && rec.sf_code == b'a' {
59 send.send((rec.contents, rec.rec_id))?;
60 n += 1;
61 }
62 }
63 debug!("finished scanning parquet");
64 Ok(n)
65 }))
66}
67
68fn process_names(recv: Receiver<(String, u32)>) -> Result<NameIndex> {
69 let mut index = NameIndex::new();
70
71 for (src, rec_id) in recv {
73 for name in name_variants(&src)? {
74 index.entry(name).or_default().insert(rec_id);
75 }
76 }
77
78 info!("index {} names", index.len());
79 Ok(index)
80}
81
82fn write_index(index: NameIndex, path: &Path) -> Result<()> {
83 info!("sorting {} names", index.len());
84 debug!("copying names");
85 let mut names = Vec::with_capacity(index.len());
86 names.extend(index.keys().map(|s| s.as_str()));
87 debug!("sorting names");
88 names.par_sort_unstable();
89
90 info!("writing deduplicated names to {}", path.to_string_lossy());
91 let mut writer = TableWriter::open(&path)?;
92
93 let mut csv_fn = PathBuf::from(path);
94 csv_fn.set_extension("csv.gz");
95 info!("writing CSV version to {:?}", csv_fn);
96 let out = File::create(&csv_fn)?;
97 let out = GzEncoder::new(out, flate2::Compression::fast());
98 let out = ThreadWrite::new(out)?;
99 let csvw = csv::Writer::from_writer(out);
101 let mut csvout = ThreadObjectWriter::<IndexEntry>::wrap(csvw)
102 .with_name("csv output")
103 .spawn();
104
105 let pb = item_progress(names.len(), "names");
106
107 for name in pb.wrap_iter(names.into_iter()) {
108 let mut ids: Vec<u32> = index.get(name).unwrap().iter().map(|i| *i).collect();
109 ids.sort_unstable();
110 for rec_id in ids {
111 let e = IndexEntry {
112 rec_id,
113 name: name.to_string(),
114 };
115 csvout.write_object(e.clone())?;
116 writer.write_object(e)?;
117 }
118 }
119
120 writer.finish()?;
121 csvout.finish()?;
122 Ok(())
123}
124
125impl Command for IndexNames {
126 fn exec(&self) -> Result<()> {
127 let (send, recv) = bounded(4096);
128 let h = if let Some(ref path) = self.marc_authorities {
129 scan_authority_names(path.as_path(), send)?
130 } else {
131 return Err(anyhow!("no name source specified"));
132 };
133
134 let names = process_names(recv)?;
135 let nr = h.join().expect("thread join error")?;
136 info!("scanned {} name records", nr);
137
138 write_index(names, &self.outfile)?;
139
140 Ok(())
141 }
142}