bookdata/util/
logging.rs

//! Logging utilities for the book data tools.
//!
//! This module contains support for initializing the logging infrastructure, and
//! for dynamically routing log messages based on whether there is an active
//! progress bar.
6
7use std::fmt::Debug;
8use std::marker::PhantomData;
9
10use crossbeam::channel::{Receiver, Sender};
11use friendly::scalar;
12use happylog::new_progress;
13use indicatif::style::ProgressTracker;
14use indicatif::{ProgressBar, ProgressState, ProgressStyle};
15
/// Progress bar template used by `data_progress`; it reports byte counts and
/// throughput through the `{bytes}`, `{total_bytes}`, and `{bytes_per_sec}` keys.
const DATA_PROGRESS_TMPL: &str = concat!(
    "{prefix}: {wide_bar} {bytes}/{total_bytes} ",
    "({bytes_per_sec}, {elapsed} elapsed, ETA {eta})",
);
/// Progress bar template used by `item_progress`; the `friendly_*` keys are the
/// custom keys that function registers on its style.
const ITEM_PROGRESS_TMPL: &str = concat!(
    "{prefix}: {wide_bar} {friendly_pos}/{friendly_len} ",
    "({friendly_rate}/s, {elapsed} elapsed, ETA {eta}) {msg}",
);
/// Progress bar template used by `meter_bar`; it shows only position, length, and message.
const METER_TMPL: &str = "{prefix}: {wide_bar} {pos}/{len} {msg}";
20
21trait FieldExtract: Default + Send + Sync {
22    fn extract(state: &ProgressState) -> f64;
23}
24
25#[derive(Default)]
26struct Friendly<F: FieldExtract + 'static> {
27    _ghost: PhantomData<F>,
28}
29
30impl<F: FieldExtract + 'static> ProgressTracker for Friendly<F> {
31    fn clone_box(&self) -> Box<dyn ProgressTracker> {
32        Box::new(Self::default())
33    }
34
35    fn reset(&mut self, _state: &indicatif::ProgressState, _now: std::time::Instant) {
36        // do nothing
37    }
38
39    fn tick(&mut self, _state: &indicatif::ProgressState, _now: std::time::Instant) {
40        // do nothing
41    }
42
43    fn write(&self, state: &indicatif::ProgressState, w: &mut dyn std::fmt::Write) {
44        let val = F::extract(state);
45        let len = scalar(val);
46        write!(w, "{}", len).expect("failed to write progress");
47    }
48}
49
50#[derive(Default)]
51struct Pos;
52impl FieldExtract for Pos {
53    fn extract(state: &ProgressState) -> f64 {
54        state.pos() as f64
55    }
56}
57
58#[derive(Default)]
59struct Len;
60impl FieldExtract for Len {
61    fn extract(state: &ProgressState) -> f64 {
62        state.len().map(|x| x as f64).unwrap_or(f64::NAN)
63    }
64}
65
66#[derive(Default)]
67struct Rate;
68impl FieldExtract for Rate {
69    fn extract(state: &ProgressState) -> f64 {
70        state.per_sec()
71    }
72}
73
74/// Create a progress bar for tracking data.
75///
76/// If the size is unknown at creation time, pass 0.
77pub fn data_progress<S>(len: S) -> ProgressBar
78where
79    S: TryInto<u64>,
80    S::Error: Debug,
81{
82    new_progress(len.try_into().expect("invalid length")).with_style(
83        ProgressStyle::default_bar()
84            .template(DATA_PROGRESS_TMPL)
85            .expect("template error"),
86    )
87}
88
89/// Create a progress bar for tracking items.
90///
91/// If the size is unknown at creation time, pass 0.
92pub fn item_progress<S>(len: S, name: &str) -> ProgressBar
93where
94    S: TryInto<u64>,
95    S::Error: Debug,
96{
97    let len: u64 = len.try_into().expect("invalid length");
98    let len = Some(len).filter(|l| *l > 0);
99    let style = ProgressStyle::default_bar()
100        .with_key("friendly_pos", Friendly::<Pos>::default())
101        .with_key("friendly_len", Friendly::<Len>::default())
102        .with_key("friendly_rate", Friendly::<Rate>::default())
103        .template(ITEM_PROGRESS_TMPL)
104        .expect("template error");
105
106    new_progress(len.unwrap_or(0))
107        .with_style(style)
108        .with_prefix(name.to_string())
109}
110
111/// Create a meter for monitoring pipelines.
112pub fn meter_bar<S>(len: S, name: &str) -> ProgressBar
113where
114    S: TryInto<u64>,
115    S::Error: Debug,
116{
117    let len: u64 = len.try_into().expect("invalid length");
118    let len = Some(len).filter(|l| *l > 0);
119    let style = ProgressStyle::default_bar()
120        .template(METER_TMPL)
121        .expect("template error");
122    new_progress(len.unwrap_or(0))
123        .with_style(style)
124        .with_prefix(name.to_string())
125}
126
127/// Fetch from a receiver while updating the length.
128pub fn measure_and_recv<T>(chan: &Receiver<T>, pb: &ProgressBar) -> Option<T> {
129    pb.set_position(chan.len() as u64);
130    let res = chan.recv();
131    pb.set_position(chan.len() as u64);
132    res.ok()
133}
134
135/// Send to a channel while updating the length.
136pub fn measure_and_send<T>(
137    chan: &Sender<T>,
138    obj: T,
139    pb: &ProgressBar,
140) -> Result<(), crossbeam::channel::SendError<T>> {
141    pb.set_position(chan.len() as u64);
142    let res = chan.send(obj);
143    pb.set_position(chan.len() as u64);
144    res
145}