removed moving call numbers where items and call numbers were on incompatible orgs...
[migration-tools.git] / sql / dedupe / sclends_dedupe.sql
1 -- SCLENDS bibliographic dedupe routine
2 --
3 -- Copyright 2010-2011 Equinox Software, Inc.
4 -- Author: Galen Charlton <gmc@esilibrary.com>
5 -- 
6 -- This program is free software; you can redistribute it and/or modify
7 -- it under the terms of the GNU General Public License as published by
8 -- the Free Software Foundation; either version 2, or (at your option)
9 -- any later version.
10 -- 
11 -- This program is distributed in the hope that it will be useful,
12 -- but WITHOUT ANY WARRANTY; without even the implied warranty of
13 -- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 -- GNU General Public License for more details.
15 -- 
16 -- You should have received a copy of the GNU General Public License
17 -- along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 --
19 --
20 -- This implements a bibliographic deduplication routine based
21 -- on criteria and an algorithm specified by the South Carolina
22 -- State Library on behalf of the SC LENDS consortium.  This work
23 -- was sponsored by SC LENDS, whose impetus is gratefully
24 -- acknowledged.  Portions of this script were subseqently expanded
25 -- based on the advice of the Indiana State Library on the behalf
26 -- of the Evergreen Indiana project.
27
28 -- schema to store the dedupe routine and intermediate data
29 CREATE SCHEMA m_dedupe;
30
31 CREATE TYPE mig_isbn_match AS (norm_isbn TEXT, norm_title TEXT, qual TEXT, bibid BIGINT);
32
33 -- function to calculate the normalized ISBN and title match keys
34 -- and the bibliographic portion of the quality score.  The normalized
35 -- ISBN key consists of the set of 020$a and 020$z normalized as follows:
36 --  * numeric portion of the ISBN converted to ISBN-13 format
37 --
38 -- The normalized title key is taken FROM the 245$a with the nonfiling
39 -- characters and leading and trailing whitespace removed, ampersands
40 -- converted to ' and ', other punctuation removed, and the text converted
41 -- to lowercase.
42 --
43 -- The quality score is a 19-digit integer computed by concatenating
44 -- counts of various attributes in the MARC records; see the get_quality
45 -- routine for details.
46 --
47 CREATE OR REPLACE FUNCTION m_dedupe.get_isbn_match_key (bib_id BIGINT, marc TEXT) RETURNS SETOF mig_isbn_match AS $func$
48 use strict;
49 use warnings;
50
51 use MARC::Record;
52 use MARC::File::XML (BinaryEncoding => 'utf8');
53 use Business::ISBN;
54
55 binmode(STDERR, ':bytes');
56 binmode(STDOUT, ':utf8');
57 binmode(STDERR, ':utf8');
58
59 my $get_quality = sub {
60     my $marc = shift;
61
62     my $has003 = (scalar($marc->field('003'))) ? '1' : '0';
63
64     return join('', $has003,
65                     count_field($marc, '02.'),
66                     count_field($marc, '24.'),
67                     field_length($marc, '300'),               
68                     field_length($marc, '100'),               
69                     count_field($marc, '010'),
70                     count_field($marc, '50.', '51.', '52.', '53.', '54.', '55.', '56.', '57.', '58.'),
71                     count_field($marc, '6..'),
72                     count_field($marc, '440', '490', '830'),
73                     count_field($marc, '7..'),
74                 );
75 };
76
77 my ($bibid, $xml) = @_;
78
79 $xml =~ s/(<leader>.........)./${1}a/;
80 my $marc;
81 eval {
82     $marc = MARC::Record->new_from_xml($xml);
83 };
84 if ($@) {
85     #elog("could not parse $bibid: $@\n");
86     import MARC::File::XML (BinaryEncoding => 'utf8');
87     return;
88 }
89
90 my @f245 = $marc->field('245');
91 return unless @f245; # must have 245
92 my $norm_title = norm_title($f245[0]);
93 return unless $norm_title ne '';
94
95 my @isbns = $marc->field('020');
96 return unless @isbns; # must have at least 020
97
98 my $qual = $get_quality->($marc);
99
100 my @norm_isbns = norm_isbns(@isbns);
101 foreach my $isbn (@norm_isbns) {
102     return_next({ norm_isbn => $isbn, norm_title => $norm_title, qual => $qual, bibid => $bibid });
103 }
104 return undef;
105
106
107 sub count_field {
108     my ($marc) = shift;
109     my @tags = @_;
110     my $total = 0;
111     foreach my $tag (@tags) {
112         my @f = $marc->field($tag);
113         $total += scalar(@f);
114     }
115     $total = 99 if $total > 99;
116     return sprintf("%-02.2d", $total);
117 }
118
119 sub field_length {
120     my $marc = shift;
121     my $tag = shift;
122
123     my @f = $marc->field($tag);
124     return '00' unless @f;
125     my $len = length($f[0]->as_string);
126     $len = 99 if $len > 99;
127     return sprintf("%-02.2d", $len);
128 }
129
130 sub norm_title {
131     my $f245 = shift;
132     my $sfa = $f245->subfield('a');
133     return '' unless defined $sfa;
134     my $nonf = $f245->indicator(2);
135     $nonf = '0' unless $nonf =~ /^\d$/;
136     if ($nonf == 0) {
137         $sfa =~ s/^a //i;
138         $sfa =~ s/^an //i;
139         $sfa =~ s/^the //i;
140     } else {
141         $sfa = substr($sfa, $nonf);
142     }
143     $sfa =~ s/&/ and /g;
144     $sfa = lc $sfa;
145     $sfa =~ s/\[large print\]//;
146     $sfa =~ s/[[:punct:]]//g;
147     $sfa =~ s/^\s+//;
148     $sfa =~ s/\s+$//;
149     $sfa =~ s/\s+/ /g;
150     return $sfa;
151 }
152
153 sub norm_isbns {
154     my @isbns = @_;
155
156     my %uniq_isbns = ();
157     foreach my $field (@isbns) {
158         my $sfa = $field->subfield('a');
159         my $norm = norm_isbn($sfa);
160         $uniq_isbns{$norm}++ unless $norm eq '';
161         my $sfz = $field->subfield('z');
162         $norm = norm_isbn($sfz);
163         $uniq_isbns{$norm}++ unless $norm eq '';
164     }
165     return sort(keys %uniq_isbns);
166 }
167
168 sub norm_isbn {
169     my $str = shift;
170     my $norm = '';
171     return '' unless defined $str;
172     $str =~ s/-//g;
173     $str =~ s/^\s+//;
174     $str =~ s/\s+$//;
175     $str =~ s/\s+//g;
176     $str = lc $str;
177     my $isbn;
178     if ($str =~ /^(\d{12}[0-9-x])/) {
179         $isbn = $1;
180         $norm = $isbn;
181     } elsif ($str =~ /^(\d{9}[0-9x])/) {
182         $isbn =  Business::ISBN->new($1);
183         my $isbn13 = $isbn->as_isbn13;
184         $norm = lc($isbn13->as_string);
185         $norm =~ s/-//g;
186     }
187     return $norm;
188 }
189 $func$ LANGUAGE PLPERLU;
190
191 -- Specify set of bibs to dedupe.  This version
192 -- simply collects the IDs of all non-deleted bibs,
193 -- but the query could be expanded to exclude bibliographic
194 -- records that should not participate in the deduplication.
195 CREATE TABLE m_dedupe.bibs_to_check AS
196 SELECT id AS bib_id 
197 FROM biblio.record_entry bre
198 WHERE NOT deleted;
199
200 -- staging table for the match keys
201 CREATE TABLE m_dedupe.match_keys (
202   norm_isbn TEXT,
203   norm_title TEXT,
204   qual TEXT,
205   bibid BIGINT
206 );
207
208 -- calculate match keys
209 INSERT INTO m_dedupe.match_keys 
210 SELECT  (a.get_isbn_match_key::mig_isbn_match).norm_isbn,
211         (a.get_isbn_match_key::mig_isbn_match).norm_title,
212         (a.get_isbn_match_key::mig_isbn_match).qual,
213         (a.get_isbn_match_key::mig_isbn_match).bibid                                                                                 
214 FROM (
215     SELECT m_dedupe.get_isbn_match_key(bre.id, bre.marc)
216     FROM biblio.record_entry bre
217     JOIN m_dedupe.bibs_to_check c ON (c.bib_id = bre.id)
218 ) a;
219
220 CREATE INDEX norm_idx on m_dedupe.match_keys(norm_isbn, norm_title);
221 CREATE INDEX qual_idx on m_dedupe.match_keys(qual);
222
223 -- and remove duplicates
224 CREATE TEMPORARY TABLE uniq_match_keys AS 
225 SELECT DISTINCT norm_isbn, norm_title, qual, bibid
226 FROM m_dedupe.match_keys;
227
228 DELETE FROM m_dedupe.match_keys;
229 INSERT INTO m_dedupe.match_keys SELECT * FROM uniq_match_keys;
230
231 -- find highest-quality match keys
232 CREATE TABLE m_dedupe.lead_quals AS
233 SELECT max(qual) as max_qual, norm_isbn, norm_title
234 FROM m_dedupe.match_keys
235 GROUP BY norm_isbn, norm_title
236 HAVING COUNT(*) > 1;
237
238 CREATE INDEX norm_idx2 ON m_dedupe.lead_quals(norm_isbn, norm_title);
239 CREATE INDEX norm_qual_idx2 ON m_dedupe.lead_quals(norm_isbn, norm_title, max_qual);
240
241 -- identify prospective lead bibs
242 CREATE TABLE m_dedupe.prospective_leads AS
243 SELECT bibid, a.norm_isbn, a.norm_title, b.max_qual, count(ac.id) as copy_count
244 FROM m_dedupe.match_keys a
245 JOIN m_dedupe.lead_quals b on (a.qual = b.max_qual and a.norm_isbn = b.norm_isbn and a.norm_title = b.norm_title)
246 JOIN asset.call_number acn on (acn.record = bibid)
247 JOIN asset.copy ac on (ac.call_number = acn.id)
248 WHERE not acn.deleted
249 and not ac.deleted
250 GROUP BY bibid, a.norm_isbn, a.norm_title, b.max_qual;
251
252 -- and use number of copies to break ties
253 CREATE TABLE m_dedupe.best_lead_keys AS
254 SELECT norm_isbn, norm_title, max_qual, max(copy_count) AS copy_count
255 FROM m_dedupe.prospective_leads
256 GROUP BY norm_isbn, norm_title, max_qual;
257
258 CREATE TABLE m_dedupe.best_leads AS
259 SELECT bibid, a.norm_isbn, a.norm_title, a.max_qual, copy_count
260 FROM m_dedupe.best_lead_keys a
261 JOIN m_dedupe.prospective_leads b USING (norm_isbn, norm_title, max_qual, copy_count);
262
263 -- and break any remaining ties using the lowest bib ID as the winner
264 CREATE TABLE m_dedupe.unique_leads AS
265 SELECT MIN(bibid) AS lead_bibid, norm_isbn, norm_title, max_qual
266 FROM m_dedupe.best_leads
267 GROUP BY norm_isbn, norm_title, max_qual;
268
269 -- start computing the merge map
270 CREATE TABLE m_dedupe.merge_map_pre
271 AS SELECT distinct lead_bibid, bibid as sub_bibid 
272 FROM m_dedupe.unique_leads
273 JOIN m_dedupe.match_keys using (norm_isbn, norm_title)
274 WHERE lead_bibid <> bibid;
275
276 -- and resolve transitive maps
277 UPDATE m_dedupe.merge_map_pre a
278 SET lead_bibid = b.lead_bibid
279 FROM m_dedupe.merge_map_pre b
280 WHERE a.lead_bibid = b.sub_bibid;
281
282 UPDATE m_dedupe.merge_map_pre a
283 SET lead_bibid = b.lead_bibid
284 FROM m_dedupe.merge_map_pre b
285 WHERE a.lead_bibid = b.sub_bibid;
286
287 UPDATE m_dedupe.merge_map_pre a
288 SET lead_bibid = b.lead_bibid
289 FROM m_dedupe.merge_map_pre b
290 WHERE a.lead_bibid = b.sub_bibid;
291
292 -- and produce the final merge map
293 CREATE TABLE m_dedupe.merge_map
294 AS SELECT min(lead_bibid) as lead_bibid, sub_bibid
295 FROM m_dedupe.merge_map_pre
296 GROUP BY sub_bibid;
297
298 -- add a unique ID to the merge map so that
299 -- we can do the actual record merging in chunks
300 ALTER TABLE m_dedupe.merge_map ADD COLUMN id serial, ADD COLUMN done BOOLEAN DEFAULT FALSE;
301
302 -- and here's an example of processing a chunk of a 1000
303 -- merges
304 SELECT asset.merge_record_assets(lead_bibid, sub_bibid)
305 FROM m_dedupe.merge_map WHERE id in (
306   SELECT id FROM m_dedupe.merge_map
307   WHERE done = false
308   ORDER BY id
309   LIMIT 1000
310 );
311
312 UPDATE m_dedupe.merge_map set done = true
313 WHERE id in (
314   SELECT id FROM m_dedupe.merge_map
315   WHERE done = false
316   ORDER BY id
317   LIMIT 1000
318 );
319