Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENHANCE: Seperate FrontCacheLogic in asyncGetBulk from MemcachedClient to FrontCacheClient. #624

Merged
merged 1 commit into from
Jul 18, 2023

Conversation

brido4125
Copy link
Collaborator

@brido4125 brido4125 commented Jun 19, 2023

MemcachedClient asyncBulkGet내의 FrontCache 로직을 FrontCacheClient로 이전 작업을 수행했습니다.

@brido4125 brido4125 changed the title ENHANCE: Seperate FrontCacheLogic from MemcachedClient to FrontCacheClient. ENHANCE: Seperate FrontCacheLogic in asyncGetBulk from MemcachedClient to FrontCacheClient. Jun 19, 2023
@brido4125 brido4125 force-pushed the enhance/asyncBulkGet branch 3 times, most recently from f6534e8 to be4f1df Compare June 20, 2023 02:02
@brido4125 brido4125 self-assigned this Jun 20, 2023
Copy link
Collaborator

@oliviarla oliviarla left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

리뷰 완료했습니다.

* Case 1. local cache is not used.
* All data from Arcus server.
* */
if (localCacheManager == null) {

This comment was marked as resolved.

This comment was marked as resolved.

if (localCacheManager == null) {
return super.asyncGetBulk(keys, tc_iter);
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Case 2 주석을 이쪽으로 옮겨야 할 것 같습니다.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

주석을 세분화 시켰습니다

}

public Map<String, T> get() throws InterruptedException, ExecutionException {
Map<String, T> noneCachedValue = null;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

아래 get 메서드에 Long.MAX_VALUE, TimeUnit.MILLISECONDS를 인자로 넣어 호출할 수 있습니다.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

현재 Long.MAX_VALUE 이슈로 인해 아래와 같이
GetBulkFuture의 get()을 호출하도록 변경했습니다.

추후 수정이 필요할 경우 GetBulkFuture 부분만 변경하기 위함입니다.

public Map<String, T> get() throws InterruptedException, ExecutionException {
    return super.get();
  }

@jhpark816 jhpark816 requested a review from uhm0311 July 3, 2023 09:31
@brido4125
Copy link
Collaborator Author

작업 순서 상 아래 PR merge 후 리뷰 요청 다시 드리겠습니다.
#622

@brido4125 brido4125 marked this pull request as draft July 4, 2023 02:04
@brido4125 brido4125 marked this pull request as ready for review July 6, 2023 02:03
@brido4125 brido4125 requested a review from oliviarla July 6, 2023 02:03
Copy link
Collaborator

@jhpark816 jhpark816 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

리뷰 완료

@@ -148,24 +134,16 @@ private Map<String, T> internalGet(long to, TimeUnit unit,
throw new ExecutionException(op.getException());
}
}
Map<String, T> m = new HashMap<String, T>();
Map<String, T> decodedMap = new HashMap<String, T>();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

용어 변경하는 것이 나을 것 같습니다.

  • decodedMap => resultMap

while (keyIter.hasNext() && tc_iter.hasNext()) {
String key = keyIter.next();
Transcoder<T> tc = tc_iter.next();
T decodedValue = localCacheManager.get(key, tc);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

용어 변경

  • decodedValue => value

frontCacheHit.put(key, decodedValue);
continue;
}
frontCacheMiss.put(key, tc);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if ... else ... 문을 사용하시죠.

@@ -174,6 +184,74 @@ public OperationStatus getStatus() {
GetFuture<T> parent = super.asyncGet(key, tc);
return new FrontCacheGetFuture<T>(localCacheManager, key, parent);
}
@Override

This comment was marked as resolved.

public <T> BulkFuture<Map<String, T>> asyncGetBulk(Collection<String> keys,
Transcoder<T> tc) {
return asyncGetBulk(keys, new SingleElementInfiniteIterator<Transcoder<T>>(tc));
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

메소드 설명 주석 추가


public BulkFuture<Map<String, Object>> asyncGetBulk(String... keys) {
return asyncGetBulk(Arrays.asList(keys), transcoder);
}

This comment was marked as resolved.

This comment was marked as resolved.

@@ -190,4 +268,7 @@ public OperationFuture<Boolean> delete(String key) {
return super.delete(key);
}

public LocalCacheManager getLocalCacheManager() {
return localCacheManager;
}

This comment was marked as resolved.

This comment was marked as resolved.


public Map<String, T> get(long duration, TimeUnit units)
throws InterruptedException, ExecutionException, TimeoutException {
Map<String, T> noneCachedValue = null;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FrontCacheBulkGetFuture.get() 메소드를 응용에서 여러번 호출하는 경우에도 정상 동작해야 할 것 같습니다. 검토 바랍니다.

다른 Future 객체를 통해 get() 요청을 여러번 수행하더라도 같은 결과가 나오도록 구현된 상태인지 검토가 필요한 것 같습니다.

Copy link
Collaborator

@jhpark816 jhpark816 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

리뷰 완료

* */
final Map<String, T> frontCacheHit = new ConcurrentHashMap<String, T>();
final Map<String, Transcoder<T>> frontCacheMiss =
new ConcurrentHashMap<String, Transcoder<T>>();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

worker thread 혼자 사용하는 map이기 때문에,
ConcurrentHashMap이 아니라 HashMap 타입이어도 됩니다.

new ConcurrentHashMap<String, Transcoder<T>>();

Iterator<String> keyIter = keys.iterator();

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

여기 1 line은 제거하시죠.

BulkGetFuture<T> parent = (BulkGetFuture<T>) super.asyncGetBulk(
frontCacheMiss.keySet(), frontCacheMiss.values().iterator());

FrontCacheBulkGetFuture<T> rv = new FrontCacheBulkGetFuture<T>(parent, localCacheManager);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FrontCacheBulkGetFuture 생성 시에 frontCacheHit을 그대로 넘기면 될 것 같습니다.
따라서, 아래 인자 순서면 적당하지 않나 생각합니다.

  • localCacheManager
  • parent
  • frontCacheHit

}
putLocalCache(result);
result.putAll(localCachedData);
return result;

This comment was marked as resolved.

return result;
}
result = super.get();
return result;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

아래 형태의 코드로 유지합시다.

    if (result == null) {
      result = super.get();
    }
    return result;

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

super.get()은 BulkGetFuture 결과를 그대로 리턴하는 것이므로 문제가 있습니다.
FrontCacheBulkGetFuture의 get(long duration, TimeUnit units) 메소드를 호출해야 합니다.

}
}
Iterator<String> keyIter = keys.iterator();

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

