diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index 665761eddd77a..c071b22ba4cba 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.ExceptionsHelper; import org.opensearch.OpenSearchCorruptionException; import org.opensearch.action.support.ChannelActionListener; import org.opensearch.cluster.node.DiscoveryNode; @@ -18,6 +19,7 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.CancellableThreads; import org.opensearch.common.util.concurrent.AbstractRunnable; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.core.action.ActionListener; @@ -269,15 +271,7 @@ public void onReplicationFailure( ReplicationFailedException e, boolean sendShardFailure ) { - logger.error( - () -> new ParameterizedMessage( - "[shardId {}] [replication id {}] Replication failed, timing data: {}", - replicaShard.shardId().getId(), - state.getReplicationId(), - state.getTimingData() - ), - e - ); + logReplicationFailure(state, e, replicaShard); if (sendShardFailure == true) { failShard(e, replicaShard); } else { @@ -293,6 +287,30 @@ public void onReplicationFailure( } } + private void logReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, IndexShard replicaShard) { + // only log as error if error is not a cancellation. + if (ExceptionsHelper.unwrap(e, CancellableThreads.ExecutionCancelledException.class) == null) { + logger.error( + () -> new ParameterizedMessage( + "[shardId {}] [replication id {}] Replication failed, timing data: {}", + replicaShard.shardId(), + state.getReplicationId(), + state.getTimingData() + ), + e + ); + } else { + logger.debug( + () -> new ParameterizedMessage( + "[shardId {}] [replication id {}] Replication cancelled", + replicaShard.shardId(), + state.getReplicationId() + ), + e + ); + } + } + protected void updateVisibleCheckpoint(long replicationId, IndexShard replicaShard) { // Update replication checkpoint on source via transport call only supported for remote store integration. For node- // node communication, checkpoint update is piggy-backed to GET_SEGMENT_FILES transport call @@ -503,7 +521,6 @@ public void onResponse(Void o) { @Override public void onFailure(Exception e) { - logger.error(() -> new ParameterizedMessage("Exception replicating {} marking as failed.", target.description()), e); if (e instanceof OpenSearchCorruptionException) { onGoingReplications.fail(replicationId, new ReplicationFailedException("Store corruption during replication", e), true); return; @@ -584,15 +601,7 @@ public void onReplicationFailure( ReplicationFailedException e, boolean sendShardFailure ) { - logger.error( - () -> new ParameterizedMessage( - "[shardId {}] [replication id {}] Force replication Sync failed, timing data: {}", - indexShard.shardId().getId(), - state.getReplicationId(), - state.getTimingData() - ), - e - ); + logReplicationFailure(state, e, indexShard); if (sendShardFailure) { failShard(e, indexShard); }