additional escaping and multiple table options for add_sf9 function
[migration-tools.git] / eg_staged_bib_overlay
index d8bf5c9..a2d776c 100755 (executable)
@@ -34,6 +34,8 @@ my $dbhost;
 my $batch;
 my $cutoff;
 my $wait = 1;
+my $output;
+my $link_skipped;
 
 my $ret = GetOptions(
     'action:s'      => \$action,
@@ -45,6 +47,8 @@ my $ret = GetOptions(
     'batch:s'       => \$batch,
     'cutoff:s'      => \$cutoff,
     'wait:i'        => \$wait,
+    'output:s'      => \$output,
+    'link-skipped'  => \$link_skipped,
 );
 
 abort('must specify --action') unless defined $action;
@@ -55,10 +59,24 @@ abort('must specify --dbhost') unless defined $dbhost;
 abort('must specify --dbpw') unless defined $dbpw;
 abort('must specify --batch') unless defined $batch;
 
-abort('--action must be "stage_bibs" or "filter_bibs" or "load_bibs"') unless
+abort(q{--action must be "stage_bibs", "filter_bibs", "load_bibs", "stage_auths",
+"match_auths", "load_new_auths", "overlay_auths_stage1",
+"overlay_auths_stage2", "overlay_auths_stage3", "link_auth_auth",
+"link_auth_bib", "export_skipped_bibs", or "export_skipped_auths"}) unless
     $action eq 'filter_bibs' or
     $action eq 'stage_bibs' or
-    $action eq 'load_bibs';
+    $action eq 'load_bibs' or
+    $action eq 'stage_auths' or
+    $action eq 'match_auths' or
+    $action eq 'load_new_auths' or
+    $action eq 'overlay_auths_stage1' or
+    $action eq 'overlay_auths_stage2' or
+    $action eq 'overlay_auths_stage3' or
+    $action eq 'link_auth_auth' or
+    $action eq 'link_auth_bib' or
+    $action eq 'export_skipped_bibs' or
+    $action eq 'export_skipped_auths'
+;
 
 my $dbh = connect_db($db, $dbuser, $dbpw, $dbhost);
 
@@ -76,6 +94,45 @@ if ($action eq 'load_bibs' ) {
     handle_load_bibs($dbh, $schema, $batch, $wait);
 }
 
+if ($action eq 'stage_auths') {
+    abort('must specify at least one input file') unless @ARGV;
+    handle_stage_auths($dbh, $schema, $batch);
+}
+
+if ($action eq 'match_auths') {
+    handle_match_auths($dbh, $schema, $batch);
+}
+
+if ($action eq 'load_new_auths') {
+    handle_load_new_auths($dbh, $schema, $batch);
+}
+
+if ($action eq 'overlay_auths_stage1') {
+    handle_overlay_auths_stage1($dbh, $schema, $batch);
+}
+if ($action eq 'overlay_auths_stage2') {
+    handle_overlay_auths_stage2($dbh, $schema, $batch);
+}
+if ($action eq 'overlay_auths_stage3') {
+    handle_overlay_auths_stage3($dbh, $schema, $batch);
+}
+
+if ($action eq 'link_auth_auth') {
+    handle_link_auth_auth($dbh, $schema, $batch);
+}
+if ($action eq 'link_auth_bib') {
+    handle_link_auth_bib($dbh, $schema, $batch, $link_skipped);
+}
+
+if ($action eq 'export_skipped_bibs') {
+    abort('must specify output file') unless defined $output;
+    handle_export_skipped_bibs($dbh, $schema, $batch, $output);
+}
+if ($action eq 'export_skipped_auths') {
+    abort('must specify output file') unless defined $output;
+    handle_export_skipped_auths($dbh, $schema, $batch, $output);
+}
+
 sub abort {
     my $msg = shift;
     print STDERR "$0: $msg", "\n";
@@ -105,12 +162,46 @@ This program has several modes controlled by the --action switch:
                          wait the number of seconds specified by the
                          --wait switch.
 
+  --action stage_auths          - load MARC authorities into staging
+                                  table
+  --action match_auths          - identify matches with authority
+                                  records already present in the
+                                  Evergreen database; matching is
+                                  based on LCCN, cancelled LCCN, and
+                                  main heading.
+  --action load_new_auths       - load new (unmatched) authorities
+  --action overlay_auths_stage1 - overlay based on LCCN where
+                                  heading has NOT changed; this step
+                                  disables propagation to bib records
+  --action overlay_auths_stage2 - overlay based on LCCN where heading
+                                  HAS changed; propagates changes
+                                  to bib records
+  --action overlay_auths_stage3 - overlay for records where a cancelled
+                                  LCCN is replaced with a new one
+  --action link_auth_auth       - run authority_authority_linker.pl for
+                                  the authorities that were overlaid
+                                  or added in this batch.
+  --action link_auth_bib        - run authority_control_fields.pl for
+                                  the bibs that were overlaid in this
+                                  batch.  Add --link-skipped to specify
+                                  that bibs that were matched but
+                                  skipped due to having be edited after
+                                  the cutoff should be linked (rather
+                                  than linking the imported bibs)
+  --action export_skipped_bibs  - export to ISO2709 file whose name is
+                                  specified by --output those bibs
+                                  that had been edited after the cutoff.
+  --action export_skipped_auths - export to ISO2709 file whose name is
+                                  specified by --output those authorities
+                                  that could not be definitively
+                                  handled as updates or adds.
+
 Several switches are used regardless of the specified action:
 
   --schema  - Pg schema in which staging table will live; should be
               created beforehand
   --batch   - name of bib batch; will also be used as the name
-              of the staging table
+              of the staging tables
   --db      - database name
   --dbuser  - database user
   --dbpw    - database password
@@ -128,7 +219,7 @@ $0 --schema bib_load --batch bibs_2016_01 --db evergreen \\
 
 $0 --schema bib_load --batch bibs_2016_01 --db evergreen \\
    --dbuser evergreen --dbpw evergreen --dbhost localhost \\
-   --action load_bibs
+   --action load_bibs --wait 2
 
 _USAGE_
 }
@@ -190,14 +281,19 @@ sub handle_stage_bibs {
             $dbh->commit;
             $dbh->begin_work;
         }
-        my $marc = MARC::Record->new_from_usmarc($_);
-        my $bibid = $marc->subfield('901', 'c');
-        if ($bibid !~ /^\d+$/) {
-            print STDERR "Record $i is suspect; skipping\n";
+        eval {
+            my $marc = MARC::Record->new_from_usmarc($_);
+            my $bibid = $marc->subfield('901', 'c');
+            if ($bibid !~ /^\d+$/) {
+                die('Subfield 901$c is not numeric or missing.');
+            }
+            my $xml = OpenILS::Application::AppUtils->entityize($marc->as_xml_record());
+            $ins->execute($xml, $bibid);
+        };
+        if ($@) {
+            warn("Record $i is bad: $@; skipping.");
             next;
         }
-        my $xml = OpenILS::Application::AppUtils->entityize($marc->as_xml_record());
-        $ins->execute($xml, $bibid);
     }
     $dbh->commit;
     report_progress("Records staged", $i) if 0 != $i % 100;
@@ -226,6 +322,8 @@ sub handle_filter_bibs {
             FROM biblio.record_entry
             WHERE deleted
         )
+        AND to_import
+        AND NOT imported
     });
     $sth1->execute();
     my $ct = $sth1->rows;
