Skip to content

Commit

Permalink
Add initial migration console snapshot support
Browse files Browse the repository at this point in the history
Signed-off-by: Andre Kurait <[email protected]>
  • Loading branch information
AndreKurait committed Jun 20, 2024
1 parent e805ed7 commit d8d3641
Show file tree
Hide file tree
Showing 11 changed files with 249 additions and 20 deletions.
8 changes: 6 additions & 2 deletions CreateSnapshot/src/main/java/com/rfs/CreateSnapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ public static class Args {

@Parameter(names = {"--target-password"}, description = "Optional. The target password; if not provided, will assume no auth on target", required = false)
public String targetPass = null;

@Parameter(names = {"--insecure"}, description = "Allow untrusted SSL certificates", required = false)
public boolean insecure = false;
}

public static void main(String[] args) throws Exception {
Expand All @@ -67,9 +70,10 @@ public static void main(String[] args) throws Exception {
final String targetHost = arguments.targetHost;
final String targetUser = arguments.targetUser;
final String targetPass = arguments.targetPass;
final boolean insecure = arguments.insecure;

final ConnectionDetails sourceConnection = new ConnectionDetails(sourceHost, sourceUser, sourcePass);
final ConnectionDetails targetConnection = new ConnectionDetails(targetHost, targetUser, targetPass);
final ConnectionDetails sourceConnection = new ConnectionDetails(sourceHost, sourceUser, sourcePass, insecure);
final ConnectionDetails targetConnection = new ConnectionDetails(targetHost, targetUser, targetPass, insecure);

TryHandlePhaseFailure.executeWithTryCatch(() -> {
log.info("Running RfsWorker");
Expand Down
6 changes: 6 additions & 0 deletions RFS/src/main/java/com/rfs/common/ConnectionDetails.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,15 @@ public static enum Protocol {
public final String username;
public final String password;
public final AuthType authType;
public final boolean insecure;

public ConnectionDetails(String url, String username, String password) {
this(url, username, password, false);
}

public ConnectionDetails(String url, String username, String password, boolean insecure) {
this.url = url; // http://localhost:9200
this.insecure = insecure;

// If the username is provided, the password must be as well, and vice versa
if ((username == null && password != null) || (username != null && password == null)) {
Expand Down
48 changes: 38 additions & 10 deletions RFS/src/main/java/com/rfs/common/RestClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,16 @@

import java.util.Base64;

import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import lombok.SneakyThrows;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import reactor.netty.tcp.SslProvider;
import reactor.netty.ByteBufMono;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;

public class RestClient {
public static class Response {
Expand All @@ -22,20 +29,41 @@ public Response(int responseCode, String responseBody, String responseMessage) {
public final ConnectionDetails connectionDetails;
private final HttpClient client;

@SneakyThrows

Check warning on line 32 in RFS/src/main/java/com/rfs/common/RestClient.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/com/rfs/common/RestClient.java#L32

Added line #L32 was not covered by tests
public RestClient(ConnectionDetails connectionDetails) {
this.connectionDetails = connectionDetails;

SslProvider sslProvider;
if (connectionDetails.insecure) {
SslContext sslContext = SslContextBuilder.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.build();

Check warning on line 40 in RFS/src/main/java/com/rfs/common/RestClient.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/com/rfs/common/RestClient.java#L38-L40

Added lines #L38 - L40 were not covered by tests

sslProvider = SslProvider.builder()
.sslContext(sslContext)
.handlerConfigurator(sslHandler -> {
SSLEngine engine = sslHandler.engine();
SSLParameters sslParameters = engine.getSSLParameters();
sslParameters.setEndpointIdentificationAlgorithm(null);
engine.setSSLParameters(sslParameters);
})
.build();
} else {
sslProvider = SslProvider.defaultClientProvider();

Check warning on line 52 in RFS/src/main/java/com/rfs/common/RestClient.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/com/rfs/common/RestClient.java#L42-L52

Added lines #L42 - L52 were not covered by tests
}

this.client = HttpClient.create()
.baseUrl(connectionDetails.url)
.headers(h -> {
h.add("Content-Type", "application/json");
h.add("User-Agent", "RfsWorker-1.0");
if (connectionDetails.authType == ConnectionDetails.AuthType.BASIC) {
String credentials = connectionDetails.username + ":" + connectionDetails.password;
String encodedCredentials = Base64.getEncoder().encodeToString(credentials.getBytes());
h.add("Authorization", "Basic " + encodedCredentials);
}
});
.secure(sslProvider)
.baseUrl(connectionDetails.url)
.headers(h -> {
h.add("Content-Type", "application/json");
h.add("User-Agent", "RfsWorker-1.0");

Check warning on line 60 in RFS/src/main/java/com/rfs/common/RestClient.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/com/rfs/common/RestClient.java#L56-L60

Added lines #L56 - L60 were not covered by tests
if (connectionDetails.authType == ConnectionDetails.AuthType.BASIC) {
String credentials = connectionDetails.username + ":" + connectionDetails.password;
String encodedCredentials = Base64.getEncoder().encodeToString(credentials.getBytes());
h.add("Authorization", "Basic " + encodedCredentials);

Check warning on line 64 in RFS/src/main/java/com/rfs/common/RestClient.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/com/rfs/common/RestClient.java#L62-L64

Added lines #L62 - L64 were not covered by tests
}
});

Check warning on line 66 in RFS/src/main/java/com/rfs/common/RestClient.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/com/rfs/common/RestClient.java#L66

Added line #L66 was not covered by tests
}

public Mono<Response> getAsync(String path) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@
import console_link.logic.clusters as logic_clusters
import console_link.logic.metrics as logic_metrics
import console_link.logic.backfill as logic_backfill
from console_link.logic.backfill import ExitCode
import console_link.logic.snapshot as logic_snapshot

from console_link.models.utils import ExitCode
from console_link.environment import Environment
from console_link.models.metrics_source import Component, MetricStatistic
from console_link.models.snapshot import SnapshotStatus

import logging

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -88,6 +92,34 @@ def replayer_group(ctx):
def start_replayer_cmd(ctx):
ctx.env.replayer.start()

# ##################### SNAPSHOT ###################

@cli.group(name="snapshot")
@click.pass_obj
def snapshot_group(ctx):
"""All actions related to snapshot creation"""
if ctx.env.snapshot is None:
raise click.UsageError("Snapshot is not set")


@snapshot_group.command(name="create")
@click.pass_obj
def create_snapshot_cmd(ctx):
"""Create a snapshot of the source cluster"""
snapshot = ctx.env.snapshot
status, message = logic_snapshot.create(snapshot)
if status != SnapshotStatus.COMPLETED:
raise click.ClickException(message)
click.echo(message)


@snapshot_group.command(name="status")
@click.pass_obj
def status_snapshot_cmd(ctx):
"""Check the status of the snapshot"""
snapshot = ctx.env.snapshot
_, message = logic_snapshot.status(snapshot, source_cluster=ctx.env.source_cluster, target_cluster=ctx.env.target_cluster)
click.echo(message)

# ##################### BACKFILL ###################

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from console_link.logic.metrics import get_metrics_source
from console_link.logic.backfill import get_backfill
from console_link.models.backfill_base import Backfill
from console_link.models.snapshot import Snapshot, S3Snapshot
import yaml
from cerberus import Validator

Expand All @@ -17,6 +18,7 @@
"replayer": {"type": "dict", "required": False},
"backfill": {"type": "dict", "required": False},
"metrics_source": {"type": "dict", "required": False},
"snapshot": {"type": "dict", "required": False},
}


Expand All @@ -25,6 +27,7 @@ class Environment:
target_cluster: Optional[Cluster] = None
backfill: Optional[Backfill] = None
metrics_source: Optional[MetricsSource] = None
snapshot: Optional[Snapshot] = None

def __init__(self, config_file: str):
self.config_file = config_file
Expand Down Expand Up @@ -61,3 +64,11 @@ def __init__(self, config_file: str):
logger.info(f"Backfill migration initialized: {self.backfill}")
else:
logger.info("No backfill provided")

if 'snapshot' in self.config:
self.snapshot: Snapshot = S3Snapshot(self.config["snapshot"],
source_cluster=self.source_cluster,
target_cluster=self.target_cluster)
logger.info(f"Snapshot initialized: {self.snapshot}")
else:
logger.info("No snapshot provided")
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import json
import logging
from typing import Dict, Optional, Tuple

from console_link.models.utils import ExitCode
from console_link.models.backfill_osi import OpenSearchIngestionBackfill
from console_link.models.backfill_rfs import DockerRFSBackfill, ECSRFSBackfill
from console_link.models.cluster import Cluster
Expand All @@ -13,11 +13,6 @@
logger = logging.getLogger(__name__)


class ExitCode(Enum):
SUCCESS = 0
FAILURE = 1


BackfillType = Enum("BackfillType",
["opensearch_ingestion", "reindex_from_snapshot"])

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import logging
from typing import Tuple
from console_link.models.snapshot import Snapshot, SnapshotStatus

logger = logging.getLogger(__name__)

def create(snapshot: Snapshot, *args, **kwargs) -> Tuple[SnapshotStatus, str]:
logger.info(f"Creating snapshot with {args=} and {kwargs=}")
try:
result = snapshot.create(*args, **kwargs)
except Exception as e:
logger.error(f"Failed to create snapshot: {e}")
return SnapshotStatus.FAILED, f"Failure when creating snapshot: {type(e).__name__} {e}"

if result.success:
return SnapshotStatus.COMPLETED, "Snapshot created successfully." + "\n" + result.value
return SnapshotStatus.FAILED, "Snapshot creation failed." + "\n" + result.value

def status(snapshot: Snapshot, *args, **kwargs) -> Tuple[SnapshotStatus, str]:
logger.info("Getting snapshot status")
try:
result = snapshot.status(*args, **kwargs)
except Exception as e:
logger.error(f"Failed to get status of snapshot: {e}")
return SnapshotStatus.FAILED, f"Failure when getting status of snapshot: {type(e).__name__} {e}"
if result.success:
return SnapshotStatus.COMPLETED, result.value
return SnapshotStatus.FAILED, "Snapshot status retrieval failed." + "\n" + result.value
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
from abc import ABC, abstractmethod
from enum import Enum
import logging
import subprocess
from typing import Dict, Optional, Tuple
from console_link.models.cluster import Cluster
from console_link.models.command_result import CommandResult
from cerberus import Validator

logger = logging.getLogger(__name__)

SnapshotStatus = Enum(
"SnapshotStatus", [
"NOT_STARTED",
"RUNNING",
"COMPLETED",
"FAILED"
])

class Snapshot(ABC):
"""
Interface for creating and managing snapshots.
"""
def __init__(self, config: Dict, source_cluster: Cluster, target_cluster: Optional[Cluster] = None) -> None:
self.config = config
self.source_cluster = source_cluster
self.target_cluster = target_cluster

@abstractmethod
def create(self, *args, **kwargs) -> CommandResult:
"""Create a snapshot."""
pass

@abstractmethod
def status(self, *args, **kwargs) -> CommandResult:
"""Get the status of the snapshot."""
pass


S3_SNAPSHOT_SCHEMA = {
'snapshot_name': {
'type': 'string',
'required': True
},
's3_repo_uri': {
'type': 'string',
'required': True
},
's3_region': {
'type': 'string',
'required': True
}
}

class S3Snapshot(Snapshot):
def __init__(self, config: Dict, source_cluster: Cluster, target_cluster: Optional[Cluster] = None) -> None:
super().__init__(config, source_cluster, target_cluster)
v = Validator(S3_SNAPSHOT_SCHEMA)
if not v.validate(config):
raise ValueError("Invalid config file for snapshot", v.errors)
self.snapshot_name = config['snapshot_name']
self.s3_repo_uri = config['s3_repo_uri']
self.s3_region = config['s3_region']

def create(self, *args, **kwargs) -> CommandResult:
command = [
"/root/createSnapshot/bin/CreateSnapshot",
"--snapshot-name", self.snapshot_name,
"--s3-repo-uri", self.s3_repo_uri,
"--s3-region", self.s3_region,
"--source-host", self.source_cluster.endpoint,
"--target-host", self.target_cluster.endpoint,
]
if self.source_cluster.allow_insecure or self.target_cluster.allow_insecure:
command.append("--insecure")
logger.info(f"Creating snapshot with command: {' '.join(command)}")
try:
subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True)
logger.info(f"Snapshot {self.config['snapshot_name']} created successfully")
return CommandResult(success=True, value=f"Snapshot {self.config['snapshot_name']} created successfully")
except subprocess.CalledProcessError as e:
logger.error(f"Failed to create snapshot: {str(e)}")
return CommandResult(success=False, value=f"Failed to create snapshot: {str(e)} {e.output} {e.stderr}")
def status(self, *args, **kwargs) -> CommandResult:
return CommandResult(success=False, value=f"Command not implemented")
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# define a custom exception for aws api errors
from enum import Enum
from typing import Dict


Expand All @@ -20,3 +21,7 @@ def raise_for_aws_api_error(response: Dict) -> None:
"Non-2XX status code received",
status_code=status_code
)

class ExitCode(Enum):
SUCCESS = 0
FAILURE = 1
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
source_cluster:
endpoint: "https://capture-proxy-es:9200"
endpoint: "https://capture-proxy:9200"
allow_insecure: true
basic_auth:
username: "admin"
Expand All @@ -16,3 +16,7 @@ metrics_source:
backfill:
reindex_from_snapshot:
docker:
snapshot:
snapshot_name: "snapshot_2023_01_01"
s3_repo_uri: "s3://my-snapshot-bucket"
s3_region: "us-east-2"
Loading

0 comments on commit d8d3641

Please sign in to comment.