Use eval in eg_staged_bib_overlay.
[migration-tools.git] / eg_staged_bib_overlay
1 #!/usr/bin/perl
2
3 # Copyright (c) 2016 Equinox Software, Inc.
4 # Author: Galen Charlton <gmc@esilibrary.com>
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2, or (at your option)
9 # any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program.  If not, see <http://www.gnu.org/licenses/>
18
19 use strict;
20 use warnings;
21
22 use Getopt::Long;
23 use MARC::Record;
24 use MARC::File::XML (BinaryEncoding => 'utf8');
25 use DBI;
26 use OpenILS::Application::AppUtils;
27
28 my $action;
29 my $schema = 'bib_loads';
30 my $db;
31 my $dbuser;
32 my $dbpw;
33 my $dbhost;
34 my $batch;
35 my $cutoff;
36 my $wait = 1;
37 my $output;
38 my $link_skipped;
39
40 my $ret = GetOptions(
41     'action:s'      => \$action,
42     'schema:s'      => \$schema,
43     'db:s'          => \$db,
44     'dbuser:s'      => \$dbuser,
45     'dbhost:s'      => \$dbhost,
46     'dbpw:s'        => \$dbpw,
47     'batch:s'       => \$batch,
48     'cutoff:s'      => \$cutoff,
49     'wait:i'        => \$wait,
50     'output:s'      => \$output,
51     'link-skipped'  => \$link_skipped,
52 );
53
54 abort('must specify --action') unless defined $action;
55 abort('must specify --schema') unless defined $schema;
56 abort('must specify --db') unless defined $db;
57 abort('must specify --dbuser') unless defined $dbuser;
58 abort('must specify --dbhost') unless defined $dbhost;
59 abort('must specify --dbpw') unless defined $dbpw;
60 abort('must specify --batch') unless defined $batch;
61
62 abort(q{--action must be "stage_bibs", "filter_bibs", "load_bibs", "stage_auths",
63 "match_auths", "load_new_auths", "overlay_auths_stage1",
64 "overlay_auths_stage2", "overlay_auths_stage3", "link_auth_auth",
65 "link_auth_bib", "export_skipped_bibs", or "export_skipped_auths"}) unless
66     $action eq 'filter_bibs' or
67     $action eq 'stage_bibs' or
68     $action eq 'load_bibs' or
69     $action eq 'stage_auths' or
70     $action eq 'match_auths' or
71     $action eq 'load_new_auths' or
72     $action eq 'overlay_auths_stage1' or
73     $action eq 'overlay_auths_stage2' or
74     $action eq 'overlay_auths_stage3' or
75     $action eq 'link_auth_auth' or
76     $action eq 'link_auth_bib' or
77     $action eq 'export_skipped_bibs' or
78     $action eq 'export_skipped_auths'
79 ;
80
81 my $dbh = connect_db($db, $dbuser, $dbpw, $dbhost);
82
83 if ($action eq 'stage_bibs') {
84     abort('must specify at least one input file') unless @ARGV;
85     handle_stage_bibs($dbh, $schema, $batch);
86 }
87
88 if ($action eq 'filter_bibs') {
89     abort('must specify cutoff date when filtering') unless defined $cutoff;
90     handle_filter_bibs($dbh, $schema, $batch, $cutoff);
91 }
92
93 if ($action eq 'load_bibs' ) {
94     handle_load_bibs($dbh, $schema, $batch, $wait);
95 }
96
97 if ($action eq 'stage_auths') {
98     abort('must specify at least one input file') unless @ARGV;
99     handle_stage_auths($dbh, $schema, $batch);
100 }
101
102 if ($action eq 'match_auths') {
103     handle_match_auths($dbh, $schema, $batch);
104 }
105
106 if ($action eq 'load_new_auths') {
107     handle_load_new_auths($dbh, $schema, $batch);
108 }
109
110 if ($action eq 'overlay_auths_stage1') {
111     handle_overlay_auths_stage1($dbh, $schema, $batch);
112 }
113 if ($action eq 'overlay_auths_stage2') {
114     handle_overlay_auths_stage2($dbh, $schema, $batch);
115 }
116 if ($action eq 'overlay_auths_stage3') {
117     handle_overlay_auths_stage3($dbh, $schema, $batch);
118 }
119
120 if ($action eq 'link_auth_auth') {
121     handle_link_auth_auth($dbh, $schema, $batch);
122 }
123 if ($action eq 'link_auth_bib') {
124     handle_link_auth_bib($dbh, $schema, $batch, $link_skipped);
125 }
126
127 if ($action eq 'export_skipped_bibs') {
128     abort('must specify output file') unless defined $output;
129     handle_export_skipped_bibs($dbh, $schema, $batch, $output);
130 }
131 if ($action eq 'export_skipped_auths') {
132     abort('must specify output file') unless defined $output;
133     handle_export_skipped_auths($dbh, $schema, $batch, $output);
134 }
135
136 sub abort {
137     my $msg = shift;
138     print STDERR "$0: $msg", "\n";
139     print_usage();
140     exit 1;
141 }
142
143 sub print_usage {
144     print <<_USAGE_;
145
146 Utility to stage and overlay bib records in an Evergreen database. This
147 expects that the incoming records will have been previously exported
148 from that Evergreen database and modified in some fashion (e.g., for
149 authority record processing) and that the bib ID can be found in the
150 901\$c subfield.
151
152 This program has several modes controlled by the --action switch:
153
154   --action stage_bibs  - load MARC bib records into a staging table
155   --action filter_bibs - mark previously staged bibs that should
156                          be excluded from a subsequent load, either
157                          because the target bib is deleted in Evergreen
158                          or the record was modified after a date
159                          specified by the --cutoff switch
160   --action load_bibs   - overlay bib records using a previously staged
161                          batch, one at a time. After each bib, it will
162                          wait the number of seconds specified by the
163                          --wait switch.
164
165   --action stage_auths          - load MARC authorities into staging
166                                   table
167   --action match_auths          - identify matches with authority
168                                   records already present in the
169                                   Evergreen database; matching is
170                                   based on LCCN, cancelled LCCN, and
171                                   main heading.
172   --action load_new_auths       - load new (unmatched) authorities
173   --action overlay_auths_stage1 - overlay based on LCCN where
174                                   heading has NOT changed; this step
175                                   disables propagation to bib records
176   --action overlay_auths_stage2 - overlay based on LCCN where heading
177                                   HAS changed; propagates changes
178                                   to bib records
179   --action overlay_auths_stage3 - overlay for records where a cancelled
180                                   LCCN is replaced with a new one
181   --action link_auth_auth       - run authority_authority_linker.pl for
182                                   the authorities that were overlaid
183                                   or added in this batch.
184   --action link_auth_bib        - run authority_control_fields.pl for
185                                   the bibs that were overlaid in this
186                                   batch.  Add --link-skipped to specify
187                                   that bibs that were matched but
188                                   skipped due to having be edited after
189                                   the cutoff should be linked (rather
190                                   than linking the imported bibs)
191   --action export_skipped_bibs  - export to ISO2709 file whose name is
192                                   specified by --output those bibs
193                                   that had been edited after the cutoff.
194   --action export_skipped_auths - export to ISO2709 file whose name is
195                                   specified by --output those authorities
196                                   that could not be definitively
197                                   handled as updates or adds.
198
199 Several switches are used regardless of the specified action:
200
201   --schema  - Pg schema in which staging table will live; should be
202               created beforehand
203   --batch   - name of bib batch; will also be used as the name
204               of the staging tables
205   --db      - database name
206   --dbuser  - database user
207   --dbpw    - database password
208   --dbhost  - database host
209
210 Examples:
211
212 $0 --schema bib_load --batch bibs_2016_01 --db evergreen \\
213    --dbuser evergreen --dbpw evergreen --dbhost localhost \\
214    --action stage_bibs -- file1.mrc file2.mrc [...]
215
216 $0 --schema bib_load --batch bibs_2016_01 --db evergreen \\
217    --dbuser evergreen --dbpw evergreen --dbhost localhost \\
218    --action filter_bibs --cutoff 2016-01-02
219
220 $0 --schema bib_load --batch bibs_2016_01 --db evergreen \\
221    --dbuser evergreen --dbpw evergreen --dbhost localhost \\
222    --action load_bibs --wait 2
223
224 _USAGE_
225 }
226
227
228 sub report_progress {
229     my ($msg, $counter) = @_;
230     if (defined $counter) {
231         print STDERR "$msg: $counter\n";
232     } else {
233         print STDERR "$msg\n";
234     }
235 }
236
237 sub connect_db {
238     my ($db, $dbuser, $dbpw, $dbhost) = @_;
239
240     my $dsn = "dbi:Pg:host=$dbhost;dbname=$db;port=5432";
241
242     my $attrs = {
243         ShowErrorStatement => 1,
244         RaiseError => 1,
245         PrintError => 1,
246         pg_enable_utf8 => 1,
247     };
248     my $dbh = DBI->connect($dsn, $dbuser, $dbpw, $attrs);
249
250     return $dbh;
251 }
252
253 sub handle_stage_bibs {
254     my $dbh = shift;
255     my $schema = shift;
256     my $batch = shift;
257
258     $dbh->do(qq{
259         DROP TABLE IF EXISTS $schema.$batch;
260     });
261     $dbh->do(qq{
262         CREATE TABLE $schema.$batch (
263             id          SERIAL,
264             marc        TEXT,
265             bib_id      BIGINT,
266             imported    BOOLEAN DEFAULT FALSE,
267             to_import   BOOLEAN DEFAULT TRUE,
268             skip_reason TEXT
269         )
270     });
271
272     local $/ = "\035";
273     my $i = 0;
274     binmode STDIN, ':utf8';
275     my $ins = $dbh->prepare("INSERT INTO $schema.$batch (marc, bib_id) VALUES (?, ?)");
276     $dbh->begin_work;
277     while (<>) {
278         $i++;
279         if (0 == $i % 100) {
280             report_progress("Records staged", $i);
281             $dbh->commit;
282             $dbh->begin_work;
283         }
284         eval {
285             my $marc = MARC::Record->new_from_usmarc($_);
286             my $bibid = $marc->subfield('901', 'c');
287             if ($bibid !~ /^\d+$/) {
288                 print STDERR "Record $i is suspect; skipping\n";
289                 next;
290             }
291             my $xml = OpenILS::Application::AppUtils->entityize($marc->as_xml_record());
292             $ins->execute($xml, $bibid);
293         };
294         if ($@) {
295             print STDERR "Record $i is bad; skipping\n";
296             next;
297         }
298     }
299     $dbh->commit;
300     report_progress("Records staged", $i) if 0 != $i % 100;
301     $dbh->do(qq/
302         CREATE INDEX ${batch}_bib_id_idx ON
303             $schema.$batch (bib_id);
304     /);
305     $dbh->do(qq/
306         CREATE INDEX ${batch}_id_idx ON
307             $schema.$batch (id);
308     /);
309 }
310
311 sub handle_filter_bibs {
312     my $dbh = shift;
313     my $schema = shift;
314     my $batch = shift;
315     my $cutoff = shift;
316
317     my $sth1 = $dbh->prepare(qq{
318         UPDATE $schema.$batch
319         SET to_import = FALSE,
320             skip_reason = 'deleted'
321         WHERE bib_id IN (
322             SELECT id
323             FROM biblio.record_entry
324             WHERE deleted
325         )
326         AND to_import
327         AND NOT imported
328     });
329     $sth1->execute();
330     my $ct = $sth1->rows;
331     report_progress("Filtering out $ct records that are currently deleted");
332
333     my $sth2 = $dbh->prepare(qq{
334         UPDATE $schema.$batch
335         SET to_import = FALSE,
336             skip_reason = 'edited after cutoff of $cutoff'
337         WHERE bib_id IN (
338             SELECT id
339             FROM biblio.record_entry
340             WHERE edit_date >= ?
341         )
342         AND to_import
343         AND NOT imported
344     });
345     $sth2->execute($cutoff);
346     $ct = $sth2->rows;
347     report_progress("Filtering out $ct records edited after cutoff date of $cutoff");
348
349     my $sth3 = $dbh->prepare(qq{
350         UPDATE $schema.$batch
351         SET to_import = FALSE,
352             skip_reason = 'XML is not well-formed'
353         WHERE NOT xml_is_well_formed(marc)
354         AND to_import
355         AND NOT imported
356     });
357     $sth3->execute();
358     $ct = $sth3->rows;
359     report_progress("Filtering out $ct records whose XML is not well-formed");
360 }
361
362 sub handle_load_bibs {
363     my $dbh = shift;
364     my $schema = shift;
365     my $batch = shift;
366     my $wait = shift;
367
368     my $getct = $dbh->prepare(qq{
369         SELECT COUNT(*)
370         FROM  $schema.$batch
371         WHERE to_import
372         AND NOT imported
373     });
374     $getct->execute();
375     my $max = $getct->fetchrow_arrayref()->[0];
376
377     report_progress('Number of bibs to update', $max);
378     for (my $i = 1; $i <= $max; $i++) {
379         report_progress('... bibs updated', $i) if 0 == $i % 10 or $i == $max;
380         $dbh->begin_work;
381         $dbh->do(qq{
382             UPDATE biblio.record_entry a
383             SET marc = b.marc
384             FROM $schema.$batch b
385             WHERE a.id = b.bib_id
386             AND bib_id IN (
387                 SELECT bib_id
388                 FROM $schema.$batch
389                 WHERE to_import
390                 AND NOT imported
391                 ORDER BY id
392                 LIMIT 1
393             )
394         });
395         $dbh->do(qq{
396             UPDATE $schema.$batch
397             SET imported = TRUE
398             WHERE bib_id IN (
399                 SELECT bib_id
400                 FROM $schema.$batch
401                 WHERE to_import
402                 AND NOT imported
403                 ORDER BY id
404                 LIMIT 1
405             )
406         });
407         $dbh->commit;
408         sleep $wait;
409     }
410 }
411
412 sub handle_stage_auths {
413     my $dbh = shift;
414     my $schema = shift;
415     my $batch = shift;
416
417     $dbh->do(qq{
418         DROP TABLE IF EXISTS $schema.auths_$batch;
419     });
420     $dbh->do(qq{
421         CREATE TABLE $schema.auths_$batch (
422             id          SERIAL,
423             marc        TEXT,
424             auth_id     BIGINT,
425             new_auth_id BIGINT,
426             existing_heading TEXT,
427             lccn        TEXT,
428             cancelled_lccn TEXT,
429             cancelled_auth_id BIGINT,
430             heading     TEXT,
431             lccn_matched BOOLEAN DEFAULT FALSE,
432             heading_matched BOOLEAN DEFAULT FALSE,
433             imported    BOOLEAN DEFAULT FALSE,
434             to_import   BOOLEAN DEFAULT TRUE,
435             skip_reason TEXT
436         )
437     });
438
439     local $/ = "\035";
440     my $i = 0;
441     binmode STDIN, ':utf8';
442     my $ins = $dbh->prepare(qq{
443         INSERT INTO $schema.auths_$batch (marc, auth_id, lccn, cancelled_lccn, heading)
444         VALUES (?, ?, ?, ?, authority.normalize_heading(?))
445     });
446     $dbh->begin_work;
447     while (<>) {
448         $i++;
449         if (0 == $i % 100) {
450             report_progress("Records staged", $i);
451             $dbh->commit;
452             $dbh->begin_work;
453         }
454         eval {
455             my $marc = MARC::Record->new_from_usmarc($_);
456             my $authid = $marc->subfield('901', 'c');
457             if (defined($authid) && $authid !~ /^\d+$/) {
458                 undef $authid;
459             }
460             my $lccn = $marc->subfield('010', 'a');
461             if (defined $lccn) {
462                 $lccn =~ s/^\s+//;
463                 $lccn =~ s/\s+$//;
464                 $lccn =~ s/\s+/ /g;
465             }
466             my $cancelled_lccn = $marc->subfield('010', 'z');
467             if (defined $cancelled_lccn) {
468                 $cancelled_lccn =~ s/^\s+//;
469                 $cancelled_lccn =~ s/\s+$//;
470                 $cancelled_lccn =~ s/\s+/ /g;
471             }
472             my $xml = OpenILS::Application::AppUtils->entityize($marc->as_xml_record());
473             $ins->execute($xml, $authid, $lccn, $cancelled_lccn, $xml);
474         };
475         if ($@) {
476             print STDERR "Record $i is bad; skipping\n";
477             next;
478         }
479     }
480     $dbh->commit;
481     report_progress("Records staged", $i) if 0 != $i % 100;
482     $dbh->do(qq/
483         CREATE INDEX auths_${batch}_auth_id_idx ON
484             $schema.auths_$batch (auth_id);
485     /);
486     $dbh->do(qq/
487         CREATE INDEX auths_${batch}_id_idx ON
488             $schema.auths_$batch (id);
489     /);
490     $dbh->do(qq/
491         CREATE INDEX auths_${batch}_lccn_idx ON
492             $schema.auths_$batch (lccn);
493     /);
494 }
495
496 sub handle_match_auths {
497     my ($dbh, $schema, $batch) = @_;
498
499     my $sth = $dbh->prepare(qq{
500         UPDATE $schema.auths_${batch} a
501         SET auth_id = b.record,
502             lccn_matched = TRUE,
503             existing_heading = authority.normalize_heading(c.marc)
504         FROM authority.full_rec b
505         JOIN authority.record_entry c ON (b.record = c.id)
506         WHERE tag = '010'
507         AND   subfield = 'a'
508         AND   value = lccn
509         AND   auth_id IS NULL
510         AND   lccn IS NOT NULL;
511     });
512     $sth->execute();
513     my $ct = $sth->rows;
514     report_progress("Matched $ct authorities on LCCN");
515
516     $sth = $dbh->prepare(qq{
517         UPDATE $schema.auths_${batch} a
518         SET cancelled_auth_id = b.record
519         FROM authority.full_rec b
520         WHERE tag = '010'
521         AND   subfield = 'a'
522         AND   value = cancelled_lccn
523         AND   auth_id IS NULL
524         AND   cancelled_lccn IS NOT NULL;
525     });
526     $sth->execute();
527     $ct = $sth->rows;
528     report_progress("Matched $ct authorities on cancelled LCCN");
529
530     $sth = $dbh->prepare(qq{
531         UPDATE $schema.auths_$batch a
532         SET auth_id = b.id,
533             heading_matched = TRUE,
534             existing_heading = b.heading
535         FROM authority.record_entry b
536         WHERE a.heading = b.heading
537         AND   auth_id IS NULL;
538     });
539     $sth->execute();
540     $ct = $sth->rows;
541     report_progress("Matched $ct authorities on heading");
542 }
543
544 sub handle_load_new_auths {
545     my $dbh = shift;
546     my $schema = shift;
547     my $batch = shift;
548
549     my $getct = $dbh->prepare(qq{
550         SELECT COUNT(*)
551         FROM  $schema.auths_$batch
552         WHERE to_import
553         AND NOT imported
554         AND new_auth_id IS NULL
555         AND auth_id IS NULL
556         AND cancelled_auth_id IS NULL
557     });
558     $getct->execute();
559     my $max = $getct->fetchrow_arrayref()->[0];
560
561     report_progress('Number of authorities to add', $max);
562     for (my $i = 1; $i <= $max; $i++) {
563         report_progress('... authorities added', $i) if 0 == $i % 10 or $i == $max;
564         $dbh->begin_work;
565         $dbh->do(qq{
566             INSERT INTO authority.record_entry (marc, last_xact_id)
567             SELECT marc, ? || '-' || id
568             FROM $schema.auths_$batch b
569             WHERE id IN (
570                 SELECT id
571                 FROM $schema.auths_$batch
572                 WHERE to_import
573                 AND NOT imported
574                 AND new_auth_id IS NULL
575                 AND auth_id IS NULL
576                 AND cancelled_auth_id IS NULL
577                 ORDER BY id
578                 LIMIT 1
579             )
580         }, {}, "auths_$batch");
581         $dbh->do(qq{
582             UPDATE $schema.auths_$batch
583             SET imported = TRUE,
584                 new_auth_id = CURRVAL('authority.record_entry_id_seq')
585             WHERE id IN (
586                 SELECT id
587                 FROM $schema.auths_$batch
588                 WHERE to_import
589                 AND NOT imported
590                 AND new_auth_id IS NULL
591                 AND auth_id IS NULL
592                 AND cancelled_auth_id IS NULL
593                 ORDER BY id
594                 LIMIT 1
595             )
596         });
597         $dbh->commit;
598     }
599 }
600
601 sub handle_overlay_auths_stage1 {
602     my $dbh = shift;
603     my $schema = shift;
604     my $batch = shift;
605
606     my $getct = $dbh->prepare(qq{
607         SELECT COUNT(*)
608         FROM  $schema.auths_$batch
609         WHERE to_import
610         AND NOT imported
611         AND lccn_matched
612         AND heading = existing_heading
613     });
614     $getct->execute();
615     my $max = $getct->fetchrow_arrayref()->[0];
616     report_progress('Number of auths to update', $max);
617
618     $dbh->do(q{
619         UPDATE config.internal_flag SET enabled = TRUE
620         WHERE name = 'ingest.disable_authority_auto_update';
621     });
622     for (my $i = 1; $i <= $max; $i++) {
623         report_progress('... auths updated', $i) if 0 == $i % 10 or $i == $max;
624         $dbh->begin_work;
625         $dbh->do(qq{
626             UPDATE authority.record_entry a
627             SET marc = b.marc,
628                 edit_date = NOW()
629             FROM $schema.auths_$batch b
630             WHERE a.id = b.auth_id
631             AND auth_id IN (
632                 SELECT auth_id
633                 FROM $schema.auths_$batch
634                 WHERE to_import
635                 AND NOT imported
636                 AND lccn_matched
637                 AND heading = existing_heading
638                 ORDER BY id
639                 LIMIT 1
640             )
641         });
642         $dbh->do(qq{
643             UPDATE $schema.auths_$batch
644             SET imported = TRUE
645             WHERE auth_id IN (
646                 SELECT auth_id
647                 FROM $schema.auths_$batch
648                 WHERE to_import
649                 AND NOT imported
650                 AND lccn_matched
651                 AND heading = existing_heading
652                 ORDER BY id
653                 LIMIT 1
654             )
655         });
656         $dbh->commit;
657     }
658     $dbh->do(q{
659         UPDATE config.internal_flag SET enabled = FALSE
660         WHERE name = 'ingest.disable_authority_auto_update';
661     });
662 }
663
664 sub handle_overlay_auths_stage2 {
665     my $dbh = shift;
666     my $schema = shift;
667     my $batch = shift;
668
669     my $getct = $dbh->prepare(qq{
670         SELECT COUNT(*)
671         FROM  $schema.auths_$batch
672         WHERE to_import
673         AND NOT imported
674         AND lccn_matched
675         AND heading <> existing_heading
676     });
677     $getct->execute();
678     my $max = $getct->fetchrow_arrayref()->[0];
679     report_progress('Number of auths to update', $max);
680
681     for (my $i = 1; $i <= $max; $i++) {
682         report_progress('... auths updated', $i) if 0 == $i % 10 or $i == $max;
683         $dbh->begin_work;
684         $dbh->do(qq{
685             UPDATE authority.record_entry a
686             SET marc = b.marc,
687                 edit_date = NOW()
688             FROM $schema.auths_$batch b
689             WHERE a.id = b.auth_id
690             AND auth_id IN (
691                 SELECT auth_id
692                 FROM $schema.auths_$batch
693                 WHERE to_import
694                 AND NOT imported
695                 AND lccn_matched
696                 AND heading <> existing_heading
697                 ORDER BY id
698                 LIMIT 1
699             )
700         });
701         $dbh->do(qq{
702             UPDATE $schema.auths_$batch
703             SET imported = TRUE
704             WHERE auth_id IN (
705                 SELECT auth_id
706                 FROM $schema.auths_$batch
707                 WHERE to_import
708                 AND NOT imported
709                 AND lccn_matched
710                 AND heading <> existing_heading
711                 ORDER BY id
712                 LIMIT 1
713             )
714         });
715         $dbh->commit;
716     }
717 }
718
719 sub handle_overlay_auths_stage3 {
720     my $dbh = shift;
721     my $schema = shift;
722     my $batch = shift;
723
724     my $getct = $dbh->prepare(qq{
725         SELECT COUNT(*)
726         FROM  $schema.auths_$batch
727         WHERE to_import
728         AND NOT imported
729         AND (
730             auth_id IS NULL OR
731             auth_id = cancelled_auth_id
732         )
733         AND cancelled_auth_id IS NOT NULL
734     });
735     $getct->execute();
736     my $max = $getct->fetchrow_arrayref()->[0];
737     report_progress('Number of auths to update', $max);
738
739     for (my $i = 1; $i <= $max; $i++) {
740         report_progress('... auths updated', $i) if 0 == $i % 10 or $i == $max;
741         $dbh->begin_work;
742         $dbh->do(qq{
743             UPDATE authority.record_entry a
744             SET marc = b.marc,
745                 edit_date = NOW()
746             FROM $schema.auths_$batch b
747             WHERE a.id = b.cancelled_auth_id
748             AND cancelled_auth_id IN (
749                 SELECT auth_id
750                 FROM $schema.auths_$batch
751                 WHERE to_import
752                 AND NOT imported
753                 AND (
754                     auth_id IS NULL OR
755                     auth_id = cancelled_auth_id
756                 )
757                 AND cancelled_auth_id IS NOT NULL
758                 ORDER BY id
759                 LIMIT 1
760             )
761         });
762         $dbh->do(qq{
763             UPDATE $schema.auths_$batch
764             SET imported = TRUE
765             WHERE cancelled_auth_id IN (
766                 SELECT cancelled_auth_id
767                 FROM $schema.auths_$batch
768                 WHERE to_import
769                 AND NOT imported
770                 AND (
771                     auth_id IS NULL OR
772                     auth_id = cancelled_auth_id
773                 )
774                 AND cancelled_auth_id IS NOT NULL
775                 ORDER BY id
776                 LIMIT 1
777             )
778         });
779         $dbh->commit;
780     }
781 }
782
783 sub handle_link_auth_auth {
784     my $dbh = shift;
785     my $schema = shift;
786     my $batch = shift;
787
788     $dbh->do(q{
789         UPDATE config.internal_flag SET enabled = TRUE
790         WHERE name = 'ingest.disable_authority_auto_update';
791     });
792
793     my $sth = $dbh->prepare(qq{
794         SELECT COALESCE(new_auth_id, auth_id, cancelled_auth_id) AS id
795         FROM $schema.auths_$batch
796         WHERE imported
797         ORDER BY 1
798     });
799     $sth->execute();
800     my @ids = map { $_->{id} } @{ $sth->fetchall_arrayref({}) };
801     my $i = 0;
802     report_progress(scalar(@ids) . " records to do auth-auth linking");
803     foreach my $id (@ids) {
804         $i++;
805         report_progress('... auth-auth linkings processed', $i) if 0 == $i % 10 or $i == scalar(@ids);
806         system "/openils/bin/authority_authority_linker.pl -r $id -c /openils/conf/opensrf_core.xml";
807     }
808
809     $dbh->do(q{
810         UPDATE config.internal_flag SET enabled = FALSE
811         WHERE name = 'ingest.disable_authority_auto_update';
812     });
813 }
814
815 sub handle_link_auth_bib {
816     my $dbh = shift;
817     my $schema = shift;
818     my $batch = shift;
819     my $link_skipped = shift;
820
821     my $query;
822     if ($link_skipped) {
823         $query = qq{
824             SELECT bib_id AS id
825             FROM $schema.$batch
826             WHERE NOT imported
827             AND skip_reason ~ '^edit'
828             ORDER BY 1
829         };
830     } else {
831         $query = qq{
832             SELECT bib_id AS id
833             FROM $schema.$batch
834             WHERE imported
835             ORDER BY 1
836         };
837     }
838
839     my $sth = $dbh->prepare($query);
840     $sth->execute();
841     my @ids = map { $_->{id} } @{ $sth->fetchall_arrayref({}) };
842     my $i = 0;
843     report_progress(scalar(@ids) . " records to do auth-bib linking");
844     foreach my $id (@ids) {
845         $i++;
846         report_progress('... auth-bib linkings processed', $i) if 0 == $i % 10 or $i == scalar(@ids);
847         system "/openils/bin/authority_control_fields.pl --record $id -c /openils/conf/opensrf_core.xml";
848     }
849
850 }
851
852 sub handle_export_skipped_bibs {
853     my $dbh = shift;
854     my $schema = shift;
855     my $batch = shift;
856     my $output = shift;
857
858     my $outfh;
859     open($outfh, '>', $output) or die("Could not open input file $output: $!\n");
860     binmode $outfh, ':utf8';
861
862     my $sth = $dbh->prepare(qq{
863         SELECT marc
864         FROM $schema.$batch
865         WHERE skip_reason ~ '^edit'
866         ORDER BY id
867     });
868     $sth->execute();
869    
870     while (my $row  = $sth->fetchrow_hashref()) {
871         my $marc = MARC::Record->new_from_xml($row->{marc});
872         print $outfh $marc->as_usmarc();
873     }
874     $outfh->close();
875 }
876
877 sub handle_export_skipped_auths {
878     my $dbh = shift;
879     my $schema = shift;
880     my $batch = shift;
881     my $output = shift;
882
883     my $outfh;
884     open($outfh, '>', $output) or die("Could not open input file $output: $!\n");
885     binmode $outfh, ':utf8';
886
887     my $sth = $dbh->prepare(qq{
888         SELECT marc
889         FROM $schema.auths_$batch
890         WHERE NOT imported
891         ORDER BY id
892     });
893     $sth->execute();
894    
895     while (my $row  = $sth->fetchrow_hashref()) {
896         my $marc = MARC::Record->new_from_xml($row->{marc});
897         print $outfh $marc->as_usmarc();
898     }
899     $outfh->close();
900 }