@@ -240,11 +338,24 @@ sub handle_filter_bibs {
             FROM biblio.record_entry
             WHERE edit_date >= ?
         )
-        AND to_import;
+        AND to_import
+        AND NOT imported
     });
     $sth2->execute($cutoff);
     $ct = $sth2->rows;
     report_progress("Filtering out $ct records edited after cutoff date of $cutoff");
+
+    my $sth3 = $dbh->prepare(qq{
+        UPDATE $schema.$batch
+        SET to_import = FALSE,
+            skip_reason = 'XML is not well-formed'
+        WHERE NOT xml_is_well_formed(marc)
+        AND to_import
+        AND NOT imported
+    });
+    $sth3->execute();
+    $ct = $sth3->rows;
+    report_progress("Filtering out $ct records whose XML is not well-formed");
 }
 
 sub handle_load_bibs {
@@ -276,7 +387,7 @@ sub handle_load_bibs {
                 FROM $schema.$batch
                 WHERE to_import
                 AND NOT imported
-                ORDER BY id
+                ORDER BY bib_id DESC
                 LIMIT 1
             )
         });
@@ -295,5 +406,495 @@ sub handle_load_bibs {
         $dbh->commit;
         sleep $wait;
     }
-    
+}
+
+sub handle_stage_auths {
+    my $dbh = shift;
+    my $schema = shift;
+    my $batch = shift;
+
+    $dbh->do(qq{
+        DROP TABLE IF EXISTS $schema.auths_$batch;
+    });
+    $dbh->do(qq{
+        CREATE TABLE $schema.auths_$batch (
+            id          SERIAL,
+            marc        TEXT,
+            auth_id     BIGINT,
+            new_auth_id BIGINT,
+            existing_heading TEXT,
+            lccn        TEXT,
+            cancelled_lccn TEXT,
+            cancelled_auth_id BIGINT,
+            heading     TEXT,
+            lccn_matched BOOLEAN DEFAULT FALSE,
+            heading_matched BOOLEAN DEFAULT FALSE,
+            imported    BOOLEAN DEFAULT FALSE,
+            to_import   BOOLEAN DEFAULT TRUE,
+            skip_reason TEXT
+        )
+    });
+
+    local $/ = "\035";
+    my $i = 0;
+    binmode STDIN, ':utf8';
+    my $ins = $dbh->prepare(qq{
+        INSERT INTO $schema.auths_$batch (marc, auth_id, lccn, cancelled_lccn, heading)
+        VALUES (?, ?, ?, ?, authority.normalize_heading(?))
+    });
+    $dbh->begin_work;
+    while (<>) {
+        $i++;
+        if (0 == $i % 100) {
+            report_progress("Records staged", $i);
+            $dbh->commit;
+            $dbh->begin_work;
+        }
+        eval {
+            my $marc = MARC::Record->new_from_usmarc($_);
+            my $authid = $marc->subfield('901', 'c');
+            if (defined($authid) && $authid !~ /^\d+$/) {
+                undef $authid;
+            }
+            my $lccn = $marc->subfield('010', 'a');
+            if (defined $lccn) {
+                $lccn =~ s/^\s+//;
+                $lccn =~ s/\s+$//;
+                $lccn =~ s/\s+/ /g;
+            }
+            my $cancelled_lccn = $marc->subfield('010', 'z');
+            if (defined $cancelled_lccn) {
+                $cancelled_lccn =~ s/^\s+//;
+                $cancelled_lccn =~ s/\s+$//;
+                $cancelled_lccn =~ s/\s+/ /g;
+            }
+            my $xml = OpenILS::Application::AppUtils->entityize($marc->as_xml_record());
+            $ins->execute($xml, $authid, $lccn, $cancelled_lccn, $xml);
+        };
+        if ($@) {
+            warn("Record $i is bad: $@; skipping.");
+            next;
+        }
+    }
+    $dbh->commit;
+    report_progress("Records staged", $i) if 0 != $i % 100;
+    $dbh->do(qq/
+        CREATE INDEX auths_${batch}_auth_id_idx ON
+            $schema.auths_$batch (auth_id);
+    /);
+    $dbh->do(qq/
+        CREATE INDEX auths_${batch}_id_idx ON
+            $schema.auths_$batch (id);
+    /);
+    $dbh->do(qq/
+        CREATE INDEX auths_${batch}_lccn_idx ON
+            $schema.auths_$batch (lccn);
+    /);
+}
+
+sub handle_match_auths {
+    my ($dbh, $schema, $batch) = @_;
+
+    my $sth = $dbh->prepare(qq{
+        UPDATE $schema.auths_${batch} a
+        SET auth_id = b.record,
+            lccn_matched = TRUE,
+            existing_heading = authority.normalize_heading(c.marc)
+        FROM authority.full_rec b
+        JOIN authority.record_entry c ON (b.record = c.id)
+        WHERE tag = '010'
+        AND   subfield = 'a'
+        AND   value = lccn
+        AND   auth_id IS NULL
+        AND   lccn IS NOT NULL;
+    });
+    $sth->execute();
+    my $ct = $sth->rows;
+    report_progress("Matched $ct authorities on LCCN");
+
+    $sth = $dbh->prepare(qq{
+        UPDATE $schema.auths_${batch} a
+        SET cancelled_auth_id = b.record
+        FROM authority.full_rec b
+        WHERE tag = '010'
+        AND   subfield = 'a'
+        AND   value = cancelled_lccn
+        AND   auth_id IS NULL
+        AND   cancelled_lccn IS NOT NULL;
+    });
+    $sth->execute();
+    $ct = $sth->rows;
+    report_progress("Matched $ct authorities on cancelled LCCN");
+
+    $sth = $dbh->prepare(qq{
+        UPDATE $schema.auths_$batch a
+        SET auth_id = b.id,
+            heading_matched = TRUE,
+            existing_heading = b.heading
+        FROM authority.record_entry b
+        WHERE a.heading = b.heading
+        AND   auth_id IS NULL;
+    });
+    $sth->execute();
+    $ct = $sth->rows;
+    report_progress("Matched $ct authorities on heading");
+}
+
+sub handle_load_new_auths {
+    my $dbh = shift;
+    my $schema = shift;
+    my $batch = shift;
+
+    my $getct = $dbh->prepare(qq{
+        SELECT COUNT(*)
+        FROM  $schema.auths_$batch
+        WHERE to_import
+        AND NOT imported
+        AND new_auth_id IS NULL
+        AND auth_id IS NULL
+        AND cancelled_auth_id IS NULL
+    });
+    $getct->execute();
+    my $max = $getct->fetchrow_arrayref()->[0];
+
+    report_progress('Number of authorities to add', $max);
+    for (my $i = 1; $i <= $max; $i++) {
+        report_progress('... authorities added', $i) if 0 == $i % 10 or $i == $max;
+        $dbh->begin_work;
+        $dbh->do(qq{
+            INSERT INTO authority.record_entry (marc, last_xact_id)
+            SELECT marc, ? || '-' || id
+            FROM $schema.auths_$batch b
+            WHERE id IN (
+                SELECT id
+                FROM $schema.auths_$batch
+                WHERE to_import
+                AND NOT imported
+                AND new_auth_id IS NULL
+                AND auth_id IS NULL
+                AND cancelled_auth_id IS NULL
+                ORDER BY id
+                LIMIT 1
+            )
+        }, {}, "auths_$batch");
+        $dbh->do(qq{
+            UPDATE $schema.auths_$batch
+            SET imported = TRUE,
+                new_auth_id = CURRVAL('authority.record_entry_id_seq')
+            WHERE id IN (
+                SELECT id
+                FROM $schema.auths_$batch
+                WHERE to_import
+                AND NOT imported
+                AND new_auth_id IS NULL
+                AND auth_id IS NULL
+                AND cancelled_auth_id IS NULL
+                ORDER BY id
+                LIMIT 1
+            )
+        });
+        $dbh->commit;
+        sleep $wait;
+    }
+}
+
+sub handle_overlay_auths_stage1 {
+    my $dbh = shift;
+    my $schema = shift;
+    my $batch = shift;
+
+    my $getct = $dbh->prepare(qq{
+        SELECT COUNT(*)
+        FROM  $schema.auths_$batch
+        WHERE to_import
+        AND NOT imported
+        AND lccn_matched
+        AND heading = existing_heading
+    });
+    $getct->execute();
+    my $max = $getct->fetchrow_arrayref()->[0];
+    report_progress('Number of auths to update', $max);
+
+    $dbh->do(q{
+        UPDATE config.internal_flag SET enabled = TRUE
+        WHERE name = 'ingest.disable_authority_auto_update';
+    });
+    for (my $i = 1; $i <= $max; $i++) {
+        report_progress('... auths updated', $i) if 0 == $i % 10 or $i == $max;
+        $dbh->begin_work;
+        $dbh->do(qq{
+            UPDATE authority.record_entry a
+            SET marc = b.marc,
+                edit_date = NOW()
+            FROM $schema.auths_$batch b
+            WHERE a.id = b.auth_id
+            AND auth_id IN (
+                SELECT auth_id
+                FROM $schema.auths_$batch
+                WHERE to_import
+                AND NOT imported
+                AND lccn_matched
+                AND heading = existing_heading
+                ORDER BY id
+                LIMIT 1
+            )
+        });
+        $dbh->do(qq{
+            UPDATE $schema.auths_$batch
+            SET imported = TRUE
+            WHERE auth_id IN (
+                SELECT auth_id
+                FROM $schema.auths_$batch
+                WHERE to_import
+                AND NOT imported
+                AND lccn_matched
+                AND heading = existing_heading
+                ORDER BY id
+                LIMIT 1
+            )
+        });
+        $dbh->commit;
+    }
+    $dbh->do(q{
+        UPDATE config.internal_flag SET enabled = FALSE
+        WHERE name = 'ingest.disable_authority_auto_update';
+    });
+}
+
+sub handle_overlay_auths_stage2 {
+    my $dbh = shift;
+    my $schema = shift;
+    my $batch = shift;
+
+    my $getct = $dbh->prepare(qq{
+        SELECT COUNT(*)
+        FROM  $schema.auths_$batch
+        WHERE to_import
+        AND NOT imported
+        AND lccn_matched
+        AND heading <> existing_heading
+    });
+    $getct->execute();
+    my $max = $getct->fetchrow_arrayref()->[0];
+    report_progress('Number of auths to update', $max);
+
+    for (my $i = 1; $i <= $max; $i++) {
+        report_progress('... auths updated', $i) if 0 == $i % 10 or $i == $max;
+        $dbh->begin_work;
+        $dbh->do(qq{
+            UPDATE authority.record_entry a
+            SET marc = b.marc,
+                edit_date = NOW()
+            FROM $schema.auths_$batch b
+            WHERE a.id = b.auth_id
+            AND auth_id IN (
+                SELECT auth_id
+                FROM $schema.auths_$batch
+                WHERE to_import
+                AND NOT imported
+                AND lccn_matched
+                AND heading <> existing_heading
+                ORDER BY id
+                LIMIT 1
+            )
+        });
+        $dbh->do(qq{
+            UPDATE $schema.auths_$batch
+            SET imported = TRUE
+            WHERE auth_id IN (
+                SELECT auth_id
+                FROM $schema.auths_$batch
+                WHERE to_import
+                AND NOT imported
+                AND lccn_matched
+                AND heading <> existing_heading
+                ORDER BY id
+                LIMIT 1
+            )
+        });
+        $dbh->commit;
+    }
+}
+
+sub handle_overlay_auths_stage3 {
+    my $dbh = shift;
+    my $schema = shift;
+    my $batch = shift;
+
+    my $getct = $dbh->prepare(qq{
+        SELECT COUNT(*)
+        FROM  $schema.auths_$batch
+        WHERE to_import
+        AND NOT imported
+        AND (
+            auth_id IS NULL OR
+            auth_id = cancelled_auth_id
+        )
+        AND cancelled_auth_id IS NOT NULL
+    });
+    $getct->execute();
+    my $max = $getct->fetchrow_arrayref()->[0];
+    report_progress('Number of auths to update', $max);
+
+    for (my $i = 1; $i <= $max; $i++) {
+        report_progress('... auths updated', $i) if 0 == $i % 10 or $i == $max;
+        $dbh->begin_work;
+        $dbh->do(qq{
+            UPDATE authority.record_entry a
+            SET marc = b.marc,
+                edit_date = NOW()
+            FROM $schema.auths_$batch b
+            WHERE a.id = b.cancelled_auth_id
+            AND cancelled_auth_id IN (
+                SELECT auth_id
+                FROM $schema.auths_$batch
+                WHERE to_import
+                AND NOT imported
+                AND (
+                    auth_id IS NULL OR
+                    auth_id = cancelled_auth_id
+                )
+                AND cancelled_auth_id IS NOT NULL
+                ORDER BY id
+                LIMIT 1
+            )
+        });
+        $dbh->do(qq{
+            UPDATE $schema.auths_$batch
+            SET imported = TRUE
+            WHERE cancelled_auth_id IN (
+                SELECT cancelled_auth_id
+                FROM $schema.auths_$batch
+                WHERE to_import
+                AND NOT imported
+                AND (
+                    auth_id IS NULL OR
+                    auth_id = cancelled_auth_id
+                )
+                AND cancelled_auth_id IS NOT NULL
+                ORDER BY id
+                LIMIT 1
+            )
+        });
+        $dbh->commit;
+    }
+}
+
+sub handle_link_auth_auth {
+    my $dbh = shift;
+    my $schema = shift;
+    my $batch = shift;
+
+    $dbh->do(q{
+        UPDATE config.internal_flag SET enabled = TRUE
+        WHERE name = 'ingest.disable_authority_auto_update';
+    });
+
+    my $sth = $dbh->prepare(qq{
+        SELECT COALESCE(new_auth_id, auth_id, cancelled_auth_id) AS id
+        FROM $schema.auths_$batch
+        WHERE imported
+        ORDER BY 1
+    });
+    $sth->execute();
+    my @ids = map { $_->{id} } @{ $sth->fetchall_arrayref({}) };
+    my $i = 0;
+    report_progress(scalar(@ids) . " records to do auth-auth linking");
+    foreach my $id (@ids) {
+        $i++;
+        report_progress('... auth-auth linkings processed', $i) if 0 == $i % 10 or $i == scalar(@ids);
+        system "/openils/bin/authority_authority_linker.pl -r $id -c /openils/conf/opensrf_core.xml";
+    }
+
+    $dbh->do(q{
+        UPDATE config.internal_flag SET enabled = FALSE
+        WHERE name = 'ingest.disable_authority_auto_update';
+    });
+}
+
+sub handle_link_auth_bib {
+    my $dbh = shift;
+    my $schema = shift;
+    my $batch = shift;
+    my $link_skipped = shift;
+
+    my $query;
+    if ($link_skipped) {
+        $query = qq{
+            SELECT bib_id AS id
+            FROM $schema.$batch
+            WHERE NOT imported
+            AND skip_reason ~ '^edit'
+            ORDER BY 1
+        };
+    } else {
+        $query = qq{
+            SELECT bib_id AS id
+            FROM $schema.$batch
+            WHERE imported
+            ORDER BY 1
+        };
+    }
+
+    my $sth = $dbh->prepare($query);
+    $sth->execute();
+    my @ids = map { $_->{id} } @{ $sth->fetchall_arrayref({}) };
+    my $i = 0;
+    report_progress(scalar(@ids) . " records to do auth-bib linking");
+    foreach my $id (@ids) {
+        $i++;
+        report_progress('... auth-bib linkings processed', $i) if 0 == $i % 10 or $i == scalar(@ids);
+        system "/openils/bin/authority_control_fields.pl --record $id -c /openils/conf/opensrf_core.xml";
+    }
+
+}
+
+sub handle_export_skipped_bibs {
+    my $dbh = shift;
+    my $schema = shift;
+    my $batch = shift;
+    my $output = shift;
+
+    my $outfh;
+    open($outfh, '>', $output) or die("Could not open input file $output: $!\n");
+    binmode $outfh, ':utf8';
+
+    my $sth = $dbh->prepare(qq{
+        SELECT marc
+        FROM $schema.$batch
+        WHERE skip_reason ~ '^edit'
+        ORDER BY id
+    });
+    $sth->execute();
+   
+    while (my $row  = $sth->fetchrow_hashref()) {
+        my $marc = MARC::Record->new_from_xml($row->{marc});
+        print $outfh $marc->as_usmarc();
+    }
+    $outfh->close();
+}
+
+sub handle_export_skipped_auths {
+    my $dbh = shift;
+    my $schema = shift;
+    my $batch = shift;
+    my $output = shift;
+
+    my $outfh;
+    open($outfh, '>', $output) or die("Could not open input file $output: $!\n");
+    binmode $outfh, ':utf8';
+
+    my $sth = $dbh->prepare(qq{
+        SELECT marc
+        FROM $schema.auths_$batch
+        WHERE NOT imported
+        ORDER BY id
+    });
+    $sth->execute();
+   
+    while (my $row  = $sth->fetchrow_hashref()) {
+        my $marc = MARC::Record->new_from_xml($row->{marc});
+        print $outfh $marc->as_usmarc();
+    }
+    $outfh->close();
 }