53c4080ed20398479c5fb54ff8c25de9d5cb1716
[migration-tools.git] / eg_staged_bib_overlay
1 #!/usr/bin/perl
2
3 # Copyright (c) 2016 Equinox Software, Inc.
4 # Author: Galen Charlton <gmc@esilibrary.com>
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2, or (at your option)
9 # any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program.  If not, see <http://www.gnu.org/licenses/>
18
19 use strict;
20 use warnings;
21
22 use Getopt::Long;
23 use MARC::Record;
24 use MARC::File::XML (BinaryEncoding => 'utf8');
25 use DBI;
26 use OpenILS::Application::AppUtils;
27
28 my $action;
29 my $schema = 'bib_loads';
30 my $db;
31 my $dbuser;
32 my $dbpw;
33 my $dbhost;
34 my $batch;
35 my $cutoff;
36 my $wait = 1;
37
38 my $ret = GetOptions(
39     'action:s'      => \$action,
40     'schema:s'      => \$schema,
41     'db:s'          => \$db,
42     'dbuser:s'      => \$dbuser,
43     'dbhost:s'      => \$dbhost,
44     'dbpw:s'        => \$dbpw,
45     'batch:s'       => \$batch,
46     'cutoff:s'      => \$cutoff,
47     'wait:i'        => \$wait,
48 );
49
50 abort('must specify --action') unless defined $action;
51 abort('must specify --schema') unless defined $schema;
52 abort('must specify --db') unless defined $db;
53 abort('must specify --dbuser') unless defined $dbuser;
54 abort('must specify --dbhost') unless defined $dbhost;
55 abort('must specify --dbpw') unless defined $dbpw;
56 abort('must specify --batch') unless defined $batch;
57
58 abort(q{--action must be "stage_bibs", "filter_bibs", "load_bibs", "stage_auths",
59 "match_auths", "load_new_auths", "overlay_auths_stage1",
60 "overlay_auths_stage2", "overlay_auths_stage3", "link_auth_auth"}) unless
61     $action eq 'filter_bibs' or
62     $action eq 'stage_bibs' or
63     $action eq 'load_bibs' or
64     $action eq 'stage_auths' or
65     $action eq 'match_auths' or
66     $action eq 'load_new_auths' or
67     $action eq 'overlay_auths_stage1' or
68     $action eq 'overlay_auths_stage2' or
69     $action eq 'overlay_auths_stage3' or
70     $action eq 'link_auth_auth' or
71     $action eq 'link_auth_bib'
72 ;
73
74 my $dbh = connect_db($db, $dbuser, $dbpw, $dbhost);
75
76 if ($action eq 'stage_bibs') {
77     abort('must specify at least one input file') unless @ARGV;
78     handle_stage_bibs($dbh, $schema, $batch);
79 }
80
81 if ($action eq 'filter_bibs') {
82     abort('must specify cutoff date when filtering') unless defined $cutoff;
83     handle_filter_bibs($dbh, $schema, $batch, $cutoff);
84 }
85
86 if ($action eq 'load_bibs' ) {
87     handle_load_bibs($dbh, $schema, $batch, $wait);
88 }
89
90 if ($action eq 'stage_auths') {
91     abort('must specify at least one input file') unless @ARGV;
92     handle_stage_auths($dbh, $schema, $batch);
93 }
94
95 if ($action eq 'match_auths') {
96     handle_match_auths($dbh, $schema, $batch);
97 }
98
99 if ($action eq 'load_new_auths') {
100     handle_load_new_auths($dbh, $schema, $batch);
101 }
102
103 if ($action eq 'overlay_auths_stage1') {
104     handle_overlay_auths_stage1($dbh, $schema, $batch);
105 }
106 if ($action eq 'overlay_auths_stage2') {
107     handle_overlay_auths_stage2($dbh, $schema, $batch);
108 }
109 if ($action eq 'overlay_auths_stage3') {
110     handle_overlay_auths_stage3($dbh, $schema, $batch);
111 }
112
113 if ($action eq 'link_auth_auth') {
114     handle_link_auth_auth($dbh, $schema, $batch);
115 }
116 if ($action eq 'link_auth_bib') {
117     handle_link_auth_bib($dbh, $schema, $batch);
118 }
119
120 sub abort {
121     my $msg = shift;
122     print STDERR "$0: $msg", "\n";
123     print_usage();
124     exit 1;
125 }
126
127 sub print_usage {
128     print <<_USAGE_;
129
130 Utility to stage and overlay bib records in an Evergreen database. This
131 expects that the incoming records will have been previously exported
132 from that Evergreen database and modified in some fashion (e.g., for
133 authority record processing) and that the bib ID can be found in the
134 901\$c subfield.
135
136 This program has several modes controlled by the --action switch:
137
138   --action stage_bibs  - load MARC bib records into a staging table
139   --action filter_bibs - mark previously staged bibs that should
140                          be excluded from a subsequent load, either
141                          because the target bib is deleted in Evergreen
142                          or the record was modified after a date
143                          specified by the --cutoff switch
144   --action load_bibs   - overlay bib records using a previously staged
145                          batch, one at a time. After each bib, it will
146                          wait the number of seconds specified by the
147                          --wait switch.
148
149   --action stage_auths          - load MARC authorities into staging
150                                   table
151   --action match_auths          - identify matches with authority
152                                   records already present in the
153                                   Evergreen database; matching is
154                                   based on LCCN, cancelled LCCN, and
155                                   main heading.
156   --action load_new_auths       - load new (unmatched) authorities
157   --action overlay_auths_stage1 - overlay based on LCCN where
158                                   heading has NOT changed; this step
159                                   disables propagation to bib records
160   --action overlay_auths_stage2 - overlay based on LCCN where heading
161                                   HAS changed; propagates changes
162                                   to bib records
163   --action overlay_auths_stage3 - overlay for records where a cancelled
164                                   LCCN is replaced with a new one
165   --action link_auth_auth       - run authority_authority_linker.pl for
166                                   the authorities that were overlaid
167                                   or added in this batch.
168   --action link_auth_bib        - run authority_control_fields.pl for
169                                   the bibs that were overlaid in this
170                                   batch.
171
172 Several switches are used regardless of the specified action:
173
174   --schema  - Pg schema in which staging table will live; should be
175               created beforehand
176   --batch   - name of bib batch; will also be used as the name
177               of the staging tables
178   --db      - database name
179   --dbuser  - database user
180   --dbpw    - database password
181   --dbhost  - database host
182
183 Examples:
184
185 $0 --schema bib_load --batch bibs_2016_01 --db evergreen \\
186    --dbuser evergreen --dbpw evergreen --dbhost localhost \\
187    --action stage_bibs -- file1.mrc file2.mrc [...]
188
189 $0 --schema bib_load --batch bibs_2016_01 --db evergreen \\
190    --dbuser evergreen --dbpw evergreen --dbhost localhost \\
191    --action filter_bibs --cutoff 2016-01-02
192
193 $0 --schema bib_load --batch bibs_2016_01 --db evergreen \\
194    --dbuser evergreen --dbpw evergreen --dbhost localhost \\
195    --action load_bibs --wait 2
196
197 _USAGE_
198 }
199
200
201 sub report_progress {
202     my ($msg, $counter) = @_;
203     if (defined $counter) {
204         print STDERR "$msg: $counter\n";
205     } else {
206         print STDERR "$msg\n";
207     }
208 }
209
210 sub connect_db {
211     my ($db, $dbuser, $dbpw, $dbhost) = @_;
212
213     my $dsn = "dbi:Pg:host=$dbhost;dbname=$db;port=5432";
214
215     my $attrs = {
216         ShowErrorStatement => 1,
217         RaiseError => 1,
218         PrintError => 1,
219         pg_enable_utf8 => 1,
220     };
221     my $dbh = DBI->connect($dsn, $dbuser, $dbpw, $attrs);
222
223     return $dbh;
224 }
225
226 sub handle_stage_bibs {
227     my $dbh = shift;
228     my $schema = shift;
229     my $batch = shift;
230
231     $dbh->do(qq{
232         DROP TABLE IF EXISTS $schema.$batch;
233     });
234     $dbh->do(qq{
235         CREATE TABLE $schema.$batch (
236             id          SERIAL,
237             marc        TEXT,
238             bib_id      BIGINT,
239             imported    BOOLEAN DEFAULT FALSE,
240             to_import   BOOLEAN DEFAULT TRUE,
241             skip_reason TEXT
242         )
243     });
244
245     local $/ = "\035";
246     my $i = 0;
247     binmode STDIN, ':utf8';
248     my $ins = $dbh->prepare("INSERT INTO $schema.$batch (marc, bib_id) VALUES (?, ?)");
249     $dbh->begin_work;
250     while (<>) {
251         $i++;
252         if (0 == $i % 100) {
253             report_progress("Records staged", $i);
254             $dbh->commit;
255             $dbh->begin_work;
256         }
257         my $marc = MARC::Record->new_from_usmarc($_);
258         my $bibid = $marc->subfield('901', 'c');
259         if ($bibid !~ /^\d+$/) {
260             print STDERR "Record $i is suspect; skipping\n";
261             next;
262         }
263         my $xml = OpenILS::Application::AppUtils->entityize($marc->as_xml_record());
264         $ins->execute($xml, $bibid);
265     }
266     $dbh->commit;
267     report_progress("Records staged", $i) if 0 != $i % 100;
268     $dbh->do(qq/
269         CREATE INDEX ${batch}_bib_id_idx ON
270             $schema.$batch (bib_id);
271     /);
272     $dbh->do(qq/
273         CREATE INDEX ${batch}_id_idx ON
274             $schema.$batch (id);
275     /);
276 }
277
278 sub handle_filter_bibs {
279     my $dbh = shift;
280     my $schema = shift;
281     my $batch = shift;
282     my $cutoff = shift;
283
284     my $sth1 = $dbh->prepare(qq{
285         UPDATE $schema.$batch
286         SET to_import = FALSE,
287             skip_reason = 'deleted'
288         WHERE bib_id IN (
289             SELECT id
290             FROM biblio.record_entry
291             WHERE deleted
292         )
293         AND to_import
294         AND NOT imported
295     });
296     $sth1->execute();
297     my $ct = $sth1->rows;
298     report_progress("Filtering out $ct records that are currently deleted");
299
300     my $sth2 = $dbh->prepare(qq{
301         UPDATE $schema.$batch
302         SET to_import = FALSE,
303             skip_reason = 'edited after cutoff of $cutoff'
304         WHERE bib_id IN (
305             SELECT id
306             FROM biblio.record_entry
307             WHERE edit_date >= ?
308         )
309         AND to_import
310         AND NOT imported
311     });
312     $sth2->execute($cutoff);
313     $ct = $sth2->rows;
314     report_progress("Filtering out $ct records edited after cutoff date of $cutoff");
315
316     my $sth3 = $dbh->prepare(qq{
317         UPDATE $schema.$batch
318         SET to_import = FALSE,
319             skip_reason = 'XML is not well-formed'
320         WHERE NOT xml_is_well_formed(marc)
321         AND to_import
322         AND NOT imported
323     });
324     $sth3->execute();
325     $ct = $sth3->rows;
326     report_progress("Filtering out $ct records whose XML is not well-formed");
327 }
328
329 sub handle_load_bibs {
330     my $dbh = shift;
331     my $schema = shift;
332     my $batch = shift;
333     my $wait = shift;
334
335     my $getct = $dbh->prepare(qq{
336         SELECT COUNT(*)
337         FROM  $schema.$batch
338         WHERE to_import
339         AND NOT imported
340     });
341     $getct->execute();
342     my $max = $getct->fetchrow_arrayref()->[0];
343
344     report_progress('Number of bibs to update', $max);
345     for (my $i = 1; $i <= $max; $i++) {
346         report_progress('... bibs updated', $i) if 0 == $i % 10 or $i == $max;
347         $dbh->begin_work;
348         $dbh->do(qq{
349             UPDATE biblio.record_entry a
350             SET marc = b.marc
351             FROM $schema.$batch b
352             WHERE a.id = b.bib_id
353             AND bib_id IN (
354                 SELECT bib_id
355                 FROM $schema.$batch
356                 WHERE to_import
357                 AND NOT imported
358                 ORDER BY id
359                 LIMIT 1
360             )
361         });
362         $dbh->do(qq{
363             UPDATE $schema.$batch
364             SET imported = TRUE
365             WHERE bib_id IN (
366                 SELECT bib_id
367                 FROM $schema.$batch
368                 WHERE to_import
369                 AND NOT imported
370                 ORDER BY id
371                 LIMIT 1
372             )
373         });
374         $dbh->commit;
375         sleep $wait;
376     }
377 }
378
379 sub handle_stage_auths {
380     my $dbh = shift;
381     my $schema = shift;
382     my $batch = shift;
383
384     $dbh->do(qq{
385         DROP TABLE IF EXISTS $schema.auths_$batch;
386     });
387     $dbh->do(qq{
388         CREATE TABLE $schema.auths_$batch (
389             id          SERIAL,
390             marc        TEXT,
391             auth_id     BIGINT,
392             new_auth_id BIGINT,
393             existing_heading TEXT,
394             lccn        TEXT,
395             cancelled_lccn TEXT,
396             cancelled_auth_id BIGINT,
397             heading     TEXT,
398             lccn_matched BOOLEAN DEFAULT FALSE,
399             heading_matched BOOLEAN DEFAULT FALSE,
400             imported    BOOLEAN DEFAULT FALSE,
401             to_import   BOOLEAN DEFAULT TRUE,
402             skip_reason TEXT
403         )
404     });
405
406     local $/ = "\035";
407     my $i = 0;
408     binmode STDIN, ':utf8';
409     my $ins = $dbh->prepare(qq{
410         INSERT INTO $schema.auths_$batch (marc, auth_id, lccn, cancelled_lccn, heading)
411         VALUES (?, ?, ?, ?, authority.normalize_heading(?))
412     });
413     $dbh->begin_work;
414     while (<>) {
415         $i++;
416         if (0 == $i % 100) {
417             report_progress("Records staged", $i);
418             $dbh->commit;
419             $dbh->begin_work;
420         }
421         my $marc = MARC::Record->new_from_usmarc($_);
422         my $authid = $marc->subfield('901', 'c');
423         if (defined($authid) && $authid !~ /^\d+$/) {
424             undef $authid;
425         }
426         my $lccn = $marc->subfield('010', 'a');
427         if (defined $lccn) {
428             $lccn =~ s/^\s+//;
429             $lccn =~ s/\s+$//;
430             $lccn =~ s/\s+/ /g;
431         }
432         my $cancelled_lccn = $marc->subfield('010', 'z');
433         if (defined $cancelled_lccn) {
434             $cancelled_lccn =~ s/^\s+//;
435             $cancelled_lccn =~ s/\s+$//;
436             $cancelled_lccn =~ s/\s+/ /g;
437         }
438         my $xml = OpenILS::Application::AppUtils->entityize($marc->as_xml_record());
439         $ins->execute($xml, $authid, $lccn, $cancelled_lccn, $xml);
440     }
441     $dbh->commit;
442     report_progress("Records staged", $i) if 0 != $i % 100;
443     $dbh->do(qq/
444         CREATE INDEX auths_${batch}_auth_id_idx ON
445             $schema.auths_$batch (auth_id);
446     /);
447     $dbh->do(qq/
448         CREATE INDEX auths_${batch}_id_idx ON
449             $schema.auths_$batch (id);
450     /);
451     $dbh->do(qq/
452         CREATE INDEX auths_${batch}_lccn_idx ON
453             $schema.auths_$batch (lccn);
454     /);
455 }
456
457 sub handle_match_auths {
458     my ($dbh, $schema, $batch) = @_;
459
460     my $sth = $dbh->prepare(qq{
461         UPDATE $schema.auths_${batch} a
462         SET auth_id = b.record,
463             lccn_matched = TRUE,
464             existing_heading = authority.normalize_heading(c.marc)
465         FROM authority.full_rec b
466         JOIN authority.record_entry c ON (b.record = c.id)
467         WHERE tag = '010'
468         AND   subfield = 'a'
469         AND   value = lccn
470         AND   auth_id IS NULL
471         AND   lccn IS NOT NULL;
472     });
473     $sth->execute();
474     my $ct = $sth->rows;
475     report_progress("Matched $ct authorities on LCCN");
476
477     $sth = $dbh->prepare(qq{
478         UPDATE $schema.auths_${batch} a
479         SET cancelled_auth_id = b.record
480         FROM authority.full_rec b
481         WHERE tag = '010'
482         AND   subfield = 'a'
483         AND   value = cancelled_lccn
484         AND   auth_id IS NULL
485         AND   cancelled_lccn IS NOT NULL;
486     });
487     $sth->execute();
488     $ct = $sth->rows;
489     report_progress("Matched $ct authorities on cancelled LCCN");
490
491     $sth = $dbh->prepare(qq{
492         UPDATE $schema.auths_$batch a
493         SET auth_id = b.id,
494             heading_matched = TRUE,
495             existing_heading = b.heading
496         FROM authority.record_entry b
497         WHERE a.heading = b.heading
498         AND   auth_id IS NULL;
499     });
500     $sth->execute();
501     $ct = $sth->rows;
502     report_progress("Matched $ct authorities on heading");
503 }
504
505 sub handle_load_new_auths {
506     my $dbh = shift;
507     my $schema = shift;
508     my $batch = shift;
509
510     my $getct = $dbh->prepare(qq{
511         SELECT COUNT(*)
512         FROM  $schema.auths_$batch
513         WHERE to_import
514         AND NOT imported
515         AND new_auth_id IS NULL
516         AND auth_id IS NULL
517         AND cancelled_auth_id IS NULL
518     });
519     $getct->execute();
520     my $max = $getct->fetchrow_arrayref()->[0];
521
522     report_progress('Number of authorities to add', $max);
523     for (my $i = 1; $i <= $max; $i++) {
524         report_progress('... authorities added', $i) if 0 == $i % 10 or $i == $max;
525         $dbh->begin_work;
526         $dbh->do(qq{
527             INSERT INTO authority.record_entry (marc, last_xact_id)
528             SELECT marc, ? || '-' || id
529             FROM $schema.auths_$batch b
530             WHERE id IN (
531                 SELECT id
532                 FROM $schema.auths_$batch
533                 WHERE to_import
534                 AND NOT imported
535                 AND new_auth_id IS NULL
536                 AND auth_id IS NULL
537                 AND cancelled_auth_id IS NULL
538                 ORDER BY id
539                 LIMIT 1
540             )
541         }, {}, "auths_$batch");
542         $dbh->do(qq{
543             UPDATE $schema.auths_$batch
544             SET imported = TRUE,
545                 new_auth_id = CURRVAL('authority.record_entry_id_seq')
546             WHERE id IN (
547                 SELECT id
548                 FROM $schema.auths_$batch
549                 WHERE to_import
550                 AND NOT imported
551                 AND new_auth_id IS NULL
552                 AND auth_id IS NULL
553                 AND cancelled_auth_id IS NULL
554                 ORDER BY id
555                 LIMIT 1
556             )
557         });
558         $dbh->commit;
559     }
560 }
561
562 sub handle_overlay_auths_stage1 {
563     my $dbh = shift;
564     my $schema = shift;
565     my $batch = shift;
566
567     my $getct = $dbh->prepare(qq{
568         SELECT COUNT(*)
569         FROM  $schema.auths_$batch
570         WHERE to_import
571         AND NOT imported
572         AND lccn_matched
573         AND heading = existing_heading
574     });
575     $getct->execute();
576     my $max = $getct->fetchrow_arrayref()->[0];
577     report_progress('Number of auths to update', $max);
578
579     $dbh->do(q{
580         UPDATE config.internal_flag SET enabled = TRUE
581         WHERE name = 'ingest.disable_authority_auto_update';
582     });
583     for (my $i = 1; $i <= $max; $i++) {
584         report_progress('... auths updated', $i) if 0 == $i % 10 or $i == $max;
585         $dbh->begin_work;
586         $dbh->do(qq{
587             UPDATE authority.record_entry a
588             SET marc = b.marc,
589                 edit_date = NOW()
590             FROM $schema.auths_$batch b
591             WHERE a.id = b.auth_id
592             AND auth_id IN (
593                 SELECT auth_id
594                 FROM $schema.auths_$batch
595                 WHERE to_import
596                 AND NOT imported
597                 AND lccn_matched
598                 AND heading = existing_heading
599                 ORDER BY id
600                 LIMIT 1
601             )
602         });
603         $dbh->do(qq{
604             UPDATE $schema.auths_$batch
605             SET imported = TRUE
606             WHERE auth_id IN (
607                 SELECT auth_id
608                 FROM $schema.auths_$batch
609                 WHERE to_import
610                 AND NOT imported
611                 AND lccn_matched
612                 AND heading = existing_heading
613                 ORDER BY id
614                 LIMIT 1
615             )
616         });
617         $dbh->commit;
618     }
619     $dbh->do(q{
620         UPDATE config.internal_flag SET enabled = FALSE
621         WHERE name = 'ingest.disable_authority_auto_update';
622     });
623 }
624
625 sub handle_overlay_auths_stage2 {
626     my $dbh = shift;
627     my $schema = shift;
628     my $batch = shift;
629
630     my $getct = $dbh->prepare(qq{
631         SELECT COUNT(*)
632         FROM  $schema.auths_$batch
633         WHERE to_import
634         AND NOT imported
635         AND lccn_matched
636         AND heading <> existing_heading
637     });
638     $getct->execute();
639     my $max = $getct->fetchrow_arrayref()->[0];
640     report_progress('Number of auths to update', $max);
641
642     for (my $i = 1; $i <= $max; $i++) {
643         report_progress('... auths updated', $i) if 0 == $i % 10 or $i == $max;
644         $dbh->begin_work;
645         $dbh->do(qq{
646             UPDATE authority.record_entry a
647             SET marc = b.marc,
648                 edit_date = NOW()
649             FROM $schema.auths_$batch b
650             WHERE a.id = b.auth_id
651             AND auth_id IN (
652                 SELECT auth_id
653                 FROM $schema.auths_$batch
654                 WHERE to_import
655                 AND NOT imported
656                 AND lccn_matched
657                 AND heading <> existing_heading
658                 ORDER BY id
659                 LIMIT 1
660             )
661         });
662         $dbh->do(qq{
663             UPDATE $schema.auths_$batch
664             SET imported = TRUE
665             WHERE auth_id IN (
666                 SELECT auth_id
667                 FROM $schema.auths_$batch
668                 WHERE to_import
669                 AND NOT imported
670                 AND lccn_matched
671                 AND heading <> existing_heading
672                 ORDER BY id
673                 LIMIT 1
674             )
675         });
676         $dbh->commit;
677     }
678 }
679
680 sub handle_overlay_auths_stage3 {
681     my $dbh = shift;
682     my $schema = shift;
683     my $batch = shift;
684
685     my $getct = $dbh->prepare(qq{
686         SELECT COUNT(*)
687         FROM  $schema.auths_$batch
688         WHERE to_import
689         AND NOT imported
690         AND (
691             auth_id IS NULL OR
692             auth_id = cancelled_auth_id
693         )
694         AND cancelled_auth_id IS NOT NULL
695     });
696     $getct->execute();
697     my $max = $getct->fetchrow_arrayref()->[0];
698     report_progress('Number of auths to update', $max);
699
700     for (my $i = 1; $i <= $max; $i++) {
701         report_progress('... auths updated', $i) if 0 == $i % 10 or $i == $max;
702         $dbh->begin_work;
703         $dbh->do(qq{
704             UPDATE authority.record_entry a
705             SET marc = b.marc,
706                 edit_date = NOW()
707             FROM $schema.auths_$batch b
708             WHERE a.id = b.cancelled_auth_id
709             AND cancelled_auth_id IN (
710                 SELECT auth_id
711                 FROM $schema.auths_$batch
712                 WHERE to_import
713                 AND NOT imported
714                 AND (
715                     auth_id IS NULL OR
716                     auth_id = cancelled_auth_id
717                 )
718                 AND cancelled_auth_id IS NOT NULL
719                 ORDER BY id
720                 LIMIT 1
721             )
722         });
723         $dbh->do(qq{
724             UPDATE $schema.auths_$batch
725             SET imported = TRUE
726             WHERE cancelled_auth_id IN (
727                 SELECT cancelled_auth_id
728                 FROM $schema.auths_$batch
729                 WHERE to_import
730                 AND NOT imported
731                 AND (
732                     auth_id IS NULL OR
733                     auth_id = cancelled_auth_id
734                 )
735                 AND cancelled_auth_id IS NOT NULL
736                 ORDER BY id
737                 LIMIT 1
738             )
739         });
740         $dbh->commit;
741     }
742 }
743
744 sub handle_link_auth_auth {
745     my $dbh = shift;
746     my $schema = shift;
747     my $batch = shift;
748
749     $dbh->do(q{
750         UPDATE config.internal_flag SET enabled = TRUE
751         WHERE name = 'ingest.disable_authority_auto_update';
752     });
753
754     my $sth = $dbh->prepare(qq{
755         SELECT COALESCE(new_auth_id, auth_id, cancelled_auth_id) AS id
756         FROM $schema.auths_$batch
757         WHERE imported
758         ORDER BY 1
759     });
760     $sth->execute();
761     my @ids = map { $_->{id} } @{ $sth->fetchall_arrayref({}) };
762     my $i = 0;
763     report_progress(scalar(@ids) . " records to do auth-auth linking");
764     foreach my $id (@ids) {
765         $i++;
766         report_progress('... auth-auth linkings processed', $i) if 0 == $i % 10 or $i == scalar(@ids);
767         system "/openils/bin/authority_authority_linker.pl -r $id -c /openils/conf/opensrf_core.xml";
768     }
769
770     $dbh->do(q{
771         UPDATE config.internal_flag SET enabled = FALSE
772         WHERE name = 'ingest.disable_authority_auto_update';
773     });
774 }
775
776 sub handle_link_auth_bib {
777     my $dbh = shift;
778     my $schema = shift;
779     my $batch = shift;
780
781     my $sth = $dbh->prepare(qq{
782         SELECT bib_id AS id
783         FROM $schema.$batch
784         WHERE imported
785         ORDER BY 1
786     });
787     $sth->execute();
788     my @ids = map { $_->{id} } @{ $sth->fetchall_arrayref({}) };
789     my $i = 0;
790     report_progress(scalar(@ids) . " records to do auth-bib linking");
791     foreach my $id (@ids) {
792         $i++;
793         report_progress('... auth-bib linkings processed', $i) if 0 == $i % 10 or $i == scalar(@ids);
794         system "/openils/bin/authority_control_fields.pl --record $id -c /openils/conf/opensrf_core.xml";
795     }
796
797 }