bookdata/goodreads/
interaction.rs

1//! GoodReads interaction record schemas and processing.
2use chrono::NaiveDateTime;
3use hashbrown::HashSet;
4use parquet_derive::ParquetRecordWriter;
5use serde::Deserialize;
6
7use crate::arrow::*;
8use crate::goodreads::users::save_user_index;
9use crate::ids::index::IdIndex;
10use crate::parsing::dates::*;
11use crate::parsing::*;
12use crate::prelude::*;
13
/// Location of the Parquet file that [`IntWriter`] writes interaction records to.
///
/// The path is resolved with `BDPath::resolve` when the writer is opened.
pub const OUT_FILE: BDPath<'static> = BDPath::new("goodreads/gr-interactions.parquet");
15
/// Interaction records we read from JSON.
///
/// Fields are kept in their raw source form; [`IntWriter`] converts each one into an
/// [`IntRecord`] before writing it out.
#[derive(Deserialize)]
pub struct RawInteraction {
    /// Source user identifier string; interned to an integer user ID on output.
    pub user_id: String,
    /// Book identifier as a string; it must parse as an `i32` on output.
    pub book_id: String,
    /// Hexadecimal review identifier; decoded and folded into an `i64` on output.
    pub review_id: String,
    /// Read flag from the source data (also accepted under the JSON key `isRead`).
    #[serde(alias = "isRead")]
    pub is_read: bool,
    /// Rating value; non-positive values are written as a missing rating.
    pub rating: f32,
    /// Date the interaction was added, as a raw date string parsed by `parse_gr_date`.
    pub date_added: String,
    /// Date the interaction was last updated, as a raw date string parsed by `parse_gr_date`.
    pub date_updated: String,
    /// Date the book was finished reading, as a raw string.
    ///
    /// It is passed through `trim_opt` before parsing, so it may be blank.
    pub read_at: String,
    /// Date the book was started, as a raw string.
    ///
    /// It is passed through `trim_opt` before parsing, so it may be blank.
    pub started_at: String,
}
30
/// GoodReads interaction records as actually written to the table.
///
/// This struct is written to `gr-interactions.parquet` and records actual interaction data.
/// Field order here defines the column order of the output table.
///
/// The date/time fields are `Option<NaiveDateTime>`, and the `ParquetRecordWriter` derive
/// determines how they are encoded in Parquet.
///
/// NOTE(review): earlier text here said timestamps are UNIX timestamps stored as 64-bit
/// integers without a Parquet timestamp type, because out-of-range values caused problems
/// when loaded into Python. The fields are now declared as `NaiveDateTime` rather than
/// integers, so confirm how the derive maps `NaiveDateTime` (it may emit a timestamp logical
/// type). Also confirm whether the Python loading concern still applies.
#[derive(ParquetRecordWriter)]
pub struct IntRecord {
    /// Sequential record number, assigned in input order starting at 1.
    pub rec_id: u32,
    /// The review ID.
    ///
    /// This is derived from the hexadecimal review ID by interpreting the hexadecimal-encoded
    /// review ID from the source data as two big-endian i64s and XORing them.  The import
    /// process checks that this does not result in duplicate review IDs, and emits warnings
    /// if any are encountered.
    pub review_id: i64,
    /// Integer user ID from the writer's interned user index (the index is saved separately).
    pub user_id: i32,
    /// Book ID parsed from the source record's string book ID.
    pub book_id: i32,
    /// `1` if the source record's read flag was true, `0` otherwise.
    pub is_read: u8,
    /// Rating, or `None` when the source rating was not positive.
    pub rating: Option<f32>,
    /// When the interaction was added (from `date_added`).
    ///
    /// It is `None` if the `check_ts` step rejects the value.
    pub added: Option<NaiveDateTime>,
    /// When the interaction was last updated (from `date_updated`).
    ///
    /// It is `None` if the `check_ts` step rejects the value.
    pub updated: Option<NaiveDateTime>,
    /// When the user started reading (from `started_at`).
    ///
    /// It is `None` if the source is blank or `check_ts` rejects the value.
    pub read_started: Option<NaiveDateTime>,
    /// When the user finished reading (from `read_at`).
    ///
    /// It is `None` if the source is blank or `check_ts` rejects the value.
    pub read_finished: Option<NaiveDateTime>,
}
55
/// Object writer to transform and write GoodReads interactions.
///
/// It converts [`RawInteraction`] rows into [`IntRecord`]s and writes them to [`OUT_FILE`].
/// On finish, it also saves the user ID index it built along the way.
pub struct IntWriter {
    /// Table writer for the output interaction records.
    writer: TableWriter<IntRecord>,
    /// Maps source user ID strings to integer user IDs; saved when output is finished.
    users: IdIndex<String>,
    /// Folded review IDs seen so far, used to warn about duplicates.
    review_ids: HashSet<i64>,
    /// Number of records processed so far; also the source of the 1-based `rec_id` values.
    n_recs: u32,
}
63
64impl IntWriter {
65    /// Open a new output
66    pub fn open() -> Result<IntWriter> {
67        let writer = TableWriter::open(OUT_FILE.resolve()?)?;
68        Ok(IntWriter {
69            writer,
70            users: IdIndex::new(),
71            review_ids: HashSet::new(),
72            n_recs: 0,
73        })
74    }
75}
76
77impl ObjectWriter<RawInteraction> for IntWriter {
78    /// Write a single interaction to the output
79    fn write_object(&mut self, row: RawInteraction) -> Result<()> {
80        self.n_recs += 1;
81        let rec_id = self.n_recs;
82        let user_id = self.users.intern_owned(row.user_id)?;
83        let book_id: i32 = row.book_id.parse()?;
84        let (rev_hi, rev_lo) = decode_hex_i64_pair(&row.review_id)?;
85        let review_id = rev_hi ^ rev_lo;
86        if !self.review_ids.insert(review_id) {
87            warn!("review id {} duplicated ({})", review_id, row.review_id);
88        }
89
90        self.writer.write_object(IntRecord {
91            rec_id,
92            review_id,
93            user_id,
94            book_id,
95            is_read: row.is_read as u8,
96            rating: if row.rating > 0.0 {
97                Some(row.rating)
98            } else {
99                None
100            },
101            added: parse_gr_date(&row.date_added).map(check_ts("added", 2000))?,
102            updated: parse_gr_date(&row.date_updated).map(check_ts("updated", 2000))?,
103            read_started: trim_opt(&row.started_at)
104                .map(parse_gr_date)
105                .transpose()?
106                .and_then(check_ts("started", 1900)),
107            read_finished: trim_opt(&row.read_at)
108                .map(parse_gr_date)
109                .transpose()?
110                .and_then(check_ts("finished", 1900)),
111        })?;
112
113        Ok(())
114    }
115
116    // Clean up and finalize output
117    fn finish_objects(self) -> Result<usize> {
118        info!(
119            "wrote {} records for {} users, closing output",
120            self.n_recs,
121            self.users.len()
122        );
123        let res = self.writer.finish_objects()?;
124        save_user_index(&self.users)?;
125        Ok(res)
126    }
127}