1
0
mirror of https://github.com/haiwen/ccnet-server.git synced 2025-09-01 21:07:18 +00:00

Initial commit of Ccnet server.

This commit is contained in:
Jiaqiang Xu
2016-08-18 17:39:55 +08:00
commit ed8404c061
206 changed files with 37441 additions and 0 deletions

13
python/ccnet/Makefile.am Normal file
View File

@@ -0,0 +1,13 @@
ccnetdir=${pyexecdir}/ccnet
ccnet_PYTHON = __init__.py errors.py status_code.py utils.py \
packet.py message.py \
client.py sync_client.py \
pool.py rpc.py
ccnet_asyncdir = ${ccnetdir}/async
ccnet_async_PYTHON = async/__init__.py \
async/async_client.py async/processor.py \
async/rpcserverproc.py async/sendcmdproc.py \
async/mqclientproc.py async/timer.py

7
python/ccnet/__init__.py Normal file
View File

@@ -0,0 +1,7 @@
from ccnet.errors import NetworkError
from ccnet.sync_client import SyncClient
from ccnet.pool import ClientPool
from ccnet.rpc import RpcClientBase, CcnetRpcClient, CcnetThreadedRpcClient
from ccnet.message import Message

View File

@@ -0,0 +1,14 @@
'''
@module: ccnet.async
@description: The async client of ccnet depends on python-libevent,
so we move it to a standalone package.
'''
from .async_client import AsyncClient
from .processor import Processor
from .rpcserverproc import RpcServerProc
from .sendcmdproc import SendCmdProc
from .mqclientproc import MqClientProc
from .timer import Timer

View File

@@ -0,0 +1,250 @@
import logging
import libevent
from ccnet.client import Client, parse_update, parse_response
from ccnet.packet import response_to_packet, parse_header, Packet
from ccnet.packet import to_response_id, to_master_id, to_slave_id, to_packet_id
from ccnet.packet import CCNET_MSG_REQUEST, CCNET_MSG_UPDATE, CCNET_MSG_RESPONSE, \
CCNET_HEADER_LENGTH, CCNET_MAX_PACKET_LENGTH
from ccnet.status_code import SC_PROC_DONE, SC_PROC_DEAD, SS_PROC_DEAD, \
SC_UNKNOWN_SERVICE, SC_PROC_KEEPALIVE, SS_PROC_KEEPALIVE, SC_PERM_ERR
from ccnet.status_code import PROC_NO_SERVICE, PROC_PERM_ERR, \
PROC_BAD_RESP, PROC_REMOTE_DEAD
from ccnet.errors import NetworkError
from .processor import Processor
from .sendcmdproc import SendCmdProc
from .mqclientproc import MqClientProc
__all__ = [
'AsyncClient',
]
def debug_print(msg):
print msg
class AsyncClient(Client):
'''Async mode client'''
def __init__(self, config_dir, event_base, central_config_dir=None):
Client.__init__(self, config_dir, central_config_dir)
self.proc_types = {}
self.procs = {}
self.register_processors()
self._bev = None
self._evbase = event_base
def get_event_base(self):
return self._evbase
def add_processor(self, proc):
self.procs[proc.id] = proc
def remove_processor(self, proc):
if proc.id in self.procs:
del self.procs[proc.id]
def get_proc(self, id):
return self.procs.get(id, None)
def write_packet(self, pkt):
outbuf = self._bev.output
outbuf.add(pkt.header.to_string())
outbuf.add(pkt.body)
def send_response(self, id, code, code_msg, content=''):
id = to_response_id(id)
pkt = response_to_packet(id, code, code_msg, content)
self.write_packet(pkt)
def handle_packet(self, pkt):
ptype = pkt.header.ptype
if ptype == CCNET_MSG_REQUEST:
self.handle_request(pkt.header.id, pkt.body)
elif ptype == CCNET_MSG_UPDATE:
code, code_msg, content = parse_update(pkt.body)
self.handle_update(pkt.header.id, code, code_msg, content)
elif ptype == CCNET_MSG_RESPONSE:
code, code_msg, content = parse_response(pkt.body)
self.handle_response(pkt.header.id, code, code_msg, content)
else:
logging.warning("unknown packet type %d", ptype)
def handle_request(self, id, req):
commands = req.split()
self.create_slave_processor(to_slave_id(id), commands)
def create_slave_processor(self, id, commands):
peer_id = self.peer_id
if commands[0] == 'remote':
if len(commands) < 3:
logging.warning("invalid request %s", commands)
return
peer_id = commands[1]
commands = commands[2:]
proc_name = commands[0]
if not proc_name in self.proc_types:
logging.warning("unknown processor type %s", proc_name)
return
cls = self.proc_types[proc_name]
proc = cls(proc_name, id, peer_id, self)
self.add_processor(proc)
proc.start(*commands[1:])
def create_master_processor(self, proc_name):
id = self.get_request_id()
cls = self.proc_types.get(proc_name, None)
if cls == None:
logging.error('unknown processor type %s', proc_name)
return None
proc = cls(proc_name, id, self.peer_id, self)
self.add_processor(proc)
return proc
def handle_update(self, id, code, code_msg, content):
proc = self.get_proc(to_slave_id(id))
if proc == None:
if code != SC_PROC_DEAD:
self.send_response(id, SC_PROC_DEAD, SS_PROC_DEAD)
return
if code[0] == '5':
logging.info('shutdown processor %s(%d): %s %s\n',
proc.name, to_packet_id(proc.id), code, code_msg)
if code == SC_UNKNOWN_SERVICE:
proc.shutdown(PROC_NO_SERVICE)
elif code == SC_PERM_ERR:
proc.shutdown(PROC_PERM_ERR)
else:
proc.shutdown(PROC_BAD_RESP)
elif code == SC_PROC_KEEPALIVE:
proc.send_response(SC_PROC_KEEPALIVE, SS_PROC_KEEPALIVE)
elif code == SC_PROC_DEAD:
logging.info('shutdown processor %s(%d): when peer(%.8s) processor is dead\n',
proc.name, to_packet_id(proc.id), proc.peer_id)
proc.shutdown(PROC_REMOTE_DEAD)
elif code == SC_PROC_DONE:
proc.done(True)
else:
proc.handle_update(code, code_msg, content)
def handle_response(self, id, code, code_msg, content):
proc = self.get_proc(to_master_id(id))
if proc == None:
if code != SC_PROC_DEAD:
self.send_update(id, SC_PROC_DEAD, SS_PROC_DEAD)
return
if code[0] == '5':
logging.info('shutdown processor %s(%d): %s %s\n',
proc.name, to_packet_id(proc.id), code, code_msg)
if code == SC_UNKNOWN_SERVICE:
proc.shutdown(PROC_NO_SERVICE)
elif code == SC_PERM_ERR:
proc.shutdown(PROC_PERM_ERR)
else:
proc.shutdown(PROC_BAD_RESP)
elif code == SC_PROC_KEEPALIVE:
proc.send_update(id, SC_PROC_KEEPALIVE, SS_PROC_KEEPALIVE)
elif code == SC_PROC_DEAD:
logging.info('shutdown processor %s(%d): when peer(%.8s) processor is dead\n',
proc.name, to_packet_id(proc.id), proc.peer_id)
proc.shutdown(PROC_REMOTE_DEAD)
else:
proc.handle_response(code, code_msg, content)
def register_processor(self, proc_name, proc_type):
assert Processor in proc_type.mro()
self.proc_types[proc_name] = proc_type
def register_processors(self):
self.register_processor("send-cmd", SendCmdProc)
self.register_processor("mq-client", MqClientProc)
def register_service(self, service, group, proc_type, callback=None):
self.register_processor(service, proc_type)
cmd = 'register-service %s %s' % (service, group)
self.send_cmd(cmd, callback)
def send_cmd(self, cmd, callback=None):
proc = self.create_master_processor("send-cmd")
if callback:
proc.set_callback(callback)
proc.start()
proc.send_cmd(cmd)
def _read_cb(self, bev, cb_data):
dummy = bev, cb_data
inbuf = self._bev.input
while (True):
raw = inbuf.copyout(CCNET_HEADER_LENGTH)
header = parse_header(raw)
if len(inbuf) < CCNET_HEADER_LENGTH + header.length:
break
inbuf.drain(CCNET_HEADER_LENGTH)
data = inbuf.copyout(header.length)
pkt = Packet(header, data)
self.handle_packet(pkt)
inbuf.drain(header.length)
if len(inbuf) < CCNET_HEADER_LENGTH:
break
def _event_cb(self, bev, what, cb_data):
dummy = bev, cb_data
logging.warning('libevent error: what = %s' % what)
if what & libevent.BEV_EVENT_EOF or \
what & libevent.BEV_EVENT_ERROR or \
what & libevent.BEV_EVENT_READING or \
what & libevent.BEV_EVENT_WRITING:
if self._bev is not None:
self._bev = None
raise NetworkError('libevent error: what = %s' % what)
def base_loop(self):
'''Create an event base -> register socket events -> loop'''
self._bev = libevent.BufferEvent(self._evbase,
self._connfd.fileno())
self._bev.set_watermark(libevent.EV_READ,
CCNET_HEADER_LENGTH, # low wartermark
CCNET_MAX_PACKET_LENGTH * 2) # highmark
self._bev.set_callbacks(self._read_cb, # read callback
None, # write callback
self._event_cb) # event callback
self._bev.enable(libevent.EV_READ | libevent.EV_WRITE)
self._evbase.loop()
def main_loop(self):
self.base_loop()

