added api2 handler and server
This commit is contained in:
parent
e4cd3b064f
commit
e9c0730c12
1 changed files with 186 additions and 26 deletions
|
@ -32,7 +32,7 @@ except ImportError:
|
|||
sys.path.append("/usr/local/lib64/python2.7/site-packages")
|
||||
from core import pycore
|
||||
from core.constants import *
|
||||
from core.api import coreapi
|
||||
from core.api import coreapi, coreapi2
|
||||
from core.coreobj import PyCoreNet
|
||||
from core.misc.utils import hexdump, daemonize, cmdresult, mutedetach, closeonexec
|
||||
from core.misc.xmlsession import opensessionxml, savesessionxml
|
||||
|
@ -286,29 +286,37 @@ class CoreRequestHandler(SocketServer.BaseRequestHandler):
|
|||
|
||||
try:
|
||||
replies = msghandler(msg)
|
||||
for reply in replies:
|
||||
if self.debug:
|
||||
msgtype, msgflags, msglen = \
|
||||
coreapi.CoreMessage.unpackhdr(reply)
|
||||
try:
|
||||
rmsg = coreapi.msg_class(msgtype)(msgflags,
|
||||
reply[:coreapi.CoreMessage.hdrsiz],
|
||||
reply[coreapi.CoreMessage.hdrsiz:])
|
||||
except KeyError:
|
||||
# multiple TLVs of same type cause KeyError exception
|
||||
rmsg = "CoreMessage (type %d flags %d length %d)" % \
|
||||
(msgtype, msgflags, msglen)
|
||||
self.info("%s: reply msg:\n%s" %
|
||||
(threading.currentThread().getName(), rmsg))
|
||||
try:
|
||||
self.sendall(reply)
|
||||
except Exception, e:
|
||||
self.warn("Error sending reply data: %s" % e)
|
||||
self.dispatchreplies(replies)
|
||||
except Exception, e:
|
||||
self.warn("%s: exception while handling msg:\n%s\n%s" %
|
||||
(threading.currentThread().getName(), msg,
|
||||
traceback.format_exc()))
|
||||
|
||||
|
||||
# Added to allow the API2 handler to define a different behavior when replying
|
||||
# to messages from clients
|
||||
def dispatchreplies(self, reply):
|
||||
''' Dispatch replies to a handled message.
|
||||
'''
|
||||
for reply in replies:
|
||||
if self.debug:
|
||||
msgtype, msgflags, msglen = \
|
||||
coreapi.CoreMessage.unpackhdr(reply)
|
||||
try:
|
||||
rmsg = coreapi.msg_class(msgtype)(msgflags,
|
||||
reply[:coreapi.CoreMessage.hdrsiz],
|
||||
reply[coreapi.CoreMessage.hdrsiz:])
|
||||
except KeyError:
|
||||
# multiple TLVs of same type cause KeyError exception
|
||||
rmsg = "CoreMessage (type %d flags %d length %d)" % \
|
||||
(msgtype, msgflags, msglen)
|
||||
self.info("%s: reply msg:\n%s" %
|
||||
(threading.currentThread().getName(), rmsg))
|
||||
try:
|
||||
self.sendall(reply)
|
||||
except Exception, e:
|
||||
self.warn("Error sending reply data: %s" % e)
|
||||
|
||||
def handle(self):
|
||||
''' Handle a new connection request from a client. Dispatch to the
|
||||
recvmsg() method for receiving data into CORE API messages, and
|
||||
|
@ -924,7 +932,9 @@ class CoreRequestHandler(SocketServer.BaseRequestHandler):
|
|||
try:
|
||||
self.info("executing '%s'" % ex)
|
||||
if isinstance(self.server, CoreUdpServer):
|
||||
server = self.server.tcpserver
|
||||
server = self.server.mainserver
|
||||
elif isinstance(self.server, CoreApi2Server):
|
||||
server = self.server.mainserver
|
||||
else:
|
||||
server = self.server
|
||||
if msg.flags & coreapi.CORE_API_STR_FLAG:
|
||||
|
@ -1272,8 +1282,8 @@ class CoreDatagramRequestHandler(CoreRequestHandler):
|
|||
self.nodestatusreq = {}
|
||||
self.master = False
|
||||
self.session = None
|
||||
self.verbose = bool(server.tcpserver.cfg['verbose'].lower() == "true")
|
||||
self.debug = bool(server.tcpserver.cfg['debug'].lower() == "true")
|
||||
self.verbose = bool(server.mainserver.cfg['verbose'].lower() == "true")
|
||||
self.debug = bool(server.mainserver.cfg['debug'].lower() == "true")
|
||||
SocketServer.BaseRequestHandler.__init__(self, request,
|
||||
client_address, server)
|
||||
|
||||
|
@ -1325,7 +1335,7 @@ class CoreDatagramRequestHandler(CoreRequestHandler):
|
|||
#self.info("UDP message has session numbers: %s" % sids)
|
||||
if len(sids) > 0:
|
||||
for sid in sids:
|
||||
sess = self.server.tcpserver.getsession(sessionid=sid,
|
||||
sess = self.server.mainserver.getsession(sessionid=sid,
|
||||
useexisting=True)
|
||||
if sess:
|
||||
self.session = sess
|
||||
|
@ -1336,7 +1346,7 @@ class CoreDatagramRequestHandler(CoreRequestHandler):
|
|||
(sid, msg.typestr()))
|
||||
else:
|
||||
# no session specified, find an existing one
|
||||
sess = self.server.tcpserver.getsession(sessionid=0,
|
||||
sess = self.server.mainserver.getsession(sessionid=0,
|
||||
useexisting=True)
|
||||
if sess or msg.msgtype == coreapi.CORE_API_REG_MSG:
|
||||
self.session = sess
|
||||
|
@ -1360,6 +1370,108 @@ class CoreDatagramRequestHandler(CoreRequestHandler):
|
|||
|
||||
|
||||
|
||||
|
||||
|
||||
class CoreApi2RequestHandler(CoreRequestHandler):
|
||||
''' A child of the CoreRequestHandler class for handling API 2 specification
|
||||
messages. **TODO: Verify this statement: 'No new session is created; messages are handled immediately or
|
||||
sometimes queued on existing session handlers.'
|
||||
'''
|
||||
|
||||
def __init__(self, request, client_address, server):
|
||||
self.msghandler = {
|
||||
coreapi.CORE_API_NODE_MSG: self.handlenodemsg,
|
||||
coreapi.CORE_API_LINK_MSG: self.handlelinkmsg,
|
||||
coreapi.CORE_API_EXEC_MSG: self.handleexecmsg,
|
||||
coreapi.CORE_API_REG_MSG: self.handleregmsg,
|
||||
coreapi.CORE_API_CONF_MSG: self.handleconfmsg,
|
||||
coreapi.CORE_API_FILE_MSG: self.handlefilemsg,
|
||||
coreapi.CORE_API_IFACE_MSG: self.handleifacemsg,
|
||||
coreapi.CORE_API_EVENT_MSG: self.handleeventmsg,
|
||||
coreapi.CORE_API_SESS_MSG: self.handlesessionmsg,
|
||||
}
|
||||
self.nodestatusreq = {}
|
||||
self.master = False
|
||||
self.session = None
|
||||
self.verbose = bool(server.mainserver.cfg['verbose'].lower() == "true")
|
||||
self.debug = bool(server.mainserver.cfg['debug'].lower() == "true")
|
||||
SocketServer.BaseRequestHandler.__init__(self, request,
|
||||
client_address, server)
|
||||
|
||||
def setup(self):
|
||||
''' Client has connected, set up a new connection.
|
||||
'''
|
||||
if self.verbose:
|
||||
self.info("new API 2 connection: %s:%s" % self.client_address)
|
||||
|
||||
def handle(self):
|
||||
port = self.request.getpeername()[1]
|
||||
self.session = self.server.mainserver.getsession(sessionid = port,
|
||||
useexisting = False)
|
||||
self.session.connect(self)
|
||||
while True:
|
||||
try:
|
||||
data = self.recvmsg()
|
||||
msgs = coreapi2.CoreMessage.toLegacyApi(data)
|
||||
if msgs:
|
||||
for msg in msgs:
|
||||
print msg
|
||||
self.session.broadcast(self, msg)
|
||||
self.handlemsg(msg)
|
||||
except EOFError:
|
||||
break;
|
||||
except IOError, e:
|
||||
self.warn("API2 IOError: %s" % e)
|
||||
break;
|
||||
|
||||
def dispatchreplies(self, replies):
|
||||
''' Dispatch a reply to a previously received message.
|
||||
'''
|
||||
api2Replies = coreapi2.CoreMessage.toApi2(replies)
|
||||
for reply in api2Replies:
|
||||
try:
|
||||
self.sendall(reply)
|
||||
except Exception, e:
|
||||
self.warn("Error sending reply data: %s" % e)
|
||||
|
||||
def finish(self):
|
||||
return SocketServer.BaseRequestHandler.finish(self)
|
||||
|
||||
def recvmsg(self):
|
||||
''' Receive data, parse a CoreMessage and queue it onto an existing
|
||||
session handler's queue, if available.
|
||||
'''
|
||||
|
||||
try:
|
||||
hdr = self.request.recv(coreapi2.CoreMessage.hdrsiz)
|
||||
if self.debug and len(msghdr) > 0:
|
||||
self.info("received message header:\n%s" % hexdump(msghdr))
|
||||
except Exception, e:
|
||||
raise IOError, "error receiving API 2 header (%s)" % e
|
||||
|
||||
if len(hdr) != coreapi2.CoreMessage.hdrsiz:
|
||||
if len(hdr) == 0:
|
||||
raise EOFError, "client disconnected"
|
||||
else:
|
||||
print coreapi2.CoreMessage.hdrsiz, len(hdr)
|
||||
raise IOError, "invalid message header size"
|
||||
|
||||
dataToRead = struct.unpack(coreapi2.CoreMessage.hdrfmt, hdr)[0]
|
||||
data = ""
|
||||
while len(data) < dataToRead:
|
||||
data += self.request.recv(dataToRead - len(data))
|
||||
|
||||
return data
|
||||
|
||||
|
||||
|
||||
def queuemsg(self, msg):
|
||||
raise Exception, "TO BE IMPLEMENTED if needed" % msg.typestr()
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
class CoreServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
|
||||
''' TCP server class, manages sessions and spawns request handlers for
|
||||
incoming connections.
|
||||
|
@ -1555,6 +1667,7 @@ class CoreServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
|
|||
break
|
||||
finally:
|
||||
self._sessionslock.release()
|
||||
|
||||
return found
|
||||
|
||||
def startudp(self, server_address):
|
||||
|
@ -1567,6 +1680,23 @@ class CoreServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
|
|||
self.udpthread.daemon = True
|
||||
self.udpthread.start()
|
||||
|
||||
#
|
||||
# API 2.0 server
|
||||
#
|
||||
def startapi2(self, api2_address):
|
||||
''' Start a thread running a TCP server on the given address. This
|
||||
server will communicate with remotes using API 2.0 specification
|
||||
'''
|
||||
self.api2server = CoreApi2Server(api2_address,
|
||||
CoreApi2RequestHandler,
|
||||
self)
|
||||
self.api2thread = threading.Thread(target = self.api2server.start)
|
||||
self.api2thread.daemon = True
|
||||
self.api2thread.start()
|
||||
|
||||
|
||||
|
||||
|
||||
class CoreUdpServer(SocketServer.ThreadingMixIn, SocketServer.UDPServer):
|
||||
''' UDP server class, manages sessions and spawns request handlers for
|
||||
incoming connections.
|
||||
|
@ -1574,11 +1704,11 @@ class CoreUdpServer(SocketServer.ThreadingMixIn, SocketServer.UDPServer):
|
|||
daemon_threads = True
|
||||
allow_reuse_address = True
|
||||
|
||||
def __init__(self, server_address, RequestHandlerClass, tcpserver):
|
||||
def __init__(self, server_address, RequestHandlerClass, mainserver):
|
||||
''' Server class initialization takes configuration data and calls
|
||||
the SocketServer constructor
|
||||
'''
|
||||
self.tcpserver = tcpserver
|
||||
self.mainserver = mainserver # tcpserver is the main server
|
||||
SocketServer.UDPServer.__init__(self, server_address,
|
||||
RequestHandlerClass)
|
||||
|
||||
|
@ -1589,6 +1719,30 @@ class CoreUdpServer(SocketServer.ThreadingMixIn, SocketServer.UDPServer):
|
|||
|
||||
|
||||
|
||||
class CoreApi2Server(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
|
||||
''' TCP server for API version 2.
|
||||
'''
|
||||
daemon_threads = True
|
||||
allow_reuse_address = True
|
||||
|
||||
def __init__(self, server_address, RequestHandlerClass, mainserver):
|
||||
self.mainserver = mainserver
|
||||
sys.stdout.write("API2 server started, listening on: %s:%s\n" % server_address)
|
||||
sys.stdout.flush()
|
||||
SocketServer.TCPServer.__init__(self, server_address, RequestHandlerClass)
|
||||
|
||||
def start(self):
|
||||
self.serve_forever()
|
||||
|
||||
def setsessionmaster(self, handler):
|
||||
found = self.mainserver.setsessionmaster(handler)
|
||||
print "api2 setsessionmaster", found
|
||||
return found
|
||||
|
||||
def tosessionmsg(self, flags = 0):
|
||||
return self.mainserver.tosessionmsg(flags)
|
||||
|
||||
|
||||
def banner():
|
||||
''' Output the program banner printed to the terminal or log file.
|
||||
'''
|
||||
|
@ -1616,6 +1770,11 @@ def cored(cfg = None):
|
|||
sys.stdout.flush()
|
||||
server.startudp((host,port))
|
||||
closeonexec(server.udpserver.fileno())
|
||||
api2port = cfg['api2port']
|
||||
if api2port:
|
||||
server.startapi2((host, int(api2port)))
|
||||
closeonexec(server.api2server.fileno())
|
||||
|
||||
server.serve_forever()
|
||||
|
||||
def cleanup():
|
||||
|
@ -1676,6 +1835,7 @@ def getMergedConfig(filename):
|
|||
'daemonize' : 'False',
|
||||
'debug' : 'False',
|
||||
'execfile' : None,
|
||||
'api2port' : None,
|
||||
}
|
||||
|
||||
usagestr = "usage: %prog [-h] [options] [args]\n\n" + \
|
||||
|
|
Loading…
Reference in a new issue