Skip to content

Commit

Permalink
Read whole response into memory to prevent locking due to too many op…
Browse files Browse the repository at this point in the history
…en connections.
  • Loading branch information
kenwenzel committed Jul 18, 2023
1 parent 989e057 commit 542745e
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.io.ByteStreams;
import io.github.linkedfactory.kvin.Kvin;
import io.github.linkedfactory.kvin.KvinListener;
import io.github.linkedfactory.kvin.KvinTuple;
Expand All @@ -26,13 +27,17 @@
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.*;

public class KvinHttp implements Kvin {
private static Logger logger = LoggerFactory.getLogger(KvinHttp.class);

String hostEndpoint;
ArrayList<KvinListener> listeners = new ArrayList<>();
Expand All @@ -46,7 +51,7 @@ public KvinHttp(String hostEndpoint) {
}

public CloseableHttpClient getHttpClient() {
return HttpClients.custom().setConnectionManager(new PoolingHttpClientConnectionManager()).build();
return HttpClients.createDefault();
}

@Override
Expand Down Expand Up @@ -161,6 +166,7 @@ public IExtendedIterator<KvinTuple> fetch(URI item, URI property, URI context, l
}

private IExtendedIterator<KvinTuple> fetchInternal(URI item, URI property, URI context, Long end, Long begin, Long limit, Long interval, String op) {
InputStream content = null;
try {
// building url
URIBuilder uriBuilder = new URIBuilder(this.hostEndpoint + "/values");
Expand All @@ -183,9 +189,18 @@ private IExtendedIterator<KvinTuple> fetchInternal(URI item, URI property, URI c
}

// converting json to kvin tuples
JsonFormatParser jsonParser = new JsonFormatParser(entity.getContent());
// TODO directly read from stream with pooled HTTP client
content = entity.getContent();
JsonFormatParser jsonParser = new JsonFormatParser(new ByteArrayInputStream(ByteStreams.toByteArray(content)));
return jsonParser.parse();
} catch (Exception e) {
if (content != null) {
try {
content.close();
} catch (IOException ioe) {
logger.error("Error while closing input stream", ioe);
}
}
throw new RuntimeException(e);
}
}
Expand Down Expand Up @@ -232,7 +247,7 @@ private IExtendedIterator<URI> descendantsInternal(URI item, Long limit) {

// converting json to URI
return new NiceIterator<>() {
final JsonParser jsonParser = jsonFactory.createParser(entity.getContent());
JsonParser jsonParser = jsonFactory.createParser(new ByteArrayInputStream(ByteStreams.toByteArray(entity.getContent())));

@Override
public boolean hasNext() {
Expand Down Expand Up @@ -267,9 +282,13 @@ private boolean isLoopingArray() throws IOException {
@Override
public void close() {
try {
jsonParser.close();
if (jsonParser != null) {
jsonParser.close();
jsonParser = null;
}
} catch (IOException e) {
// ignore
logger.error("Exception while closing JSON parser", e);
}
}
};
Expand All @@ -296,7 +315,7 @@ public IExtendedIterator<URI> properties(URI item) {

// converting json to URI
return new NiceIterator<>() {
final JsonParser jsonParser = jsonFactory.createParser(entity.getContent());
JsonParser jsonParser = jsonFactory.createParser(new ByteArrayInputStream(ByteStreams.toByteArray(entity.getContent())));

@Override
public boolean hasNext() {
Expand Down Expand Up @@ -331,9 +350,13 @@ private boolean isLoopingArray() throws IOException {
@Override
public void close() {
try {
jsonParser.close();
if (jsonParser != null) {
jsonParser.close();
jsonParser = null;
}
} catch (IOException e) {
// ignore
logger.error("Exception while closing JSON parser", e);
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import net.enilink.commons.iterator.NiceIterator;
import net.enilink.komma.core.URI;
import net.enilink.komma.core.URIs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
Expand All @@ -23,10 +25,11 @@
import java.util.*;

public class JsonFormatParser {
final static Logger logger = LoggerFactory.getLogger(JsonFormatParser.class);
final static JsonFactory jsonFactory = new JsonFactory().configure(Feature.AUTO_CLOSE_SOURCE, true);
final JsonParser jsonParser;
final static ObjectMapper mapper = new ObjectMapper()
.configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, true);
JsonParser jsonParser;

public JsonFormatParser(InputStream content) throws IOException {
jsonParser = jsonFactory.createParser(content);
Expand Down Expand Up @@ -67,7 +70,17 @@ public boolean hasNext() {
} else {
return isTuplesAlreadyGenerated();
}
} catch (IOException e) {
} catch (Exception e) {
logger.error("Exception while parsing", e);
try {
if (jsonParser != null) {
jsonParser.close();
jsonParser = null;
}
} catch (IOException ioe) {
// ignore
logger.error("Exception while closing JSON parser", ioe);
}
throw new RuntimeException(e);
}
}
Expand Down Expand Up @@ -110,7 +123,8 @@ private boolean addNextProperty() throws IOException {
}

private boolean addNextPropertyItem(URI item, URI property) throws IOException {
while (jsonParser.nextToken() != JsonToken.END_ARRAY) {
JsonToken token;
while ((token = jsonParser.nextToken()) != JsonToken.END_ARRAY && token != null) {
isLoopingPropertyItems = true;
if (jsonParser.getCurrentToken() == JsonToken.START_OBJECT) {
JsonNode node = mapper.readTree(jsonParser);
Expand All @@ -134,9 +148,13 @@ private boolean isTuplesAlreadyGenerated() {
@Override
public void close() {
try {
jsonParser.close();
if (jsonParser != null) {
jsonParser.close();
jsonParser = null;
}
} catch (IOException e) {
// ignore
logger.error("Exception while closing JSON parser", e);
}
}
};
Expand Down

0 comments on commit 542745e

Please sign in to comment.