View File

@@ -0,0 +1,50 @@
import logging
from .processor import Processor
from ccnet.message import message_from_string, message_to_string
INIT = 0
REQUEST_SENT = 1
READY = 2
SC_MSG = '300'
SC_UNSUBSCRIBE = '301'
class MqClientProc(Processor):
def __init__(self, *args, **kwargs):
Processor.__init__(self, *args, **kwargs)
self.state = INIT
self.callback = None
def start(self, *argv):
req = 'mq-server ' + ' '.join(argv)
self.send_request(req)
self.state = REQUEST_SENT
def set_callback(self, cb):
self.callback = cb
def handle_response(self, code, code_msg, content):
if self.state == REQUEST_SENT:
if code[0] != '2':
logging.warning('bad response: %s %s\n', code, code_msg)
self.done(False)
self.state = READY
elif self.state == READY:
if code[0] != '2' and code[0] != '3':
logging.warning('bad response: %s %s\n', code, code_msg)
return
if code[0] == '3' and code[2] == '0':
msg = message_from_string(content[:-1])
if self.callback:
self.callback(msg)
def put_message(self, msg):
buf = message_to_string(msg)
self.send_update(SC_MSG, '', buf + '\000')
def unsubscribe(self):
self.send_update(SC_UNSUBSCRIBE, '')
self.done(True)

View File

