bookdata/cli/
stats.rs

1//! Compute integration statistics.
2use clap::Args;
3use std::fs::File;
4
5use crate::prelude::*;
6use polars::prelude::*;
7
/// Parquet file of LOC book ISBN identifiers (input).
static LOC_BOOK_FILE: &str = "loc-mds/book-isbn-ids.parquet";
/// Parquet file mapping ISBN identifiers to book clusters (input).
static ISBN_CLUSTER_FILE: &str = "book-links/isbn-clusters.parquet";
/// Parquet file of per-cluster gender information (input).
static GENDER_FILE: &str = "book-links/cluster-genders.parquet";
/// CSV file that receives the combined statistics (output).
static STAT_FILE: &str = "book-links/gender-stats.csv";
12
/// Action/rating inputs, one `(dataset label, Parquet path)` pair per entry.
///
/// The label is used both to check whether the dataset is enabled and as the
/// `dataset` value of the resulting rows; entries are processed in the order
/// listed here.
static ACTION_FILES: &[(&str, &str)] = &[
    // BookCrossing
    ("BX-I", "bx/bx-cluster-actions.parquet"),
    ("BX-E", "bx/bx-cluster-ratings.parquet"),
    // Amazon
    ("AZ14", "az2014/az-cluster-ratings.parquet"),
    ("AZ18", "az2018/az-cluster-ratings.parquet"),
    // GoodReads
    ("GR-I", "goodreads/gr-cluster-actions.parquet"),
    ("GR-E", "goodreads/gr-cluster-ratings.parquet"),
];
21
22/// Compute integration statistics.
23#[derive(Debug, Args)]
24#[command(name = "integration-stats")]
25pub struct IntegrationStats {}
26
27impl Command for IntegrationStats {
28    fn exec(&self) -> Result<()> {
29        require_working_root()?;
30        let cfg = load_config()?;
31
32        let genders = scan_genders()?;
33
34        let loc_frame = scan_loc(genders.clone())?;
35        let mut agg_frames = Vec::with_capacity(ACTION_FILES.len());
36
37        for (name, file) in ACTION_FILES {
38            if cfg.ds_enabled(name) {
39                agg_frames.push(scan_actions(&file, genders.clone(), *name)?);
40            }
41        }
42
43        info!("combining results");
44        let mut results = agg_frames.into_iter().fold(Ok(loc_frame), |dfr, df2| {
45            dfr.and_then(|df1| df1.vstack(&df2))
46        })?;
47
48        info!("saving {} records to {}", results.height(), STAT_FILE);
49        let writer = File::create(STAT_FILE)?;
50        let mut writer = CsvWriter::new(writer).include_header(true);
51        writer.finish(&mut results)?;
52
53        Ok(())
54    }
55}
56
57fn scan_genders() -> Result<LazyFrame> {
58    let df = LazyFrame::scan_parquet(GENDER_FILE, default())?;
59    Ok(df)
60}
61
62fn scan_loc(genders: LazyFrame) -> Result<DataFrame> {
63    info!("scanning LOC books");
64
65    let books = LazyFrame::scan_parquet(LOC_BOOK_FILE, default())?;
66    let clusters = LazyFrame::scan_parquet(ISBN_CLUSTER_FILE, default())?;
67    let books = books.inner_join(clusters, col("isbn_id"), col("isbn_id"));
68
69    let bg = books.inner_join(genders, col("cluster"), col("cluster"));
70    let bg = bg
71        .group_by([col("gender")])
72        .agg([col("cluster").n_unique().alias("n_books")])
73        .select([
74            lit("LOC-MDS").alias("dataset"),
75            col("gender"),
76            col("n_books"),
77            lit(NULL).cast(DataType::UInt32).alias("n_actions"),
78        ]);
79
80    let df = bg.collect()?;
81    Ok(df)
82}
83
84fn scan_actions(file: &str, genders: LazyFrame, name: &str) -> Result<DataFrame> {
85    info!("scanning data {} from {}", name, file);
86    let df = LazyFrame::scan_parquet(file, default())?;
87
88    let df = df.join(
89        genders,
90        &[col("item_id")],
91        &[col("cluster")],
92        JoinType::Inner.into(),
93    );
94    let df = df
95        .group_by([col("gender")])
96        .agg(&[
97            col("item_id").n_unique().alias("n_books"),
98            col("item_id").count().alias("n_actions"),
99        ])
100        .select([
101            lit(name).alias("dataset"),
102            col("gender"),
103            col("n_books"),
104            col("n_actions"),
105        ]);
106    debug!("{} schema: {:?}", name, df.schema());
107
108    let df = df.collect()?;
109    Ok(df)
110}