eg_staged_bib_overlay: add actions to export skipped records
[migration-tools.git] / eg_staged_bib_overlay
1 #!/usr/bin/perl
2
3 # Copyright (c) 2016 Equinox Software, Inc.
4 # Author: Galen Charlton <gmc@esilibrary.com>
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2, or (at your option)
9 # any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program.  If not, see <http://www.gnu.org/licenses/>
18
19 use strict;
20 use warnings;
21
22 use Getopt::Long;
23 use MARC::Record;
24 use MARC::File::XML (BinaryEncoding => 'utf8');
25 use DBI;
26 use OpenILS::Application::AppUtils;
27
28 my $action;
29 my $schema = 'bib_loads';
30 my $db;
31 my $dbuser;
32 my $dbpw;
33 my $dbhost;
34 my $batch;
35 my $cutoff;
36 my $wait = 1;
37 my $output;
38
39 my $ret = GetOptions(
40     'action:s'      => \$action,
41     'schema:s'      => \$schema,
42     'db:s'          => \$db,
43     'dbuser:s'      => \$dbuser,
44     'dbhost:s'      => \$dbhost,
45     'dbpw:s'        => \$dbpw,
46     'batch:s'       => \$batch,
47     'cutoff:s'      => \$cutoff,
48     'wait:i'        => \$wait,
49     'output:s'      => \$output,
50 );
51
52 abort('must specify --action') unless defined $action;
53 abort('must specify --schema') unless defined $schema;
54 abort('must specify --db') unless defined $db;
55 abort('must specify --dbuser') unless defined $dbuser;
56 abort('must specify --dbhost') unless defined $dbhost;
57 abort('must specify --dbpw') unless defined $dbpw;
58 abort('must specify --batch') unless defined $batch;
59
60 abort(q{--action must be "stage_bibs", "filter_bibs", "load_bibs", "stage_auths",
61 "match_auths", "load_new_auths", "overlay_auths_stage1",
62 "overlay_auths_stage2", "overlay_auths_stage3", "link_auth_auth",
63 "link_auth_bib", "export_skipped_bibs", or "export_skipped_auths"}) unless
64     $action eq 'filter_bibs' or
65     $action eq 'stage_bibs' or
66     $action eq 'load_bibs' or
67     $action eq 'stage_auths' or
68     $action eq 'match_auths' or
69     $action eq 'load_new_auths' or
70     $action eq 'overlay_auths_stage1' or
71     $action eq 'overlay_auths_stage2' or
72     $action eq 'overlay_auths_stage3' or
73     $action eq 'link_auth_auth' or
74     $action eq 'link_auth_bib' or
75     $action eq 'export_skipped_bibs' or
76     $action eq 'export_skipped_auths'
77 ;
78
79 my $dbh = connect_db($db, $dbuser, $dbpw, $dbhost);
80
81 if ($action eq 'stage_bibs') {
82     abort('must specify at least one input file') unless @ARGV;
83     handle_stage_bibs($dbh, $schema, $batch);
84 }
85
86 if ($action eq 'filter_bibs') {
87     abort('must specify cutoff date when filtering') unless defined $cutoff;
88     handle_filter_bibs($dbh, $schema, $batch, $cutoff);
89 }
90
91 if ($action eq 'load_bibs' ) {
92     handle_load_bibs($dbh, $schema, $batch, $wait);
93 }
94
95 if ($action eq 'stage_auths') {
96     abort('must specify at least one input file') unless @ARGV;
97     handle_stage_auths($dbh, $schema, $batch);
98 }
99
100 if ($action eq 'match_auths') {
101     handle_match_auths($dbh, $schema, $batch);
102 }
103
104 if ($action eq 'load_new_auths') {
105     handle_load_new_auths($dbh, $schema, $batch);
106 }
107
108 if ($action eq 'overlay_auths_stage1') {
109     handle_overlay_auths_stage1($dbh, $schema, $batch);
110 }
111 if ($action eq 'overlay_auths_stage2') {
112     handle_overlay_auths_stage2($dbh, $schema, $batch);
113 }
114 if ($action eq 'overlay_auths_stage3') {
115     handle_overlay_auths_stage3($dbh, $schema, $batch);
116 }
117
118 if ($action eq 'link_auth_auth') {
119     handle_link_auth_auth($dbh, $schema, $batch);
120 }
121 if ($action eq 'link_auth_bib') {
122     handle_link_auth_bib($dbh, $schema, $batch);
123 }
124
125 if ($action eq 'export_skipped_bibs') {
126     abort('must specify output file') unless defined $output;
127     handle_export_skipped_bibs($dbh, $schema, $batch, $output);
128 }
129 if ($action eq 'export_skipped_auths') {
130     abort('must specify output file') unless defined $output;
131     handle_export_skipped_auths($dbh, $schema, $batch, $output);
132 }
133
134 sub abort {
135     my $msg = shift;
136     print STDERR "$0: $msg", "\n";
137     print_usage();
138     exit 1;
139 }
140
141 sub print_usage {
142     print <<_USAGE_;
143
144 Utility to stage and overlay bib records in an Evergreen database. This
145 expects that the incoming records will have been previously exported
146 from that Evergreen database and modified in some fashion (e.g., for
147 authority record processing) and that the bib ID can be found in the
148 901\$c subfield.
149
150 This program has several modes controlled by the --action switch:
151
152   --action stage_bibs  - load MARC bib records into a staging table
153   --action filter_bibs - mark previously staged bibs that should
154                          be excluded from a subsequent load, either
155                          because the target bib is deleted in Evergreen
156                          or the record was modified after a date
157                          specified by the --cutoff switch
158   --action load_bibs   - overlay bib records using a previously staged
159                          batch, one at a time. After each bib, it will
160                          wait the number of seconds specified by the
161                          --wait switch.
162
163   --action stage_auths          - load MARC authorities into staging
164                                   table
165   --action match_auths          - identify matches with authority
166                                   records already present in the
167                                   Evergreen database; matching is
168                                   based on LCCN, cancelled LCCN, and
169                                   main heading.
170   --action load_new_auths       - load new (unmatched) authorities
171   --action overlay_auths_stage1 - overlay based on LCCN where
172                                   heading has NOT changed; this step
173                                   disables propagation to bib records
174   --action overlay_auths_stage2 - overlay based on LCCN where heading
175                                   HAS changed; propagates changes
176                                   to bib records
177   --action overlay_auths_stage3 - overlay for records where a cancelled
178                                   LCCN is replaced with a new one
179   --action link_auth_auth       - run authority_authority_linker.pl for
180                                   the authorities that were overlaid
181                                   or added in this batch.
182   --action link_auth_bib        - run authority_control_fields.pl for
183                                   the bibs that were overlaid in this
184                                   batch.
185   --action export_skipped_bibs  - export to ISO2709 file whose name is
186                                   specified by --output those bibs
187                                   that had been edited after the cutoff.
188   --action export_skipped_auths - export to ISO2709 file whose name is
189                                   specified by --output those authorities
190                                   that could not be definitively
191                                   handled as updates or adds.
192
193 Several switches are used regardless of the specified action:
194
195   --schema  - Pg schema in which staging table will live; should be
196               created beforehand
197   --batch   - name of bib batch; will also be used as the name
198               of the staging tables
199   --db      - database name
200   --dbuser  - database user
201   --dbpw    - database password
202   --dbhost  - database host
203
204 Examples:
205
206 $0 --schema bib_load --batch bibs_2016_01 --db evergreen \\
207    --dbuser evergreen --dbpw evergreen --dbhost localhost \\
208    --action stage_bibs -- file1.mrc file2.mrc [...]
209
210 $0 --schema bib_load --batch bibs_2016_01 --db evergreen \\
211    --dbuser evergreen --dbpw evergreen --dbhost localhost \\
212    --action filter_bibs --cutoff 2016-01-02
213
214 $0 --schema bib_load --batch bibs_2016_01 --db evergreen \\
215    --dbuser evergreen --dbpw evergreen --dbhost localhost \\
216    --action load_bibs --wait 2
217
218 _USAGE_
219 }
220
221
222 sub report_progress {
223     my ($msg, $counter) = @_;
224     if (defined $counter) {
225         print STDERR "$msg: $counter\n";
226     } else {
227         print STDERR "$msg\n";
228     }
229 }
230
231 sub connect_db {
232     my ($db, $dbuser, $dbpw, $dbhost) = @_;
233
234     my $dsn = "dbi:Pg:host=$dbhost;dbname=$db;port=5432";
235
236     my $attrs = {
237         ShowErrorStatement => 1,
238         RaiseError => 1,
239         PrintError => 1,
240         pg_enable_utf8 => 1,
241     };
242     my $dbh = DBI->connect($dsn, $dbuser, $dbpw, $attrs);
243
244     return $dbh;
245 }
246
247 sub handle_stage_bibs {
248     my $dbh = shift;
249     my $schema = shift;
250     my $batch = shift;
251
252     $dbh->do(qq{
253         DROP TABLE IF EXISTS $schema.$batch;
254     });
255     $dbh->do(qq{
256         CREATE TABLE $schema.$batch (
257             id          SERIAL,
258             marc        TEXT,
259             bib_id      BIGINT,
260             imported    BOOLEAN DEFAULT FALSE,
261             to_import   BOOLEAN DEFAULT TRUE,
262             skip_reason TEXT
263         )
264     });
265
266     local $/ = "\035";
267     my $i = 0;
268     binmode STDIN, ':utf8';
269     my $ins = $dbh->prepare("INSERT INTO $schema.$batch (marc, bib_id) VALUES (?, ?)");
270     $dbh->begin_work;
271     while (<>) {
272         $i++;
273         if (0 == $i % 100) {
274             report_progress("Records staged", $i);
275             $dbh->commit;
276             $dbh->begin_work;
277         }
278         my $marc = MARC::Record->new_from_usmarc($_);
279         my $bibid = $marc->subfield('901', 'c');
280         if ($bibid !~ /^\d+$/) {
281             print STDERR "Record $i is suspect; skipping\n";
282             next;
283         }
284         my $xml = OpenILS::Application::AppUtils->entityize($marc->as_xml_record());
285         $ins->execute($xml, $bibid);
286     }
287     $dbh->commit;
288     report_progress("Records staged", $i) if 0 != $i % 100;
289     $dbh->do(qq/
290         CREATE INDEX ${batch}_bib_id_idx ON
291             $schema.$batch (bib_id);
292     /);
293     $dbh->do(qq/
294         CREATE INDEX ${batch}_id_idx ON
295             $schema.$batch (id);
296     /);
297 }
298
299 sub handle_filter_bibs {
300     my $dbh = shift;
301     my $schema = shift;
302     my $batch = shift;
303     my $cutoff = shift;
304
305     my $sth1 = $dbh->prepare(qq{
306         UPDATE $schema.$batch
307         SET to_import = FALSE,
308             skip_reason = 'deleted'
309         WHERE bib_id IN (
310             SELECT id
311             FROM biblio.record_entry
312             WHERE deleted
313         )
314         AND to_import
315         AND NOT imported
316     });
317     $sth1->execute();
318     my $ct = $sth1->rows;
319     report_progress("Filtering out $ct records that are currently deleted");
320
321     my $sth2 = $dbh->prepare(qq{
322         UPDATE $schema.$batch
323         SET to_import = FALSE,
324             skip_reason = 'edited after cutoff of $cutoff'
325         WHERE bib_id IN (
326             SELECT id
327             FROM biblio.record_entry
328             WHERE edit_date >= ?
329         )
330         AND to_import
331         AND NOT imported
332     });
333     $sth2->execute($cutoff);
334     $ct = $sth2->rows;
335     report_progress("Filtering out $ct records edited after cutoff date of $cutoff");
336
337     my $sth3 = $dbh->prepare(qq{
338         UPDATE $schema.$batch
339         SET to_import = FALSE,
340             skip_reason = 'XML is not well-formed'
341         WHERE NOT xml_is_well_formed(marc)
342         AND to_import
343         AND NOT imported
344     });
345     $sth3->execute();
346     $ct = $sth3->rows;
347     report_progress("Filtering out $ct records whose XML is not well-formed");
348 }
349
350 sub handle_load_bibs {
351     my $dbh = shift;
352     my $schema = shift;
353     my $batch = shift;
354     my $wait = shift;
355
356     my $getct = $dbh->prepare(qq{
357         SELECT COUNT(*)
358         FROM  $schema.$batch
359         WHERE to_import
360         AND NOT imported
361     });
362     $getct->execute();
363     my $max = $getct->fetchrow_arrayref()->[0];
364
365     report_progress('Number of bibs to update', $max);
366     for (my $i = 1; $i <= $max; $i++) {
367         report_progress('... bibs updated', $i) if 0 == $i % 10 or $i == $max;
368         $dbh->begin_work;
369         $dbh->do(qq{
370             UPDATE biblio.record_entry a
371             SET marc = b.marc
372             FROM $schema.$batch b
373             WHERE a.id = b.bib_id
374             AND bib_id IN (
375                 SELECT bib_id
376                 FROM $schema.$batch
377                 WHERE to_import
378                 AND NOT imported
379                 ORDER BY id
380                 LIMIT 1
381             )
382         });
383         $dbh->do(qq{
384             UPDATE $schema.$batch
385             SET imported = TRUE
386             WHERE bib_id IN (
387                 SELECT bib_id
388                 FROM $schema.$batch
389                 WHERE to_import
390                 AND NOT imported
391                 ORDER BY id
392                 LIMIT 1
393             )
394         });
395         $dbh->commit;
396         sleep $wait;
397     }
398 }
399
400 sub handle_stage_auths {
401     my $dbh = shift;
402     my $schema = shift;
403     my $batch = shift;
404
405     $dbh->do(qq{
406         DROP TABLE IF EXISTS $schema.auths_$batch;
407     });
408     $dbh->do(qq{
409         CREATE TABLE $schema.auths_$batch (
410             id          SERIAL,
411             marc        TEXT,
412             auth_id     BIGINT,
413             new_auth_id BIGINT,
414             existing_heading TEXT,
415             lccn        TEXT,
416             cancelled_lccn TEXT,
417             cancelled_auth_id BIGINT,
418             heading     TEXT,
419             lccn_matched BOOLEAN DEFAULT FALSE,
420             heading_matched BOOLEAN DEFAULT FALSE,
421             imported    BOOLEAN DEFAULT FALSE,
422             to_import   BOOLEAN DEFAULT TRUE,
423             skip_reason TEXT
424         )
425     });
426
427     local $/ = "\035";
428     my $i = 0;
429     binmode STDIN, ':utf8';
430     my $ins = $dbh->prepare(qq{
431         INSERT INTO $schema.auths_$batch (marc, auth_id, lccn, cancelled_lccn, heading)
432         VALUES (?, ?, ?, ?, authority.normalize_heading(?))
433     });
434     $dbh->begin_work;
435     while (<>) {
436         $i++;
437         if (0 == $i % 100) {
438             report_progress("Records staged", $i);
439             $dbh->commit;
440             $dbh->begin_work;
441         }
442         my $marc = MARC::Record->new_from_usmarc($_);
443         my $authid = $marc->subfield('901', 'c');
444         if (defined($authid) && $authid !~ /^\d+$/) {
445             undef $authid;
446         }
447         my $lccn = $marc->subfield('010', 'a');
448         if (defined $lccn) {
449             $lccn =~ s/^\s+//;
450             $lccn =~ s/\s+$//;
451             $lccn =~ s/\s+/ /g;
452         }
453         my $cancelled_lccn = $marc->subfield('010', 'z');
454         if (defined $cancelled_lccn) {
455             $cancelled_lccn =~ s/^\s+//;
456             $cancelled_lccn =~ s/\s+$//;
457             $cancelled_lccn =~ s/\s+/ /g;
458         }
459         my $xml = OpenILS::Application::AppUtils->entityize($marc->as_xml_record());
460         $ins->execute($xml, $authid, $lccn, $cancelled_lccn, $xml);
461     }
462     $dbh->commit;
463     report_progress("Records staged", $i) if 0 != $i % 100;
464     $dbh->do(qq/
465         CREATE INDEX auths_${batch}_auth_id_idx ON
466             $schema.auths_$batch (auth_id);
467     /);
468     $dbh->do(qq/
469         CREATE INDEX auths_${batch}_id_idx ON
470             $schema.auths_$batch (id);
471     /);
472     $dbh->do(qq/
473         CREATE INDEX auths_${batch}_lccn_idx ON
474             $schema.auths_$batch (lccn);
475     /);
476 }
477
478 sub handle_match_auths {
479     my ($dbh, $schema, $batch) = @_;
480
481     my $sth = $dbh->prepare(qq{
482         UPDATE $schema.auths_${batch} a
483         SET auth_id = b.record,
484             lccn_matched = TRUE,
485             existing_heading = authority.normalize_heading(c.marc)
486         FROM authority.full_rec b
487         JOIN authority.record_entry c ON (b.record = c.id)
488         WHERE tag = '010'
489         AND   subfield = 'a'
490         AND   value = lccn
491         AND   auth_id IS NULL
492         AND   lccn IS NOT NULL;
493     });
494     $sth->execute();
495     my $ct = $sth->rows;
496     report_progress("Matched $ct authorities on LCCN");
497
498     $sth = $dbh->prepare(qq{
499         UPDATE $schema.auths_${batch} a
500         SET cancelled_auth_id = b.record
501         FROM authority.full_rec b
502         WHERE tag = '010'
503         AND   subfield = 'a'
504         AND   value = cancelled_lccn
505         AND   auth_id IS NULL
506         AND   cancelled_lccn IS NOT NULL;
507     });
508     $sth->execute();
509     $ct = $sth->rows;
510     report_progress("Matched $ct authorities on cancelled LCCN");
511
512     $sth = $dbh->prepare(qq{
513         UPDATE $schema.auths_$batch a
514         SET auth_id = b.id,
515             heading_matched = TRUE,
516             existing_heading = b.heading
517         FROM authority.record_entry b
518         WHERE a.heading = b.heading
519         AND   auth_id IS NULL;
520     });
521     $sth->execute();
522     $ct = $sth->rows;
523     report_progress("Matched $ct authorities on heading");
524 }
525
526 sub handle_load_new_auths {
527     my $dbh = shift;
528     my $schema = shift;
529     my $batch = shift;
530
531     my $getct = $dbh->prepare(qq{
532         SELECT COUNT(*)
533         FROM  $schema.auths_$batch
534         WHERE to_import
535         AND NOT imported
536         AND new_auth_id IS NULL
537         AND auth_id IS NULL
538         AND cancelled_auth_id IS NULL
539     });
540     $getct->execute();
541     my $max = $getct->fetchrow_arrayref()->[0];
542
543     report_progress('Number of authorities to add', $max);
544     for (my $i = 1; $i <= $max; $i++) {
545         report_progress('... authorities added', $i) if 0 == $i % 10 or $i == $max;
546         $dbh->begin_work;
547         $dbh->do(qq{
548             INSERT INTO authority.record_entry (marc, last_xact_id)
549             SELECT marc, ? || '-' || id
550             FROM $schema.auths_$batch b
551             WHERE id IN (
552                 SELECT id
553                 FROM $schema.auths_$batch
554                 WHERE to_import
555                 AND NOT imported
556                 AND new_auth_id IS NULL
557                 AND auth_id IS NULL
558                 AND cancelled_auth_id IS NULL
559                 ORDER BY id
560                 LIMIT 1
561             )
562         }, {}, "auths_$batch");
563         $dbh->do(qq{
564             UPDATE $schema.auths_$batch
565             SET imported = TRUE,
566                 new_auth_id = CURRVAL('authority.record_entry_id_seq')
567             WHERE id IN (
568                 SELECT id
569                 FROM $schema.auths_$batch
570                 WHERE to_import
571                 AND NOT imported
572                 AND new_auth_id IS NULL
573                 AND auth_id IS NULL
574                 AND cancelled_auth_id IS NULL
575                 ORDER BY id
576                 LIMIT 1
577             )
578         });
579         $dbh->commit;
580     }
581 }
582
583 sub handle_overlay_auths_stage1 {
584     my $dbh = shift;
585     my $schema = shift;
586     my $batch = shift;
587
588     my $getct = $dbh->prepare(qq{
589         SELECT COUNT(*)
590         FROM  $schema.auths_$batch
591         WHERE to_import
592         AND NOT imported
593         AND lccn_matched
594         AND heading = existing_heading
595     });
596     $getct->execute();
597     my $max = $getct->fetchrow_arrayref()->[0];
598     report_progress('Number of auths to update', $max);
599
600     $dbh->do(q{
601         UPDATE config.internal_flag SET enabled = TRUE
602         WHERE name = 'ingest.disable_authority_auto_update';
603     });
604     for (my $i = 1; $i <= $max; $i++) {
605         report_progress('... auths updated', $i) if 0 == $i % 10 or $i == $max;
606         $dbh->begin_work;
607         $dbh->do(qq{
608             UPDATE authority.record_entry a
609             SET marc = b.marc,
610                 edit_date = NOW()
611             FROM $schema.auths_$batch b
612             WHERE a.id = b.auth_id
613             AND auth_id IN (
614                 SELECT auth_id
615                 FROM $schema.auths_$batch
616                 WHERE to_import
617                 AND NOT imported
618                 AND lccn_matched
619                 AND heading = existing_heading
620                 ORDER BY id
621                 LIMIT 1
622             )
623         });
624         $dbh->do(qq{
625             UPDATE $schema.auths_$batch
626             SET imported = TRUE
627             WHERE auth_id IN (
628                 SELECT auth_id
629                 FROM $schema.auths_$batch
630                 WHERE to_import
631                 AND NOT imported
632                 AND lccn_matched
633                 AND heading = existing_heading
634                 ORDER BY id
635                 LIMIT 1
636             )
637         });
638         $dbh->commit;
639     }
640     $dbh->do(q{
641         UPDATE config.internal_flag SET enabled = FALSE
642         WHERE name = 'ingest.disable_authority_auto_update';
643     });
644 }
645
646 sub handle_overlay_auths_stage2 {
647     my $dbh = shift;
648     my $schema = shift;
649     my $batch = shift;
650
651     my $getct = $dbh->prepare(qq{
652         SELECT COUNT(*)
653         FROM  $schema.auths_$batch
654         WHERE to_import
655         AND NOT imported
656         AND lccn_matched
657         AND heading <> existing_heading
658     });
659     $getct->execute();
660     my $max = $getct->fetchrow_arrayref()->[0];
661     report_progress('Number of auths to update', $max);
662
663     for (my $i = 1; $i <= $max; $i++) {
664         report_progress('... auths updated', $i) if 0 == $i % 10 or $i == $max;
665         $dbh->begin_work;
666         $dbh->do(qq{
667             UPDATE authority.record_entry a
668             SET marc = b.marc,
669                 edit_date = NOW()
670             FROM $schema.auths_$batch b
671             WHERE a.id = b.auth_id
672             AND auth_id IN (
673                 SELECT auth_id
674                 FROM $schema.auths_$batch
675                 WHERE to_import
676                 AND NOT imported
677                 AND lccn_matched
678                 AND heading <> existing_heading
679                 ORDER BY id
680                 LIMIT 1
681             )
682         });
683         $dbh->do(qq{
684             UPDATE $schema.auths_$batch
685             SET imported = TRUE
686             WHERE auth_id IN (
687                 SELECT auth_id
688                 FROM $schema.auths_$batch
689                 WHERE to_import
690                 AND NOT imported
691                 AND lccn_matched
692                 AND heading <> existing_heading
693                 ORDER BY id
694                 LIMIT 1
695             )
696         });
697         $dbh->commit;
698     }
699 }
700
701 sub handle_overlay_auths_stage3 {
702     my $dbh = shift;
703     my $schema = shift;
704     my $batch = shift;
705
706     my $getct = $dbh->prepare(qq{
707         SELECT COUNT(*)
708         FROM  $schema.auths_$batch
709         WHERE to_import
710         AND NOT imported
711         AND (
712             auth_id IS NULL OR
713             auth_id = cancelled_auth_id
714         )
715         AND cancelled_auth_id IS NOT NULL
716     });
717     $getct->execute();
718     my $max = $getct->fetchrow_arrayref()->[0];
719     report_progress('Number of auths to update', $max);
720
721     for (my $i = 1; $i <= $max; $i++) {
722         report_progress('... auths updated', $i) if 0 == $i % 10 or $i == $max;
723         $dbh->begin_work;
724         $dbh->do(qq{
725             UPDATE authority.record_entry a
726             SET marc = b.marc,
727                 edit_date = NOW()
728             FROM $schema.auths_$batch b
729             WHERE a.id = b.cancelled_auth_id
730             AND cancelled_auth_id IN (
731                 SELECT auth_id
732                 FROM $schema.auths_$batch
733                 WHERE to_import
734                 AND NOT imported
735                 AND (
736                     auth_id IS NULL OR
737                     auth_id = cancelled_auth_id
738                 )
739                 AND cancelled_auth_id IS NOT NULL
740                 ORDER BY id
741                 LIMIT 1
742             )
743         });
744         $dbh->do(qq{
745             UPDATE $schema.auths_$batch
746             SET imported = TRUE
747             WHERE cancelled_auth_id IN (
748                 SELECT cancelled_auth_id
749                 FROM $schema.auths_$batch
750                 WHERE to_import
751                 AND NOT imported
752                 AND (
753                     auth_id IS NULL OR
754                     auth_id = cancelled_auth_id
755                 )
756                 AND cancelled_auth_id IS NOT NULL
757                 ORDER BY id
758                 LIMIT 1
759             )
760         });
761         $dbh->commit;
762     }
763 }
764
765 sub handle_link_auth_auth {
766     my $dbh = shift;
767     my $schema = shift;
768     my $batch = shift;
769
770     $dbh->do(q{
771         UPDATE config.internal_flag SET enabled = TRUE
772         WHERE name = 'ingest.disable_authority_auto_update';
773     });
774
775     my $sth = $dbh->prepare(qq{
776         SELECT COALESCE(new_auth_id, auth_id, cancelled_auth_id) AS id
777         FROM $schema.auths_$batch
778         WHERE imported
779         ORDER BY 1
780     });
781     $sth->execute();
782     my @ids = map { $_->{id} } @{ $sth->fetchall_arrayref({}) };
783     my $i = 0;
784     report_progress(scalar(@ids) . " records to do auth-auth linking");
785     foreach my $id (@ids) {
786         $i++;
787         report_progress('... auth-auth linkings processed', $i) if 0 == $i % 10 or $i == scalar(@ids);
788         system "/openils/bin/authority_authority_linker.pl -r $id -c /openils/conf/opensrf_core.xml";
789     }
790
791     $dbh->do(q{
792         UPDATE config.internal_flag SET enabled = FALSE
793         WHERE name = 'ingest.disable_authority_auto_update';
794     });
795 }
796
797 sub handle_link_auth_bib {
798     my $dbh = shift;
799     my $schema = shift;
800     my $batch = shift;
801
802     my $sth = $dbh->prepare(qq{
803         SELECT bib_id AS id
804         FROM $schema.$batch
805         WHERE imported
806         ORDER BY 1
807     });
808     $sth->execute();
809     my @ids = map { $_->{id} } @{ $sth->fetchall_arrayref({}) };
810     my $i = 0;
811     report_progress(scalar(@ids) . " records to do auth-bib linking");
812     foreach my $id (@ids) {
813         $i++;
814         report_progress('... auth-bib linkings processed', $i) if 0 == $i % 10 or $i == scalar(@ids);
815         system "/openils/bin/authority_control_fields.pl --record $id -c /openils/conf/opensrf_core.xml";
816     }
817
818 }
819
820 sub handle_export_skipped_bibs {
821     my $dbh = shift;
822     my $schema = shift;
823     my $batch = shift;
824     my $output = shift;
825
826     my $outfh;
827     open($outfh, '>', $output) or die("Could not open input file $output: $!\n");
828     binmode $outfh, ':utf8';
829
830     my $sth = $dbh->prepare(qq{
831         SELECT marc
832         FROM $schema.$batch
833         WHERE skip_reason ~ '^edit'
834         ORDER BY id
835     });
836     $sth->execute();
837    
838     while (my $row  = $sth->fetchrow_hashref()) {
839         my $marc = MARC::Record->new_from_xml($row->{marc});
840         print $outfh $marc->as_usmarc();
841     }
842     $outfh->close();
843 }
844
845 sub handle_export_skipped_auths {
846     my $dbh = shift;
847     my $schema = shift;
848     my $batch = shift;
849     my $output = shift;
850
851     my $outfh;
852     open($outfh, '>', $output) or die("Could not open input file $output: $!\n");
853     binmode $outfh, ':utf8';
854
855     my $sth = $dbh->prepare(qq{
856         SELECT marc
857         FROM $schema.auths_$batch
858         WHERE NOT imported
859         ORDER BY id
860     });
861     $sth->execute();
862    
863     while (my $row  = $sth->fetchrow_hashref()) {
864         my $marc = MARC::Record->new_from_xml($row->{marc});
865         print $outfh $marc->as_usmarc();
866     }
867     $outfh->close();
868 }