bookdata/cli/amazon/
scan_ratings.rs

//! Scan Amazon ratings.
2use csv;
3use std::fs::File;
4
5use crate::amazon::*;
6use crate::arrow::*;
7use crate::ids::index::IdIndex;
8use crate::prelude::*;
9use crate::util::logging::data_progress;
10
11/// Scan an Amazon rating CSV file into Parquet.
12#[derive(Args, Debug)]
13#[command(name = "scan-ratings")]
14pub struct ScanRatings {
15    /// Swap user and item columns (for AZ 2018 data)
16    #[arg(long = "swap-id-columns")]
17    swap_columns: bool,
18
19    /// Rating output file
20    #[arg(short = 'o', long = "rating-output", name = "FILE")]
21    ratings_out: PathBuf,
22
23    /// Input file
24    #[arg(name = "INPUT")]
25    infile: PathBuf,
26}
27
28impl Command for ScanRatings {
29    fn exec(&self) -> Result<()> {
30        info!("scanning Amazon rating CSV from {}", self.infile.display());
31        let out = &self.ratings_out;
32        info!("writing to {}", out.display());
33        let mut writer = TableWriter::open(out)?;
34
35        let src = File::open(&self.infile)?;
36        let pb = data_progress(src.metadata()?.len());
37        pb.set_prefix("ratings");
38        let src = pb.wrap_read(src);
39        let src = csv::ReaderBuilder::new()
40            .has_headers(false)
41            .from_reader(src);
42        let src = src.into_deserialize();
43        let mut index: IdIndex<String> = IdIndex::new();
44        for row in src {
45            let mut row: SourceRating = row?;
46            if self.swap_columns {
47                std::mem::swap(&mut row.user, &mut row.asin);
48            }
49            let user_id = index.intern(row.user.as_str())?;
50            writer.write_object(RatingRow {
51                user_id,
52                asin: row.asin,
53                rating: row.rating,
54                timestamp: row.timestamp,
55            })?;
56        }
57
58        writer.finish()?;
59        Ok(())
60    }
61}