fix: always release resources in chat ui for csharp (#2427)
# Description

Although the new chat UI doesn't support streaming yet, PFS (the prompt flow
service) calls the flow operation with `stream_output` set to `True`, which can
cause issues in resource release. This PR avoids that.
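
Below is a minimal sketch of the failure mode this PR addresses, in plain Python (names are illustrative, not promptflow's API): when a context manager owns the resources, a generator returned from inside it breaks once it is consumed after the context has exited.

```python
from contextlib import contextmanager


@contextmanager
def executor_service():
    # stands in for the executor service process owned by the submitter
    service = {"alive": True}
    try:
        yield service
    finally:
        service["alive"] = False  # resource released on context exit


def stream_output(service):
    for chunk in ("hello", " ", "world"):
        if not service["alive"]:
            raise RuntimeError("service destroyed before the stream was consumed")
        yield chunk


with executor_service() as svc:
    gen = stream_output(svc)  # nothing is consumed inside the context

# the context has already exited, so the resource is gone
next(gen)  # raises RuntimeError -- the issue with stream_output=True
```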

Improvements to the `TestSubmitter`:

*
[`src/promptflow/promptflow/_sdk/_submitter/test_submitter.py`](diffhunk://#diff-d9dd2c09a149b11eb54626edf065287eeb7b1a28e39719b8dd95377770f64138R287-R289):
The `destroy_if_all_generators_exhausted` method has been replaced with
the `destroy` method in the `TestSubmitter` class. This change is
accompanied by a comment noting that the current implementation may not
properly release resources.

Changes to the test submission process:

*
[`src/promptflow/promptflow/_sdk/data/executable/main.py`](diffhunk://#diff-c22ed7aec6ed0b5a5fde51716818d5695b917ab38cc385ee7e65390471fb723cR14-L20):
The `TestSubmitter` class is now used in the `submit` function to handle
test submissions. This change simplifies the submission process by
removing the need for the `run_flow` function.
[[1]](diffhunk://#diff-c22ed7aec6ed0b5a5fde51716818d5695b917ab38cc385ee7e65390471fb723cR14-L20)
[[2]](diffhunk://#diff-c22ed7aec6ed0b5a5fde51716818d5695b917ab38cc385ee7e65390471fb723cL74-R78)
[[3]](diffhunk://#diff-c22ed7aec6ed0b5a5fde51716818d5695b917ab38cc385ee7e65390471fb723cL104-L123)

Codebase simplification:

*
[`src/promptflow/promptflow/_sdk/data/executable/main.py`](diffhunk://#diff-c22ed7aec6ed0b5a5fde51716818d5695b917ab38cc385ee7e65390471fb723cL104-L123):
The `invoker` global variable and the `run_flow` function have been
removed, and a `resolve_flow_path` function has been added to handle the
flow path resolution.
[[1]](diffhunk://#diff-c22ed7aec6ed0b5a5fde51716818d5695b917ab38cc385ee7e65390471fb723cL104-L123)
[[2]](diffhunk://#diff-c22ed7aec6ed0b5a5fde51716818d5695b917ab38cc385ee7e65390471fb723cR195-R212)
*
[`src/promptflow/promptflow/_sdk/entities/_flow/dag.py`](diffhunk://#diff-fe4826ecd0e92a94a1769ec7376562b483dcb666b54acd0e6396d6262cbf5f2eR149-R151):
A comment has been added to the `invoke` method in the `Flow` class, noting
that a context manager is not appropriate for `stream_output`, since resources
would be released before the returned generator is fully consumed.
*
[`src/promptflow/promptflow/batch/_base_executor_proxy.py`](diffhunk://#diff-44ac8eeb30a630bd24987e92f8bd5a180d57a3ad067db5d937561f21771e14c5L232-R238):
The `destroy_if_all_generators_exhausted` method has been removed from the base
proxy; `CSharpExecutorProxy.destroy` now raises an `UnexpectedError` if any
generator is not yet exhausted (see the sketch after this list), and the
`enable_stream_output` parameter has been removed from the `exec_line`
method.
[[1]](diffhunk://#diff-44ac8eeb30a630bd24987e92f8bd5a180d57a3ad067db5d937561f21771e14c5L232-R238)
[[2]](diffhunk://#diff-44ac8eeb30a630bd24987e92f8bd5a180d57a3ad067db5d937561f21771e14c5L323-L324)
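
A hedged sketch of the new `destroy` contract (simplified shapes, not the real class): instead of silently skipping cleanup, `destroy` refuses to tear the service down while a streamed response is still being consumed.

```python
import asyncio


class ExecutorProxySketch:
    def __init__(self) -> None:
        self._active_generator_count = 0

    async def _all_generators_exhausted(self) -> bool:
        # the count should never go negative, but check defensively
        return self._active_generator_count <= 0

    async def destroy(self) -> None:
        if not await self._all_generators_exhausted():
            # the real code raises promptflow's UnexpectedError
            raise RuntimeError(
                "The executor service is still handling a stream request "
                "whose response is not fully consumed yet."
            )
        # ... terminate the executor service process here ...


asyncio.run(ExecutorProxySketch().destroy())  # no active generators -> returns cleanly
```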

# All Promptflow Contribution checklist:
- [ ] **The pull request does not introduce [breaking changes].**
- [ ] **CHANGELOG is updated for new features, bug fixes or other
significant changes.**
- [ ] **I have read the [contribution guidelines](../CONTRIBUTING.md).**
- [ ] **Create an issue and link to the pull request to get dedicated
review from promptflow team. Learn more: [suggested
workflow](../CONTRIBUTING.md#suggested-workflow).**

## General Guidelines and Best Practices
- [ ] Title of the pull request is clear and informative.
- [ ] There are a small number of commits, each of which has an
informative message. This means that previously merged commits do not
appear in the history of the PR. For more information on cleaning up the
commits in your PR, [see this
page](https://github.com/Azure/azure-powershell/blob/master/documentation/development-docs/cleaning-up-commits.md).

### Testing Guidelines
- [ ] Pull request includes test coverage for the included changes.
elliotzh committed Mar 28, 2024
1 parent dffa5f7 commit a97a7cb
Showing 8 changed files with 112 additions and 88 deletions.
8 changes: 7 additions & 1 deletion src/promptflow-devkit/promptflow/_sdk/_constants.py
@@ -9,10 +9,11 @@

from promptflow._constants import (
CONNECTION_SCRUBBED_VALUE,
CONNECTION_SCRUBBED_VALUE_NO_CHANGE,
PROMPT_FLOW_DIR_NAME,
ConnectionAuthMode,
ConnectionType,
CustomStrongTypeConnectionConfigs, CONNECTION_SCRUBBED_VALUE_NO_CHANGE,
CustomStrongTypeConnectionConfigs,
)

LOGGER_NAME = "promptflow"
@@ -478,6 +479,11 @@ class IdentityKeys(str, Enum):
CLIENT_ID = "client_id"


class OSType:
WINDOWS = "Windows"
LINUX = "Linux"


# Note: Keep these for backward compatibility
CustomStrongTypeConnectionConfigs = CustomStrongTypeConnectionConfigs
ConnectionType = ConnectionType
2 changes: 2 additions & 0 deletions src/promptflow-devkit/promptflow/_sdk/_service/apis/flow.py
@@ -89,6 +89,8 @@ def post(self):
node=node,
experiment=experiment,
output_path=output_path,
allow_generator_output=False,
stream_output=False,
)
finally:
if remove_dir:
27 changes: 12 additions & 15 deletions src/promptflow-devkit/promptflow/_sdk/_submitter/test_submitter.py
@@ -29,7 +29,6 @@
from promptflow.executor._result import LineResult
from promptflow.storage._run_storage import DefaultRunStorage

from ...batch import APIBasedExecutorProxy, CSharpExecutorProxy
from .._configuration import Configuration
from ..entities._flow import FlexFlow
from .utils import (
@@ -86,11 +85,11 @@ def __init__(
self._target_node = None
self._storage = None
self._enable_stream_output = None
self._executor_proxy: Optional[APIBasedExecutorProxy] = None
self._executor_proxy = None
self._within_init_context = False

@property
def executor_proxy(self) -> APIBasedExecutorProxy:
def executor_proxy(self):
self._raise_if_not_within_init_context()
return self._executor_proxy

@@ -268,23 +267,21 @@ def init(
sub_dir=output_sub / "intermediate",
)

# TODO: set up executor proxy for all languages
if self.flow.language == FlowLanguage.CSharp:
self._executor_proxy = async_run_allowing_running_loop(
CSharpExecutorProxy.create,
self.flow.path,
self.flow.code,
connections=self._connections,
storage=self._storage,
log_path=log_path,
enable_stream_output=stream_output,
)
self._executor_proxy = ProxyFactory().create_executor_proxy(
self.flow.path,
self.flow.code,
connections=self._connections,
storage=self._storage,
log_path=log_path,
enable_stream_output=stream_output,
language=self.flow.language,
)

try:
yield self
finally:
if self.executor_proxy:
async_run_allowing_running_loop(self.executor_proxy.destroy_if_all_generators_exhausted)
async_run_allowing_running_loop(self.executor_proxy.destroy)

self._within_init_context = False

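The `ProxyFactory` call above replaces per-language branching at the call site. A minimal sketch of that dispatch pattern (hypothetical names, not promptflow's real factory):

```python
class ProxyFactorySketch:
    """Map a flow language to its executor proxy class, registered once."""

    _registry: dict = {}

    @classmethod
    def register(cls, language: str, proxy_cls: type) -> None:
        cls._registry[language] = proxy_cls

    def create_executor_proxy(self, flow_file, working_dir, *, language, **kwargs):
        proxy_cls = self._registry.get(language)
        if proxy_cls is None:
            raise ValueError(f"no executor proxy registered for language {language!r}")
        # the real factory also runs the proxy's async create under a running loop
        return proxy_cls.create(flow_file, working_dir, **kwargs)
```
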
103 changes: 52 additions & 51 deletions src/promptflow-devkit/promptflow/_sdk/data/executable/main.py
@@ -11,13 +11,12 @@
from utils import dict_iter_render_message, parse_image_content, parse_list_from_html, render_single_dict_message

from promptflow._constants import STREAMING_ANIMATION_TIME
from promptflow._sdk._submitter import TestSubmitter
from promptflow._sdk._submitter.utils import resolve_generator, resolve_generator_output_with_cache
from promptflow._utils.flow_utils import dump_flow_result
from promptflow._utils.multimedia_utils import convert_multimedia_data_to_base64, persist_multimedia_data
from promptflow.client import load_flow

invoker = None


def start():
def clear_chat() -> None:
@@ -71,55 +70,45 @@ def submit(**kwargs) -> None:
render_message("user", kwargs)
# Force append chat history to kwargs
if is_chat_flow:
response = run_flow({chat_history_input_name: get_chat_history_from_session(), **kwargs})
else:
response = run_flow(kwargs)

if response.run_info.status.value == "Failed":
raise Exception(response.run_info.error)

if is_streaming:
# Display assistant response in chat message container
kwargs[chat_history_input_name] = get_chat_history_from_session()

flow = load_flow(flow_path)
with TestSubmitter(flow=flow, flow_context=flow.context).init(stream_output=is_streaming) as submitter:
# can't exit the context manager before the generator is fully consumed
response = submitter.flow_test(inputs=kwargs, allow_generator_output=is_streaming)

if response.run_info.status.value == "Failed":
raise Exception(response.run_info.error)

if is_streaming:
# Display assistant response in chat message container
with container:
with st.chat_message("assistant"):
message_placeholder = st.empty()
full_response = f"{chat_output_name}: "
prefix_length = len(full_response)
chat_output = response.output[chat_output_name]
if isinstance(chat_output, GeneratorType):
# Simulate stream of response with milliseconds delay
for chunk in resolve_generator_output_with_cache(
chat_output, generator_record, generator_key=f"run.outputs.{chat_output_name}"
):
# there should be no extra spaces between adjacent chunks?
full_response += chunk
time.sleep(STREAMING_ANIMATION_TIME)
# Add a blinking cursor to simulate typing
message_placeholder.markdown(full_response + "▌")
message_placeholder.markdown(full_response)
response.output[chat_output_name] = full_response[prefix_length:]
post_process_dump_result(response, session_state_history, generator_record=generator_record)
return

# generator in response has been fully consumed here
resolved_outputs = post_process_dump_result(
response, session_state_history, generator_record=generator_record
)
with container:
with st.chat_message("assistant"):
message_placeholder = st.empty()
full_response = f"{chat_output_name}: "
prefix_length = len(full_response)
chat_output = response.output[chat_output_name]
if isinstance(chat_output, GeneratorType):
# Simulate stream of response with milliseconds delay
for chunk in resolve_generator_output_with_cache(
chat_output, generator_record, generator_key=f"run.outputs.{chat_output_name}"
):
# there should be no extra spaces between adjacent chunks?
full_response += chunk
time.sleep(STREAMING_ANIMATION_TIME)
# Add a blinking cursor to simulate typing
message_placeholder.markdown(full_response + "▌")
message_placeholder.markdown(full_response)
response.output[chat_output_name] = full_response[prefix_length:]
post_process_dump_result(response, session_state_history, generator_record=generator_record)
return

resolved_outputs = post_process_dump_result(response, session_state_history, generator_record=generator_record)
with container:
render_message("assistant", resolved_outputs)

def run_flow(data: dict) -> dict:
global invoker
if not invoker:
if flow_path:
flow = Path(flow_path)
else:
flow = Path(__file__).parent / "flow"
if flow.is_dir():
os.chdir(flow)
else:
os.chdir(flow.parent)
invoker = load_flow(flow)
invoker.context.streaming = is_streaming
result = invoker.invoke(data)
return result
render_message("assistant", resolved_outputs)

image = Image.open(Path(__file__).parent / "logo.png")
st.set_page_config(
@@ -203,12 +192,24 @@ def run_flow(data: dict) -> dict:
st.rerun()


def resolve_flow_path(_from_config):
if _from_config:
result = Path(_from_config)
else:
result = Path(__file__).parent / "flow"
if result.is_dir():
os.chdir(result)
else:
os.chdir(result.parent)
return result


if __name__ == "__main__":
with open(Path(__file__).parent / "config.json", "r") as f:
config = json.load(f)
is_chat_flow = config["is_chat_flow"]
chat_history_input_name = config["chat_history_input_name"]
flow_path = config["flow_path"]
flow_path = resolve_flow_path(config["flow_path"])
flow_name = config["flow_name"]
flow_inputs = config["flow_inputs"]
label = config["label"]
3 changes: 3 additions & 0 deletions src/promptflow-devkit/promptflow/_sdk/entities/_flow/dag.py
@@ -146,6 +146,9 @@ def invoke(self, inputs: dict) -> "LineResult":
from promptflow._sdk._submitter import TestSubmitter

if self.language == FlowLanguage.CSharp:
# TODO 3033484: we shouldn't use a context manager here for stream_output, as resources need to be released
# after the returned generator is fully consumed. If we use a context manager here, the resources will be
# released before the generator is consumed.
with TestSubmitter(flow=self, flow_context=self.context).init(
stream_output=self.context.streaming
) as submitter:
@@ -278,6 +278,9 @@ def _test_with_ui(
:return: The result of flow or node
:rtype: dict
"""
# TODO: it's not clear why we need this method, please help verify:
# 1. why we can't use the test method directly
# 2. whether _chat_with_ui is still necessary
experiment = kwargs.pop("experiment", None)
if Configuration.get_instance().is_internal_features_enabled() and experiment:
result = self.test(
@@ -286,6 +289,8 @@
environment_variables=environment_variables,
variant=variant,
node=node,
allow_generator_output=kwargs.pop("allow_generator_output", False),
stream_output=kwargs.pop("stream_output", False),
experiment=experiment,
output_path=output_path,
)
21 changes: 2 additions & 19 deletions src/promptflow-devkit/promptflow/batch/_base_executor_proxy.py
@@ -209,29 +209,14 @@ async def _deactivate_generator(self):
self._active_generator_count -= 1

async def _all_generators_exhausted(self):
"""For streaming output, we will return a generator for the output, and the execution service
should keep alive until the generator is exhausted.
"""For streaming output in api-based executor proxy, we will return a generator for the output,
and the execution service should keep alive until the generator is exhausted.
This method is to check if all generators are exhausted.
"""
# the count should never be negative, but still check it here for safety
return self._active_generator_count <= 0

async def destroy_if_all_generators_exhausted(self):
"""
The client.stream API in the exec_line function doesn't return the full response at one time.
For an API-based streaming chat flow, destroying the executor proxy kills the service process
and closes the connection, so subsequent reads of the generator content would fail.
Besides, the external caller usually waits for the destruction of the executor proxy before it
continues to iterate the generator content, so we can't keep waiting here.
On the other hand, the subprocess for the execution service is not started in detached mode;
it will exit when the parent process exits. So we simply skip the destruction here.
"""
if await self._all_generators_exhausted():
await self.destroy()

# endregion

@classmethod
@@ -320,8 +305,6 @@ def exec_line(
:type index: Optional[int]
:param run_id: The id of the run.
:type run_id: Optional[str]
:param enable_stream_output: Whether to enable the stream output.
:type enable_stream_output: bool
:return: The line result.
:rtype: LineResult
"""
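The `_active_generator_count` bookkeeping that `_all_generators_exhausted` checks can be illustrated with a small, self-contained sketch (simplified, synchronous shapes rather than the real async methods): every streamed output generator is wrapped so the count drops back to zero only once the stream is fully consumed.

```python
from typing import Iterator


class GeneratorTracker:
    def __init__(self) -> None:
        self._active_generator_count = 0

    def wrap(self, generator: Iterator[str]) -> Iterator[str]:
        """Count the generator as active until it is fully consumed or closed."""
        self._active_generator_count += 1

        def tracked() -> Iterator[str]:
            try:
                yield from generator
            finally:
                # runs on exhaustion or when the consumer closes the generator
                self._active_generator_count -= 1

        return tracked()

    def all_generators_exhausted(self) -> bool:
        # the count should never go negative, but check defensively
        return self._active_generator_count <= 0


tracker = GeneratorTracker()
stream = tracker.wrap(iter(["hello", " ", "world"]))
assert not tracker.all_generators_exhausted()
print("".join(stream))  # consume fully -> "hello world"
assert tracker.all_generators_exhausted()
```
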
31 changes: 29 additions & 2 deletions src/promptflow-devkit/promptflow/batch/_csharp_executor_proxy.py
@@ -1,13 +1,16 @@
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
import platform
import signal
import socket
import subprocess
import uuid
from pathlib import Path
from typing import NoReturn, Optional

from promptflow._core._errors import UnexpectedError
from promptflow._sdk._constants import OSType
from promptflow._utils.flow_utils import is_flex_flow
from promptflow.batch._csharp_base_executor_proxy import CSharpBaseExecutorProxy
from promptflow.storage._run_storage import AbstractRunStorage
@@ -146,11 +149,35 @@ async def create(
return executor_proxy

async def destroy(self):
"""Destroy the executor"""
"""Destroy the executor service.
The client.stream API in the exec_line function doesn't return the full response at one time.
For an API-based streaming chat flow, destroying the executor proxy kills the service process
and closes the connection, so subsequent reads of the generator content would fail.
Besides, the external caller usually waits for the destruction of the executor proxy before it
continues to iterate the generator content, so we can't keep waiting here.
On the other hand, the subprocess for the execution service is not started in detached mode;
it will exit when the parent process exits. So we simply skip the destruction here.
"""

# TODO 3033484: update this after executor service supports graceful shutdown
if not await self._all_generators_exhausted():
raise UnexpectedError(
message_format="The executor service is still handling a stream request "
"whose response is not fully consumed yet."
)

# if process is not None, the executor service was started by the current executor proxy
# and should be terminated when the executor proxy is destroyed if the service is still active
if self._process and self._is_executor_active():
self._process.terminate()
if platform.system() == OSType.WINDOWS:
# send CTRL_C_EVENT to the process to gracefully terminate the service
self._process.send_signal(signal.CTRL_C_EVENT)
else:
# for Linux and MacOS, Popen.terminate() will send SIGTERM to the process
self._process.terminate()
try:
self._process.wait(timeout=5)
except subprocess.TimeoutExpired:
Expand Down
