Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improvement](fe) When checkpoint generates image file, it is flushed to disk in batches to avoid full disk IO due to forced flushing of too large image files at once #39487

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.doris.common.io;

import java.io.DataOutputStream;
import java.io.FilterOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;

Expand All @@ -28,47 +28,75 @@
*/

public class CountingDataOutputStream extends DataOutputStream {
private static final long FSYNC_SIZE_IN_BYTES = 1024 * 1024 * 8;
/**
* The number of bytes written to the data output stream so far. If this counter overflows,
* it will be wrapped to Long.MAX_VALUE.
*/
private long count;
/**
* The number of bytes since from last force sync so far. If the fsyncDelta greater than fsyncSize,
* it will force sync
*/
private long fsyncDelta;
protected long fsyncSize;

public CountingDataOutputStream(OutputStream out) {
super(new CountingOutputStream(out, 0L));
this(out, 0, FSYNC_SIZE_IN_BYTES);
}

public CountingDataOutputStream(OutputStream out, long count) {
super(new CountingOutputStream(out, count));
public CountingDataOutputStream(OutputStream out, long fsyncSize) {
this(out, 0, fsyncSize);
}

public long getCount() {
return ((CountingOutputStream) this.out).getCount();
public CountingDataOutputStream(OutputStream out, long count, long fsyncSize) {
super(out);
this.count = count;
this.fsyncSize = fsyncSize;
this.fsyncDelta = 0;
}

public void close() throws IOException {
this.out.close();
public long getCount() {
return this.count;
}

public static class CountingOutputStream extends FilterOutputStream {
private long count;

public CountingOutputStream(OutputStream out, long count) {
super(out);
this.count = count;
}

public long getCount() {
return this.count;
}
public void write(byte[] b, int off, int len) throws IOException {
super.write(b, off, len);
incCount(len);
partialFsync(len);
}

public void write(byte[] b, int off, int len) throws IOException {
this.out.write(b, off, len);
this.count += len;
}
/**
* see {@link java.io.DataOutputStream#write(int)} or {@link java.io.OutputStream#write(int)}
*/
@Override
public synchronized void write(int b) throws IOException {
super.write(b);
incCount(1);
partialFsync(1);
}

public void write(int b) throws IOException {
this.out.write(b);
++this.count;
/**
* Increases the written counter by the specified value until it reaches Long.MAX_VALUE.
*/
private void incCount(int value) {
long temp = count + value;
if (temp < 0) {
temp = Long.MAX_VALUE;
}
count = temp;
}

public void close() throws IOException {
this.out.close();
/**
* image file force sync partial,avoid excessive sync size that causes disk IO throughput to be full,so we force sync
* partial ahead
*/
private void partialFsync(int deltaBytes) throws IOException {
this.fsyncDelta += deltaBytes;
// if this.out is FileOutputStream we should force sync disk in a batch (eg:8MB)
if (this.fsyncDelta >= fsyncSize && this.out instanceof FileOutputStream) {
((FileOutputStream) this.out).getChannel().force(true);
this.fsyncDelta = 0;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,18 @@ public Buffer(int size) {
}

public void write(DataInput in, int len) throws IOException {
int newcount = count + len;
if (newcount > buf.length) {
byte[] newbuf = new byte[Math.max(buf.length << 1, newcount)];
System.arraycopy(buf, 0, newbuf, 0, count);
buf = newbuf;
int newCount = count + len;
if (newCount > buf.length) {
byte[] newBuf = new byte[Math.max(buf.length << 1, newCount)];
System.arraycopy(buf, 0, newBuf, 0, count);
buf = newBuf;
}
in.readFully(buf, count, len);
count = newcount;
count = newCount;
}
}

private Buffer buffer;
private final Buffer buffer;

/** Constructs a new empty buffer. */
public DataOutputBuffer() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package org.apache.doris.common.io;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.Test;

import java.io.File;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

public class CountingDataOutputStreamTest {
private static final Logger LOG = LogManager.getLogger(CountingDataOutputStreamTest.class);

@Test
public void testWrite() {
File file = new File("/Users/hantongyang/data/meta_test.txt");
try (CountingDataOutputStream out = new CountingDataOutputStream(Files.newOutputStream(file.toPath()))) {
byte[] data = String.valueOf('a').getBytes(StandardCharsets.UTF_8);
out.write(data, 0, data.length);
} catch (Exception e) {
LOG.error(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,15 @@
public class LogController {

private static final Logger LOG = LogManager.getLogger(LogController.class);
private static long WEB_LOG_BYTES = 1024 * 1024; // 1MB

private String addVerboseName;
private String delVerboseName;
private static final long WEB_LOG_BYTES = 1024 * 1024; // 1MB

@Autowired
private ReadEnvironment readEnvironment;

@RequestMapping(path = "/log", method = RequestMethod.GET)
public Object log(HttpServletRequest request) {
Map<String, Map<String, String>> map = new HashMap<>();
appendLogConf(map);
appendLogConf(map,null,null);
appendLogInfo(map);
return ResponseEntityBuilder.ok(map);
}
Expand All @@ -65,14 +62,14 @@ public Object log(HttpServletRequest request) {
public Object logLevel(HttpServletRequest request) {
Map<String, Map<String, String>> map = new HashMap<>();
// get parameters
addVerboseName = request.getParameter("add_verbose");
delVerboseName = request.getParameter("del_verbose");
String addVerboseName = request.getParameter("add_verbose");
String delVerboseName = request.getParameter("del_verbose");
LOG.info("add verbose name: {}, del verbose name: {}", addVerboseName, delVerboseName);
appendLogConf(map);
appendLogConf(map,addVerboseName,delVerboseName);
return ResponseEntityBuilder.ok(map);
}

private void appendLogConf(Map<String, Map<String, String>> content) {
private void appendLogConf(Map<String, Map<String, String>> content,String addVerboseName,String delVerboseName) {
Map<String, String> map = new HashMap<>();

try {
Expand All @@ -82,7 +79,7 @@ private void appendLogConf(Map<String, Map<String, String>> content) {
List<String> verboseNames = Lists.newArrayList(configs.y);
if (!verboseNames.contains(addVerboseName)) {
verboseNames.add(addVerboseName);
configs = Log4jConfig.updateLogging(null, verboseNames.toArray(new String[verboseNames.size()]),
configs = Log4jConfig.updateLogging(null, verboseNames.toArray(new String[0]),
null);
readEnvironment.reinitializeLoggingSystem();
}
Expand All @@ -92,7 +89,7 @@ private void appendLogConf(Map<String, Map<String, String>> content) {
List<String> verboseNames = Lists.newArrayList(configs.y);
if (verboseNames.contains(delVerboseName)) {
verboseNames.remove(delVerboseName);
configs = Log4jConfig.updateLogging(null, verboseNames.toArray(new String[verboseNames.size()]),
configs = Log4jConfig.updateLogging(null, verboseNames.toArray(new String[0]),
null);
readEnvironment.reinitializeLoggingSystem();
}
Expand All @@ -104,7 +101,6 @@ private void appendLogConf(Map<String, Map<String, String>> content) {
content.put("LogConfiguration", map);
} catch (IOException e) {
LOG.error(e);
e.printStackTrace();
}
}

Expand All @@ -119,7 +115,7 @@ private void appendLogInfo(Map<String, Map<String, String>> content) {
raf = new RandomAccessFile(logPath, "r");
long fileSize = raf.length();
long startPos = fileSize < WEB_LOG_BYTES ? 0L : fileSize - WEB_LOG_BYTES;
long webContentLength = fileSize < WEB_LOG_BYTES ? fileSize : WEB_LOG_BYTES;
long webContentLength = Math.min(fileSize, WEB_LOG_BYTES);
raf.seek(startPos);
map.put("showingLast", webContentLength + " bytes of log");
StringBuilder sb = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,14 @@ private interface WriteMethod {

private Delegate delegate;

public void setDelegate(CountingDataOutputStream dos, List<MetaIndex> indices) {
private void setDelegate(CountingDataOutputStream dos, List<MetaIndex> indices) {
this.delegate = (name, method) -> {
indices.add(new MetaIndex(name, dos.getCount()));
return method.write();
};
}

public long doWork(String name, WriteMethod method) throws IOException {
private long doWork(String name, WriteMethod method) throws IOException {
if (delegate == null) {
return method.write();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.apache.doris.persist.meta;

import org.apache.doris.catalog.Env;

import org.junit.jupiter.api.Test;

import java.io.File;
import java.io.IOException;

public class MetaWriterTest {

@Test
public void testWrite() throws IOException {
File file = new File("");
MetaWriter.write(file, Env.getCurrentEnv());
}
}
Loading