bookdata/marc/
parse.rs

1use std::convert::TryInto;
2use std::io::BufRead;
3use std::mem::replace;
4use std::str;
5use std::thread::{scope, spawn, JoinHandle, ScopedJoinHandle};
6
7use crossbeam::channel::bounded;
8use log::*;
9
10use anyhow::{anyhow, Result};
11use quick_xml::events::attributes::Attributes;
12use quick_xml::events::Event;
13use quick_xml::Reader;
14
15use crate::io::object::{ChunkWriter, ThreadObjectWriter, UnchunkWriter};
16use crate::io::ObjectWriter;
17use crate::tsv::split_first;
18use crate::util::logging::{measure_and_recv, measure_and_send, meter_bar};
19use crate::util::StringAccumulator;
20
21use super::record::*;
22
/// Number of input lines grouped into each chunk handed to the parser
/// threads; also the batch size given to each worker's `UnchunkWriter`.
const CHUNK_LINES: usize = 5000;
/// Capacity used for the bounded chunk channel, its fill meter, and the
/// threaded record writer's queue.
const CHUNK_BUFFER_SIZE: usize = 20;
25
26#[derive(Debug, Default)]
27struct Codes {
28    tag: i16,
29    ind1: Code,
30    ind2: Code,
31}
32
33impl From<Codes> for Field {
34    fn from(c: Codes) -> Field {
35        Field {
36            tag: c.tag,
37            ind1: c.ind1,
38            ind2: c.ind2,
39            subfields: Vec::new(),
40        }
41    }
42}
43
44/// Read MARC records from XML.
45pub fn scan_records<R, W>(reader: R, output: &mut W) -> Result<usize>
46where
47    R: BufRead,
48    W: ObjectWriter<MARCRecord>,
49{
50    let mut reader = Reader::from_reader(reader);
51    let mut nrecs = 0;
52    let mut buffer = Vec::with_capacity(4096);
53    loop {
54        match reader.read_event_into(&mut buffer)? {
55            Event::Start(ref e) => {
56                let name = e.local_name();
57                match name.into_inner() {
58                    b"record" => {
59                        let rec = read_record(&mut reader)?;
60                        output.write_object(rec)?;
61                        nrecs += 1;
62                    }
63                    _ => (),
64                }
65            }
66            Event::Eof => return Ok(nrecs),
67            _ => (),
68        }
69    }
70}
71
72/// Read MARC records from delimited XML.
73///
74/// This reader parses the XML in parallel, since XML parsing is typically
75/// the bottleneck for MARC scanning.
76pub fn scan_records_delim<R, W>(reader: R, output: &mut W) -> Result<usize>
77where
78    R: BufRead + Send + 'static,
79    W: ObjectWriter<MARCRecord> + Sync + Send,
80{
81    let lines = reader.lines();
82
83    let output = ChunkWriter::new(output);
84    let fill = meter_bar(CHUNK_BUFFER_SIZE, "input chunks");
85
86    let nrecs: Result<usize> = scope(|outer| {
87        // scoped thread writer to support parallel writing
88        let output = ThreadObjectWriter::wrap(output)
89            .with_name("marc records")
90            .with_capacity(CHUNK_BUFFER_SIZE)
91            .spawn_scoped(outer);
92        // receivers & senders for chunks of lines
93        let (chunk_tx, chunk_rx) = bounded(CHUNK_BUFFER_SIZE);
94
95        // background thread getting lines
96        info!("spawning reader thread");
97        let fpb = fill.clone();
98        let bg_read: JoinHandle<Result<usize>> = spawn(move || {
99            let mut accum = Vec::with_capacity(CHUNK_LINES);
100            let mut nlines = 0usize;
101            for line in lines {
102                let line = line?;
103                let (_id, payload) = split_first(&line).ok_or_else(|| anyhow!("invalid line"))?;
104                nlines += 1;
105                accum.push(payload.to_owned());
106                if accum.len() >= CHUNK_LINES {
107                    let chunk = replace(&mut accum, Vec::with_capacity(CHUNK_LINES));
108                    measure_and_send(&chunk_tx, chunk, &fpb).expect("channel send failure");
109                }
110            }
111            if accum.len() > 0 {
112                chunk_tx.send(accum).expect("channel send failure");
113            }
114            Ok(nlines)
115        });
116
117        let nrecs: Result<usize> = scope(|inner| {
118            // how many workers to use? let's count the active threads
119            //
120            // 1. decompression
121            // 2. parse lines
122            // 3. serialize MARC records
123            // 4. write Parquet file
124            //
125            // That leaves the remaining proessors to be used for parsing XML.
126            let nthreads = 4;
127            let mut workers: Vec<ScopedJoinHandle<'_, Result<usize>>> =
128                Vec::with_capacity(nthreads as usize);
129            info!("spawning {} parser threads", nthreads);
130            for i in 0..nthreads {
131                debug!("spawning parser thread {}", i + 1);
132                let rx = chunk_rx.clone();
133                let out = output.satellite();
134                let out = UnchunkWriter::with_size(out, CHUNK_LINES);
135                let fill = fill.clone();
136                workers.push(inner.spawn(move || {
137                    let mut out = out;
138                    let mut nrecs = 0;
139                    while let Some(chunk) = measure_and_recv(&rx, &fill) {
140                        for line in chunk {
141                            let res = parse_record(&line)?;
142                            out.write_object(res)?;
143                            nrecs += 1;
144                        }
145                    }
146                    out.finish()?;
147                    Ok(nrecs)
148                }));
149            }
150
151            let mut nrecs = 0;
152            for h in workers {
153                nrecs += h.join().map_err(std::panic::resume_unwind)??;
154            }
155            Ok(nrecs)
156        });
157        let nrecs = nrecs?;
158
159        bg_read.join().map_err(std::panic::resume_unwind)??;
160        output.finish()?;
161        Ok(nrecs)
162    });
163    let nrecs = nrecs?;
164
165    info!("processed {} records", nrecs);
166    Ok(nrecs)
167}
168
169/// Parse a single MARC record from an XML string.
170pub fn parse_record<S: AsRef<str>>(xml: S) -> Result<MARCRecord> {
171    let mut parse = Reader::from_str(xml.as_ref());
172    read_record(&mut parse)
173}
174
175/// Read a single MARC record from an XML reader.
176#[inline(never)] // make profiling a little easier, this fn isn't worth inlining
177fn read_record<B: BufRead>(rdr: &mut Reader<B>) -> Result<MARCRecord> {
178    let mut buf = Vec::new();
179    let mut content = StringAccumulator::new();
180    let mut record = MARCRecord {
181        leader: String::new(),
182        control: Vec::new(),
183        fields: Vec::new(),
184    };
185    let mut field = Field::default();
186    let mut tag = 0;
187    let mut sf_code = Code::default();
188    loop {
189        match rdr.read_event_into(&mut buf)? {
190            Event::Start(ref e) => {
191                let name = e.local_name();
192                match name.into_inner() {
193                    b"record" => (),
194                    b"leader" => {
195                        content.activate();
196                    }
197                    b"controlfield" => {
198                        tag = read_tag_attr(e.attributes())?;
199                        content.activate();
200                    }
201                    b"datafield" => {
202                        let codes = read_code_attrs(e.attributes())?;
203                        field = codes.into();
204                    }
205                    b"subfield" => {
206                        sf_code = read_sf_code_attr(e.attributes())?;
207                        content.activate();
208                    }
209                    _ => (),
210                }
211            }
212            Event::End(ref e) => {
213                let name = e.local_name();
214                match name.into_inner() {
215                    b"leader" => {
216                        record.leader = content.finish().to_owned();
217                    }
218                    b"controlfield" => record.control.push(ControlField {
219                        tag: tag.try_into()?,
220                        content: content.finish().to_owned(),
221                    }),
222                    b"subfield" => field.subfields.push(Subfield {
223                        code: sf_code,
224                        content: content.finish().to_owned(),
225                    }),
226                    b"datafield" => {
227                        record.fields.push(field);
228                        field = Field::default();
229                    }
230                    b"record" => return Ok(record),
231                    _ => (),
232                }
233            }
234            Event::Text(e) => {
235                let t = e.unescape()?;
236                content.add_slice(t);
237            }
238            Event::Eof => break,
239            _ => (),
240        }
241    }
242    Err(anyhow!("could not parse record"))
243}
244
245/// Read the tag attribute from a tag.
246fn read_tag_attr(attrs: Attributes<'_>) -> Result<i16> {
247    for ar in attrs {
248        let a = ar?;
249        if a.key.into_inner() == b"tag" {
250            let tag = a.unescape_value()?;
251            return Ok(tag.parse()?);
252        }
253    }
254
255    Err(anyhow!("no tag attribute found"))
256}
257
258/// Read code attributes from a tag.
259fn read_code_attrs(attrs: Attributes<'_>) -> Result<Codes> {
260    let mut tag = 0;
261    let mut ind1 = Code::default();
262    let mut ind2 = Code::default();
263
264    for ar in attrs {
265        let a = ar?;
266        let v = a.unescape_value()?;
267        match a.key.into_inner() {
268            b"tag" => tag = v.parse()?,
269            b"ind1" => ind1 = v.as_bytes()[0].into(),
270            b"ind2" => ind2 = v.as_bytes()[0].into(),
271            _ => (),
272        }
273    }
274
275    if tag == 0 {
276        Err(anyhow!("no tag attribute found"))
277    } else {
278        Ok(Codes { tag, ind1, ind2 })
279    }
280}
281
282/// Read the subfield code attriute from a tag
283fn read_sf_code_attr(attrs: Attributes<'_>) -> Result<Code> {
284    for ar in attrs {
285        let a = ar?;
286        if a.key.into_inner() == b"code" {
287            let code = a.unescape_value()?;
288            return Ok(code.as_bytes()[0].into());
289        }
290    }
291
292    Err(anyhow!("no code found"))
293}