diff --git a/core/pom.xml b/core/pom.xml index d45e5333d..363218509 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -76,5 +76,11 @@ org.slf4j jcl-over-slf4j - + + org.lz4 + lz4-java + 1.8.0 + compile + + \ No newline at end of file diff --git a/core/src/main/java/com/ctrip/xpipe/spring/LZ4DecompressionInterceptor.java b/core/src/main/java/com/ctrip/xpipe/spring/LZ4DecompressionInterceptor.java new file mode 100644 index 000000000..bc2961515 --- /dev/null +++ b/core/src/main/java/com/ctrip/xpipe/spring/LZ4DecompressionInterceptor.java @@ -0,0 +1,52 @@ +package com.ctrip.xpipe.spring; + +import net.jpountz.lz4.LZ4Factory; +import net.jpountz.lz4.LZ4FastDecompressor; +import net.jpountz.lz4.LZ4SafeDecompressor; +import org.apache.http.Header; +import org.apache.http.HttpException; +import org.apache.http.HttpResponse; +import org.apache.http.HttpResponseInterceptor; +import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.entity.InputStreamEntity; +import org.apache.http.protocol.HttpContext; + +import net.jpountz.lz4.LZ4FrameInputStream; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; + +public class LZ4DecompressionInterceptor implements HttpResponseInterceptor { + + private static LZ4Factory factory = LZ4Factory.fastestInstance(); + + @Override + public void process(HttpResponse response, HttpContext context) throws HttpException, IOException { + Header head = response.getFirstHeader("Content-Encoding"); + if (head == null) { + return; + } + String encoding = head.getValue(); + if ("lz4".equalsIgnoreCase(encoding)) { + // 获取响应实体 + InputStream entityStream = response.getEntity().getContent(); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + byte[] buffer = new byte[1024]; + int bytesRead; + while ((bytesRead = entityStream.read(buffer)) != -1) { + outputStream.write(buffer, 0, bytesRead); + } + + byte[] compressed = outputStream.toByteArray(); + + LZ4SafeDecompressor decompressor = factory.safeDecompressor(); + byte[] deCompressedData = decompressor.decompress(compressed, compressed.length * 20); + + // 将解压缩后的数据设置回响应实体 + response.setEntity(new ByteArrayEntity(deCompressedData)); + + } + } + +} diff --git a/core/src/main/java/com/ctrip/xpipe/spring/RestTemplateFactory.java b/core/src/main/java/com/ctrip/xpipe/spring/RestTemplateFactory.java index 41ee4dd59..043e7fe69 100644 --- a/core/src/main/java/com/ctrip/xpipe/spring/RestTemplateFactory.java +++ b/core/src/main/java/com/ctrip/xpipe/spring/RestTemplateFactory.java @@ -83,6 +83,7 @@ public static RestOperations createCommonsHttpRestTemplate(int maxConnPerRoute, .setMaxConnTotal(maxConnTotal) .setDefaultSocketConfig(SocketConfig.custom().setSoTimeout(soTimeout).build()) .setDefaultRequestConfig(RequestConfig.custom().setConnectTimeout(connectTimeout).build()) + .addInterceptorLast(new LZ4DecompressionInterceptor()) .build(); ClientHttpRequestFactory factory = new HttpComponentsClientHttpRequestFactory(httpClient) { @Override diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/config/impl/DataCenterConfigBean.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/config/impl/DataCenterConfigBean.java index dc4177418..f7316e27a 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/config/impl/DataCenterConfigBean.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/config/impl/DataCenterConfigBean.java @@ -35,6 +35,8 @@ public class DataCenterConfigBean extends AbstractConfigBean { public static final String KEY_CONSOLE_NO_DB_DOMAIN = "console.no.db.domain"; + public static final String KEY_HTTP_ACCEPT_ENCODING = "http.accept.encoding"; + private AtomicReference zkConnection = new AtomicReference<>(); private AtomicReference zkNameSpace = new AtomicReference<>(); @@ -88,4 +90,9 @@ public Map getConsoleDomains() { public String getBeaconOrgRoutes() { return getProperty(KEY_BEACON_ORG_ROUTE, "[]"); } + + public String getHttpAcceptEncoding() { + return getProperty(KEY_HTTP_ACCEPT_ENCODING, null); + } + } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/config/ConsoleConfig.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/config/ConsoleConfig.java index ca8f7e3bb..091df6e7b 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/config/ConsoleConfig.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/config/ConsoleConfig.java @@ -156,4 +156,6 @@ public interface ConsoleConfig extends CoreConfig, CheckerConfig, AlertConfig { String getConsoleNoDbDomain(); + String getHttpAcceptEncoding(); + } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/config/impl/DefaultConsoleConfig.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/config/impl/DefaultConsoleConfig.java index 700841513..eadb46bff 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/config/impl/DefaultConsoleConfig.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/config/impl/DefaultConsoleConfig.java @@ -582,6 +582,11 @@ public String getConsoleNoDbDomain() { return dataCenterConfigBean.getConsoleNoDbDomain(); } + @Override + public String getHttpAcceptEncoding() { + return dataCenterConfigBean.getHttpAcceptEncoding(); + } + @Override public void addListener(ConfigKeyListener listener) { this.listenersSet.add(listener); diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/api/checker/ConsoleCheckerController.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/api/checker/ConsoleCheckerController.java index 64b78caed..89959f429 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/api/checker/ConsoleCheckerController.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/api/checker/ConsoleCheckerController.java @@ -105,16 +105,17 @@ public String getDividedMeta(@RequestParam(value="format", required = false) Str } @GetMapping(ConsoleCheckerPath.PATH_GET_ALL_META_LONG_PULL) - public DeferredResult getDividedMetaLongPull(@RequestParam(value="format", required = false) String format, + public DeferredResult getDividedMetaLongPull(@RequestParam(value="format", required = false) String format, @RequestParam(value="version") long version) { - DeferredResult response = new DeferredResult<>(Long.valueOf(consoleConfig.getServletMethodTimeoutMilli())); + DeferredResult response = new DeferredResult<>(Long.valueOf(consoleConfig.getServletMethodTimeoutMilli())); executors.execute(new Runnable() { @Override public void run() { try { XpipeMeta xpipeMeta = metaCache.getXpipeMetaLongPull(version); - response.setResult((format != null && format.equals("xml"))? xpipeMeta.toString() : coder.encode(xpipeMeta)); + String result = format != null && format.equals("xml")? xpipeMeta.toString() : coder.encode(xpipeMeta); + response.setResult(result.getBytes()); } catch (InterruptedException e) { response.setErrorResult(e.getMessage()); } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/api/metaserver/ConsoleController.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/api/metaserver/ConsoleController.java index aa8633c68..7508783e8 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/api/metaserver/ConsoleController.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/api/metaserver/ConsoleController.java @@ -51,7 +51,7 @@ public class ConsoleController extends AbstractConsoleController { private KeeperContainerService keeperContainerService; @RequestMapping(value = "/dc/{dcId}", method = RequestMethod.GET, produces={MediaType.APPLICATION_JSON_UTF8_VALUE}) - public String getDcMeta(@PathVariable String dcId, @RequestParam(value="format", required = false) String format, + public byte[] getDcMeta(@PathVariable String dcId, @RequestParam(value="format", required = false) String format, @RequestParam(value ="types", required = false) Set types) throws Exception { DcMeta dcMeta; Set upperCaseTypes = types == null ? Collections.emptySet() @@ -79,7 +79,8 @@ public String getDcMeta(@PathVariable String dcId, @RequestParam(value="format", } }); toRemoveClusters.forEach(clusterName -> dcMeta.getClusters().remove(clusterName)); - return (format != null && format.equals("xml"))? dcMeta.toString() : coder.encode(dcMeta); + String res = (format != null && format.equals("xml"))? dcMeta.toString() : coder.encode(dcMeta); + return res.getBytes(); } @RequestMapping(value = "/dc/{dcId}/cluster/{clusterId}", method = RequestMethod.GET, produces={MediaType.APPLICATION_JSON_UTF8_VALUE}) diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/config/LZ4CompressionResponseBodyAdvice.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/config/LZ4CompressionResponseBodyAdvice.java new file mode 100644 index 000000000..fee72d96a --- /dev/null +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/config/LZ4CompressionResponseBodyAdvice.java @@ -0,0 +1,55 @@ +package com.ctrip.xpipe.redis.console.controller.config; + +import net.jpountz.lz4.LZ4Compressor; +import net.jpountz.lz4.LZ4Factory; +import org.springframework.core.MethodParameter; +import org.springframework.http.HttpHeaders; +import org.springframework.http.MediaType; +import org.springframework.http.converter.HttpMessageConverter; +import org.springframework.http.server.ServerHttpRequest; +import org.springframework.http.server.ServerHttpResponse; +import org.springframework.web.bind.annotation.RestControllerAdvice; +import org.springframework.web.context.request.RequestContextHolder; +import org.springframework.web.context.request.ServletRequestAttributes; +import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyAdvice; + +import javax.servlet.http.HttpServletRequest; +import java.util.Arrays; + +@RestControllerAdvice +public class LZ4CompressionResponseBodyAdvice implements ResponseBodyAdvice { + + private static LZ4Factory factory = LZ4Factory.fastestInstance(); + + @Override + public boolean supports(MethodParameter methodParameter, Class> aClass) { + + if(!methodParameter.getParameterType().equals(byte[].class)) { + return false; + } + // 获取请求头 + HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest(); + String encode = request.getHeader(HttpHeaders.ACCEPT_ENCODING); + // 检查请求头中是否包含 Content-Encoding: lz4 + return encode!= null && encode.contains("lz4"); + } + + @Override + public byte[] beforeBodyWrite(byte[] body, MethodParameter methodParameter, MediaType mediaType, Class> aClass, ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse) { + + LZ4Compressor compressor = factory.fastCompressor(); + + // 压缩字节数组 + int maxCompressedLength = compressor.maxCompressedLength(body.length); + byte[] compressedBytes = new byte[maxCompressedLength]; + int compressedLength = compressor.compress(body, 0, body.length, compressedBytes, 0, maxCompressedLength); + + // 设置 Content-Encoding 头部为 lz4 + serverHttpResponse.getHeaders().set("Content-Encoding", "lz4"); + serverHttpResponse.getHeaders().setContentLength(compressedLength); + + // 返回压缩后的字节数组(需要截取实际压缩长度) + return Arrays.copyOf(compressedBytes, compressedLength); + + } +} diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/config/MvcConfig.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/config/MvcConfig.java index 8673af17a..263e894ee 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/config/MvcConfig.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/config/MvcConfig.java @@ -26,6 +26,7 @@ public class MvcConfig extends WebMvcConfigurerAdapter{ @Autowired private ConsoleConfig config; + @Override public void addInterceptors(InterceptorRegistry registry) { registry.addInterceptor(new LogInterceptor()); diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/resources/ConsolePortalService.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/resources/ConsolePortalService.java index 17dc1806e..1c84a647d 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/resources/ConsolePortalService.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/resources/ConsolePortalService.java @@ -79,8 +79,8 @@ public XpipeMeta getXpipeAllMeta(long version) throws SAXException, IOException .queryParam("format", "xml") .queryParam("version", version) .build(); - - ResponseEntity raw = exchange(comp.toUri().toString(), HttpMethod.GET, null, String.class, "getXpipeAllMeta"); + HttpEntity entity = buildHttpEntity(); + ResponseEntity raw = exchange(comp.toUri().toString(), HttpMethod.GET, entity, String.class, "getXpipeAllMeta"); if (StringUtil.isEmpty(raw.getBody())) return null; return DefaultSaxParser.parse(raw.getBody()); } @@ -287,7 +287,8 @@ public DcMeta getDcMeta(String dcName, Set allowTypes) { .queryParam("types", allowTypes.toArray()) .buildAndExpand(dcName); } - ResponseEntity resp = exchange(comp.toUriString(), HttpMethod.GET, null, DcMeta.class, "getDcMeta"); + HttpEntity entity = buildHttpEntity(); + ResponseEntity resp = exchange(comp.toUriString(), HttpMethod.GET, entity, DcMeta.class, "getDcMeta"); return resp.getBody(); } @@ -319,4 +320,18 @@ public ResponseEntity call() { } } + public void setConsoleConfig(ConsoleConfig config) { + this.config = config; + } + + private HttpEntity buildHttpEntity() { + HttpEntity entity = null; + if(!StringUtil.isEmpty(config.getHttpAcceptEncoding())) { + HttpHeaders headers = new HttpHeaders(); + headers.set(HttpHeaders.ACCEPT_ENCODING, config.getHttpAcceptEncoding()); + entity = new HttpEntity<>(headers); + } + return entity; + } + }