Skip to content

Commit

Permalink
Implement pagination with Point In Time and search_after #34
Browse files Browse the repository at this point in the history
  • Loading branch information
ahakanzn committed Sep 4, 2024
1 parent 275096a commit c6d29de
Show file tree
Hide file tree
Showing 8 changed files with 163 additions and 45 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
<spring-cloud-openfeign.version>2.2.6.RELEASE</spring-cloud-openfeign.version>
<spring-cloud-zookeeper.version>2.2.4.RELEASE</spring-cloud-zookeeper.version>
<zookeeper.version>3.4.14</zookeeper.version>
<elasticsearch.version>7.9.1</elasticsearch.version>
<elasticsearch.version>7.17.18</elasticsearch.version>
<guava.version>17.0</guava.version>
<logback.version>1.2.3</logback.version>
<logstash-logback-encoder.version>4.11</logstash-logback-encoder.version>
Expand Down
30 changes: 23 additions & 7 deletions src/main/java/org/gbif/literature/export/CsvWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package org.gbif.literature.export;

import org.gbif.api.model.common.export.ExportFormat;
import org.gbif.api.model.common.paging.PagingResponse;
import org.gbif.api.model.literature.LiteratureTopic;
import org.gbif.api.model.literature.LiteratureType;
import org.gbif.api.model.literature.search.LiteratureSearchResult;
Expand Down Expand Up @@ -58,10 +59,12 @@ public class CsvWriter<T> {

private final CellProcessor[] processors;

private final Iterable<T> pager;
private final LiteraturePager pager;

private final ExportFormat preference;

private final int exportPageLimit;

// Use dozer if set to true.
private Class<?> forClass;

Expand Down Expand Up @@ -91,8 +94,14 @@ public void export(Writer writer) {
private void exportUsingBeanWriter(Writer writer) {
try (ICsvBeanWriter beanWriter = new CsvBeanWriter(writer, csvPreference())) {
beanWriter.writeHeader(header);
for (T o : pager) {
beanWriter.write(o, fields, processors);
while (true) {
PagingResponse<LiteratureSearchResult> response = pager.nextPage(exportPageLimit);
for (LiteratureSearchResult result : response.getResults()) {
beanWriter.write(result, fields, processors);
}
if (response.isEndOfRecords() || response.getResults().isEmpty()) {
break;
}
}
}
}
Expand All @@ -102,15 +111,21 @@ private void exportUsingDozerBeanWriter(Writer writer) {
try (CsvDozerBeanWriter beanWriter = new CsvDozerBeanWriter(writer, csvPreference())) {
beanWriter.writeHeader(header);
beanWriter.configureBeanMapping(forClass, fields);
for (T o : pager) {
beanWriter.write(o, processors);
while (true) {
PagingResponse<LiteratureSearchResult> response = pager.nextPage(exportPageLimit);
for (LiteratureSearchResult result : response.getResults()) {
beanWriter.write(result, processors);
}
if (response.isEndOfRecords() || response.getResults().isEmpty()) {
break;
}
}
}
}

/** Creates a CsvWriter/exporter of LiteratureSearchResult. */
public static CsvWriter<LiteratureSearchResult> literatureSearchResultCsvWriter(
Iterable<LiteratureSearchResult> pager, ExportFormat preference) {
LiteraturePager pager, ExportFormat preference, int exportPageLimit) {
return CsvWriter.<LiteratureSearchResult>builder()
.fields(
new String[] {
Expand Down Expand Up @@ -179,7 +194,8 @@ public static CsvWriter<LiteratureSearchResult> literatureSearchResultCsvWriter(
new Optional(new ListStringProcessor()) // gbifDownloadKey
})
.preference(preference)
.pager(pager)
.pager(pager)
.exportPageLimit(exportPageLimit)
.build();
}

Expand Down
91 changes: 79 additions & 12 deletions src/main/java/org/gbif/literature/export/LiteraturePager.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,33 +13,100 @@
*/
package org.gbif.literature.export;

import org.gbif.api.model.common.paging.PagingRequest;
import java.io.IOException;

import java.util.Arrays;
import java.util.List;

import org.elasticsearch.action.search.ClosePointInTimeRequest;
import org.elasticsearch.action.search.OpenPointInTimeRequest;
import org.elasticsearch.action.search.OpenPointInTimeResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.core.TimeValue;

import org.gbif.api.model.common.paging.PagingResponse;
import org.gbif.api.model.common.search.SearchRequest;
import org.gbif.api.model.literature.search.LiteratureSearchRequest;
import org.gbif.api.model.literature.search.LiteratureSearchResult;
import org.gbif.api.util.iterables.BasePager;
import org.gbif.literature.config.EsClientConfigProperties;
import org.gbif.literature.search.LiteratureEsResponseParser;
import org.gbif.literature.search.LiteratureSearchService;

/** Pages through results of {@link LiteratureSearchService#exportSearch} using an Elasticsearch Point In Time context and search_after pagination. */
public class LiteraturePager extends BasePager<LiteratureSearchResult> {
public class LiteraturePager {

private final LiteratureSearchService literatureSearchService;
private final RestHighLevelClient client;
private final LiteratureSearchRequest literatureSearchRequest;
private List<Object> searchAfterValues;
private String pitId;
private String index;
private final LiteratureEsResponseParser esResponseParser;

public LiteraturePager(
LiteratureSearchService literatureSearchService,
LiteratureSearchRequest literatureSearchRequest,
int pageSize) {
super(pageSize);
EsClientConfigProperties esClientConfigProperties,
LiteratureSearchService literatureSearchService,
LiteratureSearchRequest literatureSearchRequest,
RestHighLevelClient client, LiteratureEsResponseParser esResponseParser) {
this.index = esClientConfigProperties.getIndex();
this.literatureSearchService = literatureSearchService;
this.literatureSearchRequest = literatureSearchRequest;
this.client = client;
this.esResponseParser = esResponseParser;
this.pitId = null;
this.searchAfterValues = null;
}

public PagingResponse<LiteratureSearchResult> nextPage(int pageSize) {
literatureSearchRequest.setLimit(pageSize);
try {
if (pitId == null) {
pitId = openPIT();
}

SearchResponse searchResponse =
literatureSearchService.exportSearch(literatureSearchRequest, searchAfterValues, pitId);

org.gbif.api.model.common.search.SearchResponse results = parseSearchResults(searchResponse);

pitId = searchResponse.pointInTimeId();

if (results.isEndOfRecords() || searchResponse.getHits().getHits().length == 0) {
closePIT(pitId);
pitId = null;
return results;
}

searchAfterValues = getSearchAfterValues(searchResponse);
return results;
} catch (IOException e) {
throw new RuntimeException(e);
}
}

private org.gbif.api.model.common.search.SearchResponse parseSearchResults(
org.elasticsearch.action.search.SearchResponse esSearchResponse) {
return esResponseParser.buildSearchResponse(esSearchResponse, literatureSearchRequest);
}

private List<Object> getSearchAfterValues(SearchResponse searchResponse) {
if (searchResponse.getHits().getHits().length == 0) {
return null;
}
return Arrays.asList(searchResponse.getHits().getHits()[searchResponse.getHits().getHits().length - 1].getSortValues());
}

private String openPIT() throws IOException {
OpenPointInTimeRequest openRequest = new OpenPointInTimeRequest(index);
openRequest.keepAlive(TimeValue.timeValueMinutes(1));
OpenPointInTimeResponse openResponse = client.openPointInTime(openRequest, RequestOptions.DEFAULT);
return openResponse.getPointInTimeId();
}

@Override
public PagingResponse<LiteratureSearchResult> nextPage(PagingRequest pagingRequest) {
literatureSearchRequest.setOffset(pagingRequest.getOffset());
literatureSearchRequest.setLimit(pagingRequest.getLimit());
return literatureSearchService.exportSearch(literatureSearchRequest);
private void closePIT(String pitId) throws IOException {
ClosePointInTimeRequest closeRequest = new ClosePointInTimeRequest(pitId);
client.closePointInTime(closeRequest, RequestOptions.DEFAULT);
}
}
17 changes: 15 additions & 2 deletions src/main/java/org/gbif/literature/resource/LiteratureResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
*/
package org.gbif.literature.resource;

import org.elasticsearch.client.RestHighLevelClient;

import org.gbif.api.documentation.CommonParameters;
import org.gbif.api.model.common.export.ExportFormat;
import org.gbif.api.model.common.paging.Pageable;
Expand All @@ -26,8 +28,10 @@
import org.gbif.api.model.literature.search.LiteratureSearchResult;
import org.gbif.api.vocabulary.Country;
import org.gbif.api.vocabulary.Language;
import org.gbif.literature.config.EsClientConfigProperties;
import org.gbif.literature.export.CsvWriter;
import org.gbif.literature.export.LiteraturePager;
import org.gbif.literature.search.LiteratureEsResponseParser;
import org.gbif.literature.search.LiteratureSearchService;

import java.io.BufferedWriter;
Expand Down Expand Up @@ -98,9 +102,17 @@ public class LiteratureResource {
private static final int EXPORT_PAGE_LIMIT = 5_000;

private final LiteratureSearchService searchService;
private final RestHighLevelClient client;
private final LiteratureEsResponseParser esResponseParser;
private final EsClientConfigProperties esClientConfigProperties;

/**
 * Creates the resource with the collaborators required for search and PIT-based export.
 *
 * @param searchService literature search service backing all endpoints
 * @param client low-level Elasticsearch client used to open/close Point In Time contexts
 * @param esResponseParser parser converting Elasticsearch responses into the GBIF API model
 * @param esClientConfigProperties Elasticsearch configuration (provides the index name)
 */
public LiteratureResource(
    LiteratureSearchService searchService,
    RestHighLevelClient client,
    LiteratureEsResponseParser esResponseParser,
    EsClientConfigProperties esClientConfigProperties) {
  this.searchService = searchService;
  this.client = client;
  this.esResponseParser = esResponseParser;
  this.esClientConfigProperties = esClientConfigProperties;
}

private static final String REPEATED =
Expand Down Expand Up @@ -388,8 +400,9 @@ public void export(
FILE_HEADER_PRE + System.currentTimeMillis() + '.' + format.name().toLowerCase());

try (Writer writer = new BufferedWriter(new OutputStreamWriter(response.getOutputStream()))) {
LiteraturePager pager = new LiteraturePager(esClientConfigProperties,searchService, searchRequest, client, esResponseParser);
CsvWriter.literatureSearchResultCsvWriter(
new LiteraturePager(searchService, searchRequest, EXPORT_PAGE_LIMIT), format)
pager, format, EXPORT_PAGE_LIMIT)
.export(writer);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.builder.PointInTimeBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortBuilders;
Expand Down Expand Up @@ -85,15 +86,23 @@ public EsSearchRequestBuilder(EsFieldMapper<P> esFieldMapper) {
}

public SearchRequest buildSearchRequest(
FacetedSearchRequest<P> searchRequest, boolean facetsEnabled, String index) {
FacetedSearchRequest<P> searchRequest, boolean facetsEnabled, String index, List<Object> searchAfter, String pitId) {

SearchRequest esSearchRequest = new SearchRequest();
esSearchRequest.indices(index);

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.trackTotalHits(true);
esSearchRequest.source(searchSourceBuilder);

searchSourceBuilder.fetchSource(esFieldMapper.getMappedFields(), esFieldMapper.excludeFields());
if (searchAfter != null && !searchAfter.isEmpty()) {
searchSourceBuilder.searchAfter(searchAfter.toArray());
}

if (pitId != null) {
searchSourceBuilder.pointInTimeBuilder(new PointInTimeBuilder(pitId));
} else {
esSearchRequest.indices(index);
}

// size and offset
searchSourceBuilder.size(searchRequest.getLimit());
Expand Down Expand Up @@ -129,6 +138,7 @@ public SearchRequest buildSearchRequest(

// post-filter
buildPostFilter(groupedParams.postFilterParams).ifPresent(searchSourceBuilder::postFilter);
esSearchRequest.source(searchSourceBuilder);

return esSearchRequest;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
*/
package org.gbif.literature.search;

import org.elasticsearch.index.query.ZeroTermsQueryOption;

import org.gbif.api.model.literature.LiteratureRelevance;
import org.gbif.api.model.literature.LiteratureTopic;
import org.gbif.api.model.literature.LiteratureType;
Expand All @@ -25,7 +27,6 @@
import org.elasticsearch.index.query.Operator;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.search.MatchQuery;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortBuilders;
Expand Down Expand Up @@ -173,7 +174,7 @@ public QueryBuilder fullTextQuery(String q) {
.fuzziness("AUTO")
.prefixLength(3)
.lenient(true)
.zeroTermsQuery(MatchQuery.ZeroTermsQuery.ALL));
.zeroTermsQuery(ZeroTermsQueryOption.ALL));

return boolQueryBuilder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@
*/
package org.gbif.literature.search;

import org.gbif.api.model.common.search.SearchResponse;
import java.io.IOException;

import java.util.List;

import org.elasticsearch.action.search.SearchResponse;

import org.gbif.api.model.literature.search.LiteratureSearchParameter;
import org.gbif.api.model.literature.search.LiteratureSearchRequest;
import org.gbif.api.model.literature.search.LiteratureSearchResult;
Expand All @@ -27,6 +32,8 @@ public interface LiteratureSearchService

Optional<LiteratureSearchResult> get(Object identifier);

/**
 * Executes one page of an export search against Elasticsearch.
 *
 * @param literatureSearchRequest search filters and page limit for this request
 * @param searchAfter sort values of the last hit of the previous page, or null for the first page
 * @param pitId open Point In Time id to search within; null presumably searches the live index
 *     directly — confirm against the request builder
 * @return the raw Elasticsearch response for the page
 * @throws IOException if the Elasticsearch request fails
 */
SearchResponse exportSearch(
    LiteratureSearchRequest literatureSearchRequest, List<Object> searchAfter, String pitId)
    throws IOException;

}
Loading

0 comments on commit c6d29de

Please sign in to comment.