bookdata/cli/goodreads/
cluster.rs1use std::path::{Path, PathBuf};
2
3use clap::Args;
4
5use crate::arrow::*;
6use crate::ids::codes::{NS_GR_BOOK, NS_GR_WORK};
7use crate::prelude::*;
8
9use polars::prelude::*;
10
11#[derive(Args, Debug)]
12pub struct CICommand {
13 #[arg(long = "ratings")]
15 ratings: bool,
16
17 #[arg(long = "add-actions")]
19 add_actions: bool,
20
21 #[arg(long = "native-works")]
23 native_works: bool,
24
25 #[arg(short = 'o', long = "output", name = "FILE")]
27 output: PathBuf,
28}
29
/// Which kind of user-item action is being clustered.
#[derive(Debug, PartialEq, Eq)]
enum ActionType {
    /// Keep only interactions with a non-null `rating` and aggregate rating
    /// statistics (median, last rating, count) per user-item pair.
    Ratings,
    /// Keep every interaction, aggregate time range and count per user-item
    /// pair, and left-join the pair's last non-null rating.
    AddActions,
}
35
/// How book records are mapped to the item identifier used for grouping.
#[derive(Debug, PartialEq, Eq)]
enum AggType {
    /// Use the `cluster` column as the item ID.
    Clusters,
    /// Use `work_id` offset by `NS_GR_WORK.base()` when it is present, and
    /// `book_id` offset by `NS_GR_BOOK.base()` otherwise.
    NativeWorks,
}
41
/// A configured clustering operation; build one with `ClusterOp::ratings` or
/// `ClusterOp::add_actions`, optionally adjust it with `native_works`, and
/// run it with `cluster`.
#[derive(Debug)]
pub struct ClusterOp {
    /// Which interactions are kept and how they are aggregated.
    actions: ActionType,
    /// Which identifier the interactions are grouped by.
    clusters: AggType,
    /// Path of the Parquet file the result is written to.
    output: PathBuf,
}
48
49impl CICommand {
50 pub fn exec(&self) -> Result<()> {
51 let mut op = if self.add_actions {
52 ClusterOp::add_actions(&self.output)
53 } else if self.ratings {
54 ClusterOp::ratings(&self.output)
55 } else {
56 error!("must specify one of --add-actions, --ratings, or --reviews");
57 return Err(anyhow!("no operating mode specified"));
58 };
59 if self.native_works {
60 op = op.native_works();
61 }
62
63 op.cluster()
64 }
65}
66
67impl ClusterOp {
68 pub fn add_actions<P: AsRef<Path>>(path: P) -> ClusterOp {
70 ClusterOp {
71 actions: ActionType::AddActions,
72 clusters: AggType::Clusters,
73 output: path.as_ref().to_path_buf(),
74 }
75 }
76
77 pub fn ratings<P: AsRef<Path>>(path: P) -> ClusterOp {
79 ClusterOp {
80 actions: ActionType::Ratings,
81 clusters: AggType::Clusters,
82 output: path.as_ref().to_path_buf(),
83 }
84 }
85
86 pub fn native_works(self) -> ClusterOp {
88 ClusterOp {
89 clusters: AggType::NativeWorks,
90 ..self
91 }
92 }
93
94 pub fn cluster(self) -> Result<()> {
96 let interactions = self.load_interactions()?;
97 let interactions = self.filter(interactions);
98 let interactions = self.project_and_sort(interactions);
99 let actions = interactions
100 .clone()
101 .group_by(&[col("user_id"), col("item_id")])
102 .agg(self.aggregates());
103
104 let actions = self.maybe_integrate_ratings(actions, &interactions);
105 let actions = actions.sort("first_time", SortOptions::default());
106
107 debug!("logical plan: {:?}", actions.describe_plan());
108 debug!("optimized plan: {:?}", actions.describe_optimized_plan()?);
109 info!("collecting results");
110 let actions = actions.collect()?;
111
112 info!("writing {} actions to {:?}", actions.height(), &self.output);
113 save_df_parquet(actions, &self.output)?;
114
115 Ok(())
116 }
117
118 fn load_interactions(&self) -> Result<LazyFrame> {
120 let path = "goodreads/gr-interactions.parquet";
121 let data = LazyFrame::scan_parquet(path, Default::default())?;
122
123 let links = LazyFrame::scan_parquet("goodreads/gr-book-link.parquet", Default::default())?;
124
125 let data = data.join(
126 links,
127 &[col("book_id")],
128 &[col("book_id")],
129 JoinType::Inner.into(),
130 );
131 Ok(data)
132 }
133
134 fn filter(&self, frame: LazyFrame) -> LazyFrame {
136 match self.actions {
137 ActionType::Ratings => frame.filter(col("rating").is_not_null()),
138 _ => frame,
139 }
140 }
141
142 fn id_col(&self) -> Expr {
144 match self.clusters {
145 AggType::Clusters => {
146 info!("grouping by integrated clusters");
147 col("cluster")
148 }
149 AggType::NativeWorks => {
150 info!("grouping by native works");
151 when(col("work_id").is_not_null())
152 .then(col("work_id") + lit(NS_GR_WORK.base()))
153 .otherwise(col("book_id") + lit(NS_GR_BOOK.base()))
154 }
155 }
156 }
157
158 fn project_and_sort(&self, frame: LazyFrame) -> LazyFrame {
160 frame.select(&[
161 col("user_id"),
162 self.id_col().alias("item_id"),
163 (col("updated").cast(DataType::Int64)).alias("timestamp"),
164 col("rating"),
165 ])
166 }
167
168 fn aggregates(&self) -> Vec<Expr> {
170 match &self.actions {
171 ActionType::Ratings => {
172 vec![
173 col("rating").median().alias("rating"),
174 col("rating").last().alias("last_rating"),
175 col("timestamp").min().alias("first_time"),
176 col("timestamp").max().alias("last_time"),
177 col("item_id").count().alias("nratings"),
178 ]
179 }
180 ActionType::AddActions => {
181 vec![
182 col("timestamp").min().alias("first_time"),
183 col("timestamp").max().alias("last_time"),
184 col("item_id").count().alias("nactions"),
185 ]
186 }
187 }
188 }
189
190 fn maybe_integrate_ratings(&self, actions: LazyFrame, source: &LazyFrame) -> LazyFrame {
191 match &self.actions {
192 ActionType::AddActions => {
193 let ratings = source.clone().filter(col("rating").is_not_null());
194 let ratings = ratings
195 .group_by(["user_id", "item_id"])
196 .agg(&[col("rating").last().alias("last_rating")]);
197 actions.join(
198 ratings,
199 &[col("user_id"), col("item_id")],
200 &[col("user_id"), col("item_id")],
201 JoinType::Left.into(),
202 )
203 }
204 _ => actions,
205 }
206 }
207}