Setup Initial Jenkins Git Structure (#744)
This change introduces a shared library structure for our Jenkins pipelines. It allows us to define a pipeline template that our actual pipeline files can utilize and configure as needed.

This change also adds some common functions needed for testing to the console library; a future change is expected to move our Python testing files over to use this console library as well.

---------

Signed-off-by: Tanner Lewis <[email protected]>
lewijacn committed Jun 21, 2024
1 parent 1383562 commit 8ecb500
Showing 9 changed files with 405 additions and 11 deletions.
@@ -15,7 +15,8 @@ services:
# this is a convenience thing for testing -- it should be removed before this makes it to prod.
- ./lib/console_link:/root/lib/console_link
environment:
# Copy local AWS env variables to Docker container
# Copy local AWS env to Docker container
#- ~/.aws:/root/.aws
- AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
- AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
- AWS_SESSION_TOKEN=${AWS_SESSION_TOKEN}
@@ -78,8 +78,48 @@ def cat_indices_cmd(ctx):
click.echo(logic_clusters.cat_indices(ctx.env.target_cluster))


@cluster_group.command(name="connection-check")
@click.pass_obj
def connection_check_cmd(ctx):
"""Checks if a connection can be established to source and target clusters"""
click.echo("SOURCE CLUSTER")
click.echo(logic_clusters.connection_check(ctx.env.source_cluster))
click.echo("TARGET CLUSTER")
click.echo(logic_clusters.connection_check(ctx.env.target_cluster))


@cluster_group.command(name="run-test-benchmarks")
@click.pass_obj
def run_test_benchmarks_cmd(ctx):
"""Run a series of OpenSearch Benchmark workloads against the source cluster"""
click.echo(logic_clusters.run_test_benchmarks(ctx.env.source_cluster))


@cluster_group.command(name="clear-indices")
@click.option("--acknowledge-risk", is_flag=True, show_default=True, default=False,
help="Flag to acknowledge risk and skip confirmation")
@click.option('--cluster',
type=click.Choice(['source', 'target'], case_sensitive=False),
help="Cluster to perform clear indices action on",
required=True)
@click.pass_obj
def clear_indices_cmd(ctx, acknowledge_risk, cluster):
"""[Caution] Clear indices on a source or target cluster"""
cluster_focus = ctx.env.source_cluster if cluster.lower() == 'source' else ctx.env.target_cluster
if acknowledge_risk:
click.echo("Performing clear indices operation...")
click.echo(logic_clusters.clear_indices(cluster_focus))
else:
if click.confirm(f'Clearing indices WILL result in the loss of all data on the {cluster.lower()} cluster. '
f'Are you sure you want to continue?'):
click.echo(f"Performing clear indices operation on {cluster.lower()} cluster...")
click.echo(logic_clusters.clear_indices(cluster_focus))
else:
click.echo("Aborting command.")

# ##################### REPLAYER ###################


@cli.group(name="replayer")
@click.pass_obj
def replayer_group(ctx):
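For quick local verification of the new commands, here is a minimal sketch using Click's test runner. It assumes the CLI entry point is importable as console_link.cli.cli and that services.yaml is a valid config file; both names are illustrative, not part of this change.

from click.testing import CliRunner

from console_link.cli import cli

runner = CliRunner()

# Non-destructive connectivity check against both configured clusters.
result = runner.invoke(cli, ['--config-file', 'services.yaml', 'clusters', 'connection-check'])
print(result.output)

# clear-indices prompts for confirmation unless --acknowledge-risk is passed;
# answering 'n' declines the prompt so nothing is deleted.
result = runner.invoke(cli, ['--config-file', 'services.yaml', 'clusters', 'clear-indices', '--cluster', 'source'],
                       input='n\n')
print(result.output)  # expected to end with "Aborting command."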
@@ -1,8 +1,53 @@
from console_link.models.cluster import Cluster
from console_link.models.cluster import Cluster, HttpMethod
from dataclasses import dataclass
import logging

logger = logging.getLogger(__name__)


@dataclass
class ConnectionResult:
connection_message: str
connection_established: bool
cluster_version: str


def cat_indices(cluster: Cluster, as_json=False):
as_json_suffix = "?format=json" if as_json else ""
as_json_suffix = "?format=json" if as_json else "?v"
cat_indices_path = f"/_cat/indices{as_json_suffix}"
r = cluster.call_api(cat_indices_path)
return r.json() if as_json else r.content


def connection_check(cluster: Cluster) -> ConnectionResult:
cluster_details_path = "/"
caught_exception = None
r = None
try:
r = cluster.call_api(cluster_details_path, timeout=3)
except Exception as e:
caught_exception = e
logger.debug(f"Unable to access cluster: {cluster} with exception: {e}")
if caught_exception is None:
response_json = r.json()
return ConnectionResult(connection_message="Successfully connected!",
connection_established=True,
cluster_version=response_json['version']['number'])
else:
return ConnectionResult(connection_message=f"Unable to connect to cluster with error: {caught_exception}",
connection_established=False,
cluster_version=None)


def run_test_benchmarks(cluster: Cluster):
cluster.execute_benchmark_workload(workload="geonames")
cluster.execute_benchmark_workload(workload="http_logs")
cluster.execute_benchmark_workload(workload="nested")
cluster.execute_benchmark_workload(workload="nyc_taxis")


# As a default we exclude system indices and searchguard indices
def clear_indices(cluster: Cluster):
clear_indices_path = "/*,-.*,-searchguard*,-sg7*"
r = cluster.call_api(clear_indices_path, method=HttpMethod.DELETE)
return r.content
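As a usage illustration, a minimal sketch that gates the destructive clear_indices call on a successful connection check; the cluster argument is assumed to be an already-configured console_link.models.cluster.Cluster instance, and the helper name is hypothetical.

from console_link.logic.clusters import clear_indices, connection_check


def safe_clear_indices(cluster):
    check = connection_check(cluster)
    if not check.connection_established:
        raise RuntimeError(f"Refusing to clear indices: {check.connection_message}")
    # The delete path "/*,-.*,-searchguard*,-sg7*" targets every index except
    # dot-prefixed system indices and searchguard/sg7 internal indices.
    print(f"Clearing indices on cluster running version {check.cluster_version}")
    return clear_indices(cluster)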
@@ -4,6 +4,7 @@
from requests.auth import HTTPBasicAuth
from cerberus import Validator
import logging
import subprocess
from console_link.models.schema_tools import contains_one_of

requests.packages.urllib3.disable_warnings()  # type: ignore
@@ -84,7 +85,7 @@ def __init__(self, config: Dict) -> None:
elif 'sigv4' in config:
self.auth_type = AuthMethod.SIGV4

def call_api(self, path, method: HttpMethod = HttpMethod.GET) -> requests.Response:
def call_api(self, path, method: HttpMethod = HttpMethod.GET, timeout=None) -> requests.Response:
"""
Calls an API on the cluster.
"""
@@ -105,7 +106,30 @@ def call_api(self, path, method: HttpMethod = HttpMethod.GET) -> requests.Response:
f"{self.endpoint}{path}",
verify=(not self.allow_insecure),
auth=auth,
timeout=timeout
)
logger.debug(f"Cluster API call request: {r.request}")
r.raise_for_status()
return r

def execute_benchmark_workload(self, workload: str,
workload_params='target_throughput:0.5,bulk_size:10,bulk_indexing_clients:1,'
'search_clients:1'):
client_options = ""
if not self.allow_insecure:
client_options += "use_ssl:true,verify_certs:false"
if self.auth_type == AuthMethod.BASIC_AUTH:
if self.auth_details['password'] is not None:
client_options += (f"basic_auth_user:{self.auth_details['username']},"
f"basic_auth_password:{self.auth_details['password']}")
else:
raise NotImplementedError(f"Auth type {self.auth_type} with AWS Secret ARN is not currently support "
f"for executing benchmark workloads")
elif self.auth_type == AuthMethod.SIGV4:
raise NotImplementedError(f"Auth type {self.auth_type} is not currently support for executing "
f"benchmark workloads")
logger.info(f"Running opensearch-benchmark with '{workload}' workload")
subprocess.run(f"opensearch-benchmark execute-test --distribution-version=1.0.0 "
f"--target-host={self.endpoint} --workload={workload} --pipeline=benchmark-only --test-mode "
f"--kill-running-processes --workload-params={workload_params} "
f"--client-options={client_options}", shell=True)
@@ -51,6 +51,44 @@ def test_cli_cluster_cat_indices(runner, env, mocker):
mock.assert_called()


def test_cli_cluster_connection_check(runner, env, mocker):
mock = mocker.patch('console_link.logic.clusters.connection_check')
result = runner.invoke(cli, ['--config-file', str(VALID_SERVICES_YAML), 'clusters', 'connection-check'],
catch_exceptions=True)
# connection_check is invoked for both the source and target clusters.
assert result.exit_code == 0
assert 'SOURCE CLUSTER' in result.output
assert 'TARGET CLUSTER' in result.output
mock.assert_called()


def test_cli_cluster_run_test_benchmarks(runner, env, mocker):
mock = mocker.patch('console_link.logic.clusters.run_test_benchmarks')
result = runner.invoke(cli, ['--config-file', str(VALID_SERVICES_YAML), 'clusters', 'run-test-benchmarks'],
catch_exceptions=True)
mock.assert_called_once()
assert result.exit_code == 0


def test_cli_cluster_clear_indices(runner, env, mocker):
mock = mocker.patch('console_link.logic.clusters.clear_indices')
result = runner.invoke(cli,
['--config-file', str(VALID_SERVICES_YAML), 'clusters', 'clear-indices',
'--cluster', 'source', '--acknowledge-risk'],
catch_exceptions=True)
mock.assert_called_once()
assert result.exit_code == 0


def test_cli_cluster_clear_indices_no_acknowledge(runner, env, mocker):
mock = mocker.patch('console_link.logic.clusters.clear_indices')
runner.invoke(cli,
['--config-file', str(VALID_SERVICES_YAML), 'clusters', 'clear-indices',
'--cluster', 'source'],
catch_exceptions=True)
assert not mock.called


def test_cli_with_metrics_get_data(runner, env, mocker):
mock = mocker.patch('console_link.models.metrics_source.PrometheusMetricsSource.get_metrics')
result = runner.invoke(cli, ['--config-file', str(VALID_SERVICES_YAML), 'metrics', 'list'],
@@ -69,10 +107,10 @@ def test_cli_with_backfill_describe(runner, env, mocker):

def test_cli_snapshot_create(runner, env, mocker):
mock = mocker.patch('console_link.logic.snapshot.create')

# Set the mock return value
mock.return_value = SnapshotStatus.COMPLETED, "Snapshot created successfully."

# Test snapshot creation
result = runner.invoke(cli, ['--config-file', str(VALID_SERVICES_YAML), 'snapshot', 'create'],
catch_exceptions=True)
@@ -87,16 +125,16 @@ def test_cli_snapshot_create(runner, env, mocker):
@pytest.mark.skip(reason="Not implemented yet")
def test_cli_snapshot_status(runner, env, mocker):
mock = mocker.patch('console_link.logic.snapshot.status')

# Set the mock return value
mock.return_value = SnapshotStatus.COMPLETED, "Snapshot status: COMPLETED"

# Test snapshot status
result = runner.invoke(cli, ['--config-file', str(VALID_SERVICES_YAML), 'snapshot', 'status'],
catch_exceptions=True)
assert result.exit_code == 0
assert "Snapshot status: COMPLETED" in result.output

# Ensure the mocks were called
mock.assert_called_once()

@@ -124,7 +162,7 @@ def test_cli_cat_indices_e2e(runner, env):
text=target_cat_indices)
result = runner.invoke(cli, ['--config-file', str(VALID_SERVICES_YAML), 'clusters', 'cat-indices'],
catch_exceptions=True)

assert result.exit_code == 0
assert 'SOURCE CLUSTER' in result.output
assert 'TARGET CLUSTER' in result.output
@@ -133,7 +133,7 @@ export class MigrationConsoleStack extends MigrationServiceCore {
let servicePortMappings: PortMapping[]|undefined
let serviceDiscoveryPort: number|undefined
let serviceDiscoveryEnabled = false
let imageCommand: string[]|undefined
let imageCommand = ['/bin/sh', '-c', '/root/loadServicesFromParameterStore.sh']

const osClusterEndpoint = getMigrationStringParameterValue(this, {
...props,
70 changes: 70 additions & 0 deletions jenkins/migrationIntegPipelines/ec2SourceE2EPipeline.groovy
@@ -0,0 +1,70 @@
// Note:
// 1. We are using an existing common VPC, provided through a 'vpcId' parameter on the pipeline, until we move
// to proper Jenkins accounts and can create a setup without public subnets, as well as request an extension to allow more than 5 VPCs per region.
// 2. There is still a manual step needed on the EC2 source load balancer to replace its security group rule that allows all traffic (0.0.0.0/0)
// with one that allows traffic only from the relevant service security group. This needs a better story around accepting user security groups in our Migration CDK.

def sourceContextId = 'source-single-node-ec2'
def migrationContextId = 'migration-default'
def gitUrl = 'https://github.com/opensearch-project/opensearch-migrations.git'
def gitBranch = 'main'
def stageId = 'aws-integ'
def source_cdk_context = """
{
"source-single-node-ec2": {
"suffix": "ec2-source-<STAGE>",
"networkStackSuffix": "ec2-source-<STAGE>",
"vpcId": "$vpcId",
"distVersion": "7.10.2",
"cidr": "12.0.0.0/16",
"distributionUrl": "https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-oss-7.10.2-linux-x86_64.tar.gz",
"captureProxyEnabled": true,
"securityDisabled": true,
"minDistribution": false,
"cpuArch": "x64",
"isInternal": true,
"singleNodeCluster": true,
"networkAvailabilityZones": 2,
"dataNodeCount": 1,
"managerNodeCount": 0,
"serverAccessType": "ipv4",
"restrictServerAccessTo": "0.0.0.0/0"
}
}
"""
def migration_cdk_context = """
{
"migration-default": {
"stage": "<STAGE>",
"vpcId": "$vpcId",
"engineVersion": "OS_2.11",
"domainName": "os-cluster-<STAGE>",
"dataNodeCount": 2,
"openAccessPolicyEnabled": true,
"domainRemovalPolicy": "DESTROY",
"artifactBucketRemovalPolicy": "DESTROY",
"trafficReplayerExtraArgs": "--speedup-factor 10.0",
"fetchMigrationEnabled": true,
"reindexFromSnapshotServiceEnabled": true,
"sourceClusterEndpoint": "<SOURCE_CLUSTER_ENDPOINT>",
"dpPipelineTemplatePath": "../../../test/dp_pipeline_aws_integ.yaml",
"migrationConsoleEnableOSI": true,
"migrationAPIEnabled": true
}
}
"""

@Library("migrations-shared-lib@main")_

defaultIntegPipeline(
sourceContext: source_cdk_context,
migrationContext: migration_cdk_context,
sourceContextId: sourceContextId,
migrationContextId: migrationContextId,
gitUrl: gitUrl,
gitBranch: gitBranch,
stageId: stageId
//deployStep: {
// echo 'Custom Test Step'
//}
)
66 changes: 66 additions & 0 deletions jenkins/migrationIntegPipelines/rfsBackfillE2EPipeline.groovy
@@ -0,0 +1,66 @@
// Note:
// 1. We are using an existing common VPC, provided through a 'vpcId' parameter on the pipeline, until we move
// to proper Jenkins accounts and can create a setup without public subnets, as well as request an extension to allow more than 5 VPCs per region.
// 2. There is still a manual step needed on the EC2 source load balancer to replace its security group rule that allows all traffic (0.0.0.0/0)
// with one that allows traffic only from the relevant service security group. This needs a better story around accepting user security groups in our Migration CDK.

def sourceContextId = 'source-single-node-ec2'
def migrationContextId = 'migration-rfs'
def gitUrl = 'https://github.com/opensearch-project/opensearch-migrations.git'
def gitBranch = 'main'
def stageId = 'rfs-integ'
def source_cdk_context = """
{
"source-single-node-ec2": {
"suffix": "ec2-source-<STAGE>",
"networkStackSuffix": "ec2-source-<STAGE>",
"vpcId": "$vpcId",
"distVersion": "7.10.2",
"distributionUrl": "https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-oss-7.10.2-linux-x86_64.tar.gz",
"captureProxyEnabled": false,
"securityDisabled": true,
"minDistribution": false,
"cpuArch": "x64",
"isInternal": true,
"singleNodeCluster": true,
"networkAvailabilityZones": 2,
"dataNodeCount": 1,
"managerNodeCount": 0,
"serverAccessType": "ipv4",
"restrictServerAccessTo": "0.0.0.0/0"
}
}
"""
def migration_cdk_context = """
{
"migration-rfs": {
"stage": "<STAGE>",
"vpcId": "$vpcId",
"engineVersion": "OS_2.11",
"domainName": "os-cluster-<STAGE>",
"dataNodeCount": 2,
"openAccessPolicyEnabled": true,
"domainRemovalPolicy": "DESTROY",
"artifactBucketRemovalPolicy": "DESTROY",
"kafkaBrokerServiceEnabled": true,
"trafficReplayerServiceEnabled": false,
"reindexFromSnapshotServiceEnabled": true,
"sourceClusterEndpoint": "<SOURCE_CLUSTER_ENDPOINT>"
}
}
"""

@Library("migrations-shared-lib@main")_

defaultIntegPipeline(
sourceContext: source_cdk_context,
migrationContext: migration_cdk_context,
sourceContextId: sourceContextId,
migrationContextId: migrationContextId,
gitUrl: gitUrl,
gitBranch: gitBranch,
stageId: stageId,
finishStep: {
echo 'Skipping step for RFS'
}
)