Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

INTERNAL: Creating piped collection operation future. #619

Merged
merged 1 commit into from
Jun 27, 2023

Conversation

brido4125
Copy link
Collaborator

https://github.com/jam2in/arcus-works/issues/403 관련 PR입니다.
piped operation Future 생성 후, CollectionFuture 상속 받도록하였습니다.

@brido4125 brido4125 requested a review from jhpark816 June 12, 2023 09:51
@brido4125 brido4125 self-assigned this Jun 13, 2023
Copy link
Collaborator

@jhpark816 jhpark816 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1차 리뷰

@@ -126,6 +126,7 @@
import net.spy.memcached.internal.OperationFuture;
import net.spy.memcached.internal.BulkOperationFuture;
import net.spy.memcached.internal.SMGetFuture;
import net.spy.memcached.internal.PipedOperationFuture;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PipedCollectionFuture 용어가 더 적합하지 않는 지?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

위의 코멘트 참고 바랍니다.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

반영완료

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@brido4125
반영 안되어 있습니다.

public CollectionOperationStatus getOperationStatus() {
for (OperationStatus status : mergedOperationStatus) {
if (!status.isSuccess()) {
return new CollectionOperationStatus(status);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

기존 코드가 이상하지 않나요?
mergedOperationStatus에는 CollectionOperationStatus 보관 중인 데,
이를 다시 ColectionOperationStatus 생성하여 리턴하고 있습니다.
이상하다면, 이 부분을 먼저 별도 PR로 수정합시다.

Copy link
Collaborator Author

@brido4125 brido4125 Jun 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mergedOperationStatusCollectionOperationStatus가 아닌 OperationStastus를 보관하고 있습니다.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

다시 검토해서 코멘트 올리겠습니다.

}
return false;
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get() 메소드도 추가합시다.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get() 메서드는 CollectionFuture을 상속 받고, 기존 로직에서도 구현하지 않아 따로 오버라이딩하지 않았습니다.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get() 메소드는 OperationFuture에서 상속 받고 있어서 구현하지 않아도 될 것 같습니다.

mergedResult.put(index, status);
}

public void addOp(Operation op) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

기존에 setOperation() 메소드가 있으니,
이와 일관되게 addOperation() 명칭이 나은 것 같습니다.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

반영완료

}
}
return true;
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isDone() 메소드를 isCancelled() 다음에 두는 것이 좋겠습니다.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

반영완료

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

무조건 첫번째 OperationStatus 리턴하는 것이 이상하네여.
아래는 기존 로직이니, 참고 바랍니다.

     @Override
      public CollectionOperationStatus getOperationStatus() {
        for (OperationStatus status : mergedOperationStatus) {
          if (!status.isSuccess()) {
            return new CollectionOperationStatus(status);
          }
        }
        return new CollectionOperationStatus(true, "END", CollectionResponse.END);
      }

mergedOperationStatus.add(status);
}

public void addMergedResult(int index, CollectionOperationStatus status) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

용어 변경 검토

  • addMergedResult => addEachResult
  • addCollectionOpStatus => addOperationStatus

@@ -974,7 +969,7 @@ public void receivedStatus(OperationStatus status) {
getLogger().warn("Unhandled state: " + status);
cstatus = new CollectionOperationStatus(status);
}
mergedOperationStatus.add(cstatus);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

기존 로직에서 CollectionOperationStatus가 아닌 OperationStatus 를 넘기는 것이 맞지 않는 지?

@brido4125
Copy link
Collaborator Author

brido4125 commented Jun 14, 2023

기존 로직에서 CollectionOperationStatus가 아닌 OperationStatus 를 넘기는 것이 맞지 않는 지?

로직 상으로는 OperationStatus을 넘기는게 맞지만 이를 다시 CollectionOperationStatus로 생성해주고 있기 때문에 아래와 같은 방안을 생각했습니다.

아래는 현재 piped 연산의 receivedStatus() 메서드 구현입니다.
receivedStatus인자는 OperationStatus로 설정되는 반면
CollectionPipedUpdateOperationImpl
CollectionPipedInsertOperationImpl에서
receivedStatus를 호출하며 인자로 넣어주는 타입은 CollectionOperationStatus입니다.
다만, 현재 로직은 부모타입은 자식타입을 인수로 받을 수 있어서 해당 타입의 인스턴스가 전달됩니다.

