Skip to content

Commit

Permalink
Elasticsearch docs update
Browse files Browse the repository at this point in the history
Added code snippet about Bulk API usage via low level RestClient

expanded Bulk API usage description

rewritten low level Rest Client example
added Java API example

added awaitility
added test cases for bulk operations
  • Loading branch information
lasteris committed Aug 24, 2024
1 parent 15d0e96 commit cb2cc54
Show file tree
Hide file tree
Showing 10 changed files with 395 additions and 67 deletions.
157 changes: 140 additions & 17 deletions docs/src/main/asciidoc/elasticsearch.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,13 @@ package org.acme.elasticsearch;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
Expand All @@ -151,13 +154,49 @@ public class FruitService {
restClient.performRequest(request); //<4>
}
public void index(List<Fruit> list) throws IOException {
var entityList = new ArrayList<JsonObject>();
for (var fruit : list) {
entityList.add(new JsonObject().put("index", new JsonObject()//<5>
.put("_index", "fruits").put("_id", fruit.id)));
entityList.add(JsonObject.mapFrom(fruit));
}
Request request = new Request(
"POST", "fruits/_bulk?pretty");
request.setEntity(new StringEntity(
toNdJsonString(entityList),//<6>
ContentType.create("application/x-ndjson")));//<7>
restClient.performRequest(request);
}
public void delete(List<String> identityList) throws IOException {
var entityList = new ArrayList<JsonObject>();
for (var id : identityList) {
entityList.add(new JsonObject().put("delete",
new JsonObject().put("_index", "fruits").put("_id", id)));//<8>
}
Request request = new Request(
"POST", "fruits/_bulk?pretty");
request.setEntity(new StringEntity(
toNdJsonString(entityList),
ContentType.create("application/x-ndjson")));
restClient.performRequest(request);
}
public Fruit get(String id) throws IOException {
Request request = new Request(
"GET",
"/fruits/_doc/" + id);
Response response = restClient.performRequest(request);
String responseBody = EntityUtils.toString(response.getEntity());
JsonObject json = new JsonObject(responseBody); //<5>
JsonObject json = new JsonObject(responseBody); //<9>
return json.getJsonObject("_source").mapTo(Fruit.class);
}
Expand Down Expand Up @@ -191,32 +230,63 @@ public class FruitService {
}
return results;
}
private static String toNdJsonString(List<JsonObject> objects) {
return objects.stream()
.map(JsonObject::encode)
.collect(Collectors.joining("\n", "", "\n"));
}
}
----
<1> We inject an Elasticsearch low level `RestClient` into our service.
<2> We create an Elasticsearch request.
<3> We use Vert.x `JsonObject` to serialize the object before sending it to Elasticsearch, you can use whatever you want to serialize your objects to JSON.
<4> We send the request (indexing request here) to Elasticsearch.
<5> In order to deserialize the object from Elasticsearch, we again use Vert.x `JsonObject`.
<5> As we `index` collection of objects we should use `index`, `create` or `update` action.
<6> We use `toNdJsonString(entityList)` call to produce output like below
+
[source, json]
----
{"index", {"_index" : "fruits", "_id", "1"}}
{"id": "1", "name": "apple", "color": "red"}
... ... ... ...
{"create", {"_index" : "fruits", "_id", "N"}}
{"id": "N", "name": "dragonfruit", "color": "pink"}
----
<7> Pass the content type that is expected by the search backend for bulk requests.
<8> The bulk API's delete operation JSON already contains all the required information; hence, there is no request body following this operation in the Bulk API request body.
+
[source, json]
----
{"delete", {"_index" : "fruits", "_id", "1"}}
{"delete", {"_index" : "fruits", "_id", "2"}}
... ... ... ...
{"delete", {"_index" : "fruits", "_id", "N"}}
----
<9> In order to deserialize the object from Elasticsearch, we again use Vert.x JsonObject.

