eg_staged_bib_overlay: add --link-skipped
[migration-tools.git] / eg_staged_bib_overlay
1 #!/usr/bin/perl
2
3 # Copyright (c) 2016 Equinox Software, Inc.
4 # Author: Galen Charlton <gmc@esilibrary.com>
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2, or (at your option)
9 # any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program.  If not, see <http://www.gnu.org/licenses/>
18
19 use strict;
20 use warnings;
21
22 use Getopt::Long;
23 use MARC::Record;
24 use MARC::File::XML (BinaryEncoding => 'utf8');
25 use DBI;
26 use OpenILS::Application::AppUtils;
27
28 my $action;
29 my $schema = 'bib_loads';
30 my $db;
31 my $dbuser;
32 my $dbpw;
33 my $dbhost;
34 my $batch;
35 my $cutoff;
36 my $wait = 1;
37 my $output;
38 my $link_skipped;
39
40 my $ret = GetOptions(
41     'action:s'      => \$action,
42     'schema:s'      => \$schema,
43     'db:s'          => \$db,
44     'dbuser:s'      => \$dbuser,
45     'dbhost:s'      => \$dbhost,
46     'dbpw:s'        => \$dbpw,
47     'batch:s'       => \$batch,
48     'cutoff:s'      => \$cutoff,
49     'wait:i'        => \$wait,
50     'output:s'      => \$output,
51     'link-skipped'  => \$link_skipped,
52 );
53
54 abort('must specify --action') unless defined $action;
55 abort('must specify --schema') unless defined $schema;
56 abort('must specify --db') unless defined $db;
57 abort('must specify --dbuser') unless defined $dbuser;
58 abort('must specify --dbhost') unless defined $dbhost;
59 abort('must specify --dbpw') unless defined $dbpw;
60 abort('must specify --batch') unless defined $batch;
61
62 abort(q{--action must be "stage_bibs", "filter_bibs", "load_bibs", "stage_auths",
63 "match_auths", "load_new_auths", "overlay_auths_stage1",
64 "overlay_auths_stage2", "overlay_auths_stage3", "link_auth_auth",
65 "link_auth_bib", "export_skipped_bibs", or "export_skipped_auths"}) unless
66     $action eq 'filter_bibs' or
67     $action eq 'stage_bibs' or
68     $action eq 'load_bibs' or
69     $action eq 'stage_auths' or
70     $action eq 'match_auths' or
71     $action eq 'load_new_auths' or
72     $action eq 'overlay_auths_stage1' or
73     $action eq 'overlay_auths_stage2' or
74     $action eq 'overlay_auths_stage3' or
75     $action eq 'link_auth_auth' or
76     $action eq 'link_auth_bib' or
77     $action eq 'export_skipped_bibs' or
78     $action eq 'export_skipped_auths'
79 ;
80
81 my $dbh = connect_db($db, $dbuser, $dbpw, $dbhost);
82
83 if ($action eq 'stage_bibs') {
84     abort('must specify at least one input file') unless @ARGV;
85     handle_stage_bibs($dbh, $schema, $batch);
86 }
87
88 if ($action eq 'filter_bibs') {
89     abort('must specify cutoff date when filtering') unless defined $cutoff;
90     handle_filter_bibs($dbh, $schema, $batch, $cutoff);
91 }
92
93 if ($action eq 'load_bibs' ) {
94     handle_load_bibs($dbh, $schema, $batch, $wait);
95 }
96
97 if ($action eq 'stage_auths') {
98     abort('must specify at least one input file') unless @ARGV;
99     handle_stage_auths($dbh, $schema, $batch);
100 }
101
102 if ($action eq 'match_auths') {
103     handle_match_auths($dbh, $schema, $batch);
104 }
105
106 if ($action eq 'load_new_auths') {
107     handle_load_new_auths($dbh, $schema, $batch);
108 }
109
110 if ($action eq 'overlay_auths_stage1') {
111     handle_overlay_auths_stage1($dbh, $schema, $batch);
112 }
113 if ($action eq 'overlay_auths_stage2') {
114     handle_overlay_auths_stage2($dbh, $schema, $batch);
115 }
116 if ($action eq 'overlay_auths_stage3') {
117     handle_overlay_auths_stage3($dbh, $schema, $batch);
118 }
119
120 if ($action eq 'link_auth_auth') {
121     handle_link_auth_auth($dbh, $schema, $batch);
122 }
123 if ($action eq 'link_auth_bib') {
124     handle_link_auth_bib($dbh, $schema, $batch, $link_skipped);
125 }
126
127 if ($action eq 'export_skipped_bibs') {
128     abort('must specify output file') unless defined $output;
129     handle_export_skipped_bibs($dbh, $schema, $batch, $output);
130 }
131 if ($action eq 'export_skipped_auths') {
132     abort('must specify output file') unless defined $output;
133     handle_export_skipped_auths($dbh, $schema, $batch, $output);
134 }
135
136 sub abort {
137     my $msg = shift;
138     print STDERR "$0: $msg", "\n";
139     print_usage();
140     exit 1;
141 }
142
143 sub print_usage {
144     print <<_USAGE_;
145
146 Utility to stage and overlay bib records in an Evergreen database. This
147 expects that the incoming records will have been previously exported
148 from that Evergreen database and modified in some fashion (e.g., for
149 authority record processing) and that the bib ID can be found in the
150 901\$c subfield.
151
152 This program has several modes controlled by the --action switch:
153
154   --action stage_bibs  - load MARC bib records into a staging table
155   --action filter_bibs - mark previously staged bibs that should
156                          be excluded from a subsequent load, either
157                          because the target bib is deleted in Evergreen
158                          or the record was modified after a date
159                          specified by the --cutoff switch
160   --action load_bibs   - overlay bib records using a previously staged
161                          batch, one at a time. After each bib, it will
162                          wait the number of seconds specified by the
163                          --wait switch.
164
165   --action stage_auths          - load MARC authorities into staging
166                                   table
167   --action match_auths          - identify matches with authority
168                                   records already present in the
169                                   Evergreen database; matching is
170                                   based on LCCN, cancelled LCCN, and
171                                   main heading.
172   --action load_new_auths       - load new (unmatched) authorities
173   --action overlay_auths_stage1 - overlay based on LCCN where
174                                   heading has NOT changed; this step
175                                   disables propagation to bib records
176   --action overlay_auths_stage2 - overlay based on LCCN where heading
177                                   HAS changed; propagates changes
178                                   to bib records
179   --action overlay_auths_stage3 - overlay for records where a cancelled
180                                   LCCN is replaced with a new one
181   --action link_auth_auth       - run authority_authority_linker.pl for
182                                   the authorities that were overlaid
183                                   or added in this batch.
184   --action link_auth_bib        - run authority_control_fields.pl for
185                                   the bibs that were overlaid in this
186                                   batch.  Add --link-skipped to specify
187                                   that bibs that were matched but
188                                   skipped due to having be edited after
189                                   the cutoff should be linked (rather
190                                   than linking the imported bibs)
191   --action export_skipped_bibs  - export to ISO2709 file whose name is
192                                   specified by --output those bibs
193                                   that had been edited after the cutoff.
194   --action export_skipped_auths - export to ISO2709 file whose name is
195                                   specified by --output those authorities
196                                   that could not be definitively
197                                   handled as updates or adds.
198
199 Several switches are used regardless of the specified action:
200
201   --schema  - Pg schema in which staging table will live; should be
202               created beforehand
203   --batch   - name of bib batch; will also be used as the name
204               of the staging tables
205   --db      - database name
206   --dbuser  - database user
207   --dbpw    - database password
208   --dbhost  - database host
209
210 Examples:
211
212 $0 --schema bib_load --batch bibs_2016_01 --db evergreen \\
213    --dbuser evergreen --dbpw evergreen --dbhost localhost \\
214    --action stage_bibs -- file1.mrc file2.mrc [...]
215
216 $0 --schema bib_load --batch bibs_2016_01 --db evergreen \\
217    --dbuser evergreen --dbpw evergreen --dbhost localhost \\
218    --action filter_bibs --cutoff 2016-01-02
219
220 $0 --schema bib_load --batch bibs_2016_01 --db evergreen \\
221    --dbuser evergreen --dbpw evergreen --dbhost localhost \\
222    --action load_bibs --wait 2
223
224 _USAGE_
225 }
226
227
228 sub report_progress {
229     my ($msg, $counter) = @_;
230     if (defined $counter) {
231         print STDERR "$msg: $counter\n";
232     } else {
233         print STDERR "$msg\n";
234     }
235 }
236
237 sub connect_db {
238     my ($db, $dbuser, $dbpw, $dbhost) = @_;
239
240     my $dsn = "dbi:Pg:host=$dbhost;dbname=$db;port=5432";
241
242     my $attrs = {
243         ShowErrorStatement => 1,
244         RaiseError => 1,
245         PrintError => 1,
246         pg_enable_utf8 => 1,
247     };
248     my $dbh = DBI->connect($dsn, $dbuser, $dbpw, $attrs);
249
250     return $dbh;
251 }
252
253 sub handle_stage_bibs {
254     my $dbh = shift;
255     my $schema = shift;
256     my $batch = shift;
257
258     $dbh->do(qq{
259         DROP TABLE IF EXISTS $schema.$batch;
260     });
261     $dbh->do(qq{
262         CREATE TABLE $schema.$batch (
263             id          SERIAL,
264             marc        TEXT,
265             bib_id      BIGINT,
266             imported    BOOLEAN DEFAULT FALSE,
267             to_import   BOOLEAN DEFAULT TRUE,
268             skip_reason TEXT
269         )
270     });
271
272     local $/ = "\035";
273     my $i = 0;
274     binmode STDIN, ':utf8';
275     my $ins = $dbh->prepare("INSERT INTO $schema.$batch (marc, bib_id) VALUES (?, ?)");
276     $dbh->begin_work;
277     while (<>) {
278         $i++;
279         if (0 == $i % 100) {
280             report_progress("Records staged", $i);
281             $dbh->commit;
282             $dbh->begin_work;
283         }
284         my $marc = MARC::Record->new_from_usmarc($_);
285         my $bibid = $marc->subfield('901', 'c');
286         if ($bibid !~ /^\d+$/) {
287             print STDERR "Record $i is suspect; skipping\n";
288             next;
289         }
290         my $xml = OpenILS::Application::AppUtils->entityize($marc->as_xml_record());
291         $ins->execute($xml, $bibid);
292     }
293     $dbh->commit;
294     report_progress("Records staged", $i) if 0 != $i % 100;
295     $dbh->do(qq/
296         CREATE INDEX ${batch}_bib_id_idx ON
297             $schema.$batch (bib_id);
298     /);
299     $dbh->do(qq/
300         CREATE INDEX ${batch}_id_idx ON
301             $schema.$batch (id);
302     /);
303 }
304
305 sub handle_filter_bibs {
306     my $dbh = shift;
307     my $schema = shift;
308     my $batch = shift;
309     my $cutoff = shift;
310
311     my $sth1 = $dbh->prepare(qq{
312         UPDATE $schema.$batch
313         SET to_import = FALSE,
314             skip_reason = 'deleted'
315         WHERE bib_id IN (
316             SELECT id
317             FROM biblio.record_entry
318             WHERE deleted
319         )
320         AND to_import
321         AND NOT imported
322     });
323     $sth1->execute();
324     my $ct = $sth1->rows;
325     report_progress("Filtering out $ct records that are currently deleted");
326
327     my $sth2 = $dbh->prepare(qq{
328         UPDATE $schema.$batch
329         SET to_import = FALSE,
330             skip_reason = 'edited after cutoff of $cutoff'
331         WHERE bib_id IN (
332             SELECT id
333             FROM biblio.record_entry
334             WHERE edit_date >= ?
335         )
336         AND to_import
337         AND NOT imported
338     });
339     $sth2->execute($cutoff);
340     $ct = $sth2->rows;
341     report_progress("Filtering out $ct records edited after cutoff date of $cutoff");
342
343     my $sth3 = $dbh->prepare(qq{
344         UPDATE $schema.$batch
345         SET to_import = FALSE,
346             skip_reason = 'XML is not well-formed'
347         WHERE NOT xml_is_well_formed(marc)
348         AND to_import
349         AND NOT imported
350     });
351     $sth3->execute();
352     $ct = $sth3->rows;
353     report_progress("Filtering out $ct records whose XML is not well-formed");
354 }
355
356 sub handle_load_bibs {
357     my $dbh = shift;
358     my $schema = shift;
359     my $batch = shift;
360     my $wait = shift;
361
362     my $getct = $dbh->prepare(qq{
363         SELECT COUNT(*)
364         FROM  $schema.$batch
365         WHERE to_import
366         AND NOT imported
367     });
368     $getct->execute();
369     my $max = $getct->fetchrow_arrayref()->[0];
370
371     report_progress('Number of bibs to update', $max);
372     for (my $i = 1; $i <= $max; $i++) {
373         report_progress('... bibs updated', $i) if 0 == $i % 10 or $i == $max;
374         $dbh->begin_work;
375         $dbh->do(qq{
376             UPDATE biblio.record_entry a
377             SET marc = b.marc
378             FROM $schema.$batch b
379             WHERE a.id = b.bib_id
380             AND bib_id IN (
381                 SELECT bib_id
382                 FROM $schema.$batch
383                 WHERE to_import
384                 AND NOT imported
385                 ORDER BY id
386                 LIMIT 1
387             )
388         });
389         $dbh->do(qq{
390             UPDATE $schema.$batch
391             SET imported = TRUE
392             WHERE bib_id IN (
393                 SELECT bib_id
394                 FROM $schema.$batch
395                 WHERE to_import
396                 AND NOT imported
397                 ORDER BY id
398                 LIMIT 1
399             )
400         });
401         $dbh->commit;
402         sleep $wait;
403     }
404 }
405
406 sub handle_stage_auths {
407     my $dbh = shift;
408     my $schema = shift;
409     my $batch = shift;
410
411     $dbh->do(qq{
412         DROP TABLE IF EXISTS $schema.auths_$batch;
413     });
414     $dbh->do(qq{
415         CREATE TABLE $schema.auths_$batch (
416             id          SERIAL,
417             marc        TEXT,
418             auth_id     BIGINT,
419             new_auth_id BIGINT,
420             existing_heading TEXT,
421             lccn        TEXT,
422             cancelled_lccn TEXT,
423             cancelled_auth_id BIGINT,
424             heading     TEXT,
425             lccn_matched BOOLEAN DEFAULT FALSE,
426             heading_matched BOOLEAN DEFAULT FALSE,
427             imported    BOOLEAN DEFAULT FALSE,
428             to_import   BOOLEAN DEFAULT TRUE,
429             skip_reason TEXT
430         )
431     });
432
433     local $/ = "\035";
434     my $i = 0;
435     binmode STDIN, ':utf8';
436     my $ins = $dbh->prepare(qq{
437         INSERT INTO $schema.auths_$batch (marc, auth_id, lccn, cancelled_lccn, heading)
438         VALUES (?, ?, ?, ?, authority.normalize_heading(?))
439     });
440     $dbh->begin_work;
441     while (<>) {
442         $i++;
443         if (0 == $i % 100) {
444             report_progress("Records staged", $i);
445             $dbh->commit;
446             $dbh->begin_work;
447         }
448         my $marc = MARC::Record->new_from_usmarc($_);
449         my $authid = $marc->subfield('901', 'c');
450         if (defined($authid) && $authid !~ /^\d+$/) {
451             undef $authid;
452         }
453         my $lccn = $marc->subfield('010', 'a');
454         if (defined $lccn) {
455             $lccn =~ s/^\s+//;
456             $lccn =~ s/\s+$//;
457             $lccn =~ s/\s+/ /g;
458         }
459         my $cancelled_lccn = $marc->subfield('010', 'z');
460         if (defined $cancelled_lccn) {
461             $cancelled_lccn =~ s/^\s+//;
462             $cancelled_lccn =~ s/\s+$//;
463             $cancelled_lccn =~ s/\s+/ /g;
464         }
465         my $xml = OpenILS::Application::AppUtils->entityize($marc->as_xml_record());
466         $ins->execute($xml, $authid, $lccn, $cancelled_lccn, $xml);
467     }
468     $dbh->commit;
469     report_progress("Records staged", $i) if 0 != $i % 100;
470     $dbh->do(qq/
471         CREATE INDEX auths_${batch}_auth_id_idx ON
472             $schema.auths_$batch (auth_id);
473     /);
474     $dbh->do(qq/
475         CREATE INDEX auths_${batch}_id_idx ON
476             $schema.auths_$batch (id);
477     /);
478     $dbh->do(qq/
479         CREATE INDEX auths_${batch}_lccn_idx ON
480             $schema.auths_$batch (lccn);
481     /);
482 }
483
484 sub handle_match_auths {
485     my ($dbh, $schema, $batch) = @_;
486
487     my $sth = $dbh->prepare(qq{
488         UPDATE $schema.auths_${batch} a
489         SET auth_id = b.record,
490             lccn_matched = TRUE,
491             existing_heading = authority.normalize_heading(c.marc)
492         FROM authority.full_rec b
493         JOIN authority.record_entry c ON (b.record = c.id)
494         WHERE tag = '010'
495         AND   subfield = 'a'
496         AND   value = lccn
497         AND   auth_id IS NULL
498         AND   lccn IS NOT NULL;
499     });
500     $sth->execute();
501     my $ct = $sth->rows;
502     report_progress("Matched $ct authorities on LCCN");
503
504     $sth = $dbh->prepare(qq{
505         UPDATE $schema.auths_${batch} a
506         SET cancelled_auth_id = b.record
507         FROM authority.full_rec b
508         WHERE tag = '010'
509         AND   subfield = 'a'
510         AND   value = cancelled_lccn
511         AND   auth_id IS NULL
512         AND   cancelled_lccn IS NOT NULL;
513     });
514     $sth->execute();
515     $ct = $sth->rows;
516     report_progress("Matched $ct authorities on cancelled LCCN");
517
518     $sth = $dbh->prepare(qq{
519         UPDATE $schema.auths_$batch a
520         SET auth_id = b.id,
521             heading_matched = TRUE,
522             existing_heading = b.heading
523         FROM authority.record_entry b
524         WHERE a.heading = b.heading
525         AND   auth_id IS NULL;
526     });
527     $sth->execute();
528     $ct = $sth->rows;
529     report_progress("Matched $ct authorities on heading");
530 }
531
532 sub handle_load_new_auths {
533     my $dbh = shift;
534     my $schema = shift;
535     my $batch = shift;
536
537     my $getct = $dbh->prepare(qq{
538         SELECT COUNT(*)
539         FROM  $schema.auths_$batch
540         WHERE to_import
541         AND NOT imported
542         AND new_auth_id IS NULL
543         AND auth_id IS NULL
544         AND cancelled_auth_id IS NULL
545     });
546     $getct->execute();
547     my $max = $getct->fetchrow_arrayref()->[0];
548
549     report_progress('Number of authorities to add', $max);
550     for (my $i = 1; $i <= $max; $i++) {
551         report_progress('... authorities added', $i) if 0 == $i % 10 or $i == $max;
552         $dbh->begin_work;
553         $dbh->do(qq{
554             INSERT INTO authority.record_entry (marc, last_xact_id)
555             SELECT marc, ? || '-' || id
556             FROM $schema.auths_$batch b
557             WHERE id IN (
558                 SELECT id
559                 FROM $schema.auths_$batch
560                 WHERE to_import
561                 AND NOT imported
562                 AND new_auth_id IS NULL
563                 AND auth_id IS NULL
564                 AND cancelled_auth_id IS NULL
565                 ORDER BY id
566                 LIMIT 1
567             )
568         }, {}, "auths_$batch");
569         $dbh->do(qq{
570             UPDATE $schema.auths_$batch
571             SET imported = TRUE,
572                 new_auth_id = CURRVAL('authority.record_entry_id_seq')
573             WHERE id IN (
574                 SELECT id
575                 FROM $schema.auths_$batch
576                 WHERE to_import
577                 AND NOT imported
578                 AND new_auth_id IS NULL
579                 AND auth_id IS NULL
580                 AND cancelled_auth_id IS NULL
581                 ORDER BY id
582                 LIMIT 1
583             )
584         });
585         $dbh->commit;
586     }
587 }
588
589 sub handle_overlay_auths_stage1 {
590     my $dbh = shift;
591     my $schema = shift;
592     my $batch = shift;
593
594     my $getct = $dbh->prepare(qq{
595         SELECT COUNT(*)
596         FROM  $schema.auths_$batch
597         WHERE to_import
598         AND NOT imported
599         AND lccn_matched
600         AND heading = existing_heading
601     });
602     $getct->execute();
603     my $max = $getct->fetchrow_arrayref()->[0];
604     report_progress('Number of auths to update', $max);
605
606     $dbh->do(q{
607         UPDATE config.internal_flag SET enabled = TRUE
608         WHERE name = 'ingest.disable_authority_auto_update';
609     });
610     for (my $i = 1; $i <= $max; $i++) {
611         report_progress('... auths updated', $i) if 0 == $i % 10 or $i == $max;
612         $dbh->begin_work;
613         $dbh->do(qq{
614             UPDATE authority.record_entry a
615             SET marc = b.marc,
616                 edit_date = NOW()
617             FROM $schema.auths_$batch b
618             WHERE a.id = b.auth_id
619             AND auth_id IN (
620                 SELECT auth_id
621                 FROM $schema.auths_$batch
622                 WHERE to_import
623                 AND NOT imported
624                 AND lccn_matched
625                 AND heading = existing_heading
626                 ORDER BY id
627                 LIMIT 1
628             )
629         });
630         $dbh->do(qq{
631             UPDATE $schema.auths_$batch
632             SET imported = TRUE
633             WHERE auth_id IN (
634                 SELECT auth_id
635                 FROM $schema.auths_$batch
636                 WHERE to_import
637                 AND NOT imported
638                 AND lccn_matched
639                 AND heading = existing_heading
640                 ORDER BY id
641                 LIMIT 1
642             )
643         });
644         $dbh->commit;
645     }
646     $dbh->do(q{
647         UPDATE config.internal_flag SET enabled = FALSE
648         WHERE name = 'ingest.disable_authority_auto_update';
649     });
650 }
651
652 sub handle_overlay_auths_stage2 {
653     my $dbh = shift;
654     my $schema = shift;
655     my $batch = shift;
656
657     my $getct = $dbh->prepare(qq{
658         SELECT COUNT(*)
659         FROM  $schema.auths_$batch
660         WHERE to_import
661         AND NOT imported
662         AND lccn_matched
663         AND heading <> existing_heading
664     });
665     $getct->execute();
666     my $max = $getct->fetchrow_arrayref()->[0];
667     report_progress('Number of auths to update', $max);
668
669     for (my $i = 1; $i <= $max; $i++) {
670         report_progress('... auths updated', $i) if 0 == $i % 10 or $i == $max;
671         $dbh->begin_work;
672         $dbh->do(qq{
673             UPDATE authority.record_entry a
674             SET marc = b.marc,
675                 edit_date = NOW()
676             FROM $schema.auths_$batch b
677             WHERE a.id = b.auth_id
678             AND auth_id IN (
679                 SELECT auth_id
680                 FROM $schema.auths_$batch
681                 WHERE to_import
682                 AND NOT imported
683                 AND lccn_matched
684                 AND heading <> existing_heading
685                 ORDER BY id
686                 LIMIT 1
687             )
688         });
689         $dbh->do(qq{
690             UPDATE $schema.auths_$batch
691             SET imported = TRUE
692             WHERE auth_id IN (
693                 SELECT auth_id
694                 FROM $schema.auths_$batch
695                 WHERE to_import
696                 AND NOT imported
697                 AND lccn_matched
698                 AND heading <> existing_heading
699                 ORDER BY id
700                 LIMIT 1
701             )
702         });
703         $dbh->commit;
704     }
705 }
706
707 sub handle_overlay_auths_stage3 {
708     my $dbh = shift;
709     my $schema = shift;
710     my $batch = shift;
711
712     my $getct = $dbh->prepare(qq{
713         SELECT COUNT(*)
714         FROM  $schema.auths_$batch
715         WHERE to_import
716         AND NOT imported
717         AND (
718             auth_id IS NULL OR
719             auth_id = cancelled_auth_id
720         )
721         AND cancelled_auth_id IS NOT NULL
722     });
723     $getct->execute();
724     my $max = $getct->fetchrow_arrayref()->[0];
725     report_progress('Number of auths to update', $max);
726
727     for (my $i = 1; $i <= $max; $i++) {
728         report_progress('... auths updated', $i) if 0 == $i % 10 or $i == $max;
729         $dbh->begin_work;
730         $dbh->do(qq{
731             UPDATE authority.record_entry a
732             SET marc = b.marc,
733                 edit_date = NOW()
734             FROM $schema.auths_$batch b
735             WHERE a.id = b.cancelled_auth_id
736             AND cancelled_auth_id IN (
737                 SELECT auth_id
738                 FROM $schema.auths_$batch
739                 WHERE to_import
740                 AND NOT imported
741                 AND (
742                     auth_id IS NULL OR
743                     auth_id = cancelled_auth_id
744                 )
745                 AND cancelled_auth_id IS NOT NULL
746                 ORDER BY id
747                 LIMIT 1
748             )
749         });
750         $dbh->do(qq{
751             UPDATE $schema.auths_$batch
752             SET imported = TRUE
753             WHERE cancelled_auth_id IN (
754                 SELECT cancelled_auth_id
755                 FROM $schema.auths_$batch
756                 WHERE to_import
757                 AND NOT imported
758                 AND (
759                     auth_id IS NULL OR
760                     auth_id = cancelled_auth_id
761                 )
762                 AND cancelled_auth_id IS NOT NULL
763                 ORDER BY id
764                 LIMIT 1
765             )
766         });
767         $dbh->commit;
768     }
769 }
770
771 sub handle_link_auth_auth {
772     my $dbh = shift;
773     my $schema = shift;
774     my $batch = shift;
775
776     $dbh->do(q{
777         UPDATE config.internal_flag SET enabled = TRUE
778         WHERE name = 'ingest.disable_authority_auto_update';
779     });
780
781     my $sth = $dbh->prepare(qq{
782         SELECT COALESCE(new_auth_id, auth_id, cancelled_auth_id) AS id
783         FROM $schema.auths_$batch
784         WHERE imported
785         ORDER BY 1
786     });
787     $sth->execute();
788     my @ids = map { $_->{id} } @{ $sth->fetchall_arrayref({}) };
789     my $i = 0;
790     report_progress(scalar(@ids) . " records to do auth-auth linking");
791     foreach my $id (@ids) {
792         $i++;
793         report_progress('... auth-auth linkings processed', $i) if 0 == $i % 10 or $i == scalar(@ids);
794         system "/openils/bin/authority_authority_linker.pl -r $id -c /openils/conf/opensrf_core.xml";
795     }
796
797     $dbh->do(q{
798         UPDATE config.internal_flag SET enabled = FALSE
799         WHERE name = 'ingest.disable_authority_auto_update';
800     });
801 }
802
803 sub handle_link_auth_bib {
804     my $dbh = shift;
805     my $schema = shift;
806     my $batch = shift;
807     my $link_skipped = shift;
808
809     my $query;
810     if ($link_skipped) {
811         $query = qq{
812             SELECT bib_id AS id
813             FROM $schema.$batch
814             WHERE NOT imported
815             AND skip_reason ~ '^edit'
816             ORDER BY 1
817         };
818     } else {
819         $query = qq{
820             SELECT bib_id AS id
821             FROM $schema.$batch
822             WHERE imported
823             ORDER BY 1
824         };
825     }
826
827     my $sth = $dbh->prepare($query);
828     $sth->execute();
829     my @ids = map { $_->{id} } @{ $sth->fetchall_arrayref({}) };
830     my $i = 0;
831     report_progress(scalar(@ids) . " records to do auth-bib linking");
832     foreach my $id (@ids) {
833         $i++;
834         report_progress('... auth-bib linkings processed', $i) if 0 == $i % 10 or $i == scalar(@ids);
835         system "/openils/bin/authority_control_fields.pl --record $id -c /openils/conf/opensrf_core.xml";
836     }
837
838 }
839
840 sub handle_export_skipped_bibs {
841     my $dbh = shift;
842     my $schema = shift;
843     my $batch = shift;
844     my $output = shift;
845
846     my $outfh;
847     open($outfh, '>', $output) or die("Could not open input file $output: $!\n");
848     binmode $outfh, ':utf8';
849
850     my $sth = $dbh->prepare(qq{
851         SELECT marc
852         FROM $schema.$batch
853         WHERE skip_reason ~ '^edit'
854         ORDER BY id
855     });
856     $sth->execute();
857    
858     while (my $row  = $sth->fetchrow_hashref()) {
859         my $marc = MARC::Record->new_from_xml($row->{marc});
860         print $outfh $marc->as_usmarc();
861     }
862     $outfh->close();
863 }
864
865 sub handle_export_skipped_auths {
866     my $dbh = shift;
867     my $schema = shift;
868     my $batch = shift;
869     my $output = shift;
870
871     my $outfh;
872     open($outfh, '>', $output) or die("Could not open input file $output: $!\n");
873     binmode $outfh, ':utf8';
874
875     my $sth = $dbh->prepare(qq{
876         SELECT marc
877         FROM $schema.auths_$batch
878         WHERE NOT imported
879         ORDER BY id
880     });
881     $sth->execute();
882    
883     while (my $row  = $sth->fetchrow_hashref()) {
884         my $marc = MARC::Record->new_from_xml($row->{marc});
885         print $outfh $marc->as_usmarc();
886     }
887     $outfh->close();
888 }