bookdata/interactions/
actions.rs1use std::collections::HashMap;
2use std::marker::PhantomData;
3use std::mem::take;
4use std::path::Path;
5
6use anyhow::Result;
7use log::*;
8use parquet::record::RecordWriter;
9use parquet_derive::ParquetRecordWriter;
10
11use super::{Dedup, Interaction, Key};
12use crate::arrow::*;
13use crate::io::{file_size, ObjectWriter};
14use crate::util::logging::item_progress;
15use crate::util::Timer;
16
17#[derive(ParquetRecordWriter, Debug)]
19pub struct TimestampActionRecord {
20 pub user: i32,
21 pub item: i32,
22 pub first_time: i64,
23 pub last_time: i64,
24 pub last_rating: Option<f32>,
25 pub nactions: i32,
26}
27
28#[derive(ParquetRecordWriter, Debug)]
30pub struct TimelessActionRecord {
31 pub user: i32,
32 pub item: i32,
33 pub nactions: i32,
34}
35
/// One observed action for a (user, item) pair, before deduplication.
///
/// Instances are accumulated per pair and later handed to
/// [`FromActionSet::create`] to be collapsed into a single output record.
#[derive(Debug, Clone, PartialEq)]
pub struct ActionInstance {
    // When the action happened; the unit is whatever the caller supplied.
    timestamp: i64,
    // Rating attached to the action, if it had one.
    rating: Option<f32>,
}
41
42pub trait FromActionSet {
44 fn create(user: i32, item: i32, actions: Vec<ActionInstance>) -> Self;
45}
46
47impl FromActionSet for TimestampActionRecord {
48 fn create(user: i32, item: i32, actions: Vec<ActionInstance>) -> Self {
49 let mut vec = actions;
50 if vec.len() == 1 {
51 let act = &vec[0];
53 TimestampActionRecord {
54 user,
55 item,
56 first_time: act.timestamp,
57 last_time: act.timestamp,
58 last_rating: act.rating,
59 nactions: 1,
60 }
61 } else {
62 vec.sort_unstable_by_key(|a| a.timestamp);
63 let first = &vec[0];
64 let last = &vec[vec.len() - 1];
65 let rates = vec.iter().flat_map(|a| a.rating).collect::<Vec<f32>>();
66 let last_rating = if rates.len() > 0 {
67 Some(rates[rates.len() - 1])
68 } else {
69 None
70 };
71
72 TimestampActionRecord {
73 user,
74 item,
75 first_time: first.timestamp,
76 last_time: last.timestamp,
77 last_rating,
78 nactions: vec.len() as i32,
79 }
80 }
81 }
82}
83
84impl FromActionSet for TimelessActionRecord {
85 fn create(user: i32, item: i32, actions: Vec<ActionInstance>) -> Self {
86 TimelessActionRecord {
87 user,
88 item,
89 nactions: actions.len() as i32,
90 }
91 }
92}
93
94pub struct ActionDedup<R>
96where
97 R: FromActionSet,
98 for<'a> &'a [R]: RecordWriter<R>,
99{
100 _phantom: PhantomData<R>,
101 table: HashMap<Key, Vec<ActionInstance>>,
102}
103
104impl<R> Default for ActionDedup<R>
105where
106 R: FromActionSet + 'static,
107 for<'a> &'a [R]: RecordWriter<R>,
108{
109 fn default() -> ActionDedup<R> {
110 ActionDedup {
111 _phantom: PhantomData,
112 table: HashMap::new(),
113 }
114 }
115}
116
117impl<I: Interaction, R> Dedup<I> for ActionDedup<R>
118where
119 R: FromActionSet + Send + Sync + 'static,
120 for<'a> &'a [R]: RecordWriter<R>,
121{
122 fn add_interaction(&mut self, act: I) -> Result<()> {
123 self.record(
124 act.get_user(),
125 act.get_item(),
126 act.get_timestamp(),
127 act.get_rating(),
128 );
129 Ok(())
130 }
131
132 fn save(&mut self, path: &Path) -> Result<usize> {
133 self.write_actions(path)
134 }
135}
136
137impl<R> ActionDedup<R>
138where
139 R: FromActionSet + Send + Sync + 'static,
140 for<'a> &'a [R]: RecordWriter<R>,
141{
142 pub fn record(&mut self, user: i32, item: i32, timestamp: i64, rating: Option<f32>) {
144 let k = Key::new(user, item);
145 let vec = self.table.entry(k).or_insert_with(|| Vec::with_capacity(1));
147 vec.push(ActionInstance { timestamp, rating });
149 }
150
151 pub fn write_actions<P: AsRef<Path>>(&mut self, path: P) -> Result<usize> {
153 let path = path.as_ref();
154 info!(
155 "writing {} deduplicated actions to {}",
156 friendly::scalar(self.table.len()),
157 path.display()
158 );
159 let mut writer = TableWriter::open(path)?;
160 let timer = Timer::new();
161 let n = self.table.len() as u64;
162 let pb = item_progress(n, "writing actions");
163
164 let table = take(&mut self.table);
166 for (k, vec) in pb.wrap_iter(table.into_iter()) {
167 let record = R::create(k.user, k.item, vec);
168 writer.write_object(record)?;
169 }
170
171 let rv = writer.finish()?;
172
173 info!(
174 "wrote {} actions in {}, file is {}",
175 friendly::scalar(n),
176 timer.human_elapsed(),
177 friendly::bytes(file_size(path)?)
178 );
179
180 Ok(rv)
181 }
182}