tlc mapping sql
[migration-tools.git] / eg_staged_bib_overlay
1 #!/usr/bin/perl
2
3 # Copyright (c) 2016 Equinox Software, Inc.
4 # Author: Galen Charlton <gmc@esilibrary.com>
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2, or (at your option)
9 # any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program.  If not, see <http://www.gnu.org/licenses/>
18
19 use strict;
20 use warnings;
21
22 use Getopt::Long;
23 use MARC::Record;
24 use MARC::File::XML (BinaryEncoding => 'utf8');
25 use DBI;
26 use OpenILS::Application::AppUtils;
27
28 my $action;
29 my $schema = 'bib_loads';
30 my $db;
31 my $dbuser;
32 my $dbpw;
33 my $dbhost;
34 my $batch;
35 my $cutoff;
36 my $wait = 0;
37 my $output;
38 my $link_skipped;
39
40 my $ret = GetOptions(
41     'action:s'      => \$action,
42     'schema:s'      => \$schema,
43     'db:s'          => \$db,
44     'dbuser:s'      => \$dbuser,
45     'dbhost:s'      => \$dbhost,
46     'dbpw:s'        => \$dbpw,
47     'batch:s'       => \$batch,
48     'cutoff:s'      => \$cutoff,
49     'wait:i'        => \$wait,
50     'output:s'      => \$output,
51     'link-skipped'  => \$link_skipped,
52 );
53
54 abort('must specify --action') unless defined $action;
55 abort('must specify --schema') unless defined $schema;
56 abort('must specify --db') unless defined $db;
57 abort('must specify --dbuser') unless defined $dbuser;
58 abort('must specify --dbhost') unless defined $dbhost;
59 abort('must specify --dbpw') unless defined $dbpw;
60 abort('must specify --batch') unless defined $batch;
61
62 abort(q{--action must be "stage_bibs", "filter_bibs", "load_bibs", "stage_auths",
63 "match_auths", "load_new_auths", "overlay_auths_stage1",
64 "overlay_auths_stage2", "overlay_auths_stage3", "link_auth_auth",
65 "link_auth_bib", "export_skipped_bibs", or "export_skipped_auths"}) unless
66     $action eq 'filter_bibs' or
67     $action eq 'stage_bibs' or
68     $action eq 'load_bibs' or
69     $action eq 'stage_auths' or
70     $action eq 'match_auths' or
71     $action eq 'load_new_auths' or
72     $action eq 'overlay_auths_stage1' or
73     $action eq 'overlay_auths_stage2' or
74     $action eq 'overlay_auths_stage3' or
75     $action eq 'link_auth_auth' or
76     $action eq 'link_auth_bib' or
77     $action eq 'export_skipped_bibs' or
78     $action eq 'export_skipped_auths'
79 ;
80
81 my $dbh = connect_db($db, $dbuser, $dbpw, $dbhost);
82
83 if ($action eq 'stage_bibs') {
84     abort('must specify at least one input file') unless @ARGV;
85     handle_stage_bibs($dbh, $schema, $batch);
86 }
87
88 if ($action eq 'filter_bibs') {
89     abort('must specify cutoff date when filtering') unless defined $cutoff;
90     handle_filter_bibs($dbh, $schema, $batch, $cutoff);
91 }
92
93 if ($action eq 'load_bibs' ) {
94     handle_load_bibs($dbh, $schema, $batch, $wait);
95 }
96
97 if ($action eq 'stage_auths') {
98     abort('must specify at least one input file') unless @ARGV;
99     handle_stage_auths($dbh, $schema, $batch);
100 }
101
102 if ($action eq 'match_auths') {
103     handle_match_auths($dbh, $schema, $batch);
104 }
105
106 if ($action eq 'load_new_auths') {
107     handle_load_new_auths($dbh, $schema, $batch);
108 }
109
110 if ($action eq 'overlay_auths_stage1') {
111     handle_overlay_auths_stage1($dbh, $schema, $batch);
112 }
113 if ($action eq 'overlay_auths_stage2') {
114     handle_overlay_auths_stage2($dbh, $schema, $batch);
115 }
116 if ($action eq 'overlay_auths_stage3') {
117     handle_overlay_auths_stage3($dbh, $schema, $batch);
118 }
119
120 if ($action eq 'link_auth_auth') {
121     handle_link_auth_auth($dbh, $schema, $batch);
122 }
123 if ($action eq 'link_auth_bib') {
124     handle_link_auth_bib($dbh, $schema, $batch, $link_skipped);
125 }
126
127 if ($action eq 'export_skipped_bibs') {
128     abort('must specify output file') unless defined $output;
129     handle_export_skipped_bibs($dbh, $schema, $batch, $output);
130 }
131 if ($action eq 'export_skipped_auths') {
132     abort('must specify output file') unless defined $output;
133     handle_export_skipped_auths($dbh, $schema, $batch, $output);
134 }
135
136 sub abort {
137     my $msg = shift;
138     print STDERR "$0: $msg", "\n";
139     print_usage();
140     exit 1;
141 }
142
143 sub print_usage {
144     print <<_USAGE_;
145
146 Utility to stage and overlay bib records in an Evergreen database. This
147 expects that the incoming records will have been previously exported
148 from that Evergreen database and modified in some fashion (e.g., for
149 authority record processing) and that the bib ID can be found in the
150 901\$c subfield.
151
152 This program has several modes controlled by the --action switch:
153
154   --action stage_bibs  - load MARC bib records into a staging table
155   --action filter_bibs - mark previously staged bibs that should
156                          be excluded from a subsequent load, either
157                          because the target bib is deleted in Evergreen
158                          or the record was modified after a date
159                          specified by the --cutoff switch
160   --action load_bibs   - overlay bib records using a previously staged
161                          batch, one at a time. After each bib, it will
162                          wait the number of seconds specified by the
163                          --wait switch.
164
165   --action stage_auths          - load MARC authorities into staging
166                                   table
167   --action match_auths          - identify matches with authority
168                                   records already present in the
169                                   Evergreen database; matching is
170                                   based on LCCN, cancelled LCCN, and
171                                   main heading.
172   --action load_new_auths       - load new (unmatched) authorities
173   --action overlay_auths_stage1 - overlay based on LCCN where
174                                   heading has NOT changed; this step
175                                   disables propagation to bib records
176   --action overlay_auths_stage2 - overlay based on LCCN where heading
177                                   HAS changed; propagates changes
178                                   to bib records
179   --action overlay_auths_stage3 - overlay for records where a cancelled
180                                   LCCN is replaced with a new one
181   --action link_auth_auth       - run authority_authority_linker.pl for
182                                   the authorities that were overlaid
183                                   or added in this batch.
184   --action link_auth_bib        - run authority_control_fields.pl for
185                                   the bibs that were overlaid in this
186                                   batch.  Add --link-skipped to specify
187                                   that bibs that were matched but
188                                   skipped due to having be edited after
189                                   the cutoff should be linked (rather
190                                   than linking the imported bibs)
191   --action export_skipped_bibs  - export to ISO2709 file whose name is
192                                   specified by --output those bibs
193                                   that had been edited after the cutoff.
194   --action export_skipped_auths - export to ISO2709 file whose name is
195                                   specified by --output those authorities
196                                   that could not be definitively
197                                   handled as updates or adds.
198
199 Several switches are used regardless of the specified action:
200
201   --schema  - Pg schema in which staging table will live; should be
202               created beforehand
203   --batch   - name of bib batch; will also be used as the name
204               of the staging tables
205   --db      - database name
206   --dbuser  - database user
207   --dbpw    - database password
208   --dbhost  - database host
209
210 Examples:
211
212 $0 --schema bib_load --batch bibs_2016_01 --db evergreen \\
213    --dbuser evergreen --dbpw evergreen --dbhost localhost \\
214    --action stage_bibs -- file1.mrc file2.mrc [...]
215
216 $0 --schema bib_load --batch bibs_2016_01 --db evergreen \\
217    --dbuser evergreen --dbpw evergreen --dbhost localhost \\
218    --action filter_bibs --cutoff 2016-01-02
219
220 $0 --schema bib_load --batch bibs_2016_01 --db evergreen \\
221    --dbuser evergreen --dbpw evergreen --dbhost localhost \\
222    --action load_bibs --wait 2
223
224 _USAGE_
225 }
226
227
228 sub report_progress {
229     my ($msg, $counter) = @_;
230     if (defined $counter) {
231         print STDERR "$msg: $counter\n";
232     } else {
233         print STDERR "$msg\n";
234     }
235 }
236
237 sub connect_db {
238     my ($db, $dbuser, $dbpw, $dbhost) = @_;
239
240     my $dsn = "dbi:Pg:host=$dbhost;dbname=$db;port=5432";
241
242     my $attrs = {
243         ShowErrorStatement => 1,
244         RaiseError => 1,
245         PrintError => 1,
246         pg_enable_utf8 => 1,
247     };
248     my $dbh = DBI->connect($dsn, $dbuser, $dbpw, $attrs);
249
250     return $dbh;
251 }
252
253 sub handle_stage_bibs {
254     my $dbh = shift;
255     my $schema = shift;
256     my $batch = shift;
257
258     $dbh->do(qq{
259         DROP TABLE IF EXISTS $schema.$batch;
260     });
261     $dbh->do(qq{
262         CREATE TABLE $schema.$batch (
263             id          SERIAL,
264             marc        TEXT,
265             bib_id      BIGINT,
266             imported    BOOLEAN DEFAULT FALSE,
267             to_import   BOOLEAN DEFAULT TRUE,
268             skip_reason TEXT
269         )
270     });
271
272     local $/ = "\035";
273     my $i = 0;
274     binmode STDIN, ':utf8';
275     my $ins = $dbh->prepare("INSERT INTO $schema.$batch (marc, bib_id) VALUES (?, ?)");
276     $dbh->begin_work;
277     while (<>) {
278         $i++;
279         if (0 == $i % 100) {
280             report_progress("Records staged", $i);
281             $dbh->commit;
282             $dbh->begin_work;
283         }
284         eval {
285             my $marc = MARC::Record->new_from_usmarc($_);
286             my $bibid = $marc->subfield('901', 'c');
287             if ($bibid !~ /^\d+$/) {
288                 die('Subfield 901$c is not numeric or missing.');
289             }
290             my $xml = OpenILS::Application::AppUtils->entityize($marc->as_xml_record());
291             $ins->execute($xml, $bibid);
292         };
293         if ($@) {
294             warn("Record $i is bad: $@; skipping.");
295             next;
296         }
297     }
298     $dbh->commit;
299     report_progress("Records staged", $i) if 0 != $i % 100;
300     $dbh->do(qq/
301         CREATE INDEX ${batch}_bib_id_idx ON
302             $schema.$batch (bib_id);
303     /);
304     $dbh->do(qq/
305         CREATE INDEX ${batch}_id_idx ON
306             $schema.$batch (id);
307     /);
308 }
309
310 sub handle_filter_bibs {
311     my $dbh = shift;
312     my $schema = shift;
313     my $batch = shift;
314     my $cutoff = shift;
315
316     my $sth1 = $dbh->prepare(qq{
317         UPDATE $schema.$batch
318         SET to_import = FALSE,
319             skip_reason = 'deleted'
320         WHERE bib_id IN (
321             SELECT id
322             FROM biblio.record_entry
323             WHERE deleted
324         )
325         AND to_import
326         AND NOT imported
327     });
328     $sth1->execute();
329     my $ct = $sth1->rows;
330     report_progress("Filtering out $ct records that are currently deleted");
331
332     my $sth2 = $dbh->prepare(qq{
333         UPDATE $schema.$batch
334         SET to_import = FALSE,
335             skip_reason = 'edited after cutoff of $cutoff'
336         WHERE bib_id IN (
337             SELECT id
338             FROM biblio.record_entry
339             WHERE edit_date >= ?
340         )
341         AND to_import
342         AND NOT imported
343     });
344     $sth2->execute($cutoff);
345     $ct = $sth2->rows;
346     report_progress("Filtering out $ct records edited after cutoff date of $cutoff");
347
348     my $sth3 = $dbh->prepare(qq{
349         UPDATE $schema.$batch
350         SET to_import = FALSE,
351             skip_reason = 'XML is not well-formed'
352         WHERE NOT xml_is_well_formed(marc)
353         AND to_import
354         AND NOT imported
355     });
356     $sth3->execute();
357     $ct = $sth3->rows;
358     report_progress("Filtering out $ct records whose XML is not well-formed");
359 }
360
361 sub handle_load_bibs {
362     my $dbh = shift;
363     my $schema = shift;
364     my $batch = shift;
365     my $wait = shift;
366
367     my $getct = $dbh->prepare(qq{
368         SELECT COUNT(*)
369         FROM  $schema.$batch
370         WHERE to_import
371         AND NOT imported
372     });
373     $getct->execute();
374     my $max = $getct->fetchrow_arrayref()->[0];
375
376     report_progress('Number of bibs to update', $max);
377     for (my $i = 1; $i <= $max; $i++) {
378         report_progress('... bibs updated', $i) if 0 == $i % 10 or $i == $max;
379         $dbh->begin_work;
380         $dbh->do(qq{
381             UPDATE biblio.record_entry a
382             SET marc = b.marc
383             FROM $schema.$batch b
384             WHERE a.id = b.bib_id
385             AND bib_id IN (
386                 SELECT bib_id
387                 FROM $schema.$batch
388                 WHERE to_import
389                 AND NOT imported
390                 ORDER BY bib_id DESC
391                 LIMIT 1
392             )
393         });
394         $dbh->do(qq{
395             UPDATE $schema.$batch
396             SET imported = TRUE
397             WHERE bib_id IN (
398                 SELECT bib_id
399                 FROM $schema.$batch
400                 WHERE to_import
401                 AND NOT imported
402                 ORDER BY id
403                 LIMIT 1
404             )
405         });
406         $dbh->commit;
407         sleep $wait;
408     }
409 }
410
411 sub handle_stage_auths {
412     my $dbh = shift;
413     my $schema = shift;
414     my $batch = shift;
415
416     $dbh->do(qq{
417         DROP TABLE IF EXISTS $schema.auths_$batch;
418     });
419     $dbh->do(qq{
420         CREATE TABLE $schema.auths_$batch (
421             id          SERIAL,
422             marc        TEXT,
423             auth_id     BIGINT,
424             new_auth_id BIGINT,
425             existing_heading TEXT,
426             lccn        TEXT,
427             cancelled_lccn TEXT,
428             cancelled_auth_id BIGINT,
429             heading     TEXT,
430             lccn_matched BOOLEAN DEFAULT FALSE,
431             heading_matched BOOLEAN DEFAULT FALSE,
432             imported    BOOLEAN DEFAULT FALSE,
433             to_import   BOOLEAN DEFAULT TRUE,
434             skip_reason TEXT
435         )
436     });
437
438     local $/ = "\035";
439     my $i = 0;
440     binmode STDIN, ':utf8';
441     my $ins = $dbh->prepare(qq{
442         INSERT INTO $schema.auths_$batch (marc, auth_id, lccn, cancelled_lccn, heading)
443         VALUES (?, ?, ?, ?, authority.normalize_heading(?))
444     });
445     $dbh->begin_work;
446     while (<>) {
447         $i++;
448         if (0 == $i % 100) {
449             report_progress("Records staged", $i);
450             $dbh->commit;
451             $dbh->begin_work;
452         }
453         eval {
454             my $marc = MARC::Record->new_from_usmarc($_);
455             my $authid = $marc->subfield('901', 'c');
456             if (defined($authid) && $authid !~ /^\d+$/) {
457                 undef $authid;
458             }
459             my $lccn = $marc->subfield('010', 'a');
460             if (defined $lccn) {
461                 $lccn =~ s/^\s+//;
462                 $lccn =~ s/\s+$//;
463                 $lccn =~ s/\s+/ /g;
464             }
465             my $cancelled_lccn = $marc->subfield('010', 'z');
466             if (defined $cancelled_lccn) {
467                 $cancelled_lccn =~ s/^\s+//;
468                 $cancelled_lccn =~ s/\s+$//;
469                 $cancelled_lccn =~ s/\s+/ /g;
470             }
471             my $xml = OpenILS::Application::AppUtils->entityize($marc->as_xml_record());
472             $ins->execute($xml, $authid, $lccn, $cancelled_lccn, $xml);
473         };
474         if ($@) {
475             warn("Record $i is bad: $@; skipping.");
476             next;
477         }
478     }
479     $dbh->commit;
480     report_progress("Records staged", $i) if 0 != $i % 100;
481     $dbh->do(qq/
482         CREATE INDEX auths_${batch}_auth_id_idx ON
483             $schema.auths_$batch (auth_id);
484     /);
485     $dbh->do(qq/
486         CREATE INDEX auths_${batch}_id_idx ON
487             $schema.auths_$batch (id);
488     /);
489     $dbh->do(qq/
490         CREATE INDEX auths_${batch}_lccn_idx ON
491             $schema.auths_$batch (lccn);
492     /);
493 }
494
495 sub handle_match_auths {
496     my ($dbh, $schema, $batch) = @_;
497
498     my $sth = $dbh->prepare(qq{
499         UPDATE $schema.auths_${batch} a
500         SET auth_id = b.record,
501             lccn_matched = TRUE,
502             existing_heading = authority.normalize_heading(c.marc)
503         FROM authority.full_rec b
504         JOIN authority.record_entry c ON (b.record = c.id)
505         WHERE tag = '010'
506         AND   subfield = 'a'
507         AND   value = lccn
508         AND   auth_id IS NULL
509         AND   lccn IS NOT NULL;
510     });
511     $sth->execute();
512     my $ct = $sth->rows;
513     report_progress("Matched $ct authorities on LCCN");
514
515     $sth = $dbh->prepare(qq{
516         UPDATE $schema.auths_${batch} a
517         SET cancelled_auth_id = b.record
518         FROM authority.full_rec b
519         WHERE tag = '010'
520         AND   subfield = 'a'
521         AND   value = cancelled_lccn
522         AND   auth_id IS NULL
523         AND   cancelled_lccn IS NOT NULL;
524     });
525     $sth->execute();
526     $ct = $sth->rows;
527     report_progress("Matched $ct authorities on cancelled LCCN");
528
529     $sth = $dbh->prepare(qq{
530         UPDATE $schema.auths_$batch a
531         SET auth_id = b.id,
532             heading_matched = TRUE,
533             existing_heading = b.heading
534         FROM authority.record_entry b
535         WHERE a.heading = b.heading
536         AND   auth_id IS NULL;
537     });
538     $sth->execute();
539     $ct = $sth->rows;
540     report_progress("Matched $ct authorities on heading");
541 }
542
543 sub handle_load_new_auths {
544     my $dbh = shift;
545     my $schema = shift;
546     my $batch = shift;
547
548     my $getct = $dbh->prepare(qq{
549         SELECT COUNT(*)
550         FROM  $schema.auths_$batch
551         WHERE to_import
552         AND NOT imported
553         AND new_auth_id IS NULL
554         AND auth_id IS NULL
555         AND cancelled_auth_id IS NULL
556     });
557     $getct->execute();
558     my $max = $getct->fetchrow_arrayref()->[0];
559
560     report_progress('Number of authorities to add', $max);
561     for (my $i = 1; $i <= $max; $i++) {
562         report_progress('... authorities added', $i) if 0 == $i % 10 or $i == $max;
563         $dbh->begin_work;
564         $dbh->do(qq{
565             INSERT INTO authority.record_entry (marc, last_xact_id)
566             SELECT marc, ? || '-' || id
567             FROM $schema.auths_$batch b
568             WHERE id IN (
569                 SELECT id
570                 FROM $schema.auths_$batch
571                 WHERE to_import
572                 AND NOT imported
573                 AND new_auth_id IS NULL
574                 AND auth_id IS NULL
575                 AND cancelled_auth_id IS NULL
576                 ORDER BY id
577                 LIMIT 1
578             )
579         }, {}, "auths_$batch");
580         $dbh->do(qq{
581             UPDATE $schema.auths_$batch
582             SET imported = TRUE,
583                 new_auth_id = CURRVAL('authority.record_entry_id_seq')
584             WHERE id IN (
585                 SELECT id
586                 FROM $schema.auths_$batch
587                 WHERE to_import
588                 AND NOT imported
589                 AND new_auth_id IS NULL
590                 AND auth_id IS NULL
591                 AND cancelled_auth_id IS NULL
592                 ORDER BY id
593                 LIMIT 1
594             )
595         });
596         $dbh->commit;
597         sleep $wait;
598     }
599 }
600
601 sub handle_overlay_auths_stage1 {
602     my $dbh = shift;
603     my $schema = shift;
604     my $batch = shift;
605
606     my $getct = $dbh->prepare(qq{
607         SELECT COUNT(*)
608         FROM  $schema.auths_$batch
609         WHERE to_import
610         AND NOT imported
611         AND lccn_matched
612         AND heading = existing_heading
613     });
614     $getct->execute();
615     my $max = $getct->fetchrow_arrayref()->[0];
616     report_progress('Number of auths to update', $max);
617
618     $dbh->do(q{
619         UPDATE config.internal_flag SET enabled = TRUE
620         WHERE name = 'ingest.disable_authority_auto_update';
621     });
622     for (my $i = 1; $i <= $max; $i++) {
623         report_progress('... auths updated', $i) if 0 == $i % 10 or $i == $max;
624         $dbh->begin_work;
625         $dbh->do(qq{
626             UPDATE authority.record_entry a
627             SET marc = b.marc,
628                 edit_date = NOW()
629             FROM $schema.auths_$batch b
630             WHERE a.id = b.auth_id
631             AND auth_id IN (
632                 SELECT auth_id
633                 FROM $schema.auths_$batch
634                 WHERE to_import
635                 AND NOT imported
636                 AND lccn_matched
637                 AND heading = existing_heading
638                 ORDER BY id
639                 LIMIT 1
640             )
641         });
642         $dbh->do(qq{
643             UPDATE $schema.auths_$batch
644             SET imported = TRUE
645             WHERE auth_id IN (
646                 SELECT auth_id
647                 FROM $schema.auths_$batch
648                 WHERE to_import
649                 AND NOT imported
650                 AND lccn_matched
651                 AND heading = existing_heading
652                 ORDER BY id
653                 LIMIT 1
654             )
655         });
656         $dbh->commit;
657     }
658     $dbh->do(q{
659         UPDATE config.internal_flag SET enabled = FALSE
660         WHERE name = 'ingest.disable_authority_auto_update';
661     });
662 }
663
664 sub handle_overlay_auths_stage2 {
665     my $dbh = shift;
666     my $schema = shift;
667     my $batch = shift;
668
669     my $getct = $dbh->prepare(qq{
670         SELECT COUNT(*)
671         FROM  $schema.auths_$batch
672         WHERE to_import
673         AND NOT imported
674         AND lccn_matched
675         AND heading <> existing_heading
676     });
677     $getct->execute();
678     my $max = $getct->fetchrow_arrayref()->[0];
679     report_progress('Number of auths to update', $max);
680
681     for (my $i = 1; $i <= $max; $i++) {
682         report_progress('... auths updated', $i) if 0 == $i % 10 or $i == $max;
683         $dbh->begin_work;
684         $dbh->do(qq{
685             UPDATE authority.record_entry a
686             SET marc = b.marc,
687                 edit_date = NOW()
688             FROM $schema.auths_$batch b
689             WHERE a.id = b.auth_id
690             AND auth_id IN (
691                 SELECT auth_id
692                 FROM $schema.auths_$batch
693                 WHERE to_import
694                 AND NOT imported
695                 AND lccn_matched
696                 AND heading <> existing_heading
697                 ORDER BY id
698                 LIMIT 1
699             )
700         });
701         $dbh->do(qq{
702             UPDATE $schema.auths_$batch
703             SET imported = TRUE
704             WHERE auth_id IN (
705                 SELECT auth_id
706                 FROM $schema.auths_$batch
707                 WHERE to_import
708                 AND NOT imported
709                 AND lccn_matched
710                 AND heading <> existing_heading
711                 ORDER BY id
712                 LIMIT 1
713             )
714         });
715         $dbh->commit;
716     }
717 }
718
719 sub handle_overlay_auths_stage3 {
720     my $dbh = shift;
721     my $schema = shift;
722     my $batch = shift;
723
724     my $getct = $dbh->prepare(qq{
725         SELECT COUNT(*)
726         FROM  $schema.auths_$batch
727         WHERE to_import
728         AND NOT imported
729         AND (
730             auth_id IS NULL OR
731             auth_id = cancelled_auth_id
732         )
733         AND cancelled_auth_id IS NOT NULL
734     });
735     $getct->execute();
736     my $max = $getct->fetchrow_arrayref()->[0];
737     report_progress('Number of auths to update', $max);
738
739     for (my $i = 1; $i <= $max; $i++) {
740         report_progress('... auths updated', $i) if 0 == $i % 10 or $i == $max;
741         $dbh->begin_work;
742         $dbh->do(qq{
743             UPDATE authority.record_entry a
744             SET marc = b.marc,
745                 edit_date = NOW()
746             FROM $schema.auths_$batch b
747             WHERE a.id = b.cancelled_auth_id
748             AND cancelled_auth_id IN (
749                 SELECT auth_id
750                 FROM $schema.auths_$batch
751                 WHERE to_import
752                 AND NOT imported
753                 AND (
754                     auth_id IS NULL OR
755                     auth_id = cancelled_auth_id
756                 )
757                 AND cancelled_auth_id IS NOT NULL
758                 ORDER BY id
759                 LIMIT 1
760             )
761         });
762         $dbh->do(qq{
763             UPDATE $schema.auths_$batch
764             SET imported = TRUE
765             WHERE cancelled_auth_id IN (
766                 SELECT cancelled_auth_id
767                 FROM $schema.auths_$batch
768                 WHERE to_import
769                 AND NOT imported
770                 AND (
771                     auth_id IS NULL OR
772                     auth_id = cancelled_auth_id
773                 )
774                 AND cancelled_auth_id IS NOT NULL
775                 ORDER BY id
776                 LIMIT 1
777             )
778         });
779         $dbh->commit;
780     }
781 }
782
783 sub handle_link_auth_auth {
784     my $dbh = shift;
785     my $schema = shift;
786     my $batch = shift;
787
788     $dbh->do(q{
789         UPDATE config.internal_flag SET enabled = TRUE
790         WHERE name = 'ingest.disable_authority_auto_update';
791     });
792
793     my $sth = $dbh->prepare(qq{
794         SELECT COALESCE(new_auth_id, auth_id, cancelled_auth_id) AS id
795         FROM $schema.auths_$batch
796         WHERE imported
797         ORDER BY 1
798     });
799     $sth->execute();
800     my @ids = map { $_->{id} } @{ $sth->fetchall_arrayref({}) };
801     my $i = 0;
802     report_progress(scalar(@ids) . " records to do auth-auth linking");
803     foreach my $id (@ids) {
804         $i++;
805         report_progress('... auth-auth linkings processed', $i) if 0 == $i % 10 or $i == scalar(@ids);
806         system "/openils/bin/authority_authority_linker.pl -r $id -c /openils/conf/opensrf_core.xml";
807     }
808
809     $dbh->do(q{
810         UPDATE config.internal_flag SET enabled = FALSE
811         WHERE name = 'ingest.disable_authority_auto_update';
812     });
813 }
814
815 sub handle_link_auth_bib {
816     my $dbh = shift;
817     my $schema = shift;
818     my $batch = shift;
819     my $link_skipped = shift;
820
821     my $query;
822     if ($link_skipped) {
823         $query = qq{
824             SELECT bib_id AS id
825             FROM $schema.$batch
826             WHERE NOT imported
827             AND skip_reason ~ '^edit'
828             ORDER BY 1
829         };
830     } else {
831         $query = qq{
832             SELECT bib_id AS id
833             FROM $schema.$batch
834             WHERE imported
835             ORDER BY 1
836         };
837     }
838
839     my $sth = $dbh->prepare($query);
840     $sth->execute();
841     my @ids = map { $_->{id} } @{ $sth->fetchall_arrayref({}) };
842     my $i = 0;
843     report_progress(scalar(@ids) . " records to do auth-bib linking");
844     foreach my $id (@ids) {
845         $i++;
846         report_progress('... auth-bib linkings processed', $i) if 0 == $i % 10 or $i == scalar(@ids);
847         system "/openils/bin/authority_control_fields.pl --record $id -c /openils/conf/opensrf_core.xml";
848     }
849
850 }
851
852 sub handle_export_skipped_bibs {
853     my $dbh = shift;
854     my $schema = shift;
855     my $batch = shift;
856     my $output = shift;
857
858     my $outfh;
859     open($outfh, '>', $output) or die("Could not open input file $output: $!\n");
860     binmode $outfh, ':utf8';
861
862     my $sth = $dbh->prepare(qq{
863         SELECT marc
864         FROM $schema.$batch
865         WHERE skip_reason ~ '^edit'
866         ORDER BY id
867     });
868     $sth->execute();
869    
870     while (my $row  = $sth->fetchrow_hashref()) {
871         my $marc = MARC::Record->new_from_xml($row->{marc});
872         print $outfh $marc->as_usmarc();
873     }
874     $outfh->close();
875 }
876
877 sub handle_export_skipped_auths {
878     my $dbh = shift;
879     my $schema = shift;
880     my $batch = shift;
881     my $output = shift;
882
883     my $outfh;
884     open($outfh, '>', $output) or die("Could not open input file $output: $!\n");
885     binmode $outfh, ':utf8';
886
887     my $sth = $dbh->prepare(qq{
888         SELECT marc
889         FROM $schema.auths_$batch
890         WHERE NOT imported
891         ORDER BY id
892     });
893     $sth->execute();
894    
895     while (my $row  = $sth->fetchrow_hashref()) {
896         my $marc = MARC::Record->new_from_xml($row->{marc});
897         print $outfh $marc->as_usmarc();
898     }
899     $outfh->close();
900 }