Skip to content

Commit

Permalink
Merge pull request #902 from wangqifan/feature/lz4
Browse files Browse the repository at this point in the history
add lz4 for metacache
  • Loading branch information
LanternLee authored Oct 29, 2024
2 parents 8138daa + efda006 commit 97e9741
Show file tree
Hide file tree
Showing 11 changed files with 155 additions and 9 deletions.
8 changes: 7 additions & 1 deletion core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -76,5 +76,11 @@
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
</dependency>
</dependencies>
<dependency>
<groupId>org.lz4</groupId>
<artifactId>lz4-java</artifactId>
<version>1.8.0</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.ctrip.xpipe.spring;

import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4FastDecompressor;
import net.jpountz.lz4.LZ4SafeDecompressor;
import org.apache.http.Header;
import org.apache.http.HttpException;
import org.apache.http.HttpResponse;
import org.apache.http.HttpResponseInterceptor;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.InputStreamEntity;
import org.apache.http.protocol.HttpContext;

import net.jpountz.lz4.LZ4FrameInputStream;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

public class LZ4DecompressionInterceptor implements HttpResponseInterceptor {

private static LZ4Factory factory = LZ4Factory.fastestInstance();

@Override
public void process(HttpResponse response, HttpContext context) throws HttpException, IOException {
Header head = response.getFirstHeader("Content-Encoding");
if (head == null) {
return;
}
String encoding = head.getValue();
if ("lz4".equalsIgnoreCase(encoding)) {
// 获取响应实体
InputStream entityStream = response.getEntity().getContent();
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
byte[] buffer = new byte[1024];
int bytesRead;
while ((bytesRead = entityStream.read(buffer)) != -1) {
outputStream.write(buffer, 0, bytesRead);
}

byte[] compressed = outputStream.toByteArray();

LZ4SafeDecompressor decompressor = factory.safeDecompressor();
byte[] deCompressedData = decompressor.decompress(compressed, compressed.length * 20);

// 将解压缩后的数据设置回响应实体
response.setEntity(new ByteArrayEntity(deCompressedData));

}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public static RestOperations createCommonsHttpRestTemplate(int maxConnPerRoute,
.setMaxConnTotal(maxConnTotal)
.setDefaultSocketConfig(SocketConfig.custom().setSoTimeout(soTimeout).build())
.setDefaultRequestConfig(RequestConfig.custom().setConnectTimeout(connectTimeout).build())
.addInterceptorLast(new LZ4DecompressionInterceptor())
.build();
ClientHttpRequestFactory factory = new HttpComponentsClientHttpRequestFactory(httpClient) {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public class DataCenterConfigBean extends AbstractConfigBean {

public static final String KEY_CONSOLE_NO_DB_DOMAIN = "console.no.db.domain";

public static final String KEY_HTTP_ACCEPT_ENCODING = "http.accept.encoding";

private AtomicReference<String> zkConnection = new AtomicReference<>();
private AtomicReference<String> zkNameSpace = new AtomicReference<>();

Expand Down Expand Up @@ -88,4 +90,9 @@ public Map<String, String> getConsoleDomains() {
public String getBeaconOrgRoutes() {
return getProperty(KEY_BEACON_ORG_ROUTE, "[]");
}

public String getHttpAcceptEncoding() {
return getProperty(KEY_HTTP_ACCEPT_ENCODING, null);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -156,4 +156,6 @@ public interface ConsoleConfig extends CoreConfig, CheckerConfig, AlertConfig {

String getConsoleNoDbDomain();

String getHttpAcceptEncoding();

}
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,11 @@ public String getConsoleNoDbDomain() {
return dataCenterConfigBean.getConsoleNoDbDomain();
}

@Override
public String getHttpAcceptEncoding() {
return dataCenterConfigBean.getHttpAcceptEncoding();
}

@Override
public void addListener(ConfigKeyListener listener) {
this.listenersSet.add(listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,16 +105,17 @@ public String getDividedMeta(@RequestParam(value="format", required = false) Str
}

@GetMapping(ConsoleCheckerPath.PATH_GET_ALL_META_LONG_PULL)
public DeferredResult<String> getDividedMetaLongPull(@RequestParam(value="format", required = false) String format,
public DeferredResult<byte []> getDividedMetaLongPull(@RequestParam(value="format", required = false) String format,
@RequestParam(value="version") long version) {
DeferredResult<String> response = new DeferredResult<>(Long.valueOf(consoleConfig.getServletMethodTimeoutMilli()));
DeferredResult<byte []> response = new DeferredResult<>(Long.valueOf(consoleConfig.getServletMethodTimeoutMilli()));

executors.execute(new Runnable() {
@Override
public void run() {
try {
XpipeMeta xpipeMeta = metaCache.getXpipeMetaLongPull(version);
response.setResult((format != null && format.equals("xml"))? xpipeMeta.toString() : coder.encode(xpipeMeta));
String result = format != null && format.equals("xml")? xpipeMeta.toString() : coder.encode(xpipeMeta);
response.setResult(result.getBytes());
} catch (InterruptedException e) {
response.setErrorResult(e.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class ConsoleController extends AbstractConsoleController {
private KeeperContainerService keeperContainerService;

@RequestMapping(value = "/dc/{dcId}", method = RequestMethod.GET, produces={MediaType.APPLICATION_JSON_UTF8_VALUE})
public String getDcMeta(@PathVariable String dcId, @RequestParam(value="format", required = false) String format,
public byte[] getDcMeta(@PathVariable String dcId, @RequestParam(value="format", required = false) String format,
@RequestParam(value ="types", required = false) Set<String> types) throws Exception {
DcMeta dcMeta;
Set<String> upperCaseTypes = types == null ? Collections.emptySet()
Expand Down Expand Up @@ -79,7 +79,8 @@ public String getDcMeta(@PathVariable String dcId, @RequestParam(value="format",
}
});
toRemoveClusters.forEach(clusterName -> dcMeta.getClusters().remove(clusterName));
return (format != null && format.equals("xml"))? dcMeta.toString() : coder.encode(dcMeta);
String res = (format != null && format.equals("xml"))? dcMeta.toString() : coder.encode(dcMeta);
return res.getBytes();
}

@RequestMapping(value = "/dc/{dcId}/cluster/{clusterId}", method = RequestMethod.GET, produces={MediaType.APPLICATION_JSON_UTF8_VALUE})
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package com.ctrip.xpipe.redis.console.controller.config;

import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;
import org.springframework.core.MethodParameter;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.converter.HttpMessageConverter;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.web.bind.annotation.RestControllerAdvice;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyAdvice;

import javax.servlet.http.HttpServletRequest;
import java.util.Arrays;

@RestControllerAdvice
public class LZ4CompressionResponseBodyAdvice implements ResponseBodyAdvice<byte[]> {

private static LZ4Factory factory = LZ4Factory.fastestInstance();

@Override
public boolean supports(MethodParameter methodParameter, Class<? extends HttpMessageConverter<?>> aClass) {

if(!methodParameter.getParameterType().equals(byte[].class)) {
return false;
}
// 获取请求头
HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
String encode = request.getHeader(HttpHeaders.ACCEPT_ENCODING);
// 检查请求头中是否包含 Content-Encoding: lz4
return encode!= null && encode.contains("lz4");
}

@Override
public byte[] beforeBodyWrite(byte[] body, MethodParameter methodParameter, MediaType mediaType, Class<? extends HttpMessageConverter<?>> aClass, ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse) {

LZ4Compressor compressor = factory.fastCompressor();

// 压缩字节数组
int maxCompressedLength = compressor.maxCompressedLength(body.length);
byte[] compressedBytes = new byte[maxCompressedLength];
int compressedLength = compressor.compress(body, 0, body.length, compressedBytes, 0, maxCompressedLength);

// 设置 Content-Encoding 头部为 lz4
serverHttpResponse.getHeaders().set("Content-Encoding", "lz4");
serverHttpResponse.getHeaders().setContentLength(compressedLength);

// 返回压缩后的字节数组(需要截取实际压缩长度)
return Arrays.copyOf(compressedBytes, compressedLength);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public class MvcConfig extends WebMvcConfigurerAdapter{
@Autowired
private ConsoleConfig config;


@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new LogInterceptor());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ public XpipeMeta getXpipeAllMeta(long version) throws SAXException, IOException
.queryParam("format", "xml")
.queryParam("version", version)
.build();

ResponseEntity<String> raw = exchange(comp.toUri().toString(), HttpMethod.GET, null, String.class, "getXpipeAllMeta");
HttpEntity<String> entity = buildHttpEntity();
ResponseEntity<String> raw = exchange(comp.toUri().toString(), HttpMethod.GET, entity, String.class, "getXpipeAllMeta");
if (StringUtil.isEmpty(raw.getBody())) return null;
return DefaultSaxParser.parse(raw.getBody());
}
Expand Down Expand Up @@ -287,7 +287,8 @@ public DcMeta getDcMeta(String dcName, Set<String> allowTypes) {
.queryParam("types", allowTypes.toArray())
.buildAndExpand(dcName);
}
ResponseEntity<DcMeta> resp = exchange(comp.toUriString(), HttpMethod.GET, null, DcMeta.class, "getDcMeta");
HttpEntity<String> entity = buildHttpEntity();
ResponseEntity<DcMeta> resp = exchange(comp.toUriString(), HttpMethod.GET, entity, DcMeta.class, "getDcMeta");
return resp.getBody();
}

Expand Down Expand Up @@ -319,4 +320,18 @@ public ResponseEntity<T> call() {
}
}

public void setConsoleConfig(ConsoleConfig config) {
this.config = config;
}

private HttpEntity<String> buildHttpEntity() {
HttpEntity<String> entity = null;
if(!StringUtil.isEmpty(config.getHttpAcceptEncoding())) {
HttpHeaders headers = new HttpHeaders();
headers.set(HttpHeaders.ACCEPT_ENCODING, config.getHttpAcceptEncoding());
entity = new HttpEntity<>(headers);
}
return entity;
}

}

0 comments on commit 97e9741

Please sign in to comment.