bookdata/cli/cluster/
authors.rs1use std::path::PathBuf;
3
4use parse_display::{Display, FromStr};
5
6use crate::arrow::dfext::*;
7use crate::arrow::writer::save_df_parquet_nonnull;
8use crate::prelude::*;
9use anyhow::Result;
10use polars::prelude::*;
11
12#[derive(Display, FromStr, Debug, Clone)]
13#[display(style = "lowercase")]
14enum Source {
15 OpenLib,
16 LOC,
17}
18
19#[derive(Args, Debug)]
20#[command(name = "extract-authors")]
21pub struct ClusterAuthors {
23 #[arg(long = "first-author")]
25 first_author: bool,
26
27 #[arg(short = 'o', long = "output")]
29 output: PathBuf,
30
31 #[arg(short = 's', long = "source")]
33 sources: Vec<Source>,
34}
35
36fn scan_openlib(first_only: bool) -> Result<LazyFrame> {
38 info!("scanning OpenLibrary author data");
39 info!("reading ISBN clusters");
40 let icl = scan_df_parquet("book-links/isbn-clusters.parquet")?;
41 let icl = icl.select(&[col("isbn_id"), col("cluster")]);
42 info!("reading OL edition IDs");
43 let edl = scan_df_parquet("openlibrary/edition-isbn-ids.parquet")?;
44 let edl = edl.filter(col("isbn_id").is_not_null());
45 info!("reading OL edition authors");
46 let mut eau = scan_df_parquet("openlibrary/edition-authors.parquet")?;
47 if first_only {
48 eau = eau.filter(col("pos").eq(0i16));
49 }
50
51 info!("reading OL author names");
52 let auth = scan_df_parquet("openlibrary/author-names.parquet")?;
53 let linked = icl.join(
54 edl,
55 [col("isbn_id")],
56 [col("isbn_id")],
57 JoinType::Inner.into(),
58 );
59 let linked = linked.join(
60 eau,
61 [col("edition")],
62 [col("edition")],
63 JoinType::Inner.into(),
64 );
65 let linked = linked.join(auth, [col("author")], [col("id")], JoinType::Inner.into());
66 let authors = linked.select(vec![
67 col("cluster"),
68 col("name")
69 .alias("author_name")
70 .map(udf_clean_name, GetOutput::from_type(DataType::String)),
71 ]);
72
73 Ok(authors)
74}
75
76fn scan_loc(first_only: bool) -> Result<LazyFrame> {
78 if !first_only {
79 error!("only first-author extraction is currently supported");
80 return Err(anyhow!("cannot extract multiple authors"));
81 }
82
83 info!("reading ISBN clusters");
84 let icl = scan_df_parquet("book-links/isbn-clusters.parquet")?;
85 let icl = icl.select([col("isbn_id"), col("cluster")]);
86
87 info!("reading LOC book records");
88 let books = scan_df_parquet("loc-mds/book-isbn-ids.parquet")?;
89
90 info!("reading LOC book authors");
91 let authors = scan_df_parquet("loc-mds/book-authors.parquet")?;
92 let authors = authors.filter(col("author_name").is_not_null());
93
94 let linked = icl.join(
95 books,
96 [col("isbn_id")],
97 [col("isbn_id")],
98 JoinType::Inner.into(),
99 );
100 let linked = linked.join(
101 authors,
102 [col("rec_id")],
103 [col("rec_id")],
104 JoinType::Inner.into(),
105 );
106 let authors = linked.select(vec![
107 col("cluster"),
108 col("author_name").map(udf_clean_name, GetOutput::from_type(DataType::String)),
109 ]);
110
111 Ok(authors)
112}
113
114impl Command for ClusterAuthors {
115 fn exec(&self) -> Result<()> {
116 let mut authors: Option<LazyFrame> = None;
117 for source in &self.sources {
118 let astr = match source {
119 Source::OpenLib => scan_openlib(self.first_author)?,
120 Source::LOC => scan_loc(self.first_author)?,
121 };
122 debug!("author source {} has schema {:?}", source, astr.schema());
123 if let Some(adf) = authors {
124 authors = Some(concat(
125 [adf, astr],
126 UnionArgs {
127 parallel: true,
128 rechunk: false,
129 to_supertypes: false,
130 },
131 )?);
132 } else {
133 authors = Some(astr);
134 }
135 }
136 let authors = authors.ok_or(anyhow!("no sources specified"))?;
137 let authors = authors.filter(
138 col("author_name")
139 .is_not_null()
140 .and(col("author_name").neq("".lit())),
141 );
142
143 let authors = authors.unique(None, UniqueKeepStrategy::First);
144
145 debug!("plan: {}", authors.describe_plan());
146
147 info!("collecting results");
148 let authors = authors.collect()?;
149 info!("found {} cluster-author links", authors.height());
150
151 info!("saving to {:?}", &self.output);
152 save_df_parquet_nonnull(authors, &self.output)?;
153
154 info!(
155 "output file is {}",
156 friendly::bytes(file_size(&self.output)?)
157 );
158
159 Ok(())
160 }
161}