SCLENDS bibliographic dedupe routine
[migration-tools.git] / sql / dedupe / sclends_dedupe.sql
1 -- SCLENDS bibliographic dedupe routine
2 --
3 -- Copyright 2010-2011 Equinox Software, Inc.
4 -- Author: Galen Charlton
5 --
6 -- This implements a bibliographic deduplication routine based
7 -- on criteria and an algorithm specified by the South Carolina
8 -- State Library on behalf of the SC LENDS consortium.  This work
9 -- was sponsored by SC LENDS, whose impetus is gratefully
10 -- acknowledged.  Portions of this script were subseqently expanded
11 -- based on the advice of the Indiana State Library on the behalf
12 -- of the Evergreen Indiana project.
13
14 -- schema to store the dedupe routine and intermediate data
15 CREATE SCHEMA m_dedupe;
16
17 CREATE TYPE mig_isbn_match AS (norm_isbn TEXT, norm_title TEXT, qual TEXT, bibid BIGINT);
18
19 -- function to calculate the normalized ISBN and title match keys
20 -- and the bibliographic portion of the quality score.  The normalized
21 -- ISBN key consists of the set of 020$a and 020$z normalized as follows:
22 --  * numeric portion of the ISBN converted to ISBN-13 format
23 --
24 -- The normalized title key is taken FROM the 245$a with the nonfiling
25 -- characters and leading and trailing whitespace removed, ampersands
26 -- converted to ' and ', other punctuation removed, and the text converted
27 -- to lowercase.
28 --
29 -- The quality score is a 19-digit integer computed by concatenating
30 -- counts of various attributes in the MARC records; see the get_quality
31 -- routine for details.
32 --
33 CREATE OR REPLACE FUNCTION m_dedupe.get_isbn_match_key (bib_id BIGINT, marc TEXT) RETURNS SETOF mig_isbn_match AS $func$
34 use strict;
35 use warnings;
36
37 use MARC::Record;
38 use MARC::File::XML (BinaryEncoding => 'utf8');
39 use Business::ISBN;
40
41 binmode(STDERR, ':bytes');
42 binmode(STDOUT, ':utf8');
43 binmode(STDERR, ':utf8');
44
45 my $get_quality = sub {
46     my $marc = shift;
47
48     my $has003 = (scalar($marc->field('003'))) ? '1' : '0';
49
50     return join('', $has003,
51                     count_field($marc, '02.'),
52                     count_field($marc, '24.'),
53                     field_length($marc, '300'),               
54                     field_length($marc, '100'),               
55                     count_field($marc, '010'),
56                     count_field($marc, '50.', '51.', '52.', '53.', '54.', '55.', '56.', '57.', '58.'),
57                     count_field($marc, '6..'),
58                     count_field($marc, '440', '490', '830'),
59                     count_field($marc, '7..'),
60                 );
61 };
62
63 my ($bibid, $xml) = @_;
64
65 $xml =~ s/(<leader>.........)./${1}a/;
66 my $marc;
67 eval {
68     $marc = MARC::Record->new_from_xml($xml);
69 };
70 if ($@) {
71     #elog("could not parse $bibid: $@\n");
72     import MARC::File::XML (BinaryEncoding => 'utf8');
73     return;
74 }
75
76 my @f245 = $marc->field('245');
77 return unless @f245; # must have 245
78 my $norm_title = norm_title($f245[0]);
79 return unless $norm_title ne '';
80
81 my @isbns = $marc->field('020');
82 return unless @isbns; # must have at least 020
83
84 my $qual = $get_quality->($marc);
85
86 my @norm_isbns = norm_isbns(@isbns);
87 foreach my $isbn (@norm_isbns) {
88     return_next({ norm_isbn => $isbn, norm_title => $norm_title, qual => $qual, bibid => $bibid });
89 }
90 return undef;
91
92
93 sub count_field {
94     my ($marc) = shift;
95     my @tags = @_;
96     my $total = 0;
97     foreach my $tag (@tags) {
98         my @f = $marc->field($tag);
99         $total += scalar(@f);
100     }
101     $total = 99 if $total > 99;
102     return sprintf("%-02.2d", $total);
103 }
104
105 sub field_length {
106     my $marc = shift;
107     my $tag = shift;
108
109     my @f = $marc->field($tag);
110     return '00' unless @f;
111     my $len = length($f[0]->as_string);
112     $len = 99 if $len > 99;
113     return sprintf("%-02.2d", $len);
114 }
115
116 sub norm_title {
117     my $f245 = shift;
118     my $sfa = $f245->subfield('a');
119     return '' unless defined $sfa;
120     my $nonf = $f245->indicator(2);
121     $nonf = '0' unless $nonf =~ /^\d$/;
122     if ($nonf == 0) {
123         $sfa =~ s/^a //i;
124         $sfa =~ s/^an //i;
125         $sfa =~ s/^the //i;
126     } else {
127         $sfa = substr($sfa, $nonf);
128     }
129     $sfa =~ s/&/ and /g;
130     $sfa = lc $sfa;
131     $sfa =~ s/\[large print\]//;
132     $sfa =~ s/[[:punct:]]//g;
133     $sfa =~ s/^\s+//;
134     $sfa =~ s/\s+$//;
135     $sfa =~ s/\s+/ /g;
136     return $sfa;
137 }
138
139 sub norm_isbns {
140     my @isbns = @_;
141
142     my %uniq_isbns = ();
143     foreach my $field (@isbns) {
144         my $sfa = $field->subfield('a');
145         my $norm = norm_isbn($sfa);
146         $uniq_isbns{$norm}++ unless $norm eq '';
147         my $sfz = $field->subfield('z');
148         $norm = norm_isbn($sfz);
149         $uniq_isbns{$norm}++ unless $norm eq '';
150     }
151     return sort(keys %uniq_isbns);
152 }
153
154 sub norm_isbn {
155     my $str = shift;
156     my $norm = '';
157     return '' unless defined $str;
158     $str =~ s/-//g;
159     $str =~ s/^\s+//;
160     $str =~ s/\s+$//;
161     $str =~ s/\s+//g;
162     $str = lc $str;
163     my $isbn;
164     if ($str =~ /^(\d{12}[0-9-x])/) {
165         $isbn = $1;
166         $norm = $isbn;
167     } elsif ($str =~ /^(\d{9}[0-9x])/) {
168         $isbn =  Business::ISBN->new($1);
169         my $isbn13 = $isbn->as_isbn13;
170         $norm = lc($isbn13->as_string);
171         $norm =~ s/-//g;
172     }
173     return $norm;
174 }
175 $func$ LANGUAGE PLPERLU;
176
177 -- Specify set of bibs to dedupe.  This version
178 -- simply collects the IDs of all non-deleted bibs,
179 -- but the query could be expanded to exclude bibliographic
180 -- records that should not participate in the deduplication.
181 CREATE TABLE m_dedupe.bibs_to_check AS
182 SELECT id AS bib_id 
183 FROM biblio.record_entry bre
184 WHERE NOT deleted;
185
186 -- staging table for the match keys
187 CREATE TABLE m_dedupe.match_keys (
188   norm_isbn TEXT,
189   norm_title TEXT,
190   qual TEXT,
191   bibid BIGINT
192 );
193
194 -- calculate match keys
195 INSERT INTO m_dedupe.match_keys 
196 SELECT  (a.get_isbn_match_key::mig_isbn_match).norm_isbn,
197         (a.get_isbn_match_key::mig_isbn_match).norm_title,
198         (a.get_isbn_match_key::mig_isbn_match).qual,
199         (a.get_isbn_match_key::mig_isbn_match).bibid                                                                                 
200 FROM (
201     SELECT m_dedupe.get_isbn_match_key(bre.id, bre.marc)
202     FROM biblio.record_entry bre
203     JOIN m_dedupe.bibs_to_check c ON (c.bib_id = bre.id)
204 ) a;
205
206 CREATE INDEX norm_idx on m_dedupe.match_keys(norm_isbn, norm_title);
207 CREATE INDEX qual_idx on m_dedupe.match_keys(qual);
208
209 -- and remove duplicates
210 CREATE TEMPORARY TABLE uniq_match_keys AS 
211 SELECT DISTINCT norm_isbn, norm_title, qual, bibid
212 FROM m_dedupe.match_keys;
213
214 DELETE FROM m_dedupe.match_keys;
215 INSERT INTO m_dedupe.match_keys SELECT * FROM uniq_match_keys;
216
217 -- find highest-quality match keys
218 CREATE TABLE m_dedupe.lead_quals AS
219 SELECT max(qual) as max_qual, norm_isbn, norm_title
220 FROM m_dedupe.match_keys
221 GROUP BY norm_isbn, norm_title
222 HAVING COUNT(*) > 1;
223
224 CREATE INDEX norm_idx2 ON m_dedupe.lead_quals(norm_isbn, norm_title);
225 CREATE INDEX norm_qual_idx2 ON m_dedupe.lead_quals(norm_isbn, norm_title, max_qual);
226
227 -- identify prospective lead bibs
228 CREATE TABLE m_dedupe.prospective_leads AS
229 SELECT bibid, a.norm_isbn, a.norm_title, b.max_qual, count(ac.id) as copy_count
230 FROM m_dedupe.match_keys a
231 JOIN m_dedupe.lead_quals b on (a.qual = b.max_qual and a.norm_isbn = b.norm_isbn and a.norm_title = b.norm_title)
232 JOIN asset.call_number acn on (acn.record = bibid)
233 JOIN asset.copy ac on (ac.call_number = acn.id)
234 WHERE not acn.deleted
235 and not ac.deleted
236 GROUP BY bibid, a.norm_isbn, a.norm_title, b.max_qual;
237
238 -- and use number of copies to break ties
239 CREATE TABLE m_dedupe.best_lead_keys AS
240 SELECT norm_isbn, norm_title, max_qual, max(copy_count) AS copy_count
241 FROM m_dedupe.prospective_leads
242 GROUP BY norm_isbn, norm_title, max_qual;
243
244 CREATE TABLE m_dedupe.best_leads AS
245 SELECT bibid, a.norm_isbn, a.norm_title, a.max_qual, copy_count
246 FROM m_dedupe.best_lead_keys a
247 JOIN m_dedupe.prospective_leads b USING (norm_isbn, norm_title, max_qual, copy_count);
248
249 -- and break any remaining ties using the lowest bib ID as the winner
250 CREATE TABLE m_dedupe.unique_leads AS
251 SELECT MIN(bibid) AS lead_bibid, norm_isbn, norm_title, max_qual
252 FROM m_dedupe.best_leads
253 GROUP BY norm_isbn, norm_title, max_qual;
254
255 -- start computing the merge map
256 CREATE TABLE m_dedupe.merge_map_pre
257 AS SELECT distinct lead_bibid, bibid as sub_bibid 
258 FROM m_dedupe.unique_leads
259 JOIN m_dedupe.match_keys using (norm_isbn, norm_title)
260 WHERE lead_bibid <> bibid;
261
262 -- and resolve transitive maps
263 UPDATE m_dedupe.merge_map_pre a
264 SET lead_bibid = b.lead_bibid
265 FROM m_dedupe.merge_map_pre b
266 WHERE a.lead_bibid = b.sub_bibid;
267
268 UPDATE m_dedupe.merge_map_pre a
269 SET lead_bibid = b.lead_bibid
270 FROM m_dedupe.merge_map_pre b
271 WHERE a.lead_bibid = b.sub_bibid;
272
273 UPDATE m_dedupe.merge_map_pre a
274 SET lead_bibid = b.lead_bibid
275 FROM m_dedupe.merge_map_pre b
276 WHERE a.lead_bibid = b.sub_bibid;
277
278 -- and produce the final merge map
279 CREATE TABLE m_dedupe.merge_map
280 AS SELECT min(lead_bibid) as lead_bibid, sub_bibid
281 FROM m_dedupe.merge_map_pre
282 GROUP BY sub_bibid;
283
284 -- add a unique ID to the merge map so that
285 -- we can do the actual record merging in chunks
286 ALTER TABLE m_dedupe.merge_map ADD COLUMN id serial, ADD COLUMN done BOOLEAN DEFAULT FALSE;
287
288 -- and here's an example of processing a chunk of a 1000
289 -- merges
290 SELECT asset.merge_record_assets(lead_bibid, sub_bibid)
291 FROM m_dedupe.merge_map WHERE id in (
292   SELECT id FROM m_dedupe.merge_map
293   WHERE done = false
294   ORDER BY id
295   LIMIT 1000
296 );
297
298 UPDATE m_dedupe.merge_map set done = true
299 WHERE id in (
300   SELECT id FROM m_dedupe.merge_map
301   WHERE done = false
302   ORDER BY id
303   LIMIT 1000
304 );
305