bookdata/cli/amazon/
scan_reviews.rs

1//! Scan Amazon reviews.
2use crate::amazon::*;
3use crate::arrow::*;
4use crate::ids::index::IdIndex;
5use crate::prelude::*;
6use crate::util::logging::data_progress;
7
// Command-line arguments for the `scan-reviews` subcommand.  `exec` always
// writes the ratings table, and additionally writes the review table when a
// review output path is supplied.
// NOTE(review): with clap's derive API the `///` comments on this struct and
// its fields presumably double as the CLI help text, so rewording them would
// change user-visible output — confirm before editing them.
/// Scan an Amazon review JSON file into Parquet.
#[derive(Args, Debug)]
#[command(name = "scan-reviews")]
pub struct ScanReviews {
    // Given as `-o` / `--rating-output`.  Not wrapped in `Option`, so
    // presumably mandatory on the command line; `exec` opens it unconditionally.
    /// Rating output file
    #[arg(short = 'o', long = "rating-output")]
    ratings_out: PathBuf,

    // Given as `-r` / `--review-output`.  When absent, `exec` skips the review
    // table entirely.
    /// Review output file
    #[arg(short = 'r', long = "review-output")]
    reviews_out: Option<PathBuf>,

    // No short/long flag, so presumably a positional argument displayed as
    // `INPUT`.  `exec` opens it with `LineProcessor::open_gzip`.
    /// Input file
    #[arg(name = "INPUT")]
    infile: PathBuf,
}
24
25impl Command for ScanReviews {
26    fn exec(&self) -> Result<()> {
27        info!("scanning Amazon reviews");
28
29        let out = &self.ratings_out;
30        info!("writing ratings to {}", out.display());
31        let mut ratings = TableWriter::open(out)?;
32
33        let mut reviews = if let Some(ref p) = self.reviews_out {
34            info!("writing reviews to {}", p.display());
35            Some(TableWriter::open(p)?)
36        } else {
37            None
38        };
39
40        let pb = data_progress(0);
41        let src = LineProcessor::open_gzip(&self.infile, pb.clone())?;
42        // let mut timer = Timer::new();
43        let mut users: IdIndex<String> = IdIndex::new();
44        let mut lno: usize = 0;
45        // let iter = timer.iter_progress("reading reviews", 5.0, src.json_records());
46        for row in src.json_records() {
47            lno += 1;
48            let row: SourceReview = row.map_err(|e| {
49                error!("parse error on line {}: {}", lno, e);
50                e
51            })?;
52            let user_id = users.intern(row.user.as_str())?;
53            ratings.write_object(RatingRow {
54                user_id,
55                asin: row.asin.clone(),
56                rating: row.rating,
57                timestamp: row.timestamp,
58            })?;
59
60            if let Some(ref mut rvw) = reviews {
61                if row.summary.is_some() || row.text.is_some() {
62                    rvw.write_object(ReviewRow {
63                        user_id,
64                        asin: row.asin,
65                        rating: row.rating,
66                        timestamp: row.timestamp,
67                        summary: row.summary.unwrap_or_default().trim().to_owned(),
68                        text: row.text.unwrap_or_default().trim().to_owned(),
69                    })?;
70                }
71            }
72        }
73
74        ratings.finish()?;
75        if let Some(rvw) = reviews {
76            rvw.finish()?;
77        }
78        Ok(())
79    }
80}