Check warning on line 268 in docs/src/main/asciidoc/elasticsearch.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.TermsWarnings] Consider using 'to' rather than 'In order to' unless updating existing content that uses the term. Raw Output: {"message": "[Quarkus.TermsWarnings] Consider using 'to' rather than 'In order to' unless updating existing content that uses the term.", "location": {"path": "docs/src/main/asciidoc/elasticsearch.adoc", "range": {"start": {"line": 268, "column": 5}}}, "severity": "WARNING"}

Check warning on line 268 in docs/src/main/asciidoc/elasticsearch.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.Fluff] Depending on the context, consider using 'Be concise: use 'to' rather than' rather than 'In order to'. Raw Output: {"message": "[Quarkus.Fluff] Depending on the context, consider using 'Be concise: use 'to' rather than' rather than 'In order to'.", "location": {"path": "docs/src/main/asciidoc/elasticsearch.adoc", "range": {"start": {"line": 268, "column": 5}}}, "severity": "INFO"}

Check warning on line 268 in docs/src/main/asciidoc/elasticsearch.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.Spelling] Use correct American English spelling. Did you really mean 'deserialize'? Raw Output: {"message": "[Quarkus.Spelling] Use correct American English spelling. Did you really mean 'deserialize'?", "location": {"path": "docs/src/main/asciidoc/elasticsearch.adoc", "range": {"start": {"line": 268, "column": 17}}}, "severity": "WARNING"}

Now, create the `org.acme.elasticsearch.FruitResource` class as follows:

Check warning on line 270 in docs/src/main/asciidoc/elasticsearch.adoc

View workflow job for this annotation

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.TermsSuggestions] Depending on the context, consider using 'because' or 'while' rather than 'as'. Raw Output: {"message": "[Quarkus.TermsSuggestions] Depending on the context, consider using 'because' or 'while' rather than 'as'.", "location": {"path": "docs/src/main/asciidoc/elasticsearch.adoc", "range": {"start": {"line": 270, "column": 62}}}, "severity": "INFO"}

