bookdata/goodreads/
interaction.rs1use chrono::NaiveDateTime;
3use hashbrown::HashSet;
4use parquet_derive::ParquetRecordWriter;
5use serde::Deserialize;
6
7use crate::arrow::*;
8use crate::goodreads::users::save_user_index;
9use crate::ids::index::IdIndex;
10use crate::parsing::dates::*;
11use crate::parsing::*;
12use crate::prelude::*;
13
14pub const OUT_FILE: BDPath<'static> = BDPath::new("goodreads/gr-interactions.parquet");
15
16#[derive(Deserialize)]
18pub struct RawInteraction {
19 pub user_id: String,
20 pub book_id: String,
21 pub review_id: String,
22 #[serde(alias = "isRead")]
23 pub is_read: bool,
24 pub rating: f32,
25 pub date_added: String,
26 pub date_updated: String,
27 pub read_at: String,
28 pub started_at: String,
29}
30
31#[derive(ParquetRecordWriter)]
37pub struct IntRecord {
38 pub rec_id: u32,
39 pub review_id: i64,
46 pub user_id: i32,
47 pub book_id: i32,
48 pub is_read: u8,
49 pub rating: Option<f32>,
50 pub added: Option<NaiveDateTime>,
51 pub updated: Option<NaiveDateTime>,
52 pub read_started: Option<NaiveDateTime>,
53 pub read_finished: Option<NaiveDateTime>,
54}
55
56pub struct IntWriter {
58 writer: TableWriter<IntRecord>,
59 users: IdIndex<String>,
60 review_ids: HashSet<i64>,
61 n_recs: u32,
62}
63
64impl IntWriter {
65 pub fn open() -> Result<IntWriter> {
67 let writer = TableWriter::open(OUT_FILE.resolve()?)?;
68 Ok(IntWriter {
69 writer,
70 users: IdIndex::new(),
71 review_ids: HashSet::new(),
72 n_recs: 0,
73 })
74 }
75}
76
77impl ObjectWriter<RawInteraction> for IntWriter {
78 fn write_object(&mut self, row: RawInteraction) -> Result<()> {
80 self.n_recs += 1;
81 let rec_id = self.n_recs;
82 let user_id = self.users.intern_owned(row.user_id)?;
83 let book_id: i32 = row.book_id.parse()?;
84 let (rev_hi, rev_lo) = decode_hex_i64_pair(&row.review_id)?;
85 let review_id = rev_hi ^ rev_lo;
86 if !self.review_ids.insert(review_id) {
87 warn!("review id {} duplicated ({})", review_id, row.review_id);
88 }
89
90 self.writer.write_object(IntRecord {
91 rec_id,
92 review_id,
93 user_id,
94 book_id,
95 is_read: row.is_read as u8,
96 rating: if row.rating > 0.0 {
97 Some(row.rating)
98 } else {
99 None
100 },
101 added: parse_gr_date(&row.date_added).map(check_ts("added", 2000))?,
102 updated: parse_gr_date(&row.date_updated).map(check_ts("updated", 2000))?,
103 read_started: trim_opt(&row.started_at)
104 .map(parse_gr_date)
105 .transpose()?
106 .and_then(check_ts("started", 1900)),
107 read_finished: trim_opt(&row.read_at)
108 .map(parse_gr_date)
109 .transpose()?
110 .and_then(check_ts("finished", 1900)),
111 })?;
112
113 Ok(())
114 }
115
116 fn finish_objects(self) -> Result<usize> {
118 info!(
119 "wrote {} records for {} users, closing output",
120 self.n_recs,
121 self.users.len()
122 );
123 let res = self.writer.finish_objects()?;
124 save_user_index(&self.users)?;
125 Ok(res)
126 }
127}