bookdata/interactions/actions.rs

1use std::collections::HashMap;
2use std::marker::PhantomData;
3use std::mem::take;
4use std::path::Path;
5
6use anyhow::Result;
7use log::*;
8use parquet::record::RecordWriter;
9use parquet_derive::ParquetRecordWriter;
10
11use super::{Dedup, Interaction, Key};
12use crate::arrow::*;
13use crate::io::{file_size, ObjectWriter};
14use crate::util::logging::item_progress;
15use crate::util::Timer;
16
/// Record for a single output action.
///
/// One row summarizing every action recorded for a single user-item pair.
/// NOTE(review): the `ParquetRecordWriter` derive presumably emits columns in
/// declaration order — confirm before reordering fields.
#[derive(ParquetRecordWriter, Debug)]
pub struct TimestampActionRecord {
    /// User identifier for this pair.
    pub user: i32,
    /// Item identifier for this pair.
    pub item: i32,
    /// Timestamp of the earliest recorded action (units as supplied by the
    /// interaction source — TODO confirm).
    pub first_time: i64,
    /// Timestamp of the latest recorded action.
    pub last_time: i64,
    /// Rating of the latest action that carried a rating, or `None` if no
    /// action for this pair was rated.
    pub last_rating: Option<f32>,
    /// Number of actions recorded for this pair.
    pub nactions: i32,
}
27
/// Record for a single output action without time.
///
/// One row per user-item pair, carrying only the number of recorded actions.
#[derive(ParquetRecordWriter, Debug)]
pub struct TimelessActionRecord {
    /// User identifier for this pair.
    pub user: i32,
    /// Item identifier for this pair.
    pub item: i32,
    /// Number of actions recorded for this pair.
    pub nactions: i32,
}
35
/// A single recorded action: when it happened and the rating it carried, if any.
///
/// NOTE(review): the fields are private, so `FromActionSet` implementations
/// outside this module cannot read them.
#[derive(PartialEq, Clone, Debug)]
pub struct ActionInstance {
    // Time of the action (units as supplied by the interaction source — TODO confirm).
    timestamp: i64,
    // Rating attached to this action, if it had one.
    rating: Option<f32>,
}
41
/// Collapse a sequence of actions into an action record.
///
/// Implementors summarize all actions recorded for one user-item pair into a
/// single output record of type `Self`.
pub trait FromActionSet {
    /// Build the record for `user` and `item` from that pair's actions.
    ///
    /// `actions` arrives in the order the actions were recorded (not sorted by
    /// timestamp). When called from `ActionDedup`, it always contains at least
    /// one action.
    fn create(user: i32, item: i32, actions: Vec<ActionInstance>) -> Self;
}
46
47impl FromActionSet for TimestampActionRecord {
48    fn create(user: i32, item: i32, actions: Vec<ActionInstance>) -> Self {
49        let mut vec = actions;
50        if vec.len() == 1 {
51            // fast path
52            let act = &vec[0];
53            TimestampActionRecord {
54                user,
55                item,
56                first_time: act.timestamp,
57                last_time: act.timestamp,
58                last_rating: act.rating,
59                nactions: 1,
60            }
61        } else {
62            vec.sort_unstable_by_key(|a| a.timestamp);
63            let first = &vec[0];
64            let last = &vec[vec.len() - 1];
65            let rates = vec.iter().flat_map(|a| a.rating).collect::<Vec<f32>>();
66            let last_rating = if rates.len() > 0 {
67                Some(rates[rates.len() - 1])
68            } else {
69                None
70            };
71
72            TimestampActionRecord {
73                user,
74                item,
75                first_time: first.timestamp,
76                last_time: last.timestamp,
77                last_rating,
78                nactions: vec.len() as i32,
79            }
80        }
81    }
82}
83
84impl FromActionSet for TimelessActionRecord {
85    fn create(user: i32, item: i32, actions: Vec<ActionInstance>) -> Self {
86        TimelessActionRecord {
87            user,
88            item,
89            nactions: actions.len() as i32,
90        }
91    }
92}
93
94/// Action deduplicator.
95pub struct ActionDedup<R>
96where
97    R: FromActionSet,
98    for<'a> &'a [R]: RecordWriter<R>,
99{
100    _phantom: PhantomData<R>,
101    table: HashMap<Key, Vec<ActionInstance>>,
102}
103
104impl<R> Default for ActionDedup<R>
105where
106    R: FromActionSet + 'static,
107    for<'a> &'a [R]: RecordWriter<R>,
108{
109    fn default() -> ActionDedup<R> {
110        ActionDedup {
111            _phantom: PhantomData,
112            table: HashMap::new(),
113        }
114    }
115}
116
117impl<I: Interaction, R> Dedup<I> for ActionDedup<R>
118where
119    R: FromActionSet + Send + Sync + 'static,
120    for<'a> &'a [R]: RecordWriter<R>,
121{
122    fn add_interaction(&mut self, act: I) -> Result<()> {
123        self.record(
124            act.get_user(),
125            act.get_item(),
126            act.get_timestamp(),
127            act.get_rating(),
128        );
129        Ok(())
130    }
131
132    fn save(&mut self, path: &Path) -> Result<usize> {
133        self.write_actions(path)
134    }
135}
136
137impl<R> ActionDedup<R>
138where
139    R: FromActionSet + Send + Sync + 'static,
140    for<'a> &'a [R]: RecordWriter<R>,
141{
142    /// Add an action to the deduplicator.
143    pub fn record(&mut self, user: i32, item: i32, timestamp: i64, rating: Option<f32>) {
144        let k = Key::new(user, item);
145        // get the vector for this user/item pair
146        let vec = self.table.entry(k).or_insert_with(|| Vec::with_capacity(1));
147        // and insert our records!
148        vec.push(ActionInstance { timestamp, rating });
149    }
150
151    /// Save the rating table disk.
152    pub fn write_actions<P: AsRef<Path>>(&mut self, path: P) -> Result<usize> {
153        let path = path.as_ref();
154        info!(
155            "writing {} deduplicated actions to {}",
156            friendly::scalar(self.table.len()),
157            path.display()
158        );
159        let mut writer = TableWriter::open(path)?;
160        let timer = Timer::new();
161        let n = self.table.len() as u64;
162        let pb = item_progress(n, "writing actions");
163
164        // we're going to consume the hashtable.
165        let table = take(&mut self.table);
166        for (k, vec) in pb.wrap_iter(table.into_iter()) {
167            let record = R::create(k.user, k.item, vec);
168            writer.write_object(record)?;
169        }
170
171        let rv = writer.finish()?;
172
173        info!(
174            "wrote {} actions in {}, file is {}",
175            friendly::scalar(n),
176            timer.human_elapsed(),
177            friendly::bytes(file_size(path)?)
178        );
179
180        Ok(rv)
181    }
182}