Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RFS refactoring into separate component programs + new work coordination mechanisms #739

Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
dd4b1d1
POC/unit test code to use painless scripts to do lease acquisition.
gregschohn Jun 10, 2024
f29c786
Drastically simplify taking a snapshot, assuming that there will be n…
gregschohn Jun 11, 2024
6a457ff
Merge branch 'main' into RfsCoordinationWork
gregschohn Jun 11, 2024
dd360ac
Bugfix in update logic of the CMS unit test POC.
gregschohn Jun 11, 2024
3d5a5a2
Checkpoint on the coordination POC test harness. No guarantees on th…
gregschohn Jun 13, 2024
dc24af9
WIP: In the process of refactoring the unit test POC into a more prod…
gregschohn Jun 13, 2024
7746f5f
Bugfixes and test improvement to show doling out work in the happy ca…
gregschohn Jun 14, 2024
f805701
Make some improvements to mitigate contention for acquiring work.
gregschohn Jun 15, 2024
b7923f6
Start converting the document migration phase to use the new primitives.
gregschohn Jun 15, 2024
611140f
Starting to change how process management (death) works in cases wher…
gregschohn Jun 17, 2024
4949388
Convert the remaining Runner classes to plain old java functions, w/o…
gregschohn Jun 17, 2024
3683c44
Remove "CMS" implementations as they're being replaced by the new Wor…
gregschohn Jun 17, 2024
e33c187
Merge branch 'main' into RfsCoordinationWork.
gregschohn Jun 17, 2024
e075d83
Merge branch 'main' into RfsCoordinationWork
gregschohn Jun 18, 2024
8703c20
WIP, doesn't compile. A lot of in-progress changes to support a full…
gregschohn Jun 18, 2024
0275dd5
A lot of bugfixes to get the FullTest moving further along
gregschohn Jun 19, 2024
8733c01
Update the DocumentsReader API to not take indexname/shard and instea…
gregschohn Jun 19, 2024
a9b6709
Extract some of the build logic to build a docker image for RFS and m…
gregschohn Jun 19, 2024
d2514dc
Update the DocumentsReader API to not take indexname/shard and instea…
gregschohn Jun 19, 2024
10866ff
WIP - start thinking about overall termination of RFS.
gregschohn Jun 20, 2024
69d7d58
Fix some lingering reporting issues to try to get better certainty in…
gregschohn Jun 20, 2024
19cf133
Make the FullTest more of a realistic functional test...
gregschohn Jun 21, 2024
0db6f63
Merge branch 'BuildReindexFromSnapshotViaNewDocumentProject' into Rfs…
gregschohn Jun 21, 2024
ee6fbc8
Merge branch 'main' into RfsCoordinationWork
gregschohn Jun 21, 2024
7905f4c
Fix documentation linting errors
gregschohn Jun 21, 2024
9f075c7
Minor fixes to build the rfs_source image and to get the FullTest to …
gregschohn Jun 21, 2024
6f50219
Path updates to move "RFS" references to "DocumentsFromSnapshotMigrat…
gregschohn Jun 21, 2024
9aead15
Minor bugfixes and refactoring improvements as per PR feedback.
gregschohn Jun 21, 2024
787fd26
Merge branch 'main' into BuildReindexFromSnapshotViaNewDocumentProject
gregschohn Jun 21, 2024
8ba51fc
Merge branch 'BuildReindexFromSnapshotViaNewDocumentProject' into Coo…
gregschohn Jun 21, 2024
8089d8b
Test bugfix for when not EVERY worker throws an exception.
gregschohn Jun 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 40 additions & 45 deletions CreateSnapshot/src/main/java/com/rfs/CreateSnapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,84 +3,79 @@
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;

import com.rfs.common.UsernamePassword;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;

import com.rfs.cms.CmsClient;
import com.rfs.cms.OpenSearchCmsClient;
import com.rfs.common.ConnectionDetails;
import com.rfs.common.Logging;
import com.rfs.common.OpenSearchClient;
import com.rfs.common.SnapshotCreator;
import com.rfs.common.TryHandlePhaseFailure;
import com.rfs.common.S3SnapshotCreator;
import com.rfs.worker.GlobalState;
import com.rfs.worker.SnapshotRunner;

import java.util.Optional;
import java.util.function.Function;

