diff --git a/src/utils/operandi_utils/hpc/batch_scripts/submit_workflow_job.sh b/src/utils/operandi_utils/hpc/batch_scripts/submit_workflow_job.sh
index 5d4ca937..dd291707 100755
--- a/src/utils/operandi_utils/hpc/batch_scripts/submit_workflow_job.sh
+++ b/src/utils/operandi_utils/hpc/batch_scripts/submit_workflow_job.sh
@@ -17,6 +17,7 @@ SIF_PATH="/scratch1/users/${USER}/ocrd_all_maximum_image.sif"
 OCRD_MODELS_DIR="/scratch1/users/${USER}/ocrd_models"
 OCRD_MODELS_DIR_IN_DOCKER="/usr/local/share"
+BIND_OCRD_MODELS="${OCRD_MODELS_DIR}:${OCRD_MODELS_DIR_IN_DOCKER}"
 
 SCRATCH_BASE=$1
 WORKFLOW_JOB_ID=$2
@@ -30,9 +31,14 @@ FORKS=$9
 PAGES=${10}
 
 WORKFLOW_JOB_DIR="${SCRATCH_BASE}/${WORKFLOW_JOB_ID}"
-WORKSPACE_DIR="${WORKFLOW_JOB_DIR}/${WORKSPACE_ID}"
 NF_SCRIPT_PATH="${WORKFLOW_JOB_DIR}/${NEXTFLOW_SCRIPT_ID}"
+WORKSPACE_DIR="${WORKFLOW_JOB_DIR}/${WORKSPACE_ID}"
+WORKSPACE_DIR_IN_DOCKER="/ws_data"
+BIND_WORKSPACE_DIR="${WORKSPACE_DIR}:${WORKSPACE_DIR_IN_DOCKER}"
+METS_FILE_PATH="${WORKSPACE_DIR_IN_DOCKER}/${METS_BASENAME}"
 METS_SOCKET_BASENAME="mets_server.sock"
+METS_SOCKET_PATH="${WORKSPACE_DIR_IN_DOCKER}/${METS_SOCKET_BASENAME}"
+
 
 hostname
 slurm_resources
@@ -45,89 +51,118 @@ module load nextflow
 # To submit separate jobs for each process in the NF script
 # export NXF_EXECUTOR=slurm
 
