bookdata/cli/cluster/
authors.rs

1//! Extract author information for book clusters.
2use std::path::PathBuf;
3
4use parse_display::{Display, FromStr};
5
6use crate::arrow::dfext::*;
7use crate::arrow::writer::save_df_parquet_nonnull;
8use crate::prelude::*;
9use anyhow::Result;
10use polars::prelude::*;
11
/// Data source from which cluster author records can be extracted.
///
/// `Display` and `FromStr` are derived by `parse_display` using the
/// `lowercase` style, so a variant is printed and parsed as its name in
/// lower case.
#[derive(Display, FromStr, Debug, Clone)]
#[display(style = "lowercase")]
enum Source {
    /// OpenLibrary author data (handled by `scan_openlib`).
    OpenLib,
    /// Library of Congress author data (handled by `scan_loc`).
    LOC,
}
18
// Command-line arguments for the `extract-authors` subcommand.
// NOTE(review): the `///` comments below sit on clap-derived items and are
// presumably surfaced as CLI help text, so they are left untouched here.
#[derive(Args, Debug)]
#[command(name = "extract-authors")]
/// Extract cluster author data from extracted book data.
pub struct ClusterAuthors {
    /// Only extract first authors
    // Forwarded to every source scanner; `scan_loc` returns an error when
    // this flag is not set.
    #[arg(long = "first-author")]
    first_author: bool,

    /// Specify output file
    // Path handed to `save_df_parquet_nonnull` once the results are collected.
    #[arg(short = 'o', long = "output")]
    output: PathBuf,

    /// Specify the source
    // Collected into a list; the frames of all listed sources are concatenated.
    // An empty list makes `exec` fail with "no sources specified".
    #[arg(short = 's', long = "source")]
    sources: Vec<Source>,
}
35
36/// Scan the OpenLibrary data for authors.
37fn scan_openlib(first_only: bool) -> Result<LazyFrame> {
38    info!("scanning OpenLibrary author data");
39    info!("reading ISBN clusters");
40    let icl = scan_df_parquet("book-links/isbn-clusters.parquet")?;
41    let icl = icl.select(&[col("isbn_id"), col("cluster")]);
42    info!("reading OL edition IDs");
43    let edl = scan_df_parquet("openlibrary/edition-isbn-ids.parquet")?;
44    let edl = edl.filter(col("isbn_id").is_not_null());
45    info!("reading OL edition authors");
46    let mut eau = scan_df_parquet("openlibrary/edition-authors.parquet")?;
47    if first_only {
48        eau = eau.filter(col("pos").eq(0i16));
49    }
50
51    info!("reading OL author names");
52    let auth = scan_df_parquet("openlibrary/author-names.parquet")?;
53    let linked = icl.join(
54        edl,
55        [col("isbn_id")],
56        [col("isbn_id")],
57        JoinType::Inner.into(),
58    );
59    let linked = linked.join(
60        eau,
61        [col("edition")],
62        [col("edition")],
63        JoinType::Inner.into(),
64    );
65    let linked = linked.join(auth, [col("author")], [col("id")], JoinType::Inner.into());
66    let authors = linked.select(vec![
67        col("cluster"),
68        col("name")
69            .alias("author_name")
70            .map(udf_clean_name, GetOutput::from_type(DataType::String)),
71    ]);
72
73    Ok(authors)
74}
75
76/// Scan the Library of Congress data for first authors.
77fn scan_loc(first_only: bool) -> Result<LazyFrame> {
78    if !first_only {
79        error!("only first-author extraction is currently supported");
80        return Err(anyhow!("cannot extract multiple authors"));
81    }
82
83    info!("reading ISBN clusters");
84    let icl = scan_df_parquet("book-links/isbn-clusters.parquet")?;
85    let icl = icl.select([col("isbn_id"), col("cluster")]);
86
87    info!("reading LOC book records");
88    let books = scan_df_parquet("loc-mds/book-isbn-ids.parquet")?;
89
90    info!("reading LOC book authors");
91    let authors = scan_df_parquet("loc-mds/book-authors.parquet")?;
92    let authors = authors.filter(col("author_name").is_not_null());
93
94    let linked = icl.join(
95        books,
96        [col("isbn_id")],
97        [col("isbn_id")],
98        JoinType::Inner.into(),
99    );
100    let linked = linked.join(
101        authors,
102        [col("rec_id")],
103        [col("rec_id")],
104        JoinType::Inner.into(),
105    );
106    let authors = linked.select(vec![
107        col("cluster"),
108        col("author_name").map(udf_clean_name, GetOutput::from_type(DataType::String)),
109    ]);
110
111    Ok(authors)
112}
113
114impl Command for ClusterAuthors {
115    fn exec(&self) -> Result<()> {
116        let mut authors: Option<LazyFrame> = None;
117        for source in &self.sources {
118            let astr = match source {
119                Source::OpenLib => scan_openlib(self.first_author)?,
120                Source::LOC => scan_loc(self.first_author)?,
121            };
122            debug!("author source {} has schema {:?}", source, astr.schema());
123            if let Some(adf) = authors {
124                authors = Some(concat(
125                    [adf, astr],
126                    UnionArgs {
127                        parallel: true,
128                        rechunk: false,
129                        to_supertypes: false,
130                    },
131                )?);
132            } else {
133                authors = Some(astr);
134            }
135        }
136        let authors = authors.ok_or(anyhow!("no sources specified"))?;
137        let authors = authors.filter(
138            col("author_name")
139                .is_not_null()
140                .and(col("author_name").neq("".lit())),
141        );
142
143        let authors = authors.unique(None, UniqueKeepStrategy::First);
144
145        debug!("plan: {}", authors.describe_plan());
146
147        info!("collecting results");
148        let authors = authors.collect()?;
149        info!("found {} cluster-author links", authors.height());
150
151        info!("saving to {:?}", &self.output);
152        save_df_parquet_nonnull(authors, &self.output)?;
153
154        info!(
155            "output file is {}",
156            friendly::bytes(file_size(&self.output)?)
157        );
158
159        Ok(())
160    }
161}