@@ -0,0 +1,57 @@
import logging
from ccnet.packet import SLAVE_BIT_MASK, to_print_id
from ccnet.status_code import SC_PROC_DONE, SS_PROC_DONE, PROC_DONE
class Processor(object):
"""Base processor class"""
name = "Processor"
def __init__(self, name, id, peer_id, client):
self.name = name
self.id = id
self.peer_id = peer_id
self.client = client
def start(self, *args, **kwargs):
raise NotImplementedError
def handle_request(self, *args, **kwargs):
raise NotImplementedError
def handle_update(self, *args, **kwargs):
raise NotImplementedError
def handle_response(self, *args, **kwargs):
raise NotImplementedError
def __str__(self):
return "<proc %s(%d)>" % (self.name, to_print_id(self.id))
def is_master(self):
return not (self.id & SLAVE_BIT_MASK)
def send_request(self, buf):
assert self.is_master()
return self.client.send_request(self.id, buf)
def send_response(self, code, code_msg, content=''):
assert not self.is_master()
return self.client.send_response(self.id, code, code_msg, content)
def send_update(self, code, code_msg, content=''):
assert self.is_master()
return self.client.send_update(self.id, code, code_msg, content)
def done(self, success):
if self.is_master() and success:
self.send_update(SC_PROC_DONE, SS_PROC_DONE, '')
self.client.remove_processor(self)
def shutdown(self, reason):
if reason > PROC_DONE:
logging.debug('shut down %s: %s', self, reason)
self.client.remove_processor(self)

View File

@@ -0,0 +1,43 @@
from pysearpc import searpc_server
from ccnet.status_code import SC_OK, SS_OK
from ccnet.status_code import SC_SERVER_RET, SS_SERVER_RET, SC_SERVER_MORE, SS_SERVER_MORE, \
SC_CLIENT_CALL, SC_CLIENT_MORE, SC_CLIENT_CALL_MORE
from .processor import Processor
class RpcServerProc(Processor):
name = 'rpcserver-proc'
max_transfer_length = 65535 - 128
def __init__(self, *args, **kwargs):
Processor.__init__(self, *args, **kwargs)
self.fretstr = ''
self.fcallstr = ''
def start(self, *argv):
self.send_response(SC_OK, SS_OK, '')
def send_fret(self):
maxlen = self.max_transfer_length
l = len(self.fretstr)
if l < maxlen:
self.send_response(SC_SERVER_RET, SS_SERVER_RET, self.fretstr)
self.fretstr = ''
else:
buf = self.fretstr[:maxlen]
self.send_response(SC_SERVER_MORE, SS_SERVER_MORE, buf)
self.fretstr = self.fretstr[maxlen:]
def handle_update(self, code, code_msg, content):
if code == SC_CLIENT_CALL_MORE:
self.fcallstr += content
return
elif code == SC_CLIENT_CALL:
self.fcallstr += content
self.fretstr = searpc_server.call_function(self.name, self.fcallstr)
self.fcallstr = ''
self.send_fret()
elif code == SC_CLIENT_MORE:
self.send_fret()

View File

@@ -0,0 +1,34 @@
import logging
from .processor import Processor
INIT = 0
REQUET_SENT = 1
CONNECTED = 2
class SendCmdProc(Processor):
name = "send-cmd"
def __init__(self, *args, **kwargs):
Processor.__init__(self, *args, **kwargs)
self.callback = None
self.state = INIT
def start(self, *argv):
self.send_request('receive-cmd')
self.state = REQUET_SENT
def set_callback(self, cb):
self.callback = cb
def send_cmd(self, cmd):
self.send_update('200', '', cmd + '\000')
def handle_response(self, code, code_msg, content):
if code[0] != '2':
logging.warning("Received bad response %s %s", code, code_msg)
if self.state == REQUET_SENT:
self.state = CONNECTED
elif self.state == CONNECTED:
if self.callback:
self.callback(code, code_msg, content)

View File

@@ -0,0 +1,21 @@
import libevent
import logging
class Timer(object):
'''Wraps aroud a libevent timeout event'''
def __init__(self, ev_base, timeout):
self._timeout = timeout
self._evtimer = libevent.Timer(ev_base, self._callback, None)
self._evtimer.add(timeout) # pylint: disable=E1101
def _callback(self, evtimer, user_data):
dummy = user_data
try:
self.callback()
except:
logging.exception('error in timer callback:')
evtimer.add(self._timeout)
def callback(self):
raise NotImplementedError

145
python/ccnet/client.py Normal file
View File

@@ -0,0 +1,145 @@
#coding: UTF-8
import os
import socket
import ConfigParser
import logging
from ccnet.packet import to_request_id, to_update_id
from ccnet.packet import request_to_packet, update_to_packet
from ccnet.packet import write_packet
from ccnet.errors import NetworkError
from .utils import is_win32, make_socket_closeonexec
CCNET_PIPE_NAME = 'ccnet.sock'
def parse_response(body):
'''Parse the content of the response
The struct of response data:
- first 3 bytes is the <status code>
- from the 4th byte to the first occurrence of '\n' is the <status message>. If the 4th byte is '\n', then there is no <status message>
- from the first occurrence of '\n' to the end is the <content>
'''
code = body[:3]
if body[3] == '\n':
code_msg = ''
content = body[4:]
else:
pos = body.index('\n')
code_msg = body[4:pos]
content = body[pos + 1:]
return code, code_msg, content
def parse_update(body):
'''The structure of an update is the same with a response'''
code = body[:3]
if body[3] == '\n':
code_msg = ''
content = body[4:]
else:
pos = body.index('\n')
code_msg = body[4:pos]
content = body[pos + 1:]
return code, code_msg, content
class Client(object):
'''Base ccnet client class'''
def __init__(self, config_dir, central_config_dir=None):
if not isinstance(config_dir, unicode):
config_dir = config_dir.decode('UTF-8')
if central_config_dir:
central_config_dir = os.path.expanduser(central_config_dir)
if not os.path.exists(central_config_dir):
raise RuntimeError(u'%s does not exits' % central_config_dir)
config_dir = os.path.expanduser(config_dir)
config_file = os.path.join(central_config_dir if central_config_dir else config_dir,
u'ccnet.conf')
logging.debug('using config file %s', config_file)
if not os.path.exists(config_file):
raise RuntimeError(u'%s does not exits' % config_file)
self.central_config_dir = central_config_dir
self.config_dir = config_dir
self.config_file = config_file
self.config = None
self.port = None
self.peer_id = None
self.peer_name = None
self.parse_config()
self._connfd = None
self._req_id = 1000
def __del__(self):
'''Destructor of the client class. We close the socket here, if
connetced to daemon
'''
if self.is_connected():
try:
self._connfd.close()
except:
pass
def parse_config(self):
self.config = ConfigParser.ConfigParser()
self.config.read(self.config_file)
self.port = self.config.getint('Client', 'PORT')
self.un_path = ''
if self.config.has_option('Client', 'UNIX_SOCKET'):
self.un_path = self.config.get('Client', 'UNIX_SOCKET')
self.peer_id = self.config.get('General', 'ID')
self.peer_name = self.config.get('General', 'NAME')
def connect_daemon_with_pipe(self):
self._connfd = socket.socket(socket.AF_UNIX)
if not self.un_path:
pipe_name = os.path.join(self.config_dir, CCNET_PIPE_NAME)
else:
pipe_name = self.un_path
try:
self._connfd.connect(pipe_name)
except:
raise NetworkError("Can't connect to daemon")
make_socket_closeonexec(self._connfd.fileno())
def connect_daemon_with_socket(self):
self._connfd = socket.socket()
self._connfd.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
try:
self._connfd.connect(('127.0.0.1', self.port))
except:
raise NetworkError("Can't connect to daemon")
make_socket_closeonexec(self._connfd.fileno())
def connect_daemon(self):
if is_win32():
return self.connect_daemon_with_socket()
else:
return self.connect_daemon_with_pipe()
def is_connected(self):
return self._connfd is not None
def send_request(self, id, req):
id = to_request_id(id)
pkt = request_to_packet(id, req)
write_packet(self._connfd, pkt)
def send_update(self, id, code, code_msg, content=''):
id = to_update_id(id)
pkt = update_to_packet(id, code, code_msg, content)
write_packet(self._connfd, pkt)
def get_request_id(self):
self._req_id += 1
return self._req_id

