X-Git-Url: http://git.equinoxoli.org/?p=migration-tools.git;a=blobdiff_plain;f=eg_staged_bib_overlay;h=d4e788a6d3795939d1559b605c68c281edc9a3da;hp=8e723be2242362f3db1dd2b40572a5434f3b38ba;hb=6ade3d523bfdfe7add008ebe0a0e6d494edac88c;hpb=280c85e703057bd317af85c1f898b344f9ed86c2 diff --git a/eg_staged_bib_overlay b/eg_staged_bib_overlay index 8e723be..d4e788a 100755 --- a/eg_staged_bib_overlay +++ b/eg_staged_bib_overlay @@ -33,7 +33,9 @@ my $dbpw; my $dbhost; my $batch; my $cutoff; -my $wait = 1; +my $wait = 0; +my $output; +my $link_skipped; my $ret = GetOptions( 'action:s' => \$action, @@ -45,6 +47,8 @@ my $ret = GetOptions( 'batch:s' => \$batch, 'cutoff:s' => \$cutoff, 'wait:i' => \$wait, + 'output:s' => \$output, + 'link-skipped' => \$link_skipped, ); abort('must specify --action') unless defined $action; @@ -55,10 +59,24 @@ abort('must specify --dbhost') unless defined $dbhost; abort('must specify --dbpw') unless defined $dbpw; abort('must specify --batch') unless defined $batch; -abort('--action must be "stage_bibs" or "filter_bibs" or "load_bibs"') unless +abort(q{--action must be "stage_bibs", "filter_bibs", "load_bibs", "stage_auths", +"match_auths", "load_new_auths", "overlay_auths_stage1", +"overlay_auths_stage2", "overlay_auths_stage3", "link_auth_auth", +"link_auth_bib", "export_skipped_bibs", or "export_skipped_auths"}) unless $action eq 'filter_bibs' or $action eq 'stage_bibs' or - $action eq 'load_bibs'; + $action eq 'load_bibs' or + $action eq 'stage_auths' or + $action eq 'match_auths' or + $action eq 'load_new_auths' or + $action eq 'overlay_auths_stage1' or + $action eq 'overlay_auths_stage2' or + $action eq 'overlay_auths_stage3' or + $action eq 'link_auth_auth' or + $action eq 'link_auth_bib' or + $action eq 'export_skipped_bibs' or + $action eq 'export_skipped_auths' +; my $dbh = connect_db($db, $dbuser, $dbpw, $dbhost); @@ -76,6 +94,45 @@ if ($action eq 'load_bibs' ) { handle_load_bibs($dbh, $schema, $batch, $wait); } +if ($action eq 'stage_auths') { + abort('must specify at least one input file') unless @ARGV; + handle_stage_auths($dbh, $schema, $batch); +} + +if ($action eq 'match_auths') { + handle_match_auths($dbh, $schema, $batch); +} + +if ($action eq 'load_new_auths') { + handle_load_new_auths($dbh, $schema, $batch); +} + +if ($action eq 'overlay_auths_stage1') { + handle_overlay_auths_stage1($dbh, $schema, $batch); +} +if ($action eq 'overlay_auths_stage2') { + handle_overlay_auths_stage2($dbh, $schema, $batch); +} +if ($action eq 'overlay_auths_stage3') { + handle_overlay_auths_stage3($dbh, $schema, $batch); +} + +if ($action eq 'link_auth_auth') { + handle_link_auth_auth($dbh, $schema, $batch); +} +if ($action eq 'link_auth_bib') { + handle_link_auth_bib($dbh, $schema, $batch, $link_skipped); +} + +if ($action eq 'export_skipped_bibs') { + abort('must specify output file') unless defined $output; + handle_export_skipped_bibs($dbh, $schema, $batch, $output); +} +if ($action eq 'export_skipped_auths') { + abort('must specify output file') unless defined $output; + handle_export_skipped_auths($dbh, $schema, $batch, $output); +} + sub abort { my $msg = shift; print STDERR "$0: $msg", "\n"; @@ -105,12 +162,46 @@ This program has several modes controlled by the --action switch: wait the number of seconds specified by the --wait switch. + --action stage_auths - load MARC authorities into staging + table + --action match_auths - identify matches with authority + records already present in the + Evergreen database; matching is + based on LCCN, cancelled LCCN, and + main heading. + --action load_new_auths - load new (unmatched) authorities + --action overlay_auths_stage1 - overlay based on LCCN where + heading has NOT changed; this step + disables propagation to bib records + --action overlay_auths_stage2 - overlay based on LCCN where heading + HAS changed; propagates changes + to bib records + --action overlay_auths_stage3 - overlay for records where a cancelled + LCCN is replaced with a new one + --action link_auth_auth - run authority_authority_linker.pl for + the authorities that were overlaid + or added in this batch. + --action link_auth_bib - run authority_control_fields.pl for + the bibs that were overlaid in this + batch. Add --link-skipped to specify + that bibs that were matched but + skipped due to having be edited after + the cutoff should be linked (rather + than linking the imported bibs) + --action export_skipped_bibs - export to ISO2709 file whose name is + specified by --output those bibs + that had been edited after the cutoff. + --action export_skipped_auths - export to ISO2709 file whose name is + specified by --output those authorities + that could not be definitively + handled as updates or adds. + Several switches are used regardless of the specified action: --schema - Pg schema in which staging table will live; should be created beforehand --batch - name of bib batch; will also be used as the name - of the staging table + of the staging tables --db - database name --dbuser - database user --dbpw - database password @@ -190,14 +281,19 @@ sub handle_stage_bibs { $dbh->commit; $dbh->begin_work; } - my $marc = MARC::Record->new_from_usmarc($_); - my $bibid = $marc->subfield('901', 'c'); - if ($bibid !~ /^\d+$/) { - print STDERR "Record $i is suspect; skipping\n"; + eval { + my $marc = MARC::Record->new_from_usmarc($_); + my $bibid = $marc->subfield('901', 'c'); + if ($bibid !~ /^\d+$/) { + die('Subfield 901$c is not numeric or missing.'); + } + my $xml = OpenILS::Application::AppUtils->entityize($marc->as_xml_record()); + $ins->execute($xml, $bibid); + }; + if ($@) { + warn("Record $i is bad: $@; skipping."); next; } - my $xml = OpenILS::Application::AppUtils->entityize($marc->as_xml_record()); - $ins->execute($xml, $bibid); } $dbh->commit; report_progress("Records staged", $i) if 0 != $i % 100; @@ -226,6 +322,8 @@ sub handle_filter_bibs { FROM biblio.record_entry WHERE deleted ) + AND to_import + AND NOT imported }); $sth1->execute(); my $ct = $sth1->rows; @@ -240,11 +338,24 @@ sub handle_filter_bibs { FROM biblio.record_entry WHERE edit_date >= ? ) - AND to_import; + AND to_import + AND NOT imported }); $sth2->execute($cutoff); $ct = $sth2->rows; report_progress("Filtering out $ct records edited after cutoff date of $cutoff"); + + my $sth3 = $dbh->prepare(qq{ + UPDATE $schema.$batch + SET to_import = FALSE, + skip_reason = 'XML is not well-formed' + WHERE NOT xml_is_well_formed(marc) + AND to_import + AND NOT imported + }); + $sth3->execute(); + $ct = $sth3->rows; + report_progress("Filtering out $ct records whose XML is not well-formed"); } sub handle_load_bibs { @@ -276,7 +387,7 @@ sub handle_load_bibs { FROM $schema.$batch WHERE to_import AND NOT imported - ORDER BY id + ORDER BY bib_id DESC LIMIT 1 ) }); @@ -295,5 +406,495 @@ sub handle_load_bibs { $dbh->commit; sleep $wait; } - +} + +sub handle_stage_auths { + my $dbh = shift; + my $schema = shift; + my $batch = shift; + + $dbh->do(qq{ + DROP TABLE IF EXISTS $schema.auths_$batch; + }); + $dbh->do(qq{ + CREATE TABLE $schema.auths_$batch ( + id SERIAL, + marc TEXT, + auth_id BIGINT, + new_auth_id BIGINT, + existing_heading TEXT, + lccn TEXT, + cancelled_lccn TEXT, + cancelled_auth_id BIGINT, + heading TEXT, + lccn_matched BOOLEAN DEFAULT FALSE, + heading_matched BOOLEAN DEFAULT FALSE, + imported BOOLEAN DEFAULT FALSE, + to_import BOOLEAN DEFAULT TRUE, + skip_reason TEXT + ) + }); + + local $/ = "\035"; + my $i = 0; + binmode STDIN, ':utf8'; + my $ins = $dbh->prepare(qq{ + INSERT INTO $schema.auths_$batch (marc, auth_id, lccn, cancelled_lccn, heading) + VALUES (?, ?, ?, ?, authority.normalize_heading(?)) + }); + $dbh->begin_work; + while (<>) { + $i++; + if (0 == $i % 100) { + report_progress("Records staged", $i); + $dbh->commit; + $dbh->begin_work; + } + eval { + my $marc = MARC::Record->new_from_usmarc($_); + my $authid = $marc->subfield('901', 'c'); + if (defined($authid) && $authid !~ /^\d+$/) { + undef $authid; + } + my $lccn = $marc->subfield('010', 'a'); + if (defined $lccn) { + $lccn =~ s/^\s+//; + $lccn =~ s/\s+$//; + $lccn =~ s/\s+/ /g; + } + my $cancelled_lccn = $marc->subfield('010', 'z'); + if (defined $cancelled_lccn) { + $cancelled_lccn =~ s/^\s+//; + $cancelled_lccn =~ s/\s+$//; + $cancelled_lccn =~ s/\s+/ /g; + } + my $xml = OpenILS::Application::AppUtils->entityize($marc->as_xml_record()); + $ins->execute($xml, $authid, $lccn, $cancelled_lccn, $xml); + }; + if ($@) { + warn("Record $i is bad: $@; skipping."); + next; + } + } + $dbh->commit; + report_progress("Records staged", $i) if 0 != $i % 100; + $dbh->do(qq/ + CREATE INDEX auths_${batch}_auth_id_idx ON + $schema.auths_$batch (auth_id); + /); + $dbh->do(qq/ + CREATE INDEX auths_${batch}_id_idx ON + $schema.auths_$batch (id); + /); + $dbh->do(qq/ + CREATE INDEX auths_${batch}_lccn_idx ON + $schema.auths_$batch (lccn); + /); +} + +sub handle_match_auths { + my ($dbh, $schema, $batch) = @_; + + my $sth = $dbh->prepare(qq{ + UPDATE $schema.auths_${batch} a + SET auth_id = b.record, + lccn_matched = TRUE, + existing_heading = authority.normalize_heading(c.marc) + FROM authority.full_rec b + JOIN authority.record_entry c ON (b.record = c.id) + WHERE tag = '010' + AND subfield = 'a' + AND value = lccn + AND auth_id IS NULL + AND lccn IS NOT NULL; + }); + $sth->execute(); + my $ct = $sth->rows; + report_progress("Matched $ct authorities on LCCN"); + + $sth = $dbh->prepare(qq{ + UPDATE $schema.auths_${batch} a + SET cancelled_auth_id = b.record + FROM authority.full_rec b + WHERE tag = '010' + AND subfield = 'a' + AND value = cancelled_lccn + AND auth_id IS NULL + AND cancelled_lccn IS NOT NULL; + }); + $sth->execute(); + $ct = $sth->rows; + report_progress("Matched $ct authorities on cancelled LCCN"); + + $sth = $dbh->prepare(qq{ + UPDATE $schema.auths_$batch a + SET auth_id = b.id, + heading_matched = TRUE, + existing_heading = b.heading + FROM authority.record_entry b + WHERE a.heading = b.heading + AND auth_id IS NULL; + }); + $sth->execute(); + $ct = $sth->rows; + report_progress("Matched $ct authorities on heading"); +} + +sub handle_load_new_auths { + my $dbh = shift; + my $schema = shift; + my $batch = shift; + + my $getct = $dbh->prepare(qq{ + SELECT COUNT(*) + FROM $schema.auths_$batch + WHERE to_import + AND NOT imported + AND new_auth_id IS NULL + AND auth_id IS NULL + AND cancelled_auth_id IS NULL + }); + $getct->execute(); + my $max = $getct->fetchrow_arrayref()->[0]; + + report_progress('Number of authorities to add', $max); + for (my $i = 1; $i <= $max; $i++) { + report_progress('... authorities added', $i) if 0 == $i % 10 or $i == $max; + $dbh->begin_work; + $dbh->do(qq{ + INSERT INTO authority.record_entry (marc, last_xact_id) + SELECT marc, ? || '-' || id + FROM $schema.auths_$batch b + WHERE id IN ( + SELECT id + FROM $schema.auths_$batch + WHERE to_import + AND NOT imported + AND new_auth_id IS NULL + AND auth_id IS NULL + AND cancelled_auth_id IS NULL + ORDER BY id + LIMIT 1 + ) + }, {}, "auths_$batch"); + $dbh->do(qq{ + UPDATE $schema.auths_$batch + SET imported = TRUE, + new_auth_id = CURRVAL('authority.record_entry_id_seq') + WHERE id IN ( + SELECT id + FROM $schema.auths_$batch + WHERE to_import + AND NOT imported + AND new_auth_id IS NULL + AND auth_id IS NULL + AND cancelled_auth_id IS NULL + ORDER BY id + LIMIT 1 + ) + }); + $dbh->commit; + sleep $wait; + } +} + +sub handle_overlay_auths_stage1 { + my $dbh = shift; + my $schema = shift; + my $batch = shift; + + my $getct = $dbh->prepare(qq{ + SELECT COUNT(*) + FROM $schema.auths_$batch + WHERE to_import + AND NOT imported + AND lccn_matched + AND heading = existing_heading + }); + $getct->execute(); + my $max = $getct->fetchrow_arrayref()->[0]; + report_progress('Number of auths to update', $max); + + $dbh->do(q{ + UPDATE config.internal_flag SET enabled = TRUE + WHERE name = 'ingest.disable_authority_auto_update'; + }); + for (my $i = 1; $i <= $max; $i++) { + report_progress('... auths updated', $i) if 0 == $i % 10 or $i == $max; + $dbh->begin_work; + $dbh->do(qq{ + UPDATE authority.record_entry a + SET marc = b.marc, + edit_date = NOW() + FROM $schema.auths_$batch b + WHERE a.id = b.auth_id + AND auth_id IN ( + SELECT auth_id + FROM $schema.auths_$batch + WHERE to_import + AND NOT imported + AND lccn_matched + AND heading = existing_heading + ORDER BY id + LIMIT 1 + ) + }); + $dbh->do(qq{ + UPDATE $schema.auths_$batch + SET imported = TRUE + WHERE auth_id IN ( + SELECT auth_id + FROM $schema.auths_$batch + WHERE to_import + AND NOT imported + AND lccn_matched + AND heading = existing_heading + ORDER BY id + LIMIT 1 + ) + }); + $dbh->commit; + } + $dbh->do(q{ + UPDATE config.internal_flag SET enabled = FALSE + WHERE name = 'ingest.disable_authority_auto_update'; + }); +} + +sub handle_overlay_auths_stage2 { + my $dbh = shift; + my $schema = shift; + my $batch = shift; + + my $getct = $dbh->prepare(qq{ + SELECT COUNT(*) + FROM $schema.auths_$batch + WHERE to_import + AND NOT imported + AND lccn_matched + AND heading <> existing_heading + }); + $getct->execute(); + my $max = $getct->fetchrow_arrayref()->[0]; + report_progress('Number of auths to update', $max); + + for (my $i = 1; $i <= $max; $i++) { + report_progress('... auths updated', $i) if 0 == $i % 10 or $i == $max; + $dbh->begin_work; + $dbh->do(qq{ + UPDATE authority.record_entry a + SET marc = b.marc, + edit_date = NOW() + FROM $schema.auths_$batch b + WHERE a.id = b.auth_id + AND auth_id IN ( + SELECT auth_id + FROM $schema.auths_$batch + WHERE to_import + AND NOT imported + AND lccn_matched + AND heading <> existing_heading + ORDER BY id + LIMIT 1 + ) + }); + $dbh->do(qq{ + UPDATE $schema.auths_$batch + SET imported = TRUE + WHERE auth_id IN ( + SELECT auth_id + FROM $schema.auths_$batch + WHERE to_import + AND NOT imported + AND lccn_matched + AND heading <> existing_heading + ORDER BY id + LIMIT 1 + ) + }); + $dbh->commit; + } +} + +sub handle_overlay_auths_stage3 { + my $dbh = shift; + my $schema = shift; + my $batch = shift; + + my $getct = $dbh->prepare(qq{ + SELECT COUNT(*) + FROM $schema.auths_$batch + WHERE to_import + AND NOT imported + AND ( + auth_id IS NULL OR + auth_id = cancelled_auth_id + ) + AND cancelled_auth_id IS NOT NULL + }); + $getct->execute(); + my $max = $getct->fetchrow_arrayref()->[0]; + report_progress('Number of auths to update', $max); + + for (my $i = 1; $i <= $max; $i++) { + report_progress('... auths updated', $i) if 0 == $i % 10 or $i == $max; + $dbh->begin_work; + $dbh->do(qq{ + UPDATE authority.record_entry a + SET marc = b.marc, + edit_date = NOW() + FROM $schema.auths_$batch b + WHERE a.id = b.cancelled_auth_id + AND cancelled_auth_id IN ( + SELECT auth_id + FROM $schema.auths_$batch + WHERE to_import + AND NOT imported + AND ( + auth_id IS NULL OR + auth_id = cancelled_auth_id + ) + AND cancelled_auth_id IS NOT NULL + ORDER BY id + LIMIT 1 + ) + }); + $dbh->do(qq{ + UPDATE $schema.auths_$batch + SET imported = TRUE + WHERE cancelled_auth_id IN ( + SELECT cancelled_auth_id + FROM $schema.auths_$batch + WHERE to_import + AND NOT imported + AND ( + auth_id IS NULL OR + auth_id = cancelled_auth_id + ) + AND cancelled_auth_id IS NOT NULL + ORDER BY id + LIMIT 1 + ) + }); + $dbh->commit; + } +} + +sub handle_link_auth_auth { + my $dbh = shift; + my $schema = shift; + my $batch = shift; + + $dbh->do(q{ + UPDATE config.internal_flag SET enabled = TRUE + WHERE name = 'ingest.disable_authority_auto_update'; + }); + + my $sth = $dbh->prepare(qq{ + SELECT COALESCE(new_auth_id, auth_id, cancelled_auth_id) AS id + FROM $schema.auths_$batch + WHERE imported + ORDER BY 1 + }); + $sth->execute(); + my @ids = map { $_->{id} } @{ $sth->fetchall_arrayref({}) }; + my $i = 0; + report_progress(scalar(@ids) . " records to do auth-auth linking"); + foreach my $id (@ids) { + $i++; + report_progress('... auth-auth linkings processed', $i) if 0 == $i % 10 or $i == scalar(@ids); + system "/openils/bin/authority_authority_linker.pl -r $id -c /openils/conf/opensrf_core.xml"; + } + + $dbh->do(q{ + UPDATE config.internal_flag SET enabled = FALSE + WHERE name = 'ingest.disable_authority_auto_update'; + }); +} + +sub handle_link_auth_bib { + my $dbh = shift; + my $schema = shift; + my $batch = shift; + my $link_skipped = shift; + + my $query; + if ($link_skipped) { + $query = qq{ + SELECT bib_id AS id + FROM $schema.$batch + WHERE NOT imported + AND skip_reason ~ '^edit' + ORDER BY 1 + }; + } else { + $query = qq{ + SELECT bib_id AS id + FROM $schema.$batch + WHERE imported + ORDER BY 1 + }; + } + + my $sth = $dbh->prepare($query); + $sth->execute(); + my @ids = map { $_->{id} } @{ $sth->fetchall_arrayref({}) }; + my $i = 0; + report_progress(scalar(@ids) . " records to do auth-bib linking"); + foreach my $id (@ids) { + $i++; + report_progress('... auth-bib linkings processed', $i) if 0 == $i % 10 or $i == scalar(@ids); + system "/openils/bin/authority_control_fields.pl --record $id -c /openils/conf/opensrf_core.xml"; + } + +} + +sub handle_export_skipped_bibs { + my $dbh = shift; + my $schema = shift; + my $batch = shift; + my $output = shift; + + my $outfh; + open($outfh, '>', $output) or die("Could not open input file $output: $!\n"); + binmode $outfh, ':utf8'; + + my $sth = $dbh->prepare(qq{ + SELECT marc + FROM $schema.$batch + WHERE skip_reason ~ '^edit' + ORDER BY id + }); + $sth->execute(); + + while (my $row = $sth->fetchrow_hashref()) { + my $marc = MARC::Record->new_from_xml($row->{marc}); + print $outfh $marc->as_usmarc(); + } + $outfh->close(); +} + +sub handle_export_skipped_auths { + my $dbh = shift; + my $schema = shift; + my $batch = shift; + my $output = shift; + + my $outfh; + open($outfh, '>', $output) or die("Could not open input file $output: $!\n"); + binmode $outfh, ':utf8'; + + my $sth = $dbh->prepare(qq{ + SELECT marc + FROM $schema.auths_$batch + WHERE NOT imported + ORDER BY id + }); + $sth->execute(); + + while (my $row = $sth->fetchrow_hashref()) { + my $marc = MARC::Record->new_from_xml($row->{marc}); + print $outfh $marc->as_usmarc(); + } + $outfh->close(); }