#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys, os, tempfile, traceback, stat
import ctypes as C
# Platform switch used throughout the module: True on any Windows flavour
# of ``sys.platform``.
ON_WIN = sys.platform[:3] == "win"
if ON_WIN:
    # ``windll`` is only provided by ctypes on Windows, hence the guard.
    kernel32 = C.windll.kernel32

# ctypes aliases named after the corresponding Win32 types.
VOID = HANDLE = C.c_void_p
INT = C.c_int
BOOL = LONG = C.c_long
BYTE = C.c_ubyte
WORD = C.c_ushort
DWORD = C.c_ulong
NULL = None

# Flags handed to CreateNamedPipeA (open mode / pipe mode / instance limit).
PIPE_ACCESS_DUPLEX = 0x00000003
PIPE_TYPE_MESSAGE = 0x00000004
PIPE_READMODE_MESSAGE = 0x00000002
PIPE_WAIT = 0x00000000
PIPE_UNLIMITED_INSTANCES = 0xFF

# Flags handed to CreateFileA when a client attaches to an existing pipe.
GENERIC_READ = 0x80000000
GENERIC_WRITE = 0x40000000
OPEN_EXISTING = 0x00000003

# Sentinel compared against the handle returned by the create calls; also
# returned by the POSIX server-creation path on failure.
INVALID_HANDLE_VALUE = -1
class NamedPipe(object):
    """Thin cross-platform wrapper around a single named pipe.

    On Windows the pipe is driven through ``kernel32`` (via ctypes) and the
    open handle is kept in ``_win_h_pipe``.  On other platforms the pipe is
    a FIFO file located in ``tmp_dir`` which is opened afresh for every
    read and every write.
    """

    # Size in bytes of the Windows pipe buffers and of a single read.
    BUFSIZE = 4096

    def __init__(self, pipe_name, tmp_dir=None, mkfifo_path=None):
        """Store the configuration and compute the platform pipe name.

        pipe_name   -- bare name of the pipe (no directory, no ``\\\\.\\pipe``
                       prefix); see set_pipe_name().
        tmp_dir     -- POSIX only: directory holding the FIFO; defaults to
                       tempfile.gettempdir().
        mkfifo_path -- path of the mkfifo executable.  It is only stored;
                       no method of this class uses it.
        """
        self.tmp_dir = tmp_dir or tempfile.gettempdir()
        self.mkfifo_path = mkfifo_path or "/usr/bin/mkfifo"
        self.set_pipe_name(pipe_name)
        if ON_WIN:
            # No pipe opened yet.
            self._win_h_pipe = INVALID_HANDLE_VALUE

    #
    # Interface methods
    #
    def create_server_pipe(self):
        """Create the server end of the pipe.

        Returns, on Windows, 0 on success or the GetLastError() code.
        On POSIX the return value is a ``(h_pipe, ret_val)`` tuple:
        ``(1, 0)`` on success, ``(INVALID_HANDLE_VALUE, traceback_text)``
        on failure.  The two shapes differ for historical reasons and are
        kept so that existing callers keep working.
        """
        if ON_WIN:
            return self._win_create_server_pipe()
        return self._posix_create_server_pipe()

    def create_client_pipe(self):
        """Attach to an already created pipe.

        Returns 0 on success.  On Windows a failure yields the
        GetLastError() code; on POSIX 1 means the path does not exist and
        2 means it exists but is not a FIFO.
        """
        if ON_WIN:
            return self._win_create_client_pipe()
        # Nothing to open on POSIX: only check that the FIFO is there.
        return self._posix_create_client_pipe()

    def read_pipe(self):
        """Read at most BUFSIZE bytes.

        Returns ``(success, data, bytes_read)``.
        """
        if ON_WIN:
            return self._win_read_pipe()
        return self._posix_read_pipe()

    def write_pipe(self, buffer_):
        """Write the byte string ``buffer_`` into the pipe.

        Returns ``(success, bytes_written)``; ``success`` is false (0) when
        the write failed.
        """
        if ON_WIN:
            return self._win_write_pipe(buffer_)
        return self._posix_write_pipe(buffer_)

    def close_pipe(self):
        """Close the pipe.

        On Windows returns the result of CloseHandle().  On POSIX the FIFO
        file is removed; the result is None on success and 1 on failure.
        """
        if ON_WIN:
            return self._win_close_pipe(self._win_h_pipe)
        return self._posix_close_pipe()

    #
    # Property
    #
    def get_pipe_name(self):
        """Return the pipe name (on POSIX: the full path of the FIFO)."""
        return self._pipe_name

    def set_pipe_name(self, pipe_name):
        """Set the pipe name and derive the platform specific form.

        On Windows the ``\\\\.\\pipe\\<name>`` byte string is cached in
        ``_pipe_name_b``; on POSIX the name is joined onto ``tmp_dir``.
        """
        self._pipe_name = pipe_name
        if ON_WIN:
            self._pipe_name_b = self._win_create_pipe_name(pipe_name)
        else:
            self._pipe_name = os.path.join(self.tmp_dir, self._pipe_name)

    pipe_name = property(get_pipe_name, set_pipe_name)

    #
    # Windows methods
    #
    def _win_create_server_pipe(self, pipe_name=None):
        """Create the server pipe with CreateNamedPipeA.

        pipe_name -- optional new name, applied through set_pipe_name().

        The handle is stored in ``_win_h_pipe`` even when it is invalid.
        Returns 0 on success, otherwise the GetLastError() code.
        """
        if pipe_name is not None:
            self.set_pipe_name(pipe_name)
        h_pipe = kernel32.CreateNamedPipeA(
            self._pipe_name_b,
            PIPE_ACCESS_DUPLEX,
            PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE | PIPE_WAIT,
            PIPE_UNLIMITED_INSTANCES,
            self.BUFSIZE,   # nOutBufferSize
            self.BUFSIZE,   # nInBufferSize
            0,              # nDefaultTimeOut
            NULL)           # lpSecurityAttributes
        # Read the error code right away, before any other API call.
        if h_pipe == INVALID_HANDLE_VALUE:
            error = kernel32.GetLastError()
        else:
            error = 0
        self._win_h_pipe = h_pipe
        return error

    def _win_create_client_pipe(self, pipe_name=None):
        """Attach a client to an already created pipe with CreateFileA.

        pipe_name -- optional new name, applied through set_pipe_name().

        The handle is stored in ``_win_h_pipe`` even when it is invalid.
        Returns 0 on success, otherwise the GetLastError() code.
        """
        if pipe_name is not None:
            self.set_pipe_name(pipe_name)
        h_pipe = kernel32.CreateFileA(
            self._pipe_name_b,
            GENERIC_READ | GENERIC_WRITE,
            0,              # dwShareMode
            NULL,           # lpSecurityAttributes
            OPEN_EXISTING,
            0,              # dwFlagsAndAttributes
            NULL)           # hTemplateFile
        # Read the error code right away, before any other API call.
        if h_pipe == INVALID_HANDLE_VALUE:
            error = kernel32.GetLastError()
        else:
            error = 0
        self._win_h_pipe = h_pipe
        return error

    def _win_read_pipe(self):
        """Read up to BUFSIZE bytes with ReadFile.

        Returns ``(success, data, bytes_read)`` where ``data`` holds exactly
        ``bytes_read`` bytes.
        """
        buffer_ = C.create_string_buffer(self.BUFSIZE)
        bytes_read = DWORD()
        success = kernel32.ReadFile(self._win_h_pipe, buffer_, len(buffer_),
                                    C.byref(bytes_read), NULL)
        # Slice ``raw`` by the reported count: ``value`` would stop at the
        # first NUL byte and silently truncate binary payloads.
        return success, buffer_.raw[:bytes_read.value], bytes_read.value

    def _win_write_pipe(self, buffer_):
        """Write the byte string ``buffer_`` with WriteFile.

        Returns ``(success, bytes_written)``.
        """
        payload = C.create_string_buffer(buffer_)
        bytes_written = DWORD()
        # Pass the length of the data, not of the ctypes buffer: the latter
        # includes the NUL terminator added by create_string_buffer().
        success = kernel32.WriteFile(self._win_h_pipe, payload, len(buffer_),
                                     C.byref(bytes_written), NULL)
        return success, bytes_written.value

    def _win_close_pipe(self, h_pipe):
        """Close ``h_pipe`` and return the result of CloseHandle()."""
        return kernel32.CloseHandle(h_pipe)

    def _win_create_pipe_name(self, pipe_name):
        """Return ``\\\\.\\pipe\\<pipe_name>`` wrapped in a ``c_char_p``.

        The ANSI entry points used by this class take byte strings, so a
        text name is encoded with the ANSI code page ("mbcs").
        """
        if isinstance(pipe_name, bytes):
            full_name = b"\\\\.\\pipe\\" + pipe_name
        else:
            full_name = (u"\\\\.\\pipe\\%s" % pipe_name).encode("mbcs")
        return C.c_char_p(full_name)

    #
    # Posix methods
    #
    def _posix_create_server_pipe(self):
        """Create the FIFO file if it does not exist yet.

        A freshly created FIFO is made readable and writable by user,
        group and others.  Returns ``(1, 0)`` on success; on failure the
        traceback text is printed and ``(INVALID_HANDLE_VALUE, text)`` is
        returned.
        """
        try:
            if not os.path.exists(self._pipe_name):
                os.mkfifo(self._pipe_name)
                # NOTE(review): chmod is assumed to belong to the "just
                # created" branch -- confirm against the original layout.
                os.chmod(self._pipe_name,
                         stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH |
                         stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)
            h_pipe = 1
            ret_val = 0
        except Exception:
            h_pipe = INVALID_HANDLE_VALUE
            ret_val = traceback.format_exc()
            print(ret_val)
        return h_pipe, ret_val

    def _posix_create_client_pipe(self):
        """Check that the FIFO is present.

        Returns 0 when the path is a FIFO, 1 when it does not exist and
        2 when it exists but is not a FIFO.
        """
        if not os.path.exists(self._pipe_name):
            return 1
        mode = os.stat(self._pipe_name)[stat.ST_MODE]
        if not stat.S_ISFIFO(mode):
            return 2
        return 0

    def _posix_read_pipe(self):
        """Open the FIFO, read at most BUFSIZE bytes and close it again.

        Returns ``(True, data, len(data))``.  Errors raised by open() or
        read() propagate to the caller.
        """
        # ``with`` guarantees the descriptor is released even if read() fails.
        with open(self._pipe_name, "rb") as pipe:
            data = pipe.read(self.BUFSIZE)
        return True, data, len(data)

    def _posix_write_pipe(self, buffer_):
        """Open the FIFO, write ``buffer_`` and close it again.

        Returns ``(1, len(buffer_))`` on success and ``(0, 0)`` when the
        pipe could not be opened or written.
        """
        try:
            # ``with`` closes (and flushes) the file deterministically.
            with open(self._pipe_name, "wb") as pipe:
                pipe.write(buffer_)
        except Exception:
            # Report the failure; returning 1 here would look like success.
            return 0, 0
        return 1, len(buffer_)

    def _posix_close_pipe(self):
        """Remove the FIFO file.

        Returns None on success and 1 when the file could not be removed.
        """
        try:
            return os.remove(self._pipe_name)
        except Exception:
            return 1
if __name__ == "__main__":
    # Manual smoke test: one message is sent from the server end to the
    # client end of the same pipe.  The round trip is only exercised on
    # Windows; on other platforms the two objects are built and nothing
    # else happens.
    server = NamedPipe("test")
    client = NamedPipe("test")
    if ON_WIN:
        # The error codes are collected but not inspected.
        server_err = server.create_server_pipe()
        client_err = client.create_client_pipe()
        print(server.write_pipe("Luca cazzone"))
        print(client.read_pipe())
        # NOTE(review): only the client end is closed here -- confirm
        # whether leaving the server handle open is intentional.
        client.close_pipe()
|