Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EVA-3260 update block allocation strategy #83

Merged
merged 10 commits into from
Mar 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ public void merge(ACCESSION accessionOrigin, ACCESSION mergeInto, String reason)
dbService.merge(accessionOrigin, mergeInto, reason);
}

public void shutDownAccessioning() {
accessionGenerator.shutDownAccessionGenerator();
}

protected AccessionGenerator<MODEL, ACCESSION> getAccessionGenerator() {
return accessionGenerator;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package uk.ac.ebi.ampt2d.commons.accession.core.exceptions;

public class AccessionGeneratorShutDownException extends RuntimeException {
public AccessionGeneratorShutDownException(String msg) {
super(msg);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,9 @@ <HASH> List<AccessionWrapper<MODEL, HASH, ACCESSION>> generateAccessions(Map<HAS
* @param response DB response
*/
void postSave(SaveResponse<ACCESSION> response);

/**
* This method should be used to shut-down the generator and release resources
*/
void shutDownAccessionGenerator();
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,8 @@ public static <MODEL> SingleAccessionGenerator<MODEL, String> ofSHA1AccessionGen
return ofHashAccessionGenerator(summaryFunction, new SHA1HashingFunction());
}

@Override
public void shutDownAccessionGenerator() {
// Do nothing - no resources to release
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,4 +168,14 @@ public Set<ContiguousIdBlock> recoverState(long[] committedElements) throws Acce
this.availableRanges.addAll(newAvailableRanges);
return doCommit(committedElements);
}

public List<ContiguousIdBlock> getAssignedBlocks(){
return assignedBlocks.stream().collect(Collectors.toList());
}

public void shutDownBlockManager() {
assignedBlocks.clear();
availableRanges.clear();
generatedAccessions.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import uk.ac.ebi.ampt2d.commons.accession.block.initialization.BlockInitializationException;
import uk.ac.ebi.ampt2d.commons.accession.core.exceptions.AccessionCouldNotBeGeneratedException;
import uk.ac.ebi.ampt2d.commons.accession.core.exceptions.AccessionGeneratorShutDownException;
import uk.ac.ebi.ampt2d.commons.accession.core.exceptions.AccessionIsNotPendingException;
import uk.ac.ebi.ampt2d.commons.accession.core.models.AccessionWrapper;
import uk.ac.ebi.ampt2d.commons.accession.core.models.SaveResponse;
Expand Down Expand Up @@ -47,6 +48,8 @@ public class MonotonicAccessionGenerator<MODEL> implements AccessionGenerator<MO
private final String applicationInstanceId;
private final ContiguousIdBlockService blockService;

private boolean SHUTDOWN = false;

public MonotonicAccessionGenerator(String categoryId,
String applicationInstanceId,
ContiguousIdBlockService contiguousIdBlockService,
Expand Down Expand Up @@ -88,8 +91,7 @@ private static BlockManager initializeBlockManager(ContiguousIdBlockService bloc
assertBlockParametersAreInitialized(blockService, categoryId);
BlockManager blockManager = new BlockManager();
List<ContiguousIdBlock> uncompletedBlocks = blockService
.getUncompletedBlocksByCategoryIdAndApplicationInstanceIdOrderByEndAsc(categoryId,
applicationInstanceId);
.reserveUncompletedBlocksForCategoryIdAndApplicationInstanceId(categoryId, applicationInstanceId);
//Insert as available ranges
for (ContiguousIdBlock block : uncompletedBlocks) {
blockManager.addBlock(block);
Expand All @@ -116,6 +118,7 @@ private void recoverState(long[] committedElements) throws AccessionIsNotPending

public synchronized long[] generateAccessions(int numAccessionsToGenerate)
throws AccessionCouldNotBeGeneratedException {
checkAccessionGeneratorNotShutDown();
long[] accessions = new long[numAccessionsToGenerate];
reserveNewBlocksUntilSizeIs(numAccessionsToGenerate);

Expand Down Expand Up @@ -147,20 +150,24 @@ private synchronized void reserveNewBlock(String categoryId, String instanceId)
}

public synchronized void commit(long... accessions) throws AccessionIsNotPendingException {
checkAccessionGeneratorNotShutDown();
blockService.save(blockManager.commit(accessions));
}

public synchronized void release(long... accessions) throws AccessionIsNotPendingException {
checkAccessionGeneratorNotShutDown();
blockManager.release(accessions);
}

public synchronized MonotonicRangePriorityQueue getAvailableRanges() {
checkAccessionGeneratorNotShutDown();
return blockManager.getAvailableRanges();
}

@Override
public <HASH> List<AccessionWrapper<MODEL, HASH, Long>> generateAccessions(Map<HASH, MODEL> messages)
throws AccessionCouldNotBeGeneratedException {
checkAccessionGeneratorNotShutDown();
long[] accessions = generateAccessions(messages.size());
int i = 0;
List<AccessionWrapper<MODEL, HASH, Long>> accessionedModels = new ArrayList<>();
Expand All @@ -174,8 +181,27 @@ public <HASH> List<AccessionWrapper<MODEL, HASH, Long>> generateAccessions(Map<H

@Override
public synchronized void postSave(SaveResponse<Long> response) {
checkAccessionGeneratorNotShutDown();
commit(response.getSavedAccessions().stream().mapToLong(l -> l).toArray());
release(response.getSaveFailedAccessions().stream().mapToLong(l -> l).toArray());
}

public void shutDownAccessionGenerator(){
List<ContiguousIdBlock> blockList = blockManager.getAssignedBlocks();
blockList.stream().forEach(block -> block.releaseReserved());
blockService.save(blockList);
blockManager.shutDownBlockManager();
SHUTDOWN = true;
}

/**
* Before doing any operation on Accession Generator, we need to make sure it has not been shut down.
* We should make the check by calling this method as the first thing in all public methods of this class
*/
private void checkAccessionGeneratorNotShutDown(){
if(SHUTDOWN){
throw new AccessionGeneratorShutDownException("Accession Generator has been shut down and is no longer available");
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import javax.persistence.Index;
import javax.persistence.Table;
import javax.persistence.UniqueConstraint;
import java.time.LocalDateTime;

/**
* This class represents a block allocated by an application instance, in a monotonic sequence associated with a
Expand Down Expand Up @@ -63,6 +64,10 @@ public class ContiguousIdBlock implements Comparable<ContiguousIdBlock> {

private long lastCommitted;

private boolean reserved;

private LocalDateTime createdTimestamp;

// Create / update dates

ContiguousIdBlock() {
Expand All @@ -75,6 +80,8 @@ public ContiguousIdBlock(String categoryId, String applicationInstanceId, long f
this.firstValue = firstValue;
this.lastValue = firstValue + size - 1;
this.lastCommitted = firstValue - 1;
this.reserved = true;
this.createdTimestamp = LocalDateTime.now();
}

/**
Expand Down Expand Up @@ -127,6 +134,10 @@ public long getId() {
return id;
}

public String getCategoryId() {
return categoryId;
}

public long getLastCommitted() {
return lastCommitted;
}
Expand All @@ -135,6 +146,14 @@ public void setLastCommitted(long lastCommitted) {
this.lastCommitted = lastCommitted;
}

public String getApplicationInstanceId() {
return applicationInstanceId;
}

public void setApplicationInstanceId(String applicationInstanceId) {
this.applicationInstanceId = applicationInstanceId;
}

public long getFirstValue() {
return firstValue;
}
Expand All @@ -143,6 +162,26 @@ public long getLastValue() {
return lastValue;
}

public boolean isReserved() {
return reserved == true;
}

public boolean isNotReserved() {
return reserved == false;
}

public void markAsReserved() {
this.reserved = true;
}

public void releaseReserved() {
this.reserved = false;
}

public boolean isFull() {
return lastCommitted == lastValue;
}

public boolean isNotFull() {
return lastCommitted != lastValue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,18 @@
*/
package uk.ac.ebi.ampt2d.commons.accession.persistence.jpa.monotonic.repositories;

import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.CrudRepository;
import org.springframework.data.repository.query.Param;
import org.springframework.stereotype.Repository;
import uk.ac.ebi.ampt2d.commons.accession.persistence.jpa.monotonic.entities.ContiguousIdBlock;

import java.util.stream.Stream;
import java.util.List;

@Repository
public interface ContiguousIdBlockRepository extends CrudRepository<ContiguousIdBlock, Long> {

ContiguousIdBlock findFirstByCategoryIdAndApplicationInstanceIdOrderByLastValueDesc(String categoryId,
String instanceId);

Stream<ContiguousIdBlock> findAllByCategoryIdAndApplicationInstanceIdOrderByLastValueAsc(String categoryId,
String instanceId);
@Query("SELECT cib FROM ContiguousIdBlock cib WHERE cib.categoryId = :categoryId AND cib.lastCommitted != cib.lastValue AND (cib.reserved IS NULL OR cib.reserved IS FALSE) ORDER BY cib.lastValue asc")
List<ContiguousIdBlock> findUncompletedAndUnreservedBlocksOrderByLastValueAsc(@Param("categoryId") String categoryId);

ContiguousIdBlock findFirstByCategoryIdOrderByLastValueDesc(String categoryId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,32 @@
import javax.persistence.PersistenceContext;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* The ContiguousIdBlockService is used by AccessionGenerator to enter/update block information in DB.
*
* In case of multiprocessing, we need to make sure a block is used by only one AccessionGenerator at any point of time.
* To prevent a block from being used by multiple AccessionGenerator, we mark the block as reserved (using column reserved)
* when they are in use by an AccessionGenerator. A block, marked as reserved implies it is currently being used by an
* AccessionGenerator and should not be picked up for use by any other AccessionGenerator.
*
* Whenever an AccessionGenerator asks for a block from the ContiguousIdBlockService for using the accessions in it, we
* should reserve the block for the calling Accession Generator.
*
* Existing Uncompleted Blocks
* When an AccessionGenerator starts, it asks for Uncompleted Blocks, in order to use the remaining accessions
* in them. As these blocks, will be used by the calling AccessionGenerator, we need to explicitly mark them
* as reserved in DB.
* (see method @reserveUncompletedBlocksByCategoryIdAndApplicationInstanceIdOrderByEndAsc)
* New Block
* When an AccessionGenerator asks for a new block, we create a new block with correct values (based on the given
* parameters and existing blocks) and save it in DB. A newly created block is implicitly marked as reserved.
* (see method @reserveNewBlock)
*
* Also, when saving the blocks, we need to check for the block's last committed value.
* If it's last committed value is same as last value, we should release the block in DB
*
*/
nitin-ebi marked this conversation as resolved.
Show resolved Hide resolved
public class ContiguousIdBlockService {

private ContiguousIdBlockRepository repository;
Expand All @@ -47,6 +70,8 @@ public ContiguousIdBlockService(ContiguousIdBlockRepository repository, Map<Stri

@Transactional
public void save(Iterable<ContiguousIdBlock> blocks) {
// release block if full
blocks.forEach(block -> {if (block.isFull()) {block.releaseReserved();}});
repository.saveAll(blocks);
entityManager.flush();
}
Expand Down Expand Up @@ -74,12 +99,15 @@ public BlockParameters getBlockParameters(String categoryId) {
return categoryBlockInitializations.get(categoryId);
}

@Transactional(readOnly = true)
public List<ContiguousIdBlock> getUncompletedBlocksByCategoryIdAndApplicationInstanceIdOrderByEndAsc(
String categoryId, String applicationInstanceId) {
try (Stream<ContiguousIdBlock> reservedBlocksOfThisInstance = repository
.findAllByCategoryIdAndApplicationInstanceIdOrderByLastValueAsc(categoryId, applicationInstanceId)) {
return reservedBlocksOfThisInstance.filter(ContiguousIdBlock::isNotFull).collect(Collectors.toList());
}
@Transactional(isolation = Isolation.SERIALIZABLE)
public List<ContiguousIdBlock> reserveUncompletedBlocksForCategoryIdAndApplicationInstanceId(String categoryId, String applicationInstanceId) {
List<ContiguousIdBlock> blockList = repository.findUncompletedAndUnreservedBlocksOrderByLastValueAsc(categoryId);
blockList.stream().forEach(block -> {
block.setApplicationInstanceId(applicationInstanceId);
block.markAsReserved();
});
save(blockList);
return blockList;
}

}
Loading
Loading