--force-utf8 in leader for marc_cleanup
[migration-tools.git] / eg_staged_bib_overlay
1 #!/usr/bin/perl
2
3 # Copyright (c) 2016 Equinox Software, Inc.
4 # Author: Galen Charlton <gmc@esilibrary.com>
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2, or (at your option)
9 # any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program.  If not, see <http://www.gnu.org/licenses/>
18
19 use strict;
20 use warnings;
21
22 use Getopt::Long;
23 use MARC::Record;
24 use MARC::File::XML (BinaryEncoding => 'utf8');
25 use DBI;
26 use OpenILS::Application::AppUtils;
27
28 my $action;
29 my $schema = 'bib_loads';
30 my $db;
31 my $dbuser;
32 my $dbpw;
33 my $dbhost;
34 my $batch;
35 my $cutoff;
36 my $wait = 0;
37 my $output;
38 my $link_skipped;
39
40 my $ret = GetOptions(
41     'action:s'      => \$action,
42     'schema:s'      => \$schema,
43     'db:s'          => \$db,
44     'dbuser:s'      => \$dbuser,
45     'dbhost:s'      => \$dbhost,
46     'dbpw:s'        => \$dbpw,
47     'batch:s'       => \$batch,
48     'cutoff:s'      => \$cutoff,
49     'wait:i'        => \$wait,
50     'output:s'      => \$output,
51     'link-skipped'  => \$link_skipped,
52 );
53
54 abort('must specify --action') unless defined $action;
55 abort('must specify --schema') unless defined $schema;
56 abort('must specify --db') unless defined $db;
57 abort('must specify --dbuser') unless defined $dbuser;
58 abort('must specify --dbhost') unless defined $dbhost;
59 abort('must specify --dbpw') unless defined $dbpw;
60 abort('must specify --batch') unless defined $batch;
61
62 abort(q{--action must be "stage_bibs", "filter_bibs", "load_bibs", "stage_auths",
63 "match_auths", "load_new_auths", "overlay_auths_stage1",
64 "overlay_auths_stage2", "overlay_auths_stage3", "link_auth_auth",
65 "link_auth_bib", "export_skipped_bibs", or "export_skipped_auths"}) unless
66     $action eq 'filter_bibs' or
67     $action eq 'stage_bibs' or
68     $action eq 'load_bibs' or
69     $action eq 'stage_auths' or
70     $action eq 'match_auths' or
71     $action eq 'load_new_auths' or
72     $action eq 'overlay_auths_stage1' or
73     $action eq 'overlay_auths_stage2' or
74     $action eq 'overlay_auths_stage3' or
75     $action eq 'link_auth_auth' or
76     $action eq 'link_auth_bib' or
77     $action eq 'export_skipped_bibs' or
78     $action eq 'export_skipped_auths'
79 ;
80
81 my $dbh = connect_db($db, $dbuser, $dbpw, $dbhost);
82
83 if ($action eq 'stage_bibs') {
84     abort('must specify at least one input file') unless @ARGV;
85     handle_stage_bibs($dbh, $schema, $batch);
86 }
87
88 if ($action eq 'filter_bibs') {
89     abort('must specify cutoff date when filtering') unless defined $cutoff;
90     handle_filter_bibs($dbh, $schema, $batch, $cutoff);
91 }
92
93 if ($action eq 'load_bibs' ) {
94     handle_load_bibs($dbh, $schema, $batch, $wait);
95 }
96
97 if ($action eq 'stage_auths') {
98     abort('must specify at least one input file') unless @ARGV;
99     handle_stage_auths($dbh, $schema, $batch);
100 }
101
102 if ($action eq 'match_auths') {
103     handle_match_auths($dbh, $schema, $batch);
104 }
105
106 if ($action eq 'load_new_auths') {
107     handle_load_new_auths($dbh, $schema, $batch);
108 }
109
110 if ($action eq 'overlay_auths_stage1') {
111     handle_overlay_auths_stage1($dbh, $schema, $batch);
112 }
113 if ($action eq 'overlay_auths_stage2') {
114     handle_overlay_auths_stage2($dbh, $schema, $batch);
115 }
116 if ($action eq 'overlay_auths_stage3') {
117     handle_overlay_auths_stage3($dbh, $schema, $batch);
118 }
119
120 if ($action eq 'link_auth_auth') {
121     handle_link_auth_auth($dbh, $schema, $batch);
122 }
123 if ($action eq 'link_auth_bib') {
124     handle_link_auth_bib($dbh, $schema, $batch, $link_skipped);
125 }
126
127 if ($action eq 'export_skipped_bibs') {
128     abort('must specify output file') unless defined $output;
129     handle_export_skipped_bibs($dbh, $schema, $batch, $output);
130 }
131 if ($action eq 'export_skipped_auths') {
132     abort('must specify output file') unless defined $output;
133     handle_export_skipped_auths($dbh, $schema, $batch, $output);
134 }
135
136 sub abort {
137     my $msg = shift;
138     print STDERR "$0: $msg", "\n";
139     print_usage();
140     exit 1;
141 }
142
143 sub print_usage {
144     print <<_USAGE_;
145
146 Utility to stage and overlay bib records in an Evergreen database. This
147 expects that the incoming records will have been previously exported
148 from that Evergreen database and modified in some fashion (e.g., for
149 authority record processing) and that the bib ID can be found in the
150 901\$c subfield.
151
152 This program has several modes controlled by the --action switch:
153
154   --action stage_bibs  - load MARC bib records into a staging table
155   --action filter_bibs - mark previously staged bibs that should
156                          be excluded from a subsequent load, either
157                          because the target bib is deleted in Evergreen
158                          or the record was modified after a date
159                          specified by the --cutoff switch
160   --action load_bibs   - overlay bib records using a previously staged
161                          batch, one at a time. After each bib, it will
162                          wait the number of seconds specified by the
163                          --wait switch.
164
165   --action stage_auths          - load MARC authorities into staging
166                                   table
167   --action match_auths          - identify matches with authority
168                                   records already present in the
169                                   Evergreen database; matching is
170                                   based on LCCN, cancelled LCCN, and
171                                   main heading.
172   --action load_new_auths       - load new (unmatched) authorities
173   --action overlay_auths_stage1 - overlay based on LCCN where
174                                   heading has NOT changed; this step
175                                   disables propagation to bib records
176   --action overlay_auths_stage2 - overlay based on LCCN where heading
177                                   HAS changed; propagates changes
178                                   to bib records
179   --action overlay_auths_stage3 - overlay for records where a cancelled
180                                   LCCN is replaced with a new one
181   --action link_auth_auth       - run authority_authority_linker.pl for
182                                   the authorities that were overlaid
183                                   or added in this batch.
184   --action link_auth_bib        - run authority_control_fields.pl for
185                                   the bibs that were overlaid in this
186                                   batch.  Add --link-skipped to specify
187                                   that bibs that were matched but
188                                   skipped due to having be edited after
189                                   the cutoff should be linked (rather
190                                   than linking the imported bibs)
191   --action export_skipped_bibs  - export to ISO2709 file whose name is
192                                   specified by --output those bibs
193                                   that had been edited after the cutoff.
194   --action export_skipped_auths - export to ISO2709 file whose name is
195                                   specified by --output those authorities
196                                   that could not be definitively
197                                   handled as updates or adds.
198
199 Several switches are used regardless of the specified action:
200
201   --schema  - Pg schema in which staging table will live; should be
202               created beforehand
203   --batch   - name of bib batch; will also be used as the name
204               of the staging tables
205   --db      - database name
206   --dbuser  - database user
207   --dbpw    - database password
208   --dbhost  - database host
209
210 Examples:
211
212 $0 --schema bib_load --batch bibs_2016_01 --db evergreen \\
213    --dbuser evergreen --dbpw evergreen --dbhost localhost \\
214    --action stage_bibs -- file1.mrc file2.mrc [...]
215
216 $0 --schema bib_load --batch bibs_2016_01 --db evergreen \\
217    --dbuser evergreen --dbpw evergreen --dbhost localhost \\
218    --action filter_bibs --cutoff 2016-01-02
219
220 $0 --schema bib_load --batch bibs_2016_01 --db evergreen \\
221    --dbuser evergreen --dbpw evergreen --dbhost localhost \\
222    --action load_bibs --wait 2
223
224 _USAGE_
225 }
226
227
228 sub report_progress {
229     my ($msg, $counter) = @_;
230     if (defined $counter) {
231         print STDERR "$msg: $counter\n";
232     } else {
233         print STDERR "$msg\n";
234     }
235 }
236
237 sub connect_db {
238     my ($db, $dbuser, $dbpw, $dbhost) = @_;
239
240     my $dsn = "dbi:Pg:host=$dbhost;dbname=$db;port=5432";
241
242     my $attrs = {
243         ShowErrorStatement => 1,
244         RaiseError => 1,
245         PrintError => 1,
246         pg_enable_utf8 => 1,
247     };
248     my $dbh = DBI->connect($dsn, $dbuser, $dbpw, $attrs);
249
250     return $dbh;
251 }
252
253 sub handle_stage_bibs {
254     my $dbh = shift;
255     my $schema = shift;
256     my $batch = shift;
257
258     $dbh->do(qq{
259         DROP TABLE IF EXISTS $schema.$batch;
260     });
261     $dbh->do(qq{
262         CREATE TABLE $schema.$batch (
263             id          SERIAL,
264             marc        TEXT,
265             bib_id      BIGINT,
266             imported    BOOLEAN DEFAULT FALSE,
267             to_import   BOOLEAN DEFAULT TRUE,
268             skip_reason TEXT
269         )
270     });
271
272     local $/ = "\035";
273     my $i = 0;
274     binmode STDIN, ':utf8';
275     my $ins = $dbh->prepare("INSERT INTO $schema.$batch (marc, bib_id) VALUES (?, ?)");
276     $dbh->begin_work;
277     while (<>) {
278         $i++;
279         if (0 == $i % 100) {
280             report_progress("Records staged", $i);
281             $dbh->commit;
282             $dbh->begin_work;
283         }
284         eval {
285             my $marc = MARC::Record->new_from_usmarc($_);
286             my $bibid = $marc->subfield('901', 'c');
287             if ($bibid !~ /^\d+$/) {
288                 die('Subfield 901$c is not numeric or missing.');
289             }
290             my $xml = OpenILS::Application::AppUtils->entityize($marc->as_xml_record());
291             $ins->execute($xml, $bibid);
292         };
293         if ($@) {
294             warn("Record $i is bad: $@; skipping.");
295             next;
296         }
297     }
298     $dbh->commit;
299     report_progress("Records staged", $i) if 0 != $i % 100;
300     $dbh->do(qq/
301         CREATE INDEX ${batch}_bib_id_idx ON
302             $schema.$batch (bib_id);
303     /);
304     $dbh->do(qq/
305         CREATE INDEX ${batch}_id_idx ON
306             $schema.$batch (id);
307     /);
308 }
309
310 sub handle_filter_bibs {
311     my $dbh = shift;
312     my $schema = shift;
313     my $batch = shift;
314     my $cutoff = shift;
315
316     my $sth1 = $dbh->prepare(qq{
317         UPDATE $schema.$batch
318         SET to_import = FALSE,
319             skip_reason = 'deleted'
320         WHERE bib_id IN (
321             SELECT id
322             FROM biblio.record_entry
323             WHERE deleted
324         )
325         AND to_import
326         AND NOT imported
327     });
328     $sth1->execute();
329     my $ct = $sth1->rows;
330     report_progress("Filtering out $ct records that are currently deleted");
331
332     my $sth2 = $dbh->prepare(qq{
333         UPDATE $schema.$batch
334         SET to_import = FALSE,
335             skip_reason = 'edited after cutoff of $cutoff'
336         WHERE bib_id IN (
337             SELECT id
338             FROM biblio.record_entry
339             WHERE edit_date >= ?
340         )
341         AND to_import
342         AND NOT imported
343     });
344     $sth2->execute($cutoff);
345     $ct = $sth2->rows;
346     report_progress("Filtering out $ct records edited after cutoff date of $cutoff");
347
348     my $sth3 = $dbh->prepare(qq{
349         UPDATE $schema.$batch
350         SET to_import = FALSE,
351             skip_reason = 'XML is not well-formed'
352         WHERE NOT xml_is_well_formed(marc)
353         AND to_import
354         AND NOT imported
355     });
356     $sth3->execute();
357     $ct = $sth3->rows;
358     report_progress("Filtering out $ct records whose XML is not well-formed");
359 }
360
361 sub handle_load_bibs {
362     my $dbh = shift;
363     my $schema = shift;
364     my $batch = shift;
365     my $wait = shift;
366
367     my $getct = $dbh->prepare(qq{
368         SELECT COUNT(*)
369         FROM  $schema.$batch
370         WHERE to_import
371         AND NOT imported
372     });
373     $getct->execute();
374     my $max = $getct->fetchrow_arrayref()->[0];
375
376     report_progress('Number of bibs to update', $max);
377     for (my $i = 1; $i <= $max; $i++) {
378         report_progress('... bibs updated', $i) if 0 == $i % 10 or $i == $max;
379         $dbh->begin_work;
380         $dbh->do(qq{
381             UPDATE biblio.record_entry a
382             SET marc = b.marc
383             FROM $schema.$batch b
384             WHERE a.id = b.bib_id
385             AND bib_id IN (
386                 SELECT bib_id
387                 FROM $schema.$batch
388                 WHERE to_import
389                 AND NOT imported
390                 ORDER BY id
391                 LIMIT 1
392             )
393         });
394         $dbh->do(qq{
395             UPDATE $schema.$batch
396             SET imported = TRUE
397             WHERE bib_id IN (
398                 SELECT bib_id
399                 FROM $schema.$batch
400                 WHERE to_import
401                 AND NOT imported
402                 ORDER BY id
403                 LIMIT 1
404             )
405         });
406         $dbh->commit;
407         sleep $wait if ($wait);
408     }
409 }
410
411 sub handle_stage_auths {
412     my $dbh = shift;
413     my $schema = shift;
414     my $batch = shift;
415
416     $dbh->do(qq{
417         DROP TABLE IF EXISTS $schema.auths_$batch;
418     });
419     $dbh->do(qq{
420         CREATE TABLE $schema.auths_$batch (
421             id          SERIAL,
422             marc        TEXT,
423             auth_id     BIGINT,
424             new_auth_id BIGINT,
425             existing_heading TEXT,
426             lccn        TEXT,
427             cancelled_lccn TEXT,
428             cancelled_auth_id BIGINT,
429             heading     TEXT,
430             lccn_matched BOOLEAN DEFAULT FALSE,
431             heading_matched BOOLEAN DEFAULT FALSE,
432             imported    BOOLEAN DEFAULT FALSE,
433             to_import   BOOLEAN DEFAULT TRUE,
434             skip_reason TEXT
435         )
436     });
437
438     local $/ = "\035";
439     my $i = 0;
440     binmode STDIN, ':utf8';
441     my $ins = $dbh->prepare(qq{
442         INSERT INTO $schema.auths_$batch (marc, auth_id, lccn, cancelled_lccn, heading)
443         VALUES (?, ?, ?, ?, authority.normalize_heading(?))
444     });
445     $dbh->begin_work;
446     while (<>) {
447         $i++;
448         if (0 == $i % 100) {
449             report_progress("Records staged", $i);
450             $dbh->commit;
451             $dbh->begin_work;
452         }
453         eval {
454             my $marc = MARC::Record->new_from_usmarc($_);
455             my $authid = $marc->subfield('901', 'c');
456             if (defined($authid) && $authid !~ /^\d+$/) {
457                 undef $authid;
458             }
459             my $lccn = $marc->subfield('010', 'a');
460             if (defined $lccn) {
461                 $lccn =~ s/^\s+//;
462                 $lccn =~ s/\s+$//;
463                 $lccn =~ s/\s+/ /g;
464             }
465             my $cancelled_lccn = $marc->subfield('010', 'z');
466             if (defined $cancelled_lccn) {
467                 $cancelled_lccn =~ s/^\s+//;
468                 $cancelled_lccn =~ s/\s+$//;
469                 $cancelled_lccn =~ s/\s+/ /g;
470             }
471             my $xml = OpenILS::Application::AppUtils->entityize($marc->as_xml_record());
472             $ins->execute($xml, $authid, $lccn, $cancelled_lccn, $xml);
473         };
474         if ($@) {
475             warn("Record $i is bad: $@; skipping.");
476             next;
477         }
478     }
479     $dbh->commit;
480     report_progress("Records staged", $i) if 0 != $i % 100;
481     $dbh->do(qq/
482         CREATE INDEX auths_${batch}_auth_id_idx ON
483             $schema.auths_$batch (auth_id);
484     /);
485     $dbh->do(qq/
486         CREATE INDEX auths_${batch}_id_idx ON
487             $schema.auths_$batch (id);
488     /);
489     $dbh->do(qq/
490         CREATE INDEX auths_${batch}_lccn_idx ON
491             $schema.auths_$batch (lccn);
492     /);
493 }
494
495 sub handle_match_auths {
496     my ($dbh, $schema, $batch) = @_;
497
498     my $sth = $dbh->prepare(qq{
499         UPDATE $schema.auths_${batch} a
500         SET auth_id = b.record,
501             lccn_matched = TRUE,
502             existing_heading = authority.normalize_heading(c.marc)
503         FROM authority.full_rec b
504         JOIN authority.record_entry c ON (b.record = c.id)
505         WHERE tag = '010'
506         AND   subfield = 'a'
507         AND   value = lccn
508         AND   auth_id IS NULL
509         AND   lccn IS NOT NULL;
510     });
511     $sth->execute();
512     my $ct = $sth->rows;
513     report_progress("Matched $ct authorities on LCCN");
514
515     $sth = $dbh->prepare(qq{
516         UPDATE $schema.auths_${batch} a
517         SET cancelled_auth_id = b.record
518         FROM authority.full_rec b
519         WHERE tag = '010'
520         AND   subfield = 'a'
521         AND   value = cancelled_lccn
522         AND   auth_id IS NULL
523         AND   cancelled_lccn IS NOT NULL;
524     });
525     $sth->execute();
526     $ct = $sth->rows;
527     report_progress("Matched $ct authorities on cancelled LCCN");
528
529     $sth = $dbh->prepare(qq{
530         UPDATE $schema.auths_$batch a
531         SET auth_id = b.id,
532             heading_matched = TRUE,
533             existing_heading = b.heading
534         FROM authority.record_entry b
535         WHERE a.heading = b.heading
536         AND   auth_id IS NULL;
537     });
538     $sth->execute();
539     $ct = $sth->rows;
540     report_progress("Matched $ct authorities on heading");
541 }
542
543 sub handle_load_new_auths {
544     my $dbh = shift;
545     my $schema = shift;
546     my $batch = shift;
547
548     my $getct = $dbh->prepare(qq{
549         SELECT COUNT(*)
550         FROM  $schema.auths_$batch
551         WHERE to_import
552         AND NOT imported
553         AND new_auth_id IS NULL
554         AND auth_id IS NULL
555         AND cancelled_auth_id IS NULL
556     });
557     $getct->execute();
558     my $max = $getct->fetchrow_arrayref()->[0];
559
560     report_progress('Number of authorities to add', $max);
561     for (my $i = 1; $i <= $max; $i++) {
562         report_progress('... authorities added', $i) if 0 == $i % 10 or $i == $max;
563         $dbh->begin_work;
564         $dbh->do(qq{
565             INSERT INTO authority.record_entry (marc, last_xact_id)
566             SELECT marc, ? || '-' || id
567             FROM $schema.auths_$batch b
568             WHERE id IN (
569                 SELECT id
570                 FROM $schema.auths_$batch
571                 WHERE to_import
572                 AND NOT imported
573                 AND new_auth_id IS NULL
574                 AND auth_id IS NULL
575                 AND cancelled_auth_id IS NULL
576                 ORDER BY id
577                 LIMIT 1
578             )
579         }, {}, "auths_$batch");
580         $dbh->do(qq{
581             UPDATE $schema.auths_$batch
582             SET imported = TRUE,
583                 new_auth_id = CURRVAL('authority.record_entry_id_seq')
584             WHERE id IN (
585                 SELECT id
586                 FROM $schema.auths_$batch
587                 WHERE to_import
588                 AND NOT imported
589                 AND new_auth_id IS NULL
590                 AND auth_id IS NULL
591                 AND cancelled_auth_id IS NULL
592                 ORDER BY id
593                 LIMIT 1
594             )
595         });
596         $dbh->commit;
597     }
598 }
599
600 sub handle_overlay_auths_stage1 {
601     my $dbh = shift;
602     my $schema = shift;
603     my $batch = shift;
604
605     my $getct = $dbh->prepare(qq{
606         SELECT COUNT(*)
607         FROM  $schema.auths_$batch
608         WHERE to_import
609         AND NOT imported
610         AND lccn_matched
611         AND heading = existing_heading
612     });
613     $getct->execute();
614     my $max = $getct->fetchrow_arrayref()->[0];
615     report_progress('Number of auths to update', $max);
616
617     $dbh->do(q{
618         UPDATE config.internal_flag SET enabled = TRUE
619         WHERE name = 'ingest.disable_authority_auto_update';
620     });
621     for (my $i = 1; $i <= $max; $i++) {
622         report_progress('... auths updated', $i) if 0 == $i % 10 or $i == $max;
623         $dbh->begin_work;
624         $dbh->do(qq{
625             UPDATE authority.record_entry a
626             SET marc = b.marc,
627                 edit_date = NOW()
628             FROM $schema.auths_$batch b
629             WHERE a.id = b.auth_id
630             AND auth_id IN (
631                 SELECT auth_id
632                 FROM $schema.auths_$batch
633                 WHERE to_import
634                 AND NOT imported
635                 AND lccn_matched
636                 AND heading = existing_heading
637                 ORDER BY id
638                 LIMIT 1
639             )
640         });
641         $dbh->do(qq{
642             UPDATE $schema.auths_$batch
643             SET imported = TRUE
644             WHERE auth_id IN (
645                 SELECT auth_id
646                 FROM $schema.auths_$batch
647                 WHERE to_import
648                 AND NOT imported
649                 AND lccn_matched
650                 AND heading = existing_heading
651                 ORDER BY id
652                 LIMIT 1
653             )
654         });
655         $dbh->commit;
656     }
657     $dbh->do(q{
658         UPDATE config.internal_flag SET enabled = FALSE
659         WHERE name = 'ingest.disable_authority_auto_update';
660     });
661 }
662
663 sub handle_overlay_auths_stage2 {
664     my $dbh = shift;
665     my $schema = shift;
666     my $batch = shift;
667
668     my $getct = $dbh->prepare(qq{
669         SELECT COUNT(*)
670         FROM  $schema.auths_$batch
671         WHERE to_import
672         AND NOT imported
673         AND lccn_matched
674         AND heading <> existing_heading
675     });
676     $getct->execute();
677     my $max = $getct->fetchrow_arrayref()->[0];
678     report_progress('Number of auths to update', $max);
679
680     for (my $i = 1; $i <= $max; $i++) {
681         report_progress('... auths updated', $i) if 0 == $i % 10 or $i == $max;
682         $dbh->begin_work;
683         $dbh->do(qq{
684             UPDATE authority.record_entry a
685             SET marc = b.marc,
686                 edit_date = NOW()
687             FROM $schema.auths_$batch b
688             WHERE a.id = b.auth_id
689             AND auth_id IN (
690                 SELECT auth_id
691                 FROM $schema.auths_$batch
692                 WHERE to_import
693                 AND NOT imported
694                 AND lccn_matched
695                 AND heading <> existing_heading
696                 ORDER BY id
697                 LIMIT 1
698             )
699         });
700         $dbh->do(qq{
701             UPDATE $schema.auths_$batch
702             SET imported = TRUE
703             WHERE auth_id IN (
704                 SELECT auth_id
705                 FROM $schema.auths_$batch
706                 WHERE to_import
707                 AND NOT imported
708                 AND lccn_matched
709                 AND heading <> existing_heading
710                 ORDER BY id
711                 LIMIT 1
712             )
713         });
714         $dbh->commit;
715     }
716 }
717
718 sub handle_overlay_auths_stage3 {
719     my $dbh = shift;
720     my $schema = shift;
721     my $batch = shift;
722
723     my $getct = $dbh->prepare(qq{
724         SELECT COUNT(*)
725         FROM  $schema.auths_$batch
726         WHERE to_import
727         AND NOT imported
728         AND (
729             auth_id IS NULL OR
730             auth_id = cancelled_auth_id
731         )
732         AND cancelled_auth_id IS NOT NULL
733     });
734     $getct->execute();
735     my $max = $getct->fetchrow_arrayref()->[0];
736     report_progress('Number of auths to update', $max);
737
738     for (my $i = 1; $i <= $max; $i++) {
739         report_progress('... auths updated', $i) if 0 == $i % 10 or $i == $max;
740         $dbh->begin_work;
741         $dbh->do(qq{
742             UPDATE authority.record_entry a
743             SET marc = b.marc,
744                 edit_date = NOW()
745             FROM $schema.auths_$batch b
746             WHERE a.id = b.cancelled_auth_id
747             AND cancelled_auth_id IN (
748                 SELECT auth_id
749                 FROM $schema.auths_$batch
750                 WHERE to_import
751                 AND NOT imported
752                 AND (
753                     auth_id IS NULL OR
754                     auth_id = cancelled_auth_id
755                 )
756                 AND cancelled_auth_id IS NOT NULL
757                 ORDER BY id
758                 LIMIT 1
759             )
760         });
761         $dbh->do(qq{
762             UPDATE $schema.auths_$batch
763             SET imported = TRUE
764             WHERE cancelled_auth_id IN (
765                 SELECT cancelled_auth_id
766                 FROM $schema.auths_$batch
767                 WHERE to_import
768                 AND NOT imported
769                 AND (
770                     auth_id IS NULL OR
771                     auth_id = cancelled_auth_id
772                 )
773                 AND cancelled_auth_id IS NOT NULL
774                 ORDER BY id
775                 LIMIT 1
776             )
777         });
778         $dbh->commit;
779     }
780 }
781
782 sub handle_link_auth_auth {
783     my $dbh = shift;
784     my $schema = shift;
785     my $batch = shift;
786
787     $dbh->do(q{
788         UPDATE config.internal_flag SET enabled = TRUE
789         WHERE name = 'ingest.disable_authority_auto_update';
790     });
791
792     my $sth = $dbh->prepare(qq{
793         SELECT COALESCE(new_auth_id, auth_id, cancelled_auth_id) AS id
794         FROM $schema.auths_$batch
795         WHERE imported
796         ORDER BY 1
797     });
798     $sth->execute();
799     my @ids = map { $_->{id} } @{ $sth->fetchall_arrayref({}) };
800     my $i = 0;
801     report_progress(scalar(@ids) . " records to do auth-auth linking");
802     foreach my $id (@ids) {
803         $i++;
804         report_progress('... auth-auth linkings processed', $i) if 0 == $i % 10 or $i == scalar(@ids);
805         system "/openils/bin/authority_authority_linker.pl -r $id -c /openils/conf/opensrf_core.xml";
806     }
807
808     $dbh->do(q{
809         UPDATE config.internal_flag SET enabled = FALSE
810         WHERE name = 'ingest.disable_authority_auto_update';
811     });
812 }
813
814 sub handle_link_auth_bib {
815     my $dbh = shift;
816     my $schema = shift;
817     my $batch = shift;
818     my $link_skipped = shift;
819
820     my $query;
821     if ($link_skipped) {
822         $query = qq{
823             SELECT bib_id AS id
824             FROM $schema.$batch
825             WHERE NOT imported
826             AND skip_reason ~ '^edit'
827             ORDER BY 1
828         };
829     } else {
830         $query = qq{
831             SELECT bib_id AS id
832             FROM $schema.$batch
833             WHERE imported
834             ORDER BY 1
835         };
836     }
837
838     my $sth = $dbh->prepare($query);
839     $sth->execute();
840     my @ids = map { $_->{id} } @{ $sth->fetchall_arrayref({}) };
841     my $i = 0;
842     report_progress(scalar(@ids) . " records to do auth-bib linking");
843     foreach my $id (@ids) {
844         $i++;
845         report_progress('... auth-bib linkings processed', $i) if 0 == $i % 10 or $i == scalar(@ids);
846         system "/openils/bin/authority_control_fields.pl --record $id -c /openils/conf/opensrf_core.xml";
847     }
848
849 }
850
851 sub handle_export_skipped_bibs {
852     my $dbh = shift;
853     my $schema = shift;
854     my $batch = shift;
855     my $output = shift;
856
857     my $outfh;
858     open($outfh, '>', $output) or die("Could not open input file $output: $!\n");
859     binmode $outfh, ':utf8';
860
861     my $sth = $dbh->prepare(qq{
862         SELECT marc
863         FROM $schema.$batch
864         WHERE skip_reason ~ '^edit'
865         ORDER BY id
866     });
867     $sth->execute();
868    
869     while (my $row  = $sth->fetchrow_hashref()) {
870         my $marc = MARC::Record->new_from_xml($row->{marc});
871         print $outfh $marc->as_usmarc();
872     }
873     $outfh->close();
874 }
875
876 sub handle_export_skipped_auths {
877     my $dbh = shift;
878     my $schema = shift;
879     my $batch = shift;
880     my $output = shift;
881
882     my $outfh;
883     open($outfh, '>', $output) or die("Could not open input file $output: $!\n");
884     binmode $outfh, ':utf8';
885
886     my $sth = $dbh->prepare(qq{
887         SELECT marc
888         FROM $schema.auths_$batch
889         WHERE NOT imported
890         ORDER BY id
891     });
892     $sth->execute();
893    
894     while (my $row  = $sth->fetchrow_hashref()) {
895         my $marc = MARC::Record->new_from_xml($row->{marc});
896         print $outfh $marc->as_usmarc();
897     }
898     $outfh->close();
899 }