-# The SIF file of the OCR-D All docker image must be previously created
-if [ ! -f "${SIF_PATH}" ]; then
-    echo "Required ocrd_all_image sif file not found at: ${SIF_PATH}"
-    exit 1
-fi
-# Models directory must be previously filled with OCR-D models
-if [ ! -d "${OCRD_MODELS_DIR}" ]; then
-    echo "Ocrd models directory not found at: ${OCRD_MODELS_DIR}"
-    exit 1
-fi
+# Define functions to be used
+check_existence_of_paths () {
+    # The SIF file of the OCR-D All docker image must be previously created
+    if [ ! -f "${SIF_PATH}" ]; then
+        echo "Required ocrd_all_image sif file not found at: ${SIF_PATH}"
+        exit 1
+    fi
+
+    # Models directory must be previously filled with OCR-D models
+    if [ ! -d "${OCRD_MODELS_DIR}" ]; then
+        echo "Ocrd models directory not found at: ${OCRD_MODELS_DIR}"
+        exit 1
+    fi
+
+    if [ ! -d "${SCRATCH_BASE}" ]; then
+        mkdir -p "${SCRATCH_BASE}"
+    fi
 
-if [ ! -d "${SCRATCH_BASE}" ]; then
-    mkdir -p "${SCRATCH_BASE}"
     if [ ! -d "${SCRATCH_BASE}" ]; then
         echo "Required scratch base dir was not created: ${SCRATCH_BASE}"
         exit 1
     fi
-fi
+}
+
+unzip_workflow_job_dir () {
+    if [ ! -f "${WORKFLOW_JOB_DIR}.zip" ]; then
+        echo "Required scratch slurm workspace zip is not available: ${WORKFLOW_JOB_DIR}.zip"
+        exit 1
+    fi
 
-if [ ! -f "${WORKFLOW_JOB_DIR}.zip" ]; then
-    echo "Required scratch slurm workspace zip is not available: ${WORKFLOW_JOB_DIR}.zip"
-    exit 1
-else
     echo "Unzipping ${WORKFLOW_JOB_DIR}.zip to: ${WORKFLOW_JOB_DIR}"
     unzip "${WORKFLOW_JOB_DIR}.zip" -d "${SCRATCH_BASE}" > "${SCRATCH_BASE}/workflow_job_unzipping.log"
     echo "Removing zip: ${WORKFLOW_JOB_DIR}.zip"
     mv "${SCRATCH_BASE}/workflow_job_unzipping.log" "${WORKFLOW_JOB_DIR}/workflow_job_unzipping.log"
     rm "${WORKFLOW_JOB_DIR}.zip"
-fi
-if [ ! -d "${WORKFLOW_JOB_DIR}" ]; then
-    echo "Required scratch slurm workflow dir not available: ${WORKFLOW_JOB_DIR}"
-    exit 1
-else
+
+    if [ ! -d "${WORKFLOW_JOB_DIR}" ]; then
+        echo "Required scratch slurm workflow dir not available: ${WORKFLOW_JOB_DIR}"
+        exit 1
+    fi
+
     cd "${WORKFLOW_JOB_DIR}" || exit 1
-fi
-
-# TODO: Would be better to start the mets server as an instance, but this is still broken
-# singularity instance start \
-#   --bind "${WORKSPACE_DIR}:/ws_data" \
-#   "${SIF_PATH}" \
-#   instance_mets_server \
-#   ocrd workspace -U "/ws_data/${METS_SOCKET_BASENAME}" -d "/ws_data" server start
-
-# Start the mets server for the specific workspace in the background
-singularity exec \
-    --bind "${WORKSPACE_DIR}:/ws_data" \
-    "${SIF_PATH}" \
-    ocrd workspace -U "/ws_data/${METS_SOCKET_BASENAME}" -d "/ws_data" server start \
-    > "${WORKSPACE_DIR}/mets_server.log" 2>&1 &
-
-sleep 3
-
-# Execute the Nextflow script
-nextflow run "${NF_SCRIPT_PATH}" \
--ansi-log false \
--with-report \
---input_file_group "${IN_FILE_GRP}" \
---mets "/ws_data/${METS_BASENAME}" \
---mets_socket "/ws_data/${METS_SOCKET_BASENAME}" \
---workspace_dir "/ws_data" \
---pages "${PAGES}" \
---singularity_wrapper "singularity exec --bind ${WORKSPACE_DIR}:/ws_data --bind ${OCRD_MODELS_DIR}:${OCRD_MODELS_DIR_IN_DOCKER} --env OCRD_METS_CACHING=true ${SIF_PATH}" \
---cpus "${CPUS}" \
---ram "${RAM}" \
---forks "${FORKS}"
-
-# Not supported in the HPC (the version there is <7.40)
-# curl -X DELETE --unix-socket "${WORKSPACE_DIR}/${METS_SOCKET_BASENAME}" "http://localhost/"
-
-# TODO Stop the instance here
-# singularity instance stop instance_mets_server
-
-# Stop the mets server started above
-singularity exec \
-    --bind "${WORKSPACE_DIR}:/ws_data" \
-    "${SIF_PATH}" \
-    ocrd workspace -U "/ws_data/${METS_SOCKET_BASENAME}" -d "/ws_data" server stop
-
-# Delete symlinks created for the Nextflow workers
-find "${WORKFLOW_JOB_DIR}" -type l -delete
-# Create a zip of the ocrd workspace dir
-cd "${WORKSPACE_DIR}" && zip -r "${WORKSPACE_ID}.zip" "." -x "*.sock" > "workspace_zipping.log"
-# Create a zip of the Nextflow run results by excluding the ocrd workspace dir
-cd "${WORKFLOW_JOB_DIR}" && zip -r "${WORKFLOW_JOB_ID}.zip" "." -x "${WORKSPACE_ID}**" > "workflow_job_zipping.log"
+}
+
+start_mets_server () {
+    # TODO: Would be better to start the mets server as an instance, but this is still broken
+    # singularity instance start \
+    #   --bind "${BIND_WORKSPACE_DIR}" \
+    #   "${SIF_PATH}" \
+    #   instance_mets_server \
+    #   ocrd workspace -U "${METS_SOCKET_PATH}" -d "${WORKSPACE_DIR_IN_DOCKER}" server start
+
+    # Start the mets server for the specific workspace in the background
+    singularity exec \
+        --bind "${BIND_WORKSPACE_DIR}" \
+        "${SIF_PATH}" \
+        ocrd workspace -U "${METS_SOCKET_PATH}" -d "${WORKSPACE_DIR_IN_DOCKER}" server start \
+        > "${WORKSPACE_DIR}/mets_server.log" 2>&1 &
+}
+
+stop_mets_server () {
+    # Not supported in the HPC (the version there is <7.40)
+    # curl -X DELETE --unix-socket "${WORKSPACE_DIR}/${METS_SOCKET_BASENAME}" "http://localhost/"
+
+    # TODO Stop the instance here
+    # singularity instance stop instance_mets_server
+
+    # Stop the mets server started above
+    singularity exec \
+        --bind "${BIND_WORKSPACE_DIR}" \
+        "${SIF_PATH}" \
+        ocrd workspace -U "${METS_SOCKET_PATH}" -d "${WORKSPACE_DIR_IN_DOCKER}" server stop
+}
+
+execute_nextflow_workflow () {
+    local SINGULARITY_CMD="singularity exec --bind ${BIND_WORKSPACE_DIR} --bind ${BIND_OCRD_MODELS} --env OCRD_METS_CACHING=true ${SIF_PATH}"
+    # Execute the Nextflow script
+    nextflow run "${NF_SCRIPT_PATH}" \
+        -ansi-log false \
+        -with-report \
+        --input_file_group "${IN_FILE_GRP}" \
+        --mets "${METS_FILE_PATH}" \
+        --mets_socket "${METS_SOCKET_PATH}" \
+        --workspace_dir "${WORKSPACE_DIR_IN_DOCKER}" \
+        --pages "${PAGES}" \
+        --singularity_wrapper "${SINGULARITY_CMD}" \
+        --cpus "${CPUS}" \
+        --ram "${RAM}" \
+        --forks "${FORKS}"
+
+    # Useful for handling all kinds of exit status codes in the future
+    case $? in
+        0) echo "The nextflow workflow execution has finished successfully" ;;
+        *) echo "The nextflow workflow execution has failed" >&2; exit 1 ;;
+    esac
+}
+
+zip_results () {
+    # Delete symlinks created for the Nextflow workers
+    find "${WORKFLOW_JOB_DIR}" -type l -delete
+    # Create a zip of the ocrd workspace dir
+    cd "${WORKSPACE_DIR}" && zip -r "${WORKSPACE_ID}.zip" "." -x "*.sock" > "workspace_zipping.log"
+    # Create a zip of the Nextflow run results by excluding the ocrd workspace dir
+    cd "${WORKFLOW_JOB_DIR}" && zip -r "${WORKFLOW_JOB_ID}.zip" "." -x "${WORKSPACE_ID}**" > "workflow_job_zipping.log"
+}
+
+# Main execution flow of the workflow job
+check_existence_of_paths
+unzip_workflow_job_dir
+start_mets_server
+sleep 5
+execute_nextflow_workflow
+stop_mets_server
+zip_results
diff --git a/src/utils/operandi_utils/hpc/connector.py b/src/utils/operandi_utils/hpc/connector.py
index 6442ad93..669d8dd5 100644
--- a/src/utils/operandi_utils/hpc/connector.py
+++ b/src/utils/operandi_utils/hpc/connector.py
@@ -172,27 +172,29 @@ def is_transport_responsive(transport: Transport) -> bool:
         except EOFError:
             return False
 
