Skip to content

Commit

Permalink
Address PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: Gaurav Bafna <[email protected]>
  • Loading branch information
gbbafna committed Oct 1, 2024
1 parent 0503f63 commit d045995
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,8 @@ public void testCreateSnapshotV2_Orphan_Timestamp_Cleanup() throws Exception {
RemoteStorePinnedTimestampService.class,
internalCluster().getClusterManagerName()
);
forceSyncPinnedTimestamps();

long pinnedTimestamp = System.currentTimeMillis();
final CountDownLatch latch = new CountDownLatch(1);
LatchedActionListener<Void> latchedActionListener = new LatchedActionListener<>(new ActionListener<>() {
Expand All @@ -819,7 +821,6 @@ public void onFailure(Exception e) {}
assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));
assertThat(snapshotInfo.getPinnedTimestamp(), greaterThan(0L));

forceSyncPinnedTimestamps();
waitUntil(() -> 1 == RemoteStorePinnedTimestampService.getPinnedEntities().size());
}

Expand Down Expand Up @@ -1106,7 +1107,7 @@ public void testCreateSnapshotV2() throws Exception {
.get()
.status();
assertEquals(RestStatus.ACCEPTED, createSnapshotResponseStatus);

forceSyncPinnedTimestamps();
assertEquals(2, RemoteStorePinnedTimestampService.getPinnedEntities().size());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
public class RemoteStorePinnedTimestampService implements Closeable {
private static final Logger logger = LogManager.getLogger(RemoteStorePinnedTimestampService.class);
private static Tuple<Long, Set<Long>> pinnedTimestampsSet = new Tuple<>(-1L, Set.of());
private static HashMap<String, List<Long>> pinnedEntityToTimestampsMap = new HashMap<>();
private static Map<String, List<Long>> pinnedEntityToTimestampsMap = new HashMap<>();
public static final String PINNED_TIMESTAMPS_PATH_TOKEN = "pinned_timestamps";
public static final String PINNED_TIMESTAMPS_FILENAME_SEPARATOR = "__";

Expand Down Expand Up @@ -276,7 +276,7 @@ public static Tuple<Long, Set<Long>> getPinnedTimestamps() {
return pinnedTimestampsSet;
}

public static HashMap<String, List<Long>> getPinnedEntities() {
public static Map<String, List<Long>> getPinnedEntities() {
return pinnedEntityToTimestampsMap;
}

Expand Down Expand Up @@ -311,20 +311,16 @@ protected void runInternal() {

logger.debug("Fetched pinned timestamps from remote store: {} - {}", triggerTimestamp, pinnedTimestamps);
pinnedTimestampsSet = new Tuple<>(triggerTimestamp, pinnedTimestamps);

pinnedEntityToTimestampsMap = new HashMap<>();
pinnedEntityToTimestampsMap.putAll(
pinnedTimestampList.keySet()
.stream()
.collect(Collectors.toMap(RemoteStorePinnedTimestampService.this::getEntityFromBlobName, blobName -> {
long timestamp = RemoteStorePinnedTimestampService.this.getTimestampFromBlobName(blobName);
return Collections.singletonList(timestamp);
}, (existingList, newList) -> {
List<Long> mergedList = new ArrayList<>(existingList);
mergedList.addAll(newList);
return mergedList;
}))
);
pinnedEntityToTimestampsMap = pinnedTimestampList.keySet()
.stream()
.collect(Collectors.toMap(RemoteStorePinnedTimestampService.this::getEntityFromBlobName, blobName -> {
long timestamp = RemoteStorePinnedTimestampService.this.getTimestampFromBlobName(blobName);
return Collections.singletonList(timestamp);
}, (existingList, newList) -> {
List<Long> mergedList = new ArrayList<>(existingList);
mergedList.addAll(newList);
return mergedList;
}));
} catch (Throwable t) {
logger.error("Exception while fetching pinned timestamp details", t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -654,10 +654,8 @@ public TimeValue timeout() {
}

private void cleanOrphanTimestamp(String repoName, RepositoryData repositoryData) {
Collection<String> snapshotUUIDs = repositoryData.getSnapshotIds().stream().map(SnapshotId::getUUID).collect(Collectors.toList());
remoteStorePinnedTimestampService.forceSyncPinnedTimestamps();

HashMap<String, List<Long>> pinnedEntities = RemoteStorePinnedTimestampService.getPinnedEntities();
Collection<String> snapshotUUIDs = repositoryData.getSnapshotIds().stream().map(SnapshotId::getUUID).collect(Collectors.toSet());
Map<String, List<Long>> pinnedEntities = RemoteStorePinnedTimestampService.getPinnedEntities();

List<String> orphanPinnedEntities = pinnedEntities.keySet()
.stream()
Expand All @@ -682,15 +680,14 @@ private boolean isOrphanPinnedEntity(String repoName, Collection<String> snapsho
return Objects.equals(tokens.v1(), repoName) && snapshotUUIDs.contains(tokens.v2()) == false;
}

private void deleteOrphanTimestamps(HashMap<String, List<Long>> pinnedEntities, List<String> orphanPinnedEntities) {
private void deleteOrphanTimestamps(Map<String, List<Long>> pinnedEntities, List<String> orphanPinnedEntities) {
final CountDownLatch latch = new CountDownLatch(orphanPinnedEntities.size());
for (String pinnedEntity : orphanPinnedEntities) {
assert pinnedEntities.get(pinnedEntity).size() == 1 : "Multiple timestamps for same repo-snapshot uuid found";
long orphanTimestamp = pinnedEntities.get(pinnedEntity).get(0);
String pinningEntity = getPinningEntity(pinnedEntity);
remoteStorePinnedTimestampService.unpinTimestamp(
orphanTimestamp,
pinningEntity,
pinnedEntity,
new LatchedActionListener<>(new ActionListener<>() {
@Override
public void onResponse(Void unused) {}
Expand Down Expand Up @@ -763,11 +760,6 @@ public static String getPinningEntity(String repositoryName, String snapshotUUID
return repositoryName + SNAPSHOT_PINNED_TIMESTAMP_DELIMITER + snapshotUUID;
}

public static String getPinningEntity(String blobname) {
String[] tokens = blobname.split(SNAPSHOT_PINNED_TIMESTAMP_DELIMITER);
return tokens[0] + SNAPSHOT_PINNED_TIMESTAMP_DELIMITER + tokens[1];
}

public static Tuple<String, String> getRepoSnapshotUUIDTuple(String pinningEntity) {
String[] tokens = pinningEntity.split(SNAPSHOT_PINNED_TIMESTAMP_DELIMITER);
return new Tuple<>(tokens[0], tokens[1]);
Expand Down

0 comments on commit d045995

Please sign in to comment.