LP#1931737: DYM can cause deadlocks w/ parallel ingest
authorMike Rylander <mrylander@gmail.com>
Fri, 4 Mar 2022 15:38:12 +0000 (10:38 -0500)
committerMike Rylander <mrylander@gmail.com>
Tue, 25 Oct 2022 15:41:27 +0000 (11:41 -0400)
This patch causes all symspell dictionary updates to occur at then end
of metabib search field updates in one go, which allows Postgres' INSERT
... ON CONFLICT mechanism to properly lock and serialize changes when
necessary.

Signed-off-by: Mike Rylander <mrylander@gmail.com>
Signed-off-by: Jane Sandberg <js7389@princeton.edu>
Signed-off-by: Jason Stephenson <jason@sigio.com>
Signed-off-by: blake <blake@mobiusconsortium.org>

Open-ILS/src/sql/Pg/030.schema.metabib.sql
Open-ILS/src/sql/Pg/300.schema.staged_search.sql
Open-ILS/src/sql/Pg/upgrade/WWWW.schema.dym_update_and_reify.sql [new file with mode: 0644]

index 58163fa..ac35fc7 100644 (file)
@@ -1168,6 +1168,7 @@ BEGIN
 
     IF NOT b_skip_search THEN
         PERFORM metabib.update_combined_index_vectors(bib_id);
+        PERFORM search.symspell_dictionary_reify(); -- NOTE: we only use search data for symspell today
     END IF;
 
     RETURN;
index b3b41f9..868ce5f 100644 (file)
@@ -1140,6 +1140,72 @@ CREATE TABLE search.symspell_dictionary (
     identifier_suggestions  TEXT[]
 ) WITH (fillfactor = 80);
 
+-- INSERT-only table that catches updates to be reconciled
+CREATE UNLOGGED TABLE search.symspell_dictionary_updates (
+    transaction_id          BIGINT,
+    keyword_count           INT     NOT NULL DEFAULT 0,
+    title_count             INT     NOT NULL DEFAULT 0,
+    author_count            INT     NOT NULL DEFAULT 0,
+    subject_count           INT     NOT NULL DEFAULT 0,
+    series_count            INT     NOT NULL DEFAULT 0,
+    identifier_count        INT     NOT NULL DEFAULT 0,
+
+    prefix_key              TEXT    NOT NULL,
+
+    keyword_suggestions     TEXT[],
+    title_suggestions       TEXT[],
+    author_suggestions      TEXT[],
+    subject_suggestions     TEXT[],
+    series_suggestions      TEXT[],
+    identifier_suggestions  TEXT[]
+);
+CREATE INDEX symspell_dictionary_updates_tid_idx ON search.symspell_dictionary_updates (transaction_id);
+
+CREATE OR REPLACE FUNCTION search.symspell_dictionary_reify () RETURNS SETOF search.symspell_dictionary AS $f$
+ WITH new_rows AS (
+    DELETE FROM search.symspell_dictionary_updates WHERE transaction_id = txid_current() RETURNING *
+ ), computed_rows AS ( -- this collapses the rows deleted into the format we need for UPSERT
+    SELECT  SUM(keyword_count)    AS keyword_count,
+            SUM(title_count)      AS title_count,
+            SUM(author_count)     AS author_count,
+            SUM(subject_count)    AS subject_count,
+            SUM(series_count)     AS series_count,
+            SUM(identifier_count) AS identifier_count,
+
+            prefix_key,
+
+            ARRAY_REMOVE(ARRAY_AGG(DISTINCT keyword_suggestions[1]), NULL)    AS keyword_suggestions,
+            ARRAY_REMOVE(ARRAY_AGG(DISTINCT title_suggestions[1]), NULL)      AS title_suggestions,
+            ARRAY_REMOVE(ARRAY_AGG(DISTINCT author_suggestions[1]), NULL)     AS author_suggestions,
+            ARRAY_REMOVE(ARRAY_AGG(DISTINCT subject_suggestions[1]), NULL)    AS subject_suggestions,
+            ARRAY_REMOVE(ARRAY_AGG(DISTINCT series_suggestions[1]), NULL)     AS series_suggestions,
+            ARRAY_REMOVE(ARRAY_AGG(DISTINCT identifier_suggestions[1]), NULL) AS identifier_suggestions
+      FROM  new_rows
+      GROUP BY prefix_key
+ )
+ INSERT INTO search.symspell_dictionary AS d SELECT * FROM computed_rows
+ ON CONFLICT (prefix_key) DO UPDATE SET
+    keyword_count = GREATEST(0, d.keyword_count + EXCLUDED.keyword_count),
+    keyword_suggestions = evergreen.text_array_merge_unique(EXCLUDED.keyword_suggestions,d.keyword_suggestions),
+
+    title_count = GREATEST(0, d.title_count + EXCLUDED.title_count),
+    title_suggestions = evergreen.text_array_merge_unique(EXCLUDED.title_suggestions,d.title_suggestions),
+
+    author_count = GREATEST(0, d.author_count + EXCLUDED.author_count),
+    author_suggestions = evergreen.text_array_merge_unique(EXCLUDED.author_suggestions,d.author_suggestions),
+
+    subject_count = GREATEST(0, d.subject_count + EXCLUDED.subject_count),
+    subject_suggestions = evergreen.text_array_merge_unique(EXCLUDED.subject_suggestions,d.subject_suggestions),
+
+    series_count = GREATEST(0, d.series_count + EXCLUDED.series_count),
+    series_suggestions = evergreen.text_array_merge_unique(EXCLUDED.series_suggestions,d.series_suggestions),
+
+    identifier_count = GREATEST(0, d.identifier_count + EXCLUDED.identifier_count),
+    identifier_suggestions = evergreen.text_array_merge_unique(EXCLUDED.identifier_suggestions,d.identifier_suggestions)
+ RETURNING *;
+$f$ LANGUAGE SQL;
+
+
 CREATE OR REPLACE FUNCTION search.symspell_parse_words ( phrase TEXT )
 RETURNS SETOF TEXT AS $F$
     SELECT UNNEST(x) FROM regexp_matches($1, '([[:alnum:]]+''*[[:alnum:]]*)', 'g') x;
@@ -1646,107 +1712,6 @@ BEGIN
 END;
 $F$ LANGUAGE PLPGSQL;
 
-CREATE OR REPLACE FUNCTION search.symspell_build_and_merge_entries (
-    full_input      TEXT,
-    source_class    TEXT,
-    old_input       TEXT DEFAULT NULL,
-    include_phrases BOOL DEFAULT FALSE
-) RETURNS SETOF search.symspell_dictionary AS $F$
-DECLARE
-    new_entry       RECORD;
-    conflict_entry  RECORD;
-BEGIN
-
-    IF full_input = old_input THEN -- neither NULL, and are the same
-        RETURN;
-    END IF;
-
-    FOR new_entry IN EXECUTE $q$
-        SELECT  count,
-                prefix_key,
-                s AS suggestions
-          FROM  (SELECT prefix_key,
-                        ARRAY_AGG(DISTINCT $q$ || source_class || $q$_suggestions[1]) s,
-                        SUM($q$ || source_class || $q$_count) count
-                  FROM  search.symspell_build_entries($1, $2, $3, $4)
-                  GROUP BY 1) x
-        $q$ USING full_input, source_class, old_input, include_phrases
-    LOOP
-        EXECUTE $q$
-            SELECT  prefix_key,
-                    $q$ || source_class || $q$_suggestions suggestions,
-                    $q$ || source_class || $q$_count count
-              FROM  search.symspell_dictionary
-              WHERE prefix_key = $1 $q$
-            INTO conflict_entry
-            USING new_entry.prefix_key;
-
-        IF new_entry.count <> 0 THEN -- Real word, and count changed
-            IF conflict_entry.prefix_key IS NOT NULL THEN -- we'll be updating
-                IF conflict_entry.count > 0 THEN -- it's a real word
-                    RETURN QUERY EXECUTE $q$
-                        UPDATE  search.symspell_dictionary
-                           SET  $q$ || source_class || $q$_count = $2
-                          WHERE prefix_key = $1
-                          RETURNING * $q$
-                        USING new_entry.prefix_key, GREATEST(0, new_entry.count + conflict_entry.count);
-                ELSE -- it was a prefix key or delete-emptied word before
-                    IF conflict_entry.suggestions @> new_entry.suggestions THEN -- already have all suggestions here...
-                        RETURN QUERY EXECUTE $q$
-                            UPDATE  search.symspell_dictionary
-                               SET  $q$ || source_class || $q$_count = $2
-                              WHERE prefix_key = $1
-                              RETURNING * $q$
-                            USING new_entry.prefix_key, GREATEST(0, new_entry.count);
-                    ELSE -- new suggestion!
-                        RETURN QUERY EXECUTE $q$
-                            UPDATE  search.symspell_dictionary
-                               SET  $q$ || source_class || $q$_count = $2,
-                                    $q$ || source_class || $q$_suggestions = $3
-                              WHERE prefix_key = $1
-                              RETURNING * $q$
-                            USING new_entry.prefix_key, GREATEST(0, new_entry.count), evergreen.text_array_merge_unique(conflict_entry.suggestions,new_entry.suggestions);
-                    END IF;
-                END IF;
-            ELSE
-                -- We keep the on-conflict clause just in case...
-                RETURN QUERY EXECUTE $q$
-                    INSERT INTO search.symspell_dictionary AS d (
-                        $q$ || source_class || $q$_count,
-                        prefix_key,
-                        $q$ || source_class || $q$_suggestions
-                    ) VALUES ( $1, $2, $3 ) ON CONFLICT (prefix_key) DO
-                        UPDATE SET  $q$ || source_class || $q$_count = d.$q$ || source_class || $q$_count + EXCLUDED.$q$ || source_class || $q$_count,
-                                    $q$ || source_class || $q$_suggestions = evergreen.text_array_merge_unique(d.$q$ || source_class || $q$_suggestions, EXCLUDED.$q$ || source_class || $q$_suggestions)
-                        RETURNING * $q$
-                    USING new_entry.count, new_entry.prefix_key, new_entry.suggestions;
-            END IF;
-        ELSE -- key only, or no change
-            IF conflict_entry.prefix_key IS NOT NULL THEN -- we'll be updating
-                IF NOT conflict_entry.suggestions @> new_entry.suggestions THEN -- There are new suggestions
-                    RETURN QUERY EXECUTE $q$
-                        UPDATE  search.symspell_dictionary
-                           SET  $q$ || source_class || $q$_suggestions = $2
-                          WHERE prefix_key = $1
-                          RETURNING * $q$
-                        USING new_entry.prefix_key, evergreen.text_array_merge_unique(conflict_entry.suggestions,new_entry.suggestions);
-                END IF;
-            ELSE
-                RETURN QUERY EXECUTE $q$
-                    INSERT INTO search.symspell_dictionary AS d (
-                        $q$ || source_class || $q$_count,
-                        prefix_key,
-                        $q$ || source_class || $q$_suggestions
-                    ) VALUES ( $1, $2, $3 ) ON CONFLICT (prefix_key) DO -- key exists, suggestions may be added due to this entry
-                        UPDATE SET  $q$ || source_class || $q$_suggestions = evergreen.text_array_merge_unique(d.$q$ || source_class || $q$_suggestions, EXCLUDED.$q$ || source_class || $q$_suggestions)
-                    RETURNING * $q$
-                    USING new_entry.count, new_entry.prefix_key, new_entry.suggestions;
-            END IF;
-        END IF;
-    END LOOP;
-END;
-$F$ LANGUAGE PLPGSQL;
-
 CREATE OR REPLACE FUNCTION search.symspell_maintain_entries () RETURNS TRIGGER AS $f$
 DECLARE
     search_class    TEXT;
@@ -1763,7 +1728,17 @@ BEGIN
         old_value := OLD.value;
     END IF;
 
-    PERFORM * FROM search.symspell_build_and_merge_entries(new_value, search_class, old_value);
+    IF new_value = old_value THEN
+        -- same, move along
+    ELSE
+        INSERT INTO search.symspell_dictionary_updates
+            SELECT  txid_current(), *
+              FROM  search.symspell_build_entries(
+                        new_value,
+                        search_class,
+                        old_value
+                    );
+    END IF;
 
     RETURN NULL; -- always fired AFTER
 END;
diff --git a/Open-ILS/src/sql/Pg/upgrade/WWWW.schema.dym_update_and_reify.sql b/Open-ILS/src/sql/Pg/upgrade/WWWW.schema.dym_update_and_reify.sql
new file mode 100644 (file)
index 0000000..8ff1772
--- /dev/null
@@ -0,0 +1,240 @@
+BEGIN;
+
+-- INSERT-only table that catches dictionary updates to be reconciled
+CREATE UNLOGGED TABLE search.symspell_dictionary_updates (
+    transaction_id          BIGINT,
+    keyword_count           INT     NOT NULL DEFAULT 0,
+    title_count             INT     NOT NULL DEFAULT 0,
+    author_count            INT     NOT NULL DEFAULT 0,
+    subject_count           INT     NOT NULL DEFAULT 0,
+    series_count            INT     NOT NULL DEFAULT 0,
+    identifier_count        INT     NOT NULL DEFAULT 0,
+
+    prefix_key              TEXT    NOT NULL,
+
+    keyword_suggestions     TEXT[],
+    title_suggestions       TEXT[],
+    author_suggestions      TEXT[],
+    subject_suggestions     TEXT[],
+    series_suggestions      TEXT[],
+    identifier_suggestions  TEXT[]
+);
+CREATE INDEX symspell_dictionary_updates_tid_idx ON search.symspell_dictionary_updates (transaction_id);
+
+-- Function that collects this transactions additions to the unlogged update table
+CREATE OR REPLACE FUNCTION search.symspell_dictionary_reify () RETURNS SETOF search.symspell_dictionary AS $f$
+ WITH new_rows AS (
+    DELETE FROM search.symspell_dictionary_updates WHERE transaction_id = txid_current() RETURNING *
+ ), computed_rows AS ( -- this collapses the rows deleted into the format we need for UPSERT
+    SELECT  SUM(keyword_count)    AS keyword_count,
+            SUM(title_count)      AS title_count,
+            SUM(author_count)     AS author_count,
+            SUM(subject_count)    AS subject_count,
+            SUM(series_count)     AS series_count,
+            SUM(identifier_count) AS identifier_count,
+
+            prefix_key,
+
+            ARRAY_REMOVE(ARRAY_AGG(DISTINCT keyword_suggestions[1]), NULL)    AS keyword_suggestions,
+            ARRAY_REMOVE(ARRAY_AGG(DISTINCT title_suggestions[1]), NULL)      AS title_suggestions,
+            ARRAY_REMOVE(ARRAY_AGG(DISTINCT author_suggestions[1]), NULL)     AS author_suggestions,
+            ARRAY_REMOVE(ARRAY_AGG(DISTINCT subject_suggestions[1]), NULL)    AS subject_suggestions,
+            ARRAY_REMOVE(ARRAY_AGG(DISTINCT series_suggestions[1]), NULL)     AS series_suggestions,
+            ARRAY_REMOVE(ARRAY_AGG(DISTINCT identifier_suggestions[1]), NULL) AS identifier_suggestions
+      FROM  new_rows
+      GROUP BY prefix_key
+ )
+ INSERT INTO search.symspell_dictionary AS d SELECT * FROM computed_rows
+ ON CONFLICT (prefix_key) DO UPDATE SET
+    keyword_count = GREATEST(0, d.keyword_count + EXCLUDED.keyword_count),
+    keyword_suggestions = evergreen.text_array_merge_unique(EXCLUDED.keyword_suggestions,d.keyword_suggestions),
+
+    title_count = GREATEST(0, d.title_count + EXCLUDED.title_count),
+    title_suggestions = evergreen.text_array_merge_unique(EXCLUDED.title_suggestions,d.title_suggestions),
+
+    author_count = GREATEST(0, d.author_count + EXCLUDED.author_count),
+    author_suggestions = evergreen.text_array_merge_unique(EXCLUDED.author_suggestions,d.author_suggestions),
+
+    subject_count = GREATEST(0, d.subject_count + EXCLUDED.subject_count),
+    subject_suggestions = evergreen.text_array_merge_unique(EXCLUDED.subject_suggestions,d.subject_suggestions),
+
+    series_count = GREATEST(0, d.series_count + EXCLUDED.series_count),
+    series_suggestions = evergreen.text_array_merge_unique(EXCLUDED.series_suggestions,d.series_suggestions),
+
+    identifier_count = GREATEST(0, d.identifier_count + EXCLUDED.identifier_count),
+    identifier_suggestions = evergreen.text_array_merge_unique(EXCLUDED.identifier_suggestions,d.identifier_suggestions)
+ RETURNING *;
+$f$ LANGUAGE SQL;
+
+-- simplified metabib.*_field_entry trigger that stages updates for reification in one go
+CREATE OR REPLACE FUNCTION search.symspell_maintain_entries () RETURNS TRIGGER AS $f$
+DECLARE
+    search_class    TEXT;
+    new_value       TEXT := NULL;
+    old_value       TEXT := NULL;
+BEGIN
+    search_class := COALESCE(TG_ARGV[0], SPLIT_PART(TG_TABLE_NAME,'_',1));
+
+    IF TG_OP IN ('INSERT', 'UPDATE') THEN
+        new_value := NEW.value;
+    END IF;
+
+    IF TG_OP IN ('DELETE', 'UPDATE') THEN
+        old_value := OLD.value;
+    END IF;
+
+    IF new_value = old_value THEN
+        -- same, move along
+    ELSE
+        INSERT INTO search.symspell_dictionary_updates
+            SELECT  txid_current(), *
+              FROM  search.symspell_build_entries(
+                        new_value,
+                        search_class,
+                        old_value
+                    );
+    END IF;
+
+    RETURN NULL; -- always fired AFTER
+END;
+$f$ LANGUAGE PLPGSQL;
+
+CREATE OR REPLACE FUNCTION metabib.reingest_metabib_field_entries(
+    bib_id BIGINT,
+    skip_facet BOOL DEFAULT FALSE,
+    skip_display BOOL DEFAULT FALSE,
+    skip_browse BOOL DEFAULT FALSE,
+    skip_search BOOL DEFAULT FALSE,
+    only_fields INT[] DEFAULT '{}'::INT[]
+) RETURNS VOID AS $func$
+DECLARE
+    fclass          RECORD;
+    ind_data        metabib.field_entry_template%ROWTYPE;
+    mbe_row         metabib.browse_entry%ROWTYPE;
+    mbe_id          BIGINT;
+    b_skip_facet    BOOL;
+    b_skip_display    BOOL;
+    b_skip_browse   BOOL;
+    b_skip_search   BOOL;
+    value_prepped   TEXT;
+    field_list      INT[] := only_fields;
+    field_types     TEXT[] := '{}'::TEXT[];
+BEGIN
+
+    IF field_list = '{}'::INT[] THEN
+        SELECT ARRAY_AGG(id) INTO field_list FROM config.metabib_field;
+    END IF;
+
+    SELECT COALESCE(NULLIF(skip_facet, FALSE), EXISTS (SELECT enabled FROM config.internal_flag WHERE name =  'ingest.skip_facet_indexing' AND enabled)) INTO b_skip_facet;
+    SELECT COALESCE(NULLIF(skip_display, FALSE), EXISTS (SELECT enabled FROM config.internal_flag WHERE name =  'ingest.skip_display_indexing' AND enabled)) INTO b_skip_display;
+    SELECT COALESCE(NULLIF(skip_browse, FALSE), EXISTS (SELECT enabled FROM config.internal_flag WHERE name =  'ingest.skip_browse_indexing' AND enabled)) INTO b_skip_browse;
+    SELECT COALESCE(NULLIF(skip_search, FALSE), EXISTS (SELECT enabled FROM config.internal_flag WHERE name =  'ingest.skip_search_indexing' AND enabled)) INTO b_skip_search;
+
+    IF NOT b_skip_facet THEN field_types := field_types || '{facet}'; END IF;
+    IF NOT b_skip_display THEN field_types := field_types || '{display}'; END IF;
+    IF NOT b_skip_browse THEN field_types := field_types || '{browse}'; END IF;
+    IF NOT b_skip_search THEN field_types := field_types || '{search}'; END IF;
+
+    PERFORM * FROM config.internal_flag WHERE name = 'ingest.assume_inserts_only' AND enabled;
+    IF NOT FOUND THEN
+        IF NOT b_skip_search THEN
+            FOR fclass IN SELECT * FROM config.metabib_class LOOP
+                -- RAISE NOTICE 'Emptying out %', fclass.name;
+                EXECUTE $$DELETE FROM metabib.$$ || fclass.name || $$_field_entry WHERE source = $$ || bib_id;
+            END LOOP;
+        END IF;
+        IF NOT b_skip_facet THEN
+            DELETE FROM metabib.facet_entry WHERE source = bib_id;
+        END IF;
+        IF NOT b_skip_display THEN
+            DELETE FROM metabib.display_entry WHERE source = bib_id;
+        END IF;
+        IF NOT b_skip_browse THEN
+            DELETE FROM metabib.browse_entry_def_map WHERE source = bib_id;
+        END IF;
+    END IF;
+
+    FOR ind_data IN SELECT * FROM biblio.extract_metabib_field_entry( bib_id, ' ', field_types, field_list ) LOOP
+
+    -- don't store what has been normalized away
+        CONTINUE WHEN ind_data.value IS NULL;
+
+        IF ind_data.field < 0 THEN
+            ind_data.field = -1 * ind_data.field;
+        END IF;
+
+        IF ind_data.facet_field AND NOT b_skip_facet THEN
+            INSERT INTO metabib.facet_entry (field, source, value)
+                VALUES (ind_data.field, ind_data.source, ind_data.value);
+        END IF;
+
+        IF ind_data.display_field AND NOT b_skip_display THEN
+            INSERT INTO metabib.display_entry (field, source, value)
+                VALUES (ind_data.field, ind_data.source, ind_data.value);
+        END IF;
+
+
+        IF ind_data.browse_field AND NOT b_skip_browse THEN
+            -- A caveat about this SELECT: this should take care of replacing
+            -- old mbe rows when data changes, but not if normalization (by
+            -- which I mean specifically the output of
+            -- evergreen.oils_tsearch2()) changes.  It may or may not be
+            -- expensive to add a comparison of index_vector to index_vector
+            -- to the WHERE clause below.
+
+            CONTINUE WHEN ind_data.sort_value IS NULL;
+
+            value_prepped := metabib.browse_normalize(ind_data.value, ind_data.field);
+            IF ind_data.browse_nocase THEN
+                SELECT INTO mbe_row * FROM metabib.browse_entry
+                    WHERE evergreen.lowercase(value) = evergreen.lowercase(value_prepped) AND sort_value = ind_data.sort_value
+                    ORDER BY sort_value, value LIMIT 1; -- gotta pick something, I guess
+            ELSE
+                SELECT INTO mbe_row * FROM metabib.browse_entry
+                    WHERE value = value_prepped AND sort_value = ind_data.sort_value;
+            END IF;
+
+            IF FOUND THEN
+                mbe_id := mbe_row.id;
+            ELSE
+                INSERT INTO metabib.browse_entry
+                    ( value, sort_value ) VALUES
+                    ( value_prepped, ind_data.sort_value );
+
+                mbe_id := CURRVAL('metabib.browse_entry_id_seq'::REGCLASS);
+            END IF;
+
+            INSERT INTO metabib.browse_entry_def_map (entry, def, source, authority)
+                VALUES (mbe_id, ind_data.field, ind_data.source, ind_data.authority);
+        END IF;
+
+        IF ind_data.search_field AND NOT b_skip_search THEN
+            -- Avoid inserting duplicate rows
+            EXECUTE 'SELECT 1 FROM metabib.' || ind_data.field_class ||
+                '_field_entry WHERE field = $1 AND source = $2 AND value = $3'
+                INTO mbe_id USING ind_data.field, ind_data.source, ind_data.value;
+                -- RAISE NOTICE 'Search for an already matching row returned %', mbe_id;
+            IF mbe_id IS NULL THEN
+                EXECUTE $$
+                INSERT INTO metabib.$$ || ind_data.field_class || $$_field_entry (field, source, value)
+                    VALUES ($$ ||
+                        quote_literal(ind_data.field) || $$, $$ ||
+                        quote_literal(ind_data.source) || $$, $$ ||
+                        quote_literal(ind_data.value) ||
+                    $$);$$;
+            END IF;
+        END IF;
+
+    END LOOP;
+
+    IF NOT b_skip_search THEN
+        PERFORM metabib.update_combined_index_vectors(bib_id);
+        PERFORM search.symspell_dictionary_reify();
+    END IF;
+
+    RETURN;
+END;
+$func$ LANGUAGE PLPGSQL;
+
+COMMIT;
+