my $batch;
my $cutoff;
my $wait = 1;
+my $output;
+my $link_skipped;
my $ret = GetOptions(
'action:s' => \$action,
'batch:s' => \$batch,
'cutoff:s' => \$cutoff,
'wait:i' => \$wait,
+ 'output:s' => \$output,
+ 'link-skipped' => \$link_skipped,
);
abort('must specify --action') unless defined $action;
abort('must specify --dbpw') unless defined $dbpw;
abort('must specify --batch') unless defined $batch;
-abort('--action must be "stage_bibs" or "filter_bibs" or "load_bibs"') unless
+abort(q{--action must be "stage_bibs", "filter_bibs", "load_bibs", "stage_auths",
+"match_auths", "load_new_auths", "overlay_auths_stage1",
+"overlay_auths_stage2", "overlay_auths_stage3", "link_auth_auth",
+"link_auth_bib", "export_skipped_bibs", or "export_skipped_auths"}) unless
$action eq 'filter_bibs' or
$action eq 'stage_bibs' or
- $action eq 'load_bibs';
+ $action eq 'load_bibs' or
+ $action eq 'stage_auths' or
+ $action eq 'match_auths' or
+ $action eq 'load_new_auths' or
+ $action eq 'overlay_auths_stage1' or
+ $action eq 'overlay_auths_stage2' or
+ $action eq 'overlay_auths_stage3' or
+ $action eq 'link_auth_auth' or
+ $action eq 'link_auth_bib' or
+ $action eq 'export_skipped_bibs' or
+ $action eq 'export_skipped_auths'
+;
my $dbh = connect_db($db, $dbuser, $dbpw, $dbhost);
handle_load_bibs($dbh, $schema, $batch, $wait);
}
+# Dispatch for the authority-related actions.  These mirror the bib
+# action blocks above; $action was validated against the full list of
+# actions earlier, so at most one branch runs.
+if ($action eq 'stage_auths') {
+    abort('must specify at least one input file') unless @ARGV;
+    handle_stage_auths($dbh, $schema, $batch);
+}
+
+if ($action eq 'match_auths') {
+    handle_match_auths($dbh, $schema, $batch);
+}
+
+if ($action eq 'load_new_auths') {
+    handle_load_new_auths($dbh, $schema, $batch);
+}
+
+# The three overlay stages are meant to be run in order.
+if ($action eq 'overlay_auths_stage1') {
+    handle_overlay_auths_stage1($dbh, $schema, $batch);
+}
+if ($action eq 'overlay_auths_stage2') {
+    handle_overlay_auths_stage2($dbh, $schema, $batch);
+}
+if ($action eq 'overlay_auths_stage3') {
+    handle_overlay_auths_stage3($dbh, $schema, $batch);
+}
+
+if ($action eq 'link_auth_auth') {
+    handle_link_auth_auth($dbh, $schema, $batch);
+}
+if ($action eq 'link_auth_bib') {
+    handle_link_auth_bib($dbh, $schema, $batch, $link_skipped);
+}
+
+# The export actions additionally require --output.
+if ($action eq 'export_skipped_bibs') {
+    abort('must specify output file') unless defined $output;
+    handle_export_skipped_bibs($dbh, $schema, $batch, $output);
+}
+if ($action eq 'export_skipped_auths') {
+    abort('must specify output file') unless defined $output;
+    handle_export_skipped_auths($dbh, $schema, $batch, $output);
+}
+
+
sub abort {
my $msg = shift;
print STDERR "$0: $msg", "\n";
wait the number of seconds specified by the
--wait switch.
+ --action stage_auths - load MARC authorities into staging
+ table
+ --action match_auths - identify matches with authority
+ records already present in the
+ Evergreen database; matching is
+ based on LCCN, cancelled LCCN, and
+ main heading.
+ --action load_new_auths - load new (unmatched) authorities
+ --action overlay_auths_stage1 - overlay based on LCCN where
+ heading has NOT changed; this step
+ disables propagation to bib records
+ --action overlay_auths_stage2 - overlay based on LCCN where heading
+ HAS changed; propagates changes
+ to bib records
+ --action overlay_auths_stage3 - overlay for records where a cancelled
+ LCCN is replaced with a new one
+ --action link_auth_auth - run authority_authority_linker.pl for
+ the authorities that were overlaid
+ or added in this batch.
+ --action link_auth_bib - run authority_control_fields.pl for
+ the bibs that were overlaid in this
+ batch. Add --link-skipped to specify
+ that bibs that were matched but
+                                     skipped due to having been edited after
+ the cutoff should be linked (rather
+ than linking the imported bibs)
+ --action export_skipped_bibs - export to ISO2709 file whose name is
+ specified by --output those bibs
+ that had been edited after the cutoff.
+ --action export_skipped_auths - export to ISO2709 file whose name is
+ specified by --output those authorities
+ that could not be definitively
+ handled as updates or adds.
+
Several switches are used regardless of the specified action:
--schema - Pg schema in which staging table will live; should be
created beforehand
--batch - name of bib batch; will also be used as the name
- of the staging table
+ of the staging tables
--db - database name
--dbuser - database user
--dbpw - database password
$dbh->commit;
$dbh->begin_work;
}
- my $marc = MARC::Record->new_from_usmarc($_);
- my $bibid = $marc->subfield('901', 'c');
- if ($bibid !~ /^\d+$/) {
- print STDERR "Record $i is suspect; skipping\n";
+ eval {
+ my $marc = MARC::Record->new_from_usmarc($_);
+ my $bibid = $marc->subfield('901', 'c');
+ if ($bibid !~ /^\d+$/) {
+ die('Subfield 901$c is not numeric or missing.');
+ }
+ my $xml = OpenILS::Application::AppUtils->entityize($marc->as_xml_record());
+ $ins->execute($xml, $bibid);
+ };
+ if ($@) {
+ warn("Record $i is bad: $@; skipping.");
next;
}
- my $xml = OpenILS::Application::AppUtils->entityize($marc->as_xml_record());
- $ins->execute($xml, $bibid);
}
$dbh->commit;
report_progress("Records staged", $i) if 0 != $i % 100;
FROM biblio.record_entry
WHERE deleted
)
+ AND to_import
+ AND NOT imported
});
$sth1->execute();
my $ct = $sth1->rows;
FROM biblio.record_entry
WHERE edit_date >= ?
)
- AND to_import;
+ AND to_import
+ AND NOT imported
});
$sth2->execute($cutoff);
$ct = $sth2->rows;
report_progress("Filtering out $ct records edited after cutoff date of $cutoff");
+
+ my $sth3 = $dbh->prepare(qq{
+ UPDATE $schema.$batch
+ SET to_import = FALSE,
+ skip_reason = 'XML is not well-formed'
+ WHERE NOT xml_is_well_formed(marc)
+ AND to_import
+ AND NOT imported
+ });
+ $sth3->execute();
+ $ct = $sth3->rows;
+ report_progress("Filtering out $ct records whose XML is not well-formed");
}
sub handle_load_bibs {
FROM $schema.$batch
WHERE to_import
AND NOT imported
- ORDER BY id
+ ORDER BY bib_id DESC
LIMIT 1
)
});
$dbh->commit;
sleep $wait;
}
-
+}
+
+# Stage MARC authority records into $schema.auths_$batch.  Reads raw
+# USMARC records from the input files (via <>), extracts the match
+# points (901$c record ID, 010$a LCCN, 010$z cancelled LCCN, normalized
+# heading) and inserts one staging row per record.  Any existing
+# staging table of the same name is dropped and recreated.
+sub handle_stage_auths {
+    my $dbh = shift;
+    my $schema = shift;
+    my $batch = shift;
+
+    $dbh->do(qq{
+        DROP TABLE IF EXISTS $schema.auths_$batch;
+    });
+    $dbh->do(qq{
+        CREATE TABLE $schema.auths_$batch (
+            id SERIAL,
+            marc TEXT,
+            auth_id BIGINT,
+            new_auth_id BIGINT,
+            existing_heading TEXT,
+            lccn TEXT,
+            cancelled_lccn TEXT,
+            cancelled_auth_id BIGINT,
+            heading TEXT,
+            lccn_matched BOOLEAN DEFAULT FALSE,
+            heading_matched BOOLEAN DEFAULT FALSE,
+            imported BOOLEAN DEFAULT FALSE,
+            to_import BOOLEAN DEFAULT TRUE,
+            skip_reason TEXT
+        )
+    });
+
+    # Read record-at-a-time: \035 (0x1D) is the ISO2709/MARC record
+    # terminator.
+    local $/ = "\035";
+    my $i = 0;
+    binmode STDIN, ':utf8';
+    # heading is computed server-side from the full MARCXML by the
+    # Evergreen stored function authority.normalize_heading().
+    my $ins = $dbh->prepare(qq{
+        INSERT INTO $schema.auths_$batch (marc, auth_id, lccn, cancelled_lccn, heading)
+        VALUES (?, ?, ?, ?, authority.normalize_heading(?))
+    });
+    $dbh->begin_work;
+    while (<>) {
+        $i++;
+        # Commit in chunks of 100 so progress is durable and reportable.
+        if (0 == $i % 100) {
+            report_progress("Records staged", $i);
+            $dbh->commit;
+            $dbh->begin_work;
+        }
+        eval {
+            my $marc = MARC::Record->new_from_usmarc($_);
+            # 901$c may carry an incoming authority ID; treat any
+            # non-numeric value as absent rather than rejecting the record.
+            my $authid = $marc->subfield('901', 'c');
+            if (defined($authid) && $authid !~ /^\d+$/) {
+                undef $authid;
+            }
+            # Normalize whitespace in the LCCN match points.
+            my $lccn = $marc->subfield('010', 'a');
+            if (defined $lccn) {
+                $lccn =~ s/^\s+//;
+                $lccn =~ s/\s+$//;
+                $lccn =~ s/\s+/ /g;
+            }
+            my $cancelled_lccn = $marc->subfield('010', 'z');
+            if (defined $cancelled_lccn) {
+                $cancelled_lccn =~ s/^\s+//;
+                $cancelled_lccn =~ s/\s+$//;
+                $cancelled_lccn =~ s/\s+/ /g;
+            }
+            my $xml = OpenILS::Application::AppUtils->entityize($marc->as_xml_record());
+            # $xml is passed twice: once to store, once for
+            # normalize_heading() to derive the heading.
+            $ins->execute($xml, $authid, $lccn, $cancelled_lccn, $xml);
+        };
+        if ($@) {
+            warn("Record $i is bad: $@; skipping.");
+            next;
+        }
+    }
+    $dbh->commit;
+    report_progress("Records staged", $i) if 0 != $i % 100;
+    # Index the columns used as match points by the later steps.
+    $dbh->do(qq/
+        CREATE INDEX auths_${batch}_auth_id_idx ON
+        $schema.auths_$batch (auth_id);
+    /);
+    $dbh->do(qq/
+        CREATE INDEX auths_${batch}_id_idx ON
+        $schema.auths_$batch (id);
+    /);
+    $dbh->do(qq/
+        CREATE INDEX auths_${batch}_lccn_idx ON
+        $schema.auths_$batch (lccn);
+    /);
+}
+
+# Match staged authorities against authorities already present in the
+# Evergreen database.  Three passes, in priority order: (1) LCCN
+# (010$a), (2) cancelled LCCN (010$z), (3) normalized main heading.
+# Passes 1 and 3 only consider rows not yet matched (auth_id IS NULL),
+# and record the existing record's normalized heading so the overlay
+# stages can tell whether the heading changed.
+sub handle_match_auths {
+    my ($dbh, $schema, $batch) = @_;
+
+    # Pass 1: incoming LCCN equals an existing record's 010$a.
+    # NOTE(review): tag/subfield/value resolve to authority.full_rec and
+    # lccn/auth_id to the staging table; assumes no ambiguous column
+    # names between the two -- confirm against the target schema.
+    my $sth = $dbh->prepare(qq{
+        UPDATE $schema.auths_${batch} a
+        SET auth_id = b.record,
+            lccn_matched = TRUE,
+            existing_heading = authority.normalize_heading(c.marc)
+        FROM authority.full_rec b
+        JOIN authority.record_entry c ON (b.record = c.id)
+        WHERE tag = '010'
+        AND subfield = 'a'
+        AND value = lccn
+        AND auth_id IS NULL
+        AND lccn IS NOT NULL;
+    });
+    $sth->execute();
+    my $ct = $sth->rows;
+    report_progress("Matched $ct authorities on LCCN");
+
+    # Pass 2: the incoming record's cancelled LCCN matches an existing
+    # record's current LCCN; recorded separately (cancelled_auth_id) for
+    # the stage-3 overlay.
+    $sth = $dbh->prepare(qq{
+        UPDATE $schema.auths_${batch} a
+        SET cancelled_auth_id = b.record
+        FROM authority.full_rec b
+        WHERE tag = '010'
+        AND subfield = 'a'
+        AND value = cancelled_lccn
+        AND auth_id IS NULL
+        AND cancelled_lccn IS NOT NULL;
+    });
+    $sth->execute();
+    $ct = $sth->rows;
+    report_progress("Matched $ct authorities on cancelled LCCN");
+
+    # Pass 3: normalized heading match.
+    # NOTE(review): compares against b.heading on authority.record_entry
+    # -- assumes that column exists in the target Evergreen version;
+    # verify before relying on this pass.
+    $sth = $dbh->prepare(qq{
+        UPDATE $schema.auths_$batch a
+        SET auth_id = b.id,
+            heading_matched = TRUE,
+            existing_heading = b.heading
+        FROM authority.record_entry b
+        WHERE a.heading = b.heading
+        AND auth_id IS NULL;
+    });
+    $sth->execute();
+    $ct = $sth->rows;
+    report_progress("Matched $ct authorities on heading");
+}
+
+# Load brand-new authorities -- staged rows with no match of any kind
+# (no auth_id, no cancelled_auth_id, not yet imported) -- into
+# authority.record_entry, one record per transaction, recording the
+# newly assigned ID in new_auth_id and flagging the row imported.
+sub handle_load_new_auths {
+    my $dbh = shift;
+    my $schema = shift;
+    my $batch = shift;
+
+    my $getct = $dbh->prepare(qq{
+        SELECT COUNT(*)
+        FROM $schema.auths_$batch
+        WHERE to_import
+        AND NOT imported
+        AND new_auth_id IS NULL
+        AND auth_id IS NULL
+        AND cancelled_auth_id IS NULL
+    });
+    $getct->execute();
+    my $max = $getct->fetchrow_arrayref()->[0];
+
+    report_progress('Number of authorities to add', $max);
+    for (my $i = 1; $i <= $max; $i++) {
+        report_progress('... authorities added', $i) if 0 == $i % 10 or $i == $max;
+        $dbh->begin_work;
+        # Insert the next candidate (lowest staging id first).
+        # last_xact_id becomes "auths_<batch>-<staging id>" so imported
+        # records can be traced back to this batch.
+        $dbh->do(qq{
+            INSERT INTO authority.record_entry (marc, last_xact_id)
+            SELECT marc, ? || '-' || id
+            FROM $schema.auths_$batch b
+            WHERE id IN (
+                SELECT id
+                FROM $schema.auths_$batch
+                WHERE to_import
+                AND NOT imported
+                AND new_auth_id IS NULL
+                AND auth_id IS NULL
+                AND cancelled_auth_id IS NULL
+                ORDER BY id
+                LIMIT 1
+            )
+        }, {}, "auths_$batch");
+        # Record the assigned ID; CURRVAL is valid because the INSERT
+        # above ran on this same database session.
+        $dbh->do(qq{
+            UPDATE $schema.auths_$batch
+            SET imported = TRUE,
+                new_auth_id = CURRVAL('authority.record_entry_id_seq')
+            WHERE id IN (
+                SELECT id
+                FROM $schema.auths_$batch
+                WHERE to_import
+                AND NOT imported
+                AND new_auth_id IS NULL
+                AND auth_id IS NULL
+                AND cancelled_auth_id IS NULL
+                ORDER BY id
+                LIMIT 1
+            )
+        });
+        $dbh->commit;
+        # Pause between records; $wait is the file-scoped --wait value.
+        sleep $wait;
+    }
+}
+
+# Overlay stage 1: LCCN-matched records whose normalized heading is
+# UNCHANGED.  Since the heading is the same, linked bib records need no
+# re-ingest, so authority auto-propagation is switched off (by enabling
+# the 'ingest.disable_authority_auto_update' internal flag) for the
+# duration and restored afterwards.  One record per transaction.
+sub handle_overlay_auths_stage1 {
+    my $dbh = shift;
+    my $schema = shift;
+    my $batch = shift;
+
+    my $getct = $dbh->prepare(qq{
+        SELECT COUNT(*)
+        FROM $schema.auths_$batch
+        WHERE to_import
+        AND NOT imported
+        AND lccn_matched
+        AND heading = existing_heading
+    });
+    $getct->execute();
+    my $max = $getct->fetchrow_arrayref()->[0];
+    report_progress('Number of auths to update', $max);
+
+    # Flag enabled => authority updates do NOT propagate to bibs.
+    $dbh->do(q{
+        UPDATE config.internal_flag SET enabled = TRUE
+        WHERE name = 'ingest.disable_authority_auto_update';
+    });
+    for (my $i = 1; $i <= $max; $i++) {
+        report_progress('... auths updated', $i) if 0 == $i % 10 or $i == $max;
+        $dbh->begin_work;
+        # Overlay the matched authority with the incoming MARC for the
+        # next candidate (lowest staging id first).
+        $dbh->do(qq{
+            UPDATE authority.record_entry a
+            SET marc = b.marc,
+                edit_date = NOW()
+            FROM $schema.auths_$batch b
+            WHERE a.id = b.auth_id
+            AND auth_id IN (
+                SELECT auth_id
+                FROM $schema.auths_$batch
+                WHERE to_import
+                AND NOT imported
+                AND lccn_matched
+                AND heading = existing_heading
+                ORDER BY id
+                LIMIT 1
+            )
+        });
+        # Mark the same staging row as imported.
+        $dbh->do(qq{
+            UPDATE $schema.auths_$batch
+            SET imported = TRUE
+            WHERE auth_id IN (
+                SELECT auth_id
+                FROM $schema.auths_$batch
+                WHERE to_import
+                AND NOT imported
+                AND lccn_matched
+                AND heading = existing_heading
+                ORDER BY id
+                LIMIT 1
+            )
+        });
+        $dbh->commit;
+    }
+    # Restore normal propagation behavior.
+    $dbh->do(q{
+        UPDATE config.internal_flag SET enabled = FALSE
+        WHERE name = 'ingest.disable_authority_auto_update';
+    });
+}
+
+# Overlay stage 2: LCCN-matched records whose normalized heading HAS
+# changed.  Unlike stage 1, the propagation flag is left alone so the
+# heading change flows through to linked bib records.  One record per
+# transaction.
+sub handle_overlay_auths_stage2 {
+    my $dbh = shift;
+    my $schema = shift;
+    my $batch = shift;
+
+    my $getct = $dbh->prepare(qq{
+        SELECT COUNT(*)
+        FROM $schema.auths_$batch
+        WHERE to_import
+        AND NOT imported
+        AND lccn_matched
+        AND heading <> existing_heading
+    });
+    $getct->execute();
+    my $max = $getct->fetchrow_arrayref()->[0];
+    report_progress('Number of auths to update', $max);
+
+    for (my $i = 1; $i <= $max; $i++) {
+        report_progress('... auths updated', $i) if 0 == $i % 10 or $i == $max;
+        $dbh->begin_work;
+        # Overlay the matched authority with the incoming MARC for the
+        # next candidate (lowest staging id first).
+        $dbh->do(qq{
+            UPDATE authority.record_entry a
+            SET marc = b.marc,
+                edit_date = NOW()
+            FROM $schema.auths_$batch b
+            WHERE a.id = b.auth_id
+            AND auth_id IN (
+                SELECT auth_id
+                FROM $schema.auths_$batch
+                WHERE to_import
+                AND NOT imported
+                AND lccn_matched
+                AND heading <> existing_heading
+                ORDER BY id
+                LIMIT 1
+            )
+        });
+        # Mark the same staging row as imported.
+        $dbh->do(qq{
+            UPDATE $schema.auths_$batch
+            SET imported = TRUE
+            WHERE auth_id IN (
+                SELECT auth_id
+                FROM $schema.auths_$batch
+                WHERE to_import
+                AND NOT imported
+                AND lccn_matched
+                AND heading <> existing_heading
+                ORDER BY id
+                LIMIT 1
+            )
+        });
+        $dbh->commit;
+    }
+}
+
+# Overlay stage 3: the incoming record's cancelled LCCN (010$z) points
+# at an existing authority, and either no direct match was found
+# (auth_id IS NULL) or the direct match is that very record.  The
+# existing record identified by the cancelled LCCN is overlaid with the
+# incoming MARC, effectively replacing the cancelled LCCN with the new
+# one.  One record per transaction.
+sub handle_overlay_auths_stage3 {
+    my $dbh = shift;
+    my $schema = shift;
+    my $batch = shift;
+
+    my $getct = $dbh->prepare(qq{
+        SELECT COUNT(*)
+        FROM $schema.auths_$batch
+        WHERE to_import
+        AND NOT imported
+        AND (
+            auth_id IS NULL OR
+            auth_id = cancelled_auth_id
+        )
+        AND cancelled_auth_id IS NOT NULL
+    });
+    $getct->execute();
+    my $max = $getct->fetchrow_arrayref()->[0];
+    report_progress('Number of auths to update', $max);
+
+    for (my $i = 1; $i <= $max; $i++) {
+        report_progress('... auths updated', $i) if 0 == $i % 10 or $i == $max;
+        $dbh->begin_work;
+        # Overlay the record identified by the cancelled LCCN.  The
+        # subquery must select cancelled_auth_id (not auth_id): auth_id
+        # may be NULL for these rows, and "IN (NULL)" matches nothing,
+        # which would skip the overlay while the bookkeeping UPDATE
+        # below still marked the row imported.  Selecting
+        # cancelled_auth_id also keeps both statements targeting the
+        # same staging row.
+        $dbh->do(qq{
+            UPDATE authority.record_entry a
+            SET marc = b.marc,
+                edit_date = NOW()
+            FROM $schema.auths_$batch b
+            WHERE a.id = b.cancelled_auth_id
+            AND cancelled_auth_id IN (
+                SELECT cancelled_auth_id
+                FROM $schema.auths_$batch
+                WHERE to_import
+                AND NOT imported
+                AND (
+                    auth_id IS NULL OR
+                    auth_id = cancelled_auth_id
+                )
+                AND cancelled_auth_id IS NOT NULL
+                ORDER BY id
+                LIMIT 1
+            )
+        });
+        # Mark the same staging row as imported.
+        $dbh->do(qq{
+            UPDATE $schema.auths_$batch
+            SET imported = TRUE
+            WHERE cancelled_auth_id IN (
+                SELECT cancelled_auth_id
+                FROM $schema.auths_$batch
+                WHERE to_import
+                AND NOT imported
+                AND (
+                    auth_id IS NULL OR
+                    auth_id = cancelled_auth_id
+                )
+                AND cancelled_auth_id IS NOT NULL
+                ORDER BY id
+                LIMIT 1
+            )
+        });
+        $dbh->commit;
+    }
+}
+
+# Run authority_authority_linker.pl for every authority touched by this
+# batch (added or overlaid).  Bib propagation is disabled while linking
+# via the 'ingest.disable_authority_auto_update' internal flag and
+# re-enabled afterwards.
+sub handle_link_auth_auth {
+    my $dbh = shift;
+    my $schema = shift;
+    my $batch = shift;
+
+    $dbh->do(q{
+        UPDATE config.internal_flag SET enabled = TRUE
+        WHERE name = 'ingest.disable_authority_auto_update';
+    });
+
+    # One ID per imported staging row: the new record's ID if it was
+    # added, otherwise whichever existing record was overlaid.
+    my $sth = $dbh->prepare(qq{
+        SELECT COALESCE(new_auth_id, auth_id, cancelled_auth_id) AS id
+        FROM $schema.auths_$batch
+        WHERE imported
+        ORDER BY 1
+    });
+    $sth->execute();
+    my @ids = map { $_->{id} } @{ $sth->fetchall_arrayref({}) };
+    my $i = 0;
+    report_progress(scalar(@ids) . " records to do auth-auth linking");
+    foreach my $id (@ids) {
+        $i++;
+        report_progress('... auth-auth linkings processed', $i) if 0 == $i % 10 or $i == scalar(@ids);
+        # $id comes from BIGINT columns, so shell interpolation is safe.
+        system "/openils/bin/authority_authority_linker.pl -r $id -c /openils/conf/opensrf_core.xml";
+    }
+
+    $dbh->do(q{
+        UPDATE config.internal_flag SET enabled = FALSE
+        WHERE name = 'ingest.disable_authority_auto_update';
+    });
+}
+
+# Run authority_control_fields.pl for the bibs affected by this batch.
+# By default links the bibs that were imported; with --link-skipped,
+# instead links the bibs that were matched but skipped because they had
+# been edited after the cutoff (skip_reason starting with "edit").
+sub handle_link_auth_bib {
+    my $dbh = shift;
+    my $schema = shift;
+    my $batch = shift;
+    my $link_skipped = shift;  # boolean: the --link-skipped switch
+
+    my $query;
+    if ($link_skipped) {
+        $query = qq{
+            SELECT bib_id AS id
+            FROM $schema.$batch
+            WHERE NOT imported
+            AND skip_reason ~ '^edit'
+            ORDER BY 1
+        };
+    } else {
+        $query = qq{
+            SELECT bib_id AS id
+            FROM $schema.$batch
+            WHERE imported
+            ORDER BY 1
+        };
+    }
+
+    my $sth = $dbh->prepare($query);
+    $sth->execute();
+    my @ids = map { $_->{id} } @{ $sth->fetchall_arrayref({}) };
+    my $i = 0;
+    report_progress(scalar(@ids) . " records to do auth-bib linking");
+    foreach my $id (@ids) {
+        $i++;
+        report_progress('... auth-bib linkings processed', $i) if 0 == $i % 10 or $i == scalar(@ids);
+        # $id comes from the bib_id column, so shell interpolation is safe.
+        system "/openils/bin/authority_control_fields.pl --record $id -c /openils/conf/opensrf_core.xml";
+    }
+
+}
+
+# Export to an ISO2709 (USMARC) file the bibs that were skipped because
+# they had been edited after the cutoff date (skip_reason starting with
+# "edit", as set by the filter step).  Dies if the output file cannot
+# be opened.
+sub handle_export_skipped_bibs {
+    my $dbh = shift;
+    my $schema = shift;
+    my $batch = shift;
+    my $output = shift;  # path of the export file (--output)
+
+    my $outfh;
+    # Fixed error message: this is the output file, not an input file.
+    open($outfh, '>', $output) or die("Could not open output file $output: $!\n");
+    binmode $outfh, ':utf8';
+
+    my $sth = $dbh->prepare(qq{
+        SELECT marc
+        FROM $schema.$batch
+        WHERE skip_reason ~ '^edit'
+        ORDER BY id
+    });
+    $sth->execute();
+
+    # Convert each stored MARCXML blob back to raw USMARC.
+    while (my $row = $sth->fetchrow_hashref()) {
+        my $marc = MARC::Record->new_from_xml($row->{marc});
+        print $outfh $marc->as_usmarc();
+    }
+    $outfh->close();
+}
+
+# Export to an ISO2709 (USMARC) file the staged authorities that were
+# never imported, i.e. those the preceding steps could not definitively
+# handle as updates or adds.  Dies if the output file cannot be opened.
+sub handle_export_skipped_auths {
+    my $dbh = shift;
+    my $schema = shift;
+    my $batch = shift;
+    my $output = shift;  # path of the export file (--output)
+
+    my $outfh;
+    # Fixed error message: this is the output file, not an input file.
+    open($outfh, '>', $output) or die("Could not open output file $output: $!\n");
+    binmode $outfh, ':utf8';
+
+    my $sth = $dbh->prepare(qq{
+        SELECT marc
+        FROM $schema.auths_$batch
+        WHERE NOT imported
+        ORDER BY id
+    });
+    $sth->execute();
+
+    # Convert each stored MARCXML blob back to raw USMARC.
+    while (my $row = $sth->fetchrow_hashref()) {
+        my $marc = MARC::Record->new_from_xml($row->{marc});
+        print $outfh $marc->as_usmarc();
+    }
+    $outfh->close();
}