[source,java]
----
package org.acme.elasticsearch;
import jakarta.inject.Inject;
import jakarta.ws.rs.BadRequestException;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.Response;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.UUID;
import org.jboss.resteasy.reactive.RestQuery;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.DELETE;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.BadRequestException;
@Path("/fruits")
public class FruitResource {
Expand All @@ -233,6 +303,20 @@ public class FruitResource {
return Response.created(URI.create("/fruits/" + fruit.id)).build();
}
@Path("bulk")
@DELETE
public Response delete(List<String> identityList) throws IOException {
fruitService.delete(identityList);
return Response.ok().build();
}
@Path("bulk")
@POST
public Response index(List<Fruit> list) throws IOException {
fruitService.index(list);
return Response.ok().build();
}
@GET
@Path("/{id}")
public Fruit get(String id) throws IOException {
Expand Down Expand Up @@ -384,18 +468,26 @@ Here is a version of the `FruitService` using the Elasticsearch Java Client inst

[source,java]
----
import java.io.IOException;
import java.io.StringReader;
import java.util.List;
import java.util.stream.Collectors;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.IndexRequest;
import co.elastic.clients.elasticsearch._types.FieldValue;
import co.elastic.clients.elasticsearch._types.query_dsl.QueryBuilders;
import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch.core.search.Hit;
import co.elastic.clients.elasticsearch.core.search.HitsMetadata;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.acme.elasticsearch.Fruit;
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import co.elastic.clients.elasticsearch.core.GetRequest;
import co.elastic.clients.elasticsearch.core.GetResponse;
import co.elastic.clients.elasticsearch.core.SearchRequest;
import co.elastic.clients.elasticsearch.core.SearchResponse;
import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.BulkResponse;
@ApplicationScoped
public class FruitService {
Expand All @@ -410,6 +502,37 @@ public class FruitService {
client.index(request); // <4>
}
public void index(List<Fruit> list) throws IOException {
BulkRequest.Builder br = new BulkRequest.Builder();
for (var fruit : list) {
br.operations(op -> op
.index(idx -> idx.index("fruits").id(fruit.id).document(fruit)));
}
BulkResponse result = client.bulk(br.build());
if (result.errors()) {
throw new RuntimeException("The indexing operation encountered errors.");
}
}
public void delete(List<String> list) throws IOException {
BulkRequest.Builder br = new BulkRequest.Builder();
for (var id : list) {
br.operations(op -> op.delete(idx -> idx.index("fruits").id(id)));
}
BulkResponse result = client.bulk(br.build());
if (result.errors()) {
throw new RuntimeException("The indexing operation encountered errors.");
}
}
public Fruit get(String id) throws IOException {
GetRequest getRequest = GetRequest.of(
b -> b.index("fruits")
Expand Down
10 changes: 10 additions & 0 deletions integration-tests/elasticsearch-java-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,16 @@
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>

<!-- Minimal test dependencies to *-deployment artifacts for consistent build order -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import jakarta.inject.Inject;
import jakarta.ws.rs.BadRequestException;
import jakarta.ws.rs.DELETE;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
Expand Down Expand Up @@ -54,4 +55,18 @@ public List<Fruit> searchUnsafe(@QueryParam("json") String json) throws IOExcept
return fruitService.searchWithJson(json);
}

@Path("bulk")
@DELETE
public Response delete(List<String> identityList) throws IOException {
fruitService.delete(identityList);
return Response.ok().build();
}

@Path("bulk")
@POST
public Response index(List<Fruit> list) throws IOException {
fruitService.index(list);
return Response.ok().build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,14 @@
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.FieldValue;
import co.elastic.clients.elasticsearch._types.query_dsl.QueryBuilders;
import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import co.elastic.clients.elasticsearch.core.GetRequest;
import co.elastic.clients.elasticsearch.core.GetResponse;
import co.elastic.clients.elasticsearch.core.IndexRequest;
import co.elastic.clients.elasticsearch.core.SearchRequest;
import co.elastic.clients.elasticsearch.core.SearchResponse;
import co.elastic.clients.elasticsearch.core.search.Hit;
import co.elastic.clients.elasticsearch.core.search.HitsMetadata;

@ApplicationScoped
Expand Down Expand Up @@ -53,7 +60,7 @@ private List<Fruit> search(String term, String match) throws IOException {

SearchResponse<Fruit> searchResponse = client.search(searchRequest, Fruit.class);
HitsMetadata<Fruit> hits = searchResponse.hits();
return hits.hits().stream().map(hit -> hit.source()).collect(Collectors.toList());
return hits.hits().stream().map(Hit::source).collect(Collectors.toList());
}

public List<Fruit> searchWithJson(String json) throws IOException {
Expand All @@ -63,6 +70,37 @@ public List<Fruit> searchWithJson(String json) throws IOException {
}
SearchResponse<Fruit> searchResponse = client.search(searchRequest, Fruit.class);
HitsMetadata<Fruit> hits = searchResponse.hits();
return hits.hits().stream().map(hit -> hit.source()).collect(Collectors.toList());
return hits.hits().stream().map(Hit::source).collect(Collectors.toList());
}

public void index(List<Fruit> list) throws IOException {

BulkRequest.Builder br = new BulkRequest.Builder();

for (var fruit : list) {
br.operations(op -> op
.index(idx -> idx.index("fruits").id(fruit.id).document(fruit)));
}

BulkResponse result = client.bulk(br.build());

if (result.errors()) {
throw new RuntimeException("The indexing operation encountered errors.");
}
}

public void delete(List<String> list) throws IOException {

BulkRequest.Builder br = new BulkRequest.Builder();

for (var id : list) {
br.operations(op -> op.delete(idx -> idx.index("fruits").id(id)));
}

BulkResponse result = client.bulk(br.build());

if (result.errors()) {
throw new RuntimeException("The indexing operation encountered errors.");
}
}
}
Loading

0 comments on commit cb2cc54

Please sign in to comment.