8b419b509d2fd8cde95df7c67344838dc52cc88b
[migration-tools.git] / eg_staged_bib_overlay
1 #!/usr/bin/perl
2
3 # Copyright (c) 2016 Equinox Software, Inc.
4 # Author: Galen Charlton <gmc@esilibrary.com>
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2, or (at your option)
9 # any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program.  If not, see <http://www.gnu.org/licenses/>
18
19 use strict;
20 use warnings;
21
22 use Getopt::Long;
23 use MARC::Record;
24 use MARC::File::XML (BinaryEncoding => 'utf8');
25 use DBI;
26 use OpenILS::Application::AppUtils;
27
28 my $action;
29 my $schema = 'bib_loads';
30 my $db;
31 my $dbuser;
32 my $dbpw;
33 my $dbhost;
34 my $batch;
35 my $cutoff;
36 my $wait = 1;
37
38 my $ret = GetOptions(
39     'action:s'      => \$action,
40     'schema:s'      => \$schema,
41     'db:s'          => \$db,
42     'dbuser:s'      => \$dbuser,
43     'dbhost:s'      => \$dbhost,
44     'dbpw:s'        => \$dbpw,
45     'batch:s'       => \$batch,
46     'cutoff:s'      => \$cutoff,
47     'wait:i'        => \$wait,
48 );
49
50 abort('must specify --action') unless defined $action;
51 abort('must specify --schema') unless defined $schema;
52 abort('must specify --db') unless defined $db;
53 abort('must specify --dbuser') unless defined $dbuser;
54 abort('must specify --dbhost') unless defined $dbhost;
55 abort('must specify --dbpw') unless defined $dbpw;
56 abort('must specify --batch') unless defined $batch;
57
58 abort(q{--action must be "stage_bibs", "filter_bibs", "load_bibs", "stage_auths",
59 "match_auths", "load_new_auths", "overlay_auths_stage1",
60 "overlay_auths_stage2", "overlay_auths_stage3", "link_auth_auth"}) unless
61     $action eq 'filter_bibs' or
62     $action eq 'stage_bibs' or
63     $action eq 'load_bibs' or
64     $action eq 'stage_auths' or
65     $action eq 'match_auths' or
66     $action eq 'load_new_auths' or
67     $action eq 'overlay_auths_stage1' or
68     $action eq 'overlay_auths_stage2' or
69     $action eq 'overlay_auths_stage3' or
70     $action eq 'link_auth_auth'
71 ;
72
73 my $dbh = connect_db($db, $dbuser, $dbpw, $dbhost);
74
75 if ($action eq 'stage_bibs') {
76     abort('must specify at least one input file') unless @ARGV;
77     handle_stage_bibs($dbh, $schema, $batch);
78 }
79
80 if ($action eq 'filter_bibs') {
81     abort('must specify cutoff date when filtering') unless defined $cutoff;
82     handle_filter_bibs($dbh, $schema, $batch, $cutoff);
83 }
84
85 if ($action eq 'load_bibs' ) {
86     handle_load_bibs($dbh, $schema, $batch, $wait);
87 }
88
89 if ($action eq 'stage_auths') {
90     abort('must specify at least one input file') unless @ARGV;
91     handle_stage_auths($dbh, $schema, $batch);
92 }
93
94 if ($action eq 'match_auths') {
95     handle_match_auths($dbh, $schema, $batch);
96 }
97
98 if ($action eq 'load_new_auths') {
99     handle_load_new_auths($dbh, $schema, $batch);
100 }
101
102 if ($action eq 'overlay_auths_stage1') {
103     handle_overlay_auths_stage1($dbh, $schema, $batch);
104 }
105 if ($action eq 'overlay_auths_stage2') {
106     handle_overlay_auths_stage2($dbh, $schema, $batch);
107 }
108 if ($action eq 'overlay_auths_stage3') {
109     handle_overlay_auths_stage3($dbh, $schema, $batch);
110 }
111
112 if ($action eq 'link_auth_auth') {
113     handle_link_auth_auth($dbh, $schema, $batch);
114 }
115
116 sub abort {
117     my $msg = shift;
118     print STDERR "$0: $msg", "\n";
119     print_usage();
120     exit 1;
121 }
122
123 sub print_usage {
124     print <<_USAGE_;
125
126 Utility to stage and overlay bib records in an Evergreen database. This
127 expects that the incoming records will have been previously exported
128 from that Evergreen database and modified in some fashion (e.g., for
129 authority record processing) and that the bib ID can be found in the
130 901\$c subfield.
131
132 This program has several modes controlled by the --action switch:
133
134   --action stage_bibs  - load MARC bib records into a staging table
135   --action filter_bibs - mark previously staged bibs that should
136                          be excluded from a subsequent load, either
137                          because the target bib is deleted in Evergreen
138                          or the record was modified after a date
139                          specified by the --cutoff switch
140   --action load_bibs   - overlay bib records using a previously staged
141                          batch, one at a time. After each bib, it will
142                          wait the number of seconds specified by the
143                          --wait switch.
144
145   --action stage_auths          - load MARC authorities into staging
146                                   table
147   --action match_auths          - identify matches with authority
148                                   records already present in the
149                                   Evergreen database; matching is
150                                   based on LCCN, cancelled LCCN, and
151                                   main heading.
152   --action load_new_auths       - load new (unmatched) authorities
153   --action overlay_auths_stage1 - overlay based on LCCN where
154                                   heading has not change; this step
155                                   disables propagation to bib records
156   --action overlay_auths_stage2 - overlay based on LCCN where heading
157                                   has NOT changed; propagates changes
158                                   to bib records
159   --action overlay_auths_stage3 - overlay for records where a cancelled
160                                   LCCN is replaced with a new one
161   --action link_auth_auth       - run authority_authority_linker.pl for
162                                   the authorities that were overlaid
163                                   or added in this batch.
164
165 Several switches are used regardless of the specified action:
166
167   --schema  - Pg schema in which staging table will live; should be
168               created beforehand
169   --batch   - name of bib batch; will also be used as the name
170               of the staging tables
171   --db      - database name
172   --dbuser  - database user
173   --dbpw    - database password
174   --dbhost  - database host
175
176 Examples:
177
178 $0 --schema bib_load --batch bibs_2016_01 --db evergreen \\
179    --dbuser evergreen --dbpw evergreen --dbhost localhost \\
180    --action stage_bibs -- file1.mrc file2.mrc [...]
181
182 $0 --schema bib_load --batch bibs_2016_01 --db evergreen \\
183    --dbuser evergreen --dbpw evergreen --dbhost localhost \\
184    --action filter_bibs --cutoff 2016-01-02
185
186 $0 --schema bib_load --batch bibs_2016_01 --db evergreen \\
187    --dbuser evergreen --dbpw evergreen --dbhost localhost \\
188    --action load_bibs --wait 2
189
190 _USAGE_
191 }
192
193
194 sub report_progress {
195     my ($msg, $counter) = @_;
196     if (defined $counter) {
197         print STDERR "$msg: $counter\n";
198     } else {
199         print STDERR "$msg\n";
200     }
201 }
202
203 sub connect_db {
204     my ($db, $dbuser, $dbpw, $dbhost) = @_;
205
206     my $dsn = "dbi:Pg:host=$dbhost;dbname=$db;port=5432";
207
208     my $attrs = {
209         ShowErrorStatement => 1,
210         RaiseError => 1,
211         PrintError => 1,
212         pg_enable_utf8 => 1,
213     };
214     my $dbh = DBI->connect($dsn, $dbuser, $dbpw, $attrs);
215
216     return $dbh;
217 }
218
219 sub handle_stage_bibs {
220     my $dbh = shift;
221     my $schema = shift;
222     my $batch = shift;
223
224     $dbh->do(qq{
225         DROP TABLE IF EXISTS $schema.$batch;
226     });
227     $dbh->do(qq{
228         CREATE TABLE $schema.$batch (
229             id          SERIAL,
230             marc        TEXT,
231             bib_id      BIGINT,
232             imported    BOOLEAN DEFAULT FALSE,
233             to_import   BOOLEAN DEFAULT TRUE,
234             skip_reason TEXT
235         )
236     });
237
238     local $/ = "\035";
239     my $i = 0;
240     binmode STDIN, ':utf8';
241     my $ins = $dbh->prepare("INSERT INTO $schema.$batch (marc, bib_id) VALUES (?, ?)");
242     $dbh->begin_work;
243     while (<>) {
244         $i++;
245         if (0 == $i % 100) {
246             report_progress("Records staged", $i);
247             $dbh->commit;
248             $dbh->begin_work;
249         }
250         my $marc = MARC::Record->new_from_usmarc($_);
251         my $bibid = $marc->subfield('901', 'c');
252         if ($bibid !~ /^\d+$/) {
253             print STDERR "Record $i is suspect; skipping\n";
254             next;
255         }
256         my $xml = OpenILS::Application::AppUtils->entityize($marc->as_xml_record());
257         $ins->execute($xml, $bibid);
258     }
259     $dbh->commit;
260     report_progress("Records staged", $i) if 0 != $i % 100;
261     $dbh->do(qq/
262         CREATE INDEX ${batch}_bib_id_idx ON
263             $schema.$batch (bib_id);
264     /);
265     $dbh->do(qq/
266         CREATE INDEX ${batch}_id_idx ON
267             $schema.$batch (id);
268     /);
269 }
270
271 sub handle_filter_bibs {
272     my $dbh = shift;
273     my $schema = shift;
274     my $batch = shift;
275     my $cutoff = shift;
276
277     my $sth1 = $dbh->prepare(qq{
278         UPDATE $schema.$batch
279         SET to_import = FALSE,
280             skip_reason = 'deleted'
281         WHERE bib_id IN (
282             SELECT id
283             FROM biblio.record_entry
284             WHERE deleted
285         )
286         AND to_import
287         AND NOT imported
288     });
289     $sth1->execute();
290     my $ct = $sth1->rows;
291     report_progress("Filtering out $ct records that are currently deleted");
292
293     my $sth2 = $dbh->prepare(qq{
294         UPDATE $schema.$batch
295         SET to_import = FALSE,
296             skip_reason = 'edited after cutoff of $cutoff'
297         WHERE bib_id IN (
298             SELECT id
299             FROM biblio.record_entry
300             WHERE edit_date >= ?
301         )
302         AND to_import
303         AND NOT imported
304     });
305     $sth2->execute($cutoff);
306     $ct = $sth2->rows;
307     report_progress("Filtering out $ct records edited after cutoff date of $cutoff");
308
309     my $sth3 = $dbh->prepare(qq{
310         UPDATE $schema.$batch
311         SET to_import = FALSE,
312             skip_reason = 'XML is not well-formed'
313         WHERE NOT xml_is_well_formed(marc)
314         AND to_import
315         AND NOT imported
316     });
317     $sth3->execute();
318     $ct = $sth3->rows;
319     report_progress("Filtering out $ct records whose XML is not well-formed");
320 }
321
322 sub handle_load_bibs {
323     my $dbh = shift;
324     my $schema = shift;
325     my $batch = shift;
326     my $wait = shift;
327
328     my $getct = $dbh->prepare(qq{
329         SELECT COUNT(*)
330         FROM  $schema.$batch
331         WHERE to_import
332         AND NOT imported
333     });
334     $getct->execute();
335     my $max = $getct->fetchrow_arrayref()->[0];
336
337     report_progress('Number of bibs to update', $max);
338     for (my $i = 1; $i <= $max; $i++) {
339         report_progress('... bibs updated', $i) if 0 == $i % 10 or $i == $max;
340         $dbh->begin_work;
341         $dbh->do(qq{
342             UPDATE biblio.record_entry a
343             SET marc = b.marc
344             FROM $schema.$batch b
345             WHERE a.id = b.bib_id
346             AND bib_id IN (
347                 SELECT bib_id
348                 FROM $schema.$batch
349                 WHERE to_import
350                 AND NOT imported
351                 ORDER BY id
352                 LIMIT 1
353             )
354         });
355         $dbh->do(qq{
356             UPDATE $schema.$batch
357             SET imported = TRUE
358             WHERE bib_id IN (
359                 SELECT bib_id
360                 FROM $schema.$batch
361                 WHERE to_import
362                 AND NOT imported
363                 ORDER BY id
364                 LIMIT 1
365             )
366         });
367         $dbh->commit;
368         sleep $wait;
369     }
370 }
371
372 sub handle_stage_auths {
373     my $dbh = shift;
374     my $schema = shift;
375     my $batch = shift;
376
377     $dbh->do(qq{
378         DROP TABLE IF EXISTS $schema.auths_$batch;
379     });
380     $dbh->do(qq{
381         CREATE TABLE $schema.auths_$batch (
382             id          SERIAL,
383             marc        TEXT,
384             auth_id     BIGINT,
385             new_auth_id BIGINT,
386             existing_heading TEXT,
387             lccn        TEXT,
388             cancelled_lccn TEXT,
389             cancelled_auth_id BIGINT,
390             heading     TEXT,
391             lccn_matched BOOLEAN DEFAULT FALSE,
392             heading_matched BOOLEAN DEFAULT FALSE,
393             imported    BOOLEAN DEFAULT FALSE,
394             to_import   BOOLEAN DEFAULT TRUE,
395             skip_reason TEXT
396         )
397     });
398
399     local $/ = "\035";
400     my $i = 0;
401     binmode STDIN, ':utf8';
402     my $ins = $dbh->prepare(qq{
403         INSERT INTO $schema.auths_$batch (marc, auth_id, lccn, cancelled_lccn, heading)
404         VALUES (?, ?, ?, ?, authority.normalize_heading(?))
405     });
406     $dbh->begin_work;
407     while (<>) {
408         $i++;
409         if (0 == $i % 100) {
410             report_progress("Records staged", $i);
411             $dbh->commit;
412             $dbh->begin_work;
413         }
414         my $marc = MARC::Record->new_from_usmarc($_);
415         my $authid = $marc->subfield('901', 'c');
416         if (defined($authid) && $authid !~ /^\d+$/) {
417             undef $authid;
418         }
419         my $lccn = $marc->subfield('010', 'a');
420         if (defined $lccn) {
421             $lccn =~ s/^\s+//;
422             $lccn =~ s/\s+$//;
423             $lccn =~ s/\s+/ /g;
424         }
425         my $cancelled_lccn = $marc->subfield('010', 'z');
426         if (defined $cancelled_lccn) {
427             $cancelled_lccn =~ s/^\s+//;
428             $cancelled_lccn =~ s/\s+$//;
429             $cancelled_lccn =~ s/\s+/ /g;
430         }
431         my $xml = OpenILS::Application::AppUtils->entityize($marc->as_xml_record());
432         $ins->execute($xml, $authid, $lccn, $cancelled_lccn, $xml);
433     }
434     $dbh->commit;
435     report_progress("Records staged", $i) if 0 != $i % 100;
436     $dbh->do(qq/
437         CREATE INDEX auths_${batch}_auth_id_idx ON
438             $schema.auths_$batch (auth_id);
439     /);
440     $dbh->do(qq/
441         CREATE INDEX auths_${batch}_id_idx ON
442             $schema.auths_$batch (id);
443     /);
444     $dbh->do(qq/
445         CREATE INDEX auths_${batch}_lccn_idx ON
446             $schema.auths_$batch (lccn);
447     /);
448 }
449
450 sub handle_match_auths {
451     my ($dbh, $schema, $batch) = @_;
452
453     my $sth = $dbh->prepare(qq{
454         UPDATE $schema.auths_${batch} a
455         SET auth_id = b.record,
456             lccn_matched = TRUE,
457             existing_heading = authority.normalize_heading(c.marc)
458         FROM authority.full_rec b
459         JOIN authority.record_entry c ON (b.record = c.id)
460         WHERE tag = '010'
461         AND   subfield = 'a'
462         AND   value = lccn
463         AND   auth_id IS NULL
464         AND   lccn IS NOT NULL;
465     });
466     $sth->execute();
467     my $ct = $sth->rows;
468     report_progress("Matched $ct authorities on LCCN");
469
470     $sth = $dbh->prepare(qq{
471         UPDATE $schema.auths_${batch} a
472         SET cancelled_auth_id = b.record
473         FROM authority.full_rec b
474         WHERE tag = '010'
475         AND   subfield = 'a'
476         AND   value = cancelled_lccn
477         AND   auth_id IS NULL
478         AND   cancelled_lccn IS NOT NULL;
479     });
480     $sth->execute();
481     $ct = $sth->rows;
482     report_progress("Matched $ct authorities on cancelled LCCN");
483
484     $sth = $dbh->prepare(qq{
485         UPDATE $schema.auths_$batch a
486         SET auth_id = b.id,
487             heading_matched = TRUE,
488             existing_heading = b.heading
489         FROM authority.record_entry b
490         WHERE a.heading = b.heading
491         AND   auth_id IS NULL;
492     });
493     $sth->execute();
494     $ct = $sth->rows;
495     report_progress("Matched $ct authorities on heading");
496 }
497
498 sub handle_load_new_auths {
499     my $dbh = shift;
500     my $schema = shift;
501     my $batch = shift;
502
503     my $getct = $dbh->prepare(qq{
504         SELECT COUNT(*)
505         FROM  $schema.auths_$batch
506         WHERE to_import
507         AND NOT imported
508         AND new_auth_id IS NULL
509         AND auth_id IS NULL
510         AND cancelled_auth_id IS NULL
511     });
512     $getct->execute();
513     my $max = $getct->fetchrow_arrayref()->[0];
514
515     report_progress('Number of authorities to add', $max);
516     for (my $i = 1; $i <= $max; $i++) {
517         report_progress('... authorities added', $i) if 0 == $i % 10 or $i == $max;
518         $dbh->begin_work;
519         $dbh->do(qq{
520             INSERT INTO authority.record_entry (marc, last_xact_id)
521             SELECT marc, ? || '-' || id
522             FROM $schema.auths_$batch b
523             WHERE id IN (
524                 SELECT id
525                 FROM $schema.auths_$batch
526                 WHERE to_import
527                 AND NOT imported
528                 AND new_auth_id IS NULL
529                 AND auth_id IS NULL
530                 AND cancelled_auth_id IS NULL
531                 ORDER BY id
532                 LIMIT 1
533             )
534         }, {}, "auths_$batch");
535         $dbh->do(qq{
536             UPDATE $schema.auths_$batch
537             SET imported = TRUE,
538                 new_auth_id = CURRVAL('authority.record_entry_id_seq')
539             WHERE id IN (
540                 SELECT id
541                 FROM $schema.auths_$batch
542                 WHERE to_import
543                 AND NOT imported
544                 AND new_auth_id IS NULL
545                 AND auth_id IS NULL
546                 AND cancelled_auth_id IS NULL
547                 ORDER BY id
548                 LIMIT 1
549             )
550         });
551         $dbh->commit;
552     }
553 }
554
555 sub handle_overlay_auths_stage1 {
556     my $dbh = shift;
557     my $schema = shift;
558     my $batch = shift;
559
560     my $getct = $dbh->prepare(qq{
561         SELECT COUNT(*)
562         FROM  $schema.auths_$batch
563         WHERE to_import
564         AND NOT imported
565         AND lccn_matched
566         AND heading = existing_heading
567     });
568     $getct->execute();
569     my $max = $getct->fetchrow_arrayref()->[0];
570     report_progress('Number of auths to update', $max);
571
572     $dbh->do(q{
573         UPDATE config.internal_flag SET enabled = TRUE
574         WHERE name = 'ingest.disable_authority_auto_update';
575     });
576     for (my $i = 1; $i <= $max; $i++) {
577         report_progress('... auths updated', $i) if 0 == $i % 10 or $i == $max;
578         $dbh->begin_work;
579         $dbh->do(qq{
580             UPDATE authority.record_entry a
581             SET marc = b.marc,
582                 edit_date = NOW()
583             FROM $schema.auths_$batch b
584             WHERE a.id = b.auth_id
585             AND auth_id IN (
586                 SELECT auth_id
587                 FROM $schema.auths_$batch
588                 WHERE to_import
589                 AND NOT imported
590                 AND lccn_matched
591                 AND heading = existing_heading
592                 ORDER BY id
593                 LIMIT 1
594             )
595         });
596         $dbh->do(qq{
597             UPDATE $schema.auths_$batch
598             SET imported = TRUE
599             WHERE auth_id IN (
600                 SELECT auth_id
601                 FROM $schema.auths_$batch
602                 WHERE to_import
603                 AND NOT imported
604                 AND lccn_matched
605                 AND heading = existing_heading
606                 ORDER BY id
607                 LIMIT 1
608             )
609         });
610         $dbh->commit;
611     }
612     $dbh->do(q{
613         UPDATE config.internal_flag SET enabled = FALSE
614         WHERE name = 'ingest.disable_authority_auto_update';
615     });
616 }
617
618 sub handle_overlay_auths_stage2 {
619     my $dbh = shift;
620     my $schema = shift;
621     my $batch = shift;
622
623     my $getct = $dbh->prepare(qq{
624         SELECT COUNT(*)
625         FROM  $schema.auths_$batch
626         WHERE to_import
627         AND NOT imported
628         AND lccn_matched
629         AND heading <> existing_heading
630     });
631     $getct->execute();
632     my $max = $getct->fetchrow_arrayref()->[0];
633     report_progress('Number of auths to update', $max);
634
635     for (my $i = 1; $i <= $max; $i++) {
636         report_progress('... auths updated', $i) if 0 == $i % 10 or $i == $max;
637         $dbh->begin_work;
638         $dbh->do(qq{
639             UPDATE authority.record_entry a
640             SET marc = b.marc,
641                 edit_date = NOW()
642             FROM $schema.auths_$batch b
643             WHERE a.id = b.auth_id
644             AND auth_id IN (
645                 SELECT auth_id
646                 FROM $schema.auths_$batch
647                 WHERE to_import
648                 AND NOT imported
649                 AND lccn_matched
650                 AND heading <> existing_heading
651                 ORDER BY id
652                 LIMIT 1
653             )
654         });
655         $dbh->do(qq{
656             UPDATE $schema.auths_$batch
657             SET imported = TRUE
658             WHERE auth_id IN (
659                 SELECT auth_id
660                 FROM $schema.auths_$batch
661                 WHERE to_import
662                 AND NOT imported
663                 AND lccn_matched
664                 AND heading <> existing_heading
665                 ORDER BY id
666                 LIMIT 1
667             )
668         });
669         $dbh->commit;
670     }
671 }
672
673 sub handle_overlay_auths_stage3 {
674     my $dbh = shift;
675     my $schema = shift;
676     my $batch = shift;
677
678     my $getct = $dbh->prepare(qq{
679         SELECT COUNT(*)
680         FROM  $schema.auths_$batch
681         WHERE to_import
682         AND NOT imported
683         AND (
684             auth_id IS NULL OR
685             auth_id = cancelled_auth_id
686         )
687         AND cancelled_auth_id IS NOT NULL
688     });
689     $getct->execute();
690     my $max = $getct->fetchrow_arrayref()->[0];
691     report_progress('Number of auths to update', $max);
692
693     for (my $i = 1; $i <= $max; $i++) {
694         report_progress('... auths updated', $i) if 0 == $i % 10 or $i == $max;
695         $dbh->begin_work;
696         $dbh->do(qq{
697             UPDATE authority.record_entry a
698             SET marc = b.marc,
699                 edit_date = NOW()
700             FROM $schema.auths_$batch b
701             WHERE a.id = b.cancelled_auth_id
702             AND cancelled_auth_id IN (
703                 SELECT auth_id
704                 FROM $schema.auths_$batch
705                 WHERE to_import
706                 AND NOT imported
707                 AND (
708                     auth_id IS NULL OR
709                     auth_id = cancelled_auth_id
710                 )
711                 AND cancelled_auth_id IS NOT NULL
712                 ORDER BY id
713                 LIMIT 1
714             )
715         });
716         $dbh->do(qq{
717             UPDATE $schema.auths_$batch
718             SET imported = TRUE
719             WHERE cancelled_auth_id IN (
720                 SELECT cancelled_auth_id
721                 FROM $schema.auths_$batch
722                 WHERE to_import
723                 AND NOT imported
724                 AND (
725                     auth_id IS NULL OR
726                     auth_id = cancelled_auth_id
727                 )
728                 AND cancelled_auth_id IS NOT NULL
729                 ORDER BY id
730                 LIMIT 1
731             )
732         });
733         $dbh->commit;
734     }
735 }
736
737 sub handle_link_auth_auth {
738     my $dbh = shift;
739     my $schema = shift;
740     my $batch = shift;
741
742     $dbh->do(q{
743         UPDATE config.internal_flag SET enabled = TRUE
744         WHERE name = 'ingest.disable_authority_auto_update';
745     });
746
747     my $sth = $dbh->prepare(qq{
748         SELECT COALESCE(new_auth_id, auth_id, cancelled_auth_id) AS id
749         FROM $schema.auths_$batch
750         WHERE imported
751         ORDER BY 1
752     });
753     $sth->execute();
754     my @ids = map { $_->{id} } @{ $sth->fetchall_arrayref({}) };
755     my $i = 0;
756     report_progress(scalar(@ids) . " records to do auth-auth linking");
757     foreach my $id (@ids) {
758         $i++;
759         report_progress('... auth-auth linkings processed', $i) if 0 == $i % 10 or $i == scalar(@ids);
760         system "/openils/bin/authority_authority_linker.pl -r $id -c /openils/conf/opensrf_core.xml";
761     }
762
763     $dbh->do(q{
764         UPDATE config.internal_flag SET enabled = FALSE
765         WHERE name = 'ingest.disable_authority_auto_update';
766     });
767 }