bookdata/cli/cluster/
books.rs1use std::path::PathBuf;
3
4use crate::arrow::writer::save_df_parquet_nonnull;
5use crate::ids::codes::*;
6use crate::prelude::*;
7use polars::prelude::*;
8
/// Relative path of the cluster graph node Parquet file that `ExtractBooks::exec` scans.
static GRAPH_NODE_FILE: &str = "book-links/cluster-graph-nodes.parquet";
10
11#[derive(Args, Debug)]
12#[command(name = "extract-books")]
13pub struct ExtractBooks {
15 #[arg(short = 'o', long = "output")]
17 output: PathBuf,
18
19 #[arg(short = 'C', long = "numspaced-book-codes")]
21 book_codes: bool,
22
23 #[arg(
25 short = 'n',
26 long = "name",
27 name = "FIELD",
28 default_value = "book_code"
29 )]
30 field_name: String,
31
32 #[arg(long = "join-file", name = "LINKFILE")]
34 join_file: Option<PathBuf>,
35
36 #[arg(long = "join-field", name = "LINKFIELD")]
38 join_field: Option<String>,
39
40 #[arg(name = "NS")]
42 namespace: String,
43}
44
45impl Command for ExtractBooks {
46 fn exec(&self) -> Result<()> {
47 require_working_root()?;
48 let ns = NS::by_name(&self.namespace).ok_or(anyhow!("invalid namespace"))?;
49 let data = LazyFrame::scan_parquet(GRAPH_NODE_FILE, default())?;
50
51 let bc_col = if self.book_codes {
52 info!(
53 "writing numspaced book codes in column {}",
54 &self.field_name
55 );
56 col("book_code").alias(&self.field_name)
57 } else {
58 info!("writing source book IDs in column {}", &self.field_name);
59 (col("book_code") - lit(ns.base())).alias(&self.field_name)
60 };
61
62 let filtered = data
63 .filter((col("book_code") / lit(NS_MULT_BASE)).eq(lit(ns.code())))
64 .select(&[bc_col, col("cluster")]);
65
66 let results = if let Some(jf) = &self.join_file {
67 let join = LazyFrame::scan_parquet(jf, default())?;
68 let join = filtered.join(
69 join,
70 &[col(&self.field_name)],
71 &[col(&self.field_name)],
72 JoinType::Left.into(),
73 );
74 if let Some(fld) = &self.join_field {
75 join.select(&[col(&self.field_name), col(fld), col("cluster")])
76 } else {
77 join
78 }
79 } else {
80 filtered
81 };
82
83 info!("collecting results");
84 let mut frame = results.collect()?;
85 info!("got {} book links", frame.height());
86 frame.as_single_chunk_par();
87 save_df_parquet_nonnull(frame, &self.output)?;
88
89 Ok(())
90 }
91}