-    @staticmethod
-    def is_ssh_connection_still_responsive(ssh_client: SSHClient) -> bool:
+    def is_ssh_connection_still_responsive(self, ssh_client: SSHClient) -> bool:
+        self.log.debug("Checking SSH connection responsiveness")
         if not ssh_client:
             return False
         transport = ssh_client.get_transport()
-        return HPCConnector.is_transport_responsive(transport)
+        return self.is_transport_responsive(transport)
 
     def is_proxy_tunnel_still_responsive(self) -> bool:
+        self.log.debug("Checking proxy tunnel responsiveness")
         if not self.proxy_tunnel:
             return False
         transport = self.proxy_tunnel.get_transport()
-        return HPCConnector.is_transport_responsive(transport)
+        return self.is_transport_responsive(transport)
 
     def is_sftp_still_responsive(self) -> bool:
+        self.log.debug("Checking SFTP connection responsiveness")
         if not self.sftp_client:
             return False
         channel = self.sftp_client.get_channel()
         if not channel:
             return False
         transport = channel.get_transport()
-        return HPCConnector.is_transport_responsive(transport)
+        return self.is_transport_responsive(transport)
 
     def reconnect_if_required(
         self,
@@ -205,13 +207,13 @@ def reconnect_if_required(
             hpc_host = self.last_used_hpc_host
         if not proxy_host:
             proxy_host = self.last_used_proxy_host
-        if not HPCConnector.is_ssh_connection_still_responsive(self.ssh_proxy_client):
+        if not self.is_ssh_connection_still_responsive(self.ssh_proxy_client):
             self.log.warning("The connection to proxy server is not responsive, trying to open a new connection")
             self.ssh_proxy_client = self.connect_to_proxy_server(host=proxy_host, port=proxy_port)
         if not self.is_proxy_tunnel_still_responsive():
             self.log.warning("The proxy tunnel is not responsive, trying to establish a new proxy tunnel")
             self.proxy_tunnel = self.establish_proxy_tunnel(dst_host=hpc_host, dst_port=hpc_port)
-        if not HPCConnector.is_ssh_connection_still_responsive(self.ssh_hpc_client):
+        if not self.is_ssh_connection_still_responsive(self.ssh_hpc_client):
             self.log.warning("The connection to hpc frontend server is not responsive, trying to open a new connection")
             self.ssh_hpc_client = self.connect_to_hpc_frontend_server(proxy_host, proxy_port, self.proxy_tunnel)
 
@@ -230,9 +232,8 @@ def recreate_sftp_if_required(
             hpc_host=hpc_host, hpc_port=hpc_port, proxy_host=proxy_host, proxy_port=proxy_port
         )
-        if not self.is_sftp_still_responsive():
-            self.log.warning("The SFTP client is not responsive, trying to create a new SFTP client")
-            self.create_sftp_client()
+        self.log.warning("Creating a new SFTP client")
+        self.create_sftp_client()
 
     def create_ssh_connection_to_hpc_by_iteration(self, try_times: int = 1) -> None:
         while try_times > 0:
diff --git a/src/utils/operandi_utils/hpc/nextflow_workflows/template_workflow.nf b/src/utils/operandi_utils/hpc/nextflow_workflows/template_workflow.nf
index 536dec61..9175a3d9 100755
--- a/src/utils/operandi_utils/hpc/nextflow_workflows/template_workflow.nf
+++ b/src/utils/operandi_utils/hpc/nextflow_workflows/template_workflow.nf
@@ -11,9 +11,10 @@ params.pages = "null"
 params.singularity_wrapper = "null"
 params.cpus = "null"
 params.ram = "null"
-// by default single instance of each OCR-D processor
-params.forks = 2
-params.pages_per_range = params.pages / params.forks
+params.forks = params.cpus
+// Do not pass these parameters from the caller unless you know what you are doing
+params.cpus_per_fork = (params.cpus.toInteger() / params.forks.toInteger()).intValue()
+params.ram_per_fork = sprintf("%dGB", (params.ram.toInteger() / params.forks.toInteger()).intValue())
 
 log.info """\
     O P E R A N D I - H P C - T E M P L A T E  P I P E L I N E
@@ -27,7 +28,8 @@ log.info """\
     cpus            : ${params.cpus}
     ram             : ${params.ram}
     forks           : ${params.forks}
-    pages_per_range : ${params.pages_per_range}
+    cpus_per_fork   : ${params.cpus_per_fork}
+    ram_per_fork    : ${params.ram_per_fork}
     """
     .stripIndent()
 
@@ -50,24 +52,24 @@ process split_page_ranges {
 
 process ocrd_cis_ocropy_binarize {
     maxForks params.forks
-    cpus params.cpus
-    memory params.ram
+    cpus params.cpus_per_fork
+    memory params.ram_per_fork
     debug true
     input:
        val page_range
-        path mets_file
        val input_group
        val output_group
 
    script:
+
    """
-    ${params.singularity_wrapper} ocrd-cis-ocropy-binarize -U ${params.mets_socket} -w ${params.workspace_dir} -m ${mets_file} --page-id ${page_range} -I ${input_group} -O ${output_group}
+    ${params.singularity_wrapper} ocrd-cis-ocropy-binarize -U ${params.mets_socket} -w ${params.workspace_dir} -m ${params.mets} --page-id ${page_range} -I ${input_group} -O ${output_group}
     """
 }
 
 workflow {
     main:
-        ch_range_multipliers = Channel.of(0..params.forks-1)
+        ch_range_multipliers = Channel.of(0..params.forks.intValue()-1)
         split_page_ranges(ch_range_multipliers)
-        ocrd_cis_ocropy_binarize(split_page_ranges.out[0], params.mets, params.input_file_group, "OCR-D-BIN")
+        ocrd_cis_ocropy_binarize(split_page_ranges.out, params.input_file_group, "OCR-D-BIN")
 }
diff --git a/tests/tests_utils/test_hpc_xcombined.py b/tests/tests_utils/test_hpc_xcombined.py
index 33c3ff8b..13ffc8ef 100644
--- a/tests/tests_utils/test_hpc_xcombined.py
+++ b/tests/tests_utils/test_hpc_xcombined.py
@@ -64,7 +64,7 @@ def test_hpc_connector_run_batch_script(hpc_command_executor, template_workflow)
         mets_basename="mets.xml",
         job_deadline_time="1:00:00",
         cpus=2,
-        ram=8,
+        ram=16,
         nf_process_forks=2,
         ws_pages_amount=8
     )