1use std::convert::TryInto;
2use std::io::BufRead;
3use std::mem::replace;
4use std::str;
5use std::thread::{scope, spawn, JoinHandle, ScopedJoinHandle};
6
7use crossbeam::channel::bounded;
8use log::*;
9
10use anyhow::{anyhow, Result};
11use quick_xml::events::attributes::Attributes;
12use quick_xml::events::Event;
13use quick_xml::Reader;
14
15use crate::io::object::{ChunkWriter, ThreadObjectWriter, UnchunkWriter};
16use crate::io::ObjectWriter;
17use crate::tsv::split_first;
18use crate::util::logging::{measure_and_recv, measure_and_send, meter_bar};
19use crate::util::StringAccumulator;
20
21use super::record::*;
22
/// Number of input lines batched into one chunk by the reader thread in
/// `scan_records_delim`; also the size given to each parser's `UnchunkWriter`.
const CHUNK_LINES: usize = 5000;
/// Capacity of the bounded chunk channel in `scan_records_delim`; also passed
/// to the `ThreadObjectWriter` via `with_capacity` and used as the scale of the
/// "input chunks" fill meter.
const CHUNK_BUFFER_SIZE: usize = 20;
25
/// Attribute values read from a `<datafield>` start tag by `read_code_attrs`,
/// held until they are turned into a `Field` (via `From<Codes>`).
#[derive(Debug, Default)]
struct Codes {
    /// Numeric field tag, parsed from the `tag` attribute.
    tag: i16,
    /// First indicator, taken from the first byte of the `ind1` attribute.
    ind1: Code,
    /// Second indicator, taken from the first byte of the `ind2` attribute.
    ind2: Code,
}
32
33impl From<Codes> for Field {
34 fn from(c: Codes) -> Field {
35 Field {
36 tag: c.tag,
37 ind1: c.ind1,
38 ind2: c.ind2,
39 subfields: Vec::new(),
40 }
41 }
42}
43
44pub fn scan_records<R, W>(reader: R, output: &mut W) -> Result<usize>
46where
47 R: BufRead,
48 W: ObjectWriter<MARCRecord>,
49{
50 let mut reader = Reader::from_reader(reader);
51 let mut nrecs = 0;
52 let mut buffer = Vec::with_capacity(4096);
53 loop {
54 match reader.read_event_into(&mut buffer)? {
55 Event::Start(ref e) => {
56 let name = e.local_name();
57 match name.into_inner() {
58 b"record" => {
59 let rec = read_record(&mut reader)?;
60 output.write_object(rec)?;
61 nrecs += 1;
62 }
63 _ => (),
64 }
65 }
66 Event::Eof => return Ok(nrecs),
67 _ => (),
68 }
69 }
70}
71
72pub fn scan_records_delim<R, W>(reader: R, output: &mut W) -> Result<usize>
77where
78 R: BufRead + Send + 'static,
79 W: ObjectWriter<MARCRecord> + Sync + Send,
80{
81 let lines = reader.lines();
82
83 let output = ChunkWriter::new(output);
84 let fill = meter_bar(CHUNK_BUFFER_SIZE, "input chunks");
85
86 let nrecs: Result<usize> = scope(|outer| {
87 let output = ThreadObjectWriter::wrap(output)
89 .with_name("marc records")
90 .with_capacity(CHUNK_BUFFER_SIZE)
91 .spawn_scoped(outer);
92 let (chunk_tx, chunk_rx) = bounded(CHUNK_BUFFER_SIZE);
94
95 info!("spawning reader thread");
97 let fpb = fill.clone();
98 let bg_read: JoinHandle<Result<usize>> = spawn(move || {
99 let mut accum = Vec::with_capacity(CHUNK_LINES);
100 let mut nlines = 0usize;
101 for line in lines {
102 let line = line?;
103 let (_id, payload) = split_first(&line).ok_or_else(|| anyhow!("invalid line"))?;
104 nlines += 1;
105 accum.push(payload.to_owned());
106 if accum.len() >= CHUNK_LINES {
107 let chunk = replace(&mut accum, Vec::with_capacity(CHUNK_LINES));
108 measure_and_send(&chunk_tx, chunk, &fpb).expect("channel send failure");
109 }
110 }
111 if accum.len() > 0 {
112 chunk_tx.send(accum).expect("channel send failure");
113 }
114 Ok(nlines)
115 });
116
117 let nrecs: Result<usize> = scope(|inner| {
118 let nthreads = 4;
127 let mut workers: Vec<ScopedJoinHandle<'_, Result<usize>>> =
128 Vec::with_capacity(nthreads as usize);
129 info!("spawning {} parser threads", nthreads);
130 for i in 0..nthreads {
131 debug!("spawning parser thread {}", i + 1);
132 let rx = chunk_rx.clone();
133 let out = output.satellite();
134 let out = UnchunkWriter::with_size(out, CHUNK_LINES);
135 let fill = fill.clone();
136 workers.push(inner.spawn(move || {
137 let mut out = out;
138 let mut nrecs = 0;
139 while let Some(chunk) = measure_and_recv(&rx, &fill) {
140 for line in chunk {
141 let res = parse_record(&line)?;
142 out.write_object(res)?;
143 nrecs += 1;
144 }
145 }
146 out.finish()?;
147 Ok(nrecs)
148 }));
149 }
150
151 let mut nrecs = 0;
152 for h in workers {
153 nrecs += h.join().map_err(std::panic::resume_unwind)??;
154 }
155 Ok(nrecs)
156 });
157 let nrecs = nrecs?;
158
159 bg_read.join().map_err(std::panic::resume_unwind)??;
160 output.finish()?;
161 Ok(nrecs)
162 });
163 let nrecs = nrecs?;
164
165 info!("processed {} records", nrecs);
166 Ok(nrecs)
167}
168
169pub fn parse_record<S: AsRef<str>>(xml: S) -> Result<MARCRecord> {
171 let mut parse = Reader::from_str(xml.as_ref());
172 read_record(&mut parse)
173}
174
175#[inline(never)] fn read_record<B: BufRead>(rdr: &mut Reader<B>) -> Result<MARCRecord> {
178 let mut buf = Vec::new();
179 let mut content = StringAccumulator::new();
180 let mut record = MARCRecord {
181 leader: String::new(),
182 control: Vec::new(),
183 fields: Vec::new(),
184 };
185 let mut field = Field::default();
186 let mut tag = 0;
187 let mut sf_code = Code::default();
188 loop {
189 match rdr.read_event_into(&mut buf)? {
190 Event::Start(ref e) => {
191 let name = e.local_name();
192 match name.into_inner() {
193 b"record" => (),
194 b"leader" => {
195 content.activate();
196 }
197 b"controlfield" => {
198 tag = read_tag_attr(e.attributes())?;
199 content.activate();
200 }
201 b"datafield" => {
202 let codes = read_code_attrs(e.attributes())?;
203 field = codes.into();
204 }
205 b"subfield" => {
206 sf_code = read_sf_code_attr(e.attributes())?;
207 content.activate();
208 }
209 _ => (),
210 }
211 }
212 Event::End(ref e) => {
213 let name = e.local_name();
214 match name.into_inner() {
215 b"leader" => {
216 record.leader = content.finish().to_owned();
217 }
218 b"controlfield" => record.control.push(ControlField {
219 tag: tag.try_into()?,
220 content: content.finish().to_owned(),
221 }),
222 b"subfield" => field.subfields.push(Subfield {
223 code: sf_code,
224 content: content.finish().to_owned(),
225 }),
226 b"datafield" => {
227 record.fields.push(field);
228 field = Field::default();
229 }
230 b"record" => return Ok(record),
231 _ => (),
232 }
233 }
234 Event::Text(e) => {
235 let t = e.unescape()?;
236 content.add_slice(t);
237 }
238 Event::Eof => break,
239 _ => (),
240 }
241 }
242 Err(anyhow!("could not parse record"))
243}
244
245fn read_tag_attr(attrs: Attributes<'_>) -> Result<i16> {
247 for ar in attrs {
248 let a = ar?;
249 if a.key.into_inner() == b"tag" {
250 let tag = a.unescape_value()?;
251 return Ok(tag.parse()?);
252 }
253 }
254
255 Err(anyhow!("no tag attribute found"))
256}
257
258fn read_code_attrs(attrs: Attributes<'_>) -> Result<Codes> {
260 let mut tag = 0;
261 let mut ind1 = Code::default();
262 let mut ind2 = Code::default();
263
264 for ar in attrs {
265 let a = ar?;
266 let v = a.unescape_value()?;
267 match a.key.into_inner() {
268 b"tag" => tag = v.parse()?,
269 b"ind1" => ind1 = v.as_bytes()[0].into(),
270 b"ind2" => ind2 = v.as_bytes()[0].into(),
271 _ => (),
272 }
273 }
274
275 if tag == 0 {
276 Err(anyhow!("no tag attribute found"))
277 } else {
278 Ok(Codes { tag, ind1, ind2 })
279 }
280}
281
282fn read_sf_code_attr(attrs: Attributes<'_>) -> Result<Code> {
284 for ar in attrs {
285 let a = ar?;
286 if a.key.into_inner() == b"code" {
287 let code = a.unescape_value()?;
288 return Ok(code.as_bytes()[0].into());
289 }
290 }
291
292 Err(anyhow!("no code found"))
293}