Skip to content

Commit

Permalink
Merge pull request #83 from nitin-ebi/block-allocation-strategy
Browse files Browse the repository at this point in the history
EVA-3260 update block allocation strategy
  • Loading branch information
nitin-ebi authored Mar 21, 2024
2 parents 99d6d49 + 5f2f724 commit c0a5c31
Show file tree
Hide file tree
Showing 16 changed files with 457 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ public void merge(ACCESSION accessionOrigin, ACCESSION mergeInto, String reason)
dbService.merge(accessionOrigin, mergeInto, reason);
}

public void shutDownAccessioning() {
accessionGenerator.shutDownAccessionGenerator();
}

protected AccessionGenerator<MODEL, ACCESSION> getAccessionGenerator() {
return accessionGenerator;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package uk.ac.ebi.ampt2d.commons.accession.core.exceptions;

public class AccessionGeneratorShutDownException extends RuntimeException {
public AccessionGeneratorShutDownException(String msg) {
super(msg);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,9 @@ <HASH> List<AccessionWrapper<MODEL, HASH, ACCESSION>> generateAccessions(Map<HAS
* @param response DB response
*/
void postSave(SaveResponse<ACCESSION> response);

/**
* This method should be used to shut-down the generator and release resources
*/
void shutDownAccessionGenerator();
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,8 @@ public static <MODEL> SingleAccessionGenerator<MODEL, String> ofSHA1AccessionGen
return ofHashAccessionGenerator(summaryFunction, new SHA1HashingFunction());
}

@Override
public void shutDownAccessionGenerator() {
// Do nothing - no resources to release
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,4 +168,14 @@ public Set<ContiguousIdBlock> recoverState(long[] committedElements) throws Acce
this.availableRanges.addAll(newAvailableRanges);
return doCommit(committedElements);
}

public List<ContiguousIdBlock> getAssignedBlocks(){
return assignedBlocks.stream().collect(Collectors.toList());
}

public void shutDownBlockManager() {
assignedBlocks.clear();
availableRanges.clear();
generatedAccessions.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import uk.ac.ebi.ampt2d.commons.accession.block.initialization.BlockInitializationException;
import uk.ac.ebi.ampt2d.commons.accession.core.exceptions.AccessionCouldNotBeGeneratedException;
import uk.ac.ebi.ampt2d.commons.accession.core.exceptions.AccessionGeneratorShutDownException;
import uk.ac.ebi.ampt2d.commons.accession.core.exceptions.AccessionIsNotPendingException;
import uk.ac.ebi.ampt2d.commons.accession.core.models.AccessionWrapper;
import uk.ac.ebi.ampt2d.commons.accession.core.models.SaveResponse;
Expand Down Expand Up @@ -47,6 +48,8 @@ public class MonotonicAccessionGenerator<MODEL> implements AccessionGenerator<MO
private final String applicationInstanceId;
private final ContiguousIdBlockService blockService;

private boolean SHUTDOWN = false;

public MonotonicAccessionGenerator(String categoryId,
String applicationInstanceId,
ContiguousIdBlockService contiguousIdBlockService,
Expand Down Expand Up @@ -88,8 +91,7 @@ private static BlockManager initializeBlockManager(ContiguousIdBlockService bloc
assertBlockParametersAreInitialized(blockService, categoryId);
BlockManager blockManager = new BlockManager();
List<ContiguousIdBlock> uncompletedBlocks = blockService
.getUncompletedBlocksByCategoryIdAndApplicationInstanceIdOrderByEndAsc(categoryId,
applicationInstanceId);
.reserveUncompletedBlocksForCategoryIdAndApplicationInstanceId(categoryId, applicationInstanceId);
//Insert as available ranges
for (ContiguousIdBlock block : uncompletedBlocks) {
blockManager.addBlock(block);
Expand All @@ -116,6 +118,7 @@ private void recoverState(long[] committedElements) throws AccessionIsNotPending

public synchronized long[] generateAccessions(int numAccessionsToGenerate)
throws AccessionCouldNotBeGeneratedException {
checkAccessionGeneratorNotShutDown();
long[] accessions = new long[numAccessionsToGenerate];
reserveNewBlocksUntilSizeIs(numAccessionsToGenerate);

Expand Down Expand Up @@ -147,20 +150,24 @@ private synchronized void reserveNewBlock(String categoryId, String instanceId)
}

public synchronized void commit(long... accessions) throws AccessionIsNotPendingException {
checkAccessionGeneratorNotShutDown();
blockService.save(blockManager.commit(accessions));
}

public synchronized void release(long... accessions) throws AccessionIsNotPendingException {
checkAccessionGeneratorNotShutDown();
blockManager.release(accessions);
}

public synchronized MonotonicRangePriorityQueue getAvailableRanges() {
checkAccessionGeneratorNotShutDown();
return blockManager.getAvailableRanges();
}

@Override
public <HASH> List<AccessionWrapper<MODEL, HASH, Long>> generateAccessions(Map<HASH, MODEL> messages)
throws AccessionCouldNotBeGeneratedException {
checkAccessionGeneratorNotShutDown();
long[] accessions = generateAccessions(messages.size());
int i = 0;
List<AccessionWrapper<MODEL, HASH, Long>> accessionedModels = new ArrayList<>();
Expand All @@ -174,8 +181,27 @@ public <HASH> List<AccessionWrapper<MODEL, HASH, Long>> generateAccessions(Map<H

@Override
public synchronized void postSave(SaveResponse<Long> response) {
checkAccessionGeneratorNotShutDown();
commit(response.getSavedAccessions().stream().mapToLong(l -> l).toArray());
release(response.getSaveFailedAccessions().stream().mapToLong(l -> l).toArray());
}

public void shutDownAccessionGenerator(){
List<ContiguousIdBlock> blockList = blockManager.getAssignedBlocks();
blockList.stream().forEach(block -> block.releaseReserved());
blockService.save(blockList);
blockManager.shutDownBlockManager();
SHUTDOWN = true;
}

/**
* Before doing any operation on Accession Generator, we need to make sure it has not been shut down.
* We should make the check by calling this method as the first thing in all public methods of this class
*/
private void checkAccessionGeneratorNotShutDown(){
if(SHUTDOWN){
throw new AccessionGeneratorShutDownException("Accession Generator has been shut down and is no longer available");
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import javax.persistence.Index;
import javax.persistence.Table;
import javax.persistence.UniqueConstraint;
import java.time.LocalDateTime;

/**
* This class represents a block allocated by an application instance, in a monotonic sequence associated with a
Expand Down Expand Up @@ -63,6 +64,10 @@ public class ContiguousIdBlock implements Comparable<ContiguousIdBlock> {

private long lastCommitted;

private boolean reserved;

private LocalDateTime createdTimestamp;

// Create / update dates

ContiguousIdBlock() {
Expand All @@ -75,6 +80,8 @@ public ContiguousIdBlock(String categoryId, String applicationInstanceId, long f
this.firstValue = firstValue;
this.lastValue = firstValue + size - 1;
this.lastCommitted = firstValue - 1;
this.reserved = true;
this.createdTimestamp = LocalDateTime.now();
}

/**
Expand Down Expand Up @@ -127,6 +134,10 @@ public long getId() {
return id;
}

public String getCategoryId() {
return categoryId;
}

public long getLastCommitted() {
return lastCommitted;
}
Expand All @@ -135,6 +146,14 @@ public void setLastCommitted(long lastCommitted) {
this.lastCommitted = lastCommitted;
}

public String getApplicationInstanceId() {
return applicationInstanceId;
}

public void setApplicationInstanceId(String applicationInstanceId) {
this.applicationInstanceId = applicationInstanceId;
}

public long getFirstValue() {
return firstValue;
}
Expand All @@ -143,6 +162,26 @@ public long getLastValue() {
return lastValue;
}

public boolean isReserved() {
return reserved == true;
}

public boolean isNotReserved() {
return reserved == false;
}

public void markAsReserved() {
this.reserved = true;
}

public void releaseReserved() {
this.reserved = false;
}

public boolean isFull() {
return lastCommitted == lastValue;
}

public boolean isNotFull() {
return lastCommitted != lastValue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,18 @@
*/
package uk.ac.ebi.ampt2d.commons.accession.persistence.jpa.monotonic.repositories;

import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.CrudRepository;
import org.springframework.data.repository.query.Param;
import org.springframework.stereotype.Repository;
import uk.ac.ebi.ampt2d.commons.accession.persistence.jpa.monotonic.entities.ContiguousIdBlock;

import java.util.stream.Stream;
import java.util.List;

@Repository
public interface ContiguousIdBlockRepository extends CrudRepository<ContiguousIdBlock, Long> {

ContiguousIdBlock findFirstByCategoryIdAndApplicationInstanceIdOrderByLastValueDesc(String categoryId,
String instanceId);

Stream<ContiguousIdBlock> findAllByCategoryIdAndApplicationInstanceIdOrderByLastValueAsc(String categoryId,
String instanceId);
@Query("SELECT cib FROM ContiguousIdBlock cib WHERE cib.categoryId = :categoryId AND cib.lastCommitted != cib.lastValue AND (cib.reserved IS NULL OR cib.reserved IS FALSE) ORDER BY cib.lastValue asc")
List<ContiguousIdBlock> findUncompletedAndUnreservedBlocksOrderByLastValueAsc(@Param("categoryId") String categoryId);

ContiguousIdBlock findFirstByCategoryIdOrderByLastValueDesc(String categoryId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,32 @@
import javax.persistence.PersistenceContext;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* The ContiguousIdBlockService is used by AccessionGenerator to enter/update block information in DB.
*
* In case of multiprocessing, we need to make sure a block is used by only one AccessionGenerator at any point of time.
* To prevent a block from being used by multiple AccessionGenerator, we mark the block as reserved (using column reserved)
* when they are in use by an AccessionGenerator. A block, marked as reserved implies it is currently being used by an
* AccessionGenerator and should not be picked up for use by any other AccessionGenerator.
*
* Whenever an AccessionGenerator asks for a block from the ContiguousIdBlockService for using the accessions in it, we
* should reserve the block for the calling Accession Generator.
*
* Existing Uncompleted Blocks
* When an AccessionGenerator starts, it asks for Uncompleted Blocks, in order to use the remaining accessions
* in them. As these blocks, will be used by the calling AccessionGenerator, we need to explicitly mark them
* as reserved in DB.
* (see method @reserveUncompletedBlocksByCategoryIdAndApplicationInstanceIdOrderByEndAsc)
* New Block
* When an AccessionGenerator asks for a new block, we create a new block with correct values (based on the given
* parameters and existing blocks) and save it in DB. A newly created block is implicitly marked as reserved.
* (see method @reserveNewBlock)
*
* Also, when saving the blocks, we need to check for the block's last committed value.
* If it's last committed value is same as last value, we should release the block in DB
*
*/
public class ContiguousIdBlockService {

private ContiguousIdBlockRepository repository;
Expand All @@ -47,6 +70,8 @@ public ContiguousIdBlockService(ContiguousIdBlockRepository repository, Map<Stri

@Transactional
public void save(Iterable<ContiguousIdBlock> blocks) {
// release block if full
blocks.forEach(block -> {if (block.isFull()) {block.releaseReserved();}});
repository.saveAll(blocks);
entityManager.flush();
}
Expand Down Expand Up @@ -74,12 +99,15 @@ public BlockParameters getBlockParameters(String categoryId) {
return categoryBlockInitializations.get(categoryId);
}

@Transactional(readOnly = true)
public List<ContiguousIdBlock> getUncompletedBlocksByCategoryIdAndApplicationInstanceIdOrderByEndAsc(
String categoryId, String applicationInstanceId) {
try (Stream<ContiguousIdBlock> reservedBlocksOfThisInstance = repository
.findAllByCategoryIdAndApplicationInstanceIdOrderByLastValueAsc(categoryId, applicationInstanceId)) {
return reservedBlocksOfThisInstance.filter(ContiguousIdBlock::isNotFull).collect(Collectors.toList());
}
@Transactional(isolation = Isolation.SERIALIZABLE)
public List<ContiguousIdBlock> reserveUncompletedBlocksForCategoryIdAndApplicationInstanceId(String categoryId, String applicationInstanceId) {
List<ContiguousIdBlock> blockList = repository.findUncompletedAndUnreservedBlocksOrderByLastValueAsc(categoryId);
blockList.stream().forEach(block -> {
block.setApplicationInstanceId(applicationInstanceId);
block.markAsReserved();
});
save(blockList);
return blockList;
}

}
Loading

0 comments on commit c0a5c31

Please sign in to comment.