bookdata/cli/amazon/
cluster_ratings.rs1use crate::prelude::*;
3use polars::prelude::*;
4
// Command-line arguments for the `cluster-ratings` subcommand.
//
// NOTE(review): plain `//` comments are used here on purpose; clap's derive
// macros turn `///` doc comments into help text, so switching to `///` would
// change the program's `--help` output.
#[derive(Args, Debug)]
#[command(name = "cluster-ratings")]
pub struct ClusterRatings {
    // Destination path for the aggregated ratings, given with `-o` / `--output`.
    // `exec` passes it to `save_df_parquet`.
    #[arg(short = 'o', long = "output", name = "FILE")]
    ratings_out: PathBuf,

    // Input path, declared without a short/long flag. `exec` scans it as a
    // Parquet file and reads the `asin`, `user_id`, `rating` and `timestamp`
    // columns from it.
    #[arg(name = "INPUT")]
    infile: PathBuf,
}
17
18impl Command for ClusterRatings {
19 fn exec(&self) -> Result<()> {
20 let isbns = LazyFrame::scan_parquet("book-links/isbn-clusters.parquet", default())?;
21 let isbns = isbns.select(&[col("isbn"), col("cluster")]);
22
23 let ratings = LazyFrame::scan_parquet(&self.infile, default())?;
24
25 let joined = ratings.join(
26 isbns,
27 &[col("asin")],
28 &[col("isbn")],
29 JoinType::Inner.into(),
30 );
31 let joined = joined
32 .select(&[
33 col("user_id"),
34 col("cluster").alias("item_id"),
35 col("rating"),
36 col("timestamp"),
37 ])
38 .sort("timestamp", default());
39
40 let actions = joined.group_by(&[col("user_id"), col("item_id")]).agg(&[
41 col("rating").median().alias("rating"),
42 col("rating").last().alias("last_rating"),
43 col("timestamp").min().alias("first_time"),
44 col("timestamp").max().alias("last_time"),
45 col("item_id").count().alias("nratings"),
46 ]);
47
48 info!("collecting results");
49 let actions = actions.collect()?;
50
51 info!("saving {} records", actions.height());
52 save_df_parquet(actions, &self.ratings_out)?;
53
54 Ok(())
55 }
56}