bookdata/cli/
collect_isbns.rs

1//! Collect ISBNs from across the data sources.
2use std::fmt::Debug;
3
4use fallible_iterator::IteratorExt;
5use polars::prelude::*;
6
7use crate::prelude::Result;
8use crate::prelude::*;
9
10/// Collect ISBNs from across the data sources.
11#[derive(Args, Debug)]
12#[command(name = "collect-isbns")]
13pub struct CollectISBNs {
14    /// Path to the output file (in Parquet format)
15    #[arg(short = 'o', long = "output")]
16    out_file: PathBuf,
17}
18
19/// Get the active ISBN layouts.
20///
21/// Modify this function to add more sources.
22fn all_sources(cfg: &Config) -> Vec<ISBNSource> {
23    vec![
24        ISBNSource::new("LOC")
25            .path("../loc-mds/book-isbns.parquet")
26            .finish(),
27        ISBNSource::new("OL")
28            .path("../openlibrary/edition-isbns.parquet")
29            .finish(),
30        ISBNSource::new("GR")
31            .enabled(cfg.goodreads.enabled)
32            .path("../goodreads/gr-book-ids.parquet")
33            .columns(&["isbn10", "isbn13", "asin"])
34            .finish(),
35        ISBNSource::new("BX")
36            .enabled(cfg.bx.enabled)
37            .path("../bx/cleaned-ratings.csv")
38            .finish(),
39        ISBNSource::new("AZ14")
40            .enabled(cfg.az2014.enabled)
41            .path("../az2014/ratings.parquet")
42            .column("asin")
43            .finish(),
44        ISBNSource::new("AZ18")
45            .enabled(cfg.az2018.enabled)
46            .path("../az2018/ratings.parquet")
47            .column("asin")
48            .finish(),
49    ]
50}
51
/// Description of one ISBN source: a data file plus the columns in it that hold ISBNs.
///
/// Built with a consuming builder: start with [`ISBNSource::new`], chain the setters,
/// and end with [`ISBNSource::finish`] so that the default `isbn` column is applied
/// when no columns were configured.
#[derive(Debug, Clone)]
struct ISBNSource {
    /// Short label for the source; also used as the name of its count column.
    name: &'static str,
    /// Whether the source takes part in ISBN collection.
    enabled: bool,
    /// Path of the source file (`.csv` files are read as CSV, anything else as Parquet).
    path: &'static str,
    /// Columns holding ISBN-like identifiers; empty until set or until `finish`.
    columns: Vec<&'static str>,
}

impl ISBNSource {
    /// Start describing a source called `name`.
    ///
    /// The new source is enabled, has an empty path and no columns.
    fn new(name: &'static str) -> ISBNSource {
        ISBNSource {
            name,
            enabled: true,
            path: "",
            columns: Vec::new(),
        }
    }

    /// Set whether this source is enabled.
    fn enabled(self, e: bool) -> ISBNSource {
        ISBNSource { enabled: e, ..self }
    }

    /// Set the path of the source file.
    fn path(self, path: &'static str) -> ISBNSource {
        ISBNSource { path, ..self }
    }

    /// Use a single ISBN column, replacing any previously configured columns.
    fn column(self, col: &'static str) -> ISBNSource {
        ISBNSource {
            columns: vec![col],
            ..self
        }
    }

    /// Use the given ISBN columns, replacing any previously configured columns.
    fn columns(self, cols: &[&'static str]) -> ISBNSource {
        ISBNSource {
            columns: cols.to_vec(),
            ..self
        }
    }

    /// Finish building the source.
    ///
    /// If no columns were configured, the source defaults to a single `isbn` column;
    /// otherwise it is returned unchanged.
    fn finish(self) -> ISBNSource {
        if self.columns.is_empty() {
            ISBNSource {
                columns: vec!["isbn"],
                ..self
            }
        } else {
            self
        }
    }
}
103
104/// Read a single ISBN source into the accumulator.
105fn scan_source(src: &ISBNSource) -> Result<LazyFrame> {
106    info!("scanning ISBNs from {}", src.path);
107
108    let read = if src.path.ends_with(".csv") {
109        LazyCsvReader::new(src.path.to_string())
110            .has_header(true)
111            .finish()?
112    } else {
113        scan_df_parquet(src.path)?
114    };
115
116    let mut counted: Option<LazyFrame> = None;
117    for id_col in &src.columns {
118        info!("counting column {}", id_col);
119        let df = read.clone().select(&[col(id_col).alias("isbn")]);
120        let df = df.drop_nulls(None);
121        let df = df.group_by(["isbn"]).agg([len().alias("nrecs")]);
122        if let Some(prev) = counted {
123            let joined = prev.join(
124                df,
125                [col("isbn")],
126                [col("isbn")],
127                JoinArgs::new(JoinType::Outer { coalesce: true }),
128            );
129            counted = Some(joined.select([
130                col("isbn"),
131                (col(src.name).fill_null(0) + col("nrecs").fill_null(0)).alias(src.name),
132            ]));
133        } else {
134            counted = Some(df.select([col("isbn"), col("nrecs").alias(src.name)]));
135        }
136    }
137
138    Ok(counted.expect("data frame with no columns"))
139}
140
141impl Command for CollectISBNs {
142    fn exec(&self) -> Result<()> {
143        let cfg = load_config()?;
144        let sources = all_sources(&cfg);
145        let active: Vec<_> = sources.iter().filter(|s| s.enabled).collect();
146        info!(
147            "preparing to collect ISBNs from {} active sources (of {} known)",
148            active.len(),
149            sources.len()
150        );
151
152        let df = active
153            .iter()
154            .map(|s| scan_source(*s))
155            .transpose_into_fallible()
156            .fold(None, |cur: Option<LazyFrame>, df2| {
157                let out = if let Some(df1) = cur {
158                    df1.join(
159                        df2.clone(),
160                        [col("isbn")],
161                        [col("isbn")],
162                        JoinArgs::new(JoinType::Outer { coalesce: true }),
163                    )
164                } else {
165                    df2
166                };
167                debug!("join result schema:\n{:?}", out.schema()?);
168                debug!("join result plan:\n{:?}", out.logical_plan);
169                Ok(Some(out))
170            })?;
171
172        let df = df.ok_or_else(|| anyhow!("no sources loaded"))?;
173        let df = df.with_row_index("isbn_id", Some(1));
174        let mut cast = vec![col("isbn_id").cast(DataType::Int32), col("isbn")];
175        for src in &active {
176            cast.push(col(src.name).fill_null(0));
177        }
178        let df = df.select(&cast);
179        info!("collecting ISBNs");
180        let df = df.collect()?;
181
182        info!(
183            "saving {} ISBNs to {}",
184            df.height(),
185            self.out_file.display()
186        );
187        save_df_parquet(df, &self.out_file)?;
188        info!("wrote ISBN collection file");
189
190        Ok(())
191    }
192}