Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

INTERNAL: Refactor api using broadcast operation. #646

Merged
merged 1 commit into from
Aug 18, 2023

Conversation

brido4125
Copy link
Collaborator

@brido4125 brido4125 commented Jul 25, 2023

관련 이슈

https://github.com/jam2in/arcus-works/issues/403
https://github.com/jam2in/arcus-works/issues/424

Motivation

기존 broadcast 관련 api들은 다른 api들과 다른 방식을 통해
latch를 생성하고 op를 생성한다.
이 작업을 통해 다른 api들과 내부 로직 순서를 통일시켜 일관성을 유지시킨다.

주요 변경 지점

  • 모든 broadcast api들을 BroadcastFuture를 사용하여 동작하도록 변경했습니다.
  • broadcastOp 메서드에서 생성하던 latch를 Future 내부에서 생성하도록 변경했습니다.
  • 그로인해 BroadcastFuture 내부에 latch를 countdown 시키는 complete() 메서드를 생성했습니다.
  • ArcusClient의 flush() 메서드가 OperationFuture를 반환하고 있어 BroadcastFuture가 OperationFuture를 상속 받도록 설계했습니다.

변경된 broadcast 연산 api 종류

  • 응용에게 Future 반환하는 api -> 생성된 Future 그대로 리턴
    • MemcachedClient의 flush() 메서드, return type : Future
    • ArcusClient의 flush() 메서드, return type : OperationFuture
  • 응용에게 자료구조 반환하는 api -> api 내부에서 BroadcastFuture의 get() 호출 후 리턴
    • MemcachedClient getVersions(), 메서드 return type : Map<SocketAddress, String>
    • MemcachedClient getStats()메서드, return type: Map<SocketAddress, Map<String, String>>
    • MemcachedClient listSaslMechanisms() 메서드, return type:Set

@brido4125 brido4125 self-assigned this Jul 25, 2023
flushResult.set(s.isSuccess());
if (s.isSuccess()) {
rv.setResult(Boolean.TRUE);
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

이렇게 바꾼 이유는 무엇인가요?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BroadcastFuture의 생성자에서 False라는 값을 기본값을 설정해줍니다.
특정 Boolean 타입이 아닌 null로 생성하게되면
아래의 BroadcastFuture의 get() 메서드 로직에서
timeout이 발생했을 경우
result의 type을 구분하는 조건문에서 NPE를 발생시킵니다.

public T get(long duration, TimeUnit units)
          throws InterruptedException, TimeoutException, ExecutionException {
    
    //...
      if (timedoutOps.size() > 0) {
        if (objRef.get() instanceof Boolean) {
          throw new CheckedOperationTimeoutException(duration, units, timedoutOps);
        }
        throw new OperationTimeoutException(duration, units, timedoutOps);
      }
    }
    //...
  }

위 조건 비교 로직을 통해 timeout 발생 시 응용에게
던져지는 예외를 기존과 동일하게 구분해주고 있습니다.

  • flush : CheckedOperationTimeoutException
  • 그 외 api : OperationTimeoutException

이러한 구현은 응용의 인터페이스에
변화를 주지 않고 리팩토링 하기 위함입니다.

This comment was marked as outdated.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

반영완료했습니다.

