Skip to content

Commit

Permalink
made named pipe more modular, take advantage of related compounded me…
Browse files Browse the repository at this point in the history
…ssages
  • Loading branch information
jborean93 committed Feb 26, 2018
1 parent 8a16a86 commit 5bbd7e5
Show file tree
Hide file tree
Showing 8 changed files with 225 additions and 111 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,9 @@ how the remote process will work. These args are;
* `priority`: (pypsexec.ProcessPriority) The priority level of the process, default `NORMAL_PRIORITY_CLASS`
* `remote_log_path`: (string) A path on the remote host to log the PAExec service details
* `timeout_seconds`: (int) The maximum time the process can run for, default is `0` (no timeout)
* `stdin`: (bytes) A byte string to send over the stdin pipe, does not work with `interactive=True` and `async=True`
* `stdout`: (pipe.OutputPipe) A class that implements pipe.OutputPipe that controls how the stdout output is processed and returned, will default to returning the byte string of the stdout. Is ignored when `interactive=True` and `asynchronous=True`
* `stderr`: (pipe.OutputPipe) A class that implements pipe.OutputPipe that controls how the stderr output is processed and returned, will default to returning the byte string of the stderr. Is ignored when `interactive=True` and `asynchronous=True`
* `stdin`: (bytes/generator) A byte string or generator that yields a byte string to send over the stdin pipe, does not work with `interactive=True` and `asynchronous=True`


## Logging
Expand Down
128 changes: 83 additions & 45 deletions pypsexec/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from pypsexec.paexec import PAExecMsg, PAExecMsgId, PAExecReturnBuffer, \
PAExecSettingsBuffer, PAExecSettingsMsg, PAExecStartBuffer, \
ProcessPriority, get_unique_id, paexec_out_stream
from pypsexec.pipe import InputPipe, OutputPipe, open_pipe
from pypsexec.pipe import InputPipe, OutputPipeBytes, open_pipe
from pypsexec.scmr import EnumServiceState, Service, ServiceType

if sys.version[0] == '2':
Expand Down Expand Up @@ -80,12 +80,12 @@ def create_service(self):
smb_tree.connect()
paexec_file = Open(smb_tree, self._exe_file)
log.debug("Creating open to PAExec file")
paexec_file.open(ImpersonationLevel.Impersonation,
FilePipePrinterAccessMask.FILE_WRITE_DATA,
FileAttributes.FILE_ATTRIBUTE_NORMAL,
ShareAccess.FILE_SHARE_READ,
CreateDisposition.FILE_OVERWRITE_IF,
CreateOptions.FILE_NON_DIRECTORY_FILE)
paexec_file.create(ImpersonationLevel.Impersonation,
FilePipePrinterAccessMask.FILE_WRITE_DATA,
FileAttributes.FILE_ATTRIBUTE_NORMAL,
ShareAccess.FILE_SHARE_READ,
CreateDisposition.FILE_OVERWRITE_IF,
CreateOptions.FILE_NON_DIRECTORY_FILE)
log.info("Creating PAExec executable at %s\\%s"
% (smb_tree.share_name, self._exe_file))
for (data, o) in paexec_out_stream(self.connection.max_write_size):
Expand Down Expand Up @@ -146,19 +146,33 @@ def cleanup(self):
smb_tree.connect()