@Slf4j
public class CreateSnapshot {
public static class Args {
@Parameter(names = {"--snapshot-name"}, description = "The name of the snapshot to migrate", required = true)
@Parameter(names = {"--snapshot-name"},
required = true,
description = "The name of the snapshot to migrate")
public String snapshotName;

@Parameter(names = {"--s3-repo-uri"}, description = "The S3 URI of the snapshot repo, like: s3://my-bucket/dir1/dir2", required = true)
@Parameter(names = {"--s3-repo-uri"},
required = true,
description = "The S3 URI of the snapshot repo, like: s3://my-bucket/dir1/dir2")
public String s3RepoUri;

@Parameter(names = {"--s3-region"}, description = "The AWS Region the S3 bucket is in, like: us-east-2", required = true)
@Parameter(names = {"--s3-region"},
required = true,
description = "The AWS Region the S3 bucket is in, like: us-east-2"
)
public String s3Region;

@Parameter(names = {"--source-host"}, description = "The source host and port (e.g. http://localhost:9200)", required = true)
@Parameter(names = {"--source-host"},
required = true,
description = "The source host and port (e.g. http://localhost:9200)")
public String sourceHost;

@Parameter(names = {"--source-username"}, description = "Optional. The source username; if not provided, will assume no auth on source", required = false)
@Parameter(names = {"--source-username"},
description = "Optional. The source username; if not provided, will assume no auth on source")
public String sourceUser = null;

@Parameter(names = {"--source-password"}, description = "Optional. The source password; if not provided, will assume no auth on source", required = false)
@Parameter(names = {"--source-password"},
description = "Optional. The source password; if not provided, will assume no auth on source")
public String sourcePass = null;
}

@Parameter(names = {"--target-host"}, description = "The target host and port (e.g. http://localhost:9200)", required = true)
public String targetHost;

@Parameter(names = {"--target-username"}, description = "Optional. The target username; if not provided, will assume no auth on target", required = false)
public String targetUser = null;

@Parameter(names = {"--target-password"}, description = "Optional. The target password; if not provided, will assume no auth on target", required = false)
public String targetPass = null;
@Getter
@AllArgsConstructor
public static class S3RepoInfo {
String awsRegion;
String repoUri;
}

public static void main(String[] args) throws Exception {
// Grab out args
Args arguments = new Args();
JCommander.newBuilder()
.addObject(arguments)
.build()
.parse(args);
.addObject(arguments)
.build()
.parse(args);

final String snapshotName = arguments.snapshotName;
final String s3RepoUri = arguments.s3RepoUri;
final String s3Region = arguments.s3Region;
final String sourceHost = arguments.sourceHost;
final String sourceUser = arguments.sourceUser;
final String sourcePass = arguments.sourcePass;
final String targetHost = arguments.targetHost;
final String targetUser = arguments.targetUser;
final String targetPass = arguments.targetPass;

final ConnectionDetails sourceConnection = new ConnectionDetails(sourceHost, sourceUser, sourcePass);
final ConnectionDetails targetConnection = new ConnectionDetails(targetHost, targetUser, targetPass);
log.info("Running CreateSnapshot with " + String.join(" ", args));
run(c -> new S3SnapshotCreator(arguments.snapshotName, c, arguments.s3RepoUri, arguments.s3Region),
new OpenSearchClient(arguments.sourceHost, arguments.sourceUser, arguments.sourcePass));
}

public static void run(Function<OpenSearchClient,SnapshotCreator> snapshotCreatorFactory,
OpenSearchClient openSearchClient)
throws Exception {
TryHandlePhaseFailure.executeWithTryCatch(() -> {
log.info("Running RfsWorker");
GlobalState globalState = GlobalState.getInstance();
OpenSearchClient sourceClient = new OpenSearchClient(sourceConnection);
OpenSearchClient targetClient = new OpenSearchClient(targetConnection);
final CmsClient cmsClient = new OpenSearchCmsClient(targetClient);

final SnapshotCreator snapshotCreator = new S3SnapshotCreator(snapshotName, sourceClient, s3RepoUri, s3Region);
final SnapshotRunner snapshotWorker = new SnapshotRunner(globalState, cmsClient, snapshotCreator);
snapshotWorker.run();
SnapshotRunner.runAndWaitForCompletion(snapshotCreatorFactory.apply(openSearchClient));
});
}
}
108 changes: 108 additions & 0 deletions DocumentsFromSnapshotMigration/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,73 @@ plugins {
id 'java'
id 'jacoco'
id 'io.freefair.lombok' version '8.6'
id "com.avast.gradle.docker-compose" version "0.17.4"
id 'com.bmuschko.docker-remote-api'
}

import com.bmuschko.gradle.docker.tasks.image.DockerBuildImage
import groovy.transform.Canonical
import org.opensearch.migrations.common.CommonUtils

java.sourceCompatibility = JavaVersion.VERSION_11
java.targetCompatibility = JavaVersion.VERSION_11

@Canonical
class DockerServiceProps {
String projectName = ""
String dockerImageName = ""
String inputDir = ""
Map<String, String> buildArgs = [:]
List<String> taskDependencies = []
}

