Skip to content

Commit

Permalink
Support non-scripting field updates. Add tests for updating large str…
Browse files Browse the repository at this point in the history
…ing content.

Signed-off-by: sjudeng <[email protected]>
  • Loading branch information
sjudeng committed Apr 7, 2017
1 parent 12dcf21 commit 83a1da2
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,15 @@

public class ElasticSearchConstants {

private static final Logger log = LoggerFactory.getLogger(ElasticSearchConstants.class);

public static final String ES_PROPERTIES_FILE = "janusgraph-es.properties";
public static final String ES_DOC_KEY = "doc";
public static final String ES_SCRIPT_KEY = "script";
public static final String ES_INLINE_KEY = "inline";
public static final String ES_LANG_KEY = "lang";
public static final String ES_VERSION_EXPECTED;

private static final Logger log = LoggerFactory.getLogger(ElasticSearchConstants.class);


static {
Properties props;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@
import org.janusgraph.diskstorage.configuration.ConfigNamespace;
import org.janusgraph.diskstorage.configuration.ConfigOption;
import org.janusgraph.diskstorage.configuration.Configuration;
import static org.janusgraph.diskstorage.es.ElasticSearchConstants.ES_DOC_KEY;
import static org.janusgraph.diskstorage.es.ElasticSearchConstants.ES_INLINE_KEY;
import static org.janusgraph.diskstorage.es.ElasticSearchConstants.ES_LANG_KEY;
import static org.janusgraph.diskstorage.es.ElasticSearchConstants.ES_SCRIPT_KEY;
import org.janusgraph.diskstorage.indexing.IndexEntry;
import org.janusgraph.diskstorage.indexing.IndexFeatures;
import org.janusgraph.diskstorage.indexing.IndexMutation;
Expand Down Expand Up @@ -478,7 +482,8 @@ public void mutate(Map<String, Map<String, IndexMutation>> mutations, KeyInforma
requests.add(ElasticSearchMutation.createDeleteRequest(indexName, storename, docid));
} else {
String script = getDeletionScript(informations, storename, mutation);
requests.add(ElasticSearchMutation.createUpdateRequest(indexName, storename, docid, script, scriptLang));
Map<String,Object> doc = ImmutableMap.of(ES_SCRIPT_KEY, ImmutableMap.of(ES_INLINE_KEY, script, ES_LANG_KEY, scriptLang));
requests.add(ElasticSearchMutation.createUpdateRequest(indexName, storename, docid, doc));
log.trace("Adding script {}", script);
}
}
Expand All @@ -488,16 +493,27 @@ public void mutate(Map<String, Map<String, IndexMutation>> mutations, KeyInforma
Map<String, Object> source = getNewDocument(mutation.getAdditions(), informations.get(storename));
requests.add(ElasticSearchMutation.createIndexRequest(indexName, storename, docid, source));
} else {
boolean needUpsert = !mutation.hasDeletions();
String script = getAdditionScript(informations, storename, mutation);
if (needUpsert) {
Map doc = getNewDocument(mutation.getAdditions(), informations.get(storename));
requests.add(ElasticSearchMutation.createUpdateRequest(indexName, storename, docid, script, scriptLang, doc));
final Map upsert;
if (!mutation.hasDeletions()) {
upsert = getNewDocument(mutation.getAdditions(), informations.get(storename));
} else {
requests.add(ElasticSearchMutation.createUpdateRequest(indexName, storename, docid, script, scriptLang));
upsert = null;
}

log.trace("Adding script {}", script);
String inline = getAdditionScript(informations, storename, mutation);
if (!inline.isEmpty()) {
Map script = ImmutableMap.of(ES_INLINE_KEY, inline, ES_LANG_KEY, scriptLang);
final ImmutableMap.Builder builder = ImmutableMap.builder().put(ES_SCRIPT_KEY, script);
requests.add(ElasticSearchMutation.createUpdateRequest(indexName, storename, docid, builder, upsert));
log.trace("Adding script {}", inline);
}

Map<String,Object> doc = getAdditionDoc(informations, storename, mutation);
if (!doc.isEmpty()) {
final ImmutableMap.Builder builder = ImmutableMap.builder().put(ES_DOC_KEY, doc);
requests.add(ElasticSearchMutation.createUpdateRequest(indexName, storename, docid, builder, upsert));
log.trace("Adding update {}", doc);
}
}
}

Expand Down Expand Up @@ -543,12 +559,6 @@ private String getAdditionScript(KeyInformation.IndexRetriever informations, Str
for (IndexEntry e : mutation.getAdditions()) {
KeyInformation keyInformation = informations.get(storename).get(e.field);
switch (keyInformation.getCardinality()) {
case SINGLE:
script.append("ctx._source[\"" + e.field + "\"] = " + convertToJsType(e.value, scriptLang) + ";");
if (hasDualStringMapping(keyInformation)) {
script.append("ctx._source[\"" + getDualMappingName(e.field) + "\"] = " + convertToJsType(e.value, scriptLang) + ";");
}
break;
case SET:
case LIST:
script.append("ctx._source[\"" + e.field + "\"].add(" + convertToJsType(e.value, scriptLang) + ");");
Expand All @@ -563,6 +573,21 @@ private String getAdditionScript(KeyInformation.IndexRetriever informations, Str
return script.toString();
}

private Map<String,Object> getAdditionDoc(KeyInformation.IndexRetriever informations, String storename, IndexMutation mutation) throws PermanentBackendException {
Map<String,Object> doc = new HashMap<>();
for (IndexEntry e : mutation.getAdditions()) {
KeyInformation keyInformation = informations.get(storename).get(e.field);
if (keyInformation.getCardinality() == Cardinality.SINGLE) {
doc.put(e.field, convertToEsType(e.value));
if (hasDualStringMapping(keyInformation)) {
doc.put(getDualMappingName(e.field), convertToEsType(e.value));
}
}
}

return doc;
}

private static String convertToJsType(Object value, String scriptLang) throws PermanentBackendException {
try {
XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,13 @@ public static ElasticSearchMutation createIndexRequest(String index, String type
return new ElasticSearchMutation(RequestType.INDEX, index, type, id, source);
}

public static ElasticSearchMutation createUpdateRequest(String index, String type, String id, String script, String lang) {
return new ElasticSearchMutation(RequestType.UPDATE, index, type, id, ImmutableMap.of("script", ImmutableMap.of("inline", script, "lang", lang)));
public static ElasticSearchMutation createUpdateRequest(String index, String type, String id, Map source) {
return new ElasticSearchMutation(RequestType.UPDATE, index, type, id, source);
}

public static ElasticSearchMutation createUpdateRequest(String index, String type, String id, String script, String lang, Map upsert) {
return new ElasticSearchMutation(RequestType.UPDATE, index, type, id, ImmutableMap.of("script", ImmutableMap.of("inline", script, "lang", lang), "upsert", upsert));
public static ElasticSearchMutation createUpdateRequest(String index, String type, String id, ImmutableMap.Builder builder, Map upsert) {
final Map source = upsert == null ? builder.build() : builder.put("upsert", upsert).build();
return new ElasticSearchMutation(RequestType.UPDATE, index, type, id, source);
}

public RequestType getRequestType() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,16 @@ public void bulkRequest(List<ElasticSearchMutation> requests) throws IOException
brb.add(new IndexRequest(indexName, type, id).source(request.getSource()));
break;
} case UPDATE: {
String inline = ((Map<String, String>) request.getSource().get("script")).get("inline");
Script script = new Script(inline, ScriptService.ScriptType.INLINE, null, null);
UpdateRequestBuilder update = client.prepareUpdate(indexName, type, id).setScript(script);
UpdateRequestBuilder update = client.prepareUpdate(indexName, type, id);
if (request.getSource().containsKey("script")) {
Map<String,String> script = ((Map<String, String>) request.getSource().get("script"));
String inline = script.get("inline");
String lang = script.get("lang");
update.setScript(new Script(inline, ScriptService.ScriptType.INLINE, lang, null));
}
if (request.getSource().containsKey("doc")) {
update.setDoc((Map) request.getSource().get("doc"));
}
if (request.getSource().containsKey("upsert")) {
update.setUpsert((Map) request.getSource().get("upsert"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.google.common.base.Throwables;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import org.apache.commons.lang.RandomStringUtils;
import org.janusgraph.core.Cardinality;
import org.janusgraph.core.JanusGraphException;
import org.janusgraph.core.schema.Parameter;
Expand Down Expand Up @@ -164,4 +165,30 @@ public void testUnescapedDollarInSet() throws Exception {
assertEquals("unescaped", tx.query(new IndexQuery("vertex", PredicateCondition.of(PHONE_SET, Cmp.EQUAL, "$123"))).get(0));
assertEquals("unescaped", tx.query(new IndexQuery("vertex", PredicateCondition.of(PHONE_SET, Cmp.EQUAL, "12345"))).get(0));
}

/**
* Test adding and overwriting with long string content.
*
*/
@Test
public void testUpdateAdditionWithLongString() throws Exception {
initialize("vertex");
Multimap<String, Object> initialDoc = HashMultimap.create();
initialDoc.put(TEXT, RandomStringUtils.randomAlphanumeric(500000) + " bob " + RandomStringUtils.randomAlphanumeric(500000));

add("vertex", "long", initialDoc, true);

clopen();

assertEquals(1, tx.query(new IndexQuery("vertex", PredicateCondition.of(TEXT, Text.CONTAINS, "bob"))).size());
assertEquals(0, tx.query(new IndexQuery("vertex", PredicateCondition.of(TEXT, Text.CONTAINS, "world"))).size());

tx.add("vertex", "long", TEXT, RandomStringUtils.randomAlphanumeric(500000) + " world " + RandomStringUtils.randomAlphanumeric(500000), false);

clopen();

assertEquals(0, tx.query(new IndexQuery("vertex", PredicateCondition.of(TEXT, Text.CONTAINS, "bob"))).size());
assertEquals(1, tx.query(new IndexQuery("vertex", PredicateCondition.of(TEXT, Text.CONTAINS, "world"))).size());
}

}

0 comments on commit 83a1da2

Please sign in to comment.