bookdata/cli/
collect_isbns.rs1use std::fmt::Debug;
3
4use fallible_iterator::IteratorExt;
5use polars::prelude::*;
6
7use crate::prelude::Result;
8use crate::prelude::*;
9
10#[derive(Args, Debug)]
12#[command(name = "collect-isbns")]
13pub struct CollectISBNs {
14 #[arg(short = 'o', long = "output")]
16 out_file: PathBuf,
17}
18
19fn all_sources(cfg: &Config) -> Vec<ISBNSource> {
23 vec![
24 ISBNSource::new("LOC")
25 .path("../loc-mds/book-isbns.parquet")
26 .finish(),
27 ISBNSource::new("OL")
28 .path("../openlibrary/edition-isbns.parquet")
29 .finish(),
30 ISBNSource::new("GR")
31 .enabled(cfg.goodreads.enabled)
32 .path("../goodreads/gr-book-ids.parquet")
33 .columns(&["isbn10", "isbn13", "asin"])
34 .finish(),
35 ISBNSource::new("BX")
36 .enabled(cfg.bx.enabled)
37 .path("../bx/cleaned-ratings.csv")
38 .finish(),
39 ISBNSource::new("AZ14")
40 .enabled(cfg.az2014.enabled)
41 .path("../az2014/ratings.parquet")
42 .column("asin")
43 .finish(),
44 ISBNSource::new("AZ18")
45 .enabled(cfg.az2018.enabled)
46 .path("../az2018/ratings.parquet")
47 .column("asin")
48 .finish(),
49 ]
50}
51
/// Description of one input file from which identifiers are collected.
///
/// Values are assembled builder-style: start with [`ISBNSource::new`], chain
/// the setters, and end with [`ISBNSource::finish`].
#[derive(Debug, Clone)]
struct ISBNSource {
    /// Short label for the source.
    name: &'static str,
    /// Whether the source is switched on; `true` unless overridden.
    enabled: bool,
    /// Location of the input file; empty until `path` is called.
    path: &'static str,
    /// Names of the identifier columns to read from the file.
    columns: Vec<&'static str>,
}

impl ISBNSource {
    /// Start a source named `name`: enabled, with an empty path and no
    /// columns selected yet.
    fn new(name: &'static str) -> ISBNSource {
        ISBNSource {
            name,
            enabled: true,
            path: "",
            columns: Vec::new(),
        }
    }

    /// Set whether the source is enabled.
    fn enabled(self, e: bool) -> ISBNSource {
        ISBNSource { enabled: e, ..self }
    }

    /// Set the input file path.
    fn path(self, path: &'static str) -> ISBNSource {
        ISBNSource { path, ..self }
    }

    /// Select a single identifier column, replacing any earlier selection.
    fn column(self, col: &'static str) -> ISBNSource {
        ISBNSource {
            columns: vec![col],
            ..self
        }
    }

    /// Select several identifier columns, replacing any earlier selection.
    fn columns(self, cols: &[&'static str]) -> ISBNSource {
        ISBNSource {
            columns: cols.to_vec(),
            ..self
        }
    }

    /// Complete the builder.  If no column was selected, the single column
    /// `"isbn"` is used, so the result always has at least one column.
    fn finish(mut self) -> ISBNSource {
        if self.columns.is_empty() {
            self.columns.push("isbn");
        }
        self
    }
}
103
104fn scan_source(src: &ISBNSource) -> Result<LazyFrame> {
106 info!("scanning ISBNs from {}", src.path);
107
108 let read = if src.path.ends_with(".csv") {
109 LazyCsvReader::new(src.path.to_string())
110 .has_header(true)
111 .finish()?
112 } else {
113 scan_df_parquet(src.path)?
114 };
115
116 let mut counted: Option<LazyFrame> = None;
117 for id_col in &src.columns {
118 info!("counting column {}", id_col);
119 let df = read.clone().select(&[col(id_col).alias("isbn")]);
120 let df = df.drop_nulls(None);
121 let df = df.group_by(["isbn"]).agg([len().alias("nrecs")]);
122 if let Some(prev) = counted {
123 let joined = prev.join(
124 df,
125 [col("isbn")],
126 [col("isbn")],
127 JoinArgs::new(JoinType::Outer { coalesce: true }),
128 );
129 counted = Some(joined.select([
130 col("isbn"),
131 (col(src.name).fill_null(0) + col("nrecs").fill_null(0)).alias(src.name),
132 ]));
133 } else {
134 counted = Some(df.select([col("isbn"), col("nrecs").alias(src.name)]));
135 }
136 }
137
138 Ok(counted.expect("data frame with no columns"))
139}
140
141impl Command for CollectISBNs {
142 fn exec(&self) -> Result<()> {
143 let cfg = load_config()?;
144 let sources = all_sources(&cfg);
145 let active: Vec<_> = sources.iter().filter(|s| s.enabled).collect();
146 info!(
147 "preparing to collect ISBNs from {} active sources (of {} known)",
148 active.len(),
149 sources.len()
150 );
151
152 let df = active
153 .iter()
154 .map(|s| scan_source(*s))
155 .transpose_into_fallible()
156 .fold(None, |cur: Option<LazyFrame>, df2| {
157 let out = if let Some(df1) = cur {
158 df1.join(
159 df2.clone(),
160 [col("isbn")],
161 [col("isbn")],
162 JoinArgs::new(JoinType::Outer { coalesce: true }),
163 )
164 } else {
165 df2
166 };
167 debug!("join result schema:\n{:?}", out.schema()?);
168 debug!("join result plan:\n{:?}", out.logical_plan);
169 Ok(Some(out))
170 })?;
171
172 let df = df.ok_or_else(|| anyhow!("no sources loaded"))?;
173 let df = df.with_row_index("isbn_id", Some(1));
174 let mut cast = vec![col("isbn_id").cast(DataType::Int32), col("isbn")];
175 for src in &active {
176 cast.push(col(src.name).fill_null(0));
177 }
178 let df = df.select(&cast);
179 info!("collecting ISBNs");
180 let df = df.collect()?;
181
182 info!(
183 "saving {} ISBNs to {}",
184 df.height(),
185 self.out_file.display()
186 );
187 save_df_parquet(df, &self.out_file)?;
188 info!("wrote ISBN collection file");
189
190 Ok(())
191 }
192}