repositories {
mavenCentral()
}

dependencies {
implementation platform('io.projectreactor:reactor-bom:2023.0.5')
testImplementation platform('io.projectreactor:reactor-bom:2023.0.5')

implementation project(":commonDependencyVersionConstraints")

implementation project(":RFS")
implementation group: 'org.apache.logging.log4j', name: 'log4j-api'
implementation group: 'org.apache.logging.log4j', name: 'log4j-core'
implementation group: 'com.beust', name: 'jcommander'
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind'
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-annotations'
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-core'
implementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-smile'
implementation group: 'io.projectreactor.netty', name: 'reactor-netty-core'
implementation group: 'io.projectreactor.netty', name:'reactor-netty-http'
implementation group: 'org.slf4j', name: 'slf4j-api'
implementation group: 'org.apache.logging.log4j', name: 'log4j-slf4j2-impl'


testImplementation testFixtures(project(":RFS"))
testImplementation project(":CreateSnapshot")
testImplementation project(":MetadataMigration")
testImplementation group: 'org.apache.lucene', name: 'lucene-core'
testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter-api'
testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter-params'
testImplementation group: 'org.opensearch', name: 'opensearch-testcontainers'
testImplementation group: 'org.testcontainers', name: 'testcontainers'

testRuntimeOnly group: 'org.junit.jupiter', name: 'junit-jupiter-engine'

testImplementation platform('io.projectreactor:reactor-bom:2023.0.5')
}

application {
mainClassName = 'com.rfs.RfsMigrateDocuments'
}

// Cleanup additional docker build directory
clean.doFirst {
delete project.file("./docker/build")
}

// Utility task to allow copying required libraries into a 'dependencies' folder for security scanning
tasks.register('copyDependencies', Sync) {
duplicatesStrategy = DuplicatesStrategy.EXCLUDE
Expand All @@ -35,6 +78,71 @@ tasks.register('copyDependencies', Sync) {
into "${buildDir}/dependencies"
}

task copyDockerRuntimeJars (type: Sync) {
duplicatesStrategy = DuplicatesStrategy.EXCLUDE
description = 'Copy runtime JARs and app jar to docker build directory'

// Define the destination directory
def buildDir = project.file("./docker/build/runtimeJars")
into buildDir

// Add all the required runtime JARs to be copied
from configurations.runtimeClasspath
from tasks.named('jar')
include '*.jar'
}

DockerServiceProps[] dockerServices = [
new DockerServiceProps([projectName:"reindexFromSnapshot",
dockerImageName:"reindex_from_snapshot",
inputDir:"./docker",
taskDependencies:["copyDockerRuntimeJars"]]),
new DockerServiceProps([projectName:"emptyElasticsearchSource_7_10",
dockerImageName:"empty_elasticsearch_source_7_10",
inputDir:"./docker/TestSource_ES_7_10"]),
new DockerServiceProps([projectName:"emptyElasticsearchSource_7_17",
dockerImageName:"empty_elasticsearch_source_7_17",
inputDir:"./docker/TestSource_ES_7_17"]),
new DockerServiceProps([projectName:"trafficGenerator",
dockerImageName:"osb_traffic_generator",
inputDir:"./docker/TrafficGenerator",
taskDependencies:[":TrafficCapture:dockerSolution:buildDockerImage_elasticsearchTestConsole"]]),
] as DockerServiceProps[]

for (dockerService in dockerServices) {
task "buildDockerImage_${dockerService.projectName}" (type: DockerBuildImage) {
def hash = CommonUtils.calculateDockerHash(project.fileTree("docker/${dockerService.projectName}"))
for (dep in dockerService.taskDependencies) {
dependsOn dep
}
inputDir = project.file(dockerService.inputDir)
buildArgs = dockerService.buildArgs
images.add("migrations/${dockerService.dockerImageName}:${hash}")
images.add("migrations/${dockerService.dockerImageName}:${version}")
images.add("migrations/${dockerService.dockerImageName}:latest")
}
}

apply from: 'build-preloaded-source-image.gradle'

dockerCompose {
useComposeFiles = ['docker/docker-compose.yml']
projectName = 'rfs-compose'
}

// ../gradlew buildDockerImages
task buildDockerImages {
for (dockerService in dockerServices) {
dependsOn "buildDockerImage_${dockerService.projectName}"
}
}

tasks.named("buildDockerImage_elasticsearchRFSSource") {
dependsOn(':TrafficCapture:dockerSolution:buildDockerImage_elasticsearchTestConsole')
}
tasks.getByName('composeUp')
.dependsOn(tasks.getByName('buildDockerImages'))

jacocoTestReport {
reports {
xml.required = true
Expand Down
File renamed without changes.
Loading