share = Open(smb_tree, "")
share.open(ImpersonationLevel.Impersonation,
DirectoryAccessMask.FILE_READ_ATTRIBUTES |
DirectoryAccessMask.SYNCHRONIZE |
DirectoryAccessMask.FILE_LIST_DIRECTORY,
FileAttributes.FILE_ATTRIBUTE_DIRECTORY,
ShareAccess.FILE_SHARE_READ | ShareAccess.FILE_SHARE_WRITE |
ShareAccess.FILE_SHARE_DELETE,
CreateDisposition.FILE_OPEN,
CreateOptions.FILE_DIRECTORY_FILE)
query_msgs = [
share.create(ImpersonationLevel.Impersonation,
DirectoryAccessMask.FILE_READ_ATTRIBUTES |
DirectoryAccessMask.SYNCHRONIZE |
DirectoryAccessMask.FILE_LIST_DIRECTORY,
FileAttributes.FILE_ATTRIBUTE_DIRECTORY,
ShareAccess.FILE_SHARE_READ |
ShareAccess.FILE_SHARE_WRITE |
ShareAccess.FILE_SHARE_DELETE,
CreateDisposition.FILE_OPEN,
CreateOptions.FILE_DIRECTORY_FILE,
send=False),
share.query_directory("PAExec-*.exe",
FileInformationClass.FILE_NAMES_INFORMATION,
send=False),
share.close(False, send=False)
]
query_reqs = self.connection.send_compound([x[0] for x in query_msgs],
self.session.session_id,
smb_tree.tree_connect_id,
related=True)
# receive response for open and close
query_msgs[0][1](query_reqs[0])
query_msgs[2][1](query_reqs[2])
try:
files = share.query_directory("PAExec-*.exe",
FileInformationClass.
FILE_NAMES_INFORMATION)
# receive the response for query_directory
files = query_msgs[1][1](query_reqs[1])
except SMBResponseException as exc:
if exc.status != NtStatus.STATUS_NO_SUCH_FILE:
raise exc
Expand All @@ -175,7 +189,9 @@ def run_executable(self, executable, arguments=None, processors=None,
password=None, use_system_account=False,
working_dir=None, show_ui_on_win_logon=False,
priority=ProcessPriority.NORMAL_PRIORITY_CLASS,
remote_log_path=None, timeout_seconds=0, stdin=None):
remote_log_path=None, timeout_seconds=0,
stdout=OutputPipeBytes, stderr=OutputPipeBytes,
stdin=None):
"""
Runs a command over the PAExec/PSExec interface based on the options
provided. At a minimum the executable argument is required and the
Expand Down Expand Up @@ -225,12 +241,19 @@ def run_executable(self, executable, arguments=None, processors=None,
log files for the PAExec service process (for debugging purposes)
:param timeout_seconds: (Int) A timeout that will force the PAExec
process to stop once reached, default is 0 (no timeout)
:param stdin: (Bytes) A byte string to send over the stdin pipe once
the process has been spawned. This must be a bytes string and not
a normal Python string
:param stdout: (pipe.OutputPipe) An class that implements of
pipe.OutputPipe that handles the Named Pipe stdout output. The
default is pipe.OutputPipeBytes which returns a byte string of the
stdout
:param stderr: (pipe.OutputPipe) An class that implements of
pipe.OutputPipe that handles the Named Pipe stderr output. The
default is pipe.OutputPipeBytes which returns a byte string of the
stderr
:param stdin: Either a byte string of generator that yields multiple
byte strings to send over the stdin pipe.
:return: Tuple(stdout, stderr, rc)
stdout: (Bytes) The stdout as a byte string from the process
stderr: (Bytes) The stderr as a byte string from the process
stdout: (Bytes) The stdout.get_bytes() return result
stderr: (Bytes) The stderr.get_bytes() return result
rc: (Int) The return code of the process (The pid of the async
process when async=True)
"""
Expand Down Expand Up @@ -306,25 +329,32 @@ def run_executable(self, executable, arguments=None, processors=None,
# create a pipe for stdout, stderr, and stdin and run in a separate
# thread
log.info("Connecting to remote pipes to retrieve output")
stdout_pipe = OutputPipe(smb_tree, self._stdout_pipe_name)
stdout_pipe = stdout(smb_tree, self._stdout_pipe_name)
stdout_pipe.start()
stderr_pipe = OutputPipe(smb_tree, self._stderr_pipe_name)
stderr_pipe = stderr(smb_tree, self._stderr_pipe_name)
stderr_pipe.start()
stdin_pipe = InputPipe(smb_tree, self._stdin_pipe_name)
stdin_pipe.start()

# wait until the stdout and stderr pipes have sent their first
# response
log.debug("Waiting for stdout pipe to send first request")
stdout_pipe.pipe_buffer.get()
while not stdout_pipe.sent_first:
pass
log.debug("Waiting for stderr pipe to send first request")
stderr_pipe.pipe_buffer.get()
while not stderr_pipe.sent_first:
pass

# send any input if there was any
if stdin:
if stdin and isinstance(stdin, bytes):
log.info("Sending stdin bytes over stdin pipe: %s"
% self._stdin_pipe_name)
stdin_pipe.pipe_buffer.put(stdin)
elif stdin:
log.info("Sending stdin generator bytes over stdin pipe: %s"
% self._stdin_pipe_name)
for stdin_data in stdin():
stdin_pipe.pipe_buffer.put(stdin_data)

# read the final response from the process
log.info("Reading result of PAExec process")
Expand All @@ -337,11 +367,11 @@ def run_executable(self, executable, arguments=None, processors=None,
stderr_pipe.close()
stdin_pipe.close()
log.info("Gettings stdout and stderr from pipe buffer queue")
stdout = self._empty_queue(stdout_pipe.pipe_buffer)
stderr = self._empty_queue(stderr_pipe.pipe_buffer)
stdout_out = stdout_pipe.get_output()
stderr_bytes = stderr_pipe.get_output()
else:
stdout = None
stderr = None
stdout_out = None
stderr_bytes = None

log.info("Closing main PAExec pipe")
main_pipe.close()
Expand All @@ -360,10 +390,8 @@ def run_executable(self, executable, arguments=None, processors=None,

return_code = rc['return_code'].get_value()
log.info("Process finished with exit code: %d" % return_code)
log.debug("STDOUT: %s" % stdout)
log.debug("STDERR: %s" % stderr)
log.debug("RC: %d" % return_code)
return stdout, stderr, return_code
return stdout_out, stderr_bytes, return_code

def _encode_string(self, string):
return string.encode('utf-16-le') if string else None
Expand All @@ -380,11 +408,21 @@ def _empty_queue(self, queue):

def _delete_file(self, tree, name):
file_open = Open(tree, name)
file_open.open(ImpersonationLevel.Impersonation,
FilePipePrinterAccessMask.DELETE,
FileAttributes.FILE_ATTRIBUTE_NORMAL,
0,
CreateDisposition.FILE_OPEN_IF,
CreateOptions.FILE_NON_DIRECTORY_FILE |
CreateOptions.FILE_DELETE_ON_CLOSE)
file_open.close(get_attributes=False)
msgs = [
file_open.create(ImpersonationLevel.Impersonation,
FilePipePrinterAccessMask.DELETE,
FileAttributes.FILE_ATTRIBUTE_NORMAL,
0,
CreateDisposition.FILE_OPEN_IF,
CreateOptions.FILE_NON_DIRECTORY_FILE |
CreateOptions.FILE_DELETE_ON_CLOSE,
send=False),
file_open.close(get_attributes=False, send=False)
]
reqs = self.connection.send_compound([x[0] for x in msgs],
sid=self.session.session_id,
tid=tree.tree_connect_id,
related=True)
# remove the responses from the SMB outstanding requests
msgs[0][1](reqs[0])
msgs[1][1](reqs[1])
96 changes: 66 additions & 30 deletions pypsexec/pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import threading
import warnings

from abc import ABCMeta, abstractmethod
from six import with_metaclass
from smbprotocol.connection import NtStatus
from smbprotocol.exceptions import SMBResponseException
from smbprotocol.ioctl import CtlCode, IOCTLFlags, SMB2IOCTLRequest
Expand Down Expand Up @@ -67,13 +69,13 @@ def open_pipe(tree, name, access_mask, fsctl_wait=False):
% name)
tree.session.connection.receive(request)

pipe.open(ImpersonationLevel.Impersonation,
access_mask,
FileAttributes.FILE_ATTRIBUTE_NORMAL,
0,
CreateDisposition.FILE_OPEN,
CreateOptions.FILE_NON_DIRECTORY_FILE |
CreateOptions.FILE_SYNCHRONOUS_IO_NONALERT)
pipe.create(ImpersonationLevel.Impersonation,
access_mask,
FileAttributes.FILE_ATTRIBUTE_NORMAL,
0,
CreateDisposition.FILE_OPEN,
CreateOptions.FILE_NON_DIRECTORY_FILE |
CreateOptions.FILE_SYNCHRONOUS_IO_NONALERT)

return pipe

Expand Down Expand Up @@ -112,22 +114,20 @@ class _NamedPipe(threading.Thread):

def __init__(self, tree, name):
super(_NamedPipe, self).__init__()
self.pipe_buffer = Queue()
log.info("Initialising Named Pipe with the name: %s" % name)
self.name = name
self.connection = tree.session.connection
self.sid = tree.session.session_id
self.tid = tree.tree_connect_id
self.pipe = open_pipe(tree, name, self.ACCESS_MASK,
self.pipe = open_pipe(tree, name,
self.ACCESS_MASK,
fsctl_wait=True)

def _close_thread(self):
# waits until the Thread if closed for 5 seconds otherwise it throws
# a warning
log.debug("Waiting for pipe thread of pipe %s to close" % self.name)
self.join(timeout=5)
if self.is_alive():
warnings.warn("Timeout while waiting for pipe thread of pipe %s to"
" close: %s" % self.name, TheadCloseTimeoutWarning)
warnings.warn("Timeout while waiting for pipe thread to close: %s"
% self.name, TheadCloseTimeoutWarning)


class InputPipe(_NamedPipe):
Expand All @@ -141,11 +141,12 @@ class InputPipe(_NamedPipe):
FilePipePrinterAccessMask.SYNCHRONIZE

def __init__(self, tree, name):
super(InputPipe, self).__init__(tree, name)
self.pipe_buffer = Queue()
self.close_bytes = os.urandom(16)
log.info("Initialising Input Named Pipe with the name: %s" % name)
log.debug("Shutdown bytes for input pipe: %s"
% binascii.hexlify(self.close_bytes))
super(InputPipe, self).__init__(tree, name)
self.pipe_buffer = Queue()

def run(self):
try:
Expand Down Expand Up @@ -174,7 +175,7 @@ def close(self):
self._close_thread()


class OutputPipe(_NamedPipe):
class OutputPipe(with_metaclass(ABCMeta, _NamedPipe)):

ACCESS_MASK = FilePipePrinterAccessMask.FILE_READ_DATA | \
FilePipePrinterAccessMask.FILE_READ_ATTRIBUTES | \
Expand All @@ -183,14 +184,14 @@ class OutputPipe(_NamedPipe):
FilePipePrinterAccessMask.SYNCHRONIZE

def __init__(self, tree, name):
log.info("Initialising Output Named Pipe with the name: %s" % name)
"""Generic Output/Read Pipe that stores the output read in a Queue"""
super(OutputPipe, self).__init__(tree, name)
self.sent_first = False

def run(self):
# read from the pipe and close it at the end
try:
log.debug("Starting thread of Output Named Pipe: %s" % self.name)
sent_first = False
while True:
# get the read request and sent it so we can let the parent
# thread know it can continue before we are blocked by the read
Expand All @@ -200,26 +201,27 @@ def run(self):
request = self.connection.send(read_msg,
sid=self.sid,
tid=self.tid)
if not sent_first:
log.debug("Sending data to parent thread saying the first "
"read has been sent for Output Named Pipe: %s"
% self.name)
self.pipe_buffer.put(None)
sent_first = True

self.sent_first = True
try:
log.debug("Reading SMB Read response for Output Named "
"Pipe: %s" % self.name)
pipe_out = read_resp_func(request)
log.debug("Received SMB Read response for Output Named "
"Pipe: %s" % self.name)
self.pipe_buffer.put(pipe_out)
self.handle_output(pipe_out)
except SMBResponseException as exc:
# if the error was the pipe was broken exit the loop
# otherwise the error is serious so throw it
if exc.status == NtStatus.STATUS_PIPE_BROKEN:
log.debug("STATUS_PIPE_BROKEN received for Output "
"Named Pipe: %s, ending thread" % self.name)
close_errors = [
NtStatus.STATUS_PIPE_BROKEN,
NtStatus.STATUS_PIPE_CLOSING,
NtStatus.STATUS_PIPE_EMPTY,
NtStatus.STATUS_PIPE_DISCONNECTED
]
if exc.status in close_errors:
log.debug("%s received for Output Named Pipe: %s, "
"ending thread"
% (str(exc.header['status']), self.name))
break
else:
raise exc
Expand All @@ -228,6 +230,40 @@ def run(self):
self.pipe.close(get_attributes=False)
log.debug("Output Named Pipe %s thread finished" % self.name)

@abstractmethod
def handle_output(self, output):
"""
The method called in the running thread whenever any data was read
from the Named Pipe.
:param output: a byte string of the output that was received from the
Named Pipe
"""
pass # pragma: no cover

@abstractmethod
def get_output(self):
"""
Returns the stdout/stderr return value used in client.run_executable.
:return: The return object to return as part of the stdout/stderr
variable for client.run_executable
"""
pass # pragma: no cover

def close(self):
log.info("Closing Output Named Pipe: %s" % self.name)
self._close_thread()


class OutputPipeBytes(OutputPipe):

def __init__(self, tree, name):
self.pipe_buffer = b""
super(OutputPipeBytes, self).__init__(tree, name)

def handle_output(self, output):
self.pipe_buffer += output

def get_output(self):
return self.pipe_buffer
Loading

0 comments on commit 5bbd7e5

Please sign in to comment.