public void receivedStatus(OperationStatus status) {
              CollectionOperationStatus cstatus;

              if (status instanceof CollectionOperationStatus) {
                cstatus = (CollectionOperationStatus) status;
              } else {
                getLogger().warn("Unhandled state: " + status);
                cstatus = new CollectionOperationStatus(status);
              }
              rv.addCollectionOpStatus(cstatus);
            }

결론적으로 코멘트 주신것처럼 cstatus를 받고도 getOperationStatus에서
새로운 CollectionOperationStatus을 생성하는것은 비효율적입니다.
그래서 mergedOperationStatus의 리스트 타입을
OperationStatus -> CollectionOperationStatus로 변경하고
getOperationStatus메서드도 아래와 같이 변경할 예정입니다.

 @Override
  public CollectionOperationStatus getOperationStatus() {
    return mergedOperationStatus.get(0);
  }

Copy link
Collaborator

@jhpark816 jhpark816 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

리뷰 완료

}
return false;
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get() 메소드는 OperationFuture에서 상속 받고 있어서 구현하지 않아도 될 것 같습니다.

}
}
return true;
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

무조건 첫번째 OperationStatus 리턴하는 것이 이상하네여.
아래는 기존 로직이니, 참고 바랍니다.

     @Override
      public CollectionOperationStatus getOperationStatus() {
        for (OperationStatus status : mergedOperationStatus) {
          if (!status.isSuccess()) {
            return new CollectionOperationStatus(status);
          }
        }
        return new CollectionOperationStatus(true, "END", CollectionResponse.END);
      }

return mergedOperationStatus.get(0);
}

public void addCollectionOpStatus(CollectionOperationStatus status) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

용어를 addOperationStatus()로 변경합시다.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

반영완료

@@ -126,6 +126,7 @@
import net.spy.memcached.internal.OperationFuture;
import net.spy.memcached.internal.BulkOperationFuture;
import net.spy.memcached.internal.SMGetFuture;
import net.spy.memcached.internal.PipedOperationFuture;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

위의 코멘트 참고 바랍니다.

@brido4125
Copy link
Collaborator Author

무조건 첫번째 OperationStatus 리턴하는 것이 이상하네요. 아래는 기존 로직이니, 참고 바랍니다.

현재 mergedOperationStatus의 경우 구현체가 사이즈가 1인 ArrayList라서
굳이 for 루프를 돌지 않아도 되어 위와 같이 구현하였습니다.

구현은 아래와 같습니다.

private final List<CollectionOperationStatus> mergedOperationStatus = Collections
          .synchronizedList(new ArrayList<CollectionOperationStatus>(1));

thread-safe하게 CollectionOperationStatus 값을 read하기 위해 위와 같이 설계한것 같습니다.
다만, mergedOperationStatus에 값을 설정해주는 쓰레드는
IO 쓰레드이기 때문에 동시성 이슈가 발생하지 않을 것 같고,
동시성 이슈가 발생할 수 있는 지점은
응용에서 getOperationStatus()를 호출할 때라고 생각됩니다.
하지만 이는 read 동작만 수행하기에 thread-safe한 자료구조를 사용하지 않아도 된다고 생각됩니다.

의견 부탁드립니다.

@jhpark816
Copy link
Collaborator

mergedOperationStatus List 크기가 1이어도 되는 지를 먼저 확인해 주시죠.

@jhpark816
Copy link
Collaborator

@brido4125
asyncSetPipedExist() 메소드도 확인 바랍니다.

@brido4125
Copy link
Collaborator Author

brido4125 commented Jun 14, 2023

mergedOperationStatus List 크기가 1이어도 되는 지를 먼저 확인해 주시죠.

확인해본 결과 mergedOperationStatus List가 1이면 에러가 발생할 수도 있습니다.

asyncBopPipedUpdateBulk를 예시로 검토했습니다.(Mop,Lop,Sop 존재)
(참고로 PipedInsertBulk도 동일한 로직을 가집니다)

MAX_PIPED_ITEM_COUNT(500으로 설정)보다 elements의 사이즈가 작은 경우는
하나의 piped Operation이 생성되는 로직이고
elements의 사이즈가 500보다 클 경우는
여러개의 piped Operation이 생성되는 로직입니다.
현재 PR에서 새롭게 생성한 PipedFuture는
여러개의 piped Op가 생성되는 경우를 다루는 Future입니다.
지금 상황처럼 여러개의 Op가 생성되는 경우
해당 pipedOp들마다 각각의 CollectionOperationStatus를 가지게 됩니다.
즉, 현재 구조처럼 Future에서 한개의 CollectionOperationStatus을 가지게 되면
가장 마지막에 receivedStatus()를 호출한 op의 CollectionOperationStatus
해당 값이 설정되게 됩니다.
결론적으로 마지막 Operation의 상태만 반영이 되는것입니다.

