
Commit

Merge branch 'main' of github.com:subugoe/operandi
MehmedGIT committed Mar 5, 2024
2 parents 65be5fe + c8708eb commit bf90c9e
Showing 4 changed files with 132 additions and 94 deletions.
181 changes: 108 additions & 73 deletions src/utils/operandi_utils/hpc/batch_scripts/submit_workflow_job.sh
@@ -17,6 +17,7 @@
SIF_PATH="/scratch1/users/${USER}/ocrd_all_maximum_image.sif"
OCRD_MODELS_DIR="/scratch1/users/${USER}/ocrd_models"
OCRD_MODELS_DIR_IN_DOCKER="/usr/local/share"
BIND_OCRD_MODELS="${OCRD_MODELS_DIR}:${OCRD_MODELS_DIR_IN_DOCKER}"

SCRATCH_BASE=$1
WORKFLOW_JOB_ID=$2
@@ -30,9 +31,14 @@ FORKS=$9
PAGES=${10}

WORKFLOW_JOB_DIR="${SCRATCH_BASE}/${WORKFLOW_JOB_ID}"
WORKSPACE_DIR="${WORKFLOW_JOB_DIR}/${WORKSPACE_ID}"
NF_SCRIPT_PATH="${WORKFLOW_JOB_DIR}/${NEXTFLOW_SCRIPT_ID}"
WORKSPACE_DIR="${WORKFLOW_JOB_DIR}/${WORKSPACE_ID}"
WORKSPACE_DIR_IN_DOCKER="/ws_data"
BIND_WORKSPACE_DIR="${WORKSPACE_DIR}:${WORKSPACE_DIR_IN_DOCKER}"
METS_FILE_PATH="${WORKSPACE_DIR_IN_DOCKER}/${METS_BASENAME}"
METS_SOCKET_BASENAME="mets_server.sock"
METS_SOCKET_PATH="${WORKSPACE_DIR_IN_DOCKER}/${METS_SOCKET_BASENAME}"


hostname
slurm_resources
@@ -45,89 +51,118 @@ module load nextflow
# To submit separate jobs for each process in the NF script
# export NXF_EXECUTOR=slurm

# The SIF file of the OCR-D All docker image must be previously created
if [ ! -f "${SIF_PATH}" ]; then
echo "Required ocrd_all_image sif file not found at: ${SIF_PATH}"
exit 1
fi

# Models directory must be previously filled with OCR-D models
if [ ! -d "${OCRD_MODELS_DIR}" ]; then
echo "Ocrd models directory not found at: ${OCRD_MODELS_DIR}"
exit 1
fi
# Define functions to be used
check_existence_of_paths () {
# The SIF file of the OCR-D All docker image must be previously created
if [ ! -f "${SIF_PATH}" ]; then
echo "Required ocrd_all_image sif file not found at: ${SIF_PATH}"
exit 1
fi

# Models directory must be previously filled with OCR-D models
if [ ! -d "${OCRD_MODELS_DIR}" ]; then
echo "Ocrd models directory not found at: ${OCRD_MODELS_DIR}"
exit 1
fi

if [ ! -d "${SCRATCH_BASE}" ]; then
mkdir -p "${SCRATCH_BASE}"
fi

if [ ! -d "${SCRATCH_BASE}" ]; then
mkdir -p "${SCRATCH_BASE}"
if [ ! -d "${SCRATCH_BASE}" ]; then
echo "Required scratch base dir was not created: ${SCRATCH_BASE}"
exit 1
fi
fi
}

unzip_workflow_job_dir () {
if [ ! -f "${WORKFLOW_JOB_DIR}.zip" ]; then
echo "Required scratch slurm workspace zip is not available: ${WORKFLOW_JOB_DIR}.zip"
exit 1
fi

if [ ! -f "${WORKFLOW_JOB_DIR}.zip" ]; then
echo "Required scratch slurm workspace zip is not available: ${WORKFLOW_JOB_DIR}.zip"
exit 1
else
echo "Unzipping ${WORKFLOW_JOB_DIR}.zip to: ${WORKFLOW_JOB_DIR}"
unzip "${WORKFLOW_JOB_DIR}.zip" -d "${SCRATCH_BASE}" > "${SCRATCH_BASE}/workflow_job_unzipping.log"
echo "Removing zip: ${WORKFLOW_JOB_DIR}.zip"
mv "${SCRATCH_BASE}/workflow_job_unzipping.log" "${WORKFLOW_JOB_DIR}/workflow_job_unzipping.log"
rm "${WORKFLOW_JOB_DIR}.zip"
fi

if [ ! -d "${WORKFLOW_JOB_DIR}" ]; then
echo "Required scratch slurm workflow dir not available: ${WORKFLOW_JOB_DIR}"
exit 1
else
if [ ! -d "${WORKFLOW_JOB_DIR}" ]; then
echo "Required scratch slurm workflow dir not available: ${WORKFLOW_JOB_DIR}"
exit 1
fi

cd "${WORKFLOW_JOB_DIR}" || exit 1
fi

# TODO: Would be better to start the mets server as an instance, but this is still broken
# singularity instance start \
# --bind "${WORKSPACE_DIR}:/ws_data" \
# "${SIF_PATH}" \
# instance_mets_server \
# ocrd workspace -U "/ws_data/${METS_SOCKET_BASENAME}" -d "/ws_data" server start

# Start the mets server for the specific workspace in the background
singularity exec \
--bind "${WORKSPACE_DIR}:/ws_data" \
"${SIF_PATH}" \
ocrd workspace -U "/ws_data/${METS_SOCKET_BASENAME}" -d "/ws_data" server start \
> "${WORKSPACE_DIR}/mets_server.log" 2>&1 &

sleep 3

# Execute the Nextflow script
nextflow run "${NF_SCRIPT_PATH}" \
-ansi-log false \
-with-report \
--input_file_group "${IN_FILE_GRP}" \
--mets "/ws_data/${METS_BASENAME}" \
--mets_socket "/ws_data/${METS_SOCKET_BASENAME}" \
--workspace_dir "/ws_data" \
--pages "${PAGES}" \
--singularity_wrapper "singularity exec --bind ${WORKSPACE_DIR}:/ws_data --bind ${OCRD_MODELS_DIR}:${OCRD_MODELS_DIR_IN_DOCKER} --env OCRD_METS_CACHING=true ${SIF_PATH}" \
--cpus "${CPUS}" \
--ram "${RAM}" \
--forks "${FORKS}"

# Not supported in the HPC (the version there is <7.40)
# curl -X DELETE --unix-socket "${WORKSPACE_DIR}/${METS_SOCKET_BASENAME}" "http://localhost/"

# TODO Stop the instance here
# singularity instance stop instance_mets_server

# Stop the mets server started above
singularity exec \
--bind "${WORKSPACE_DIR}:/ws_data" \
"${SIF_PATH}" \
ocrd workspace -U "/ws_data/${METS_SOCKET_BASENAME}" -d "/ws_data" server stop

# Delete symlinks created for the Nextflow workers
find "${WORKFLOW_JOB_DIR}" -type l -delete
# Create a zip of the ocrd workspace dir
cd "${WORKSPACE_DIR}" && zip -r "${WORKSPACE_ID}.zip" "." -x "*.sock" > "workspace_zipping.log"
# Create a zip of the Nextflow run results by excluding the ocrd workspace dir
cd "${WORKFLOW_JOB_DIR}" && zip -r "${WORKFLOW_JOB_ID}.zip" "." -x "${WORKSPACE_ID}**" > "workflow_job_zipping.log"
}

start_mets_server () {
# TODO: Would be better to start the mets server as an instance, but this is still broken
# singularity instance start \
# --bind "${BIND_WORKSPACE_DIR}" \
# "${SIF_PATH}" \
# instance_mets_server \
# ocrd workspace -U "${METS_SOCKET_PATH}" -d "${WORKSPACE_DIR_IN_DOCKER}" server start

# Start the mets server for the specific workspace in the background
singularity exec \
--bind "${BIND_WORKSPACE_DIR}" \
"${SIF_PATH}" \
ocrd workspace -U "${METS_SOCKET_PATH}" -d "${WORKSPACE_DIR_IN_DOCKER}" server start \
> "${WORKSPACE_DIR}/mets_server.log" 2>&1 &
}

stop_mets_server () {
# Not supported in the HPC (the version there is <7.40)
# curl -X DELETE --unix-socket "${WORKSPACE_DIR}/${METS_SOCKET_BASENAME}" "http://localhost/"

# TODO Stop the instance here
# singularity instance stop instance_mets_server

# Stop the mets server started above
singularity exec \
--bind "${BIND_WORKSPACE_DIR}" \
"${SIF_PATH}" \
ocrd workspace -U "${METS_SOCKET_PATH}" -d "${WORKSPACE_DIR_IN_DOCKER}" server stop
}

execute_nextflow_workflow () {
local SINGULARITY_CMD="singularity exec --bind ${BIND_WORKSPACE_DIR} --bind ${BIND_OCRD_MODELS} --env OCRD_METS_CACHING=true ${SIF_PATH}"
# Execute the Nextflow script
nextflow run "${NF_SCRIPT_PATH}" \
-ansi-log false \
-with-report \
--input_file_group "${IN_FILE_GRP}" \
--mets "${METS_FILE_PATH}" \
--mets_socket "${METS_SOCKET_PATH}" \
--workspace_dir "${WORKSPACE_DIR_IN_DOCKER}" \
--pages "${PAGES}" \
--singularity_wrapper "${SINGULARITY_CMD}" \
--cpus "${CPUS}" \
--ram "${RAM}" \
--forks "${FORKS}"

# Useful for handling all kinds of exit status codes in the future
case $? in
0) echo "The nextflow workflow execution has finished successfully" ;;
*) echo "The nextflow workflow execution has failed" >&2 exit 1 ;;
esac
}

zip_results () {
# Delete symlinks created for the Nextflow workers
find "${WORKFLOW_JOB_DIR}" -type l -delete
# Create a zip of the ocrd workspace dir
cd "${WORKSPACE_DIR}" && zip -r "${WORKSPACE_ID}.zip" "." -x "*.sock" > "workspace_zipping.log"
# Create a zip of the Nextflow run results by excluding the ocrd workspace dir
cd "${WORKFLOW_JOB_DIR}" && zip -r "${WORKFLOW_JOB_ID}.zip" "." -x "${WORKSPACE_ID}**" > "workflow_job_zipping.log"
}

# Main execution flow of the workflow job
check_existence_of_paths
unzip_workflow_job_dir
start_mets_server
sleep 5
execute_nextflow_workflow
stop_mets_server
zip_results
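
Note on the refactored lifecycle above: start_mets_server launches the METS server in the background with its output redirected to mets_server.log, the main flow sleeps briefly so the Unix socket can appear, and stop_mets_server shuts the server down once the Nextflow run has finished. A minimal, stand-alone sketch of that background-server pattern follows; the placeholder "sleep 300" and the PID-based kill are illustrative stand-ins, not the actual singularity exec / ocrd workspace server commands.

#!/bin/bash
# Minimal sketch of the background-server lifecycle (placeholder command, not the real ocrd invocation)
WORKSPACE_DIR="$(mktemp -d)"   # stand-in for the real workspace directory

start_server () {
    # Launch a long-running process in the background and capture its output in a log file
    sleep 300 > "${WORKSPACE_DIR}/mets_server.log" 2>&1 &
    SERVER_PID=$!
}

stop_server () {
    # The batch script stops the real server via "ocrd workspace ... server stop";
    # here the placeholder background process is simply terminated
    kill "${SERVER_PID}" 2>/dev/null || true
}

start_server
sleep 5   # give the server time to create its socket, as the batch script does
# ... the Nextflow workflow would run here ...
stop_server
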
21 changes: 11 additions & 10 deletions src/utils/operandi_utils/hpc/connector.py
@@ -172,27 +172,29 @@ def is_transport_responsive(transport: Transport) -> bool:
except EOFError:
return False

@staticmethod
def is_ssh_connection_still_responsive(ssh_client: SSHClient) -> bool:
def is_ssh_connection_still_responsive(self, ssh_client: SSHClient) -> bool:
self.log.debug("Checking SSH connection responsiveness")
if not ssh_client:
return False
transport = ssh_client.get_transport()
return HPCConnector.is_transport_responsive(transport)
return self.is_transport_responsive(transport)

def is_proxy_tunnel_still_responsive(self) -> bool:
self.log.debug("Checking proxy tunel responsiveness")
if not self.proxy_tunnel:
return False
transport = self.proxy_tunnel.get_transport()
return HPCConnector.is_transport_responsive(transport)
return self.is_transport_responsive(transport)

def is_sftp_still_responsive(self) -> bool:
self.log.debug("Checking SFTP connection responsiveness")
if not self.sftp_client:
return False
channel = self.sftp_client.get_channel()
if not channel:
return False
transport = channel.get_transport()
return HPCConnector.is_transport_responsive(transport)
return self.is_transport_responsive(transport)

def reconnect_if_required(
self,
@@ -205,13 +207,13 @@ def reconnect_if_required(
hpc_host = self.last_used_hpc_host
if not proxy_host:
proxy_host = self.last_used_proxy_host
if not HPCConnector.is_ssh_connection_still_responsive(self.ssh_proxy_client):
if not self.is_ssh_connection_still_responsive(self.ssh_proxy_client):
self.log.warning("The connection to proxy server is not responsive, trying to open a new connection")
self.ssh_proxy_client = self.connect_to_proxy_server(host=proxy_host, port=proxy_port)
if not self.is_proxy_tunnel_still_responsive():
self.log.warning("The proxy tunnel is not responsive, trying to establish a new proxy tunnel")
self.proxy_tunnel = self.establish_proxy_tunnel(dst_host=hpc_host, dst_port=hpc_port)
if not HPCConnector.is_ssh_connection_still_responsive(self.ssh_hpc_client):
if not self.is_ssh_connection_still_responsive(self.ssh_hpc_client):
self.log.warning("The connection to hpc frontend server is not responsive, trying to open a new connection")
self.ssh_hpc_client = self.connect_to_hpc_frontend_server(proxy_host, proxy_port, self.proxy_tunnel)

@@ -230,9 +232,8 @@ def recreate_sftp_if_required(
hpc_host=hpc_host, hpc_port=hpc_port,
proxy_host=proxy_host, proxy_port=proxy_port
)
if not self.is_sftp_still_responsive():
self.log.warning("The SFTP client is not responsive, trying to create a new SFTP client")
self.create_sftp_client()
self.log.warning("Creating a new SFTP client")
self.create_sftp_client()

def create_ssh_connection_to_hpc_by_iteration(self, try_times: int = 1) -> None:
while try_times > 0:
@@ -11,9 +11,10 @@ params.pages = "null"
params.singularity_wrapper = "null"
params.cpus = "null"
params.ram = "null"
// by default single instance of each OCR-D processor
params.forks = 2
params.pages_per_range = params.pages / params.forks
params.forks = params.cpus
// Do not pass these parameters from the caller unless you know what you are doing
params.cpus_per_fork = (params.cpus.toInteger() / params.forks.toInteger()).intValue()
params.ram_per_fork = sprintf("%dGB", (params.ram.toInteger() / params.forks.toInteger()).intValue())

log.info """\
O P E R A N D I - H P C - T E M P L A T E P I P E L I N E
@@ -27,7 +28,8 @@ log.info """\
cpus : ${params.cpus}
ram : ${params.ram}
forks : ${params.forks}
pages_per_range : ${params.pages_per_range}
cpus_per_fork : ${params.cpus_per_fork}
ram_per_fork : ${params.ram_per_fork}
"""
.stripIndent()

@@ -50,24 +52,24 @@ process split_page_ranges {

process ocrd_cis_ocropy_binarize {
maxForks params.forks
cpus params.cpus
memory params.ram
cpus params.cpus_per_fork
memory params.ram_per_fork
debug true

input:
val page_range
path mets_file
val input_group
val output_group
script:

"""
${params.singularity_wrapper} ocrd-cis-ocropy-binarize -U ${params.mets_socket} -w ${params.workspace_dir} -m ${mets_file} --page-id ${page_range} -I ${input_group} -O ${output_group}
${params.singularity_wrapper} ocrd-cis-ocropy-binarize -U ${params.mets_socket} -w ${params.workspace_dir} -m ${params.mets} --page-id ${page_range} -I ${input_group} -O ${output_group}
"""
}

workflow {
main:
ch_range_multipliers = Channel.of(0..params.forks-1)
ch_range_multipliers = Channel.of(0..params.forks.intValue()-1)
split_page_ranges(ch_range_multipliers)
ocrd_cis_ocropy_binarize(split_page_ranges.out[0], params.mets, params.input_file_group, "OCR-D-BIN")
ocrd_cis_ocropy_binarize(split_page_ranges.out, params.input_file_group, "OCR-D-BIN")
}
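
The new parameters derive per-fork resources from the totals: params.forks now defaults to params.cpus, cpus_per_fork is the integer quotient cpus / forks, and ram_per_fork is (ram / forks) gigabytes. With the values used in the updated test below (cpus=2, ram=16, forks=2) each fork therefore gets 1 CPU and 8GB of RAM, and Channel.of(0..forks-1) yields one page-range index per fork. A rough shell equivalent of that arithmetic, with illustrative variable names only:

#!/bin/bash
# Illustrative only: mirrors the integer division behind cpus_per_fork and ram_per_fork
CPUS=2
RAM=16                               # total RAM in GB
FORKS=${CPUS}                        # params.forks defaults to params.cpus
CPUS_PER_FORK=$(( CPUS / FORKS ))    # -> 1
RAM_PER_FORK="$(( RAM / FORKS ))GB"  # -> "8GB"
echo "cpus_per_fork=${CPUS_PER_FORK} ram_per_fork=${RAM_PER_FORK}"
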
2 changes: 1 addition & 1 deletion tests/tests_utils/test_hpc_xcombined.py
@@ -64,7 +64,7 @@ def test_hpc_connector_run_batch_script(hpc_command_executor, template_workflow)
mets_basename="mets.xml",
job_deadline_time="1:00:00",
cpus=2,
ram=8,
ram=16,
nf_process_forks=2,
ws_pages_amount=8
)