public Operation newOp(final MemcachedNode n,
final CountDownLatch latch) {
final BroadcastFuture<Boolean> rv
= new BroadcastFuture<Boolean>(getAllNodes().size(), operationTimeout, Boolean.FALSE);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

locator.getAll()을 2번 호출하는 것보단 1번 호출한 다음 변수에 담아두었다가 두 번 참조하는 것이 좋을 것 같습니다.
BroadcastFuture 생성자의 인자로 사용하고, broadcastOp()의 인자로 사용하는 것입니다.

CountDownLatch blatch = broadcastOp(new BroadcastOpFactory() {
public Operation newOp(final MemcachedNode n,
final CountDownLatch latch) {
final BroadcastFuture<Map<SocketAddress, Map<String, String>>> internalFuture
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

변수 이름을 간단히 future라고만 해도 될 것 같습니다.
같은 이름을 갖는 다른 부분도 마찬가지입니다.

CountDownLatch blatch = broadcastOp(new BroadcastOpFactory() {
public Operation newOp(final MemcachedNode n,
final CountDownLatch latch) {
final CountDownLatch latch = new CountDownLatch(getAllNodes().size());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

여기도 getAllNodes()의 결과를 변수로 담아두었다가 broadcastOp()의 인자로 넘기도록 합시다.

@@ -2356,7 +2210,7 @@ int getAddedQueueSize() {
*
* @return all memcachednode from node locator
*/
Collection<MemcachedNode> getAllNodes() {
protected Collection<MemcachedNode> getAllNodes() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

접근 제한자 바꾸면서 컴파일 에러는 안났나요?
StatisticsHandler 클래스에서 이 메소드를 사용합니다.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StatisticsHandler와 MemcachedClient는 동일한 패키지 내에 존재합니다.
MemcachedClient 인스턴스를 생성한다면
protected, default 모두 접근 가능합니다.

다만 ArucusClient에서 MemcachedClient를 생성하지 않고
getAllNodes()에 접근하기 위해 protected
접근 제어자 변경 했습니다.

@brido4125 brido4125 force-pushed the refactor/broadcastOp branch 2 times, most recently from d729f39 to 4d03e4b Compare July 26, 2023 08:41

public Collection<MemcachedNode> getNodes() {
return nodes;
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

이 함수를 사용하는 곳이 broadcastOp() 인자에 넘기는 것 외에 없다면, broadcastOp() 호출하는 쪽에서 allNodes를 지역변수에 담고 넘기는 것이 좋을 것 같습니다.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

반영완료 하였습니다.

@@ -1495,24 +1494,22 @@ public void addOperations(final Map<MemcachedNode, Operation> ops) {
/**
* Broadcast an operation to all nodes.
*/
public CountDownLatch broadcastOperation(BroadcastOpFactory of) {
return broadcastOperation(of, locator.getAll());
public void broadcastOperation(BroadcastOpFactory of) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

사용하는 곳이 없다면 제거하도록 합시다.

Operation op = opFact.flush(prefix, delay, false,
new OperationCallback() {
public void receivedStatus(OperationStatus s) {
flushResult.set(s.isSuccess());
rv.setResult(s.isSuccess());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

node의 개수 만큼 Operation 객체를 생성할텐데, 그 결과는 최신 것만 1개를 넣습니다.
이 구조가 조금 의문이 듭니다.
@jhpark816 @oliviarla 의견 부탁드립니다.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

기본값을 True로 설정하고 아래의 로직을 통해
false인 경우를 반영할수 있도록 변경하였습니다.

 public void receivedStatus(OperationStatus s) {
            if (s.isSuccess()) {
              return;
            }
            rv.setResult(s.isSuccess());
          }

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if (!s.isSuccess()) {
  rv.setResult(s.isSuccess());
}

이렇게만 해도 충분할 것 같습니다.
만약 nodes가 empty인 경우 실제로는 연산을 수행하지 않았음에도 연산 수행 결과가 true가 되는데, 그래도 괜찮으려나요?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

말씀하신 문제를 해결하기 위해
nodes가 비어있을 경우에는 생성자의 인자를
false로 result를 설정해줍니다.

final BroadcastFuture<Boolean> rv
            = new BroadcastFuture<Boolean>(operationTimeout, !nodes.isEmpty(), nodes.size());

위와 같이 구현하면 우선은 동작의 정확성은 보장할 수 있습니다.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

추후 인터페이스가 변경될 때,
다른 broadcast API들과 로직을 통일하면 될 것 같습니다.

@brido4125
Copy link
Collaborator Author

brido4125 commented Jul 31, 2023

오프라인에서 논의된 대로
BroadcastFuture의 리턴타입을 Future<Map<MemcachedNode, String>>
같이 통일하려면 하위 호환성이 문제가 발생합니다.

현재 PR은 하위 호환성 문제가 발생하지 않도록 기존의
리턴 타입을 고려하여 리팩토링 진행했습니다.

만약 추후에 사용자 인터페이스가 변경가능하다는 결론이 나오면
Future<Map<MemcachedNode, String>>와 같은
타입을 리턴하도록 api 변경하도록 하겠습니다.

@brido4125 brido4125 force-pushed the refactor/broadcastOp branch 2 times, most recently from d1b7be0 to 0c5a208 Compare August 4, 2023 07:50
@brido4125 brido4125 requested a review from uhm0311 August 9, 2023 01:39
/**
* Broadcast an operation to a specific collection of nodes.
*/
public CountDownLatch broadcastOperation(final BroadcastOpFactory of,
public void broadcastOperation(final BroadcastOpFactory of,
Collection<MemcachedNode> nodes) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indent 맞춰주세요

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

반영완료 하였습니다.

Copy link
Collaborator

@jhpark816 jhpark816 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

리뷰 완료

if (objRef.get() instanceof Boolean) {
throw new CheckedOperationTimeoutException(duration, units, timedoutOps);
}
throw new OperationTimeoutException(duration, units, timedoutOps);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

결과의 type에 따라 expcetion 유형이 달라지니, 위 코드는 혼돈이 됩니다.
다음과 같이

  • future에서는 CheckedOperationTimeoutException 던지고,
  • 호출 쪽에서 필요하면 catch하여 OperationTimeoutException 던지면 될 것 같습니다.

Operation op = opFact.flush(prefix, delay, false,
new OperationCallback() {
public void receivedStatus(OperationStatus s) {
flushResult.set(s.isSuccess());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

기존 코드가 문제가 있어서, 이를 수정하는 PR을 먼저 받아야 할 것 같습니다.

  • flush 실패한 node가 존재하더라도 최종 결과는 성공으로 설정됩니다.

수정 로직은

  • flushResult = true 설정
  • receivedStatus()에서 실패한 경우만 flushResult.set(false) 설정

추가 검토할 사항은 nodes가 empty인 경우,
최종 결과를 true 또는 false 중의 어떤 것으로 할 것인가 입니다.
true이어도 문제는 없을 것 같습니다.

final CountDownLatch latch) {
Collection<MemcachedNode> nodes = getAllNodes();
final BroadcastFuture<Boolean> rv
= new BroadcastFuture<Boolean>(operationTimeout, !nodes.isEmpty(), nodes.size());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

두 번째 인자가 다른 case와는 달라서 혼돈이 되네요.

Map<SocketAddress, Boolean> 형태로 결과를 저장하고,
future.get() 시에 모두 true이면 true 리턴하고
하나라도 false이면 false 리턴하면 됩니다.

불필요하게 Map 저장하는 부분이 있지만,
다른 broadcast 연산과 일관성을 위해 이렇게 유지하는 것이 낫지 않나 생각합니다.

Collection<MemcachedNode> nodes = getAllNodes();
final BroadcastFuture<Boolean> rv
= new BroadcastFuture<Boolean>(operationTimeout, !nodes.isEmpty(), nodes.size());
broadcastOp(new BroadcastOpFactory() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

broadcast() 리턴 값과 future 사용 방식에 관해서는 고민하고 리뷰 달겠습니다.

Copy link
Collaborator

@jhpark816 jhpark816 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

리뷰 완료

final SocketAddress sa = n.getSocketAddress();
rv.put(sa, new HashMap<String, String>());
resultMap.put(sa, new HashMap<String, String>());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

이 경우엔, 1개 노드에 대한 결과를 담는 HashMap을 Callback 안에 두고,

  • gotStat()에서 해당 HashMap에 기록하고,
  • receivedStatus()에서 resultMap에 put 하는 것이 좋겠습니다.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@brido4125
위의 코멘트 확인 바랍니다.
링크는 #646 (comment)

final BroadcastFuture<Map<SocketAddress, Map<String, String>>> future
= new BroadcastFuture<Map<SocketAddress, Map<String, String>>>(
operationTimeout, resultMap, nodes.size());
broadcastOp(new BroadcastOpFactory() {
Copy link
Collaborator

@jhpark816 jhpark816 Aug 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BroadcastOpFactory와 broadcastOp 사용 방식이 복잡하게 보입니다.
for 루프를 사용하여 op = opFact.stats() 생성하는 방식이 간단하지 않나 생각합니다.
어떤가요?

만약 변경한다면, 이는 별도 PR로 진행하는 것이 좋습니다.

@brido4125
Copy link
Collaborator Author

brido4125 commented Aug 11, 2023

  1. Flush 로직 수정 PR 생성
  2. broadcast Op 인스턴스 생성 로직 수정 PR 생성(타 api와 동일한 로직으로 수정)
  3. 해당 PR 수정

순으로 리뷰 반영하겠습니다.

@jhpark816
Copy link
Collaborator

@brido4125
broadcast Op 생성 방식은 나중에 처리하시죠.
이의 처리 방식에 대한 의견을 먼저 들었으면 합니다.

@brido4125
Copy link
Collaborator Author

Map<SocketAddress, Boolean> 형태로 결과를 저장하고,
future.get() 시에 모두 true이면 true 리턴하고
하나라도 false이면 false 리턴하면 됩니다.

위와 같이 flush와 다른 broadcast 연산들 간 통일되게 로직을 구성하기 어려운 이유는 아래와 같습니다.

  1. ArcusClient flush() api의 리턴 타입이 OperationFuture입니다.그래서 현재 BraodcastFuture가 OperationFuture을 상속 받고 있습니다.이는 아래와 같은 문제가 발생합니다.
    • flush
      • Broadcast 타입의 Future 생성
      • 내부 field로 resultMap을 Map<SocketAddress, Boolean>으로 생성
      • 최종 결과는 OperationFuture 으로 설정됨
      • Future의 get() 호출 시 resultMap의 value를 순회하는 로직이 get() 로직 내에 포함되어야함
      • 또한 flush만 순회 로직을 가지기에 T 타입이 Boolean인 경우만 위 로직을 가지도록 해야함
    • 그 외 api
      • Broadcast 타입의 Future 생성
      • 내부 field로 resultMap을 Map<SocketAddress, String>으로 생성
      • 최종 결과는 OperationFuture으로 설정됨
      • Future의 get() 호출 시 String 타입 반환
      • get()을 호출하여 await를 수행하고 나서, resultMap을 따로 반환하는 로직으로 구현 가능
    • 결론적으로 현재 로직은 Future 생성자에서 비일관성을 가지는 로직이고 이를 해결하더라도 get()을 호출하는 부분에서 flush와 타 api들간의 비일관성은 생길 수 밖에 없습니다.
  • 현재 구성된 로직도 비일관성을 가지지만 위 로직들 보다는 간결하다고 생각합니다.

@uhm0311
Copy link
Collaborator

uhm0311 commented Aug 17, 2023

BraodcastFuture를 감싸는 new OperationFuture() { ... }을 반환하면 어떨까요?

@brido4125
Copy link
Collaborator Author

"BraodcastFuture를 감싸는" 이라는 표현이
잘 이해가 안가는데, 단순히 OperationFuture 익명 클래스를 생성하는게 맞나요?
대략 아래와 같이 구현할 것 같습니다.
제가 잘못 이해했다면 예시를 들어주시면 감사하겠습니다.

@Override
  public OperationFuture<Boolean> flush(final String prefix, final int delay) {
    Collection<MemcachedNode> nodes = getAllNodes();
    final BroadcastFuture<Boolean> rv
            = new BroadcastFuture<Boolean>(operationTimeout, nodes.size());

    for (MemcachedNode node : nodes) {
      final SocketAddress socketAddress = node.getSocketAddress();
      Operation op = opFact.flush(prefix, delay, false, new OperationCallback() {
        public void receivedStatus(OperationStatus status) {
          if (!status.isSuccess()) {
            rv.putResult(socketAddress, Boolean.FALSE);
          }
        }
        public void complete() {
          rv.complete();
        }
      });
      rv.addOp(op);
      addOp(node, op);
    }
    return new OperationFuture<Boolean>(rv.getLatch(), operationTimeout) {
      @Override
      public Boolean get(long duration, TimeUnit units) throws InterruptedException, TimeoutException, ExecutionException {
        return rv.get(duration, units).isEmpty();
      }
    };
  }

@uhm0311
Copy link
Collaborator

uhm0311 commented Aug 17, 2023

네 예시 코드가 맞습니다.

@jhpark816
Copy link
Collaborator

@uhm0311 @brido4125
현재 PR 대로 하나의 Boolean 값을 가지는 형태로 하시죠.
Map<SocketAddress, Boolean> 결과를 가지는 형태는 반드시 필요할 때에 검토하시죠.

Copy link
Collaborator

@jhpark816 jhpark816 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

리뷰 완료

operationTimeout, resultMap, nodes.size());

for (MemcachedNode node : nodes) {
final SocketAddress socketAddress = node.getSocketAddress();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

줄여서 sa 하고 하죠.
java client에서 sa 라는 변수를 많이 사용하고 있습니다.

Operation op = opFact.stats(arg,
new StatsOperation.Callback() {
public void gotStat(String name, String val) {
resultMap.get(socketAddress).put(name, val);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

예전에 단 코멘트 확인하세요.
링크) #646 (comment)

public CountDownLatch broadcastOperation(final BroadcastOpFactory of,
Collection<MemcachedNode> nodes) {
final CountDownLatch latch = new CountDownLatch(nodes.size());
public void broadcastOperation(final BroadcastOpFactory of, Collection<MemcachedNode> nodes) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

이 메소드가 더 이상 필요 없을 것 같습니다.

};
});
rv.addOp(op);
addOp(node, op);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addOp()는 현재 아래와 같이 동작합니다.
발생 확률이 아주 낮지만, 중간에 exception 발생할 수 있을 것 같습니다.
그래도 문제는 없죠?

  protected void checkState() {
    if (shuttingDown) {
      throw new IllegalStateException("Shutting down");
    }
    assert isAlive() : "IO Thread is not running.";
  }

  protected Operation addOp(final MemcachedNode node, final Operation op) {
    checkState();
    conn.addOperation(node, op);
    return op;
  }

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

기존 로직에서도 아래 함수를 통해 broadcastOp 인스턴스를
생성할 때 exception가 발생할 수 있었습니다.
checkShuttingDown 인자의 경우 waitForQueue() 메서드를
제외하고는 true로 고정되어 있어 checkState()와 동일한 조건문을 수행합니다.

private CountDownLatch broadcastOp(BroadcastOpFactory of, Collection<MemcachedNode> nodes, boolean checkShuttingDown) {
    if (checkShuttingDown && shuttingDown) {
      throw new IllegalStateException("Shutting down");
    }
    return conn.broadcastOperation(of, nodes);
  }

결론적으로 exception 발생 확률은 존재하지만
별다른 문제는 없다고 생각됩니다.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@brido4125
차이점이 있습니다.

  • 기존 로직: broadcast 수행 전에 1회 검사. 따라서 all or nothing 형태로 동작
  • 수정 로직: 각 node에 대한 연산마다 검사. 따라서, 일부 nodes에만 요청이 갈 수 있음.

참고로, 예전에 addOpMap() 메소드를 추가하였는 데, 이 경우도 all or nothing 형태로 동작합니다.
broadcast 연산일 경우, all or nothing이 필요하다면 이렇게 동작해야 하고,
all or nothing 형태로 동작하지 않아도 된다면, 현 PR과 같이 동작해도 됩니다.

이에 관련하여 최종 동작 방식과 근거를 정리해 보죠.

Copy link
Collaborator

@jhpark816 jhpark816 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

리뷰 완료

};
});
rv.addOp(op);
addOp(node, op);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@brido4125
차이점이 있습니다.

  • 기존 로직: broadcast 수행 전에 1회 검사. 따라서 all or nothing 형태로 동작
  • 수정 로직: 각 node에 대한 연산마다 검사. 따라서, 일부 nodes에만 요청이 갈 수 있음.

참고로, 예전에 addOpMap() 메소드를 추가하였는 데, 이 경우도 all or nothing 형태로 동작합니다.
broadcast 연산일 경우, all or nothing이 필요하다면 이렇게 동작해야 하고,
all or nothing 형태로 동작하지 않아도 된다면, 현 PR과 같이 동작해도 됩니다.

이에 관련하여 최종 동작 방식과 근거를 정리해 보죠.

@@ -13,5 +11,5 @@ public interface BroadcastOpFactory {
* Construct a new operation for delivery to the given node.
* Each operation should count the given latch down upon completion.
*/
Operation newOp(MemcachedNode n, CountDownLatch latch);
Operation newOp(MemcachedNode n);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BroadcastOpFactory가 필요한 지 검토하세요.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

제거해도 되는 인터페이스입니다.

public void receivedStatus(OperationStatus status) {
if (!status.isSuccess()) {
getLogger().warn("Unsuccessful stat fetch: %s",
status);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1 line으로 만들죠.


public void complete() {
future.complete();
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

align 맞춰주세요.

future.addOp(op);
addOp(node, op);
}
Set<String> rv = null;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

future.get() 으로 리턴 받는 타입이 ConcurrentMap<String, String> 아닌가요?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

future.get() 으로 리턴 받는 타입이 ConcurrentMap<String, String> 아닌가요?

네, 해당 Map의 keySet()을 통해 KeyValue 값들이 담긴 Set을 최종 자료구조로 반환합니다.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

궁금한 사항이 있습니다.
아래와 같은 현재 PR 코드가 정상 동작했던 이유는 무엇인가요?

Set<String> rv = future.get(...);

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

future.get().keySet()을 사용합니다.

@brido4125
Copy link
Collaborator Author

이에 관련하여 최종 동작 방식과 근거를 정리해 보죠.

a, b, c MemcahedNode가 존재하는 상황에서 Flush api 요청이 들어왔다고 가정해보겠습니다.
Flush op 인스턴스가 a,b,c마다 할당이 되어야 합니다.
현재 구현 아래와 같은 상황을 발생시킵니다.

  • a, b -> Op 인스턴스 할당 성공
  • c -> Op 인스턴스 할당 실패

이렇게 될 경우 응용은 Node c로부터 발생하는 IllegalStateException가 발생하면
이를 확인하고 예외 처리 로직을 수행해야합니다.
만약 예외처리 로직에서 flush를 한번 더 호출한다고 가정하면
a, b 노드들에 대한 Op 인스턴스들은 의미가 없는 연산을 수행하게 됩니다.
왜냐하면 a, b에 할당된 Op 인스턴스는 첫번째 Flush 연산에서
높은 확률로 연산이 정상적으로 이루어질 수 있기 때문입니다.
(연산 과정에서 예외가 발생할 수 도 있습니다)

그래서 의미없는 연산 부하를 IO 쓰레드 및 서버에 주지 않기 위해
opMap을 이용해 all or nothing으로 동작시켜야 한다고 생각합니다.

@jhpark816
Copy link
Collaborator

@brido4125
opMap 대신에 아래 asyncGetBulk() 코드의 형식을 따르는 것이 나을 것 같습니다.

    checkState();
    for (Map.Entry<MemcachedNode, List<Collection<String>>> me
            : chunks.entrySet()) {
      MemcachedNode node = me.getKey();
      for (Collection<String> lk : me.getValue()) {
        Operation op;
        if (node == null) {
          op = opFact.mget(lk, cb);
        } else {
          op = node.enabledMGetOp() ? opFact.mget(lk, cb)
                                    : opFact.get(lk, cb);
        }
        conn.addOperation(node, op);
        ops.add(op);
      }
    }

@brido4125 brido4125 force-pushed the refactor/broadcastOp branch 2 times, most recently from cce046b to dbdf927 Compare August 18, 2023 04:47
@jhpark816
Copy link
Collaborator

@uhm0311 최종 리뷰 바랍니다.

if (!(op.getState() == OperationState.COMPLETE || op.isCancelled())) {
return false;
}
public void complete() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

메소드 시작 전에 이전 메소드 사이에 1개의 Empty Line을 둡시다.
다른 곳도 포함해주세요.

final BroadcastFuture<Map<SocketAddress, String>> future
= new BroadcastFuture<Map<SocketAddress, String>>(
operationTimeout, result, nodes.size());
checkState();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

checkState() 호출 전에 1개의 Empty Line을 둡시다.

if (op != null && op.isCancelled()) {
throw new RuntimeException(op.getCancelCause());
public void complete() {
future.complete();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indent가 맞지 않습니다.

future.addOp(op);
conn.addOperation(node, op);
}
Map<SocketAddress, Map<String, String>> rv = null;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sync 연산에서 for문과 rv 선언 사이에 1개의 Empty Line을 둡시다.
listSaslMechanisms()에서도 포함해주세요.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

리뷰해주신 사항들 반영완료 했습니다.

}

return rv.keySet();
return rv;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

다른 곳과의 일관성을 위해 try-catch문과 return rv문 사이에 있는 Empty Line을 제거합시다.

@@ -128,6 +127,7 @@
import net.spy.memcached.internal.SMGetFuture;
import net.spy.memcached.internal.PipedCollectionFuture;
import net.spy.memcached.internal.CollectionGetFuture;
import net.spy.memcached.internal.BroadcastFuture;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

import 문이 ABC 순이므로 원래대로라면 이곳에 위치하지 않을 것 같습니다.
프로젝트 루트 디렉토리 우클릭 한 다음 Optimize imports 한 번 실행해주세요.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optimize imports 동작 시 변경되는 파일이 너무 많아
별도 PR로 처리하겠습니다.

@jhpark816 jhpark816 merged commit 5dde87b into naver:develop Aug 18, 2023
1 check passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants