bookdata/cli/amazon/
cluster_ratings.rs

1//! Cluster Amazon ratings.
2use crate::prelude::*;
3use polars::prelude::*;
4
// NOTE(review): `Args`, `#[command]` and `#[arg]` look like clap's derive API
// (presumably re-exported through `crate::prelude`). If so, the `///` comments
// on this struct and its fields double as CLI help text, which is why they are
// kept verbatim here — confirm before rewording them.
/// Group Amazon ratings into clusters.
#[derive(Args, Debug)]
#[command(name = "cluster-ratings")]
pub struct ClusterRatings {
    // Destination path, supplied with `-o` / `--output`; `exec` hands it to
    // `save_df_parquet` when writing the aggregated result.
    /// Rating output file
    #[arg(short = 'o', long = "output", name = "FILE")]
    ratings_out: PathBuf,

    // Path that `exec` opens with `LazyFrame::scan_parquet`. The attribute sets
    // no short/long flag (presumably making this a positional argument — confirm).
    /// Input file to cluster
    #[arg(name = "INPUT")]
    infile: PathBuf,
}
17
18impl Command for ClusterRatings {
19    fn exec(&self) -> Result<()> {
20        let isbns = LazyFrame::scan_parquet("book-links/isbn-clusters.parquet", default())?;
21        let isbns = isbns.select(&[col("isbn"), col("cluster")]);
22
23        let ratings = LazyFrame::scan_parquet(&self.infile, default())?;
24
25        let joined = ratings.join(
26            isbns,
27            &[col("asin")],
28            &[col("isbn")],
29            JoinType::Inner.into(),
30        );
31        let joined = joined
32            .select(&[
33                col("user_id"),
34                col("cluster").alias("item_id"),
35                col("rating"),
36                col("timestamp"),
37            ])
38            .sort("timestamp", default());
39
40        let actions = joined.group_by(&[col("user_id"), col("item_id")]).agg(&[
41            col("rating").median().alias("rating"),
42            col("rating").last().alias("last_rating"),
43            col("timestamp").min().alias("first_time"),
44            col("timestamp").max().alias("last_time"),
45            col("item_id").count().alias("nratings"),
46        ]);
47
48        info!("collecting results");
49        let actions = actions.collect()?;
50
51        info!("saving {} records", actions.height());
52        save_df_parquet(actions, &self.ratings_out)?;
53
54        Ok(())
55    }
56}