@Override
  public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPipedUpdateBulk(
          String key, List<Element<T>> elements, Transcoder<T> tc) {

    if (elements.size() <= CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT) {
      CollectionPipedUpdate<T> collectionPipedUpdate = new BTreePipedUpdate<T>(
              key, elements, tc);
      return asyncCollectionPipedUpdate(key, collectionPipedUpdate);
    } else {
      PartitionedList<Element<T>> list = new PartitionedList<Element<T>>(
              elements, CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT);

      List<CollectionPipedUpdate<T>> collectionPipedUpdateList =
          new ArrayList<CollectionPipedUpdate<T>>(list.size());

      for (int i = 0; i < list.size(); i++) {
        collectionPipedUpdateList.add(new BTreePipedUpdate<T>(key, list.get(i), tc));
      }

      return asyncCollectionPipedUpdate(key, collectionPipedUpdateList);
    }
  }

해결방안

현재 asyncCollectionPipedUpdate 메서드는 인수로
List<CollectionPipedUpdate<T>> updateList를 받고 있습니다.
해당 List는 Element가 500보다 큰 경우
500 단위로 Element를 묶어서 리스트화 시킨 인수입니다.
ex) Element 1200개가 들어오면 List size는 3
리스트의 size를 Future의 생성자에 넣어주어
해당 값만큼 mergedOperationStatus 크기를 설정해주면 됩니다.

@jhpark816
Copy link
Collaborator

jhpark816 commented Jun 14, 2023

@brido4125
1은 List의 initial capacity 입니다.
1인 list 만들어 다수의 element 를 직접 넣어 보세요.
아래 참고하시고요

Each ArrayList instance has a capacity. The capacity is the size of the array used to store the elements in the list. It is always at least as large as the list size. As elements are added to an ArrayList, its capacity grows automatically. The details of the growth policy are not specified beyond the fact that adding an element has constant amortized time cost.

@jhpark816
Copy link
Collaborator

@brido4125
ok. udpateList 크기로 설정해 주면, 더 명확할 것 같습니다.
기존 piped 연산 처리의 수정 사항을 먼저 PR 받는 것이 좋을 것 같습니다.

return true;
}
};
return rv;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@brido4125
ops Collection을 만들어 사용하는 아래 방식을 검토해 보시죠.

  • i 변수 대신 idx 변수 사용
  • rv.setOperations()로 한번에 설정
  • addOps() 코드 최적화
    • key validate 1회만 수행
    for (int idx = 0; idx < updateList.size(); idx++) {
      final CollectionPipedUpdate<T> update = updateList.get(idx);
      Operation op = opFact.collectionPipedUpdate(key, update,
          new CollectionPipedUpdateOperation.Callback() {
          });
      ops.add(op);
    }
    rv.setOperations(ops);
    addOps(key, ops);
    return rv;

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i 변수 대신 idx 변수 사용

현재 idx 변수는 Callback 객체의 내부에서 사용되기 때문에
항상 final 키워드로 선언되어야합니다.

만약 i를 사용하게 될 경우 final 변수가 아니기 때문에 내부 Callback의 구현에서 연산의 인덱싱을 할 수 없습니다.

결론적으로 idx변수는 기존처럼 사용해야합니다.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ops Collection을 만들어 사용하는 아래 방식을 검토해 보시죠.

해당 방식 적용하여 리팩토링 했습니다.

@jhpark816
Copy link
Collaborator

@brido4125

  • 본 PR 마지막 코멘트 확인 및 수정하고, rebase 바랍니다.
  • 최근 PR에 piped 연산 코드에 정리 부분을 merge하기 전에 본 PR을 먼저 merge하는 것이 좋겠습니다.

@brido4125
Copy link
Collaborator Author

일전에는 piped 작업의 순서에 있어 해당 PR을 가장 마지막 순서로 지정하라고 하셨는데
어떤 이유에서 작업 순서를 변경해야하나요?

아래는 우선적으로 수행되어야 한다고 가정하고 작업한 PR들입니다.

@jhpark816
Copy link
Collaborator

jhpark816 commented Jun 23, 2023

기존 single 연산을 multi 연산의 형식으로 변경하는 것이라서,
multi 연산 형식의 구현이 먼저 잘 정리되어 반영되는 것이 좋겠습니다.

@jhpark816
Copy link
Collaborator

