Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/#137 MultiQueue를 이용하여 Controller에서 오는 로그 요청의 부하 감소 #140

Merged
merged 8 commits into from
Aug 29, 2024
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package info.logbat.domain.log.application;

import info.logbat.common.event.EventProducer;
import info.logbat.domain.log.domain.Log;
import info.logbat.domain.log.flatter.LogRequestFlatter;
import info.logbat.domain.log.presentation.payload.request.CreateLogRequest;
import info.logbat.domain.project.application.AppService;
import java.util.ArrayList;
Expand All @@ -15,7 +15,7 @@
@RequiredArgsConstructor
public class LogService {

private final LogRequestFlatter logRequestFlatter;
private final EventProducer<Log> producer;
private final AppService appService;

public void saveLogs(String appKey, List<CreateLogRequest> requests) {
Expand All @@ -28,7 +28,7 @@ public void saveLogs(String appKey, List<CreateLogRequest> requests) {
log.error("Failed to convert request to entity: {}", request, e);
}
});
logRequestFlatter.flatten(logs);
producer.produce(logs);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;

@Primary
//@Primary
@Component
public class ReentrantLogQueue<T> implements EventProducer<T>, EventConsumer<T> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,11 @@
import javax.sql.DataSource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

/**
* 비동기적으로 로그를 처리하는 클래스입니다. 이 클래스는 로그를 저장하는 비동기 작업을 수행하며, 이를 위해 별도의 스레드 풀을 사용합니다.
*/
@Slf4j
@Component
public class AsyncLogProcessor {

// 로그 저장 작업을 수행하는 스레드 풀
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,25 @@
public class AsyncLogRepository implements LogRepository {

private final JdbcTemplate jdbcTemplate;
private final AsyncLogProcessor asyncLogProcessor;
private final AsyncMultiProcessor<Log> asyncMultiProcessor;

private static final Long DEFAULT_RETURNS = 0L;

@PostConstruct
public void init() {
log.info("AsyncLogRepository is initialized.");
asyncLogProcessor.init(this::saveLogsToDatabase);
asyncMultiProcessor.init(this::saveLogsToDatabase);
}

@Deprecated
@Override
public long save(Log log) {
asyncLogProcessor.submitLog(log);
return DEFAULT_RETURNS;
}

@Deprecated
@Override
public List<Log> saveAll(List<Log> logs) {
asyncLogProcessor.submitLogs(logs);
return logs;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package info.logbat.domain.log.repository;

import com.zaxxer.hikari.HikariDataSource;
import info.logbat.common.event.EventProducer;
import info.logbat.domain.log.queue.ReentrantLogQueue;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Consumer;
import javax.sql.DataSource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Primary;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Primary
@Component
public class AsyncMultiProcessor<E> implements EventProducer<E> {

private final List<ReentrantLogQueue<E>> queues;
private final List<ExecutorService> flatterExecutors;
private Consumer<List<E>> saveFunction;
private final int queueCount;

public AsyncMultiProcessor(@Value("${queue.count:3}") int queueCount,
@Value("${jdbc.async.timeout:5000}") Long timeout,
@Value("${jdbc.async.bulk-size:3000}") Integer bulkSize, JdbcTemplate jdbcTemplate) {
this.queueCount = queueCount;
this.queues = new ArrayList<>(queueCount);
this.flatterExecutors = new ArrayList<>(queueCount);
int poolSize = getPoolSize(jdbcTemplate);
setup(queueCount, timeout, bulkSize, poolSize);
}

public void init(Consumer<List<E>> saveFunction) {
this.saveFunction = saveFunction;
}

@Override
public void produce(List<E> data) {
if (data.isEmpty()) {
return;
}
int selectedQueue = ThreadLocalRandom.current().nextInt(queueCount);
flatterExecutors.get(selectedQueue).execute(() -> queues.get(selectedQueue).produce(data));
}

private void setup(int queueCount, Long timeout, Integer bulkSize, int poolSize) {
ExecutorService followerExecutor = Executors.newFixedThreadPool(poolSize);
ReentrantLogQueue<E> queue = new ReentrantLogQueue<>(timeout, bulkSize);

for (int i = 0; i < queueCount; i++) {
queues.add(queue);
flatterExecutors.add(Executors.newSingleThreadExecutor());
}
CompletableFuture.runAsync(() -> leaderTask(queue, followerExecutor));
}

private void leaderTask(ReentrantLogQueue<E> queue, ExecutorService follower) {
while (!Thread.currentThread().isInterrupted()) {
List<E> element = queue.consume();
follower.execute(() -> saveFunction.accept(element));
}
}

private static int getPoolSize(JdbcTemplate jdbcTemplate) {
DataSource dataSource = jdbcTemplate.getDataSource();
if (!(dataSource instanceof HikariDataSource)) {
throw new IllegalArgumentException("DataSource is null");
}
int poolSize = ((HikariDataSource) dataSource).getMaximumPoolSize();
log.debug("Creating AsyncLogProcessor with pool size: {}", poolSize);
return poolSize * 5 / 10;
}
}
Loading