Skip to content
This repository has been archived by the owner on Jul 19, 2021. It is now read-only.

Commit

Permalink
Require ee_id in execute_queue_async
Browse files Browse the repository at this point in the history
  • Loading branch information
jondequinor authored and sondreso committed May 7, 2021
1 parent fbefcd4 commit 974d957
Showing 1 changed file with 8 additions and 8 deletions.
16 changes: 8 additions & 8 deletions python/res/job_queue/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,11 +413,11 @@ def execute_queue(self, pool_sema, evaluators):
self._transition()

@staticmethod
def _translate_change_to_cloudevent(real_id, status):
def _translate_change_to_cloudevent(ee_id, real_id, status):
return CloudEvent(
{
"type": _queue_state_event_type(status),
"source": f"/ert/ee/{0}/real/{real_id}/step/{0}",
"source": f"/ert/ee/{ee_id}/real/{real_id}/step/{0}",
"datacontenttype": "application/json",
},
{
Expand All @@ -426,16 +426,16 @@ def _translate_change_to_cloudevent(real_id, status):
)

@staticmethod
async def _publish_changes(changes, websocket):
async def _publish_changes(ee_id, changes, websocket):
events = [
JobQueue._translate_change_to_cloudevent(real_id, status)
JobQueue._translate_change_to_cloudevent(ee_id, real_id, status)
for real_id, status in changes.items()
]
for event in events:
await websocket.send(to_json(event))

async def execute_queue_async(
self, ws_uri, pool_sema, evaluators, cert=None, token=None
self, ws_uri, ee_id, pool_sema, evaluators, cert=None, token=None
):
if cert is not None:
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
Expand All @@ -448,7 +448,7 @@ async def execute_queue_async(
async with websockets.connect(
ws_uri, ssl=ssl_context, extra_headers=headers
) as websocket:
await JobQueue._publish_changes(self.snapshot(), websocket)
await JobQueue._publish_changes(ee_id, self.snapshot(), websocket)

try:
while self.is_active() and not self.stopped:
Expand All @@ -461,7 +461,7 @@ async def execute_queue_async(
func()

await JobQueue._publish_changes(
self._changes_after_transition(), websocket
ee_id, self._changes_after_transition(), websocket
)
except asyncio.CancelledError:
if self.stopped:
Expand All @@ -478,7 +478,7 @@ async def execute_queue_async(
logger.debug("jobs now stopped")
self.assert_complete()
self._transition()
await JobQueue._publish_changes(self.snapshot(), websocket)
await JobQueue._publish_changes(ee_id, self.snapshot(), websocket)

def add_job_from_run_arg(self, run_arg, res_config, max_runtime, ok_cb, exit_cb):
job_name = run_arg.job_name
Expand Down

0 comments on commit 974d957

Please sign in to comment.