1use clap::Args;
3use std::fs::File;
4
5use crate::prelude::*;
6use polars::prelude::*;
7
/// Parquet file scanned for per-cluster gender data (joined on `cluster`, grouped by `gender`).
const GENDER_FILE: &str = "book-links/cluster-genders.parquet";
/// Parquet file joined to the LOC books on `isbn_id` to obtain their `cluster`.
const ISBN_CLUSTER_FILE: &str = "book-links/isbn-clusters.parquet";
/// Parquet file of LOC books, scanned for the `LOC-MDS` rows.
const LOC_BOOK_FILE: &str = "loc-mds/book-isbn-ids.parquet";
/// CSV file the combined statistics are written to.
const STAT_FILE: &str = "book-links/gender-stats.csv";
12
/// `(label, parquet path)` pairs for the action/rating tables to summarize.
///
/// The label is checked against the configuration to decide whether the table
/// is scanned, and is also the value written to the `dataset` output column.
const ACTION_FILES: &[(&str, &str)] = &[
    ("BX-I", "bx/bx-cluster-actions.parquet"),
    ("BX-E", "bx/bx-cluster-ratings.parquet"),
    ("AZ14", "az2014/az-cluster-ratings.parquet"),
    ("AZ18", "az2018/az-cluster-ratings.parquet"),
    ("GR-I", "goodreads/gr-cluster-actions.parquet"),
    ("GR-E", "goodreads/gr-cluster-ratings.parquet"),
];
21
22#[derive(Debug, Args)]
24#[command(name = "integration-stats")]
25pub struct IntegrationStats {}
26
27impl Command for IntegrationStats {
28 fn exec(&self) -> Result<()> {
29 require_working_root()?;
30 let cfg = load_config()?;
31
32 let genders = scan_genders()?;
33
34 let loc_frame = scan_loc(genders.clone())?;
35 let mut agg_frames = Vec::with_capacity(ACTION_FILES.len());
36
37 for (name, file) in ACTION_FILES {
38 if cfg.ds_enabled(name) {
39 agg_frames.push(scan_actions(&file, genders.clone(), *name)?);
40 }
41 }
42
43 info!("combining results");
44 let mut results = agg_frames.into_iter().fold(Ok(loc_frame), |dfr, df2| {
45 dfr.and_then(|df1| df1.vstack(&df2))
46 })?;
47
48 info!("saving {} records to {}", results.height(), STAT_FILE);
49 let writer = File::create(STAT_FILE)?;
50 let mut writer = CsvWriter::new(writer).include_header(true);
51 writer.finish(&mut results)?;
52
53 Ok(())
54 }
55}
56
57fn scan_genders() -> Result<LazyFrame> {
58 let df = LazyFrame::scan_parquet(GENDER_FILE, default())?;
59 Ok(df)
60}
61
62fn scan_loc(genders: LazyFrame) -> Result<DataFrame> {
63 info!("scanning LOC books");
64
65 let books = LazyFrame::scan_parquet(LOC_BOOK_FILE, default())?;
66 let clusters = LazyFrame::scan_parquet(ISBN_CLUSTER_FILE, default())?;
67 let books = books.inner_join(clusters, col("isbn_id"), col("isbn_id"));
68
69 let bg = books.inner_join(genders, col("cluster"), col("cluster"));
70 let bg = bg
71 .group_by([col("gender")])
72 .agg([col("cluster").n_unique().alias("n_books")])
73 .select([
74 lit("LOC-MDS").alias("dataset"),
75 col("gender"),
76 col("n_books"),
77 lit(NULL).cast(DataType::UInt32).alias("n_actions"),
78 ]);
79
80 let df = bg.collect()?;
81 Ok(df)
82}
83
84fn scan_actions(file: &str, genders: LazyFrame, name: &str) -> Result<DataFrame> {
85 info!("scanning data {} from {}", name, file);
86 let df = LazyFrame::scan_parquet(file, default())?;
87
88 let df = df.join(
89 genders,
90 &[col("item_id")],
91 &[col("cluster")],
92 JoinType::Inner.into(),
93 );
94 let df = df
95 .group_by([col("gender")])
96 .agg(&[
97 col("item_id").n_unique().alias("n_books"),
98 col("item_id").count().alias("n_actions"),
99 ])
100 .select([
101 lit(name).alias("dataset"),
102 col("gender"),
103 col("n_books"),
104 col("n_actions"),
105 ]);
106 debug!("{} schema: {:?}", name, df.schema());
107
108 let df = df.collect()?;
109 Ok(df)
110}