bookdata/cli/
link_isbns.rs

1use clap::Args;
2
3use crate::{arrow::*, cli::link_isbns::writer::save_df_parquet_nonnull, prelude::*};
4use polars::prelude::*;
5
/// Path of the Parquet file that `exec` scans for the `isbn` / `isbn_id` columns.
const ALL_ISBNS_FILE: &str = "book-links/all-isbns.parquet";
7
8/// Link records to ISBN IDs.
9#[derive(Debug, Args)]
10#[command(name = "link-isbn-ids")]
11pub struct LinkISBNIds {
12    /// Read record IDs from RECFLD.
13    #[arg(
14        short = 'R',
15        long = "record-id",
16        name = "RECFLD",
17        default_value = "rec_id"
18    )]
19    rec_field: String,
20
21    /// Read ISBNs from FIELD.
22    #[arg(
23        short = 'I',
24        long = "isbn-field",
25        name = "FIELD",
26        default_value = "isbn"
27    )]
28    isbn_fields: Vec<String>,
29
30    /// Write output to FILE.
31    #[arg(short = 'o', long = "output", name = "FILE")]
32    outfile: PathBuf,
33
34    /// Read records from INPUT.
35    #[arg(name = "INFILE")]
36    infile: PathBuf,
37}
38
39impl Command for LinkISBNIds {
40    fn exec(&self) -> Result<()> {
41        info!("record field: {}", &self.rec_field);
42        info!("ISBN fields: {:?}", &self.isbn_fields);
43
44        let isbns = scan_df_parquet(ALL_ISBNS_FILE)?;
45        let records = scan_df_parquet(&self.infile)?;
46
47        let merged = if self.isbn_fields.len() == 1 {
48            // one column, join on it
49            records.join(
50                isbns,
51                &[col(self.isbn_fields[0].as_str())],
52                &[col("isbn")],
53                JoinType::Inner.into(),
54            )
55        } else {
56            let mut melt = MeltArgs::default();
57            melt.id_vars.push((&self.rec_field).into());
58            for fld in &self.isbn_fields {
59                melt.value_vars.push(fld.into());
60            }
61            melt.value_name = Some("isbn".into());
62            melt.variable_name = Some("field".into());
63            let rm = records.melt(melt);
64            rm.join(
65                isbns,
66                &[col("isbn")],
67                &[col("isbn")],
68                JoinType::Inner.into(),
69            )
70        };
71        let filtered = merged
72            .filter(col("isbn").is_not_null())
73            .select(&[col(self.rec_field.as_str()), col("isbn_id")])
74            .unique(None, UniqueKeepStrategy::First)
75            .sort(self.rec_field.as_str(), default());
76
77        info!("collecting results");
78        let frame = filtered.collect()?;
79        if frame.column(&self.rec_field)?.null_count() > 0 {
80            error!("final frame has null record IDs");
81            return Err(anyhow!("data check failed"));
82        }
83        if frame.column("isbn_id")?.null_count() > 0 {
84            error!("final frame has null ISBN IDs");
85            return Err(anyhow!("data check failed"));
86        }
87
88        info!("saving {} links to {:?}", frame.height(), &self.outfile);
89        save_df_parquet_nonnull(frame, &self.outfile)?;
90
91        Ok(())
92    }
93}