diff --git a/.github/workflows/build_package.yml b/.github/workflows/build_package.yml index 02a177d..0a9a0b9 100644 --- a/.github/workflows/build_package.yml +++ b/.github/workflows/build_package.yml @@ -12,11 +12,12 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - name: Set up JDK 1.17 - uses: actions/setup-java@v1 + uses: actions/setup-java@v4 with: - java-version: 1.17 + distribution: 'zulu' + java-version: '17' - name: Load local Maven repository cache uses: actions/cache@v2 @@ -27,4 +28,6 @@ jobs: ${{ runner.os }}-maven- - name: Run mvn package - run: mvn -B package --file pom.xml + env: + SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }} + run: mvn -B verify org.sonarsource.scanner.maven:sonar-maven-plugin:sonar -Dsonar.projectKey=qbicsoftware_data-processing diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml index d030d5a..46ff394 100644 --- a/.github/workflows/codeql-analysis.yml +++ b/.github/workflows/codeql-analysis.yml @@ -38,15 +38,16 @@ jobs: steps: - name: Checkout repository - uses: actions/checkout@v2 + uses: actions/checkout@v4 - name: Set up JDK 1.17 - uses: actions/setup-java@v1 + uses: actions/setup-java@v4 with: - java-version: 1.17 + distribution: 'zulu' + java-version: '17' settings-path: ${{ github.workspace }} - name: Load local Maven repository cache - uses: actions/cache@v2 + uses: actions/cache@v4 with: path: ~/.m2/repository key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }} @@ -55,7 +56,7 @@ jobs: # Initializes the CodeQL tools for scanning. - name: Initialize CodeQL - uses: github/codeql-action/init@v1 + uses: github/codeql-action/init@v3 with: languages: ${{ matrix.language }} # If you wish to specify custom queries, you can do so here or in a config file. @@ -66,7 +67,7 @@ jobs: # Autobuild attempts to build any compiled languages (C/C++, C#, or Java). 
# If this step fails, then you should remove it and run the build manually (see below) - name: Autobuild - uses: github/codeql-action/autobuild@v1 + uses: github/codeql-action/autobuild@v3 # ℹ️ Command-line programs to run using the OS shell. # 📚 https://git.io/JvXDl @@ -80,4 +81,4 @@ jobs: # make release - name: Perform CodeQL Analysis - uses: github/codeql-action/analyze@v1 + uses: github/codeql-action/analyze@v3 diff --git a/.github/workflows/create-release.yml b/.github/workflows/create-release.yml index 38ce57d..4e8e459 100644 --- a/.github/workflows/create-release.yml +++ b/.github/workflows/create-release.yml @@ -11,15 +11,16 @@ jobs: release: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - name: Set up JDK 1.17 - uses: actions/setup-java@v1 + uses: actions/setup-java@v4 with: - java-version: 1.17 + distribution: 'zulu' + java-version: '17' settings-path: ${{ github.workspace }} - name: Load local Maven repository cache - uses: actions/cache@v2 + uses: actions/cache@v4 with: path: ~/.m2/repository key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }} diff --git a/.github/workflows/nexus-publish-snapshots.yml b/.github/workflows/nexus-publish-snapshots.yml index fc25f08..01dddb9 100644 --- a/.github/workflows/nexus-publish-snapshots.yml +++ b/.github/workflows/nexus-publish-snapshots.yml @@ -15,15 +15,16 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - name: Set up JDK 1.17 - uses: actions/setup-java@v1 + uses: actions/setup-java@v4 with: - java-version: 1.17 + distribution: 'zulu' + java-version: '17' settings-path: ${{ github.workspace }} - name: Load local Maven repository cache - uses: actions/cache@v2 + uses: actions/cache@v4 with: path: ~/.m2/repository key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }} diff --git a/.github/workflows/run_tests.yml b/.github/workflows/run_tests.yml index 1dde144..4f3ce07 100644 --- 
a/.github/workflows/run_tests.yml +++ b/.github/workflows/run_tests.yml @@ -13,14 +13,15 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - name: Set up JDK 1.17 - uses: actions/setup-java@v1 + uses: actions/setup-java@v4 with: - java-version: 1.17 + distribution: 'zulu' + java-version: '17' - name: Load local Maven repository cache - uses: actions/cache@v2 + uses: actions/cache@v4 with: path: ~/.m2/repository key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }} diff --git a/LICENSE b/LICENSE index 9ac8ded..f43eaba 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ MIT License -Copyright (c) 2021 QBiC +Copyright (c) 2023 University of Tübingen Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/README.md b/README.md index 6c46469..496371e 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,11 @@ -# Spring Boot Starter (template) +# Data processing -A minimal working starter template for a Spring Boot non-web applications using the -Spring [CommandLineRunner](https://docs.spring.io/spring-boot/docs/current/api/org/springframework/boot/CommandLineRunner.html) -interface with a demonstration of Java -annotation-based [Inversion of Control](https://stackoverflow.com/questions/3058/what-is-inversion-of-control) -via [Dependency Injection](https://stackoverflow.com/questions/130794/what-is-dependency-injection). +A small Java application that listens to data ready for registration and performs several +pre-registration +checks before moving the dataset to an openBIS ETL routine. + +> [!NOTE] +> Requires Java SE 17 or newer. 
## Run the app @@ -14,174 +15,248 @@ Checkout the latest code from `main` and run the Maven goal `spring-boot:run`: mvn spring-boot:run ``` -## What the app does - -This small app just parses a file with a collection of good coding prayers and creates a singleton -instance of an `CodingPrayersMessageService`. This concrete implementation uses the -interface `MessageService`, that comes with only one public method: `String collectMessage()`. - -This service is used to demonstrate the IoC principle. We have defined another interface `NewsMedia` -and provide a concrete implementation `DeveloperNews`, that will call the message service to receive -recent news and forward them to the caller. +You have to set different environment variables first to configure the individual process parts. +Have a look at the [configuration](#configuration) section to learn more. -In the main app code, we just retrieve this Singleton instance or Bean in Spring lingua from the -loaded context and call the news media `getNEws()` method. The collected message is then printed out -to the command line interface: +## What the app does -``` +The following figure gives an overview of the process building blocks and flow: - . 
____ _ __ _ _ - /\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \ -( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \ - \\/ ___)| |_)| | | | | || (_| | ) ) ) ) - ' |____| .__|_| |_|_| |_\__, | / / / / - =========|_|==============|___/=/_/_/_/ - :: Spring Boot :: (v2.5.6) - -2021-11-18 09:17:37.640 INFO 68052 --- [ main] l.q.s.SpringMinimalTemplateApplication : Starting SpringMinimalTemplateApplication using Java 17.0.1 on imperator.am10.uni-tuebingen.de with PID 68052 (/Users/sven1103/git/spring-boot-starter-template/target/classes started by sven1103 in /Users/sven1103/git/spring-boot-starter-template) -2021-11-18 09:17:37.641 INFO 68052 --- [ main] l.q.s.SpringMinimalTemplateApplication : No active profile set, falling back to default profiles: default -2021-11-18 09:17:38.164 INFO 68052 --- [ main] l.q.s.SpringMinimalTemplateApplication : Started SpringMinimalTemplateApplication in 0.808 seconds (JVM running for 1.489) -####################### Message of the day ################## -Have you written unit tests yet? If not, do it! -############################################################## + -``` +The **basic process flow** can be best described with: -## Realisation of IoC and DPI +1. Scanning step +2. Registration step (preparation) +3. 1 to N processing steps -The messages collection is stored in a simple text file `messages.txt`, that is provided with the -apps `resources`. Just go ahead and change the content of the file and run the app! +The last processing step usually hands the dataset over to the actual registration system, in our +case it is several +openBIS ETL dropboxes. In the current implementation, a marker file is created after successful +transfer into the target folder: `.MARKER_is_finished_[task ID]` -grafik +The current implementation consists of 4 steps: _scanning, registration, processing, evaluation_ and +are described in the following subsections. +### Scanning -So how does the app know where to find this file and **load the messages content**? 
+In this step, the application scans a [pre-defined path](#scanner-step-config) and looks for +existing registration folders. +If a registration folder is present, it is recorded and will be investigated. All other files in a +user's directory will be ignored. -We have configured it as a **external property** in a file `application.properties` and load the -configuration on application startup! Cool eh? +> [!NOTE] +> It is important that the move operation of any dataset in the registration folder is **atomic**! +> Otherwise, data corruption will occur. Ideally the dataset is staged into the user's home folder +> first (e.g. a copy operation, an upload via SFTP or SSH) and then **moved** into the registration +> folder. +> +> Moving operations on the same file system are basically a rename of the file path and +> atomic. -grafik +Within a user's registration directory, the application expects a registration task to be bundled in one +folder, e.g.: -This is how the file content looks like: - -``` -messages.file=messages.txt +```bash +|- myuser/registration // registration folder for user `myuser` + |- my-registration-batch // folder name is irrelevant + |- file1_1.fastq.gz + |- file1_2.fastq.gz + |- file2_1.fastq.gz + |- file2_2.fastq.gz + |- metadata.txt // mandatory! ``` -So how do we access the value of the `messages.file` property in our application with Spring? - -Have a look in the class `AppConfig`, there the magic happens: +The folder ``my-registration-batch`` represents an atomic registration unit and must contain the `metadata.txt` with information +about the measurement ID and the files belonging to this measurement dataset. 
-```groovy -@Configuration -@PropertySource("application.properties") -class AppConfig { +Following the previous example, the content of a matching `metadata.txt` would look like this: - @Value('${messages.file}') - public String messagesFile +```bash +NGSQTEST001AE-1234512312 file1_1.fastq.gz +NGSQTEST001AE-1234512312 file1_2.fastq.gz +NGSQTEST002BC-3321314441 file2_1.fastq.gz +NGSQTEST002BC-3321314441 file2_2.fastq.gz ``` +Make sure that the columns are `TAB`-separated (`\t`)! -We define the property source, which is the file `application.properties` that is provided in the -resource folder of the app and available to the classpath. We also tell with the -annotation `@Configuration` Spring, hey this is a class that holds app configuration data! +Once a new registration unit is detected, it gets queued for registration and the next step will take over. -With the annotation `@Value('${messages.file}')` we tell Spring, which property's value should be -injected. Here we make use of field injection, other types of injection like method and constructor -injection are also possible. +A registration request gets only submitted once to the registration queue and will subsequently get +ignored by the scanning process, as long as the folder name or modification timestamp does not change. -So how is the concrete implementation of the `MessageService` presented to Spring? We can use -the `@Bean` annotation here, to tell Spring: _hey, this is sth you must load on startup and provide -to the context_. +If the application quits or stops unexpectedly, on re-start they will get detected and resubmitted +again. -```java -@Configuration -@PropertySource("application.properties") -class AppConfig { +### Registration - .... +This process step is preparing the dataset registration for subsequent pre-registration task, to +guarantee a unified structure and processing model, other steps can build on and take actions +accordingly (e.g. +harmonised error handling). 
- @Bean - MessageService messageService() { - return new CodingPrayersMessageService(messagesFile) - } -``` - -That is all there is, you can now load the bean in your main application code: +Its configuration parameters can be set via environment variables, see +the [registration step config](#registration-step-config) section to learn more. -```java -@SpringBootApplication -class SpringMinimalTemplateApplication { +In the current implementation, the registration step does several things: - static void main(String[] args) { - SpringApplication.run(SpringMinimalTemplateApplication, args) - // load the annotation context - AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(AppConfig.class) - // get the service bean - MessageService service = context.getBean("messageService", MessageService.class) - // collect the message and praise the magic - println service.collectMessage() +1. Validating the registration metadata file +2. Aggregate all measurement files per measurement ID +3. Assign every task (measurement) a unique task ID +4. Provide provenance information -``` +The task id is just a randomly generated UUID-4 to ensure that datasets with the same name do not +get +overwritten during the processing. -### Inversion of Control +The provenance information will be written into the task directory in an own file next to the +dataset +and is of type JSON. -grafik +The final task directory structure looks then like this (task dir name is an example): -You might have already spotted the interface `NewsMedia` and its implementing class `DeveloperNews` -in the app's source code. Here you can see an example for the magic of inversion of control. - -The `NewsMedia` interface is just an abstraction that we will later use, because we don't care about -the actual implementation details. By this, we also do not create any dependencies to concrete -implementation details but on actual behaviour. 
Concrete implementations can then later be exchanged -without causing any breaking changes in the client code base. - -The interface has only one method: `String getNews()`. Now let's have a closer look into the -class `DeveloperNews` that implements this interface: - -```java -class DeveloperNews implements NewsMedia{ - - private MessageService service +```bash provenance.json + |- 74c5d26f-b756-42c3-b6f4-2b4825670a2d + |- file1_1.fastq.gz + |- file1_2.fastq.gz + |- provenance.json +``` - DeveloperNews(MessageService service) { - this.service = service - } +Here is an example of the provenance file: - @Override - String getNews() { - return service.collectMessage() - } +```json +{ + "origin": "/Users/myuser/registration", + "user": "/Users/myuser", + "measurementId": "QTEST001AE-1234512312", + "history": [ + "/opt/scanner-app/scanner-processing-dir/74c5d26f-b756-42c3-b6f4-2b4825670a2d" + ] } ``` -When you check the constructor signature, you see that this method has only one argument, which is a -reference to an object of type `MessageService`. And when the `getNews()` method is called by the -client, the class delegates this request to the message service. Since we have stored the reference -in a private field, that is super easy, we known how to call the service. +> [!NOTE] +> The following properties can be expected after all process steps have been executed: +> +> `origin`: from which path the dataset has been detected during scanning +> +> `user`: from which user directory the dataset has been picked up +> +> `measurementId`: any valid QBiC measurement ID that has been found in the dataset (this might +> be `null` in case the evaluation has not been done yet). +> +> `history`: a list of history items, which steps have been performed. The list is ordered by first +> processing steps being at the start and the latest at the end. + +### Processing + +In the current implementation, this process step only does some simple checks, and can be extended to e.g. 
+perform checksum validation. Feel free to use it as template for subsequent process steps. + +### Evaluation + +Last but not least, this step looks for any present QBiC measurement ID in the dataset name. If none +is given, the registration cannot be executed. + +In this case the process moves the task directory into the user's home error folder. After the user +has +provided a valid QBiC measurement id, they can move the dataset into registration again. + +## Configuration + +### Global settings + +```properties +#------------------------ +# Global settings +#------------------------ +# Directory name that will be used for the manual intervention directory +# Created in the users' home folders +# e.g. /home/<user>/error +users.error.directory.name=error +# Directory name that will be used for detecting dropped datasets +# Needs to be present in the users' home folders +# e.g. /home/<user>/registration +users.registration.directory.name=registration +``` -So why is this inversion of control? +Configure the names of the two application directories for error handling and registration. + +> [!NOTE] +> The `registration` folder needs to be present, the application is not creating it automatically, +> to +> prevent accidental dataset overwrite. + +### Scanner step config + +```properties +#-------------------------------------- +# Settings for the data scanning thread +#-------------------------------------- +# Path to the directory that contains all user directories +# e.g. /home in Linux or /Users in macOS +scanner.directory=${SCANNER_DIR:/home} +# The time interval (milliseconds) the scanner thread iterates through the scanner directory +# Value must be an integer > 0 +scanner.interval=1000 +``` -Because the `DeveloperNews` class does not manage the instantiation of a concrete message service. -The configuration happened outside of the class, therefore the DeveloperNews class has no direct -control over the instantiation. 
If it had, it would look like this: +Sets the application's top-level scanning directory and considers every folder in it as a separate +user directory. -```java -DeveloperNews(String filePathToMessages) { - this.service = new CodingPrayersMessageService(filePathToMessages) -} -``` +The scanner interval is set to 1 second by default and is not yet supposed to be configured via +environment variables (if required, override it with command line arguments). -That doesn't look good, does it? In order to create an instance of a message service, we would need -to know the conrete implementation and its required properties (here it is the file path to -the `messages.txt`). So the `DeveloperNews` class has the control over the message service. +### Registration step config -Instead, we would like to not take care about these details, so we invert the control and inject the -dependency via the constructor. +Sets the number of threads per process, its working directory and the target directory, to where +finished tasks are moved to after successful operation. -Please find more in depth documentation on the -official [Spring website](https://spring.io/projects/spring-framework). +```properties +#---------------- +# Settings for the registration worker threads +#---------------- +registration.threads=2 +registration.working.dir=${WORKING_DIR:} +registration.target.dir=${PROCESSING_DIR:} +``` +### Processing step config +Sets the number of threads per process, its working directory and the target directory, to where +finished tasks are moved to after successful operation. +```properties +#------------------------------------ +# Settings for the 1. 
processing step +# Proper packaging and provenance data, some simple checks +#------------------------------------ +processing.threads=2 +processing.working.dir=${PROCESSING_DIR} +processing.target.dir=${EVALUATION_DIR} +``` +### Evaluation step config + +Sets the number of threads per process, its working directory and the target directory, to where +finished tasks are moved to after successful operation. + +```properties +#---------------------------------- +# Setting for the 2. processing step: +# Measurement ID evaluation +# --------------------------------- +evaluations.threads=2 +evaluation.working.dir=${EVALUATION_DIR} +# Define one or more target directories here +# Example single target dir: +# evaluation.target.dirs=/my/example/target/dir +# Example multiple target dir: +# evaluation.target.dirs=/my/example/target/dir1,/my/example/target/dir2,/my/example/target/dir3 +evaluation.target.dirs=${OPENBIS_ETL_DIRS} +evaluation.measurement-id.pattern=^(MS|NGS)Q[A-Z0-9]{4}[0-9]{3}[A-Z0-9]{2}-[0-9]* +``` +> [!NOTE] +> You can define multiple target directories for this process! You just have to provide a `,`-separated list +> of target directory paths. The implementation will assign the target directories based on a round-robin draw. 
diff --git a/img/process-flow.jpg b/img/process-flow.jpg new file mode 100644 index 0000000..6bb527d Binary files /dev/null and b/img/process-flow.jpg differ diff --git a/pom.xml b/pom.xml index 7f819b9..d3173ae 100644 --- a/pom.xml +++ b/pom.xml @@ -1,97 +1,134 @@ - - 4.0.0 - - org.springframework.boot - spring-boot-starter-parent - 2.5.6 - - - life.qbic - spring-minimal-template - 1.0.2 - spring-minimal-template - Demo project for Spring Boot - - 17 - 3.0.8 - - - - org.springframework.boot - spring-boot-starter - - - org.codehaus.groovy - groovy - + + 4.0.0 + + org.springframework.boot + spring-boot-starter-parent + 3.2.4 + + + life.qbic + data-processing + 1.0.0 + data processing + A Java tool that scans file move events and triggers a cascade of dataset + pre-processing + steps, before it moves the dataset finally to an ETL routine. + + + 17 + qbicsoftware + https://sonarcloud.io + + + + org.springframework.boot + spring-boot-starter + - - org.springframework.boot - spring-boot-starter-test - test - + + org.springframework.boot + spring-boot-starter-test + - - org.spockframework - spock-core - 2.0-groovy-3.0 - test - + + org.spockframework + spock-core + - - org.spockframework - spock-spring - 2.0-groovy-3.0 - test - - + + org.spockframework + spock-spring + - - - - true - nexus-releases - QBiC Releases - https://qbic-repo.qbic.uni-tuebingen.de/repository/maven-releases - - - false - nexus-snapshots - QBiC Snapshots - https://qbic-repo.qbic.uni-tuebingen.de/repository/maven-snapshots - - + + + true + nexus-releases + QBiC Releases + https://qbic-repo.qbic.uni-tuebingen.de/repository/maven-releases + + + false + nexus-snapshots + QBiC Snapshots + https://qbic-repo.qbic.uni-tuebingen.de/repository/maven-snapshots + + - - - - org.springframework.boot - spring-boot-maven-plugin - - - org.codehaus.gmavenplus - gmavenplus-plugin - 1.13.0 - - - - addSources - addTestSources - generateStubs - compile - generateTestStubs - compileTests - removeStubs - removeTestStubs - 
- - - - - + + + + org.springframework.boot + spring-boot-maven-plugin + + + org.codehaus.gmavenplus + gmavenplus-plugin + 1.13.0 + + + + addSources + addTestSources + generateStubs + compile + generateTestStubs + compileTests + removeStubs + removeTestStubs + + + + + + diff --git a/src/main/groovy/.DS_Store b/src/main/groovy/.DS_Store deleted file mode 100644 index f94f028..0000000 Binary files a/src/main/groovy/.DS_Store and /dev/null differ diff --git a/src/main/groovy/life/.DS_Store b/src/main/groovy/life/.DS_Store deleted file mode 100644 index e426088..0000000 Binary files a/src/main/groovy/life/.DS_Store and /dev/null differ diff --git a/src/main/groovy/life/qbic/.DS_Store b/src/main/groovy/life/qbic/.DS_Store deleted file mode 100644 index 73c441a..0000000 Binary files a/src/main/groovy/life/qbic/.DS_Store and /dev/null differ diff --git a/src/main/groovy/life/qbic/springminimaltemplate/AppConfig.groovy b/src/main/groovy/life/qbic/springminimaltemplate/AppConfig.groovy deleted file mode 100644 index 70c3187..0000000 --- a/src/main/groovy/life/qbic/springminimaltemplate/AppConfig.groovy +++ /dev/null @@ -1,32 +0,0 @@ -package life.qbic.springminimaltemplate - -import org.springframework.beans.factory.annotation.Value -import org.springframework.context.annotation.Bean -import org.springframework.context.annotation.Configuration -import org.springframework.context.annotation.PropertySource - -/** - * Spring configuration class - * - *

Reads properties from a properties file and creates beans for the application.

- * - * @since 0.1.0 - */ -@Configuration -@PropertySource("application.properties") -class AppConfig { - - @Value('${messages.file}') - public String messagesFile - - @Bean - MessageService messageService() { - return new CodingPrayersMessageService(messagesFile) - } - - @Bean - NewsMedia newsMedia() { - return new DeveloperNews(messageService()) - } - -} diff --git a/src/main/groovy/life/qbic/springminimaltemplate/CodingPrayersMessageService.groovy b/src/main/groovy/life/qbic/springminimaltemplate/CodingPrayersMessageService.groovy deleted file mode 100644 index 54c29f1..0000000 --- a/src/main/groovy/life/qbic/springminimaltemplate/CodingPrayersMessageService.groovy +++ /dev/null @@ -1,29 +0,0 @@ -package life.qbic.springminimaltemplate - -/** - * Example implementation of a {@link MessageService} - * - * @since 0.1.0 - */ -class CodingPrayersMessageService implements MessageService { - - private List messages - - CodingPrayersMessageService() { - this.messages = new ArrayList<>() - } - - CodingPrayersMessageService(String filePath) { - this.messages = readMessagesFromClassPath(filePath) - } - - @Override - String collectMessage() { - return messages.get(new Random().nextInt(messages.size())) - } - - private List readMessagesFromClassPath(String path) { - URL url = getClass().getClassLoader().getResource(path) - return url.readLines().each { it.trim() }.collect() - } -} diff --git a/src/main/groovy/life/qbic/springminimaltemplate/DeveloperNews.groovy b/src/main/groovy/life/qbic/springminimaltemplate/DeveloperNews.groovy deleted file mode 100644 index 04ed5d1..0000000 --- a/src/main/groovy/life/qbic/springminimaltemplate/DeveloperNews.groovy +++ /dev/null @@ -1,20 +0,0 @@ -package life.qbic.springminimaltemplate - -/** - * An example {@link NewsMedia} implementation for developer news. 
- * - * @since 0.1.0 - */ -class DeveloperNews implements NewsMedia { - - private MessageService service - - DeveloperNews(MessageService service) { - this.service = service - } - - @Override - String getNews() { - return service.collectMessage() - } -} diff --git a/src/main/groovy/life/qbic/springminimaltemplate/MessageService.groovy b/src/main/groovy/life/qbic/springminimaltemplate/MessageService.groovy deleted file mode 100644 index eaadf3e..0000000 --- a/src/main/groovy/life/qbic/springminimaltemplate/MessageService.groovy +++ /dev/null @@ -1,18 +0,0 @@ -package life.qbic.springminimaltemplate - -/** - * Small toy interface that represents message services - * - *

Message services shall provide access to received messages.

- * - * @since 0.1.0 - */ -interface MessageService { - - /** - * Collects the latest message - * @return the latest message - * @since 0.1.0 - */ - String collectMessage() -} diff --git a/src/main/groovy/life/qbic/springminimaltemplate/NewsMedia.groovy b/src/main/groovy/life/qbic/springminimaltemplate/NewsMedia.groovy deleted file mode 100644 index d335196..0000000 --- a/src/main/groovy/life/qbic/springminimaltemplate/NewsMedia.groovy +++ /dev/null @@ -1,17 +0,0 @@ -package life.qbic.springminimaltemplate - -/** - * Example interface for a news media - * - * @since 0.1.0 - */ -interface NewsMedia { - - /** - * Returns latest news - * @return news stuff! - * @since 0.1.0 - */ - String getNews() - -} \ No newline at end of file diff --git a/src/main/groovy/life/qbic/springminimaltemplate/SpringMinimalTemplateApplication.groovy b/src/main/groovy/life/qbic/springminimaltemplate/SpringMinimalTemplateApplication.groovy deleted file mode 100644 index e9ba274..0000000 --- a/src/main/groovy/life/qbic/springminimaltemplate/SpringMinimalTemplateApplication.groovy +++ /dev/null @@ -1,23 +0,0 @@ -package life.qbic.springminimaltemplate - -import org.springframework.boot.SpringApplication -import org.springframework.boot.autoconfigure.SpringBootApplication -import org.springframework.context.annotation.AnnotationConfigApplicationContext - -@SpringBootApplication -class SpringMinimalTemplateApplication { - - static void main(String[] args) { - SpringApplication.run(SpringMinimalTemplateApplication, args) - - AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(AppConfig.class) - - NewsMedia media = context.getBean("newsMedia", NewsMedia.class) - println "####################### Message of the day ##################" - println media.getNews() - println "##############################################################" - - context.close() - } - -} diff --git a/src/main/java/life/qbic/data/processing/AppConfig.java 
b/src/main/java/life/qbic/data/processing/AppConfig.java new file mode 100644 index 0000000..c9f6b32 --- /dev/null +++ b/src/main/java/life/qbic/data/processing/AppConfig.java @@ -0,0 +1,93 @@ +package life.qbic.data.processing; + +import java.nio.file.Path; +import java.util.Arrays; +import life.qbic.data.processing.config.EvaluationWorkersConfig; +import life.qbic.data.processing.config.ProcessingWorkersConfig; +import life.qbic.data.processing.config.RegistrationWorkersConfig; +import life.qbic.data.processing.evaluation.EvaluationConfiguration; +import life.qbic.data.processing.processing.ProcessingConfiguration; +import life.qbic.data.processing.registration.RegistrationConfiguration; +import life.qbic.data.processing.scanner.ScannerConfiguration; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.PropertySource; + +/** + * Spring configuration class + * + *

Reads properties from a properties file and creates beans for the application.

+ * + * @since 0.1.0 + */ +@Configuration +@PropertySource("application.properties") +class AppConfig { + + @Bean + ScannerConfiguration scannerConfiguration( + @Value("${scanner.directory}") String scannerDirectory, + @Value("${scanner.interval}") int interval, @Value("${scanner.ignore}") String[] ignore) { + return new ScannerConfiguration(scannerDirectory, interval, ignore); + } + + @Bean + RegistrationWorkersConfig registrationWorkersConfig( + @Value("${registration.threads}") int amountOfWorkers, + @Value("${registration.working.dir}") String workingDirectory, + @Value("${registration.target.dir}") String targetDirectory, + @Value("${registration.metadata.filename}") String metadataFileName) { + return new RegistrationWorkersConfig(amountOfWorkers, workingDirectory, targetDirectory, + metadataFileName); + } + + @Bean + RegistrationConfiguration registrationConfiguration( + RegistrationWorkersConfig registrationWorkersConfig) { + return new RegistrationConfiguration(registrationWorkersConfig.workingDirectory().toString(), + registrationWorkersConfig.targetDirectory().toString(), + registrationWorkersConfig.metadataFileName()); + } + + @Bean + EvaluationWorkersConfig evaluationWorkersConfig( + @Value("${evaluations.threads}") int amountOfWorkers, + @Value("${evaluation.working.dir}") String workingDirectory, + @Value("${evaluation.target.dirs}") String[] targetDirectory, + @Value("${evaluation.measurement-id.pattern}") String measurementIdPattern) { + return new EvaluationWorkersConfig(amountOfWorkers, workingDirectory, + measurementIdPattern, Arrays.stream(targetDirectory).toList()); + } + + @Bean + EvaluationConfiguration evaluationConfiguration(EvaluationWorkersConfig evaluationWorkersConfig, + GlobalConfig globalConfig) { + return new EvaluationConfiguration(evaluationWorkersConfig.workingDirectory().toString(), + evaluationWorkersConfig.targetDirectories(), + evaluationWorkersConfig.measurementIdPattern().toString(), globalConfig); + } + + @Bean + 
ProcessingWorkersConfig processingWorkersConfig( + @Value("${processing.threads}") int amountOfWorkers, + @Value("${processing.working.dir}") String workingDirectory, + @Value("${processing.target.dir}") String targetDirectory) { + return new ProcessingWorkersConfig(amountOfWorkers, Path.of(workingDirectory), + Path.of(targetDirectory)); + } + + @Bean + ProcessingConfiguration processingConfiguration(ProcessingWorkersConfig processingWorkersConfig) { + return new ProcessingConfiguration(processingWorkersConfig.workingDirectory(), + processingWorkersConfig.targetDirectory()); + } + + @Bean + GlobalConfig globalConfig( + @Value("${users.error.directory.name}") String usersErrorDirectoryName, + @Value("${users.registration.directory.name}") String usersRegistrationDirectoryName) { + return new GlobalConfig(usersErrorDirectoryName, usersRegistrationDirectoryName); + } + +} diff --git a/src/main/java/life/qbic/data/processing/Application.java b/src/main/java/life/qbic/data/processing/Application.java new file mode 100644 index 0000000..0f7dd51 --- /dev/null +++ b/src/main/java/life/qbic/data/processing/Application.java @@ -0,0 +1,83 @@ +package life.qbic.data.processing; + +import java.util.LinkedList; +import java.util.List; +import life.qbic.data.processing.config.EvaluationWorkersConfig; +import life.qbic.data.processing.config.ProcessingWorkersConfig; +import life.qbic.data.processing.config.RegistrationWorkersConfig; +import life.qbic.data.processing.evaluation.EvaluationConfiguration; +import life.qbic.data.processing.evaluation.EvaluationRequest; +import life.qbic.data.processing.processing.ProcessingConfiguration; +import life.qbic.data.processing.processing.ProcessingRequest; +import life.qbic.data.processing.registration.ProcessRegistrationRequest; +import life.qbic.data.processing.registration.RegistrationConfiguration; +import life.qbic.data.processing.scanner.Scanner; +import life.qbic.data.processing.scanner.ScannerConfiguration; +import org.slf4j.Logger; 
+import org.slf4j.LoggerFactory; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.AnnotationConfigApplicationContext; + +@SpringBootApplication +public class Application { + + private static final Logger log = LoggerFactory.getLogger(Application.class); + + public static void main(String[] args) { + SpringApplication.run(Application.class, args); + + AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext( + AppConfig.class); + + ScannerConfiguration scannerConfiguration = context.getBean(ScannerConfiguration.class); + RegistrationWorkersConfig registrationWorkersConfig = context.getBean(RegistrationWorkersConfig.class); + RegistrationConfiguration registrationConfiguration = context.getBean(RegistrationConfiguration.class); + ProcessingWorkersConfig processingWorkersConfig = context.getBean(ProcessingWorkersConfig.class); + ProcessingConfiguration processingConfiguration = context.getBean(ProcessingConfiguration.class); + EvaluationWorkersConfig evaluationWorkersConfig = context.getBean(EvaluationWorkersConfig.class); + EvaluationConfiguration evaluationConfiguration = context.getBean(EvaluationConfiguration.class); + GlobalConfig globalConfig = context.getBean(GlobalConfig.class); + + var requestQueue = new ConcurrentRegistrationQueue(); + var scannerThread = new Scanner(scannerConfiguration, requestQueue, globalConfig); + + log.info("Registering {} registration workers...", registrationWorkersConfig.amountOfWorkers()); + + List registrationWorkers = new LinkedList<>(); + for (int i=0; i processingWorkers = new LinkedList<>(); + for (int i=0; i evaluationWorkers = new LinkedList<>(); + for (int i=0; i + { + log.info("Shutting sequence initiated..."); + scannerThread.interrupt(); + registrationWorkers.forEach(Thread::interrupt); + processingWorkers.forEach(Thread::interrupt); + 
evaluationWorkers.forEach(Thread::interrupt); + // if every worker thread has shut down successfully, the application can exit with status code 0 + Runtime.getRuntime().halt(0); + }, "Shutdown-thread")); + + } +} diff --git a/src/main/java/life/qbic/data/processing/ConcurrentRegistrationQueue.java b/src/main/java/life/qbic/data/processing/ConcurrentRegistrationQueue.java new file mode 100644 index 0000000..8e9ba3b --- /dev/null +++ b/src/main/java/life/qbic/data/processing/ConcurrentRegistrationQueue.java @@ -0,0 +1,87 @@ +package life.qbic.data.processing; + +import static org.apache.logging.log4j.LogManager.getLogger; + +import java.util.Queue; +import java.util.concurrent.LinkedBlockingQueue; +import life.qbic.data.processing.registration.RegistrationRequest; +import org.apache.logging.log4j.Logger; + +/** + * Concurrent Registration Queue + *

+ * Simple FIFO queue, that allows for backpressure.
+ * <p>
+ * Both {@link #add(RegistrationRequest)} and {@link #poll()} block the calling thread until the
+ * operation can complete. An interrupt received while blocked does not abort the operation: it is
+ * remembered and the thread's interrupt flag is set again right before the method returns.
+ *
+ * @since 1.0.0
+ */
+public class ConcurrentRegistrationQueue {
+
+  private static final int DEFAULT_CAPACITY = 10;
+  private static final Logger log = getLogger(ConcurrentRegistrationQueue.class);
+  // Accessed under this object's monitor in add(), poll() and hasItems().
+  private final Queue<RegistrationRequest> queue = new LinkedBlockingQueue<>();
+  private final int capacity;
+
+  /**
+   * Creates a queue with the default capacity ({@code DEFAULT_CAPACITY}).
+   */
+  public ConcurrentRegistrationQueue() {
+    this(DEFAULT_CAPACITY);
+  }
+
+  /**
+   * Creates a queue that holds at most {@code capacity} pending requests.
+   *
+   * @param capacity the maximal number of queued requests, must be at least 1
+   * @throws IllegalArgumentException if {@code capacity} is smaller than 1
+   */
+  public ConcurrentRegistrationQueue(int capacity) {
+    if (capacity < 1) {
+      // queue.size() >= capacity would always hold, so every add() would wait forever
+      throw new IllegalArgumentException("Capacity must be greater than 0");
+    }
+    this.capacity = capacity;
+  }
+
+  /**
+   * Adds a new {@link RegistrationRequest} to the registration queue.
+   * <p>
+   * If the queue has reached its maximal capacity, the calling thread is put into the wait state,
+   * until the queue's load is reduced below its configured maximal capacity.
+   *
+   * @param request the request to add to the registration queue
+   * @since 1.0.0
+   */
+  public synchronized void add(RegistrationRequest request) {
+    boolean interrupted = false;
+    while (queue.size() >= capacity) {
+      try {
+        wait();
+      } catch (InterruptedException e) {
+        log.error("Interrupted while waiting for registration request", e);
+        // Do not re-assert the flag here: the next wait() would throw immediately and the loop
+        // would spin. Remember it and restore it once the request has been queued.
+        interrupted = true;
+      }
+    }
+    queue.add(request);
+    notifyAll();
+    if (interrupted) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Requests the next {@link RegistrationRequest} in the queue.
+   * <p>
+   * If the queue is empty, the calling thread is put into the wait state (via
+   * {@link Object#wait()}) until it gets notified again when a new task is available in the queue.
+   *
+   * @return the next registration request available.
+   * @since 1.0.0
+   */
+  public synchronized RegistrationRequest poll() {
+    boolean interrupted = false;
+    while (queue.isEmpty()) {
+      try {
+        wait();
+      } catch (InterruptedException e) {
+        log.error("Interrupted while waiting for registration request", e);
+        // see add(): restoring the flag inside the loop would turn the wait into a busy spin
+        interrupted = true;
+      }
+    }
+    var request = queue.poll();
+    // Wake up waiting producers only on the transition from "full" to "one free slot".
+    if (queue.size() == capacity - 1) {
+      notifyAll();
+    }
+    if (interrupted) {
+      Thread.currentThread().interrupt();
+    }
+    return request;
+  }
+
+  /**
+   * @return {@code true} if at least one request is currently queued
+   */
+  public synchronized boolean hasItems() {
+    return !queue.isEmpty();
+  }
+
+  /**
+   * @return the number of currently queued requests; read without taking the monitor
+   */
+  public int items() {
+    return queue.size();
+  }
+}
diff --git a/src/main/java/life/qbic/data/processing/ErrorSummary.java b/src/main/java/life/qbic/data/processing/ErrorSummary.java
new file mode 100644
index 0000000..f24c609
--- /dev/null
+++ b/src/main/java/life/qbic/data/processing/ErrorSummary.java
@@ -0,0 +1,40 @@
+package life.qbic.data.processing;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Error Summary
+ *
+ *

Provides the data submitter with some helpful contextual information about a + * registration failure

+ * + * @since 1.0.0 + */ +public record ErrorSummary(String taskId, String affectedDataset, String reason, String description, + Map contextProperties) { + + public static ErrorSummary create(String taskId, String affectedDataset, String reason, + String description, Map contextProperties) { + return new ErrorSummary(taskId, affectedDataset, reason, description, + new HashMap<>(contextProperties)); + } + + public static ErrorSummary createSimple(String taskId, String affectedDataset, String reason, + String description) { + return new ErrorSummary(taskId, affectedDataset, reason, description, new HashMap<>()); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("Task ID: ").append(taskId).append("\n"); + sb.append("Affected Dataset: ").append(affectedDataset).append("\n"); + sb.append("Reason: ").append(reason).append("\n"); + sb.append("Description: ").append(description).append("\n"); + for (Map.Entry entry : contextProperties.entrySet()) { + sb.append(entry.getKey()).append(": ").append(entry.getValue()).append("\n"); + } + return sb.toString(); + } +} diff --git a/src/main/java/life/qbic/data/processing/GlobalConfig.java b/src/main/java/life/qbic/data/processing/GlobalConfig.java new file mode 100644 index 0000000..9bf90ea --- /dev/null +++ b/src/main/java/life/qbic/data/processing/GlobalConfig.java @@ -0,0 +1,31 @@ +package life.qbic.data.processing; + +import java.nio.file.Path; +import java.nio.file.Paths; + +public class GlobalConfig { + + private final Path usersErrorDirectoryName; + + private final Path usersDirectoryRegistrationName; + + public GlobalConfig(String usersErrorDirectoryName, String usersRegistrationDirectoryName) { + if (usersErrorDirectoryName == null || usersErrorDirectoryName.isBlank()) { + throw new IllegalArgumentException("usersErrorDirectoryName cannot be null or empty"); + } + if (usersRegistrationDirectoryName == null || usersRegistrationDirectoryName.isBlank()) { + throw new 
IllegalArgumentException("usersRegistrationDirectoryName cannot be null or empty"); + } + this.usersErrorDirectoryName = Paths.get(usersErrorDirectoryName); + this.usersDirectoryRegistrationName = Paths.get(usersRegistrationDirectoryName); + } + + public Path usersErrorDirectory() { + return this.usersErrorDirectoryName; + } + + public Path usersDirectoryRegistration() { + return this.usersDirectoryRegistrationName; + } + +} diff --git a/src/main/java/life/qbic/data/processing/Provenance.java b/src/main/java/life/qbic/data/processing/Provenance.java new file mode 100644 index 0000000..10e6a6c --- /dev/null +++ b/src/main/java/life/qbic/data/processing/Provenance.java @@ -0,0 +1,138 @@ +package life.qbic.data.processing; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +/** + * Provenance Information of Datasets + *

+ * Captures some dataset provenance metadata for pre-processing purposes.
+ *
+ * @since 1.0.0
+ */
+// NOTE(review): without ignoreUnknown = true this annotation looks like a no-op; parse() relies on
+// its own ObjectMapper configuration instead - confirm whether the attribute was intended.
+@JsonIgnoreProperties
+public class Provenance {
+
+  public static final String FILE_NAME = "provenance.json";
+
+  /**
+   * The path from where the dataset has been picked up originally.
+   */
+  @JsonProperty("origin")
+  public String originPath;
+
+  /**
+   * The user's working directory. The evaluation step resolves the users error directory against
+   * this path when a dataset has to be handed back.
+   */
+  @JsonProperty("user")
+  public String userWorkDirectoryPath;
+
+  @JsonProperty("measurementId")
+  public String qbicMeasurementID;
+
+  @JsonProperty("datasetFiles")
+  public List<String> datasetFiles;
+
+  /**
+   * A list of ordered processing folder stops the dataset has traversed and passed successfully.
+   * <p>
+   * The time of processing is ordered from oldest to latest.
+   */
+  @JsonProperty("history")
+  public List<String> history;
+
+  /**
+   * Reads a provenance JSON file. Unknown JSON properties are ignored.
+   *
+   * @param json path of the provenance file
+   * @return the parsed provenance, never {@code null}
+   * @throws ProvenanceException with {@link ERROR_CODE#NOT_FOUND} if the file does not exist,
+   *                             {@link ERROR_CODE#PERMISSION_DENIED} if it cannot be read,
+   *                             {@link ERROR_CODE#UNKNOWN_CONTENT} if the content is not a
+   *                             provenance document, {@link ERROR_CODE#IO_ERROR} on any other
+   *                             I/O failure
+   */
+  public static Provenance parse(Path json) throws ProvenanceException {
+    File provenanceFile = json.toFile();
+    if (!provenanceFile.exists()) {
+      throw new ProvenanceException("File does not exist: %s".formatted(provenanceFile),
+          ERROR_CODE.NOT_FOUND);
+    }
+    if (!provenanceFile.canRead()) {
+      throw new ProvenanceException("Cannot read file: %s".formatted(provenanceFile),
+          ERROR_CODE.PERMISSION_DENIED);
+    }
+    ObjectMapper mapper = new ObjectMapper().configure(
+        DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    Provenance provenance;
+    try {
+      provenance = mapper.readValue(Files.readString(json), Provenance.class);
+    } catch (JsonProcessingException e) {
+      throw new ProvenanceException("Cannot read content %s".formatted(json), e,
+          ERROR_CODE.UNKNOWN_CONTENT);
+    } catch (IOException e) {
+      throw new ProvenanceException("IO Error: %s".formatted(e.getMessage()), e,
+          ERROR_CODE.IO_ERROR);
+    }
+    if (provenance == null) {
+      // a document consisting of the JSON literal null maps to a null reference
+      throw new ProvenanceException("Cannot read content %s".formatted(json),
+          ERROR_CODE.UNKNOWN_CONTENT);
+    }
+    return provenance;
+  }
+
+  /**
+   * Looks for a file named {@link #FILE_NAME} directly inside the given directory.
+   *
+   * @param directory the directory to search (not recursive)
+   * @return the provenance file, or empty if there is none or the directory cannot be listed
+   */
+  public static Optional<File> findProvenance(Path directory) {
+    File[] entries = directory.toFile().listFiles();
+    if (entries == null) {
+      // listFiles() yields null when the path is not a directory or an I/O error occurs
+      return Optional.empty();
+    }
+    return Arrays.stream(entries)
+        .filter(file -> file.getName().equals(Provenance.FILE_NAME)).findFirst();
+  }
+
+  /**
+   * Appends an entry to {@link #history}, creating the list on first use.
+   */
+  public void addToHistory(String event) {
+    if (history == null) {
+      history = new ArrayList<>();
+    }
+    history.add(event);
+  }
+
+  public enum ERROR_CODE {
+    PERMISSION_DENIED,
+    UNKNOWN_CONTENT,
+    NOT_FOUND,
+    IO_ERROR
+  }
+
+  /**
+   * Appends all given entries to {@link #datasetFiles}, creating the list on first use.
+   *
+   * @throws NullPointerException if {@code datasetFiles} is {@code null}
+   */
+  public void addDatasetFiles(Collection<String> datasetFiles) {
+    Objects.requireNonNull(datasetFiles);
+    if (this.datasetFiles == null) {
+      this.datasetFiles = new ArrayList<>();
+    }
+    this.datasetFiles.addAll(datasetFiles);
+  }
+
+  public void addDatasetFile(String datasetFile) {
+    addDatasetFiles(Collections.singletonList(datasetFile));
+  }
+
+  /**
+   * @return an unmodifiable snapshot of the recorded dataset files; empty if none were recorded
+   */
+  public Collection<String> datasetFiles() {
+    if (datasetFiles == null) {
+      // the field stays null until the first add or when the JSON property is absent
+      return List.of();
+    }
+    return datasetFiles.stream().toList();
+  }
+
+  public static class ProvenanceException extends RuntimeException {
+
+    private final ERROR_CODE code;
+
+    public ProvenanceException(String message, Throwable t, ERROR_CODE code) {
+      super(message, t);
+      this.code = code;
+    }
+
+    public ProvenanceException(String message, ERROR_CODE code) {
+      super(message);
+      this.code = code;
+    }
+
+    public ERROR_CODE code() {
+      return code;
+    }
+
+  }
+}
diff --git a/src/main/java/life/qbic/data/processing/config/EvaluationWorkersConfig.java b/src/main/java/life/qbic/data/processing/config/EvaluationWorkersConfig.java
new file mode 100644
index 0000000..a675921
--- /dev/null
+++ b/src/main/java/life/qbic/data/processing/config/EvaluationWorkersConfig.java
@@ -0,0 +1,56 @@
+package life.qbic.data.processing.config;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.regex.Pattern;
+
+public class EvaluationWorkersConfig {
+
+  private final int threads;
+  private final Path workingDirectory;
+  private final Collection<Path> targetDirectories;
+  private final Pattern measurementIdPattern;
+
+  public EvaluationWorkersConfig(int threads, String workingDirectory, String measurementIdPattern,
+      Collection<String> targetDirectories) {
+    if (threads < 1) {
+      throw new IllegalArgumentException(
+          "Number of evaluation worker threads must be greater than 0");
+    }
+    if (targetDirectories.isEmpty()) {
+      throw new IllegalArgumentException(
+          "Target directories cannot be empty, please specify at least one target directory");
+    }
+    this.threads = threads;
+    this.workingDirectory = Paths.get(workingDirectory);
+    if (!this.workingDirectory.toFile().exists()) {
+      throw new IllegalArgumentException("Evaluation worker directory does not exist");
+    }
+    this.targetDirectories = targetDirectories.stream().map(Paths::get).toList();
+    this.targetDirectories.stream().filter(path -> !path.toFile().exists()).forEach(path -> {
+      throw new IllegalArgumentException(
+          "Evaluation target directory '%s' does not exist".formatted(path));
+    });
+    if (measurementIdPattern.isBlank()) {
+      throw new IllegalArgumentException("Measurement id pattern cannot be blank");
+    }
+    this.measurementIdPattern = Pattern.compile(measurementIdPattern);
+  }
+
+  public int threads() {
+    return threads;
+  }
+
+  public Path workingDirectory() {
+    return workingDirectory;
+  }
+
+  public Collection<Path> targetDirectories() {
+    return targetDirectories;
+  }
+
+  public Pattern measurementIdPattern() {
+    return measurementIdPattern;
+  }
+}
diff --git a/src/main/java/life/qbic/data/processing/config/ProcessingWorkersConfig.java b/src/main/java/life/qbic/data/processing/config/ProcessingWorkersConfig.java
new file mode 100644
index 0000000..d13a3b3
--- /dev/null
+++ b/src/main/java/life/qbic/data/processing/config/ProcessingWorkersConfig.java
@@ -0,0 +1,41 @@
+package life.qbic.data.processing.config;
+
+import java.nio.file.Path;
+
+public class ProcessingWorkersConfig {
+
+  private final int threads;
+
+  private final Path workingDirectory;
+
+  private final Path targetDirectory;
+
+  public ProcessingWorkersConfig(int threads, Path workingDirectory, Path targetDirectory) {
+    if (threads < 1) {
+      throw new IllegalArgumentException("threads must be greater than 0");
+    }
+    this.threads = threads;
+
+    if (!workingDirectory.toFile().exists()) {
+      throw new IllegalArgumentException("working directory does not exist");
+    }
+    this.workingDirectory = workingDirectory;
+
+    if (!targetDirectory.toFile().exists()) {
+      throw new IllegalArgumentException("target directory does not exist");
+    }
+    this.targetDirectory = targetDirectory;
+  }
+
+  public int threads() {
+    return threads;
+  }
+
+  public Path workingDirectory() {
+    return workingDirectory;
+  }
+
+  public Path targetDirectory() {
+    return targetDirectory;
+  }
+}
diff --git a/src/main/java/life/qbic/data/processing/config/RegistrationWorkersConfig.java b/src/main/java/life/qbic/data/processing/config/RegistrationWorkersConfig.java
new file mode 100644
index 0000000..44be342
--- /dev/null
+++ b/src/main/java/life/qbic/data/processing/config/RegistrationWorkersConfig.java
@@ -0,0 +1,49 @@
+package life.qbic.data.processing.config;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+public class RegistrationWorkersConfig {
+
+  private final int amountOfWorkers;
+
+  private final Path workingDirectory;
+
+  private final Path targetDirectory;
+
+  private final String metadataFileName;
+
+  public RegistrationWorkersConfig(int threads, String workingDirectory, String targetDirectory, String metadataFileName) {
+    if (threads < 1) {
+      throw new IllegalArgumentException("Number of threads must be greater than 0");
+    }
+    Path directory = Paths.get(workingDirectory);
+    if (!directory.toFile().exists()) {
+      throw new IllegalArgumentException("Directory " + directory + " does not exist");
+    }
+    Path targetDirectoryPath = Paths.get(targetDirectory);
+    if (!targetDirectoryPath.toFile().exists()) {
+      throw new IllegalArgumentException("Target directory " + targetDirectory + " does not exist");
+    }
+    this.workingDirectory = directory;
+    this.amountOfWorkers = threads;
+    this.targetDirectory = targetDirectoryPath;
+    this.metadataFileName = metadataFileName;
+  }
+
+  public int amountOfWorkers() {
+    return amountOfWorkers;
+  }
+
+  public Path workingDirectory() {
+    return this.workingDirectory;
+  }
+
+  public Path targetDirectory() {
+    return this.targetDirectory;
+  }
+
+  public String metadataFileName() {
+    return this.metadataFileName;
+  }
+}
diff --git a/src/main/java/life/qbic/data/processing/config/RoundRobinDraw.java b/src/main/java/life/qbic/data/processing/config/RoundRobinDraw.java
new file mode 100644
index 0000000..fec7fa7
--- /dev/null
+++ b/src/main/java/life/qbic/data/processing/config/RoundRobinDraw.java
@@ -0,0 +1,56 @@
+package life.qbic.data.processing.config;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+/**
+ * Round Robin Draw
+ *

+ * Enables a thread-safe access of items in a given collection based on the round robin method.
+ *
+ * @param <T> the type of the items that are handed out
+ * @since 1.0.0
+ */
+public class RoundRobinDraw<T> {
+
+  // Private snapshot of the items; never modified after construction.
+  private final ArrayList<T> items;
+  private final int itemsAmount;
+  // Index of the item the next call to next() returns; only touched inside next().
+  private int currentIndex = 0;
+
+  private RoundRobinDraw(Collection<T> items) {
+    this.items = new ArrayList<>(items);
+    // take the size from the snapshot so index and list can never disagree
+    this.itemsAmount = this.items.size();
+  }
+
+  /**
+   * Creates an instance of {@link RoundRobinDraw} for the items of the collection provided.
+   *
+   * @param items a collection of items the round robin method shall be applied to; the items are
+   *              copied, later changes to the collection are not reflected
+   * @param <T>   the type of the items
+   * @return an instance of this class
+   * @throws IllegalArgumentException if an empty collection is provided or the collection is null
+   * @since 1.0.0
+   */
+  public static <T> RoundRobinDraw<T> create(Collection<T> items) throws IllegalArgumentException {
+    if (items == null || items.isEmpty()) {
+      throw new IllegalArgumentException("Collection must not be null or empty");
+    }
+    return new RoundRobinDraw<>(items);
+  }
+
+  /**
+   * Returns the next element of this {@link RoundRobinDraw} instance.
+   * <p>
+   * If the last item of the instance has already been provided, it will start again from the
+   * first item.
+   *
+   * @return the next item in iteration order of the collection passed to
+   * {@link #create(Collection)}
+   * @since 1.0.0
+   */
+  public synchronized T next() {
+    T value = items.get(currentIndex);
+    // wrap around after the last item
+    currentIndex = (currentIndex + 1) % itemsAmount;
+    return value;
+  }
+
+}
diff --git a/src/main/java/life/qbic/data/processing/evaluation/EvaluationConfiguration.java b/src/main/java/life/qbic/data/processing/evaluation/EvaluationConfiguration.java
new file mode 100644
index 0000000..1ab8c58
--- /dev/null
+++ b/src/main/java/life/qbic/data/processing/evaluation/EvaluationConfiguration.java
@@ -0,0 +1,61 @@
+package life.qbic.data.processing.evaluation;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.regex.Pattern;
+import life.qbic.data.processing.GlobalConfig;
+import life.qbic.data.processing.config.RoundRobinDraw;
+
+/**
+ * Evaluation Configuration
+ *

+ * The configuration class for {@link EvaluationRequest} workers.
+ * <p>
+ * All directories are checked for existence at construction time, so a misconfiguration fails
+ * early with an {@link IllegalArgumentException}.
+ *
+ * @since 1.0.0
+ */
+public class EvaluationConfiguration {
+
+  private final Path workingDirectory;
+  private final Collection<Path> targetDirectories;
+  private final Pattern measurementIdPattern;
+  private final Path usersErrorDirectory;
+  private final RoundRobinDraw<Path> targetDirectoriesRoundRobinDraw;
+
+  /**
+   * @param workingDirectory     directory the evaluation workers pick tasks up from; must exist
+   * @param targetDirectories    directories evaluated datasets are distributed to; all must exist
+   * @param measurementIdPattern regular expression for measurement ids; must not be blank
+   * @param globalConfig         provides the name of the users error directory
+   * @throws IllegalArgumentException if a directory does not exist, no target directory is given
+   *                                  or the pattern is blank
+   */
+  public EvaluationConfiguration(String workingDirectory, Collection<Path> targetDirectories,
+      String measurementIdPattern,
+      GlobalConfig globalConfig) {
+    this.workingDirectory = Paths.get(workingDirectory);
+    if (!this.workingDirectory.toFile().exists()) {
+      throw new IllegalArgumentException("Evaluation worker directory does not exist");
+    }
+    this.targetDirectories = targetDirectories.stream().toList();
+    for (Path targetDirectory : this.targetDirectories) {
+      if (!targetDirectory.toFile().exists()) {
+        throw new IllegalArgumentException(
+            "Evaluation target directory '%s' does not exist".formatted(targetDirectory));
+      }
+    }
+    // hand out the validated copy, not the caller's collection
+    this.targetDirectoriesRoundRobinDraw = RoundRobinDraw.create(this.targetDirectories);
+    if (measurementIdPattern.isBlank()) {
+      throw new IllegalArgumentException("Measurement id pattern cannot be blank");
+    }
+    this.usersErrorDirectory = globalConfig.usersErrorDirectory();
+    this.measurementIdPattern = Pattern.compile(measurementIdPattern);
+  }
+
+  public Path workingDirectory() {
+    return workingDirectory;
+  }
+
+  /**
+   * @return the shared round robin draw over the configured target directories
+   */
+  public RoundRobinDraw<Path> targetDirectories() {
+    return targetDirectoriesRoundRobinDraw;
+  }
+
+  public Pattern measurementIdPattern() {
+    return measurementIdPattern;
+  }
+
+  public Path usersErrorDirectory() {
+    return usersErrorDirectory;
+  }
+
+}
diff --git a/src/main/java/life/qbic/data/processing/evaluation/EvaluationRequest.java b/src/main/java/life/qbic/data/processing/evaluation/EvaluationRequest.java
new file mode 100644
index 0000000..2753d8d
--- /dev/null
+++ b/src/main/java/life/qbic/data/processing/evaluation/EvaluationRequest.java
@@ -0,0 +1,252 @@
+package life.qbic.data.processing.evaluation;
+
+import static org.apache.logging.log4j.LogManager.getLogger;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.regex.MatchResult;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import life.qbic.data.processing.ErrorSummary;
+import life.qbic.data.processing.Provenance;
+import life.qbic.data.processing.Provenance.ProvenanceException;
+import life.qbic.data.processing.config.RoundRobinDraw;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Evaluation Request - Last process
+ *
+ *

Validates the presence of a QBiC measurement ID in the dataset root
+ * folder.
+ * <p>
+ * If a valid measurement ID is found, the process updates the provenance file with the
+ * ID and moves the dataset to the openBIS ETL. After successful transfer, an openBIS marker-file is
+ * created, to integrate the dataset registration with openBIS ETL.
+ * <p>
+ * If none is present, or the identifier does not match the requirements, it is moved back to the
+ * users error folder.
+ *
+ * @since 1.0.0
+ */
+public class EvaluationRequest extends Thread {
+
+  private static final String THREAD_NAME = "Evaluation-%s";
+  private static final String INTERVENTION_DIRECTORY = "interventions";
+  private static final Logger LOG = getLogger(EvaluationRequest.class);
+  // Absolute paths of the task directories that are being processed by any worker right now.
+  private static final Set<String> ACTIVE_TASKS = new HashSet<>();
+  private static final ReentrantLock LOCK = new ReentrantLock();
+  private static int threadNumber = 1;
+  private final Path interventionDirectory;
+  private final AtomicBoolean active = new AtomicBoolean(false);
+  private final AtomicBoolean terminated = new AtomicBoolean(false);
+  private final Path workingDirectory;
+  // NOTE(review): stored but not consulted anywhere in this class - the evaluation only checks
+  // that the provenance carries a non-blank measurement id. Confirm whether matching is intended.
+  private final Pattern measurementIdPattern;
+  private final Path usersErrorDirectory;
+  private final RoundRobinDraw<Path> targetDirectories;
+  private Path assignedTargetDirectory;
+
+  /**
+   * Creates a worker and makes sure the {@code interventions} directory exists below the working
+   * directory.
+   *
+   * @throws RuntimeException if the intervention directory neither exists nor can be created
+   */
+  public EvaluationRequest(Path workingDirectory, RoundRobinDraw<Path> targetDirectories,
+      Pattern measurementIdPattern, Path usersErrorDirectory) {
+    this.setName(THREAD_NAME.formatted(nextThreadNumber()));
+    this.workingDirectory = workingDirectory;
+    this.targetDirectories = targetDirectories;
+    this.measurementIdPattern = measurementIdPattern;
+    this.usersErrorDirectory = usersErrorDirectory;
+    this.interventionDirectory = workingDirectory.resolve(INTERVENTION_DIRECTORY);
+    File intervention = interventionDirectory.toFile();
+    // mkdir() also returns false when the directory is already there, which is fine
+    if (!intervention.mkdir() && !intervention.exists()) {
+      throw new RuntimeException(
+          "Could not create intervention directory for processing request at " + workingDirectory);
+    }
+  }
+
+  public EvaluationRequest(EvaluationConfiguration evaluationConfiguration) {
+    this(evaluationConfiguration.workingDirectory(), evaluationConfiguration.targetDirectories(),
+        evaluationConfiguration.measurementIdPattern(),
+        evaluationConfiguration.usersErrorDirectory());
+  }
+
+  // synchronized: the counter is shared by all instances
+  private static synchronized int nextThreadNumber() {
+    return threadNumber++;
+  }
+
+  /**
+   * Claims a task for the calling worker.
+   *
+   * @return {@code true} if the task was not claimed before, {@code false} otherwise
+   */
+  private static boolean push(String taskId) {
+    LOCK.lock();
+    try {
+      return ACTIVE_TASKS.add(taskId);
+    } finally {
+      LOCK.unlock();
+    }
+  }
+
+  /**
+   * Polls the working directory for task directories until {@link #interrupt()} has been called.
+   */
+  @Override
+  public void run() {
+    while (true) {
+      active.set(true);
+      try {
+        for (File taskDir : tasks()) {
+          processTask(taskDir);
+        }
+      } finally {
+        // reset even if a task blew up, otherwise interrupt() would wait forever
+        active.set(false);
+      }
+      if (terminated.get()) {
+        LOG.warn("Thread {} terminated", Thread.currentThread().getName());
+        break;
+      }
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+        // We don't want to interrupt the thread here, only explicit to enable graceful shutdown
+        // via its interrupt() method
+      }
+    }
+  }
+
+  /**
+   * Claims the task, evaluates it if the directory still exists and always releases the claim.
+   */
+  private void processTask(File taskDir) {
+    if (!push(taskDir.getAbsolutePath())) {
+      return;
+    }
+    try {
+      // the directory may have been moved by another worker since it was listed
+      if (taskDir.exists()) {
+        assignedTargetDirectory = getAssignedTargetDir();
+        evaluateDirectory(taskDir);
+      }
+    } finally {
+      removeTask(taskDir);
+    }
+  }
+
+  private Path getAssignedTargetDir() {
+    return targetDirectories.next();
+  }
+
+  private void removeTask(File taskDir) {
+    LOCK.lock();
+    try {
+      ACTIVE_TASKS.remove(taskDir.getAbsolutePath());
+    } finally {
+      LOCK.unlock();
+    }
+  }
+
+  /**
+   * Routes a task directory: to the assigned target directory if its provenance carries a
+   * measurement id, back to the user if it does not, and to the intervention directory on any
+   * technical failure.
+   */
+  private void evaluateDirectory(File taskDir) {
+    var provenanceSearch = Provenance.findProvenance(taskDir.toPath());
+    if (provenanceSearch.isEmpty()) {
+      LOG.error("No provenance file found: {}", taskDir.getAbsolutePath());
+      moveToSystemIntervention(taskDir, "Provenance file was not found");
+      return;
+    }
+
+    Provenance provenance;
+    try {
+      provenance = Provenance.parse(provenanceSearch.get().toPath());
+    } catch (ProvenanceException e) {
+      LOG.error("Could not parse provenance file: {}", taskDir.getAbsolutePath(), e);
+      moveToSystemIntervention(taskDir, e.getMessage());
+      return;
+    }
+
+    boolean hasMeasurementId =
+        provenance.qbicMeasurementID != null && !provenance.qbicMeasurementID.isBlank();
+    if (!hasMeasurementId) {
+      String datasetFiles =
+          provenance.datasetFiles == null ? "" : String.join(", ", provenance.datasetFiles);
+      var errorMessage = ErrorSummary.createSimple(taskDir.getName(), datasetFiles,
+          "Missing QBiC measurement ID",
+          "For a successful registration please provide the pre-registered QBiC measurement ID");
+      LOG.error(
+          "Missing measurement identifier: no known measurement id was found in the content of directory '{}' in task '{}'",
+          datasetFiles, taskDir.getName());
+      moveBackToOrigin(taskDir, provenance, errorMessage.toString());
+      return;
+    }
+
+    provenance.addToHistory(taskDir.getAbsolutePath());
+    try {
+      updateProvenanceFile(provenanceSearch.get(), provenance);
+    } catch (IOException e) {
+      LOG.error("Could not update provenance file: {}", taskDir.getAbsolutePath(), e);
+      moveToSystemIntervention(taskDir, e.getMessage());
+      // the task directory is gone from the working directory now, nothing left to move
+      return;
+    }
+    if (!moveToTargetDir(taskDir)) {
+      // no dataset arrived in the target directory, so no marker file must be written
+      return;
+    }
+    try {
+      createMarkerFile(assignedTargetDirectory, taskDir.getName());
+    } catch (IOException e) {
+      LOG.error("Could not create marker file: {}", taskDir.getAbsolutePath(), e);
+      // the dataset lives in the target directory by now, pull it back from there
+      moveToSystemIntervention(assignedTargetDirectory.resolve(taskDir.getName()).toFile(),
+          e.getMessage());
+    }
+  }
+
+  private void updateProvenanceFile(File provenanceFile, Provenance provenance) throws IOException {
+    var mapper = new ObjectMapper();
+    mapper.writerWithDefaultPrettyPrinter().writeValue(provenanceFile, provenance);
+  }
+
+  private boolean createMarkerFile(Path targetDirectory, String name) throws IOException {
+    Path markerFileName = Paths.get(".MARKER_is_finished_" + name);
+    return targetDirectory.resolve(markerFileName).toFile().createNewFile();
+  }
+
+  private Optional<File> findDataset(File taskDir) {
+    return Arrays.stream(taskDir.listFiles()).filter(File::isDirectory).findFirst();
+  }
+
+  /**
+   * Writes {@code reason} into an {@code error.txt} inside the task directory and moves the
+   * directory into the intervention directory.
+   *
+   * @throws RuntimeException if writing or moving fails
+   */
+  private void moveToSystemIntervention(File taskDir, String reason) {
+    try {
+      // exception messages may be null, Files.writeString does not accept that
+      Files.writeString(taskDir.toPath().resolve("error.txt"), reason == null ? "" : reason);
+      Files.move(taskDir.toPath(), interventionDirectory.resolve(taskDir.getName()));
+    } catch (IOException e) {
+      throw new RuntimeException("Cannot move task to intervention: %s".formatted(taskDir), e);
+    }
+  }
+
+  /**
+   * Moves the task into the users error directory below the user's working directory recorded in
+   * the provenance. Falls back to the intervention directory if that is not possible.
+   */
+  private void moveBackToOrigin(File taskDir, Provenance provenance, String reason) {
+    if (provenance.userWorkDirectoryPath == null) {
+      LOG.error("Cannot move task '{}' back to the user: provenance has no user directory",
+          taskDir.getName());
+      moveToSystemIntervention(taskDir, reason);
+      return;
+    }
+    Path userErrorDirectory =
+        Paths.get(provenance.userWorkDirectoryPath).resolve(usersErrorDirectory);
+    LOG.info("Moving back to original user directory: {}", userErrorDirectory);
+    try {
+      Files.writeString(taskDir.toPath().resolve("error.txt"), reason);
+      // best effort: the directory usually exists already; a real problem surfaces in the move
+      userErrorDirectory.toFile().mkdir();
+      Files.move(taskDir.toPath(), userErrorDirectory.resolve(taskDir.getName()));
+    } catch (IOException e) {
+      LOG.error("Cannot move task to user intervention: %s".formatted(userErrorDirectory), e);
+      moveToSystemIntervention(taskDir, e.getMessage());
+    }
+  }
+
+  /**
+   * Moves the task into the currently assigned target directory.
+   *
+   * @return {@code true} if the move succeeded, {@code false} if the task was handed over to the
+   * intervention directory instead
+   */
+  private boolean moveToTargetDir(File taskDir) {
+    LOG.info(
+        "Moving %s to target directory %s".formatted(taskDir.getAbsolutePath(), assignedTargetDirectory));
+    try {
+      Files.move(taskDir.toPath(), assignedTargetDirectory.resolve(taskDir.getName()));
+      return true;
+    } catch (IOException e) {
+      LOG.error(
+          "Cannot move task to target directory: %s".formatted(assignedTargetDirectory), e);
+      moveToSystemIntervention(taskDir,
+          "Cannot move task to target directory: %s".formatted(assignedTargetDirectory));
+      return false;
+    }
+  }
+
+  /**
+   * @return all directories inside the working directory except the intervention directory; empty
+   * if the working directory cannot be listed
+   */
+  private List<File> tasks() {
+    File[] entries = workingDirectory.toFile().listFiles();
+    if (entries == null) {
+      return List.of();
+    }
+    return Arrays.stream(entries).filter(File::isDirectory)
+        .filter(file -> !file.getName().equals(INTERVENTION_DIRECTORY)).toList();
+  }
+
+  /**
+   * Requests termination and blocks the caller until the current polling round has finished.
+   * <p>
+   * Does not delegate to {@link Thread#interrupt()}: the worker is never interrupted in the
+   * middle of a task, it stops after the round in which it sees the termination flag.
+   */
+  @Override
+  public void interrupt() {
+    terminated.set(true);
+    while (active.get()) {
+      LOG.debug("Thread is still active...");
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        // we don't want to interrupt the worker thread before its task is done, since it might
+        // render the application in a non-recoverable state
+      }
+    }
+    LOG.debug("Task has been finished");
+  }
+
+}
diff --git a/src/main/java/life/qbic/data/processing/processing/ProcessingConfiguration.java b/src/main/java/life/qbic/data/processing/processing/ProcessingConfiguration.java
new file mode 100644
index 0000000..6084645
--- /dev/null
+++ b/src/main/java/life/qbic/data/processing/processing/ProcessingConfiguration.java
@@ -0,0 +1,38 @@
+package life.qbic.data.processing.processing;
+
+import java.nio.file.Path;
+
+/**
+ * Processing Configuration
+ *
+ *

Holds processing worker configuration settings, such as the working directory of the process + * and the target directory the dataset is moved to once the task has completed + * successfully.

+ * + * @since 1.0.0 + */ +public class ProcessingConfiguration { + + private final Path workingDirectory; + + private final Path targetDirectory; + + public ProcessingConfiguration(Path workingDirectory, Path targetDirectory) { + this.workingDirectory = workingDirectory; + if (!workingDirectory.toFile().exists()) { + throw new IllegalArgumentException("Working directory does not exist: " + workingDirectory); + } + this.targetDirectory = targetDirectory; + if (!targetDirectory.toFile().exists()) { + throw new IllegalArgumentException("Target directory does not exist: " + targetDirectory); + } + } + + public Path getWorkingDirectory() { + return workingDirectory; + } + + public Path getTargetDirectory() { + return targetDirectory; + } +} diff --git a/src/main/java/life/qbic/data/processing/processing/ProcessingRequest.java b/src/main/java/life/qbic/data/processing/processing/ProcessingRequest.java new file mode 100644 index 0000000..fcfff17 --- /dev/null +++ b/src/main/java/life/qbic/data/processing/processing/ProcessingRequest.java @@ -0,0 +1,220 @@ +package life.qbic.data.processing.processing; + +import static org.apache.logging.log4j.LogManager.getLogger; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantLock; +import life.qbic.data.processing.Provenance; +import life.qbic.data.processing.Provenance.ProvenanceException; +import org.apache.logging.log4j.Logger; + +/** + * + * + *

Does some simple checks: + * + *

    + *
  • the content is not empty
  • + *
  • there is a dataset and a provenance file
  • + *
  • the provenance file can be parsed and the content passes the sanity check
  • + *
  • package a dataset properly, if it is a file
  • + *
+ * + * @since 1.0.0 + */ +public class ProcessingRequest extends Thread { + + private static final Logger LOG = getLogger(ProcessingRequest.class); + private static final String THREAD_NAME = "Processing-%s"; + private static final Set ACTIVE_TASKS = new HashSet<>(); + private static final ReentrantLock LOCK = new ReentrantLock(); + private static final String INTERVENTION_DIRECTORY = "interventions"; + private static int threadNumber = 1; + private final Path workingDirectory; + private final Path targetDirectory; + private final AtomicBoolean active = new AtomicBoolean(false); + private final AtomicBoolean terminated = new AtomicBoolean(false); + private final Path interventionDirectory; + + public ProcessingRequest(ProcessingConfiguration processingConfiguration) { + this.setName(THREAD_NAME.formatted(nextThreadNumber())); + this.workingDirectory = processingConfiguration.getWorkingDirectory(); + this.targetDirectory = processingConfiguration.getTargetDirectory(); + if (!workingDirectory.resolve(INTERVENTION_DIRECTORY).toFile().mkdir() + && !workingDirectory.resolve( + INTERVENTION_DIRECTORY).toFile().exists()) { + throw new RuntimeException( + "Could not create intervention directory for processing request at " + workingDirectory); + } + this.interventionDirectory = workingDirectory.resolve(INTERVENTION_DIRECTORY); + } + + private static int nextThreadNumber() { + return threadNumber++; + } + + private boolean push(String taskId) { + LOCK.lock(); + boolean notActiveYet; + try { + notActiveYet = ACTIVE_TASKS.add(taskId); + } finally { + LOCK.unlock(); + } + return notActiveYet; + } + + @Override + public void run() { + while (true) { + active.set(true); + for (File taskDir : tasks()) { + if (push(taskDir.getAbsolutePath()) && taskDir.exists()) { + LOG.info("Registering task " + taskDir.getAbsolutePath()); + processFile(taskDir); + clearTask(taskDir); + } else { + clearTask(taskDir); + } + } + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // 
This thread will only handle interrupted signals via its explicit + // API method call + } + active.set(false); + if (terminated.get()) { + LOG.warn("Thread {} terminated", Thread.currentThread().getName()); + break; + } + } + } + + /** + * @param taskDir + * @since + */ + private void processFile(File taskDir) { + var taskDirContent = Arrays.stream(Objects.requireNonNull(taskDir.listFiles(), "Task dir must not be null: " + taskDir)).toList(); + + if (checkForEmpty(taskDir, taskDirContent)) { + return; + } + + Optional provenanceFileSearch = findProvenanceFile(taskDirContent); + if (provenanceFileSearch.isEmpty()) { + LOG.error("Task {} has no provenance file", taskDir.getAbsolutePath()); + moveToSystemIntervention(taskDir, "No provenance file provided"); + return; + } + + Provenance provenance = null; + try { + provenance = Provenance.parse(provenanceFileSearch.get().toPath()); + } catch (ProvenanceException e) { + LOG.error("Error parsing provenance file", e); + switch (e.code()) { + case IO_ERROR, NOT_FOUND, UNKNOWN_CONTENT, PERMISSION_DENIED -> + moveToSystemIntervention(taskDir, e.getMessage()); + } + return; + } + + Provenance finalProvenance = provenance; + taskDirContent.stream().filter(file -> !file.getName().equals(Provenance.FILE_NAME)).findFirst() + .ifPresent(file -> { + finalProvenance.addToHistory(taskDir.getAbsolutePath()); + try { + writeProvenance(provenanceFileSearch.get(), finalProvenance); + } catch (IOException e) { + LOG.error("Could not write provenance file {}", file.getAbsolutePath(), e); + moveToSystemIntervention(taskDir, "Writing provenance file failed"); + } + try { + moveToTargetFolder(taskDir); + } catch (IOException e) { + LOG.error("Could not move task {} to target location", file.getAbsolutePath(), + e); + moveToSystemIntervention(taskDir, "Writing task directory failed"); + } + }); + } + + private Optional findProvenanceFile(List taskDirContent) { + Optional provenanceFileSearch = taskDirContent.stream() + .filter(file -> 
file.getName().equals(Provenance.FILE_NAME)).findFirst(); + return provenanceFileSearch; + } + + private boolean checkForEmpty(File taskDir, List taskDirContent) { + if (taskDirContent.isEmpty()) { + LOG.error("Task {} has no files", taskDir.getAbsolutePath()); + clearTask(taskDir); + taskDir.delete(); + LOG.info("Empty task {} deleted", taskDir.getAbsolutePath()); + return true; + } + return false; + } + + private void moveToTargetFolder(File taskDir) throws IOException { + LOG.info("Moving task {} to target folder", taskDir.getAbsolutePath()); + Files.move(taskDir.toPath(), targetDirectory.resolve(taskDir.getName())); + } + + private void writeProvenance(File provenanceFile, Provenance provenance) throws IOException { + var mapper = new ObjectMapper(); + mapper.writerWithDefaultPrettyPrinter().writeValue(provenanceFile, provenance); + } + + private void moveToSystemIntervention(File taskDir, String reason) { + try { + var errorFile = taskDir.toPath().resolve("error.txt").toFile(); + errorFile.createNewFile(); + Files.writeString(errorFile.toPath(), reason); + Files.move(taskDir.toPath(), interventionDirectory.resolve(taskDir.getName())); + } catch (IOException e) { + throw new RuntimeException("Cannot move task to intervention: %s".formatted(taskDir), e); + } + } + + private void clearTask(File taskDir) { + LOCK.lock(); + try { + ACTIVE_TASKS.remove(taskDir.getAbsolutePath()); + } finally { + LOCK.unlock(); + } + } + + private List tasks() { + return Arrays.stream(workingDirectory.toFile().listFiles()).filter(File::isDirectory) + .filter(file -> !file.getName().equals(INTERVENTION_DIRECTORY)).toList(); + } + + public void interrupt() { + terminated.set(true); + while (active.get()) { + LOG.debug("Thread is still active..."); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // we don't want to interrupt the worker thread before its task is done, since it might + // render the application in a non-recoverable state + } + } + LOG.debug("Task has 
been finished"); + } +} diff --git a/src/main/java/life/qbic/data/processing/registration/ErrorCode.java b/src/main/java/life/qbic/data/processing/registration/ErrorCode.java new file mode 100644 index 0000000..cbf7726 --- /dev/null +++ b/src/main/java/life/qbic/data/processing/registration/ErrorCode.java @@ -0,0 +1,13 @@ +package life.qbic.data.processing.registration; + +/** + * + * + *

+ * + * @since + */ +public enum ErrorCode { + METADATA_FILE_NOT_FOUND, + INCOMPLETE_METADATA, FILE_NOT_FOUND, MISSING_FILE_ENTRY, IO_EXCEPTION +} diff --git a/src/main/java/life/qbic/data/processing/registration/ProcessRegistrationRequest.java b/src/main/java/life/qbic/data/processing/registration/ProcessRegistrationRequest.java new file mode 100644 index 0000000..8a064b6 --- /dev/null +++ b/src/main/java/life/qbic/data/processing/registration/ProcessRegistrationRequest.java @@ -0,0 +1,261 @@ +package life.qbic.data.processing.registration; + +import static org.apache.logging.log4j.LogManager.getLogger; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import life.qbic.data.processing.ConcurrentRegistrationQueue; +import life.qbic.data.processing.GlobalConfig; +import life.qbic.data.processing.Provenance; +import org.apache.logging.log4j.Logger; +import org.springframework.lang.NonNull; + +/** + * Process Registration Request + *

+ * This must be the first process of handling a new incoming dataset. It will consume a processing + * request item, that is then used to prepare the dataset for the following downstream processes. + *

+ * The process polls the {@link life.qbic.data.processing.ConcurrentRegistrationQueue} shared with + * the scanning thread. + * + *

+ * The process will do the following tasks: + *

    + *
  • Wraps a new dataset into a task directory with a random UUID as task name
  • + *
  • Creates a provenance JSON file that is used by downstream processes and holds the required provenance data
  • + *
  • Moves the dataset to the next processing directory
  • + *
+ * + * @since 1.0.0 + */ +public class ProcessRegistrationRequest extends Thread { + + private static final Logger log = getLogger(ProcessRegistrationRequest.class); + private static final String threadName = "Registration-%s"; + private static int threadNumber = 1; + private final ConcurrentRegistrationQueue registrationQueue; + private final Path workingDirectory; + private final Path targetDirectory; + private final String metadataFileName; + private final Path userErrorDirectory; + private AtomicBoolean active = new AtomicBoolean(false); + + public ProcessRegistrationRequest(@NonNull ConcurrentRegistrationQueue registrationQueue, + @NonNull RegistrationConfiguration configuration, @NonNull GlobalConfig globalConfig) { + this.setName(threadName.formatted(nextThreadNumber())); + this.registrationQueue = registrationQueue; + this.workingDirectory = configuration.workingDirectory(); + this.targetDirectory = configuration.targetDirectory(); + this.metadataFileName = configuration.metadataFileName(); + this.userErrorDirectory = globalConfig.usersErrorDirectory(); + } + + private static int nextThreadNumber() { + return threadNumber++; + } + + private static void cleanup(Path workingTargetDir) throws IOException { + try (var content = Files.walk(workingTargetDir)) { + content.map(Path::toFile).sorted(Comparator.reverseOrder()).forEach(File::delete); + } + } + + private static void processMeasurement(String measurementId, + Map> aggregatedFilesByMeasurementId, Path workingTargetDir, + Path taskDir) throws IOException { + for (RegistrationMetadata metadataEntry : aggregatedFilesByMeasurementId.get( + measurementId)) { + Files.move(workingTargetDir.resolve(metadataEntry.file()), + taskDir.resolve(metadataEntry.file())); + } + } + + private void moveBackToOrigin(Path target, Path usersHomePath, String reason) { + log.info("Moving back to original user directory: {}", usersHomePath); + try { + Path taskDir = createTaskDirectory(); + Files.move(target, 
taskDir.resolve(target.getFileName())); + var errorFile = taskDir.resolve("error.txt").toFile(); + errorFile.createNewFile(); + Files.writeString(errorFile.toPath(), reason); + usersHomePath.resolve(userErrorDirectory).toFile().mkdir(); + Files.move(taskDir, + usersHomePath.resolve(userErrorDirectory) + .resolve(taskDir.toFile().getName())); + } catch (IOException e) { + log.error("Cannot move task to user intervention: %s".formatted( + usersHomePath.resolve(userErrorDirectory)), e); + } + } + + private void validateFileEntries(Collection metadata, Path request) + throws ValidationException { + for (RegistrationMetadata metadataEntry : metadata) { + if (!request.resolve(Paths.get(metadataEntry.file())).toFile().exists()) { + throw new ValidationException( + "Unknown file reference in metadata: %s".formatted(metadataEntry.file()), + ErrorCode.FILE_NOT_FOUND); + } + } + // To save the user from any trouble, let's also check if all dataset files are described by a metadata entry + var filesInMetadata = metadata.stream().map(RegistrationMetadata::file).toList(); + var physicalFiles = request.toFile().listFiles(); + for (File file : physicalFiles) { + // we ignore hidden files + if (file.isHidden()) { + continue; + } + if (file.getName().endsWith(metadataFileName)) { + continue; + } + if (!filesInMetadata.contains(file.getName())) { + throw new ValidationException( + "Found more files than described in the metadata file: %s".formatted(file.getName()), + ErrorCode.MISSING_FILE_ENTRY); + } + } + } + + private List findAndParseMetadata(Path request) throws ValidationException { + Optional metadataFile = findMetadataFile(request); + if (metadataFile.isEmpty()) { + throw new ValidationException("Metadata file does not exist", + ErrorCode.METADATA_FILE_NOT_FOUND); + } + + List content; + try { + content = Files.readAllLines(Paths.get(metadataFile.get().getPath())).stream() + .filter(row -> !row.isBlank()).toList(); + } catch (IOException e) { + log.error("Error reading 
metadata file", e); + throw new ValidationException("Cannot read metadata file", ErrorCode.IO_EXCEPTION); + } + + return content.stream().map(this::parseMetadataRow).toList(); + } + + private RegistrationMetadata parseMetadataRow(String value) throws ValidationException { + try { + var splitValues = value.split("\t"); + return new RegistrationMetadata(splitValues[0], splitValues[1]); + } catch (IndexOutOfBoundsException e) { + log.error("Error parsing metadata row: %s".formatted(value), e); + throw new ValidationException("Cannot parse metadata entry", ErrorCode.INCOMPLETE_METADATA); + } + } + + private Optional findMetadataFile(Path path) { + for (File file : Objects.requireNonNull(path.toFile().listFiles())) { + if (file.isFile() && file.getName().endsWith(metadataFileName)) { + return Optional.of(file); + } + } + return Optional.empty(); + } + + private void writeProvenanceInformation(Path taskDir, Path newLocation, + RegistrationRequest request, String measurementId, + List datasetFiles) + throws IOException { + Provenance provenance = new Provenance(); + provenance.originPath = request.origin().toString(); + provenance.history = new ArrayList<>(); + provenance.history.add(newLocation.toString()); + provenance.userWorkDirectoryPath = String.valueOf(request.userPath()); + provenance.qbicMeasurementID = measurementId; + provenance.addDatasetFiles(datasetFiles); + ObjectMapper mapper = new ObjectMapper(); + mapper.writerWithDefaultPrettyPrinter() + .writeValue(taskDir.resolve("provenance.json").toFile(), provenance); + } + + private Path createTaskDirectory() { + UUID taskId = UUID.randomUUID(); + var taskDir = workingDirectory.resolve(taskId.toString()); + taskDir.toFile().mkdirs(); + return workingDirectory.resolve(taskId.toString()); + } + + public void interrupt() { + while (active.get()) { + log.debug("Thread is still active..."); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // we don't want to interrupt the worker thread before its 
task is done, since it might + // render the application in a non-recoverable state + } + } + log.debug("Task has been finished"); + } + + @Override + public void run() { + while (true) { + var request = registrationQueue.poll(); + active.set(true); + log.info("Processing request: {}", request); + var intermediateTaskDir = createTaskDirectory(); + try { + // Let's first move the registration request content to the working directory of the process + + Files.move(request.target(), intermediateTaskDir.resolve(request.target().getFileName())); + var workingTargetDir = intermediateTaskDir.resolve(request.target().getFileName()); + + var registrationMetadata = findAndParseMetadata(workingTargetDir); + validateFileEntries(registrationMetadata, workingTargetDir); + + var aggregatedFilesByMeasurementId = registrationMetadata.stream().collect( + Collectors.groupingBy(RegistrationMetadata::measurementId)); + + processAll(aggregatedFilesByMeasurementId, workingTargetDir, request); + + // Finally clean up the task directory, which should only contain the original metadata file + cleanup(workingTargetDir); + } catch (ValidationException e) { + log.error("Failed validation processing request: %s".formatted(request), e); + moveBackToOrigin(intermediateTaskDir, request.userPath(), e.getMessage()); + } catch (RuntimeException e) { + log.error("Error moving task directory", e); + // TODO move back to user folder + } catch (IOException e) { + log.error("Error while processing registration request", e); + // TODO move back to user folder + } finally { + active.set(false); + log.info("Processing completed: {}", request); + } + } + } + + private void processAll(Map> aggregatedFilesByMeasurementId, + Path workingTargetDir, RegistrationRequest request) throws IOException { + for (String measurementId : aggregatedFilesByMeasurementId.keySet()) { + // We now create individual task directories for every measurement dataset + Path taskDir = createTaskDirectory(); + 
processMeasurement(measurementId, aggregatedFilesByMeasurementId, workingTargetDir, taskDir); + + writeProvenanceInformation(taskDir, targetDirectory, request, measurementId, + aggregatedFilesByMeasurementId.get(measurementId).stream() + .map(RegistrationMetadata::file).toList()); + Files.move(taskDir, targetDirectory.resolve(taskDir.getFileName())); + } + } + +} diff --git a/src/main/java/life/qbic/data/processing/registration/RegistrationConfiguration.java b/src/main/java/life/qbic/data/processing/registration/RegistrationConfiguration.java new file mode 100644 index 0000000..67c8693 --- /dev/null +++ b/src/main/java/life/qbic/data/processing/registration/RegistrationConfiguration.java @@ -0,0 +1,44 @@ +package life.qbic.data.processing.registration; + +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Objects; + +public class RegistrationConfiguration { + + + private final Path workingDirectory; + private final Path targetDirectory; + private final String metadataFileName; + + public RegistrationConfiguration(String workingDirectory, String targetDirectory, String metadataFileName) { + this.workingDirectory = Paths.get(Objects.requireNonNull(workingDirectory, "workingDirectory must not be null")); + if (!workingDirectory().toFile().exists()) { + throw new IllegalArgumentException(targetDirectory + " does not exist"); + } + if (!workingDirectory().toFile().isDirectory()) { + throw new IllegalArgumentException(targetDirectory + " is not a directory"); + } + this.targetDirectory = Paths.get(Objects.requireNonNull(targetDirectory, "targetDirectories must not be null")); + if (!targetDirectory().toFile().exists()) { + throw new IllegalArgumentException(targetDirectory + " does not exist"); + } + if (!targetDirectory().toFile().isDirectory()) { + throw new IllegalArgumentException(targetDirectory + " is not a directory"); + } + if (metadataFileName == null || metadataFileName.isEmpty()) { + throw new IllegalArgumentException("metadataFileName 
must not be null or empty"); + } + this.metadataFileName = metadataFileName; + } + + public Path workingDirectory() { + return workingDirectory; + } + + public Path targetDirectory() { + return targetDirectory; + } + + public String metadataFileName() { return metadataFileName; } +} diff --git a/src/main/java/life/qbic/data/processing/registration/RegistrationMetadata.java b/src/main/java/life/qbic/data/processing/registration/RegistrationMetadata.java new file mode 100644 index 0000000..dfc6cdd --- /dev/null +++ b/src/main/java/life/qbic/data/processing/registration/RegistrationMetadata.java @@ -0,0 +1,5 @@ +package life.qbic.data.processing.registration; + +public record RegistrationMetadata(String measurementId, String file) { + +} diff --git a/src/main/java/life/qbic/data/processing/registration/RegistrationRequest.java b/src/main/java/life/qbic/data/processing/registration/RegistrationRequest.java new file mode 100644 index 0000000..3c586d0 --- /dev/null +++ b/src/main/java/life/qbic/data/processing/registration/RegistrationRequest.java @@ -0,0 +1,26 @@ +package life.qbic.data.processing.registration; + +import java.nio.file.Path; +import java.time.Instant; +import java.util.Objects; + +public record RegistrationRequest(Instant timestamp, long lastModified, Path origin, Path target, Path userPath) { + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + RegistrationRequest that = (RegistrationRequest) o; + return Objects.equals(origin, that.origin) && Objects.equals(target, + that.target) && Objects.equals(lastModified, that.lastModified); + } + + @Override + public int hashCode() { + return Objects.hash(lastModified, origin, target); + } +} diff --git a/src/main/java/life/qbic/data/processing/registration/ValidationException.java b/src/main/java/life/qbic/data/processing/registration/ValidationException.java new file mode 100644 index 0000000..cc33497 --- 
/dev/null +++ b/src/main/java/life/qbic/data/processing/registration/ValidationException.java @@ -0,0 +1,19 @@ +package life.qbic.data.processing.registration; + +public class ValidationException extends RuntimeException { + + private final ErrorCode errorCode; + + public ValidationException(ErrorCode errorCode) { + this.errorCode = errorCode; + } + + public ValidationException(String message, ErrorCode errorCode) { + super(message); + this.errorCode = errorCode; + } + + public ErrorCode errorCode() { + return errorCode; + } +} diff --git a/src/main/java/life/qbic/data/processing/scanner/Scanner.java b/src/main/java/life/qbic/data/processing/scanner/Scanner.java new file mode 100644 index 0000000..fa32692 --- /dev/null +++ b/src/main/java/life/qbic/data/processing/scanner/Scanner.java @@ -0,0 +1,152 @@ +package life.qbic.data.processing.scanner; + +import static org.apache.logging.log4j.LogManager.getLogger; + +import java.io.File; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import life.qbic.data.processing.ConcurrentRegistrationQueue; +import life.qbic.data.processing.GlobalConfig; +import life.qbic.data.processing.registration.RegistrationRequest; +import org.apache.logging.log4j.Logger; + +/** + * Scanner + *

+ * The scanner thread iterates over the specified directory and records every directory present as + * "user" directory. + *

+ * In the current configuration, the scanner detects activity in a directory named "registration" + * within each user directory. Every event outside this directory is ignored, since data that is + * currently uploaded must not be processed, which will lead to corrupt data. + * + * @since 1.0.0 + */ +public class Scanner extends Thread { + + private static final Logger log = getLogger(Scanner.class); + private static final Path REGISTRATION_PATH = Paths.get("registration"); + + private final Path scannerPath; + private final int scanInterval; + private final HashSet userProcessDirectories = new HashSet<>(); + private final ConcurrentRegistrationQueue registrationQueue; + private final HashSet submittedRequests = new HashSet<>(); + private final Set ignoredDirectories = new HashSet<>(); + + public Scanner(ScannerConfiguration scannerConfiguration, + ConcurrentRegistrationQueue registrationQueue, GlobalConfig globalConfig) { + this.setName("Scanner-Thread"); + Objects.requireNonNull(scannerConfiguration, "scannerConfiguration must not be null"); + scannerPath = Path.of(scannerConfiguration.scannerDirectory()); + if (!scannerPath.toFile().exists()) { + throw new RuntimeException("Could not find scanner directory: " + scannerPath); + } + this.scanInterval = scannerConfiguration.scanInterval(); + this.registrationQueue = Objects.requireNonNull(registrationQueue, + "registrationQueue must not be null"); + this.ignoredDirectories.addAll(scannerConfiguration.ignore()); + if (!this.ignoredDirectories.isEmpty()) { + log.info("Ignoring {} directories", ignoredDirectories.size()); + } + } + + private boolean notToIgnore(String filename) { + return !ignoredDirectories.contains(filename); + } + + @Override + public void run() { + log.info("Started scanning '{}'", scannerPath); + while (!Thread.interrupted()) { + try { + var userFolderIterator = Arrays.stream( + Objects.requireNonNull(scannerPath.toFile().listFiles())).filter(File::isDirectory) + .filter(file -> 
notToIgnore(file.getName())) + .toList().iterator(); + + while (userFolderIterator.hasNext()) { + fetchRegistrationDirectory(userFolderIterator.next().toPath()).ifPresent( + this::addRegistrationDirectory); + } + + List requests = detectDataForRegistration(); + for (RegistrationRequest request : requests) { + if (submittedRequests.contains(request)) { + log.info("Skipping registration request '{}'", request); + continue; + } + registrationQueue.add(request); + submittedRequests.add(request); + log.info("New registration requested: {}", request); + } + removePathZombies(); + Thread.sleep(scanInterval); + } catch (InterruptedException e) { + interrupt(); + } + } + log.info("Stopped scanning '{}'", scannerPath); + } + + private List detectDataForRegistration() { + return userProcessDirectories.parallelStream() + .map(Path::toFile) + .map(file -> createRequests(file.listFiles(), file.toPath())).flatMap( + Collection::stream).toList(); + } + + private List createRequests(File[] files, Path userDirectory) { + if (files == null || files.length == 0) { + return new ArrayList<>(); + } + return Arrays.stream(files).filter(file -> !file.isHidden()) + .map(file -> createRequest(file, userDirectory)).toList(); + } + + private RegistrationRequest createRequest(File file, Path userDirectory) { + return new RegistrationRequest(Instant.now(), file.lastModified(), + file.getParentFile().toPath(), file.toPath(), userDirectory.getParent()); + } + + private void removePathZombies() { + List zombies = new LinkedList<>(); + for (Path processFolder : userProcessDirectories) { + if (!processFolder.toFile().exists()) { + zombies.add(processFolder); + } + } + + zombies.forEach(zombie -> { + userProcessDirectories.remove(zombie); + log.warn("Removing orphaned process directory: '%s'".formatted(zombie)); + }); + + } + + private void addRegistrationDirectory(Path path) { + if (userProcessDirectories.add(path)) { + log.info("New user process directory found: '{}'", path.toString()); + } + } + + 
public Optional fetchRegistrationDirectory(Path userDirectory) { + Path resolvedPath = userDirectory.resolve(REGISTRATION_PATH); + return Optional.ofNullable(resolvedPath.toFile().exists() ? resolvedPath : null); + } + + @Override + public void interrupt() { + log.info("Interrupted scanning '{}'", scannerPath); + } +} diff --git a/src/main/java/life/qbic/data/processing/scanner/ScannerConfiguration.java b/src/main/java/life/qbic/data/processing/scanner/ScannerConfiguration.java new file mode 100644 index 0000000..ac558b1 --- /dev/null +++ b/src/main/java/life/qbic/data/processing/scanner/ScannerConfiguration.java @@ -0,0 +1,33 @@ +package life.qbic.data.processing.scanner; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Objects; + +public class ScannerConfiguration { + + private final String scannerDirectory; + private final int scanInterval; + private final String[] ignore; + + public ScannerConfiguration(String scannerDirectory, int interval, String[] ignore) { + this.scannerDirectory = scannerDirectory; + if (interval <= 0) { + throw new IllegalArgumentException("Interval must be greater than 0"); + } + this.scanInterval = interval; + this.ignore = Arrays.copyOf(Objects.requireNonNull(ignore), ignore.length); + } + + public String scannerDirectory() { + return scannerDirectory; + } + + public int scanInterval() { + return scanInterval; + } + + public Collection ignore () { + return Arrays.stream(ignore).toList(); + } +} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 43f0696..29d5e52 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1 +1,68 @@ -messages.file=messages.txt +#================================ +# Data processing app properties +#================================ + +#------------------------ +# Global settings +#------------------------ +# Directory name that will be used for the manual intervention directory +# 
Created in the users' home folders +# e.g. /home/<user>/error +users.error.directory.name=error +# Directory name that will be used for detecting dropped datasets +# Needs to be present in the users' home folders +# e.g. /home/<user>/registration +users.registration.directory.name=registration + +#-------------------------------------- +# Settings for the data scanning thread +#-------------------------------------- +# Path to the directory that contains all user directories +# e.g. /home in Linux or /Users in macOS +scanner.directory=${SCANNER_DIR:/home} +# Ignored directories are skipped during scanning +# You can provide a list of comma-separated names if you want to ignore multiple directories +scanner.ignore=${SCANNER_IGNORE:} +# The time interval (milliseconds) at which the scanner thread iterates through the scanner directory +# Value must be an integer > 0 +scanner.interval=1000 + +#---------------- +# Settings for the registration worker threads +#---------------- +registration.threads=2 +registration.metadata.filename=metadata.txt +registration.working.dir=${WORKING_DIR:} +registration.target.dir=${PROCESSING_DIR:} + +#------------------------------------ +# Settings for the 1. processing step +# Proper packaging and provenance data, some simple checks +#------------------------------------ +processing.threads=2 +processing.working.dir=${PROCESSING_DIR} +processing.target.dir=${EVALUATION_DIR} + +#---------------------------------- +# Settings for the 2.
processing step: +# Measurement ID evaluation +# --------------------------------- +evaluations.threads=2 +evaluation.working.dir=${EVALUATION_DIR} +# Define one or more target directories here +# Example single target dir: +# evaluation.target.dirs=/my/example/target/dir +# Example multiple target dirs: +# evaluation.target.dirs=/my/example/target/dir1,/my/example/target/dir2,/my/example/target/dir3 +evaluation.target.dirs=${OPENBIS_ETL_DIRS} +evaluation.measurement-id.pattern=^(MS|NGS)Q[A-Z0-9]{4}[0-9]{3}[A-Z0-9]{2}-[0-9]* + +# ---------------- +# Logging settings +# ---------------- +# We want logging to stay enabled even during the shutdown procedure, so log information from active +# workers is still captured in the log output. +# Setting it to 'true' means that a thread is registered to the shutdown hook that will terminate the +# logging eventually. Log events in threads that are still running might get lost, which might not be desired. +# Setting it to 'false' means that the logging remains active until the JVM completely stops.
+logging.register-shutdown-hook=false diff --git a/src/test/groovy/life/qbic/springminimaltemplate/SpringMinimalTemplateApplicationTests.groovy b/src/test/groovy/life/qbic/springminimaltemplate/SpringMinimalTemplateApplicationTests.groovy deleted file mode 100644 index f472c3e..0000000 --- a/src/test/groovy/life/qbic/springminimaltemplate/SpringMinimalTemplateApplicationTests.groovy +++ /dev/null @@ -1,26 +0,0 @@ -package life.qbic.springminimaltemplate - -import org.junit.jupiter.api.Test -import org.springframework.beans.factory.annotation.Autowired -import org.springframework.boot.test.context.SpringBootTest -import spock.lang.Specification - -@SpringBootTest -class SpringMinimalTemplateApplicationTests extends Specification { - - @Test - void contextLoads() { - } - - @Autowired - private MessageService messageService - - def "autowired works"() { - when: - String messages = messageService.collectMessage() - println(messages) - then: - messages != null - } - -}