Skip to content

Commit

Permalink
Added recovery agent for running recovery of blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
nitin-ebi committed May 8, 2024
1 parent 905b266 commit ccad0fb
Show file tree
Hide file tree
Showing 5 changed files with 220 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package uk.ac.ebi.ampt2d.commons.accession.generators.monotonic;

import uk.ac.ebi.ampt2d.commons.accession.persistence.jpa.monotonic.entities.ContiguousIdBlock;
import uk.ac.ebi.ampt2d.commons.accession.persistence.jpa.monotonic.service.ContiguousIdBlockService;
import uk.ac.ebi.ampt2d.commons.accession.persistence.jpa.monotonic.service.MonotonicDatabaseService;

import java.time.LocalDateTime;
import java.util.Collections;
import java.util.List;
import java.util.Set;

public class MonotonicAccessionRecoveryAgent {
private final ContiguousIdBlockService blockService;
private final MonotonicDatabaseService monotonicDatabaseService;

public MonotonicAccessionRecoveryAgent(ContiguousIdBlockService blockService,
MonotonicDatabaseService monotonicDatabaseService) {
this.blockService = blockService;
this.monotonicDatabaseService = monotonicDatabaseService;
}

public void runRecovery(String categoryId, String applicationInstanceId, LocalDateTime lastUpdatedTime) {
List<ContiguousIdBlock> blocksToRecover = blockService.allBlocksForCategoryIdReservedBeforeTheGivenTimeFrame(categoryId, lastUpdatedTime);
for (ContiguousIdBlock block : blocksToRecover) {
// if block is already complete, there is nothing to recover, just release the block
if (block.getLastCommitted() == block.getLastValue()) {
setAppInstanceIdAndReleaseBlock(applicationInstanceId, block);
continue;
}

// run recover state for a block using BlockManager's recover state method
Set<ContiguousIdBlock> blockSet = recoverStateForBlock(block);

if (blockSet.isEmpty()) {
// if block's last committed is correctly set, BlockManager's recover method will return an empty set
// in this case, we just need to release the block
setAppInstanceIdAndReleaseBlock(applicationInstanceId, block);
} else {
ContiguousIdBlock blockToUpdate = blockSet.iterator().next();
setAppInstanceIdAndReleaseBlock(applicationInstanceId, blockToUpdate);
}
}
}

private Set<ContiguousIdBlock> recoverStateForBlock(ContiguousIdBlock block) {
BlockManager blockManager = new BlockManager();
blockManager.addBlock(block);
MonotonicRange monotonicRange = blockManager.getAvailableRanges().poll();
long[] committedElements = monotonicDatabaseService.getAccessionsInRanges(Collections.singletonList(monotonicRange));
return blockManager.recoverState(committedElements);
}

private void setAppInstanceIdAndReleaseBlock(String applicationInstanceId, ContiguousIdBlock block) {
block.setApplicationInstanceId(applicationInstanceId);
block.releaseReserved();
blockService.save(block);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.springframework.stereotype.Repository;
import uk.ac.ebi.ampt2d.commons.accession.persistence.jpa.monotonic.entities.ContiguousIdBlock;

import java.time.LocalDateTime;
import java.util.List;

@Repository
Expand All @@ -31,4 +32,7 @@ public interface ContiguousIdBlockRepository extends CrudRepository<ContiguousId
List<ContiguousIdBlock> findUncompletedAndUnreservedBlocksOrderByLastValueAsc(@Param("categoryId") String categoryId);

ContiguousIdBlock findFirstByCategoryIdOrderByLastValueDesc(String categoryId);

List<ContiguousIdBlock> findByCategoryIdAndReservedIsTrueAndLastUpdatedTimestampLessThanEqualOrderByLastValueAsc(
String categoryId, LocalDateTime lastUpdatedTime);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -76,6 +77,16 @@ public void save(Iterable<ContiguousIdBlock> blocks) {
entityManager.flush();
}

@Transactional
public void save(ContiguousIdBlock block) {
// release block if full
if (block.isFull()) {
block.releaseReserved();
}
repository.save(block);
entityManager.flush();
}

@Transactional(isolation = Isolation.SERIALIZABLE)
public ContiguousIdBlock reserveNewBlock(String categoryId, String instanceId) {
ContiguousIdBlock lastBlock = repository.findFirstByCategoryIdOrderByLastValueDesc(categoryId);
Expand Down Expand Up @@ -110,4 +121,9 @@ public List<ContiguousIdBlock> reserveUncompletedBlocksForCategoryIdAndApplicati
return blockList;
}

public List<ContiguousIdBlock> allBlocksForCategoryIdReservedBeforeTheGivenTimeFrame(String categoryId,
LocalDateTime lastUpdatedTimeStamp) {
return repository.findByCategoryIdAndReservedIsTrueAndLastUpdatedTimestampLessThanEqualOrderByLastValueAsc(categoryId, lastUpdatedTimeStamp);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package uk.ac.ebi.ampt2d.commons.accession.generators.monotonic;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import uk.ac.ebi.ampt2d.commons.accession.core.models.AccessionWrapper;
import uk.ac.ebi.ampt2d.commons.accession.persistence.jpa.monotonic.entities.ContiguousIdBlock;
import uk.ac.ebi.ampt2d.commons.accession.persistence.jpa.monotonic.repositories.ContiguousIdBlockRepository;
import uk.ac.ebi.ampt2d.commons.accession.persistence.jpa.monotonic.service.ContiguousIdBlockService;
import uk.ac.ebi.ampt2d.commons.accession.service.BasicSpringDataRepositoryMonotonicDatabaseService;
import uk.ac.ebi.ampt2d.test.configuration.MonotonicAccessionGeneratorTestConfiguration;
import uk.ac.ebi.ampt2d.test.configuration.TestMonotonicDatabaseServiceTestConfiguration;
import uk.ac.ebi.ampt2d.test.models.TestModel;
import uk.ac.ebi.ampt2d.test.persistence.TestMonotonicEntity;

import java.time.LocalDateTime;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import java.util.stream.StreamSupport;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

@RunWith(SpringRunner.class)
@DataJpaTest
@ContextConfiguration(classes = {MonotonicAccessionGeneratorTestConfiguration.class, TestMonotonicDatabaseServiceTestConfiguration.class})
public class MonotonicAccessionRecoveryAgentTest {
private static final String TEST_CATEGORY = "TEST_CATEGORY";
private static final String TEST_APP_INSTANCE_ID = "TEST_APP_INSTANCE_ID";
private static final String TEST_RECOVERY_AGENT_APP_INSTANCE_ID = "TEST_RECOVERY_AGENT_APP_INSTANCE_ID";

@Autowired
private BasicSpringDataRepositoryMonotonicDatabaseService<TestModel, TestMonotonicEntity> monotonicDBService;
@Autowired
private ContiguousIdBlockRepository repository;
@Autowired
private ContiguousIdBlockService service;

@Test
public void testRunRecovery() throws InterruptedException {
// block1 does not have any accessions used
ContiguousIdBlock block1 = new ContiguousIdBlock(TEST_CATEGORY, TEST_APP_INSTANCE_ID, 0, 100);
repository.save(block1);

// block2 is full and has all accessions used
ContiguousIdBlock block2 = new ContiguousIdBlock(TEST_CATEGORY, TEST_APP_INSTANCE_ID, 100, 100);
block2.setLastCommitted(199);
repository.save(block2);

// block3 has some of the accessions used but not captured in the block's table
ContiguousIdBlock block3 = new ContiguousIdBlock(TEST_CATEGORY, TEST_APP_INSTANCE_ID, 200, 100);
repository.save(block3);
// save some accessions in db that are not captured in block3
List<AccessionWrapper<TestModel, String, Long>> accessionsSet = LongStream.range(200l, 225l)
.boxed()
.map(longAcc -> new AccessionWrapper<>(longAcc, "hash-1" + longAcc, TestModel.of("test-obj-1-" + longAcc)))
.collect(Collectors.toList());
monotonicDBService.save(accessionsSet);

// block4 should not be recovered as it is after the recover cut off time
Thread.sleep(2000);
ContiguousIdBlock block4 = new ContiguousIdBlock(TEST_CATEGORY, TEST_APP_INSTANCE_ID, 300, 100);
repository.save(block4);

// run recovery through recovery agent
LocalDateTime recoverCutOffTime = block3.getLastUpdatedTimestamp();
MonotonicAccessionRecoveryAgent recoveryAgent = new MonotonicAccessionRecoveryAgent(service, monotonicDBService);
recoveryAgent.runRecovery(TEST_CATEGORY, TEST_RECOVERY_AGENT_APP_INSTANCE_ID, recoverCutOffTime);


List<ContiguousIdBlock> blockList = StreamSupport.stream(repository.findAll().spliterator(), false)
.sorted(Comparator.comparing(ContiguousIdBlock::getFirstValue))
.collect(Collectors.toList());
assertEquals(4, blockList.size());

assertEquals(TEST_RECOVERY_AGENT_APP_INSTANCE_ID, blockList.get(0).getApplicationInstanceId());
assertEquals(0, blockList.get(0).getFirstValue());
assertEquals(-1, blockList.get(0).getLastCommitted());
assertTrue(blockList.get(0).isNotReserved());

assertEquals(TEST_RECOVERY_AGENT_APP_INSTANCE_ID, blockList.get(1).getApplicationInstanceId());
assertEquals(100, blockList.get(1).getFirstValue());
assertEquals(199, blockList.get(1).getLastCommitted());
assertTrue(blockList.get(1).isNotReserved());

assertEquals(TEST_RECOVERY_AGENT_APP_INSTANCE_ID, blockList.get(2).getApplicationInstanceId());
assertEquals(200, blockList.get(2).getFirstValue());
assertEquals(224, blockList.get(2).getLastCommitted());
assertTrue(blockList.get(2).isNotReserved());

assertEquals(TEST_APP_INSTANCE_ID, blockList.get(3).getApplicationInstanceId());
assertEquals(300, blockList.get(3).getFirstValue());
assertEquals(299, blockList.get(3).getLastCommitted());
assertTrue(blockList.get(3).isReserved());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
Expand Down Expand Up @@ -275,8 +277,10 @@ public void testLastUpdateTimeStampAutoUpdate() {
entityManager.flush();

// assert block values
ContiguousIdBlock blockInDB = repository.findById(1L).get();
assertEquals(1L, blockInDB.getId());
List<ContiguousIdBlock> blockInDBList = StreamSupport.stream(repository.findAll().spliterator(), false)
.collect(Collectors.toList());
assertEquals(1, blockInDBList.size());
ContiguousIdBlock blockInDB = blockInDBList.get(0);
assertEquals(CATEGORY_ID, blockInDB.getCategoryId());
assertEquals(INSTANCE_ID, blockInDB.getApplicationInstanceId());
assertEquals(100, blockInDB.getFirstValue());
Expand All @@ -289,7 +293,7 @@ public void testLastUpdateTimeStampAutoUpdate() {
block.setLastCommitted(100);
repository.save(block);
entityManager.flush();
blockInDB = repository.findById(1L).get();
blockInDB = repository.findAll().iterator().next();
assertEquals(100, blockInDB.getLastCommitted());

LocalDateTime blockLastCommittedUpdateTime = blockInDB.getLastUpdatedTimestamp();
Expand All @@ -298,7 +302,7 @@ public void testLastUpdateTimeStampAutoUpdate() {
block.setApplicationInstanceId(INSTANCE_ID_2);
repository.save(block);
entityManager.flush();
blockInDB = repository.findById(1L).get();
blockInDB = repository.findAll().iterator().next();
assertEquals(INSTANCE_ID_2, blockInDB.getApplicationInstanceId());

LocalDateTime blockApplicationInstanceUpdateTime = blockInDB.getLastUpdatedTimestamp();
Expand All @@ -307,7 +311,7 @@ public void testLastUpdateTimeStampAutoUpdate() {
block.releaseReserved();
repository.save(block);
entityManager.flush();
blockInDB = repository.findById(1L).get();
blockInDB = repository.findAll().iterator().next();
assertTrue(blockInDB.isNotReserved());

LocalDateTime blockReleaseAsReservedUpdateTime = blockInDB.getLastUpdatedTimestamp();
Expand All @@ -316,7 +320,7 @@ public void testLastUpdateTimeStampAutoUpdate() {
block.markAsReserved();
repository.save(block);
entityManager.flush();
blockInDB = repository.findById(1L).get();
blockInDB = repository.findAll().iterator().next();
assertTrue(blockInDB.isReserved());

LocalDateTime blockMarkAsReservedUpdateTime = blockInDB.getLastUpdatedTimestamp();
Expand All @@ -327,4 +331,34 @@ public void testLastUpdateTimeStampAutoUpdate() {
assertTrue(blockReleaseAsReservedUpdateTime.isBefore(blockMarkAsReservedUpdateTime));
}

@Test
public void testGetBlocksWithLastUpdatedTimeStampLessThan() throws InterruptedException {
// reserved
ContiguousIdBlock block1 = new ContiguousIdBlock(CATEGORY_ID, INSTANCE_ID, 0, 100);
// reserved
ContiguousIdBlock block2 = new ContiguousIdBlock(CATEGORY_ID, INSTANCE_ID, 100, 100);
// not reserved
ContiguousIdBlock block3 = getUnreservedContiguousIdBlock(CATEGORY_ID, INSTANCE_ID, 200, 100);
// reserved but different category
ContiguousIdBlock block4 = new ContiguousIdBlock(CATEGORY_ID_2, INSTANCE_ID, 300, 100);
// reserved but after timestamp
Thread.sleep(2000L);
ContiguousIdBlock block5 = new ContiguousIdBlock(CATEGORY_ID, INSTANCE_ID, 400, 100);
repository.save(block1);
repository.save(block2);
repository.save(block3);
repository.save(block4);
repository.save(block5);
entityManager.flush();

LocalDateTime cutOffTimestamp = block4.getLastUpdatedTimestamp();
List<ContiguousIdBlock> blocksList = service.allBlocksForCategoryIdReservedBeforeTheGivenTimeFrame(CATEGORY_ID, cutOffTimestamp);

assertEquals(2, blocksList.size());
assertTrue(blocksList.get(0).isReserved());
assertEquals(0, blocksList.get(0).getFirstValue());
assertTrue(blocksList.get(1).isReserved());
assertEquals(100, blocksList.get(1).getFirstValue());
}

}

0 comments on commit ccad0fb

Please sign in to comment.