7
python/ccnet/errors.py Normal file
View File

@@ -0,0 +1,7 @@
class NetworkError(Exception):
def __init__(self, msg):
Exception.__init__(self)
self.msg = msg
def __str__(self):
return self.msg

47
python/ccnet/message.py Normal file
View File

@@ -0,0 +1,47 @@
#coding: UTF-8
'''Message is the carrier of a simple Pub/Sub system on top of ccnet'''
import datetime
import re
import uuid
import time
MESSAGE_PATTERN = re.compile(r'(?P<flags>[\d]+) (?P<from>[^ ]+) (?P<to>[^ ]+) (?P<id>[^ ]+) (?P<ctime>[^ ]+) (?P<rtime>[^ ]+) (?P<app>[^ ]+) (?P<body>.*)')
class Message(object):
def __init__(self, d):
self.flags = int(d['flags'])
self.from_ = d['from']
self.to = d['to']
self.id = d['id']
self.ctime = float(d['ctime'])
self.rtime = float(d['rtime'])
self.app = d['app']
self.body = d['body']
def message_from_string(s):
results = MESSAGE_PATTERN.match(s)
if results is None:
raise RuntimeError('Bad message: %s' % s)
d = results.groupdict()
return Message(d)
def gen_inner_message_string(self_id, app, content):
result = "%d %s %s %s %d %d %s %s\000" % (0, self_id, self_id, str(uuid.uuid1()),
int(time.time()), 0,
app, content)
return result
def message_to_string(msg):
f = '%(flags)s %(from_)s %(to)s %(id)s %(ctime)s %(rtime)s %(app)s %(body)s'
return f % dict(flags=msg.flags,
from_=msg.from_,
to=msg.to,
id=msg.id,
ctime=msg.ctime,
rtime=msg.rtime,
app=msg.app,
body=msg.body)

129
python/ccnet/packet.py Normal file
View File

@@ -0,0 +1,129 @@
#coding: UTF-8
"""
Packet level protocol of ccnet.
About various types of id:
- A slave processor's id has its highest bit set; a master processor has its highest bit clear
- The <id> field of a ccnet packet always has its highest bit clear. The
<type> field of the packet determines what type of the packet is (a
request, a response, or an update)
"""
import logging
import struct
from ccnet.utils import recvall, sendall, NetworkError
REQUEST_ID_MASK = 0x7fffffff
SLAVE_BIT_MASK = 0x80000000
CCNET_MSG_OK = 0
CCNET_MSG_HANDSHAKE = 1
CCNET_MSG_REQUEST = 2
CCNET_MSG_RESPONSE = 3
CCNET_MSG_UPDATE = 4
CCNET_MSG_RELAY = 5
def to_request_id(id):
return id & REQUEST_ID_MASK
to_response_id = to_request_id
to_update_id = to_request_id
to_master_id = to_request_id
to_packet_id = to_request_id
def to_slave_id(id):
return id | SLAVE_BIT_MASK
def to_print_id(id):
if id & SLAVE_BIT_MASK:
return -to_request_id(id)
else:
return id
# the byte sequence of ccnet packet header
CCNET_HEADER_FORMAT = '>BBHI'
# Number of bytes for the header
CCNET_HEADER_LENGTH = struct.calcsize(CCNET_HEADER_FORMAT)
CCNET_MAX_PACKET_LENGTH = 65535
class PacketHeader(object):
def __init__(self, ver, ptype, length, id):
self.ver = ver
self.ptype = ptype
self.length = length
self.id = id
def to_string(self):
return struct.pack(CCNET_HEADER_FORMAT, self.ver, self.ptype, self.length, self.id)
def __str__(self):
return "<PacketHeader: type = %d, length = %d, id = %u>" % (self.ptype, self.length, self.id)
class Packet(object):
version = 1
def __init__(self, header, body):
self.header = header
self.body = body
def parse_header(buf):
try:
ver, ptype, length, id = struct.unpack(CCNET_HEADER_FORMAT, buf)
except struct.error, e:
raise NetworkError('error when unpack packet header: %s' % e)
return PacketHeader(ver, ptype, length, id)
def format_response(code, code_msg, content):
body = code
if code_msg:
body += " " + code_msg
body += "\n"
if content:
body += content
return body
format_update = format_response
def request_to_packet(id, buf):
hdr = PacketHeader(1, CCNET_MSG_REQUEST, len(buf), to_request_id(id))
return Packet(hdr, buf)
def response_to_packet(id, code, code_msg, content):
body = format_response(code, code_msg, content)
hdr = PacketHeader(1, CCNET_MSG_RESPONSE, len(body), to_response_id(id))
return Packet(hdr, body)
def update_to_packet(id, code, code_msg, content):
body = format_update(code, code_msg, content)
hdr = PacketHeader(1, CCNET_MSG_UPDATE, len(body), to_update_id(id))
return Packet(hdr, body)
def read_packet(fd):
hdr = recvall(fd, CCNET_HEADER_LENGTH)
if len(hdr) == 0:
logging.warning('connection to daemon is lost')
raise NetworkError('Connection to daemon is lost')
elif len(hdr) < CCNET_HEADER_LENGTH:
raise NetworkError('Only read %d bytes header, expected 8' % len(hdr))
header = parse_header(hdr)
if header.length == 0:
body = ''
else:
body = recvall(fd, header.length)
if len(body) < header.length:
raise NetworkError('Only read %d bytes body, expected %d' % (len(body), header.length))
return Packet(header, body)
def write_packet(fd, packet):
hdr = packet.header.to_string()
sendall(fd, hdr)
sendall(fd, packet.body)