여기 1 line도 지우는 것이 나을 듯.

@jhpark816
Copy link
Collaborator

@brido4125 확인 바랍니다.

Copy link
Collaborator

@jhpark816 jhpark816 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

리뷰 완료


public Map<String, T> get() throws InterruptedException, ExecutionException {
try {
return get(DEFAULT_OPERATION_TIMEOUT, MILLISECONDS);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

다른 코드와 일관되게 TimeUnit.MILLISECONDS 값을 사용합시다.

}
}

public Map<String, T> get(long duration, TimeUnit units)

This comment was marked as resolved.

This comment was marked as resolved.

@brido4125
Copy link
Collaborator Author

getSome의 경우 사용자에 의해 여러번 호출 시, 그 결과가 duration에 의해 달라질 수 있습니다.
그래서 이전 요청의 duration과 현재 요청의 duration이
같을 경우만 기존의 결과를 반환하도록 구현했습니다.

@brido4125
Copy link
Collaborator Author

Cancel 로직 분석

논의해주신 대로 응용에서 cancel() 메서드 호출 시,
OperationState를 Complete 상태로 변경해주고 있습니다.
즉, cancel을 호출한 시점에 연산이 완료되지 않은 op들에 대해서
Complete 상태로 변경
해주고 있습니다.
다만 서버로부터 응답을 받는 로직을 모두 수행한 다음
transitionState(OperationState.COMPLETE)을 통해
op의 state를 변경해주고 있습니다.
즉, Cancel인 상태라면 수행해도 되지 않을 로직을 수행하며
COMPLETE 상태로 변경
해주고 있습니다.

대략적인 최적화 로직은 아래와 같습니다.
(여러개의 api에 대해 연구를 더 진행하면 로직이 변경될 것 같습니다)

아래는 store 연산의 op의 handleLine 로직입니다.
cancel을 호출하더라도 아래의 handleLine은 수행됩니다.
현재 로직은 if (this.isCancelled())가 없기에
Line 별로 콜백 함수를 수행을 하고 나서
COMPLETE 상태로 변경합니다.
하지만 cancel이 호출되었다는 것은 콜백 함수를 호출할 필요도
없기에 이러한 부분을 if (this.isCancelled())를 통해 최적화 할 수 있습니다.

public void handleLine(String line) {
    if (this.isCancelled()) {
     //COMPLETE로 변경 로직
      return;
    }
    assert getState() == OperationState.READING
            : "Read ``" + line + "'' when in " + getState() + " state";
    /* ENABLE_REPLICATION if */
    if (hasSwitchedOver(line)) {
      receivedMoveOperations(line);
      return;
    }
    /* ENABLE_REPLICATION end */
    /* ENABLE_MIGRATION if */
    if (hasNotMyKey(line)) {
      addRedirectSingleKeyOperation(line, key);
      transitionState(OperationState.REDIRECT);
      return;
    }
    /* ENABLE_MIGRATION end */
    getCallback().receivedStatus(matchStatus(line, STORED, NOT_FOUND, EXISTS));
    transitionState(OperationState.COMPLETE);
  }

다만 위의 내용은 추후에 별도의 PR로 처리하겠습니다.

@jhpark816
Copy link
Collaborator

@brido4125
하나의 연결로 여러 요청을 비동기 방식으로 순서적으로 처리하기 때문에,
cancel 되더라도 응답 처리는 필요합니다.
응답을 처리하지 않고 다음 연산의 응답으로 처리하면 맞지 않게 됩니다.
응답을 읽어 버려도 되지만, 응답을 원래 로직으로 처리하여도 됩니다.
replication or migration 처리는 재검토가 필요해 보이네요.

Copy link
Collaborator

@jhpark816 jhpark816 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

리뷰 완료

@@ -148,24 +134,16 @@ private Map<String, T> internalGet(long to, TimeUnit unit,
throw new ExecutionException(op.getException());
}
}
Map<String, T> m = new HashMap<String, T>();
Map<String, T> resultMap = new HashMap<String, T>();

This comment was marked as resolved.


public FrontCacheBulkGetFuture(LocalCacheManager localCacheManager,
BulkGetFuture<T> parentFuture,
Map<String, T> hits) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hits 인자명도 localCachedData로 변경하는 것이 어떤지?

public FrontCacheBulkGetFuture(LocalCacheManager localCacheManager,
BulkGetFuture<T> parentFuture,
Map<String, T> hits) {
super(parentFuture.getRvMap(), parentFuture.getOps(), parentFuture.getLatch());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

아래와 같이 호출할 수 있도록 변경하면 좋겠습니다.
그러면, getRvMap(), getOps(), getLatch() 호출은 BulkGetFuture 구현 안에서만 수행됩니다.
참고로, GetFuture에서도 그 형태로 구현하였습니다.

super(parentFuture);

return result;
}
Map<String, T> getSomeResult = super.getSome(duration, units);
if (getSomeResult.size() == getOps().size()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getOps() 메소드를 사용해도 되고,
getOpCount() 메소드를 별도로 두어 사용하여도 될 것 같습니다.
이에 대해서는 생각해 보고, 스스로 판단해 주세요.

@jhpark816
Copy link
Collaborator

@brido4125
요청을 캐시 서버로 보내지 않은 연산을 cancel 처리한 경우는
해당 연산의 상태를 COMPLETE 상태로 바로 변경하나요?

@brido4125
Copy link
Collaborator Author

요청을 캐시 서버로 보내지 않은 연산을 cancel 처리한 경우는
해당 연산의 상태를 COMPLETE 상태로 바로 변경하나요?

WRITE_QUEUED 상태의 연산은 즉시 COMPLETE로 변경하지 않습니다.
해당 상태의 연산이라면 응답 처리 로직 최적화를 수행해도 관계 없을 것 같습니다.

@brido4125 brido4125 force-pushed the enhance/asyncBulkGet branch 3 times, most recently from 56f4c14 to fad7416 Compare July 12, 2023 11:28
@brido4125
Copy link
Collaborator Author

cancel 호출 시 getSome 동작이 Exception throws가 아닌
일부 연산 결과라도 리턴하도록 하는 PR은 별도로 올리겠습니다.

@@ -42,4 +42,7 @@ public interface BulkFuture<V> extends Future<V> {
public V getSome(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException;


public int getOpsCount();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OpsCount는 ops가 여러개 있는 것으로 여겨집니다.
OpCount가 낫습니다.

@brido4125
Copy link
Collaborator Author

@jhpark816
최종 리뷰 부탁드립니다.

@jhpark816 jhpark816 merged commit 78cbcb9 into naver:develop Jul 18, 2023
1 check passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants