[core][logging] Remove actor/task prefix from the logs shown on the driver process #47703

Open · wants to merge 3 commits into base: master
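A minimal usage sketch of what this PR changes from the driver's point of view, adapted from the new test added below (the actor name, log message, and pid shown in the comments are illustrative):

```python
import logging
import ray

# With a job-level logging_config, the structured logging setup owns the
# record format, so the driver stops prepending "(MyActor pid=..., ip=...)".
ray.init(logging_config=ray.LoggingConfig(encoding="TEXT"))

@ray.remote
class MyActor:
    def print_message(self):
        logging.getLogger(__name__).info("This is a Ray actor")

my_actor = MyActor.remote()
ray.get(my_actor.print_message.remote())
# Before this change, the driver output looked roughly like:
#   (MyActor pid=12345) INFO ... -- This is a Ray actor
# With this change (and logging_config set), the prefix is dropped:
#   INFO ... -- This is a Ray actor
```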
61 changes: 48 additions & 13 deletions python/ray/_private/worker.py
@@ -554,6 +554,18 @@ def debugger_port(self):
worker_id = self.core_worker.get_worker_id()
return ray._private.state.get_worker_debugger_port(worker_id)

@property
def get_job_logging_config(self):
"""Get the job's logging config for this worker"""
if not hasattr(self, "core_worker"):
Member Author commented:
In a test case like test_namespace_validation, ray.init() throws an exception (code) before it initializes the core_worker. As a result, when the test later calls ray.shutdown(), which also calls get_job_logging_config, the core_worker attribute doesn't exist yet; hence the hasattr guard (a repro sketch follows this property).

return None
job_config = self.core_worker.get_job_config()
if job_config.serialized_py_logging_config == b"":
# py_logging_config is not set
return None
logging_config = pickle.loads(job_config.serialized_py_logging_config)
return logging_config
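A hedged repro sketch of the failure mode described in the comment above (the invalid `namespace` argument is illustrative; any error raised by `ray.init()` before `core_worker` is assigned exercises the same path):

```python
import ray

try:
    # Hypothetical invalid argument: ray.init() raises during validation,
    # before the worker's core_worker attribute is ever assigned.
    ray.init(namespace=123)
except Exception:
    pass

# ray.shutdown() still runs and consults get_job_logging_config, so the
# hasattr(self, "core_worker") guard above avoids an AttributeError here.
ray.shutdown()
```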

def set_debugger_port(self, port):
worker_id = self.core_worker.get_worker_id()
ray._private.state.update_worker_debugger_port(worker_id, port)
@@ -1908,7 +1920,7 @@ def custom_excepthook(type, value, tb):
sys.excepthook = custom_excepthook


def print_to_stdstream(data):
def print_to_stdstream(data, ignore_prefix: bool):
should_dedup = data.get("pid") not in ["autoscaler"]

if data["is_err"]:
@@ -1924,8 +1936,9 @@ def print_to_stdstream(data):
batches = [data]
sink = sys.stdout

# Ignore the prefix if the logging config is set.
for batch in batches:
print_worker_logs(batch, sink)
print_worker_logs(batch, sink, ignore_prefix)


# Start time of this process, used for relative time logs.
@@ -2016,7 +2029,9 @@ def time_string() -> str:
_worker_logs_enabled = True


def print_worker_logs(data: Dict[str, str], print_file: Any):
def print_worker_logs(
data: Dict[str, str], print_file: Any, ignore_prefix: bool = False
):
if not _worker_logs_enabled:
return

@@ -2096,11 +2111,19 @@ def color_for(data: Dict[str, str], line: str) -> str:
else:
color_pre = color_for(data, line)
color_post = colorama.Style.RESET_ALL
print(
f"{color_pre}({prefix_for(data)}{pid}{ip_prefix}){color_post} "
f"{message_for(data, line)}",
file=print_file,
)

if ignore_prefix:
print(
f"{message_for(data, line)}",
file=print_file,
)
else:
print(
f"{color_pre}({prefix_for(data)}{pid}{ip_prefix}){color_post} "
f"{message_for(data, line)}",
file=print_file,
)

# Restore once at end of batch to avoid excess hiding/unhiding of tqdm.
restore_tqdm()

@@ -2150,7 +2173,9 @@ def listen_error_messages(worker, threads_stopped):
error_message = _internal_kv_get(ray_constants.DEBUG_AUTOSCALING_ERROR)
if error_message is not None:
logger.warning(error_message.decode())

# If the job's logging config is set, don't add the prefix
# (task/actor's name and its PID) to the logs.
ignore_prefix = global_worker.get_job_logging_config is not None
while True:
# Exit if received a signal that the thread should stop.
if threads_stopped.is_set():
@@ -2171,7 +2196,8 @@ def listen_error_messages(worker, threads_stopped):
"lines": [error_message],
"pid": "raylet",
"is_err": False,
}
},
ignore_prefix,
)
except (OSError, ConnectionError) as e:
logger.error(f"listen_error_messages: {e}")
@@ -2449,9 +2475,16 @@ def connect(
)
worker.listener_thread.daemon = True
worker.listener_thread.start()
# If the job's logging config is set, don't add the prefix
# (task/actor's name and its PID) to the logs.
ignore_prefix = global_worker.get_job_logging_config is not None
print_to_stdstream_with_ignore = functools.partial(
print_to_stdstream, ignore_prefix=ignore_prefix
)

if log_to_driver:
global_worker_stdstream_dispatcher.add_handler(
"ray_print_logs", print_to_stdstream
"ray_print_logs", print_to_stdstream_with_ignore
)
worker.logger_thread = threading.Thread(
target=worker.print_logs, name="ray_print_logs"
@@ -2493,10 +2526,12 @@ def disconnect(exiting_interpreter=False):
worker.logger_thread.join()
worker.threads_stopped.clear()

# Ignore the prefix if the logging config is set.
ignore_prefix = worker.get_job_logging_config is not None
for leftover in stdout_deduplicator.flush():
print_worker_logs(leftover, sys.stdout)
print_worker_logs(leftover, sys.stdout, ignore_prefix)
for leftover in stderr_deduplicator.flush():
print_worker_logs(leftover, sys.stderr)
print_worker_logs(leftover, sys.stderr, ignore_prefix)
global_worker_stdstream_dispatcher.remove_handler("ray_print_logs")

worker.node = None # Disconnect the worker from the node.
24 changes: 24 additions & 0 deletions python/ray/tests/test_logging_2.py
@@ -424,6 +424,30 @@ def existing_factory(*args, **kwargs):
assert record.__dict__["existing_factory"]


def test_text_mode_no_prefix(shutdown_only):
"""
If logging_config is set, the prefix that contains the actor's or
task's name and PID should not be added to logs shown on the driver.
"""
script = """
import ray
import logging
ray.init(
logging_config=ray.LoggingConfig(encoding="TEXT")
)
@ray.remote
class MyActor:
def print_message(self):
logger = logging.getLogger(__name__)
logger.info("This is a Ray actor")
my_actor = MyActor.remote()
ray.get(my_actor.print_message.remote())
"""
stderr = run_string_as_driver(script)
assert "This is a Ray actor" in stderr
assert "(MyActor pid=" not in stderr


if __name__ == "__main__":
if os.environ.get("PARALLEL_CI"):
sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__]))
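For contrast, a hedged sketch of the default path, which this PR leaves unchanged (`ignore_prefix` defaults to `False` when no `logging_config` is set); the pid in the comment is illustrative:

```python
import ray

ray.init()  # no logging_config, so the driver keeps prepending the prefix

@ray.remote
def hello():
    print("hello from a task")

ray.get(hello.remote())
# Expected driver output keeps the task prefix, e.g.:
#   (hello pid=12345) hello from a task
```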