36
python/ccnet/pool.py Normal file
View File

@@ -0,0 +1,36 @@
from ccnet.sync_client import SyncClient
import Queue
class ClientPool(object):
"""ccnet client pool."""
def __init__(self, conf_dir, pool_size=5, central_config_dir=None):
"""
:param central_config_dir: path to the central config dir for ccnet/seafile/seahub/seafdav etc.
:param conf_dir: the ccnet configuration directory
:param pool_size:
"""
self.central_config_dir = central_config_dir
self.conf_dir = conf_dir
self.pool_size = pool_size
self._pool = Queue.Queue(pool_size)
def _create_client(self):
client = SyncClient(self.conf_dir, self.central_config_dir)
client.req_ids = {}
client.connect_daemon()
return client
def get_client(self):
try:
client = self._pool.get(False)
except:
client = self._create_client()
return client
def return_client(self, client):
try:
self._pool.put(client, False)
except Queue.Full:
pass

410
python/ccnet/rpc.py Normal file
View File

@@ -0,0 +1,410 @@
from pysearpc import SearpcClient, searpc_func, SearpcError
from ccnet.status_code import SC_CLIENT_CALL, SS_CLIENT_CALL, \
SC_CLIENT_MORE, SS_CLIENT_MORE, SC_SERVER_RET, \
SC_SERVER_MORE, SC_PROC_DEAD
from ccnet.errors import NetworkError
class DeadProcError(Exception):
def __str__(self):
return "Processor is dead"
class RpcClientBase(SearpcClient):
def __init__(self, ccnet_client_pool, service_name, retry_num=1,
is_remote=False, remote_peer_id='', req_pool=False):
SearpcClient.__init__(self)
self.pool = ccnet_client_pool
self.service_name = service_name
self.retry_num = retry_num
self.is_remote = is_remote
self.remote_peer_id = remote_peer_id
self.req_pool = req_pool
if self.is_remote and len(self.remote_peer_id) != 40:
raise ValueError("Invalid remote peer id")
def _start_service(self, client):
req_id = client.get_request_id()
req_str = self.service_name
if self.is_remote:
req_str = "remote " + self.remote_peer_id + " " + self.service_name
client.send_request(req_id, req_str)
rsp = client.read_response()
if rsp.code != "200":
raise SearpcError("Error received: %s %s (In _start_service)" % (rsp.code, rsp.code_msg))
return req_id
def _real_call(self, client, req_id, fcall_str):
client.send_update(req_id, SC_CLIENT_CALL, SS_CLIENT_CALL, fcall_str)
rsp = client.read_response()
if rsp.code == SC_SERVER_RET:
return rsp.content
elif rsp.code == SC_SERVER_MORE:
buf = rsp.content
while True:
client.send_update(req_id, SC_CLIENT_MORE,
SS_CLIENT_MORE, '')
rsp = client.read_response()
if rsp.code == SC_SERVER_MORE:
buf += rsp.content
elif rsp.code == SC_SERVER_RET:
buf += rsp.content
break
else:
raise SearpcError("Error received: %s %s (In Read More)" % (rsp.code, rsp.code_msg))
return buf
elif rsp.code == SC_PROC_DEAD:
raise DeadProcError()
else:
raise SearpcError("Error received: %s %s" % (rsp.code, rsp.code_msg))
def call_remote_func_sync(self, fcall_str):
"""Call remote function `fcall_str` and wait response."""
retried = 0
while True:
try:
client = self.pool.get_client()
if self.req_pool:
req_id = client.req_ids.get(self.service_name, -1)
if req_id == -1:
req_id = self._start_service(client)
client.req_ids[self.service_name] = req_id
try:
ret = self._real_call(client, req_id, fcall_str)
except DeadProcError:
client.req_ids[self.service_name] = -1
self.pool.return_client(client)
if retried < self.retry_num:
retried = retried + 1
continue
else:
raise
self.pool.return_client(client)
return ret
else:
# no req pool
req_id = self._start_service(client)
ret = self._real_call(client, req_id, fcall_str)
client.send_update(req_id, "103", "service is done", "")
self.pool.return_client(client)
return ret
except (NetworkError, SearpcError):
# the client is not returned to the pool and is freed automatically
if retried < self.retry_num:
retried = retried + 1
continue
else:
raise
class CcnetRpcClient(RpcClientBase):
def __init__(self, ccnet_client_pool, retry_num=1, *args, **kwargs):
RpcClientBase.__init__(self, ccnet_client_pool, "ccnet-rpcserver",
*args, **kwargs)
@searpc_func("string", [])
def list_peers(self):
pass
@searpc_func("objlist", [])
def list_resolving_peers(self):
pass
@searpc_func("objlist", ["string"])
def get_peers_by_role(self):
pass
@searpc_func("object", ["string"])
def get_peer(self):
pass
@searpc_func("object", [])
def get_session_info(self):
pass
@searpc_func("int", ["string"])
def add_client(self):
pass
@searpc_func("int", ["string", "string"])
def add_role(self, peer_id, role):
pass
@searpc_func("int", ["string", "string"])
def remove_role(self, peer_id, role):
pass
@searpc_func("objlist", ["int", "int"])
def get_procs_alive(self, offset, limit):
pass
@searpc_func("int", [])
def count_procs_alive(self):
pass
@searpc_func("objlist", ["int", "int"])
def get_procs_dead(self, offset, limit):
pass
@searpc_func("int", [])
def count_procs_dead(self):
pass
@searpc_func("string", ["string"])
def get_config(self, key):
pass
@searpc_func("int", ["string", "string"])
def set_config(self, key, value):
pass
@searpc_func("objlist", [])
def list_peer_stat(self, key, value):
pass
class CcnetThreadedRpcClient(RpcClientBase):
def __init__(self, ccnet_client_pool, retry_num=1, *args, **kwargs):
RpcClientBase.__init__(self, ccnet_client_pool, "ccnet-threaded-rpcserver",
*args, **kwargs)
@searpc_func("int", ["string", "string", "int", "int"])
def add_emailuser(self, email, passwd, is_staff, is_active):
pass
@searpc_func("int", ["string", "string"])
def remove_emailuser(self, source, email):
pass
@searpc_func("int", ["string", "string"])
def validate_emailuser(self, email, passwd):
pass
@searpc_func("object", ["string"])
def get_emailuser(self, email):
pass
@searpc_func("object", ["string"])
def get_emailuser_with_import(self, email):
pass
@searpc_func("object", ["int"])
def get_emailuser_by_id(self, user_id):
pass
@searpc_func("objlist", ["string", "int", "int", "string"])
def get_emailusers(self, source, start, limit, status):
pass
@searpc_func("objlist", ["string", "string", "int", "int"])
def search_emailusers(self, source, email_patt, start, limit):
pass
@searpc_func("objlist", ["string", "int", "int"])
def search_ldapusers(self, keyword, start, limit):
pass
@searpc_func("int64", ["string"])
def count_emailusers(self, source):
pass
@searpc_func("int64", ["string"])
def count_inactive_emailusers(self, source):
pass
@searpc_func("objlist", ["string"])
def filter_emailusers_by_emails(self):
pass
@searpc_func("int", ["string", "int", "string", "int", "int"])
def update_emailuser(self, source, user_id, password, is_staff, is_active):
pass
@searpc_func("int", ["string", "string"])
def update_role_emailuser(self, email, role):
pass
@searpc_func("objlist", [])
def get_superusers(self):
pass
@searpc_func("int", ["string", "string"])
def add_binding(self, email, peer_id):
pass
@searpc_func("string", ["string"])
def get_binding_email(self, peer_id):
pass
@searpc_func("string", ["string"])
def get_binding_peerids(self, email):
pass
@searpc_func("int", ["string"])
def remove_binding(self, email):
pass
@searpc_func("int", ["string", "string"])
def remove_one_binding(self, email, peer_id):
pass
@searpc_func("objlist", ["string"])
def get_peers_by_email(self, email):
pass
@searpc_func("int", ["string", "string", "string"])
def create_group(self, group_name, user_name, gtype):
pass
@searpc_func("int", ["int", "string", "string"])
def create_org_group(self, org_id, group_name, user_name):
pass
@searpc_func("int", ["int"])
def remove_group(self, group_id):
pass
@searpc_func("int", ["int", "string", "string"])
def group_add_member(self, group_id, user_name, member_name):
pass
@searpc_func("int", ["int", "string", "string"])
def group_remove_member(self, group_id, user_name, member_name):
pass
@searpc_func("int", ["int", "string"])
def group_set_admin(self, group_id, member_name):
pass
@searpc_func("int", ["int", "string"])
def group_unset_admin(self, group_id, member_name):
pass
@searpc_func("int", ["int", "string"])
def set_group_name(self, group_id, group_name):
pass
@searpc_func("int", ["int", "string"])
def quit_group(self, group_id, user_name):
pass
@searpc_func("objlist", ["string"])
def get_groups(self, user_name):
pass
@searpc_func("objlist", ["int", "int", "string"])
def get_all_groups(self, start, limit, source):
pass
@searpc_func("object", ["int"])
def get_group(self, group_id):
pass
@searpc_func("objlist", ["int"])
def get_group_members(self, group_id):
pass
@searpc_func("int", ["int", "string"])
def check_group_staff(self, group_id, username):
pass
@searpc_func("int", ["string"])
def remove_group_user(self, username):
pass
@searpc_func("int", ["int", "string"])
def is_group_user(self, group_id, user):
pass
@searpc_func("int", ["int", "string"])
def set_group_creator(self, group_id, user_name):
pass
@searpc_func("int", ["string", "string", "string"])
def create_org(self, org_name, url_prefix, creator):
pass
@searpc_func("int", ["int"])
def remove_org(self, org_id):
pass
@searpc_func("objlist", ["int", "int"])
def get_all_orgs(self, start, limit):
pass
@searpc_func("int64", [])
def count_orgs(self):
pass
@searpc_func("object", ["string"])
def get_org_by_url_prefix(self, url_prefix):
pass
@searpc_func("object", ["string"])
def get_org_by_id(self, org_id):
pass
@searpc_func("int", ["int", "string", "int"])
def add_org_user(self, org_id, email, is_staff):
pass
@searpc_func("int", ["int", "string"])
def remove_org_user(self, org_id, email):
pass
@searpc_func("objlist", ["string"])
def get_orgs_by_user(self, email):
pass
@searpc_func("objlist", ["string", "int", "int"])
def get_org_emailusers(self, url_prefix, start, limit):
pass
@searpc_func("int", ["int", "int"])
def add_org_group(self, org_id, group_id):
pass
@searpc_func("int", ["int", "int"])
def remove_org_group(self, org_id, group_id):
pass
@searpc_func("int", ["int"])
def is_org_group(self, group_id):
pass
@searpc_func("int", ["int"])
def get_org_id_by_group(self, group_id):
pass
@searpc_func("objlist", ["int", "int", "int"])
def get_org_groups(self, org_id, start, limit):
pass
@searpc_func("int", ["int", "string"])
def org_user_exists(self, org_id, email):
pass
@searpc_func("int", ["int", "string"])
def is_org_staff(self, org_id, user):
pass
@searpc_func("int", ["int", "string"])
def set_org_staff(self, org_id, user):
pass
@searpc_func("int", ["int", "string"])
def unset_org_staff(self, org_id, user):
pass
@searpc_func("int", ["int", "string"])
def set_org_name(self, org_id, org_name):
pass

View File

@@ -0,0 +1,77 @@
#coding: UTF-8
'''Status code and status messages used in ccnet. Should be treated as constants'''
EC_NETWORK_ERR = 1
ES_NETWORK_ERR = 'Network Error'
SC_PROC_KEEPALIVE = '100'
SS_PROC_KEEPALIVE = 'processor keep alive'
SC_PROC_ALIVE = '101'
SS_PROC_ALIVE = 'processor is alive'
SC_PROC_DEAD = '102'
SS_PROC_DEAD = 'processor is dead'
SC_PROC_DONE = '103'
SS_PROC_DONE = 'service is done'
SC_OK = '200'
SS_OK = 'OK'
SC_SERV_EXISTED = '210'
SS_SERV_EXISTED = 'The service existed'
SC_PERM_CHECKING = '250'
SS_PERM_CHECKING = 'Permission Checking'
SC_SHUTDOWN = '500'
SS_SHUTDOWN = 'Shutdown'
SC_CREATE_PROC_ERR = '501'
SS_CREATE_PROC_ERR = 'Create Processor Error'
SC_BAD_PEER = '502'
SS_BAD_PEER = 'Bad peer id'
SC_BAD_USER = '502'
SS_BAD_USER = 'Bad user id'
SC_BAD_ARGS = '503'
SS_BAD_ARGS = 'Bad arguments'
SC_PERM_ERR = '504'
SS_PERM_ERR = 'Permission Error'
SC_BAD_UPDATE_CODE = '506'
SS_BAD_UPDATE_CODE = 'Bad update code'
SC_BAD_RESPONSE_CODE = '507'
SS_BAD_RESPONSE_CODE = 'Bad response code'
SC_VERSION_MISMATCH = '508'
SS_VERSION_MISMATCH = 'Version Mismatch'
SC_UNKNOWN_PEER = '510'
SS_UNKNOWN_PEER = 'Unknown peer'
SC_UNKNOWN_SERVICE = '511'
SS_UNKNOWN_SERVICE = 'Unknown service'
SC_PEER_UNREACHABLE = '512'
SS_PEER_UNREACHABLE = 'Peer Unreachable'
SC_CON_TIMEOUT = '513'
SS_CON_TIMEOUT = 'connection timeout'
SC_KEEPALIVE_TIMEOUT = '514'
SS_KEEPALIVE_TIMEOUT = 'keepalive timeout'
SC_NETDOWN = '515'
SS_NETDOWN = 'peer down'
PROC_NOTSET = 0
PROC_DONE = 1
PROC_REMOTE_DEAD = 2
PROC_NO_SERVICE = 3
PROC_PERM_ERR = 4
PROC_BAD_RESP = 5
SC_CLIENT_CALL = '301'
SS_CLIENT_CALL = 'CLIENT CALL'
SC_CLIENT_MORE = '302'
SS_CLIENT_MORE = 'MORE'
SC_CLIENT_CALL_MORE = '303'
SS_CLIENT_CALL_MORE = 'CLIENT HAS MORE'
SC_SERVER_RET = '311'
SS_SERVER_RET = 'SERVER RET'
SC_SERVER_MORE = '312'
SS_SERVER_MORE = 'HAS MORE'
SC_SERVER_ERR = '411'
SS_SERVER_ERR = 'Fail to invoke the function, check the function'

