// bookdata/cli/cluster/books.rs

//! Extract book codes for book clusters.
2use std::path::PathBuf;
3
4use crate::arrow::writer::save_df_parquet_nonnull;
5use crate::ids::codes::*;
6use crate::prelude::*;
7use polars::prelude::*;
8
/// Parquet file of cluster graph nodes scanned by the `extract-books` command.
///
/// The path is relative, so it is resolved against the process's current
/// directory when the file is opened.
static GRAPH_NODE_FILE: &str = "book-links/cluster-graph-nodes.parquet";
10
11#[derive(Args, Debug)]
12#[command(name = "extract-books")]
13/// Extract cluster book codes for a particular namespace.
14pub struct ExtractBooks {
15    /// Specify output file
16    #[arg(short = 'o', long = "output")]
17    output: PathBuf,
18
19    /// Output numspaced book codes instead of original IDs.
20    #[arg(short = 'C', long = "numspaced-book-codes")]
21    book_codes: bool,
22
23    /// Specify the name of the book code field.
24    #[arg(
25        short = 'n',
26        long = "name",
27        name = "FIELD",
28        default_value = "book_code"
29    )]
30    field_name: String,
31
32    /// Specify an additional file to join into the results.
33    #[arg(long = "join-file", name = "LINKFILE")]
34    join_file: Option<PathBuf>,
35
36    /// Speficy a field to read from the join file.
37    #[arg(long = "join-field", name = "LINKFIELD")]
38    join_field: Option<String>,
39
40    /// Extract book codes in namespace NS.
41    #[arg(name = "NS")]
42    namespace: String,
43}
44
45impl Command for ExtractBooks {
46    fn exec(&self) -> Result<()> {
47        require_working_root()?;
48        let ns = NS::by_name(&self.namespace).ok_or(anyhow!("invalid namespace"))?;
49        let data = LazyFrame::scan_parquet(GRAPH_NODE_FILE, default())?;
50
51        let bc_col = if self.book_codes {
52            info!(
53                "writing numspaced book codes in column {}",
54                &self.field_name
55            );
56            col("book_code").alias(&self.field_name)
57        } else {
58            info!("writing source book IDs in column {}", &self.field_name);
59            (col("book_code") - lit(ns.base())).alias(&self.field_name)
60        };
61
62        let filtered = data
63            .filter((col("book_code") / lit(NS_MULT_BASE)).eq(lit(ns.code())))
64            .select(&[bc_col, col("cluster")]);
65
66        let results = if let Some(jf) = &self.join_file {
67            let join = LazyFrame::scan_parquet(jf, default())?;
68            let join = filtered.join(
69                join,
70                &[col(&self.field_name)],
71                &[col(&self.field_name)],
72                JoinType::Left.into(),
73            );
74            if let Some(fld) = &self.join_field {
75                join.select(&[col(&self.field_name), col(fld), col("cluster")])
76            } else {
77                join
78            }
79        } else {
80            filtered
81        };
82
83        info!("collecting results");
84        let mut frame = results.collect()?;
85        info!("got {} book links", frame.height());
86        frame.as_single_chunk_par();
87        save_df_parquet_nonnull(frame, &self.output)?;
88
89        Ok(())
90    }
91}