diff --git a/src/main/java/eu/openanalytics/containerproxy/log/S3LogStorage.java b/src/main/java/eu/openanalytics/containerproxy/log/S3LogStorage.java index 1e2cb5c0..5dddab29 100644 --- a/src/main/java/eu/openanalytics/containerproxy/log/S3LogStorage.java +++ b/src/main/java/eu/openanalytics/containerproxy/log/S3LogStorage.java @@ -66,10 +66,17 @@ public void initialize() throws IOException { String endpoint = environment.getProperty("proxy.container-log-s3-endpoint", "https://s3-eu-west-1.amazonaws.com"); enableSSE = Boolean.valueOf(environment.getProperty("proxy.container-log-s3-sse", "false")); - String subPath = containerLogPath.substring("s3://".length()); + String subPath = containerLogPath.substring("s3://".length()).trim(); + if (subPath.endsWith("/")) subPath = subPath.substring(0, subPath.length() - 1); + int bucketPathIndex = subPath.indexOf("/"); - bucketName = subPath.substring(0, bucketPathIndex); - bucketPath = subPath.substring(bucketPathIndex + 1); + if (bucketPathIndex == -1) { + bucketName = subPath; + bucketPath = ""; + } else { + bucketName = subPath.substring(0, bucketPathIndex); + bucketPath = subPath.substring(bucketPathIndex + 1) + "/"; + } s3 = AmazonS3ClientBuilder.standard() .withEndpointConfiguration(new EndpointConfiguration(endpoint, null)) @@ -87,7 +94,8 @@ public OutputStream[] createOutputStreams(Proxy proxy) throws IOException { OutputStream[] streams = new OutputStream[2]; for (int i = 0; i < streams.length; i++) { String fileName = paths[i].substring(paths[i].lastIndexOf("/") + 1); - streams[i] = new BufferedOutputStream(new S3OutputStream(bucketPath + "/" + fileName), 1024*1024); + // TODO kubernetes never flushes. So perform timed flushes, and also flush upon container shutdown + streams[i] = new BufferedOutputStream(new S3OutputStream(bucketPath + fileName), 1024*1024); } return streams; } diff --git a/src/main/java/eu/openanalytics/containerproxy/service/LogService.java b/src/main/java/eu/openanalytics/containerproxy/service/LogService.java index 7582b79d..b172ea4d 100644 --- a/src/main/java/eu/openanalytics/containerproxy/service/LogService.java +++ b/src/main/java/eu/openanalytics/containerproxy/service/LogService.java @@ -46,6 +46,8 @@ public class LogService { private boolean loggingEnabled; private Logger log = LogManager.getLogger(LogService.class); + private static final String PARAM_STREAMS = "streams"; + @Inject Environment environment; @@ -85,15 +87,36 @@ public void attachToOutput(Proxy proxy, BiConsumer o if (streams == null || streams.length < 2) { log.error("Failed to attach logging of proxy " + proxy.getId() + ": no output streams defined"); } else { + proxy.getContainers().get(0).getParameters().put(PARAM_STREAMS, streams); + if (log.isDebugEnabled()) log.debug("Container logging started for proxy " + proxy.getId()); // Note that this call will block until the container is stopped. outputAttacher.accept(streams[0], streams[1]); } } catch (Exception e) { log.error("Failed to attach logging of proxy " + proxy.getId(), e); } + if (log.isDebugEnabled()) log.debug("Container logging ended for proxy " + proxy.getId()); }); } + public void detach(Proxy proxy) { + if (!isLoggingEnabled()) return; + + OutputStream[] streams = (OutputStream[]) proxy.getContainers().get(0).getParameters().get(PARAM_STREAMS); + if (streams == null || streams.length < 2) { + log.warn("Cannot detach container logging: streams not found"); + return; + } + for (int i = 0; i < streams.length; i++) { + try { + streams[i].flush(); + streams[i].close(); + } catch (IOException e) { + log.error("Failed to close container logging streams", e); + } + } + } + public String[] getLogs(Proxy proxy) { if (!isLoggingEnabled()) return null; diff --git a/src/main/java/eu/openanalytics/containerproxy/service/ProxyService.java b/src/main/java/eu/openanalytics/containerproxy/service/ProxyService.java index 66ef8785..0ca663f5 100644 --- a/src/main/java/eu/openanalytics/containerproxy/service/ProxyService.java +++ b/src/main/java/eu/openanalytics/containerproxy/service/ProxyService.java @@ -251,6 +251,7 @@ public void stopProxy(Proxy proxy, boolean async, boolean ignoreAccessControl) { Runnable releaser = () -> { try { backend.stopProxy(proxy); + logService.detach(proxy); log.info(String.format("Proxy released [user: %s] [spec: %s] [id: %s]", proxy.getUserId(), proxy.getSpec().getId(), proxy.getId())); eventService.post(EventType.ProxyStop.toString(), proxy.getUserId(), proxy.getSpec().getId()); } catch (Exception e){