View File

@@ -0,0 +1,94 @@
from ccnet.client import Client, parse_response
from ccnet.packet import read_packet, CCNET_MSG_RESPONSE
from ccnet.status_code import SC_PROC_DONE, SS_PROC_DONE
from ccnet.message import message_from_string, gen_inner_message_string
_REQ_ID_START = 1000
class Response(object):
def __init__(self, code, code_msg, content):
self.code = code
self.code_msg = code_msg
self.content = content
class SyncClient(Client):
'''sync mode client'''
def __init__(self, config_dir, central_config_dir=None):
Client.__init__(self, config_dir, central_config_dir)
self._req_id = _REQ_ID_START
self.mq_req_id = -1
def disconnect_daemon(self):
if self.is_connected():
try:
self._connfd.close()
except:
pass
def read_response(self):
packet = read_packet(self._connfd)
if packet.header.ptype != CCNET_MSG_RESPONSE:
raise RuntimeError('Invalid Response')
code, code_msg, content = parse_response(packet.body)
return Response(code, code_msg, content)
def send_cmd(self, cmd):
req_id = self.get_request_id()
self.send_request(req_id, 'receive-cmd')
resp = self.read_response()
if resp.code != '200':
raise RuntimeError('Failed to send-cmd: %s %s' % (resp.code, resp.code_msg))
cmd += '\000'
self.send_update(req_id, '200', '', cmd)
resp = self.read_response()
if resp.code != '200':
raise RuntimeError('Failed to send-cmd: %s %s' % (resp.code, resp.code_msg))
self.send_update(req_id, SC_PROC_DONE, SS_PROC_DONE, '')
def prepare_recv_message(self, msg_type):
request = 'mq-server %s' % msg_type
req_id = self.get_request_id()
self.send_request(req_id, request)
resp = self.read_response()
if resp.code != '200':
raise RuntimeError('bad response: %s %s' % (resp.code, resp.code_msg))
def receive_message(self):
resp = self.read_response()
# the message from ccnet daemon has the trailing null byte included
msg = message_from_string(resp.content[:-1])
return msg
def prepare_send_message(self):
request = 'mq-server'
mq_req_id = self.get_request_id()
self.send_request(mq_req_id, request)
resp = self.read_response()
if resp.code != '200':
raise RuntimeError('bad response: %s %s' % (resp.code, resp.code_msg))
self.mq_req_id = mq_req_id
def send_message(self, msg_type, content):
if self.mq_req_id == -1:
self.prepare_send_message()
msg = gen_inner_message_string(self.peer_id, msg_type, content)
self.send_update(self.mq_req_id, "300", '', msg)
resp = self.read_response()
if resp.code != '200':
self.mq_req_id = -1
raise RuntimeError('bad response: %s %s' % (resp.code, resp.code_msg))
def register_service_sync(self, service, group):
'''Mainly used by a program to register a dummy service to ensure only
single instance of that program is running
'''
cmd = 'register-service %s %s' % (service, group)
self.send_cmd(cmd)

View File

@@ -0,0 +1,62 @@
import sys
import os
from ccnet import ClientPool, RpcClientBase
from pysearpc import searpc_func
import threading
import logging
RPC_SERVICE_NAME = 'test-rpcserver'
CCNET_CONF_DIR = os.path.expanduser('~/.ccnet')
class TestRpcClient(RpcClientBase):
def __init__(self, client_pool, *args, **kwargs):
RpcClientBase.__init__(self, client_pool, RPC_SERVICE_NAME, *args, **kwargs)
@searpc_func('string', ['string', 'int'])
def str_mul(self, s, n):
pass
class Worker(threading.Thread):
def __init__(self, rpc):
threading.Thread.__init__(self)
self.rpc = rpc
def run(self):
s = 'abcdef'
n = 100
assert self.rpc.str_mul(s, n) == s * n
def test(n):
rpcclient = TestRpcClient(ClientPool(CCNET_CONF_DIR, CCNET_CONF_DIR))
workers = []
for i in xrange(n):
t = Worker(rpcclient)
t.start()
workers.append(t)
for t in workers:
t.join()
def setup_logging():
kw = {
'format': '[%(asctime)s][%(module)s]: %(message)s',
'datefmt': '%m/%d/%Y %H:%M:%S',
'level': logging.DEBUG,
'stream': sys.stdout,
}
logging.basicConfig(**kw)
def main():
setup_logging()
if len(sys.argv) > 1:
test(int(sys.argv[1]))
else:
test(100)
print 'test passed'
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,45 @@
import os
import logging
import libevent
from pysearpc import searpc_server
from ccnet.async import AsyncClient, RpcServerProc
RPC_SERVICE_NAME = 'test-rpcserver'
CCNET_CONF_DIR = os.path.expanduser('~/.ccnet')
def init_logging():
"""Configure logging module"""
level = logging.DEBUG
kw = {
'format': '[%(asctime)s] %(message)s',
'datefmt': '%m/%d/%Y %H:%M:%S',
'level': level,
}
logging.basicConfig(**kw)
i = 0
def register_rpc_functions(session):
def str_mul(a, b):
global i
i = i + 1
print '[%s] a = %s, b = %s' % (i, a, b)
return a * b
searpc_server.create_service(RPC_SERVICE_NAME)
searpc_server.register_function(RPC_SERVICE_NAME, str_mul)
session.register_service(RPC_SERVICE_NAME, 'basic', RpcServerProc)
def main():
init_logging()
evbase = libevent.Base()
session = AsyncClient(CCNET_CONF_DIR, evbase, CCNET_CONF_DIR)
session.connect_daemon()
register_rpc_functions(session)
session.main_loop()
if __name__ == '__main__':
main()

45
python/ccnet/utils.py Normal file
View File

@@ -0,0 +1,45 @@
import os
import socket
from ccnet.errors import NetworkError
def recvall(fd, total):
remain = total
data = ''
while remain > 0:
try:
new = fd.recv(remain)
except socket.error as e:
raise NetworkError('Failed to read from socket: %s' % e)
n = len(new)
if n <= 0:
raise NetworkError("Failed to read from socket")
else:
data += new
remain -= n
return data
def sendall(fd, data):
total = len(data)
offset = 0
while offset < total:
try:
n = fd.send(data[offset:])
except socket.error as e:
raise NetworkError('Failed to write to socket: %s' % e)
if n <= 0:
raise NetworkError('Failed to write to socket')
else:
offset += n
def is_win32():
return os.name == 'nt'
def make_socket_closeonexec(fd):
if not is_win32():
import fcntl
old_flags = fcntl.fcntl(fd, fcntl.F_GETFD)
fcntl.fcntl(fd, fcntl.F_SETFD, old_flags | fcntl.FD_CLOEXEC)