pipe exist 연산의 경우는 piped future가 어떻게 달라질지 영향을 미칠 수 있으므로,
먼저 정리되는 것이 좋겠습니다.

@jhpark816
Copy link
Collaborator

@brido4125
piped exist 연산에 의하여 piped future 달라지는 부분이 발생하나요?

@brido4125
Copy link
Collaborator Author

brido4125 commented Jun 23, 2023

piped exist 연산은 현재 Map<T,Boolean> 을 future.get()의 결과로 리턴해줍니다.
간략한 설명은 아래와 같습니다.

key : set에 존재하는지 확인할 대상
value : 해당 key의 존재 여부를 나타냄 . 즉,collectionOperationStatus가 EXIST면 True, NOT_EXIS면 false로 설정 이외의 실패 상태를 받을 경우는 map에 put하지 않고 mergedOperationStatus에 추가해줍니다.

하지만 exist를 제외한 piped 연산들은
Map<Integer, CollectionOperationStatus> 를 Future.get()의 결과로 리턴해줍니다.
실패한 연산을 map에 put 해줍니다.

결론적으로 기존의 piped future와 exist piped 연산의 return value가 달라 달라지는 부분이 생길 수 있습니다.

제 생각에는 PipedCollectionFuture에 제네릭 <K,V>를 적용하여 여러 타입의 key-value 쌍을 받도록 해서 piped exist 또한 기존의 PipedCollectionFuture를 사용할 수 있게 하는것이 좋다고 생각합니다.

@brido4125
Copy link
Collaborator Author

brido4125 commented Jun 23, 2023

우선 piped exist 연산으로 인한 추후 변경 가능성을 고려해 PipedCollectionFuture을 제네릭 <K,V> 타입으로 변경하였습니다.

Copy link
Collaborator

@jhpark816 jhpark816 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

리뷰 완료

}
};
addOps(key, ops);
rv.setOps(ops);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

아래의 순서대로 합시다.

rv.setOps(ops);
addOps(key, ops);

return;
}
findNode.addOpToInputQ(op);
addedQueue.offer(findNode);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

아래와 같이 하면 될 것 같습니다.

MemcachedNode node = findNodeByKey(key);
for (Operation op : ops) {
  addOperation(node, op);
}

import java.util.concurrent.TimeoutException;
import java.util.concurrent.ExecutionException;

public class PipedCollectionFuture<K, V>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

piped exist 연산에서 key는 어떤 타입인가요?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

사용자가 정의한 타입입니다.
그래서 제네릭 로 정의됩니다.

앞선 코멘트에 Map<T,Boolean>를 리턴한다고 있습니다.
참고 부탁드립니다.

@jhpark816
Copy link
Collaborator

@uhm0311 @oliviarla 리뷰 바랍니다.

@brido4125 brido4125 force-pushed the refactor/pipeOperation branch 2 times, most recently from 337b587 to b1ef7d5 Compare June 23, 2023 05:39
public void addOperations(String key, Collection<Operation> ops) {
MemcachedNode findNode = findNodeByKey(key);
for (Operation op : ops) {
addOperation(findNode, op);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jhpark816 @oliviarla

이런 변경은 별도의 PR로 분리하는 것이 낫지 않을까요?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

오프라인 검토에 의해 해당 변경은 이전 상태로 되돌리겠습니다.

@jhpark816
Copy link
Collaborator

@uhm0311 @oliviarla
본 PR 리뷰하고, 의견 주세요.

@uhm0311
Copy link
Collaborator

uhm0311 commented Jun 27, 2023

@brido4125 수정이 모두 끝나면 코멘트 바랍니다.

@brido4125
Copy link
Collaborator Author

@uhm0311 수정 완료 했습니다. 리뷰 진행해주시면 됩니다

Copy link
Collaborator

@uhm0311 uhm0311 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

이외에 리뷰 의견 없습니다.


@Override
public Map<K, V> get(long duration,
TimeUnit units)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indent 맞춰주세요.

@jhpark816
Copy link
Collaborator

@brido4125
리뷰 사항을 모두 반영하였나요?

@brido4125
Copy link
Collaborator Author

@jhpark816

민경님 리뷰까지 반영하여 Rebase하려고 합니다.
먼저 rebase할까요?

@jhpark816
Copy link
Collaborator

@brido4125
현재까지 리뷰된 사항을 PR에 반영만 해 주면 됩니다.

@jhpark816 jhpark816 merged commit d01f72c into naver:develop Jun 27, 2023
1 check passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants