-
Notifications
You must be signed in to change notification settings - Fork 46
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ENHANCE: Seperate FrontCacheLogic in asyncGetBulk from MemcachedClient to FrontCacheClient. #624
Conversation
f6534e8
to
be4f1df
Compare
be4f1df
to
a7c51f7
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
리뷰 완료했습니다.
* Case 1. local cache is not used. | ||
* All data from Arcus server. | ||
* */ | ||
if (localCacheManager == null) { |
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
if (localCacheManager == null) { | ||
return super.asyncGetBulk(keys, tc_iter); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Case 2 주석을 이쪽으로 옮겨야 할 것 같습니다.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
주석을 세분화 시켰습니다
} | ||
|
||
public Map<String, T> get() throws InterruptedException, ExecutionException { | ||
Map<String, T> noneCachedValue = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
아래 get 메서드에 Long.MAX_VALUE, TimeUnit.MILLISECONDS를 인자로 넣어 호출할 수 있습니다.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
현재 Long.MAX_VALUE 이슈로 인해 아래와 같이
GetBulkFuture의 get()을 호출하도록 변경했습니다.
추후 수정이 필요할 경우 GetBulkFuture 부분만 변경하기 위함입니다.
public Map<String, T> get() throws InterruptedException, ExecutionException {
return super.get();
}
작업 순서 상 아래 PR merge 후 리뷰 요청 다시 드리겠습니다. |
a7c51f7
to
8fcfb2e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
리뷰 완료
@@ -148,24 +134,16 @@ private Map<String, T> internalGet(long to, TimeUnit unit, | |||
throw new ExecutionException(op.getException()); | |||
} | |||
} | |||
Map<String, T> m = new HashMap<String, T>(); | |||
Map<String, T> decodedMap = new HashMap<String, T>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
용어 변경하는 것이 나을 것 같습니다.
- decodedMap => resultMap
while (keyIter.hasNext() && tc_iter.hasNext()) { | ||
String key = keyIter.next(); | ||
Transcoder<T> tc = tc_iter.next(); | ||
T decodedValue = localCacheManager.get(key, tc); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
용어 변경
- decodedValue => value
frontCacheHit.put(key, decodedValue); | ||
continue; | ||
} | ||
frontCacheMiss.put(key, tc); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if ... else ...
문을 사용하시죠.
@@ -174,6 +184,74 @@ public OperationStatus getStatus() { | |||
GetFuture<T> parent = super.asyncGet(key, tc); | |||
return new FrontCacheGetFuture<T>(localCacheManager, key, parent); | |||
} | |||
@Override |
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
public <T> BulkFuture<Map<String, T>> asyncGetBulk(Collection<String> keys, | ||
Transcoder<T> tc) { | ||
return asyncGetBulk(keys, new SingleElementInfiniteIterator<Transcoder<T>>(tc)); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
메소드 설명 주석 추가
|
||
public BulkFuture<Map<String, Object>> asyncGetBulk(String... keys) { | ||
return asyncGetBulk(Arrays.asList(keys), transcoder); | ||
} |
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
@@ -190,4 +268,7 @@ public OperationFuture<Boolean> delete(String key) { | |||
return super.delete(key); | |||
} | |||
|
|||
public LocalCacheManager getLocalCacheManager() { | |||
return localCacheManager; | |||
} |
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
|
||
public Map<String, T> get(long duration, TimeUnit units) | ||
throws InterruptedException, ExecutionException, TimeoutException { | ||
Map<String, T> noneCachedValue = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FrontCacheBulkGetFuture.get()
메소드를 응용에서 여러번 호출하는 경우에도 정상 동작해야 할 것 같습니다. 검토 바랍니다.
다른 Future 객체를 통해 get() 요청을 여러번 수행하더라도 같은 결과가 나오도록 구현된 상태인지 검토가 필요한 것 같습니다.
8fcfb2e
to
db0b379
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
리뷰 완료
* */ | ||
final Map<String, T> frontCacheHit = new ConcurrentHashMap<String, T>(); | ||
final Map<String, Transcoder<T>> frontCacheMiss = | ||
new ConcurrentHashMap<String, Transcoder<T>>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
worker thread 혼자 사용하는 map이기 때문에,
ConcurrentHashMap이 아니라 HashMap 타입이어도 됩니다.
new ConcurrentHashMap<String, Transcoder<T>>(); | ||
|
||
Iterator<String> keyIter = keys.iterator(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
여기 1 line은 제거하시죠.
BulkGetFuture<T> parent = (BulkGetFuture<T>) super.asyncGetBulk( | ||
frontCacheMiss.keySet(), frontCacheMiss.values().iterator()); | ||
|
||
FrontCacheBulkGetFuture<T> rv = new FrontCacheBulkGetFuture<T>(parent, localCacheManager); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FrontCacheBulkGetFuture
생성 시에 frontCacheHit을 그대로 넘기면 될 것 같습니다.
따라서, 아래 인자 순서면 적당하지 않나 생각합니다.
- localCacheManager
- parent
- frontCacheHit
} | ||
putLocalCache(result); | ||
result.putAll(localCachedData); | ||
return result; |
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
return result; | ||
} | ||
result = super.get(); | ||
return result; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
아래 형태의 코드로 유지합시다.
if (result == null) {
result = super.get();
}
return result;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
super.get()은 BulkGetFuture 결과를 그대로 리턴하는 것이므로 문제가 있습니다.
FrontCacheBulkGetFuture의 get(long duration, TimeUnit units)
메소드를 호출해야 합니다.
} | ||
} | ||
Iterator<String> keyIter = keys.iterator(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
여기 1 line도 지우는 것이 나을 듯.
@brido4125 확인 바랍니다. |
db0b379
to
10d06ba
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
리뷰 완료
|
||
public Map<String, T> get() throws InterruptedException, ExecutionException { | ||
try { | ||
return get(DEFAULT_OPERATION_TIMEOUT, MILLISECONDS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
다른 코드와 일관되게 TimeUnit.MILLISECONDS
값을 사용합시다.
} | ||
} | ||
|
||
public Map<String, T> get(long duration, TimeUnit units) |
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
10d06ba
to
032ee15
Compare
getSome의 경우 사용자에 의해 여러번 호출 시, 그 결과가 duration에 의해 달라질 수 있습니다. |
032ee15
to
c417ffb
Compare
Cancel 로직 분석논의해주신 대로 응용에서 cancel() 메서드 호출 시, 대략적인 최적화 로직은 아래와 같습니다. 아래는 store 연산의 op의 handleLine 로직입니다. public void handleLine(String line) {
if (this.isCancelled()) {
//COMPLETE로 변경 로직
return;
}
assert getState() == OperationState.READING
: "Read ``" + line + "'' when in " + getState() + " state";
/* ENABLE_REPLICATION if */
if (hasSwitchedOver(line)) {
receivedMoveOperations(line);
return;
}
/* ENABLE_REPLICATION end */
/* ENABLE_MIGRATION if */
if (hasNotMyKey(line)) {
addRedirectSingleKeyOperation(line, key);
transitionState(OperationState.REDIRECT);
return;
}
/* ENABLE_MIGRATION end */
getCallback().receivedStatus(matchStatus(line, STORED, NOT_FOUND, EXISTS));
transitionState(OperationState.COMPLETE);
} 다만 위의 내용은 추후에 별도의 PR로 처리하겠습니다. |
@brido4125 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
리뷰 완료
@@ -148,24 +134,16 @@ private Map<String, T> internalGet(long to, TimeUnit unit, | |||
throw new ExecutionException(op.getException()); | |||
} | |||
} | |||
Map<String, T> m = new HashMap<String, T>(); | |||
Map<String, T> resultMap = new HashMap<String, T>(); | |||
|
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
|
||
public FrontCacheBulkGetFuture(LocalCacheManager localCacheManager, | ||
BulkGetFuture<T> parentFuture, | ||
Map<String, T> hits) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hits 인자명도 localCachedData로 변경하는 것이 어떤지?
public FrontCacheBulkGetFuture(LocalCacheManager localCacheManager, | ||
BulkGetFuture<T> parentFuture, | ||
Map<String, T> hits) { | ||
super(parentFuture.getRvMap(), parentFuture.getOps(), parentFuture.getLatch()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
아래와 같이 호출할 수 있도록 변경하면 좋겠습니다.
그러면, getRvMap(), getOps(), getLatch() 호출은 BulkGetFuture 구현 안에서만 수행됩니다.
참고로, GetFuture에서도 그 형태로 구현하였습니다.
super(parentFuture);
return result; | ||
} | ||
Map<String, T> getSomeResult = super.getSome(duration, units); | ||
if (getSomeResult.size() == getOps().size()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
getOps() 메소드를 사용해도 되고,
getOpCount() 메소드를 별도로 두어 사용하여도 될 것 같습니다.
이에 대해서는 생각해 보고, 스스로 판단해 주세요.
@brido4125 |
WRITE_QUEUED 상태의 연산은 즉시 COMPLETE로 변경하지 않습니다. |
56f4c14
to
fad7416
Compare
cancel 호출 시 getSome 동작이 Exception throws가 아닌 |
@@ -42,4 +42,7 @@ public interface BulkFuture<V> extends Future<V> { | |||
public V getSome(long timeout, TimeUnit unit) | |||
throws InterruptedException, ExecutionException; | |||
|
|||
|
|||
public int getOpsCount(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OpsCount는 ops가 여러개 있는 것으로 여겨집니다.
OpCount가 낫습니다.
fad7416
to
0673fb9
Compare
…t to FrontCacheMemcachedClient.
0673fb9
to
ae2d5d1
Compare
@jhpark816 |
MemcachedClient asyncBulkGet내의 FrontCache 로직을 FrontCacheClient로 이전 작업을 수행했습니다.