LP1913807 Staff catalog shows preferred lib holdings counts
[evergreen-equinox.git] / Open-ILS / src / perlmods / lib / OpenILS / Application / Search / Biblio.pm
index 780963b..9fe2500 100644 (file)
@@ -10,13 +10,12 @@ use OpenSRF::Utils::SettingsClient;
 use OpenILS::Utils::CStoreEditor q/:funcs/;
 use OpenSRF::Utils::Cache;
 use Encode;
+use Email::Send;
+use Email::Simple;
 
 use OpenSRF::Utils::Logger qw/:logger/;
 
-
-use OpenSRF::Utils::JSON;
-
-use Time::HiRes qw(time);
+use Time::HiRes qw(time sleep);
 use OpenSRF::EX qw(:try);
 use Digest::MD5 qw(md5_hex);
 
@@ -40,18 +39,18 @@ my $superpage_size;
 my $max_superpages;
 
 sub initialize {
-       $cache = OpenSRF::Utils::Cache->new('global');
-       my $sclient = OpenSRF::Utils::SettingsClient->new();
-       $cache_timeout = $sclient->config_value(
-                       "apps", "open-ils.search", "app_settings", "cache_timeout" ) || 300;
+    $cache = OpenSRF::Utils::Cache->new('global');
+    my $sclient = OpenSRF::Utils::SettingsClient->new();
+    $cache_timeout = $sclient->config_value(
+            "apps", "open-ils.search", "app_settings", "cache_timeout" ) || 300;
 
-       $superpage_size = $sclient->config_value(
-                       "apps", "open-ils.search", "app_settings", "superpage_size" ) || 500;
+    $superpage_size = $sclient->config_value(
+            "apps", "open-ils.search", "app_settings", "superpage_size" ) || 500;
 
-       $max_superpages = $sclient->config_value(
-                       "apps", "open-ils.search", "app_settings", "max_superpages" ) || 20;
+    $max_superpages = $sclient->config_value(
+            "apps", "open-ils.search", "app_settings", "max_superpages" ) || 20;
 
-       $logger->info("Search cache timeout is $cache_timeout, ".
+    $logger->info("Search cache timeout is $cache_timeout, ".
         " superpage_size is $superpage_size, max_superpages is $max_superpages");
 }
 
@@ -62,28 +61,28 @@ sub initialize {
 # mods structures. Creates one MODS structure for each doc id.
 # ---------------------------------------------------------------------------
 sub _records_to_mods {
-       my @ids = @_;
-       
-       my @results;
-       my @marcxml_objs;
-
-       my $session = OpenSRF::AppSession->create("open-ils.cstore");
-       my $request = $session->request(
-                       "open-ils.cstore.direct.biblio.record_entry.search", { id => \@ids } );
-
-       while( my $resp = $request->recv ) {
-               my $content = $resp->content;
-               next if $content->id == OILS_PRECAT_RECORD;
-               my $u = OpenILS::Utils::ModsParser->new();  # FIXME: we really need a new parser for each object?
-               $u->start_mods_batch( $content->marc );
-               my $mods = $u->finish_mods_batch();
-               $mods->doc_id($content->id());
-               $mods->tcn($content->tcn_value);
-               push @results, $mods;
-       }
+    my @ids = @_;
+    
+    my @results;
+    my @marcxml_objs;
+
+    my $session = OpenSRF::AppSession->create("open-ils.cstore");
+    my $request = $session->request(
+            "open-ils.cstore.direct.biblio.record_entry.search", { id => \@ids } );
+
+    while( my $resp = $request->recv ) {
+        my $content = $resp->content;
+        next if $content->id == OILS_PRECAT_RECORD;
+        my $u = OpenILS::Utils::ModsParser->new();  # FIXME: we really need a new parser for each object?
+        $u->start_mods_batch( $content->marc );
+        my $mods = $u->finish_mods_batch();
+        $mods->doc_id($content->id());
+        $mods->tcn($content->tcn_value);
+        push @results, $mods;
+    }
 
-       $session->disconnect();
-       return \@results;
+    $session->disconnect();
+    return \@results;
 }
 
 __PACKAGE__->register_method(
@@ -136,16 +135,16 @@ __PACKAGE__->register_method(
 
 # converts a record into a mods object with NO copy counts attached
 sub record_id_to_mods_slim {
-       my( $self, $client, $id ) = @_;
-       return undef unless defined $id;
+    my( $self, $client, $id ) = @_;
+    return undef unless defined $id;
 
-       if(ref($id) and ref($id) == 'ARRAY') {
-               return _records_to_mods( @$id );
-       }
-       my $mods_list = _records_to_mods( $id );
-       my $mods_obj  = $mods_list->[0];
-       return OpenILS::Event->new('BIBLIO_RECORD_ENTRY_NOT_FOUND') unless $mods_obj;
-       return $mods_obj;
+    if(ref($id) and ref($id) eq 'ARRAY') {
+        return _records_to_mods( @$id );
+    }
+    my $mods_list = _records_to_mods( $id );
+    my $mods_obj  = $mods_list->[0];
+    return OpenILS::Event->new('BIBLIO_RECORD_ENTRY_NOT_FOUND') unless $mods_obj;
+    return $mods_obj;
 }
 
 
@@ -156,7 +155,7 @@ __PACKAGE__->register_method(
     stream   => 1
 );
 sub record_id_to_mods_slim_batch {
-       my($self, $conn, $id_list) = @_;
+    my($self, $conn, $id_list) = @_;
     $conn->respond(_records_to_mods($_)->[0]) for @$id_list;
     return undef;
 }
@@ -276,6 +275,51 @@ sub record_id_to_copy_count {
     return [ sort { $a->{depth} <=> $b->{depth} } @count ];
 }
 
+__PACKAGE__->register_method(
+    method   => "record_has_holdable_copy",
+    api_name => "open-ils.search.biblio.record.has_holdable_copy",
+    signature => {
+        desc => q/Returns a boolean indicating if a record has any holdable copies./,
+        params => [
+            {desc => 'Record ID', type => 'number'}
+        ],
+        return => {
+            desc => q/bool indicating if the record has any holdable copies/,
+            type => 'bool'
+        }
+    }
+);
+
+__PACKAGE__->register_method(
+    method   => "record_has_holdable_copy",
+    api_name => "open-ils.search.biblio.metarecord.has_holdable_copy",
+    signature => {
+        desc => q/Returns a boolean indicating if a record has any holdable copies./,
+        params => [
+            {desc => 'Record ID', type => 'number'}
+        ],
+        return => {
+            desc => q/bool indicating if the record has any holdable copies/,
+            type => 'bool'
+        }
+    }
+);
+
+sub record_has_holdable_copy {
+    my($self, $client, $record_id ) = @_;
+
+    return 0 unless $record_id;
+
+    my $key = $self->api_name =~ /metarecord/ ? 'metarecord' : 'record';
+
+    my $data = $U->cstorereq(
+        "open-ils.cstore.json_query.atomic",
+        { from => ['asset.' . $key . '_has_holdable_copy' => $record_id ] }
+    );
+
+    return ${@$data[0]}{'asset.' . $key . '_has_holdable_copy'} eq 't';
+
+}
 
 __PACKAGE__->register_method(
     method   => "biblio_search_tcn",
@@ -305,7 +349,7 @@ sub biblio_search_tcn {
     my $search = {tcn_value => $tcn};
     $search->{deleted} = 'f' unless $include_deleted;
     my $recs = $e->search_biblio_record_entry( $search, {idlist =>1} );
-       
+    
     return { count => scalar(@$recs), ids => $recs };
 }
 
@@ -317,10 +361,10 @@ __PACKAGE__->register_method(
     api_name => "open-ils.search.asset.copy.find_by_barcode",
 );
 sub biblio_barcode_to_copy { 
-       my( $self, $client, $barcode ) = @_;
-       my( $copy, $evt ) = $U->fetch_copy_by_barcode($barcode);
-       return $evt if $evt;
-       return $copy;
+    my( $self, $client, $barcode ) = @_;
+    my( $copy, $evt ) = $U->fetch_copy_by_barcode($barcode);
+    return $evt if $evt;
+    return $copy;
 }
 
 __PACKAGE__->register_method(
@@ -328,17 +372,17 @@ __PACKAGE__->register_method(
     api_name => "open-ils.search.asset.copy.batch.retrieve",
 );
 sub biblio_id_to_copy { 
-       my( $self, $client, $ids ) = @_;
-       $logger->info("Fetching copies @$ids");
-       return $U->cstorereq(
-               "open-ils.cstore.direct.asset.copy.search.atomic", { id => $ids } );
+    my( $self, $client, $ids ) = @_;
+    $logger->info("Fetching copies @$ids");
+    return $U->cstorereq(
+        "open-ils.cstore.direct.asset.copy.search.atomic", { id => $ids } );
 }
 
 
 __PACKAGE__->register_method(
-       method  => "biblio_id_to_uris",
-       api_name=> "open-ils.search.asset.uri.retrieve_by_bib",
-       argc    => 2, 
+    method  => "biblio_id_to_uris",
+    api_name=> "open-ils.search.asset.uri.retrieve_by_bib",
+    argc    => 2, 
     stream  => 1,
     signature => q#
         @param BibID Which bib record contains the URIs
@@ -349,14 +393,14 @@ __PACKAGE__->register_method(
 
 );
 sub biblio_id_to_uris { 
-       my( $self, $client, $bib, $org, $depth ) = @_;
+    my( $self, $client, $bib, $org, $depth ) = @_;
     die "Org ID required" unless defined($org);
     die "Bib ID required" unless defined($bib);
 
     my @params;
     push @params, $depth if (defined $depth);
 
-       my $ids = $U->cstorereq( "open-ils.cstore.json_query.atomic",
+    my $ids = $U->cstorereq( "open-ils.cstore.json_query.atomic",
         {   select  => { auri => [ 'id' ] },
             from    => {
                 acn => {
@@ -390,8 +434,8 @@ sub biblio_id_to_uris {
         }
     );
 
-       my $uris = $U->cstorereq(
-               "open-ils.cstore.direct.asset.uri.search.atomic",
+    my $uris = $U->cstorereq(
+        "open-ils.cstore.direct.asset.uri.search.atomic",
         { id => [ map { (values %$_) } @$ids ] }
     );
 
@@ -417,9 +461,9 @@ __PACKAGE__->register_method(
 );
 
 sub copy_retrieve {
-       my( $self, $client, $cid ) = @_;
-       my( $copy, $evt ) = $U->fetch_copy($cid);
-       return $evt || $copy;
+    my( $self, $client, $cid ) = @_;
+    my( $copy, $evt ) = $U->fetch_copy($cid);
+    return $evt || $copy;
 }
 
 __PACKAGE__->register_method(
@@ -427,10 +471,10 @@ __PACKAGE__->register_method(
     api_name => "open-ils.search.asset.call_number.retrieve"
 );
 sub volume_retrieve {
-       my( $self, $client, $vid ) = @_;
-       my $e = new_editor();
-       my $vol = $e->retrieve_asset_call_number($vid) or return $e->event;
-       return $vol;
+    my( $self, $client, $vid ) = @_;
+    my $e = new_editor();
+    my $vol = $e->retrieve_asset_call_number($vid) or return $e->event;
+    return $vol;
 }
 
 __PACKAGE__->register_method(
@@ -440,14 +484,14 @@ __PACKAGE__->register_method(
 );
 
 sub fleshed_copy_retrieve_batch { 
-       my( $self, $client, $ids ) = @_;
-       $logger->info("Fetching fleshed copies @$ids");
-       return $U->cstorereq(
-               "open-ils.cstore.direct.asset.copy.search.atomic",
-               { id => $ids },
-               { flesh => 1, 
-                 flesh_fields => { acp => [ qw/ circ_lib location status stat_cat_entries parts / ] }
-               });
+    my( $self, $client, $ids ) = @_;
+    $logger->info("Fetching fleshed copies @$ids");
+    return $U->cstorereq(
+        "open-ils.cstore.direct.asset.copy.search.atomic",
+        { id => $ids },
+        { flesh => 1, 
+          flesh_fields => { acp => [ qw/ circ_lib location status stat_cat_entries parts / ] }
+        });
 }
 
 
@@ -457,9 +501,9 @@ __PACKAGE__->register_method(
 );
 
 sub fleshed_copy_retrieve { 
-       my( $self, $client, $id ) = @_;
-       my( $c, $e) = $U->fetch_fleshed_copy($id);
-       return $e || $c;
+    my( $self, $client, $id ) = @_;
+    my( $c, $e) = $U->fetch_fleshed_copy($id);
+    return $e || $c;
 }
 
 
@@ -469,12 +513,12 @@ __PACKAGE__->register_method(
     authoritative => 1,
 );
 sub fleshed_by_barcode {
-       my( $self, $conn, $barcode ) = @_;
-       my $e = new_editor();
-       my $copyid = $e->search_asset_copy(
-               {barcode => $barcode, deleted => 'f'}, {idlist=>1})->[0]
-               or return $e->event;
-       return fleshed_copy_retrieve2( $self, $conn, $copyid);
+    my( $self, $conn, $barcode ) = @_;
+    my $e = new_editor();
+    my $copyid = $e->search_asset_copy(
+        {barcode => $barcode, deleted => 'f'}, {idlist=>1})->[0]
+        or return $e->event;
+    return fleshed_copy_retrieve2( $self, $conn, $copyid);
 }
 
 
@@ -485,11 +529,11 @@ __PACKAGE__->register_method(
 );
 
 sub fleshed_copy_retrieve2 { 
-       my( $self, $client, $id ) = @_;
-       my $e = new_editor();
-       my $copy = $e->retrieve_asset_copy(
-               [
-                       $id,
+    my( $self, $client, $id ) = @_;
+    my $e = new_editor();
+    my $copy = $e->retrieve_asset_copy(
+        [
+            $id,
             {
                 flesh        => 2,
                 flesh_fields => {
@@ -499,27 +543,27 @@ sub fleshed_copy_retrieve2 {
                     ascecm => [qw/ stat_cat stat_cat_entry /],
                 }
             }
-               ]
-       ) or return $e->event;
+        ]
+    ) or return $e->event;
 
-       # For backwards compatibility
-       #$copy->stat_cat_entries($copy->stat_cat_entry_copy_maps);
+    # For backwards compatibility
+    #$copy->stat_cat_entries($copy->stat_cat_entry_copy_maps);
 
-       if( $copy->status->id == OILS_COPY_STATUS_CHECKED_OUT ) {
-               $copy->circulations(
-                       $e->search_action_circulation( 
-                               [       
-                                       { target_copy => $copy->id },
-                                       {
-                                               order_by => { circ => 'xact_start desc' },
-                                               limit => 1
-                                       }
-                               ]
-                       )
-               );
-       }
+    if( $copy->status->id == OILS_COPY_STATUS_CHECKED_OUT ) {
+        $copy->circulations(
+            $e->search_action_circulation( 
+                [   
+                    { target_copy => $copy->id },
+                    {
+                        order_by => { circ => 'xact_start desc' },
+                        limit => 1
+                    }
+                ]
+            )
+        );
+    }
 
-       return $copy;
+    return $copy;
 }
 
 
@@ -530,20 +574,20 @@ __PACKAGE__->register_method(
 );
 
 sub flesh_copy_custom {
-       my( $self, $conn, $copyid, $fields ) = @_;
-       my $e = new_editor();
-       my $copy = $e->retrieve_asset_copy(
-               [
-                       $copyid,
-                       { 
-                               flesh                           => 1,
-                               flesh_fields    => { 
-                                       acp => $fields,
-                               }
-                       }
-               ]
-       ) or return $e->event;
-       return $copy;
+    my( $self, $conn, $copyid, $fields ) = @_;
+    my $e = new_editor();
+    my $copy = $e->retrieve_asset_copy(
+        [
+            $copyid,
+            { 
+                flesh               => 1,
+                flesh_fields    => { 
+                    acp => $fields,
+                }
+            }
+        ]
+    ) or return $e->event;
+    return $copy;
 }
 
 
@@ -553,14 +597,14 @@ __PACKAGE__->register_method(
 );
 
 sub biblio_barcode_to_title {
-       my( $self, $client, $barcode ) = @_;
+    my( $self, $client, $barcode ) = @_;
 
-       my $title = $apputils->simple_scalar_request(
-               "open-ils.storage",
-               "open-ils.storage.biblio.record_entry.retrieve_by_barcode", $barcode );
+    my $title = $apputils->simple_scalar_request(
+        "open-ils.storage",
+        "open-ils.storage.biblio.record_entry.retrieve_by_barcode", $barcode );
 
-       return { ids => [ $title->id ], count => 1 } if $title;
-       return { count => 0 };
+    return { ids => [ $title->id ], count => 1 } if $title;
+    return { count => 0 };
 }
 
 __PACKAGE__->register_method(
@@ -659,7 +703,7 @@ __PACKAGE__->register_method(
 
 
 sub find_peer_bibs {
-       my( $self, $client, $doc_id ) = @_;
+    my( $self, $client, $doc_id ) = @_;
     my $e = new_editor();
 
     my $multi_home_list = $e->search_biblio_peer_bib_copy_map(
@@ -710,18 +754,18 @@ __PACKAGE__->register_method(
 
 # takes a copy object and returns it fleshed mods object
 sub biblio_copy_to_mods {
-       my( $self, $client, $copy ) = @_;
+    my( $self, $client, $copy ) = @_;
 
-       my $volume = $U->cstorereq( 
-               "open-ils.cstore.direct.asset.call_number.retrieve",
-               $copy->call_number() );
+    my $volume = $U->cstorereq( 
+        "open-ils.cstore.direct.asset.call_number.retrieve",
+        $copy->call_number() );
 
-       my $mods = _records_to_mods($volume->record());
-       $mods = shift @$mods;
-       $volume->copies([$copy]);
-       push @{$mods->call_numbers()}, $volume;
+    my $mods = _records_to_mods($volume->record());
+    $mods = shift @$mods;
+    $volume->copies([$copy]);
+    push @{$mods->call_numbers()}, $volume;
 
-       return $mods;
+    return $mods;
 }
 
 
@@ -746,8 +790,9 @@ Recognized search keys include:
  title   (ti) - search title      *
  subject (su) - search subject    *
  series  (se) - search series     *
- lang - limit by language (specifiy multiple langs with lang:l1 lang:l2 ...)
+ lang - limit by language (specify multiple langs with lang:l1 lang:l2 ...)
  site - search at specified org unit, corresponds to actor.org_unit.shortname
+ pref_ou - extend search to specified org unit, corresponds to actor.org_unit.shortname
  sort - sort type (title, author, pubdate)
  dir  - sort direction (asc, desc)
  available - if set to anything other than "false" or "0", limits to available items
@@ -783,133 +828,21 @@ __PACKAGE__->register_method(
 }
 
 sub multiclass_query {
-    my($self, $conn, $arghash, $query, $docache) = @_;
-
-    $logger->debug("initial search query => $query");
-    my $orig_query = $query;
-
-    $query =~ s/\+/ /go;
-    $query =~ s/^\s+//go;
-
-    # convert convenience classes (e.g. kw for keyword) to the full class name
-    # ensure that the convenience class isn't part of a word (e.g. 'playhouse')
-    $query =~ s/(^|\s)kw(:|\|)/$1keyword$2/go;
-    $query =~ s/(^|\s)ti(:|\|)/$1title$2/go;
-    $query =~ s/(^|\s)au(:|\|)/$1author$2/go;
-    $query =~ s/(^|\s)su(:|\|)/$1subject$2/go;
-    $query =~ s/(^|\s)se(:|\|)/$1series$2/go;
-    $query =~ s/(^|\s)name(:|\|)/$1author$2/og;
-
-    $logger->debug("cleansed query string => $query");
-    my $search = {};
-
-    my $simple_class_re  = qr/((?:\w+(?:\|\w+)?):[^:]+?)$/;
-    my $class_list_re    = qr/(?:keyword|title|author|subject|series)/;
-    my $modifier_list_re = qr/(?:site|dir|sort|lang|available)/;
-
-    my $tmp_value = '';
-    while ($query =~ s/$simple_class_re//so) {
-
-        my $qpart = $1;
-        my $where = index($qpart,':');
-        my $type  = substr($qpart, 0, $where++);
-        my $value = substr($qpart, $where);
-
-        if ($type !~ /^(?:$class_list_re|$modifier_list_re)/o) {
-            $tmp_value = "$qpart $tmp_value";
-            next;
-        }
-
-        if ($type =~ /$class_list_re/o ) {
-            $value .= $tmp_value;
-            $tmp_value = '';
-        }
-
-        next unless $type and $value;
-
-        $value =~ s/^\s*//og;
-        $value =~ s/\s*$//og;
-        $type = 'sort_dir' if $type eq 'dir';
-
-        if($type eq 'site') {
-            # 'site' is the org shortname.  when using this, we also want 
-            # to search at the requested org's depth
-            my $e = new_editor();
-            if(my $org = $e->search_actor_org_unit({shortname => $value})->[0]) {
-                $arghash->{org_unit} = $org->id if $org;
-                $arghash->{depth} = $e->retrieve_actor_org_unit_type($org->ou_type)->depth;
-            } else {
-                $logger->warn("'site:' query used on invalid org shortname: $value ... ignoring");
-            }
-
-        } elsif($type eq 'available') {
-            # limit to available
-            $arghash->{available} = 1 unless $value eq 'false' or $value eq '0';
-
-        } elsif($type eq 'lang') {
-            # collect languages into an array of languages
-            $arghash->{language} = [] unless $arghash->{language};
-            push(@{$arghash->{language}}, $value);
-
-        } elsif($type =~ /^sort/o) {
-            # sort and sort_dir modifiers
-            $arghash->{$type} = $value;
-
-        } else {
-            # append the search term to the term under construction
-            $search->{$type} =  {} unless $search->{$type};
-            $search->{$type}->{term} =  
-                ($search->{$type}->{term}) ? $search->{$type}->{term} . " $value" : $value;
-        }
+    # arghash only really supports limit/offset anymore
+    my($self, $conn, $arghash, $query, $docache, $phys_loc) = @_;
+
+    if ($query) {
+        $query =~ s/\+/ /go;
+        $query =~ s/^\s+//go;
+        $query =~ s/\s+/ /go;
+        $arghash->{query} = $query
     }
 
-    $query .= " $tmp_value";
-    $query =~ s/\s+/ /go;
-    $query =~ s/^\s+//go;
-    $query =~ s/\s+$//go;
-
-    my $type = $arghash->{default_class} || 'keyword';
-    $type = ($type eq '-') ? 'keyword' : $type;
-    $type = ($type !~ /^(title|author|keyword|subject|series)(?:\|\w+)?$/o) ? 'keyword' : $type;
-
-    if($query) {
-        # This is the front part of the string before any special tokens were
-        # parsed OR colon-separated strings that do not denote a class.
-        # Add this data to the default search class
-        $search->{$type} =  {} unless $search->{$type};
-        $search->{$type}->{term} =
-            ($search->{$type}->{term}) ? $search->{$type}->{term} . " $query" : $query;
-    }
-    my $real_search = $arghash->{searches} = { $type => { term => $orig_query } };
+    $logger->debug("initial search query => $query") if $query;
 
-    # capture the original limit because the search method alters the limit internally
-    my $ol = $arghash->{limit};
+    (my $method = $self->api_name) =~ s/\.query/.staged/o;
+    return $self->method_lookup($method)->dispatch($arghash, $docache, $phys_loc);
 
-       my $sclient = OpenSRF::Utils::SettingsClient->new;
-
-    (my $method = $self->api_name) =~ s/\.query//o;
-
-    $method =~ s/multiclass/multiclass.staged/
-        if $sclient->config_value(apps => 'open-ils.search',
-            app_settings => 'use_staged_search') =~ /true/i;
-
-    # XXX This stops the session locale from doing the right thing.
-    # XXX Revisit this and have it translate to a lang instead of a locale.
-    #$arghash->{preferred_language} = $U->get_org_locale($arghash->{org_unit})
-    #    unless $arghash->{preferred_language};
-
-       $method = $self->method_lookup($method);
-    my ($data) = $method->run($arghash, $docache);
-
-    $arghash->{searches} = $search if (!$data->{complex_query});
-
-    $arghash->{limit} = $ol if $ol;
-    $data->{compiled_search} = $arghash;
-    $data->{query} = $orig_query;
-
-    $logger->info("compiled search is " . OpenSRF::Utils::JSON->perl2JSON($arghash));
-
-    return $data;
 }
 
 __PACKAGE__->register_method(
@@ -927,57 +860,61 @@ __PACKAGE__->register_method(
 );
 
 sub cat_search_z_style_wrapper {
-       my $self = shift;
-       my $client = shift;
-       my $authtoken = shift;
-       my $args = shift;
-
-       my $cstore = OpenSRF::AppSession->connect('open-ils.cstore');
-
-       my $ou = $cstore->request(
-               'open-ils.cstore.direct.actor.org_unit.search',
-               { parent_ou => undef }
-       )->gather(1);
-
-       my $result = { service => 'native-evergreen-catalog', records => [] };
-       my $searchhash = { limit => $$args{limit}, offset => $$args{offset}, org_unit => $ou->id };
-
-       $$searchhash{searches}{title}{term}   = $$args{search}{title}   if $$args{search}{title};
-       $$searchhash{searches}{author}{term}  = $$args{search}{author}  if $$args{search}{author};
-       $$searchhash{searches}{subject}{term} = $$args{search}{subject} if $$args{search}{subject};
-       $$searchhash{searches}{keyword}{term} = $$args{search}{keyword} if $$args{search}{keyword};
-       $$searchhash{searches}{'identifier|isbn'}{term} = $$args{search}{isbn} if $$args{search}{isbn};
-       $$searchhash{searches}{'identifier|issn'}{term} = $$args{search}{issn} if $$args{search}{issn};
-
-       $$searchhash{searches}{keyword}{term} .= join ' ', $$searchhash{searches}{keyword}{term}, $$args{search}{tcn}       if $$args{search}{tcn};
-       $$searchhash{searches}{keyword}{term} .= join ' ', $$searchhash{searches}{keyword}{term}, $$args{search}{publisher} if $$args{search}{publisher};
-       $$searchhash{searches}{keyword}{term} .= join ' ', $$searchhash{searches}{keyword}{term}, $$args{search}{pubdate}   if $$args{search}{pubdate};
-       $$searchhash{searches}{keyword}{term} .= join ' ', $$searchhash{searches}{keyword}{term}, $$args{search}{item_type} if $$args{search}{item_type};
-
-       my $list = the_quest_for_knowledge( $self, $client, $searchhash );
-
-       if ($list->{count} > 0) {
-               $result->{count} = $list->{count};
-
-               my $records = $cstore->request(
-                       'open-ils.cstore.direct.biblio.record_entry.search.atomic',
-                       { id => [ map { ( $_->[0] ) } @{$list->{ids}} ] }
-               )->gather(1);
-
-               for my $rec ( @$records ) {
-                       
-                       my $u = OpenILS::Utils::ModsParser->new();
+    my $self = shift;
+    my $client = shift;
+    my $authtoken = shift;
+    my $args = shift;
+
+    my $cstore = OpenSRF::AppSession->connect('open-ils.cstore');
+
+    my $ou = $cstore->request(
+        'open-ils.cstore.direct.actor.org_unit.search',
+        { parent_ou => undef }
+    )->gather(1);
+
+    my $result = { service => 'native-evergreen-catalog', records => [] };
+    my $searchhash = { limit => $$args{limit}, offset => $$args{offset}, org_unit => $ou->id };
+
+    $$searchhash{searches}{title}{term}   = $$args{search}{title}   if $$args{search}{title};
+    $$searchhash{searches}{author}{term}  = $$args{search}{author}  if $$args{search}{author};
+    $$searchhash{searches}{subject}{term} = $$args{search}{subject} if $$args{search}{subject};
+    $$searchhash{searches}{keyword}{term} = $$args{search}{keyword} if $$args{search}{keyword};
+    $$searchhash{searches}{'identifier|isbn'}{term} = $$args{search}{isbn} if $$args{search}{isbn};
+    $$searchhash{searches}{'identifier|issn'}{term} = $$args{search}{issn} if $$args{search}{issn};
+    $$searchhash{searches}{'identifier|upc'}{term} = $$args{search}{upc} if $$args{search}{upc};
+
+    $$searchhash{searches}{keyword}{term} .= join ' ', $$searchhash{searches}{keyword}{term}, $$args{search}{tcn}       if $$args{search}{tcn};
+    $$searchhash{searches}{keyword}{term} .= join ' ', $$searchhash{searches}{keyword}{term}, $$args{search}{publisher} if $$args{search}{publisher};
+    $$searchhash{searches}{keyword}{term} .= join ' ', $$searchhash{searches}{keyword}{term}, $$args{search}{pubdate}   if $$args{search}{pubdate};
+    $$searchhash{searches}{keyword}{term} .= join ' ', $$searchhash{searches}{keyword}{term}, $$args{search}{item_type} if $$args{search}{item_type};
+
+    my $method = 'open-ils.search.biblio.multiclass.staged';
+    $method .= '.staff' if $self->api_name =~ /staff$/;
+
+    my ($list) = $self->method_lookup($method)->run( $searchhash );
+
+    if ($list->{count} > 0 and @{$list->{ids}}) {
+        $result->{count} = $list->{count};
+
+        my $records = $cstore->request(
+            'open-ils.cstore.direct.biblio.record_entry.search.atomic',
+            { id => [ map { ( $_->[0] ) } @{$list->{ids}} ] }
+        )->gather(1);
+
+        for my $rec ( @$records ) {
+            
+            my $u = OpenILS::Utils::ModsParser->new();
                         $u->start_mods_batch( $rec->marc );
                         my $mods = $u->finish_mods_batch();
 
-                       push @{ $result->{records} }, { mvr => $mods, marcxml => $rec->marc, bibid => $rec->id };
+            push @{ $result->{records} }, { mvr => $mods, marcxml => $rec->marc, bibid => $rec->id };
 
-               }
+        }
 
-       }
+    }
 
     $cstore->disconnect();
-       return $result;
+    return $result;
 }
 
 # ----------------------------------------------------------------------------
@@ -1062,78 +999,78 @@ __PACKAGE__->register_method(
 );
 
 sub the_quest_for_knowledge {
-       my( $self, $conn, $searchhash, $docache ) = @_;
+    my( $self, $conn, $searchhash, $docache ) = @_;
 
-       return { count => 0 } unless $searchhash and
-               ref $searchhash->{searches} eq 'HASH';
+    return { count => 0 } unless $searchhash and
+        ref $searchhash->{searches} eq 'HASH';
 
-       my $method = 'open-ils.storage.biblio.multiclass.search_fts';
-       my $ismeta = 0;
-       my @recs;
+    my $method = 'open-ils.storage.biblio.multiclass.search_fts';
+    my $ismeta = 0;
+    my @recs;
 
-       if($self->api_name =~ /metabib/) {
-               $ismeta = 1;
-               $method =~ s/biblio/metabib/o;
-       }
+    if($self->api_name =~ /metabib/) {
+        $ismeta = 1;
+        $method =~ s/biblio/metabib/o;
+    }
 
-       # do some simple sanity checking
-       if(!$searchhash->{searches} or
-               ( !grep { /^(?:title|author|subject|series|keyword|identifier\|is[bs]n)/ } keys %{$searchhash->{searches}} ) ) {
-               return { count => 0 };
-       }
+    # do some simple sanity checking
+    if(!$searchhash->{searches} or
+        ( !grep { /^(?:title|author|subject|series|keyword|identifier\|is[bs]n)/ } keys %{$searchhash->{searches}} ) ) {
+        return { count => 0 };
+    }
 
     my $offset = $searchhash->{offset} ||  0;   # user value or default in local var now
     my $limit  = $searchhash->{limit}  || 10;   # user value or default in local var now
     my $end    = $offset + $limit - 1;
 
-       my $maxlimit = 5000;
+    my $maxlimit = 5000;
     $searchhash->{offset} = 0;                  # possible user value overwritten in hash
     $searchhash->{limit}  = $maxlimit;          # possible user value overwritten in hash
 
-       return { count => 0 } if $offset > $maxlimit;
+    return { count => 0 } if $offset > $maxlimit;
 
-       my @search;
-       push( @search, ($_ => $$searchhash{$_})) for (sort keys %$searchhash);
-       my $s = OpenSRF::Utils::JSON->perl2JSON(\@search);
-       my $ckey = $pfx . md5_hex($method . $s);
+    my @search;
+    push( @search, ($_ => $$searchhash{$_})) for (sort keys %$searchhash);
+    my $s = OpenSRF::Utils::JSON->perl2JSON(\@search);
+    my $ckey = $pfx . md5_hex($method . $s);
 
-       $logger->info("bib search for: $s");
+    $logger->info("bib search for: $s");
 
-       $searchhash->{limit} -= $offset;
+    $searchhash->{limit} -= $offset;
 
 
     my $trim = 0;
-       my $result = ($docache) ? search_cache($ckey, $offset, $limit) : undef;
-
-       if(!$result) {
-
-               $method .= ".staff" if($self->api_name =~ /staff/);
-               $method .= ".atomic";
-       
-               for (keys %$searchhash) { 
-                       delete $$searchhash{$_} 
-                               unless defined $$searchhash{$_}; 
-               }
-       
-               $result = $U->storagereq( $method, %$searchhash );
+    my $result = ($docache) ? search_cache($ckey, $offset, $limit) : undef;
+
+    if(!$result) {
+
+        $method .= ".staff" if($self->api_name =~ /staff/);
+        $method .= ".atomic";
+    
+        for (keys %$searchhash) { 
+            delete $$searchhash{$_} 
+                unless defined $$searchhash{$_}; 
+        }
+    
+        $result = $U->storagereq( $method, %$searchhash );
         $trim = 1;
 
-       } else { 
-               $docache = 0;   # results came FROM cache, so we don't write back
-       }
+    } else { 
+        $docache = 0;   # results came FROM cache, so we don't write back
+    }
 
-       return {count => 0} unless ($result && $$result[0]);
+    return {count => 0} unless ($result && $$result[0]);
 
-       @recs = @$result;
+    @recs = @$result;
 
-       my $count = ($ismeta) ? $result->[0]->[3] : $result->[0]->[2];
+    my $count = ($ismeta) ? $result->[0]->[3] : $result->[0]->[2];
 
-       if($docache) {
-               # If we didn't get this data from the cache, put it into the cache
-               # then return the correct offset of records
-               $logger->debug("putting search cache $ckey\n");
-               put_cache($ckey, $count, \@recs);
-       }
+    if($docache) {
+        # If we didn't get this data from the cache, put it into the cache
+        # then return the correct offset of records
+        $logger->debug("putting search cache $ckey\n");
+        put_cache($ckey, $count, \@recs);
+    }
 
     if($trim) {
         # if we have the full set of data, trim out 
@@ -1146,7 +1083,7 @@ sub the_quest_for_knowledge {
         @recs = @t;
     }
 
-       return { ids => \@recs, count => $count };
+    return { ids => \@recs, count => $count };
 }
 
 
@@ -1191,8 +1128,11 @@ __PACKAGE__->register_method(
     signature => q/The .staff search includes hidden bibs, hidden items and bibs with no items.  Otherwise, @see open-ils.search.biblio.multiclass.staged/
 );
 
+my $estimation_strategy;
 sub staged_search {
-       my($self, $conn, $search_hash, $docache) = @_;
+    my($self, $conn, $search_hash, $docache, $phys_loc) = @_;
+
+    $phys_loc ||= $U->get_org_tree->id;
 
     my $IAmMetabib = ($self->api_name =~ /metabib/) ? 1 : 0;
 
@@ -1203,10 +1143,12 @@ sub staged_search {
     $method .= '.staff' if $self->api_name =~ /staff$/;
     $method .= '.atomic';
                 
-    return {count => 0} unless (
-        $search_hash and 
-        $search_hash->{searches} and 
-        scalar( keys %{$search_hash->{searches}} ));
+    if (!$search_hash->{query}) {
+        return {count => 0} unless (
+            $search_hash and 
+            $search_hash->{searches} and 
+            scalar( keys %{$search_hash->{searches}} ));
+    }
 
     my $search_duration;
     my $user_offset = $search_hash->{offset} ||  0; # user-specified offset
@@ -1227,26 +1169,44 @@ sub staged_search {
     $search_hash->{core_limit}  = $superpage_size * $max_superpages;
 
     # Set the configured estimation strategy, defaults to 'inclusion'.
-       my $estimation_strategy = OpenSRF::Utils::SettingsClient
-        ->new
-        ->config_value(
-            apps => 'open-ils.search', app_settings => 'estimation_strategy'
-        ) || 'inclusion';
-       $search_hash->{estimation_strategy} = $estimation_strategy;
+    unless ($estimation_strategy) {
+        $estimation_strategy = OpenSRF::Utils::SettingsClient
+            ->new
+            ->config_value(
+                apps => 'open-ils.search', app_settings => 'estimation_strategy'
+            ) || 'inclusion';
+    }
+    $search_hash->{estimation_strategy} = $estimation_strategy;
 
     # pull any existing results from the cache
     my $key = search_cache_key($method, $search_hash);
     my $facet_key = $key.'_facets';
     my $cache_data = $cache->get_cache($key) || {};
 
+    # First, we want to make sure that someone else isn't currently trying to perform exactly
+    # this same search.  The point is to allow just one instance of a search to fill the needs
+    # of all concurrent, identical searches.  This will avoid spammy searches killing the
+    # database without requiring admins to start locking some IP addresses out entirely.
+    #
+    # There's still a tiny race condition where 2 might run, but without sigificantly more code
+    # and complexity, this is close to the best we can do.
+
+    if ($cache_data->{running}) { # someone is already doing the search...
+        my $stop_looping = time() + $cache_timeout;
+        while ( sleep(1) and time() < $stop_looping ) { # sleep for a second ... maybe they'll finish
+            $cache_data = $cache->get_cache($key) || {};
+            last if (!$cache_data->{running});
+        }
+    } elsif (!$cache_data->{0}) { # we're the first ... let's give it a try
+        $cache->put_cache($key, { running => $$ }, $cache_timeout / 3);
+    }
+
     # keep retrieving results until we find enough to 
     # fulfill the user-specified limit and offset
     my $all_results = [];
     my $page; # current superpage
-    my $est_hit_count = 0;
     my $current_page_summary = {};
     my $global_summary = {checked => 0, visible => 0, excluded => 0, deleted => 0, total => 0};
-    my $is_real_hit_count = 0;
     my $new_ids = [];
 
     for($page = 0; $page < $max_superpages; $page++) {
@@ -1267,10 +1227,11 @@ sub staged_search {
             # retrieve the window of results from the database
             $logger->debug("staged search: fetching results from the database");
             $search_hash->{skip_check} = $page * $superpage_size;
+            $search_hash->{return_query} = $page == 0 ? 1 : 0;
+
             my $start = time;
             $results = $U->storagereq($method, %$search_hash);
             $search_duration = time - $start;
-            $logger->info("staged search: DB call took $search_duration seconds and returned ".scalar(@$results)." rows, including summary");
             $summary = shift(@$results) if $results;
 
             unless($summary) {
@@ -1279,27 +1240,23 @@ sub staged_search {
                 return {count => 0};
             }
 
-            my $hc = $summary->{estimated_hit_count} || $summary->{visible};
-            if($hc == 0) {
-                $logger->info("search returned 0 results: duration=$search_duration: params=".
-                    OpenSRF::Utils::JSON->perl2JSON($search_hash));
-            }
+            $logger->info("staged search: DB call took $search_duration seconds and returned ".scalar(@$results)." rows, including summary");
 
             # Create backwards-compatible result structures
             if($IAmMetabib) {
-                $results = [map {[$_->{id}, $_->{rel}, $_->{record}]} @$results];
+                $results = [map {[$_->{id}, $_->{badges}, $_->{popularity}, $_->{rel}, $_->{record}]} @$results];
             } else {
-                $results = [map {[$_->{id}]} @$results];
+                $results = [map {[$_->{id}, $_->{badges}, $_->{popularity}]} @$results];
             }
 
-            tag_circulated_records($search_hash->{authtoken}, $results, $IAmMetabib) 
-                if $search_hash->{tag_circulated_records} and $search_hash->{authtoken};
-
             push @$new_ids, grep {defined($_)} map {$_->[0]} @$results;
             $results = [grep {defined $_->[0]} @$results];
             cache_staged_search_page($key, $page, $summary, $results) if $docache;
         }
 
+        tag_circulated_records($search_hash->{authtoken}, $results, $IAmMetabib) 
+            if $search_hash->{tag_circulated_records} and $search_hash->{authtoken};
+
         $current_page_summary = $summary;
 
         # add the new set of results to the set under construction
@@ -1307,58 +1264,88 @@ sub staged_search {
 
         my $current_count = scalar(@$all_results);
 
-        $est_hit_count = $summary->{estimated_hit_count} || $summary->{visible}
-            if $page == 0;
-
-        $logger->debug("staged search: located $current_count, with estimated hits=".
-            $summary->{estimated_hit_count}." : visible=".$summary->{visible}.", checked=".$summary->{checked});
-
-               if (defined($summary->{estimated_hit_count})) {
-            foreach (qw/ checked visible excluded deleted /) {
-                $global_summary->{$_} += $summary->{$_};
+        if ($page == 0) { # all summaries are the same, just get the first
+            for (keys %$summary) {
+                $global_summary->{$_} = $summary->{$_};
             }
-                       $global_summary->{total} = $summary->{total};
-               }
+        }
 
         # we've found all the possible hits
-        last if $current_count == $summary->{visible}
-            and not defined $summary->{estimated_hit_count};
+        last if $current_count == $summary->{visible};
 
         # we've found enough results to satisfy the requested limit/offset
         last if $current_count >= ($user_limit + $user_offset);
 
         # we've scanned all possible hits
-        if($summary->{checked} < $superpage_size) {
-            $est_hit_count = scalar(@$all_results);
-            # we have all possible results in hand, so we know the final hit count
-            $is_real_hit_count = 1;
-            last;
+        last if($summary->{checked} < $superpage_size);
+    }
+
+    # Let other backends grab our data now that we're done.
+    $cache_data = $cache->get_cache($key);
+    if ($$cache_data{running} and $$cache_data{running} == $$) {
+        delete $$cache_data{running};
+        $cache->put_cache($key, $cache_data, $cache_timeout);
+    }
+
+    my $setting_names = [ qw/
+             opac.did_you_mean.max_suggestions
+             opac.did_you_mean.low_result_threshold
+             search.symspell.min_suggestion_use_threshold
+             search.symspell.soundex.weight
+             search.symspell.pg_trgm.weight
+             search.symspell.keyboard_distance.weight/ ];
+    my %suggest_settings = $U->ou_ancestor_setting_batch_insecure(
+        $phys_loc, $setting_names
+    );
+
+    # Defaults...
+    $suggest_settings{$_} ||= {value=>undef} for @$setting_names;
+
+    # Pull this one off the front, it's not used for the function call
+    my $max_suggestions_setting = shift @$setting_names;
+    my $sugg_low_thresh_setting = shift @$setting_names;
+    $max_suggestions_setting = $suggest_settings{$max_suggestions_setting}{value} // -1;
+    my $suggest_low_threshold = $suggest_settings{$sugg_low_thresh_setting}{value} || 0;
+
+    if ($global_summary->{visible} <= $suggest_low_threshold and $max_suggestions_setting != 0) {
+        # For now, we're doing one-class/one-term suggestions only
+        my ($class, $term) = one_class_one_term($global_summary->{query_struct});
+        if ($class && $term) { # check for suggestions!
+            my $suggestion_verbosity = 4;
+            if ($max_suggestions_setting == -1) { # special value that means "only best suggestion, and not always"
+                $max_suggestions_setting = 1;
+                $suggestion_verbosity = 0;
+            }
+
+            my @settings_params = map { $suggest_settings{$_}{value} } @$setting_names;
+            my $suggs = new_editor()->json_query({
+                from  => [
+                    'search.symspell_lookup',
+                        $term, $class,
+                        $suggestion_verbosity,
+                        1, # case transfer
+                        @settings_params
+                ],
+                limit => $max_suggestions_setting
+            });
+            if (@$suggs and $$suggs[0]{suggestion} ne $term) {
+                $global_summary->{suggestions}{'one_class_one_term'} = {
+                    class       => $class,
+                    term        => $term,
+                    suggestions  => $suggs
+                };
+            }
         }
     }
 
     my @results = grep {defined $_} @$all_results[$user_offset..($user_offset + $user_limit - 1)];
 
-       # refine the estimate if we have more than one superpage
-       if ($page > 0 and not $is_real_hit_count) {
-               if ($global_summary->{checked} >= $global_summary->{total}) {
-                       $est_hit_count = $global_summary->{visible};
-               } else {
-                       my $updated_hit_count = $U->storagereq(
-                               'open-ils.storage.fts_paging_estimate',
-                               $global_summary->{checked},
-                               $global_summary->{visible},
-                               $global_summary->{excluded},
-                               $global_summary->{deleted},
-                               $global_summary->{total}
-                       );
-                       $est_hit_count = $updated_hit_count->{$estimation_strategy};
-               }
-       }
-
     $conn->respond_complete(
         {
-            count             => $est_hit_count,
+            global_summary    => $global_summary,
+            count             => $global_summary->{visible},
             core_limit        => $search_hash->{core_limit},
+            superpage         => $page,
             superpage_size    => $search_hash->{check_limit},
             superpage_summary => $current_page_summary,
             facet_key         => $facet_key,
@@ -1366,10 +1353,74 @@ sub staged_search {
         }
     );
 
-    cache_facets($facet_key, $new_ids, $IAmMetabib, $ignore_facet_classes) if $docache;
+    $logger->info("Completed canonicalized search is: $$global_summary{canonicalized_query}");
+
+    return cache_facets($facet_key, $new_ids, $IAmMetabib, $ignore_facet_classes) if $docache;
+}
+
+sub one_class_one_term {
+    my $qstruct = shift;
+    my $node = $$qstruct{children};
+
+    my $class = undef;
+    my $term = undef;
+    while ($node) {
+        last if (
+            $$node{'|'}
+            or @{$$node{'&'}} != 1
+            or ($$node{'&'}[0]{fields} and @{$$node{'&'}[0]{fields}} > 0)
+        );
+
+        $class ||= $$node{'&'}[0]{class};
+        $term ||= $$node{'&'}[0]{content};
+
+        last if ($term);
+
+        $node = $$node{'&'}[0]{children};
+    }
+
+    return ($class, $term);
+}
+
+sub fetch_display_fields {
+    my $self = shift;
+    my $conn = shift;
+    my $highlight_map = shift;
+    my @records = @_;
+
+    unless (@records) {
+        $conn->respond_complete;
+        return;
+    }
+
+    my $hl_map_string = "";
+    if (ref($highlight_map) =~ /HASH/) {
+        for my $tsq (keys %$highlight_map) {
+            my $field_list = join(',', @{$$highlight_map{$tsq}});
+            $hl_map_string .= ' || ' if $hl_map_string;
+            $hl_map_string .= "hstore(($tsq)\:\:TEXT,'$field_list')";
+        }
+    }
+
+    my $e = new_editor();
+
+    for my $record ( @records ) {
+        next unless ($record && $hl_map_string);
+        $conn->respond(
+            $e->json_query(
+                {from => ['search.highlight_display_fields', $record, $hl_map_string]}
+            )
+        );
+    }
 
     return undef;
 }
+__PACKAGE__->register_method(
+    method    => 'fetch_display_fields',
+    api_name  => 'open-ils.search.fetch.metabib.display_field.highlight',
+    stream   => 1
+);
+
 
 sub tag_circulated_records {
     my ($auth, $results, $metabib) = @_;
@@ -1378,15 +1429,15 @@ sub tag_circulated_records {
 
     my $query = {
         select   => { acn => [{ column => 'record', alias => 'tagme' }] }, 
-        from     => { acp => 'acn' }, 
-        where    => { id => { in => { from => ['action.usr_visible_circ_copies', $e->requestor->id] } } },
+        from     => { auch => { acp => { join => 'acn' }} }, 
+        where    => { usr => $e->requestor->id },
         distinct => 1
     };
 
     if ($metabib) {
         $query = {
-            select   => { mmsm => [{ column => 'metarecord', alias => 'tagme' }] },
-            from     => 'mmsm',
+            select   => { mmrsm => [{ column => 'metarecord', alias => 'tagme' }] },
+            from     => 'mmrsm',
             where    => { source => { in => $query } },
             distinct => 1
         };
@@ -1408,15 +1459,15 @@ sub tag_circulated_records {
 sub search_cache_key {
     my $method = shift;
     my $search_hash = shift;
-       my @sorted;
+    my @sorted;
     for my $key (sort keys %$search_hash) {
-           push(@sorted, ($key => $$search_hash{$key})) 
+        push(@sorted, ($key => $$search_hash{$key})) 
             unless $key eq 'limit'  or 
                    $key eq 'offset' or 
                    $key eq 'skip_check';
     }
-       my $s = OpenSRF::Utils::JSON->perl2JSON(\@sorted);
-       return $pfx . md5_hex($method . $s);
+    my $s = OpenSRF::Utils::JSON->perl2JSON(\@sorted);
+    return $pfx . md5_hex($method . $s);
 }
 
 sub retrieve_cached_facets {
@@ -1427,6 +1478,16 @@ sub retrieve_cached_facets {
 
     return undef unless ($key and $key =~ /_facets$/);
 
+    eval {
+        local $SIG{ALRM} = sub {die};
+        alarm(10); # we'll sleep for as much as 10s
+        do {
+            die if $cache->get_cache($key . '_COMPLETE');
+        } while (sleep(0.05));
+        alarm(0);
+    };
+    alarm(0);
+
     my $blob = $cache->get_cache($key) || {};
 
     my $facets = {};
@@ -1476,46 +1537,18 @@ sub cache_facets {
     my $data = $cache->get_cache($key);
     $data ||= {};
 
-    if (!ref($ignore)) {
-        $ignore = ['identifier']; # ignore the identifier class by default
-    }
-
     return undef unless (@$results);
 
-    # The query we're constructing
-    #
-    # select  mfae.field as id,
-    #         mfae.value,
-    #         count(distinct mmrsm.appropriate-id-field )
-    #   from  metabib.facet_entry mfae
-    #         join metabib.metarecord_sourc_map mmrsm on (mfae.source = mmrsm.source)
-    #   where mmrsm.appropriate-id-field in IDLIST
-    #   group by 1,2;
-
-    my $count_field = $metabib ? 'metarecord' : 'source';
-    my $facets = $U->cstorereq( "open-ils.cstore.json_query.atomic",
-        {   select  => {
-                mfae => [ { column => 'field', alias => 'id'}, 'value' ],
-                mmrsm => [{
-                    transform => 'count',
-                    distinct => 1,
-                    column => $count_field,
-                    alias => 'count',
-                    aggregate => 1
-                }]
-            },
-            from    => {
-                mfae => {
-                    mmrsm => { field => 'source', fkey => 'source' },
-                    cmf   => { field => 'id', fkey => 'field' }
-                }
-            },
-            where   => {
-                '+mmrsm' => { $count_field => $results },
-                '+cmf'   => { field_class => { 'not in' => $ignore }, facet_field => 't' }
-            }
-        }
-    );
+    my $facets_function = $metabib ? 'search.facets_for_metarecord_set'
+                                   : 'search.facets_for_record_set';
+    my $results_str = '{' . join(',', @$results) . '}';
+    my $ignore_str = ref($ignore) ? '{' . join(',', @$ignore) . '}'
+                                  : '{}';
+    my $query = {   
+        from => [ $facets_function, $ignore_str, $results_str ]
+    };
+
+    my $facets = OpenILS::Utils::CStoreEditor->new->json_query($query, {substream => 1});
 
     for my $facet (@$facets) {
         next unless ($facet->{value});
@@ -1525,6 +1558,7 @@ sub cache_facets {
     $logger->info("facet compilation: cached with key=$key");
 
     $cache->put_cache($key, $data, $cache_timeout);
+    $cache->put_cache($key.'_COMPLETE', 1, $cache_timeout);
 }
 
 sub cache_staged_search_page {
@@ -1538,49 +1572,51 @@ sub cache_staged_search_page {
     };
 
     $logger->info("staged search: cached with key=$key, superpage=$page, estimated=".
-        $summary->{estimated_hit_count}.", visible=".$summary->{visible});
+        ($summary->{estimated_hit_count} || "none") .
+        ", visible=" . ($summary->{visible} || "none")
+    );
 
     $cache->put_cache($key, $data, $cache_timeout);
 }
 
 sub search_cache {
 
-       my $key         = shift;
-       my $offset      = shift;
-       my $limit       = shift;
-       my $start       = $offset;
-       my $end         = $offset + $limit - 1;
+    my $key     = shift;
+    my $offset  = shift;
+    my $limit   = shift;
+    my $start   = $offset;
+    my $end     = $offset + $limit - 1;
 
-       $logger->debug("searching cache for $key : $start..$end\n");
+    $logger->debug("searching cache for $key : $start..$end\n");
 
-       return undef unless $cache;
-       my $data = $cache->get_cache($key);
+    return undef unless $cache;
+    my $data = $cache->get_cache($key);
 
-       return undef unless $data;
+    return undef unless $data;
 
-       my $count = $data->[0];
-       $data = $data->[1];
+    my $count = $data->[0];
+    $data = $data->[1];
 
-       return undef unless $offset < $count;
+    return undef unless $offset < $count;
 
-       my @result;
-       for( my $i = $offset; $i <= $end; $i++ ) {
-               last unless my $d = $$data[$i];
-               push( @result, $d );
-       }
+    my @result;
+    for( my $i = $offset; $i <= $end; $i++ ) {
+        last unless my $d = $$data[$i];
+        push( @result, $d );
+    }
 
-       $logger->debug("search_cache found ".scalar(@result)." items for count=$count, start=$start, end=$end");
+    $logger->debug("search_cache found ".scalar(@result)." items for count=$count, start=$start, end=$end");
 
-       return \@result;
+    return \@result;
 }
 
 
 sub put_cache {
-       my( $key, $count, $data ) = @_;
-       return undef unless $cache;
-       $logger->debug("search_cache putting ".
-               scalar(@$data)." items at key $key with timeout $cache_timeout");
-       $cache->put_cache($key, [ $count, $data ], $cache_timeout);
+    my( $key, $count, $data ) = @_;
+    return undef unless $cache;
+    $logger->debug("search_cache putting ".
+        scalar(@$data)." items at key $key with timeout $cache_timeout");
+    $cache->put_cache($key, [ $count, $data ], $cache_timeout);
 }
 
 
@@ -1590,16 +1626,16 @@ __PACKAGE__->register_method(
 );
 
 sub biblio_mrid_to_modsbatch_batch {
-       my( $self, $client, $mrids) = @_;
-       # warn "Performing mrid_to_modsbatch_batch..."; # unconditional warn
-       my @mods;
-       my $method = $self->method_lookup("open-ils.search.biblio.metarecord.mods_slim.retrieve");
-       for my $id (@$mrids) {
-               next unless defined $id;
-               my ($m) = $method->run($id);
-               push @mods, $m;
-       }
-       return \@mods;
+    my( $self, $client, $mrids) = @_;
+    # warn "Performing mrid_to_modsbatch_batch..."; # unconditional warn
+    my @mods;
+    my $method = $self->method_lookup("open-ils.search.biblio.metarecord.mods_slim.retrieve");
+    for my $id (@$mrids) {
+        next unless defined $id;
+        my ($m) = $method->run($id);
+        push @mods, $m;
+    }
+    return \@mods;
 }
 
 
@@ -1624,47 +1660,47 @@ foreach (qw /open-ils.search.biblio.metarecord.mods_slim.retrieve
 }
 
 sub biblio_mrid_to_modsbatch {
-       my( $self, $client, $mrid, $args) = @_;
+    my( $self, $client, $mrid, $args) = @_;
 
-       # warn "Grabbing mvr for $mrid\n";    # unconditional warn
+    # warn "Grabbing mvr for $mrid\n";    # unconditional warn
 
-       my ($mr, $evt) = _grab_metarecord($mrid);
-       return $evt unless $mr;
+    my ($mr, $evt) = _grab_metarecord($mrid);
+    return $evt unless $mr;
 
-       my $mvr = biblio_mrid_check_mvr($self, $client, $mr) ||
+    my $mvr = biblio_mrid_check_mvr($self, $client, $mr) ||
               biblio_mrid_make_modsbatch($self, $client, $mr);
 
-       return $mvr unless ref($args);  
+    return $mvr unless ref($args);  
 
-       # Here we find the lead record appropriate for the given filters 
-       # and use that for the title and author of the metarecord
+    # Here we find the lead record appropriate for the given filters 
+    # and use that for the title and author of the metarecord
     my $format = $$args{format};
     my $org    = $$args{org};
     my $depth  = $$args{depth};
 
-       return $mvr unless $format or $org or $depth;
+    return $mvr unless $format or $org or $depth;
 
-       my $method = "open-ils.storage.ordered.metabib.metarecord.records";
-       $method = "$method.staff" if $self->api_name =~ /staff/o; 
+    my $method = "open-ils.storage.ordered.metabib.metarecord.records";
+    $method = "$method.staff" if $self->api_name =~ /staff/o; 
 
-       my $rec = $U->storagereq($method, $format, $org, $depth, 1);
+    my $rec = $U->storagereq($method, $format, $org, $depth, 1);
 
-       if( my $mods = $U->record_to_mvr($rec) ) {
+    if( my $mods = $U->record_to_mvr($rec) ) {
 
         $mvr->title( $mods->title );
         $mvr->author($mods->author);
-               $logger->debug("mods_slim updating title and ".
-                       "author in mvr with ".$mods->title." : ".$mods->author);
-       }
+        $logger->debug("mods_slim updating title and ".
+            "author in mvr with ".$mods->title." : ".$mods->author);
+    }
 
-       return $mvr;
+    return $mvr;
 }
 
 # converts a metarecord to an mvr
 sub _mr_to_mvr {
-       my $mr = shift;
-       my $perl = OpenSRF::Utils::JSON->JSON2perl($mr->mods());
-       return Fieldmapper::metabib::virtual_record->new($perl);
+    my $mr = shift;
+    my $perl = OpenSRF::Utils::JSON->JSON2perl($mr->mods());
+    return Fieldmapper::metabib::virtual_record->new($perl);
 }
 
 # checks to see if a metarecord has mods, if so returns true;
@@ -1677,26 +1713,25 @@ __PACKAGE__->register_method(
 );
 
 sub biblio_mrid_check_mvr {
-       my( $self, $client, $mrid ) = @_;
-       my $mr; 
+    my( $self, $client, $mrid ) = @_;
+    my $mr; 
 
-       my $evt;
-       if(ref($mrid)) { $mr = $mrid; } 
-       else { ($mr, $evt) = _grab_metarecord($mrid); }
-       return $evt if $evt;
+    my $evt;
+    if(ref($mrid)) { $mr = $mrid; } 
+    else { ($mr, $evt) = _grab_metarecord($mrid); }
+    return $evt if $evt;
 
-       # warn "Checking mvr for mr " . $mr->id . "\n";   # unconditional warn
+    # warn "Checking mvr for mr " . $mr->id . "\n";   # unconditional warn
 
-       return _mr_to_mvr($mr) if $mr->mods();
-       return undef;
+    return _mr_to_mvr($mr) if $mr->mods();
+    return undef;
 }
 
 sub _grab_metarecord {
-       my $mrid = shift;
-       #my $e = OpenILS::Utils::Editor->new;
-       my $e = new_editor();
-       my $mr = $e->retrieve_metabib_metarecord($mrid) or return ( undef, $e->event );
-       return ($mr);
+    my $mrid = shift;
+    my $e = new_editor();
+    my $mr = $e->retrieve_metabib_metarecord($mrid) or return ( undef, $e->event );
+    return ($mr);
 }
 
 
@@ -1709,69 +1744,67 @@ __PACKAGE__->register_method(
 );
 
 sub biblio_mrid_make_modsbatch {
-       my( $self, $client, $mrid ) = @_;
+    my( $self, $client, $mrid ) = @_;
 
-       #my $e = OpenILS::Utils::Editor->new;
-       my $e = new_editor();
+    my $e = new_editor();
 
-       my $mr;
-       if( ref($mrid) ) {
-               $mr = $mrid;
-               $mrid = $mr->id;
-       } else {
-               $mr = $e->retrieve_metabib_metarecord($mrid) 
-                       or return $e->event;
-       }
+    my $mr;
+    if( ref($mrid) ) {
+        $mr = $mrid;
+        $mrid = $mr->id;
+    } else {
+        $mr = $e->retrieve_metabib_metarecord($mrid) 
+            or return $e->event;
+    }
 
-       my $masterid = $mr->master_record;
-       $logger->info("creating new mods batch for metarecord=$mrid, master record=$masterid");
+    my $masterid = $mr->master_record;
+    $logger->info("creating new mods batch for metarecord=$mrid, master record=$masterid");
 
-       my $ids = $U->storagereq(
-               'open-ils.storage.ordered.metabib.metarecord.records.staff.atomic', $mrid);
-       return undef unless @$ids;
+    my $ids = $U->storagereq(
+        'open-ils.storage.ordered.metabib.metarecord.records.staff.atomic', $mrid);
+    return undef unless @$ids;
 
-       my $master = $e->retrieve_biblio_record_entry($masterid)
-               or return $e->event;
+    my $master = $e->retrieve_biblio_record_entry($masterid)
+        or return $e->event;
 
-       # start the mods batch
-       my $u = OpenILS::Utils::ModsParser->new();
-       $u->start_mods_batch( $master->marc );
+    # start the mods batch
+    my $u = OpenILS::Utils::ModsParser->new();
+    $u->start_mods_batch( $master->marc );
 
-       # grab all of the sub-records and shove them into the batch
-       my @ids = grep { $_ ne $masterid } @$ids;
-       #my $subrecs = (@ids) ? $e->batch_retrieve_biblio_record_entry(\@ids) : [];
+    # grab all of the sub-records and shove them into the batch
+    my @ids = grep { $_ ne $masterid } @$ids;
+    #my $subrecs = (@ids) ? $e->batch_retrieve_biblio_record_entry(\@ids) : [];
 
-       my $subrecs = [];
-       if(@$ids) {
-               for my $i (@$ids) {
-                       my $r = $e->retrieve_biblio_record_entry($i);
-                       push( @$subrecs, $r ) if $r;
-               }
-       }
+    my $subrecs = [];
+    if(@$ids) {
+        for my $i (@$ids) {
+            my $r = $e->retrieve_biblio_record_entry($i);
+            push( @$subrecs, $r ) if $r;
+        }
+    }
 
-       for(@$subrecs) {
-               $logger->debug("adding record ".$_->id." to mods batch for metarecord=$mrid");
-               $u->push_mods_batch( $_->marc ) if $_->marc;
-       }
+    for(@$subrecs) {
+        $logger->debug("adding record ".$_->id." to mods batch for metarecord=$mrid");
+        $u->push_mods_batch( $_->marc ) if $_->marc;
+    }
 
 
-       # finish up and send to the client
-       my $mods = $u->finish_mods_batch();
-       $mods->doc_id($mrid);
-       $client->respond_complete($mods);
+    # finish up and send to the client
+    my $mods = $u->finish_mods_batch();
+    $mods->doc_id($mrid);
+    $client->respond_complete($mods);
 
 
-       # now update the mods string in the db
-       my $string = OpenSRF::Utils::JSON->perl2JSON($mods->decast);
-       $mr->mods($string);
+    # now update the mods string in the db
+    my $string = OpenSRF::Utils::JSON->perl2JSON($mods->decast);
+    $mr->mods($string);
 
-       #$e = OpenILS::Utils::Editor->new(xact => 1);
-       $e = new_editor(xact => 1);
-       $e->update_metabib_metarecord($mr) 
-               or $logger->error("Error setting mods text on metarecord $mrid : " . Dumper($e->event));
-       $e->finish;
+    $e = new_editor(xact => 1);
+    $e->update_metabib_metarecord($mr) 
+        or $logger->error("Error setting mods text on metarecord $mrid : " . Dumper($e->event));
+    $e->finish;
 
-       return undef;
+    return undef;
 }
 
 
@@ -1800,17 +1833,17 @@ foreach (qw/open-ils.search.biblio.metarecord_to_records
 }
 
 sub biblio_mrid_to_record_ids {
-       my( $self, $client, $mrid, $args ) = @_;
+    my( $self, $client, $mrid, $args ) = @_;
 
     my $format = $$args{format};
     my $org    = $$args{org};
     my $depth  = $$args{depth};
 
-       my $method = "open-ils.storage.ordered.metabib.metarecord.records.atomic";
-       $method =~ s/atomic/staff\.atomic/o if $self->api_name =~ /staff/o; 
-       my $recs = $U->storagereq($method, $mrid, $format, $org, $depth);
+    my $method = "open-ils.storage.ordered.metabib.metarecord.records.atomic";
+    $method =~ s/atomic/staff\.atomic/o if $self->api_name =~ /staff/o; 
+    my $recs = $U->storagereq($method, $mrid, $format, $org, $depth);
 
-       return { count => scalar(@$recs), ids => $recs };
+    return { count => scalar(@$recs), ids => $recs };
 }
 
 
@@ -1832,18 +1865,18 @@ my $slim_marc_sheet;
 my $settings_client = OpenSRF::Utils::SettingsClient->new();
 
 sub biblio_record_to_marc_html {
-       my($self, $client, $recordid, $slim, $marcxml) = @_;
+    my($self, $client, $recordid, $slim, $marcxml) = @_;
 
     my $sheet;
-       my $dir = $settings_client->config_value("dirs", "xsl");
+    my $dir = $settings_client->config_value("dirs", "xsl");
 
     if($slim) {
         unless($slim_marc_sheet) {
-                   my $xsl = $settings_client->config_value(
-                           "apps", "open-ils.search", "app_settings", 'marc_html_xsl_slim');
+            my $xsl = $settings_client->config_value(
+                "apps", "open-ils.search", "app_settings", 'marc_html_xsl_slim');
             if($xsl) {
-                       $xsl = $parser->parse_file("$dir/$xsl");
-                       $slim_marc_sheet = $xslt->parse_stylesheet($xsl);
+                $xsl = $parser->parse_file("$dir/$xsl");
+                $slim_marc_sheet = $xslt->parse_stylesheet($xsl);
             }
         }
         $sheet = $slim_marc_sheet;
@@ -1852,10 +1885,10 @@ sub biblio_record_to_marc_html {
     unless($sheet) {
         unless($marc_sheet) {
             my $xsl_key = ($slim) ? 'marc_html_xsl_slim' : 'marc_html_xsl';
-                   my $xsl = $settings_client->config_value(
-                           "apps", "open-ils.search", "app_settings", 'marc_html_xsl');
-                   $xsl = $parser->parse_file("$dir/$xsl");
-                   $marc_sheet = $xslt->parse_stylesheet($xsl);
+            my $xsl = $settings_client->config_value(
+                "apps", "open-ils.search", "app_settings", 'marc_html_xsl');
+            $xsl = $parser->parse_file("$dir/$xsl");
+            $marc_sheet = $xslt->parse_stylesheet($xsl);
         }
         $sheet = $marc_sheet;
     }
@@ -1873,18 +1906,90 @@ sub biblio_record_to_marc_html {
         $marcxml = $record->marc;
     }
 
-       my $xmldoc = $parser->parse_string($marcxml);
-       my $html = $sheet->transform($xmldoc);
-       return $html->documentElement->toString();
+    my $xmldoc = $parser->parse_string($marcxml);
+    my $html = $sheet->transform($xmldoc);
+    return $html->documentElement->toString();
+}
+
+__PACKAGE__->register_method(
+    method    => "send_event_email_output",
+    api_name  => "open-ils.search.biblio.record.email.send_output",
+);
+sub send_event_email_output {
+    my($self, $client, $auth, $event_id, $capkey, $capanswer) = @_;
+    return undef unless $event_id;
+
+    my $captcha_pass = 0;
+    my $real_answer;
+    if ($capkey) {
+        $real_answer = $cache->get_cache(md5_hex($capkey));
+        $captcha_pass++ if ($real_answer eq $capanswer);
+    }
+
+    my $e = new_editor(authtoken => $auth);
+    return $e->die_event unless $captcha_pass || $e->checkauth;
+
+    my $event = $e->retrieve_action_trigger_event([$event_id,{flesh => 1, flesh_fields => { atev => ['template_output']}}]);
+    return undef unless ($event and $event->template_output);
+
+    my $smtp = OpenSRF::Utils::SettingsClient
+        ->new
+        ->config_value('email_notify', 'smtp_server');
+
+    my $sender = Email::Send->new({mailer => 'SMTP'});
+    $sender->mailer_args([Host => $smtp]);
+
+    my $stat;
+    my $err;
+
+    my $email = Email::Simple->new($event->template_output->data);
+
+    for my $hfield (qw/From To Subject Bcc Cc Reply-To Sender/) {
+        my @headers = $email->header($hfield);
+        $email->header_set($hfield => map { encode("MIME-Header", $_) } @headers) if ($headers[0]);
+    }
+
+    $email->header_set('MIME-Version' => '1.0');
+    $email->header_set('Content-Type' => "text/plain; charset=UTF-8");
+    $email->header_set('Content-Transfer-Encoding' => '8bit');
+
+    try {
+        $stat = $sender->send($email);
+    } catch Error with {
+        $err = $stat = shift;
+        $logger->error("send_event_email_output: Email failed with error: $err");
+    };
+
+    if( !$err and $stat and $stat->type eq 'success' ) {
+        $logger->info("send_event_email_output: successfully sent email");
+        return 1;
+    } else {
+        $logger->warn("send_event_email_output: unable to send email: ".Dumper($stat));
+        return 0;
+    }
 }
 
 __PACKAGE__->register_method(
     method    => "format_biblio_record_entry",
+    api_name  => "open-ils.search.biblio.record.print.preview",
+);
+
+__PACKAGE__->register_method(
+    method    => "format_biblio_record_entry",
+    api_name  => "open-ils.search.biblio.record.email.preview",
+);
+
+__PACKAGE__->register_method(
+    method    => "format_biblio_record_entry",
     api_name  => "open-ils.search.biblio.record.print",
     signature => {
         desc   => 'Returns a printable version of the specified bib record',
         params => [
             { desc => 'Biblio record entry ID or array of IDs', type => 'number' },
+            { desc => 'Context library for holdings, if applicable', type => 'number' },
+            { desc => 'Sort order, if applicable', type => 'string' },
+            { desc => 'Sort direction, if applicable', type => 'string' },
+            { desc => 'Definition Group Member id', type => 'number' },
         ],
         return => {
             desc => q/An action_trigger.event object or error event./,
@@ -1898,8 +2003,15 @@ __PACKAGE__->register_method(
     signature => {
         desc   => 'Emails an A/T templated version of the specified bib records to the authorized user',
         params => [
-            { desc => 'Authentication token',  type => 'string'},
+            { desc => 'Authentication token', type => 'string'},
             { desc => 'Biblio record entry ID or array of IDs', type => 'number' },
+            { desc => 'Context library for holdings, if applicable', type => 'number' },
+            { desc => 'Sort order, if applicable', type => 'string' },
+            { desc => 'Sort direction, if applicable', type => 'string' },
+            { desc => 'Definition Group Member id', type => 'number' },
+            { desc => 'Whether to bypass auth due to captcha', type => 'bool' },
+            { desc => 'Email address, if none for the user', type => 'string' },
+            { desc => 'Subject, if customized', type => 'string' },
         ],
         return => {
             desc => q/Undefined on success, otherwise an error event./,
@@ -1909,25 +2021,43 @@ __PACKAGE__->register_method(
 );
 
 sub format_biblio_record_entry {
-    my($self, $conn, $arg1, $arg2) = @_;
+    my ($self, $conn) = splice @_, 0, 2;
 
     my $for_print = ($self->api_name =~ /print/);
     my $for_email = ($self->api_name =~ /email/);
+    my $preview = ($self->api_name =~ /preview/);
 
-    my $e; my $auth; my $bib_id; my $context_org;
+    my ($auth, $captcha_pass, $email, $subject);
+    if ($for_email) {
+        $auth = shift @_;
+        ($captcha_pass, $email, $subject) = splice @_, -3, 3;
+    }
+    my ($bib_id, $holdings_context_org, $bib_sort, $sort_dir, $group_member) = @_;
+    $holdings_context_org ||= $U->get_org_tree->id;
+    $bib_sort ||= 'author';
+    $sort_dir ||= 'ascending';
+
+    my $e; my $event_context_org; my $type = 'brief';
 
     if ($for_print) {
-        $bib_id = $arg1;
-        $context_org = $arg2 || $U->fetch_org_tree->id;
+        $event_context_org = $holdings_context_org;
         $e = new_editor(xact => 1);
     } elsif ($for_email) {
-        $auth = $arg1;
-        $bib_id = $arg2;
         $e = new_editor(authtoken => $auth, xact => 1);
-        return $e->die_event unless $e->checkauth;
-        $context_org = $e->requestor->home_ou;
+        return $e->die_event unless $captcha_pass || $e->checkauth;
+        $event_context_org = $e->requestor ? $e->requestor->home_ou : $holdings_context_org;
+        $email ||= $e->requestor ? $e->requestor->email : '';
+    }
+
+    if ($group_member) {
+        $group_member = $e->retrieve_action_trigger_event_def_group_member($group_member);
+        if ($group_member and $U->is_true($group_member->holdings)) {
+            $type = 'full';
+        }
     }
 
+    $holdings_context_org = $e->retrieve_actor_org_unit($holdings_context_org);
+
     my $bib_ids;
     if (ref $bib_id ne 'ARRAY') {
         $bib_ids = [ $bib_id ];
@@ -1939,7 +2069,7 @@ sub format_biblio_record_entry {
     $bucket->btype('temp');
     $bucket->name('format_biblio_record_entry ' . $U->create_uuid_string);
     if ($for_email) {
-        $bucket->owner($e->requestor) 
+        $bucket->owner($e->requestor || 1) 
     } else {
         $bucket->owner(1);
     }
@@ -1957,13 +2087,26 @@ sub format_biblio_record_entry {
 
     $e->commit;
 
+    my $usr_data = {
+        type        => $type,
+        email       => $email,
+        subject     => $subject,
+        context_org => $holdings_context_org->shortname,
+        sort_by     => $bib_sort,
+        sort_dir    => $sort_dir,
+        preview     => $preview
+    };
+
     if ($for_print) {
 
-        return $U->fire_object_event(undef, 'biblio.format.record_entry.print', [ $bucket ], $context_org);
+        return $U->fire_object_event(undef, 'biblio.format.record_entry.print', [ $bucket ], $event_context_org, undef, [ $usr_data ]);
 
     } elsif ($for_email) {
 
-        $U->create_events_for_hook('biblio.format.record_entry.email', $bucket, $context_org, undef, undef, 1);
+        return $U->fire_object_event(undef, 'biblio.format.record_entry.email', [ $bucket ], $event_context_org, undef, [ $usr_data ])
+            if ($preview);
+
+        $U->create_events_for_hook('biblio.format.record_entry.email', $bucket, $event_context_org, undef, $usr_data, 1);
     }
 
     return undef;
@@ -1976,8 +2119,8 @@ __PACKAGE__->register_method(
 );
 
 sub retrieve_all_copy_statuses {
-       my( $self, $client ) = @_;
-       return new_editor()->retrieve_all_config_copy_status();
+    my( $self, $client ) = @_;
+    return new_editor()->retrieve_all_config_copy_status();
 }
 
 
@@ -1992,18 +2135,18 @@ __PACKAGE__->register_method(
 );
 
 sub copy_counts_per_org {
-       my( $self, $client, $record_id ) = @_;
+    my( $self, $client, $record_id ) = @_;
 
-       warn "Retreiveing copy copy counts for record $record_id and method " . $self->api_name . "\n";
+    warn "Retreiveing copy copy counts for record $record_id and method " . $self->api_name . "\n";
 
-       my $method = "open-ils.storage.biblio.record_entry.global_copy_count.atomic";
-       if($self->api_name =~ /staff/) { $method =~ s/atomic/staff\.atomic/; }
+    my $method = "open-ils.storage.biblio.record_entry.global_copy_count.atomic";
+    if($self->api_name =~ /staff/) { $method =~ s/atomic/staff\.atomic/; }
 
-       my $counts = $apputils->simple_scalar_request(
-               "open-ils.storage", $method, $record_id );
+    my $counts = $apputils->simple_scalar_request(
+        "open-ils.storage", $method, $record_id );
 
-       $counts = [ sort {$a->[0] <=> $b->[0]} @$counts ];
-       return $counts;
+    $counts = [ sort {$a->[0] <=> $b->[0]} @$counts ];
+    return $counts;
 }
 
 
@@ -2014,14 +2157,14 @@ __PACKAGE__->register_method(
               . "[ org_id, callnumber_prefix, callnumber_label, callnumber_suffix, <status1_count>, <status2_count>,...] "
               . "where statusx is a copy status name.  The statuses are sorted by ID.",
 );
-               
+        
 
 sub copy_count_summary {
-       my( $self, $client, $rid, $org, $depth ) = @_;
+    my( $self, $client, $rid, $org, $depth ) = @_;
     $org   ||= 1;
     $depth ||= 0;
     my $data = $U->storagereq(
-               'open-ils.storage.biblio.record_entry.status_copy_count.atomic', $rid, $org, $depth );
+        'open-ils.storage.biblio.record_entry.status_copy_count.atomic', $rid, $org, $depth );
 
     return [ sort {
         (($a->[1] ? $a->[1] . ' ' : '') . $a->[2] . ($a->[3] ? ' ' . $a->[3] : ''))
@@ -2043,7 +2186,7 @@ sub copy_location_count_summary {
     $org   ||= 1;
     $depth ||= 0;
     my $data = $U->storagereq(
-               'open-ils.storage.biblio.record_entry.status_copy_location_count.atomic', $rid, $org, $depth );
+        'open-ils.storage.biblio.record_entry.status_copy_location_count.atomic', $rid, $org, $depth );
 
     return [ sort {
         (($a->[1] ? $a->[1] . ' ' : '') . $a->[2] . ($a->[3] ? ' ' . $a->[3] : ''))
@@ -2091,8 +2234,7 @@ __PACKAGE__->register_method(
                         'See perldoc ' . __PACKAGE__ . ' for more detail.',
                 type => 'object'
             },
-            {desc => 'limit (optional)',  type => 'number'},
-            {desc => 'offset (optional)', type => 'number'}
+            {desc => 'timeout (optional)',  type => 'number'}
         ],
         return => {
             desc => 'Results object like: { "count": $i, "ids": [...] }',
@@ -2102,7 +2244,7 @@ __PACKAGE__->register_method(
 );
 }
 
-=head3 open-ils.search.biblio.marc (arghash, limit, offset)
+=head3 open-ils.search.biblio.marc (arghash, timeout)
 
 As elsewhere the arghash is the required argument, and must be a hashref.  The keys are:
 
@@ -2159,49 +2301,49 @@ Presently, search uses the cache unconditionally.
 =cut
 
 # FIXME: that example above isn't actually tested.
+# FIXME: sort and limit added.  item_type not tested yet.
 # TODO: docache option?
 sub marc_search {
-       my( $self, $conn, $args, $limit, $offset, $timeout ) = @_;
+    my( $self, $conn, $args, $timeout ) = @_;
 
-       my $method = 'open-ils.storage.biblio.full_rec.multi_search';
-       $method .= ".staff" if $self->api_name =~ /staff/;
-       $method .= ".atomic";
+    my $method = 'open-ils.storage.biblio.full_rec.multi_search';
+    $method .= ".staff" if $self->api_name =~ /staff/;
+    $method .= ".atomic";
 
-    $limit  ||= 10;     # FIXME: what about $args->{limit} ?
-    $offset ||=  0;     # FIXME: what about $args->{offset} ?
+    my $limit = $args->{limit} || 10;
+    my $offset = $args->{offset} || 0;
 
     # allow caller to pass in a call timeout since MARC searches
     # can take longer than the default 60-second timeout.  
     # Default to 2 mins.  Arbitrarily cap at 5 mins.
     $timeout = 120 if !$timeout or $timeout > 300;
 
-       my @search;
-       push( @search, ($_ => $$args{$_}) ) for (sort keys %$args);
-       my $ckey = $pfx . md5_hex($method . OpenSRF::Utils::JSON->perl2JSON(\@search));
+    my @search;
+    push( @search, ($_ => $$args{$_}) ) for (sort keys %$args);
+    my $ckey = $pfx . md5_hex($method . OpenSRF::Utils::JSON->perl2JSON(\@search));
 
-       my $recs = search_cache($ckey, $offset, $limit);
+    my $recs = search_cache($ckey, $offset, $limit);
 
-       if(!$recs) {
+    if(!$recs) {
 
         my $ses = OpenSRF::AppSession->create('open-ils.storage');
         my $req = $ses->request($method, %$args);
         my $resp = $req->recv($timeout);
 
         if($resp and $recs = $resp->content) {
-                       put_cache($ckey, scalar(@$recs), $recs);
-                       $recs = [ @$recs[$offset..($offset + ($limit - 1))] ];
-               } else {
-                       $recs = [];
-               }
+            put_cache($ckey, scalar(@$recs), $recs);
+        } else {
+            $recs = [];
+        }
 
         $ses->kill_me;
-       }
+    }
 
-       my $count = 0;
-       $count = $recs->[0]->[2] if $recs->[0] and $recs->[0]->[2];
-       my @recs = map { $_->[0] } @$recs;
+    my $count = 0;
+    $count = $recs->[0]->[2] if $recs->[0] and $recs->[0]->[2];
+    my @recs = map { $_->[0] } @$recs;
 
-       return { ids => \@recs, count => $count };
+    return { ids => \@recs, count => $count };
 }
 
 
@@ -2226,25 +2368,25 @@ __PACKAGE__->register_method(
 }
 
 sub biblio_search_isbn { 
-       my( $self, $client, $isbn ) = @_;
-       $logger->debug("Searching ISBN $isbn");
-       # the previous implementation of this method was essentially unlimited,
-       # so we will set our limit very high and let multiclass.query provide any
-       # actual limit
-       # XXX: if making this unlimited is deemed important, we might consider
-       # reworking 'open-ils.storage.id_list.biblio.record_entry.search.isbn',
-       # which is functionally deprecated at this point, or a custom call to
-       # 'open-ils.storage.biblio.multiclass.search_fts'
+    my( $self, $client, $isbn ) = @_;
+    $logger->debug("Searching ISBN $isbn");
+    # the previous implementation of this method was essentially unlimited,
+    # so we will set our limit very high and let multiclass.query provide any
+    # actual limit
+    # XXX: if making this unlimited is deemed important, we might consider
+    # reworking 'open-ils.storage.id_list.biblio.record_entry.search.isbn',
+    # which is functionally deprecated at this point, or a custom call to
+    # 'open-ils.storage.biblio.multiclass.search_fts'
 
     my $isbn_method = 'open-ils.search.biblio.multiclass.query';
     if ($self->api_name =~ m/.staff$/) {
         $isbn_method .= '.staff';
     }
 
-       my $method = $self->method_lookup($isbn_method);
-       my ($search_result) = $method->run({'limit' => 1000000}, "identifier|isbn:$isbn");
-       my @recs = map { $_->[0] } @{$search_result->{'ids'}};
-       return { ids => \@recs, count => $search_result->{'count'} };
+    my $method = $self->method_lookup($isbn_method);
+    my ($search_result) = $method->run({'limit' => 1000000}, "identifier|isbn:$isbn");
+    my @recs = map { $_->[0] } @{$search_result->{'ids'}};
+    return { ids => \@recs, count => $search_result->{'count'} };
 }
 
 __PACKAGE__->register_method(
@@ -2254,21 +2396,21 @@ __PACKAGE__->register_method(
 
 # XXX: see biblio_search_isbn() for note concerning 'limit'
 sub biblio_search_isbn_batch { 
-       my( $self, $client, $isbn_list ) = @_;
-       $logger->debug("Searching ISBNs @$isbn_list");
-       my @recs = (); my %rec_set = ();
-       my $method = $self->method_lookup('open-ils.search.biblio.multiclass.query');
-       foreach my $isbn ( @$isbn_list ) {
-               my ($search_result) = $method->run({'limit' => 1000000}, "identifier|isbn:$isbn");
-               my @recs_subset = map { $_->[0] } @{$search_result->{'ids'}};
-               foreach my $rec (@recs_subset) {
-                       if (! $rec_set{ $rec }) {
-                               $rec_set{ $rec } = 1;
-                               push @recs, $rec;
-                       }
-               }
-       }
-       return { ids => \@recs, count => scalar(@recs) };
+    my( $self, $client, $isbn_list ) = @_;
+    $logger->debug("Searching ISBNs @$isbn_list");
+    my @recs = (); my %rec_set = ();
+    my $method = $self->method_lookup('open-ils.search.biblio.multiclass.query');
+    foreach my $isbn ( @$isbn_list ) {
+        my ($search_result) = $method->run({'limit' => 1000000}, "identifier|isbn:$isbn");
+        my @recs_subset = map { $_->[0] } @{$search_result->{'ids'}};
+        foreach my $rec (@recs_subset) {
+            if (! $rec_set{ $rec }) {
+                $rec_set{ $rec } = 1;
+                push @recs, $rec;
+            }
+        }
+    }
+    return { ids => \@recs, count => scalar(@recs) };
 }
 
 foreach my $issn_method (qw/
@@ -2292,25 +2434,25 @@ __PACKAGE__->register_method(
 }
 
 sub biblio_search_issn { 
-       my( $self, $client, $issn ) = @_;
-       $logger->debug("Searching ISSN $issn");
-       # the previous implementation of this method was essentially unlimited,
-       # so we will set our limit very high and let multiclass.query provide any
-       # actual limit
-       # XXX: if making this unlimited is deemed important, we might consider
-       # reworking 'open-ils.storage.id_list.biblio.record_entry.search.issn',
-       # which is functionally deprecated at this point, or a custom call to
-       # 'open-ils.storage.biblio.multiclass.search_fts'
+    my( $self, $client, $issn ) = @_;
+    $logger->debug("Searching ISSN $issn");
+    # the previous implementation of this method was essentially unlimited,
+    # so we will set our limit very high and let multiclass.query provide any
+    # actual limit
+    # XXX: if making this unlimited is deemed important, we might consider
+    # reworking 'open-ils.storage.id_list.biblio.record_entry.search.issn',
+    # which is functionally deprecated at this point, or a custom call to
+    # 'open-ils.storage.biblio.multiclass.search_fts'
 
     my $issn_method = 'open-ils.search.biblio.multiclass.query';
     if ($self->api_name =~ m/.staff$/) {
         $issn_method .= '.staff';
     }
 
-       my $method = $self->method_lookup($issn_method);
-       my ($search_result) = $method->run({'limit' => 1000000}, "identifier|issn:$issn");
-       my @recs = map { $_->[0] } @{$search_result->{'ids'}};
-       return { ids => \@recs, count => $search_result->{'count'} };
+    my $method = $self->method_lookup($issn_method);
+    my ($search_result) = $method->run({'limit' => 1000000}, "identifier|issn:$issn");
+    my @recs = map { $_->[0] } @{$search_result->{'ids'}};
+    return { ids => \@recs, count => $search_result->{'count'} };
 }
 
 
@@ -2330,11 +2472,11 @@ __PACKAGE__->register_method(
 );
 
 sub fetch_mods_by_copy {
-       my( $self, $client, $copyid ) = @_;
-       my ($record, $evt) = $apputils->fetch_record_by_copy( $copyid );
-       return $evt if $evt;
-       return OpenILS::Event->new('ITEM_NOT_CATALOGED') unless $record->marc;
-       return $apputils->record_to_mvr($record);
+    my( $self, $client, $copyid ) = @_;
+    my ($record, $evt) = $apputils->fetch_record_by_copy( $copyid );
+    return $evt if $evt;
+    return OpenILS::Event->new('ITEM_NOT_CATALOGED') unless $record->marc;
+    return $apputils->record_to_mvr($record);
 }
 
 
@@ -2361,17 +2503,17 @@ __PACKAGE__->register_method(
 
 # RETURNS array of arrays like so: label, owning_lib, record, id
 sub cn_browse {
-       my( $self, $client, @params ) = @_;
-       my $method;
+    my( $self, $client, @params ) = @_;
+    my $method;
 
-       $method = 'open-ils.storage.asset.call_number.browse.target.atomic' 
-               if( $self->api_name =~ /target/ );
-       $method = 'open-ils.storage.asset.call_number.browse.page_up.atomic'
-               if( $self->api_name =~ /page_up/ );
-       $method = 'open-ils.storage.asset.call_number.browse.page_down.atomic'
-               if( $self->api_name =~ /page_down/ );
+    $method = 'open-ils.storage.asset.call_number.browse.target.atomic' 
+        if( $self->api_name =~ /target/ );
+    $method = 'open-ils.storage.asset.call_number.browse.page_up.atomic'
+        if( $self->api_name =~ /page_up/ );
+    $method = 'open-ils.storage.asset.call_number.browse.page_down.atomic'
+        if( $self->api_name =~ /page_down/ );
 
-       return $apputils->simplereq( 'open-ils.storage', $method, @params );
+    return $apputils->simplereq( 'open-ils.storage', $method, @params );
 }
 # -------------------------------------------------------------------------------------
 
@@ -2383,12 +2525,12 @@ __PACKAGE__->register_method(
 );
 
 sub fetch_cn {
-       my( $self, $client, $id ) = @_;
+    my( $self, $client, $id ) = @_;
 
-       my $e = new_editor();
-       my( $cn, $evt ) = $apputils->fetch_callnumber( $id, 0, $e );
-       return $evt if $evt;
-       return $cn;
+    my $e = new_editor();
+    my( $cn, $evt ) = $apputils->fetch_callnumber( $id, 0, $e );
+    return $evt if $evt;
+    return $cn;
 }
 
 __PACKAGE__->register_method(
@@ -2399,12 +2541,12 @@ __PACKAGE__->register_method(
 );
 
 sub fetch_fleshed_cn {
-       my( $self, $client, $id ) = @_;
+    my( $self, $client, $id ) = @_;
 
-       my $e = new_editor();
-       my( $cn, $evt ) = $apputils->fetch_callnumber( $id, 1, $e );
-       return $evt if $evt;
-       return $cn;
+    my $e = new_editor();
+    my( $cn, $evt ) = $apputils->fetch_callnumber( $id, 1, $e );
+    return $evt if $evt;
+    return $cn;
 }
 
 
@@ -2412,36 +2554,36 @@ __PACKAGE__->register_method(
     method    => "fetch_copy_by_cn",
     api_name  => 'open-ils.search.copies_by_call_number.retrieve',
     signature => q/
-               Returns an array of copy ID's by callnumber ID
-               @param cnid The callnumber ID
-               @return An array of copy IDs
-       /
+        Returns an array of copy ID's by callnumber ID
+        @param cnid The callnumber ID
+        @return An array of copy IDs
+    /
 );
 
 sub fetch_copy_by_cn {
-       my( $self, $conn, $cnid ) = @_;
-       return $U->cstorereq(
-               'open-ils.cstore.direct.asset.copy.id_list.atomic', 
-               { call_number => $cnid, deleted => 'f' } );
+    my( $self, $conn, $cnid ) = @_;
+    return $U->cstorereq(
+        'open-ils.cstore.direct.asset.copy.id_list.atomic', 
+        { call_number => $cnid, deleted => 'f' } );
 }
 
 __PACKAGE__->register_method(
     method    => 'fetch_cn_by_info',
     api_name  => 'open-ils.search.call_number.retrieve_by_info',
     signature => q/
-               @param label The callnumber label
-               @param record The record the cn is attached to
-               @param org The owning library of the cn
-               @return The callnumber object
-       /
+        @param label The callnumber label
+        @param record The record the cn is attached to
+        @param org The owning library of the cn
+        @return The callnumber object
+    /
 );
 
 
 sub fetch_cn_by_info {
-       my( $self, $conn, $label, $record, $org ) = @_;
-       return $U->cstorereq(
-               'open-ils.cstore.direct.asset.call_number.search',
-               { label => $label, record => $record, owning_lib => $org, deleted => 'f' });
+    my( $self, $conn, $label, $record, $org ) = @_;
+    return $U->cstorereq(
+        'open-ils.cstore.direct.asset.call_number.search',
+        { label => $label, record => $record, owning_lib => $org, deleted => 'f' });
 }
 
 
@@ -2473,10 +2615,10 @@ __PACKAGE__->register_method(
 );
 
 sub bib_extras {
-       my $self = shift;
+    my $self = shift;
     $logger->warn("deprecation warning: " .$self->api_name);
 
-       my $e = new_editor();
+    my $e = new_editor();
 
     my $ctype = $self->{ctype};
     my $ccvms = $e->search_config_coded_value_map({ctype => $ctype});
@@ -2512,9 +2654,8 @@ __PACKAGE__->register_method(
 sub fetch_slim_record {
     my( $self, $conn, $ids ) = @_;
 
-#my $editor = OpenILS::Utils::Editor->new;
     my $editor = new_editor();
-       my @res;
+    my @res;
     for( @$ids ) {
         return $editor->event unless
             my $r = $editor->retrieve_biblio_record_entry($_);
@@ -2529,11 +2670,11 @@ __PACKAGE__->register_method(
     api_name  => 'open-ils.search.biblio.record_hold_parts',
     signature => q/
        Returns a list of {label :foo, id : bar} objects for viable monograph parts for a given record
-       /
+    /
 );
 
 sub rec_hold_parts {
-       my( $self, $conn, $args ) = @_;
+    my( $self, $conn, $args ) = @_;
 
     my $rec        = $$args{record};
     my $mrec       = $$args{metarecord};
@@ -2554,7 +2695,8 @@ sub rec_hold_parts {
                     },
                     distinct => 1,
                 }
-            }
+            },
+            deleted => 'f'
         },
         order_by =>[{class=>'bmp', field=>'label_sortkey'}]
     };
@@ -2577,16 +2719,16 @@ __PACKAGE__->register_method(
     method    => 'rec_to_mr_rec_descriptors',
     api_name  => 'open-ils.search.metabib.record_to_descriptors',
     signature => q/
-               specialized method...
-               Given a biblio record id or a metarecord id, 
-               this returns a list of metabib.record_descriptor
-               objects that live within the same metarecord
-               @param args Object of args including:
-       /
+        specialized method...
+        Given a biblio record id or a metarecord id, 
+        this returns a list of metabib.record_descriptor
+        objects that live within the same metarecord
+        @param args Object of args including:
+    /
 );
 
 sub rec_to_mr_rec_descriptors {
-       my( $self, $conn, $args ) = @_;
+    my( $self, $conn, $args ) = @_;
 
     my $rec        = $$args{record};
     my $mrec       = $$args{metarecord};
@@ -2597,69 +2739,69 @@ sub rec_to_mr_rec_descriptors {
 
     my $hard_boundary = $U->ou_ancestor_setting_value($pickup_lib, OILS_SETTING_HOLD_HARD_BOUNDARY) if (defined $pickup_lib);
 
-       my $e = new_editor();
-       my $recs;
-
-       if( !$mrec ) {
-               my $map = $e->search_metabib_metarecord_source_map({source => $rec});
-               return $e->event unless @$map;
-               $mrec = $$map[0]->metarecord;
-       }
-
-       $recs = $e->search_metabib_metarecord_source_map({metarecord => $mrec});
-       return $e->event unless @$recs;
-
-       my @recs = map { $_->source } @$recs;
-       my $search = { record => \@recs };
-       $search->{item_form} = $item_forms if $item_forms and @$item_forms;
-       $search->{item_type} = $item_types if $item_types and @$item_types;
-       $search->{item_lang} = $item_lang  if $item_lang;
-
-       my $desc = $e->search_metabib_record_descriptor($search);
-
-       my $query = {
-               distinct => 1,
-               select   => { 'bre' => ['id'] },
-               from     => {
-                       'bre' => {
-                               'acn' => {
-                                       'join' => {
-                                               'acp' => {"join" => {"acpl" => {}, "ccs" => {}}}
-                                         }
-                                 }
-                        }
-               },
-               where => {
-                       '+bre' => { id => \@recs },
-                       '+acp' => {
-                               holdable => 't',
-                               deleted  => 'f'
-                       },
-                       "+ccs" => { holdable => 't' },
-                       "+acpl" => { holdable => 't' }
-               }
-       };
-
-       if ($hard_boundary) { # 0 (or "top") is the same as no setting
-               my $orgs = $e->json_query(
-                       { from => [ 'actor.org_unit_descendants' => $pickup_lib, $hard_boundary ] }
-               ) or return $e->die_event;
-
-               $query->{where}->{"+acp"}->{circ_lib} = [ map { $_->{id} } @$orgs ];
-       }
-
-       my $good_records = $e->json_query($query) or return $e->die_event;
-
-       my @keep;
-       for my $d (@$desc) {
-               if ( grep { $d->record == $_->{id} } @$good_records ) {
-                       push @keep, $d;
-               }
-       }
-
-       $desc = \@keep;
-
-       return { metarecord => $mrec, descriptors => $desc };
+    my $e = new_editor();
+    my $recs;
+
+    if( !$mrec ) {
+        my $map = $e->search_metabib_metarecord_source_map({source => $rec});
+        return $e->event unless @$map;
+        $mrec = $$map[0]->metarecord;
+    }
+
+    $recs = $e->search_metabib_metarecord_source_map({metarecord => $mrec});
+    return $e->event unless @$recs;
+
+    my @recs = map { $_->source } @$recs;
+    my $search = { record => \@recs };
+    $search->{item_form} = $item_forms if $item_forms and @$item_forms;
+    $search->{item_type} = $item_types if $item_types and @$item_types;
+    $search->{item_lang} = $item_lang  if $item_lang;
+
+    my $desc = $e->search_metabib_record_descriptor($search);
+
+    my $query = {
+        distinct => 1,
+        select   => { 'bre' => ['id'] },
+        from     => {
+            'bre' => {
+                'acn' => {
+                    'join' => {
+                        'acp' => {"join" => {"acpl" => {}, "ccs" => {}}}
+                      }
+                  }
+             }
+        },
+        where => {
+            '+bre' => { id => \@recs },
+            '+acp' => {
+                holdable => 't',
+                deleted  => 'f'
+            },
+            "+ccs" => { holdable => 't' },
+            "+acpl" => { holdable => 't', deleted => 'f' }
+        }
+    };
+
+    if ($hard_boundary) { # 0 (or "top") is the same as no setting
+        my $orgs = $e->json_query(
+            { from => [ 'actor.org_unit_descendants' => $pickup_lib, $hard_boundary ] }
+        ) or return $e->die_event;
+
+        $query->{where}->{"+acp"}->{circ_lib} = [ map { $_->{id} } @$orgs ];
+    }
+
+    my $good_records = $e->json_query($query) or return $e->die_event;
+
+    my @keep;
+    for my $d (@$desc) {
+        if ( grep { $d->record == $_->{id} } @$good_records ) {
+            push @keep, $d;
+        }
+    }
+
+    $desc = \@keep;
+
+    return { metarecord => $mrec, descriptors => $desc };
 }
 
 
@@ -2669,7 +2811,7 @@ __PACKAGE__->register_method(
 );
 
 sub fetch_age_protect {
-       return new_editor()->retrieve_all_config_rule_age_hold_protect();
+    return new_editor()->retrieve_all_config_rule_age_hold_protect();
 }
 
 
@@ -2684,27 +2826,468 @@ __PACKAGE__->register_method(
 );
 
 sub copies_by_cn_label {
-       my( $self, $conn, $record, $cn_parts, $circ_lib ) = @_;
-       my $e = new_editor();
+    my( $self, $conn, $record, $cn_parts, $circ_lib ) = @_;
+    my $e = new_editor();
     my $cnp_id = $cn_parts->[0] eq '' ? -1 : $e->search_asset_call_number_prefix({label => $cn_parts->[0]}, {idlist=>1})->[0];
     my $cns_id = $cn_parts->[2] eq '' ? -1 : $e->search_asset_call_number_suffix({label => $cn_parts->[2]}, {idlist=>1})->[0];
-       my $cns = $e->search_asset_call_number({record => $record, prefix => $cnp_id, label => $cn_parts->[1], suffix => $cns_id, deleted => 'f'}, {idlist=>1});
-       return [] unless @$cns;
-
-       # show all non-deleted copies in the staff client ...
-       if ($self->api_name =~ /staff$/o) {
-               return $e->search_asset_copy({call_number => $cns, circ_lib => $circ_lib, deleted => 'f'}, {idlist=>1});
-       }
-
-       # ... otherwise, grab the copies ...
-       my $copies = $e->search_asset_copy(
-               [ {call_number => $cns, circ_lib => $circ_lib, deleted => 'f', opac_visible => 't'},
-                 {flesh => 1, flesh_fields => { acp => [ qw/location status/] } }
-               ]
-       );
-
-       # ... and test for location and status visibility
-       return [ map { ($U->is_true($_->location->opac_visible) && $U->is_true($_->status->opac_visible)) ? ($_->id) : () } @$copies ];
+    my $cns = $e->search_asset_call_number({record => $record, prefix => $cnp_id, label => $cn_parts->[1], suffix => $cns_id, deleted => 'f'}, {idlist=>1});
+    return [] unless @$cns;
+
+    # show all non-deleted copies in the staff client ...
+    if ($self->api_name =~ /staff$/o) {
+        return $e->search_asset_copy({call_number => $cns, circ_lib => $circ_lib, deleted => 'f'}, {idlist=>1});
+    }
+
+    # ... otherwise, grab the copies ...
+    my $copies = $e->search_asset_copy(
+        [ {call_number => $cns, circ_lib => $circ_lib, deleted => 'f', opac_visible => 't'},
+          {flesh => 1, flesh_fields => { acp => [ qw/location status/] } }
+        ]
+    );
+
+    # ... and test for location and status visibility
+    return [ map { ($U->is_true($_->location->opac_visible) && $U->is_true($_->status->opac_visible)) ? ($_->id) : () } @$copies ];
+}
+
+__PACKAGE__->register_method(
+    method   => 'bib_copies',
+    api_name => 'open-ils.search.bib.copies',
+    stream => 1
+);
+__PACKAGE__->register_method(
+    method   => 'bib_copies',
+    api_name => 'open-ils.search.bib.copies.staff',
+    stream => 1
+);
+
+sub bib_copies {
+    my ($self, $client, $rec_id, $org, $depth, $limit, $offset, $pref_ou) = @_;
+    my $is_staff = ($self->api_name =~ /staff/);
+
+    my $cstore = OpenSRF::AppSession->create('open-ils.cstore');
+    my $req = $cstore->request(
+        'open-ils.cstore.json_query', mk_copy_query(
+        $rec_id, $org, $depth, $limit, $offset, $pref_ou, $is_staff));
+
+    my $resp;
+    while ($resp = $req->recv) {
+        $client->respond($resp->content); 
+    }
+
+    return undef;
+}
+
+# TODO: this comes almost directly from WWW/EGCatLoader/Record.pm
+# Refactor to share
+sub mk_copy_query {
+    my $rec_id = shift;
+    my $org = shift;
+    my $depth = shift;
+    my $copy_limit = shift;
+    my $copy_offset = shift;
+    my $pref_ou = shift;
+    my $is_staff = shift;
+    my $base_query = shift;
+
+    my $query = $base_query || $U->basic_opac_copy_query(
+        $rec_id, undef, undef, $copy_limit, $copy_offset, $is_staff
+    );
+
+    if ($org) { # TODO: root org test
+        # no need to add the org join filter if we're not actually filtering
+        $query->{from}->{acp}->[1] = { aou => {
+            fkey => 'circ_lib',
+            field => 'id',
+            filter => {
+                id => {
+                    in => {
+                        select => {aou => [{
+                            column => 'id', 
+                            transform => 'actor.org_unit_descendants',
+                            result_field => 'id', 
+                            params => [$depth]
+                        }]},
+                        from => 'aou',
+                        where => {id => $org}
+                    }
+                }
+            }
+        }};
+
+        if ($pref_ou) {
+            # Make sure the pref OU is included in the results
+            my $in = $query->{from}->{acp}->[1]->{aou}->{filter}->{id}->{in};
+            delete $query->{from}->{acp}->[1]->{aou}->{filter}->{id};
+            $query->{from}->{acp}->[1]->{aou}->{filter}->{'-or'} = [
+                {id => {in => $in}},
+                {id => $pref_ou}
+            ];
+        }
+    };
+
+    # Unsure if we want these in the shared function, leaving here for now
+    unshift(@{$query->{order_by}},
+        { class => "aou", field => 'id',
+          transform => 'evergreen.rank_ou', params => [$org, $pref_ou]
+        }
+    );
+    push(@{$query->{order_by}},
+        { class => "acp", field => 'id',
+          transform => 'evergreen.rank_cp'
+        }
+    );
+
+    return $query;
+}
+
+__PACKAGE__->register_method(
+    method    => 'record_urls',
+    api_name  => 'open-ils.search.biblio.record.resource_urls.retrieve',
+    argc      => 1,
+    stream    => 1,
+    signature => {
+        desc   => q/Returns bib record 856 URL content./,
+        params => [
+            {desc => 'Context org unit ID', type => 'number'},
+            {desc => 'Record ID or Array of Record IDs', type => 'number or array'}
+        ],
+        return => {
+            desc => 'Stream of URL objects, one collection object per record',
+            type => 'object'
+        }
+    }
+);
+
+sub record_urls {
+    my ($self, $client, $org_id, $record_ids) = @_;
+
+    $record_ids = [$record_ids] unless ref $record_ids eq 'ARRAY';
+
+    my $e = new_editor();
+
+    for my $record_id (@$record_ids) {
+
+        my @urls;
+
+        # Start with scoped located URIs
+        my $uris = $e->json_query({
+            from => ['evergreen.located_uris_as_uris', $record_id, $org_id]});
+
+        for my $uri (@$uris) {
+            push(@urls, {
+                href => $uri->{href},
+                label => $uri->{label},
+                note => $uri->{use_restriction}
+            });
+        }
+
+        # Logic copied from TPAC misc_utils.tts
+        my $bib = $e->retrieve_biblio_record_entry($record_id)
+            or return $e->event;
+
+        my $marc_doc = $U->marc_xml_to_doc($bib->marc);
+
+        for my $node ($marc_doc->findnodes('//*[@tag="856" and @ind1="4"]')) {
+
+            # asset.uri's
+            next if $node->findnodes('./*[@code="9" or @code="w" or @code="n"]');
+
+            my $url = {};
+            my ($label) = $node->findnodes('./*[@code="y"]');
+            my ($notes) = $node->findnodes('./*[@code="z" or @code="3"]');
+
+            my $first = 1;
+            for my $href_node ($node->findnodes('./*[@code="u"]')) {
+                next unless $href_node;
+
+                # it's possible for multiple $u's to exist within 1 856 tag.
+                # in that case, honor the label/notes data for the first $u, but
+                # leave any subsequent $u's as unadorned href's.
+                # use href/link/note keys to be consistent with args.uri's
+
+                my $href = $href_node->textContent;
+                push(@urls, {
+                    href => $href,
+                    label => ($first && $label) ?  $label->textContent : $href,
+                    note => ($first && $notes) ? $notes->textContent : '',
+                    ind2 => $node->getAttribute('ind2')
+                });
+                $first = 0;
+            }
+        }
+
+        $client->respond({id => $record_id, urls => \@urls});
+    }
+
+    return undef;
+}
+
+__PACKAGE__->register_method(
+    method    => 'catalog_record_summary',
+    api_name  => 'open-ils.search.biblio.record.catalog_summary',
+    stream    => 1,
+    max_bundle_count => 1,
+    signature => {
+        desc   => 'Stream of record data suitable for catalog display',
+        params => [
+            {desc => 'Context org unit ID', type => 'number'},
+            {desc => 'Array of Record IDs', type => 'array'}
+        ],
+        return => { 
+            desc => q/
+                Stream of record summary objects including id, record,
+                hold_count, copy_counts, display (metabib display
+                fields), attributes (metabib record attrs), plus
+                metabib_id and metabib_records for the metabib variant.
+            /
+        }
+    }
+);
+__PACKAGE__->register_method(
+    method    => 'catalog_record_summary',
+    api_name  => 'open-ils.search.biblio.record.catalog_summary.staff',
+    stream    => 1,
+    max_bundle_count => 1,
+    signature => q/see open-ils.search.biblio.record.catalog_summary/
+);
+__PACKAGE__->register_method(
+    method    => 'catalog_record_summary',
+    api_name  => 'open-ils.search.biblio.metabib.catalog_summary',
+    stream    => 1,
+    max_bundle_count => 1,
+    signature => q/see open-ils.search.biblio.record.catalog_summary/
+);
+
+__PACKAGE__->register_method(
+    method    => 'catalog_record_summary',
+    api_name  => 'open-ils.search.biblio.metabib.catalog_summary.staff',
+    stream    => 1,
+    max_bundle_count => 1,
+    signature => q/see open-ils.search.biblio.record.catalog_summary/
+);
+
+
+sub catalog_record_summary {
+    my ($self, $client, $org_id, $record_ids, $options) = @_;
+    my $e = new_editor();
+    $options ||= {};
+    my $pref_ou = $options->{pref_ou};
+
+    my $is_meta = ($self->api_name =~ /metabib/);
+    my $is_staff = ($self->api_name =~ /staff/);
+
+    my $holds_method = $is_meta ? 
+        'open-ils.circ.mmr.holds.count' : 
+        'open-ils.circ.bre.holds.count';
+
+    my $copy_method = $is_meta ? 
+        'open-ils.search.biblio.metarecord.copy_count':
+        'open-ils.search.biblio.record.copy_count';
+
+    $copy_method .= '.staff' if $is_staff;
+
+    $copy_method = $self->method_lookup($copy_method); # local method
+
+    for my $rec_id (@$record_ids) {
+
+        my $response = $is_meta ? 
+            get_one_metarecord_summary($self, $e, $org_id, $rec_id) :
+            get_one_record_summary($self, $e, $org_id, $rec_id);
+
+        ($response->{copy_counts}) = $copy_method->run($org_id, $rec_id);
+
+        $response->{first_call_number} = get_first_call_number(
+            $e, $rec_id, $org_id, $is_staff, $is_meta, $options);
+
+        if ($pref_ou) {
+
+            # If we already have the pref ou copy counts, avoid the extra fetch.
+            my ($match) = 
+                grep {$_->{org_unit} eq $pref_ou} @{$response->{copy_counts}};
+
+            if (!$match) {
+                my ($counts) = $copy_method->run($pref_ou, $rec_id);
+                ($match) = grep {$_->{org_unit} eq $pref_ou} @$counts;
+            }
+
+            $response->{pref_ou_copy_counts} = $match;
+        }
+
+        $response->{hold_count} = 
+            $U->simplereq('open-ils.circ', $holds_method, $rec_id);
+
+        if ($options->{flesh_copies}) {
+            $response->{copies} = get_representative_copies(
+                $e, $rec_id, $org_id, $is_staff, $is_meta, $options);
+        }
+
+        $client->respond($response);
+    }
+
+    return undef;
+}
+
+# Returns a snapshot of copy information for a given record or metarecord,
+# sorted by pref org and search org.
+sub get_representative_copies {
+    my ($e, $rec_id, $org_id, $is_staff, $is_meta, $options) = @_;
+
+    my @rec_ids;
+    my $limit = $options->{copy_limit};
+    my $copy_depth = $options->{copy_depth};
+    my $copy_offset = $options->{copy_offset};
+    my $pref_ou = $options->{pref_ou};
+
+    my $org_tree = $U->get_org_tree;
+    if (!$org_id) { $org_id = $org_tree->id; }
+    my $org = $U->find_org($org_tree, $org_id);
+
+    return [] unless $org;
+
+    my $func = 'unapi.biblio_record_entry_feed';
+    my $includes = '{holdings_xml,acp,acnp,acns}';
+    my $limits = "acn=>$limit,acp=>$limit";
+
+    if ($is_meta) {
+        $func = 'unapi.metabib_virtual_record_feed';
+        $includes = '{holdings_xml,acp,acnp,acns,mmr.unapi}';
+        $limits .= ",bre=>$limit";
+    }
+
+    my $xml_query = $e->json_query({from => [
+        $func, '{'.$rec_id.'}', 'marcxml', 
+        $includes, $org->shortname, $copy_depth, $limits,
+        undef, undef,undef, undef, undef, 
+        undef, undef, undef, $pref_ou
+    ]})->[0];
+
+    my $xml = $xml_query->{$func};
+
+    my $doc = XML::LibXML->new->parse_string($xml);
+
+    my $copies = [];
+    for my $volume ($doc->documentElement->findnodes('//*[local-name()="volume"]')) {
+        my $label = $volume->getAttribute('label');
+        my $prefix = $volume->getElementsByTagName('call_number_prefix')->[0]->getAttribute('label');
+        my $suffix = $volume->getElementsByTagName('call_number_suffix')->[0]->getAttribute('label');
+
+        my $copies_node = $volume->findnodes('./*[local-name()="copies"]')->[0];
+
+        for my $copy ($copies_node->findnodes('./*[local-name()="copy"]')) {
+
+            my $status = $copy->getElementsByTagName('status')->[0]->textContent;
+            my $location = $copy->getElementsByTagName('location')->[0]->textContent;
+            my $circ_lib_sn = $copy->getElementsByTagName('circ_lib')->[0]->getAttribute('shortname');
+
+            push(@$copies, {
+                call_number_label => $label,
+                call_number_prefix_label => $prefix,
+                call_number_suffix_label => $suffix,
+                circ_lib_sn => $circ_lib_sn,
+                copy_status => $status,
+                copy_location => $location
+            });
+        }
+    }
+
+    return $copies;
+}
+
+sub get_first_call_number {
+    my ($e, $rec_id, $org_id, $is_staff, $is_meta, $options) = @_;
+
+    my $limit = $options->{copy_limit};
+    $options->{copy_limit} = 1;
+
+    my $copies = get_representative_copies(
+        $e, $rec_id, $org_id, $is_staff, $is_meta, $options);
+
+    $options->{copy_limit} = $limit;
+
+    return $copies->[0];
+}
+
+sub get_one_rec_urls {
+    my ($self, $e, $org_id, $bib_id) = @_;
+
+    my ($resp) = $self->method_lookup(
+        'open-ils.search.biblio.record.resource_urls.retrieve')
+        ->run($org_id, $bib_id);
+
+    return $resp->{urls};
+}
+
+# Start with a bib summary and augment the data with additional
+# metarecord content.
+sub get_one_metarecord_summary {
+    my ($self, $e, $org_id, $rec_id) = @_;
+
+    my $meta = $e->retrieve_metabib_metarecord($rec_id) or return {};
+    my $maps = $e->search_metabib_metarecord_source_map({metarecord => $rec_id});
+
+    my $bre_id = $meta->master_record; 
+
+    my $response = get_one_record_summary($self, $e, $org_id, $bre_id);
+    $response->{urls} = get_one_rec_urls($self, $e, $org_id, $bre_id);
+
+    $response->{metabib_id} = $rec_id;
+    $response->{metabib_records} = [map {$_->source} @$maps];
+
+    my @other_bibs = map {$_->source} grep {$_->source != $bre_id} @$maps;
+
+    # Augment the record attributes with those of all of the records
+    # linked to this metarecord.
+    if (@other_bibs) {
+        my $attrs = $e->search_metabib_record_attr_flat({id => \@other_bibs});
+
+        my $attributes = $response->{attributes};
+
+        for my $attr (@$attrs) {
+            $attributes->{$attr->attr} = [] unless $attributes->{$attr->attr};
+            push(@{$attributes->{$attr->attr}}, $attr->value) # avoid dupes
+                unless grep {$_ eq $attr->value} @{$attributes->{$attr->attr}};
+        }
+    }
+
+    return $response;
+}
+
+sub get_one_record_summary {
+    my ($self, $e, $org_id, $rec_id) = @_;
+
+    my $bre = $e->retrieve_biblio_record_entry([$rec_id, {
+        flesh => 1,
+        flesh_fields => {
+            bre => [qw/compressed_display_entries mattrs creator editor/]
+        }
+    }]) or return {};
+
+    # Compressed display fields are pachaged as JSON
+    my $display = {};
+    $display->{$_->name} = OpenSRF::Utils::JSON->JSON2perl($_->value)
+        foreach @{$bre->compressed_display_entries};
+
+    # Create an object of 'mraf' attributes.
+    # Any attribute can be multi so dedupe and array-ify all of them.
+    my $attributes = {};
+    for my $attr (@{$bre->mattrs}) {
+        $attributes->{$attr->attr} = {} unless $attributes->{$attr->attr};
+        $attributes->{$attr->attr}->{$attr->value} = 1; # avoid dupes
+    }
+    $attributes->{$_} = [keys %{$attributes->{$_}}] for keys %$attributes;
+
+    # clear bulk
+    $bre->clear_marc;
+    $bre->clear_mattrs;
+    $bre->clear_compressed_display_entries;
+
+    return {
+        id => $rec_id,
+        record => $bre,
+        display => $display,
+        attributes => $attributes,
+        urls => get_one_rec_urls($self, $e, $org_id, $rec_id)
+    };
 }