bookdata/cli/goodreads/cluster.rs

1use std::path::{Path, PathBuf};
2
3use clap::Args;
4
5use crate::arrow::*;
6use crate::ids::codes::{NS_GR_BOOK, NS_GR_WORK};
7use crate::prelude::*;
8
9use polars::prelude::*;
10
11#[derive(Args, Debug)]
12pub struct CICommand {
13    /// Cluster ratings.
14    #[arg(long = "ratings")]
15    ratings: bool,
16
17    /// Cluster add-to-shelf actions.
18    #[arg(long = "add-actions")]
19    add_actions: bool,
20
21    /// Cluster using native GoodReads works instead of book clusters.
22    #[arg(long = "native-works")]
23    native_works: bool,
24
25    /// Write output to FILE.
26    #[arg(short = 'o', long = "output", name = "FILE")]
27    output: PathBuf,
28}
29
/// The kind of GoodReads interaction a clustering operation aggregates.
#[derive(PartialEq, Eq, Debug)]
enum ActionType {
    /// Rating interactions (rows with a non-null rating).
    Ratings,
    /// Add-to-shelf actions (every interaction row).
    AddActions,
}
35
/// How interactions are grouped into items before aggregation.
#[derive(PartialEq, Eq, Debug)]
enum AggType {
    /// Group by the integrated book-cluster identifier.
    Clusters,
    /// Group by native GoodReads work (falling back to the book when no work is set).
    NativeWorks,
}
41
42#[derive(Debug)]
43pub struct ClusterOp {
44    actions: ActionType,
45    clusters: AggType,
46    output: PathBuf,
47}
48
49impl CICommand {
50    pub fn exec(&self) -> Result<()> {
51        let mut op = if self.add_actions {
52            ClusterOp::add_actions(&self.output)
53        } else if self.ratings {
54            ClusterOp::ratings(&self.output)
55        } else {
56            error!("must specify one of --add-actions, --ratings, or --reviews");
57            return Err(anyhow!("no operating mode specified"));
58        };
59        if self.native_works {
60            op = op.native_works();
61        }
62
63        op.cluster()
64    }
65}
66
67impl ClusterOp {
68    /// Start a new action-clustering operation.
69    pub fn add_actions<P: AsRef<Path>>(path: P) -> ClusterOp {
70        ClusterOp {
71            actions: ActionType::AddActions,
72            clusters: AggType::Clusters,
73            output: path.as_ref().to_path_buf(),
74        }
75    }
76
77    /// Start a new rating-clustering operation.
78    pub fn ratings<P: AsRef<Path>>(path: P) -> ClusterOp {
79        ClusterOp {
80            actions: ActionType::Ratings,
81            clusters: AggType::Clusters,
82            output: path.as_ref().to_path_buf(),
83        }
84    }
85
86    /// Set operation to cluster with native works instead of clusters.
87    pub fn native_works(self) -> ClusterOp {
88        ClusterOp {
89            clusters: AggType::NativeWorks,
90            ..self
91        }
92    }
93
94    /// Run the clustering operation.
95    pub fn cluster(self) -> Result<()> {
96        let interactions = self.load_interactions()?;
97        let interactions = self.filter(interactions);
98        let interactions = self.project_and_sort(interactions);
99        let actions = interactions
100            .clone()
101            .group_by(&[col("user_id"), col("item_id")])
102            .agg(self.aggregates());
103
104        let actions = self.maybe_integrate_ratings(actions, &interactions);
105        let actions = actions.sort("first_time", SortOptions::default());
106
107        debug!("logical plan: {:?}", actions.describe_plan());
108        debug!("optimized plan: {:?}", actions.describe_optimized_plan()?);
109        info!("collecting results");
110        let actions = actions.collect()?;
111
112        info!("writing {} actions to {:?}", actions.height(), &self.output);
113        save_df_parquet(actions, &self.output)?;
114
115        Ok(())
116    }
117
118    /// Load the interaction file.
119    fn load_interactions(&self) -> Result<LazyFrame> {
120        let path = "goodreads/gr-interactions.parquet";
121        let data = LazyFrame::scan_parquet(path, Default::default())?;
122
123        let links = LazyFrame::scan_parquet("goodreads/gr-book-link.parquet", Default::default())?;
124
125        let data = data.join(
126            links,
127            &[col("book_id")],
128            &[col("book_id")],
129            JoinType::Inner.into(),
130        );
131        Ok(data)
132    }
133
134    /// Filter the data frame to only the actions we want
135    fn filter(&self, frame: LazyFrame) -> LazyFrame {
136        match self.actions {
137            ActionType::Ratings => frame.filter(col("rating").is_not_null()),
138            _ => frame,
139        }
140    }
141
142    /// Create an identity column reference.
143    fn id_col(&self) -> Expr {
144        match self.clusters {
145            AggType::Clusters => {
146                info!("grouping by integrated clusters");
147                col("cluster")
148            }
149            AggType::NativeWorks => {
150                info!("grouping by native works");
151                when(col("work_id").is_not_null())
152                    .then(col("work_id") + lit(NS_GR_WORK.base()))
153                    .otherwise(col("book_id") + lit(NS_GR_BOOK.base()))
154            }
155        }
156    }
157
158    /// Project and sort (if possible) the data.
159    fn project_and_sort(&self, frame: LazyFrame) -> LazyFrame {
160        frame.select(&[
161            col("user_id"),
162            self.id_col().alias("item_id"),
163            (col("updated").cast(DataType::Int64)).alias("timestamp"),
164            col("rating"),
165        ])
166    }
167
168    /// Aggreate the interactions.
169    fn aggregates(&self) -> Vec<Expr> {
170        match &self.actions {
171            ActionType::Ratings => {
172                vec![
173                    col("rating").median().alias("rating"),
174                    col("rating").last().alias("last_rating"),
175                    col("timestamp").min().alias("first_time"),
176                    col("timestamp").max().alias("last_time"),
177                    col("item_id").count().alias("nratings"),
178                ]
179            }
180            ActionType::AddActions => {
181                vec![
182                    col("timestamp").min().alias("first_time"),
183                    col("timestamp").max().alias("last_time"),
184                    col("item_id").count().alias("nactions"),
185                ]
186            }
187        }
188    }
189
190    fn maybe_integrate_ratings(&self, actions: LazyFrame, source: &LazyFrame) -> LazyFrame {
191        match &self.actions {
192            ActionType::AddActions => {
193                let ratings = source.clone().filter(col("rating").is_not_null());
194                let ratings = ratings
195                    .group_by(["user_id", "item_id"])
196                    .agg(&[col("rating").last().alias("last_rating")]);
197                actions.join(
198                    ratings,
199                    &[col("user_id"), col("item_id")],
200                    &[col("user_id"), col("item_id")],
201                    JoinType::Left.into(),
202                )
203            }
204            _ => actions,
205        }
206    }
207}