#!/usr/bin/perl # Copyright (c) 2016 Equinox Software, Inc. # Author: Galen Charlton # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2, or (at your option) # any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see use strict; use warnings; use Getopt::Long; use MARC::Record; use MARC::File::XML (BinaryEncoding => 'utf8'); use DBI; use OpenILS::Application::AppUtils; my $action; my $schema = 'bib_loads'; my $db; my $dbuser; my $dbpw; my $dbhost; my $batch; my $cutoff; my $wait = 1; my $ret = GetOptions( 'action:s' => \$action, 'schema:s' => \$schema, 'db:s' => \$db, 'dbuser:s' => \$dbuser, 'dbhost:s' => \$dbhost, 'dbpw:s' => \$dbpw, 'batch:s' => \$batch, 'cutoff:s' => \$cutoff, 'wait:i' => \$wait, ); abort('must specify --action') unless defined $action; abort('must specify --schema') unless defined $schema; abort('must specify --db') unless defined $db; abort('must specify --dbuser') unless defined $dbuser; abort('must specify --dbhost') unless defined $dbhost; abort('must specify --dbpw') unless defined $dbpw; abort('must specify --batch') unless defined $batch; abort(q{--action must be "stage_bibs", "filter_bibs", "load_bibs", "stage_auths", "match_auths", "load_new_auths", "overlay_auths_stage1", "overlay_auths_stage2", "overlay_auths_stage3", "link_auth_auth"}) unless $action eq 'filter_bibs' or $action eq 'stage_bibs' or $action eq 'load_bibs' or $action eq 'stage_auths' or $action eq 'match_auths' or $action eq 'load_new_auths' or $action eq 'overlay_auths_stage1' or $action eq 'overlay_auths_stage2' or $action eq 'overlay_auths_stage3' or $action eq 'link_auth_auth' or $action eq 'link_auth_bib' ; my $dbh = connect_db($db, $dbuser, $dbpw, $dbhost); if ($action eq 'stage_bibs') { abort('must specify at least one input file') unless @ARGV; handle_stage_bibs($dbh, $schema, $batch); } if ($action eq 'filter_bibs') { abort('must specify cutoff date when filtering') unless defined $cutoff; handle_filter_bibs($dbh, $schema, $batch, $cutoff); } if ($action eq 'load_bibs' ) { handle_load_bibs($dbh, $schema, $batch, $wait); } if ($action eq 'stage_auths') { abort('must specify at least one input file') unless @ARGV; handle_stage_auths($dbh, $schema, $batch); } if ($action eq 'match_auths') { handle_match_auths($dbh, $schema, $batch); } if ($action eq 'load_new_auths') { handle_load_new_auths($dbh, $schema, $batch); } if ($action eq 'overlay_auths_stage1') { handle_overlay_auths_stage1($dbh, $schema, $batch); } if ($action eq 'overlay_auths_stage2') { handle_overlay_auths_stage2($dbh, $schema, $batch); } if ($action eq 'overlay_auths_stage3') { handle_overlay_auths_stage3($dbh, $schema, $batch); } if ($action eq 'link_auth_auth') { handle_link_auth_auth($dbh, $schema, $batch); } if ($action eq 'link_auth_bib') { handle_link_auth_bib($dbh, $schema, $batch); } sub abort { my $msg = shift; print STDERR "$0: $msg", "\n"; print_usage(); exit 1; } sub print_usage { print <<_USAGE_; Utility to stage and overlay bib records in an Evergreen database. This expects that the incoming records will have been previously exported from that Evergreen database and modified in some fashion (e.g., for authority record processing) and that the bib ID can be found in the 901\$c subfield. This program has several modes controlled by the --action switch: --action stage_bibs - load MARC bib records into a staging table --action filter_bibs - mark previously staged bibs that should be excluded from a subsequent load, either because the target bib is deleted in Evergreen or the record was modified after a date specified by the --cutoff switch --action load_bibs - overlay bib records using a previously staged batch, one at a time. After each bib, it will wait the number of seconds specified by the --wait switch. --action stage_auths - load MARC authorities into staging table --action match_auths - identify matches with authority records already present in the Evergreen database; matching is based on LCCN, cancelled LCCN, and main heading. --action load_new_auths - load new (unmatched) authorities --action overlay_auths_stage1 - overlay based on LCCN where heading has NOT changed; this step disables propagation to bib records --action overlay_auths_stage2 - overlay based on LCCN where heading HAS changed; propagates changes to bib records --action overlay_auths_stage3 - overlay for records where a cancelled LCCN is replaced with a new one --action link_auth_auth - run authority_authority_linker.pl for the authorities that were overlaid or added in this batch. --action link_auth_bib - run authority_control_fields.pl for the bibs that were overlaid in this batch. Several switches are used regardless of the specified action: --schema - Pg schema in which staging table will live; should be created beforehand --batch - name of bib batch; will also be used as the name of the staging tables --db - database name --dbuser - database user --dbpw - database password --dbhost - database host Examples: $0 --schema bib_load --batch bibs_2016_01 --db evergreen \\ --dbuser evergreen --dbpw evergreen --dbhost localhost \\ --action stage_bibs -- file1.mrc file2.mrc [...] $0 --schema bib_load --batch bibs_2016_01 --db evergreen \\ --dbuser evergreen --dbpw evergreen --dbhost localhost \\ --action filter_bibs --cutoff 2016-01-02 $0 --schema bib_load --batch bibs_2016_01 --db evergreen \\ --dbuser evergreen --dbpw evergreen --dbhost localhost \\ --action load_bibs --wait 2 _USAGE_ } sub report_progress { my ($msg, $counter) = @_; if (defined $counter) { print STDERR "$msg: $counter\n"; } else { print STDERR "$msg\n"; } } sub connect_db { my ($db, $dbuser, $dbpw, $dbhost) = @_; my $dsn = "dbi:Pg:host=$dbhost;dbname=$db;port=5432"; my $attrs = { ShowErrorStatement => 1, RaiseError => 1, PrintError => 1, pg_enable_utf8 => 1, }; my $dbh = DBI->connect($dsn, $dbuser, $dbpw, $attrs); return $dbh; } sub handle_stage_bibs { my $dbh = shift; my $schema = shift; my $batch = shift; $dbh->do(qq{ DROP TABLE IF EXISTS $schema.$batch; }); $dbh->do(qq{ CREATE TABLE $schema.$batch ( id SERIAL, marc TEXT, bib_id BIGINT, imported BOOLEAN DEFAULT FALSE, to_import BOOLEAN DEFAULT TRUE, skip_reason TEXT ) }); local $/ = "\035"; my $i = 0; binmode STDIN, ':utf8'; my $ins = $dbh->prepare("INSERT INTO $schema.$batch (marc, bib_id) VALUES (?, ?)"); $dbh->begin_work; while (<>) { $i++; if (0 == $i % 100) { report_progress("Records staged", $i); $dbh->commit; $dbh->begin_work; } my $marc = MARC::Record->new_from_usmarc($_); my $bibid = $marc->subfield('901', 'c'); if ($bibid !~ /^\d+$/) { print STDERR "Record $i is suspect; skipping\n"; next; } my $xml = OpenILS::Application::AppUtils->entityize($marc->as_xml_record()); $ins->execute($xml, $bibid); } $dbh->commit; report_progress("Records staged", $i) if 0 != $i % 100; $dbh->do(qq/ CREATE INDEX ${batch}_bib_id_idx ON $schema.$batch (bib_id); /); $dbh->do(qq/ CREATE INDEX ${batch}_id_idx ON $schema.$batch (id); /); } sub handle_filter_bibs { my $dbh = shift; my $schema = shift; my $batch = shift; my $cutoff = shift; my $sth1 = $dbh->prepare(qq{ UPDATE $schema.$batch SET to_import = FALSE, skip_reason = 'deleted' WHERE bib_id IN ( SELECT id FROM biblio.record_entry WHERE deleted ) AND to_import AND NOT imported }); $sth1->execute(); my $ct = $sth1->rows; report_progress("Filtering out $ct records that are currently deleted"); my $sth2 = $dbh->prepare(qq{ UPDATE $schema.$batch SET to_import = FALSE, skip_reason = 'edited after cutoff of $cutoff' WHERE bib_id IN ( SELECT id FROM biblio.record_entry WHERE edit_date >= ? ) AND to_import AND NOT imported }); $sth2->execute($cutoff); $ct = $sth2->rows; report_progress("Filtering out $ct records edited after cutoff date of $cutoff"); my $sth3 = $dbh->prepare(qq{ UPDATE $schema.$batch SET to_import = FALSE, skip_reason = 'XML is not well-formed' WHERE NOT xml_is_well_formed(marc) AND to_import AND NOT imported }); $sth3->execute(); $ct = $sth3->rows; report_progress("Filtering out $ct records whose XML is not well-formed"); } sub handle_load_bibs { my $dbh = shift; my $schema = shift; my $batch = shift; my $wait = shift; my $getct = $dbh->prepare(qq{ SELECT COUNT(*) FROM $schema.$batch WHERE to_import AND NOT imported }); $getct->execute(); my $max = $getct->fetchrow_arrayref()->[0]; report_progress('Number of bibs to update', $max); for (my $i = 1; $i <= $max; $i++) { report_progress('... bibs updated', $i) if 0 == $i % 10 or $i == $max; $dbh->begin_work; $dbh->do(qq{ UPDATE biblio.record_entry a SET marc = b.marc FROM $schema.$batch b WHERE a.id = b.bib_id AND bib_id IN ( SELECT bib_id FROM $schema.$batch WHERE to_import AND NOT imported ORDER BY id LIMIT 1 ) }); $dbh->do(qq{ UPDATE $schema.$batch SET imported = TRUE WHERE bib_id IN ( SELECT bib_id FROM $schema.$batch WHERE to_import AND NOT imported ORDER BY id LIMIT 1 ) }); $dbh->commit; sleep $wait; } } sub handle_stage_auths { my $dbh = shift; my $schema = shift; my $batch = shift; $dbh->do(qq{ DROP TABLE IF EXISTS $schema.auths_$batch; }); $dbh->do(qq{ CREATE TABLE $schema.auths_$batch ( id SERIAL, marc TEXT, auth_id BIGINT, new_auth_id BIGINT, existing_heading TEXT, lccn TEXT, cancelled_lccn TEXT, cancelled_auth_id BIGINT, heading TEXT, lccn_matched BOOLEAN DEFAULT FALSE, heading_matched BOOLEAN DEFAULT FALSE, imported BOOLEAN DEFAULT FALSE, to_import BOOLEAN DEFAULT TRUE, skip_reason TEXT ) }); local $/ = "\035"; my $i = 0; binmode STDIN, ':utf8'; my $ins = $dbh->prepare(qq{ INSERT INTO $schema.auths_$batch (marc, auth_id, lccn, cancelled_lccn, heading) VALUES (?, ?, ?, ?, authority.normalize_heading(?)) }); $dbh->begin_work; while (<>) { $i++; if (0 == $i % 100) { report_progress("Records staged", $i); $dbh->commit; $dbh->begin_work; } my $marc = MARC::Record->new_from_usmarc($_); my $authid = $marc->subfield('901', 'c'); if (defined($authid) && $authid !~ /^\d+$/) { undef $authid; } my $lccn = $marc->subfield('010', 'a'); if (defined $lccn) { $lccn =~ s/^\s+//; $lccn =~ s/\s+$//; $lccn =~ s/\s+/ /g; } my $cancelled_lccn = $marc->subfield('010', 'z'); if (defined $cancelled_lccn) { $cancelled_lccn =~ s/^\s+//; $cancelled_lccn =~ s/\s+$//; $cancelled_lccn =~ s/\s+/ /g; } my $xml = OpenILS::Application::AppUtils->entityize($marc->as_xml_record()); $ins->execute($xml, $authid, $lccn, $cancelled_lccn, $xml); } $dbh->commit; report_progress("Records staged", $i) if 0 != $i % 100; $dbh->do(qq/ CREATE INDEX auths_${batch}_auth_id_idx ON $schema.auths_$batch (auth_id); /); $dbh->do(qq/ CREATE INDEX auths_${batch}_id_idx ON $schema.auths_$batch (id); /); $dbh->do(qq/ CREATE INDEX auths_${batch}_lccn_idx ON $schema.auths_$batch (lccn); /); } sub handle_match_auths { my ($dbh, $schema, $batch) = @_; my $sth = $dbh->prepare(qq{ UPDATE $schema.auths_${batch} a SET auth_id = b.record, lccn_matched = TRUE, existing_heading = authority.normalize_heading(c.marc) FROM authority.full_rec b JOIN authority.record_entry c ON (b.record = c.id) WHERE tag = '010' AND subfield = 'a' AND value = lccn AND auth_id IS NULL AND lccn IS NOT NULL; }); $sth->execute(); my $ct = $sth->rows; report_progress("Matched $ct authorities on LCCN"); $sth = $dbh->prepare(qq{ UPDATE $schema.auths_${batch} a SET cancelled_auth_id = b.record FROM authority.full_rec b WHERE tag = '010' AND subfield = 'a' AND value = cancelled_lccn AND auth_id IS NULL AND cancelled_lccn IS NOT NULL; }); $sth->execute(); $ct = $sth->rows; report_progress("Matched $ct authorities on cancelled LCCN"); $sth = $dbh->prepare(qq{ UPDATE $schema.auths_$batch a SET auth_id = b.id, heading_matched = TRUE, existing_heading = b.heading FROM authority.record_entry b WHERE a.heading = b.heading AND auth_id IS NULL; }); $sth->execute(); $ct = $sth->rows; report_progress("Matched $ct authorities on heading"); } sub handle_load_new_auths { my $dbh = shift; my $schema = shift; my $batch = shift; my $getct = $dbh->prepare(qq{ SELECT COUNT(*) FROM $schema.auths_$batch WHERE to_import AND NOT imported AND new_auth_id IS NULL AND auth_id IS NULL AND cancelled_auth_id IS NULL }); $getct->execute(); my $max = $getct->fetchrow_arrayref()->[0]; report_progress('Number of authorities to add', $max); for (my $i = 1; $i <= $max; $i++) { report_progress('... authorities added', $i) if 0 == $i % 10 or $i == $max; $dbh->begin_work; $dbh->do(qq{ INSERT INTO authority.record_entry (marc, last_xact_id) SELECT marc, ? || '-' || id FROM $schema.auths_$batch b WHERE id IN ( SELECT id FROM $schema.auths_$batch WHERE to_import AND NOT imported AND new_auth_id IS NULL AND auth_id IS NULL AND cancelled_auth_id IS NULL ORDER BY id LIMIT 1 ) }, {}, "auths_$batch"); $dbh->do(qq{ UPDATE $schema.auths_$batch SET imported = TRUE, new_auth_id = CURRVAL('authority.record_entry_id_seq') WHERE id IN ( SELECT id FROM $schema.auths_$batch WHERE to_import AND NOT imported AND new_auth_id IS NULL AND auth_id IS NULL AND cancelled_auth_id IS NULL ORDER BY id LIMIT 1 ) }); $dbh->commit; } } sub handle_overlay_auths_stage1 { my $dbh = shift; my $schema = shift; my $batch = shift; my $getct = $dbh->prepare(qq{ SELECT COUNT(*) FROM $schema.auths_$batch WHERE to_import AND NOT imported AND lccn_matched AND heading = existing_heading }); $getct->execute(); my $max = $getct->fetchrow_arrayref()->[0]; report_progress('Number of auths to update', $max); $dbh->do(q{ UPDATE config.internal_flag SET enabled = TRUE WHERE name = 'ingest.disable_authority_auto_update'; }); for (my $i = 1; $i <= $max; $i++) { report_progress('... auths updated', $i) if 0 == $i % 10 or $i == $max; $dbh->begin_work; $dbh->do(qq{ UPDATE authority.record_entry a SET marc = b.marc, edit_date = NOW() FROM $schema.auths_$batch b WHERE a.id = b.auth_id AND auth_id IN ( SELECT auth_id FROM $schema.auths_$batch WHERE to_import AND NOT imported AND lccn_matched AND heading = existing_heading ORDER BY id LIMIT 1 ) }); $dbh->do(qq{ UPDATE $schema.auths_$batch SET imported = TRUE WHERE auth_id IN ( SELECT auth_id FROM $schema.auths_$batch WHERE to_import AND NOT imported AND lccn_matched AND heading = existing_heading ORDER BY id LIMIT 1 ) }); $dbh->commit; } $dbh->do(q{ UPDATE config.internal_flag SET enabled = FALSE WHERE name = 'ingest.disable_authority_auto_update'; }); } sub handle_overlay_auths_stage2 { my $dbh = shift; my $schema = shift; my $batch = shift; my $getct = $dbh->prepare(qq{ SELECT COUNT(*) FROM $schema.auths_$batch WHERE to_import AND NOT imported AND lccn_matched AND heading <> existing_heading }); $getct->execute(); my $max = $getct->fetchrow_arrayref()->[0]; report_progress('Number of auths to update', $max); for (my $i = 1; $i <= $max; $i++) { report_progress('... auths updated', $i) if 0 == $i % 10 or $i == $max; $dbh->begin_work; $dbh->do(qq{ UPDATE authority.record_entry a SET marc = b.marc, edit_date = NOW() FROM $schema.auths_$batch b WHERE a.id = b.auth_id AND auth_id IN ( SELECT auth_id FROM $schema.auths_$batch WHERE to_import AND NOT imported AND lccn_matched AND heading <> existing_heading ORDER BY id LIMIT 1 ) }); $dbh->do(qq{ UPDATE $schema.auths_$batch SET imported = TRUE WHERE auth_id IN ( SELECT auth_id FROM $schema.auths_$batch WHERE to_import AND NOT imported AND lccn_matched AND heading <> existing_heading ORDER BY id LIMIT 1 ) }); $dbh->commit; } } sub handle_overlay_auths_stage3 { my $dbh = shift; my $schema = shift; my $batch = shift; my $getct = $dbh->prepare(qq{ SELECT COUNT(*) FROM $schema.auths_$batch WHERE to_import AND NOT imported AND ( auth_id IS NULL OR auth_id = cancelled_auth_id ) AND cancelled_auth_id IS NOT NULL }); $getct->execute(); my $max = $getct->fetchrow_arrayref()->[0]; report_progress('Number of auths to update', $max); for (my $i = 1; $i <= $max; $i++) { report_progress('... auths updated', $i) if 0 == $i % 10 or $i == $max; $dbh->begin_work; $dbh->do(qq{ UPDATE authority.record_entry a SET marc = b.marc, edit_date = NOW() FROM $schema.auths_$batch b WHERE a.id = b.cancelled_auth_id AND cancelled_auth_id IN ( SELECT auth_id FROM $schema.auths_$batch WHERE to_import AND NOT imported AND ( auth_id IS NULL OR auth_id = cancelled_auth_id ) AND cancelled_auth_id IS NOT NULL ORDER BY id LIMIT 1 ) }); $dbh->do(qq{ UPDATE $schema.auths_$batch SET imported = TRUE WHERE cancelled_auth_id IN ( SELECT cancelled_auth_id FROM $schema.auths_$batch WHERE to_import AND NOT imported AND ( auth_id IS NULL OR auth_id = cancelled_auth_id ) AND cancelled_auth_id IS NOT NULL ORDER BY id LIMIT 1 ) }); $dbh->commit; } } sub handle_link_auth_auth { my $dbh = shift; my $schema = shift; my $batch = shift; $dbh->do(q{ UPDATE config.internal_flag SET enabled = TRUE WHERE name = 'ingest.disable_authority_auto_update'; }); my $sth = $dbh->prepare(qq{ SELECT COALESCE(new_auth_id, auth_id, cancelled_auth_id) AS id FROM $schema.auths_$batch WHERE imported ORDER BY 1 }); $sth->execute(); my @ids = map { $_->{id} } @{ $sth->fetchall_arrayref({}) }; my $i = 0; report_progress(scalar(@ids) . " records to do auth-auth linking"); foreach my $id (@ids) { $i++; report_progress('... auth-auth linkings processed', $i) if 0 == $i % 10 or $i == scalar(@ids); system "/openils/bin/authority_authority_linker.pl -r $id -c /openils/conf/opensrf_core.xml"; } $dbh->do(q{ UPDATE config.internal_flag SET enabled = FALSE WHERE name = 'ingest.disable_authority_auto_update'; }); } sub handle_link_auth_bib { my $dbh = shift; my $schema = shift; my $batch = shift; my $sth = $dbh->prepare(qq{ SELECT bib_id AS id FROM $schema.$batch WHERE imported ORDER BY 1 }); $sth->execute(); my @ids = map { $_->{id} } @{ $sth->fetchall_arrayref({}) }; my $i = 0; report_progress(scalar(@ids) . " records to do auth-bib linking"); foreach my $id (@ids) { $i++; report_progress('... auth-bib linkings processed', $i) if 0 == $i % 10 or $i == scalar(@ids); system "/openils/bin/authority_control_fields.pl --record $id -c /openils/conf/opensrf_core.xml"; } }