diff --git a/flyteplugins/go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go b/flyteplugins/go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go index 27d7748701..4f8d678c14 100644 --- a/flyteplugins/go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go +++ b/flyteplugins/go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go @@ -17,6 +17,8 @@ type RemoteFileOutputReader struct { maxPayloadSize int64 } +var ErrRemoteFileExceedsMaxSize = errors.New("remote file exceeds max size") + func (r RemoteFileOutputReader) IsError(ctx context.Context) (bool, error) { metadata, err := r.store.Head(ctx, r.outPath.GetErrorPath()) if err != nil { @@ -81,7 +83,7 @@ func (r RemoteFileOutputReader) Exists(ctx context.Context) (bool, error) { } if md.Exists() { if md.Size() > r.maxPayloadSize { - return false, errors.Errorf("output file @[%s] is too large [%d] bytes, max allowed [%d] bytes", r.outPath.GetOutputPath(), md.Size(), r.maxPayloadSize) + return false, errors.Wrapf(ErrRemoteFileExceedsMaxSize, "output file @[%s] is too large [%d] bytes, max allowed [%d] bytes", r.outPath.GetOutputPath(), md.Size(), r.maxPayloadSize) } return true, nil } diff --git a/flytepropeller/pkg/controller/nodes/task/handler.go b/flytepropeller/pkg/controller/nodes/task/handler.go index 9ec47985c9..2adea27312 100644 --- a/flytepropeller/pkg/controller/nodes/task/handler.go +++ b/flytepropeller/pkg/controller/nodes/task/handler.go @@ -734,6 +734,15 @@ func (t *Handler) ValidateOutput(ctx context.Context, nodeID v1alpha1.NodeID, i } ok, err := r.Exists(ctx) if err != nil { + if regErrors.Is(err, ioutils.ErrRemoteFileExceedsMaxSize) { + return &io.ExecutionError{ + ExecutionError: &core.ExecutionError{ + Code: "OutputSizeExceeded", + Message: fmt.Sprintf("Remote output size exceeds max, err: [%s]", err.Error()), + }, + IsRecoverable: false, + }, nil + } logger.Errorf(ctx, "Failed to check if the output file exists. Error: %s", err.Error()) return nil, err }