From d045995a37770457084cde8bb8cdde1e67c50587 Mon Sep 17 00:00:00 2001 From: Gaurav Bafna Date: Tue, 1 Oct 2024 19:22:38 +0530 Subject: [PATCH] Address PR comments Signed-off-by: Gaurav Bafna --- .../remotestore/RemoteRestoreSnapshotIT.java | 5 ++-- .../RemoteStorePinnedTimestampService.java | 28 ++++++++----------- .../snapshots/SnapshotsService.java | 16 +++-------- 3 files changed, 19 insertions(+), 30 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java index 01d83feeab3e1..faa9a8ea1bcfd 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java @@ -796,6 +796,8 @@ public void testCreateSnapshotV2_Orphan_Timestamp_Cleanup() throws Exception { RemoteStorePinnedTimestampService.class, internalCluster().getClusterManagerName() ); + forceSyncPinnedTimestamps(); + long pinnedTimestamp = System.currentTimeMillis(); final CountDownLatch latch = new CountDownLatch(1); LatchedActionListener latchedActionListener = new LatchedActionListener<>(new ActionListener<>() { @@ -819,7 +821,6 @@ public void onFailure(Exception e) {} assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards())); assertThat(snapshotInfo.getPinnedTimestamp(), greaterThan(0L)); - forceSyncPinnedTimestamps(); waitUntil(() -> 1 == RemoteStorePinnedTimestampService.getPinnedEntities().size()); } @@ -1106,7 +1107,7 @@ public void testCreateSnapshotV2() throws Exception { .get() .status(); assertEquals(RestStatus.ACCEPTED, createSnapshotResponseStatus); - + forceSyncPinnedTimestamps(); assertEquals(2, RemoteStorePinnedTimestampService.getPinnedEntities().size()); } diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java index ea80120c81b70..3a0515e6ea5b2 100644 --- a/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStorePinnedTimestampService.java @@ -52,7 +52,7 @@ public class RemoteStorePinnedTimestampService implements Closeable { private static final Logger logger = LogManager.getLogger(RemoteStorePinnedTimestampService.class); private static Tuple> pinnedTimestampsSet = new Tuple<>(-1L, Set.of()); - private static HashMap> pinnedEntityToTimestampsMap = new HashMap<>(); + private static Map> pinnedEntityToTimestampsMap = new HashMap<>(); public static final String PINNED_TIMESTAMPS_PATH_TOKEN = "pinned_timestamps"; public static final String PINNED_TIMESTAMPS_FILENAME_SEPARATOR = "__"; @@ -276,7 +276,7 @@ public static Tuple> getPinnedTimestamps() { return pinnedTimestampsSet; } - public static HashMap> getPinnedEntities() { + public static Map> getPinnedEntities() { return pinnedEntityToTimestampsMap; } @@ -311,20 +311,16 @@ protected void runInternal() { logger.debug("Fetched pinned timestamps from remote store: {} - {}", triggerTimestamp, pinnedTimestamps); pinnedTimestampsSet = new Tuple<>(triggerTimestamp, pinnedTimestamps); - - pinnedEntityToTimestampsMap = new HashMap<>(); - pinnedEntityToTimestampsMap.putAll( - pinnedTimestampList.keySet() - .stream() - .collect(Collectors.toMap(RemoteStorePinnedTimestampService.this::getEntityFromBlobName, blobName -> { - long timestamp = RemoteStorePinnedTimestampService.this.getTimestampFromBlobName(blobName); - return Collections.singletonList(timestamp); - }, (existingList, newList) -> { - List mergedList = new ArrayList<>(existingList); - mergedList.addAll(newList); - return mergedList; - })) - ); + pinnedEntityToTimestampsMap = pinnedTimestampList.keySet() + .stream() + .collect(Collectors.toMap(RemoteStorePinnedTimestampService.this::getEntityFromBlobName, blobName -> { + long timestamp = RemoteStorePinnedTimestampService.this.getTimestampFromBlobName(blobName); + return Collections.singletonList(timestamp); + }, (existingList, newList) -> { + List mergedList = new ArrayList<>(existingList); + mergedList.addAll(newList); + return mergedList; + })); } catch (Throwable t) { logger.error("Exception while fetching pinned timestamp details", t); } diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java index 7b02b8869f75f..c80f18cdd82f7 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java @@ -654,10 +654,8 @@ public TimeValue timeout() { } private void cleanOrphanTimestamp(String repoName, RepositoryData repositoryData) { - Collection snapshotUUIDs = repositoryData.getSnapshotIds().stream().map(SnapshotId::getUUID).collect(Collectors.toList()); - remoteStorePinnedTimestampService.forceSyncPinnedTimestamps(); - - HashMap> pinnedEntities = RemoteStorePinnedTimestampService.getPinnedEntities(); + Collection snapshotUUIDs = repositoryData.getSnapshotIds().stream().map(SnapshotId::getUUID).collect(Collectors.toSet()); + Map> pinnedEntities = RemoteStorePinnedTimestampService.getPinnedEntities(); List orphanPinnedEntities = pinnedEntities.keySet() .stream() @@ -682,15 +680,14 @@ private boolean isOrphanPinnedEntity(String repoName, Collection snapsho return Objects.equals(tokens.v1(), repoName) && snapshotUUIDs.contains(tokens.v2()) == false; } - private void deleteOrphanTimestamps(HashMap> pinnedEntities, List orphanPinnedEntities) { + private void deleteOrphanTimestamps(Map> pinnedEntities, List orphanPinnedEntities) { final CountDownLatch latch = new CountDownLatch(orphanPinnedEntities.size()); for (String pinnedEntity : orphanPinnedEntities) { assert pinnedEntities.get(pinnedEntity).size() == 1 : "Multiple timestamps for same repo-snapshot uuid found"; long orphanTimestamp = pinnedEntities.get(pinnedEntity).get(0); - String pinningEntity = getPinningEntity(pinnedEntity); remoteStorePinnedTimestampService.unpinTimestamp( orphanTimestamp, - pinningEntity, + pinnedEntity, new LatchedActionListener<>(new ActionListener<>() { @Override public void onResponse(Void unused) {} @@ -763,11 +760,6 @@ public static String getPinningEntity(String repositoryName, String snapshotUUID return repositoryName + SNAPSHOT_PINNED_TIMESTAMP_DELIMITER + snapshotUUID; } - public static String getPinningEntity(String blobname) { - String[] tokens = blobname.split(SNAPSHOT_PINNED_TIMESTAMP_DELIMITER); - return tokens[0] + SNAPSHOT_PINNED_TIMESTAMP_DELIMITER + tokens[1]; - } - public static Tuple getRepoSnapshotUUIDTuple(String pinningEntity) { String[] tokens = pinningEntity.split(SNAPSHOT_PINNED_TIMESTAMP_DELIMITER); return new Tuple<>(tokens[0], tokens[1]);