6c1a1a3dca
In case the addr argument was not an existing unix socket file the rpc client would consider it to be an actual ip address. As a result connect() would be called with improper set of arguments. This could cause the rpc.py to block for undesired amount of time until connect() finally decided to return (seen on some fedora33 builds). This was affecting sh wrapper functions like waitforlisten() which use rpc.py to determine if given app is ready to be talk to blocking execution of the tests for way too long then intendent. To avoid such a scenario determine the format of the address and use routines proper for given address family. Signed-off-by: Michal Berger <michalx.berger@intel.com> Change-Id: Iaac701d72c772629fa7c6478ff4781b0c5d485d5 Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/7777 Community-CI: Mellanox Build Bot Tested-by: SPDK CI Jenkins <sys_sgci@intel.com> Reviewed-by: Paul Luse <paul.e.luse@intel.com> Reviewed-by: Jim Harris <james.r.harris@intel.com> Reviewed-by: Tomasz Zawadzki <tomasz.zawadzki@intel.com> Reviewed-by: Karol Latecki <karol.latecki@intel.com>
202 lines
6.7 KiB
Python
202 lines
6.7 KiB
Python
import json
|
|
import socket
|
|
import time
|
|
import os
|
|
import logging
|
|
import copy
|
|
|
|
|
|
def print_dict(d):
|
|
print(json.dumps(d, indent=2))
|
|
|
|
|
|
def print_json(s):
|
|
print(json.dumps(s, indent=2).strip('"'))
|
|
|
|
|
|
def get_addr_type(addr):
|
|
try:
|
|
socket.inet_pton(socket.AF_INET, addr)
|
|
return socket.AF_INET
|
|
except Exception as e:
|
|
pass
|
|
try:
|
|
socket.inet_pton(socket.AF_INET6, addr)
|
|
return socket.AF_INET6
|
|
except Exception as e:
|
|
pass
|
|
if os.path.exists(addr):
|
|
return socket.AF_UNIX
|
|
return None
|
|
|
|
|
|
class JSONRPCException(Exception):
|
|
def __init__(self, message):
|
|
self.message = message
|
|
|
|
|
|
class JSONRPCClient(object):
|
|
def __init__(self, addr, port=None, timeout=60.0, **kwargs):
|
|
self.sock = None
|
|
ch = logging.StreamHandler()
|
|
ch.setFormatter(logging.Formatter('%(levelname)s: %(message)s'))
|
|
ch.setLevel(logging.DEBUG)
|
|
self._logger = logging.getLogger("JSONRPCClient(%s)" % addr)
|
|
self._logger.addHandler(ch)
|
|
self.log_set_level(kwargs.get('log_level', logging.ERROR))
|
|
connect_retries = kwargs.get('conn_retries', 0)
|
|
|
|
self.timeout = timeout
|
|
self._request_id = 0
|
|
self._recv_buf = ""
|
|
self._reqs = []
|
|
|
|
for i in range(connect_retries):
|
|
try:
|
|
self._connect(addr, port)
|
|
return
|
|
except Exception as e:
|
|
# ignore and retry in 200ms
|
|
time.sleep(0.2)
|
|
|
|
# try one last time without try/except
|
|
self._connect(addr, port)
|
|
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, exception_type, exception_value, traceback):
|
|
self.close()
|
|
|
|
def _connect(self, addr, port):
|
|
try:
|
|
addr_type = get_addr_type(addr)
|
|
|
|
if addr_type == socket.AF_UNIX:
|
|
self._logger.debug("Trying to connect to UNIX socket: %s", addr)
|
|
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
|
self.sock.connect(addr)
|
|
elif addr_type == socket.AF_INET6:
|
|
self._logger.debug("Trying to connect to IPv6 address addr:%s, port:%i", addr, port)
|
|
for res in socket.getaddrinfo(addr, port, socket.AF_INET6, socket.SOCK_STREAM, socket.SOL_TCP):
|
|
af, socktype, proto, canonname, sa = res
|
|
self.sock = socket.socket(af, socktype, proto)
|
|
self.sock.connect(sa)
|
|
elif addr_type == socket.AF_INET:
|
|
self._logger.debug("Trying to connect to IPv4 address addr:%s, port:%i'", addr, port)
|
|
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
self.sock.connect((addr, port))
|
|
else:
|
|
raise socket.error("Invalid or non-existing address: '%s'" % addr)
|
|
except socket.error as ex:
|
|
raise JSONRPCException("Error while connecting to %s\n"
|
|
"Is SPDK application running?\n"
|
|
"Error details: %s" % (addr, ex))
|
|
|
|
def get_logger(self):
|
|
return self._logger
|
|
|
|
"""Set logging level
|
|
|
|
Args:
|
|
lvl: Log level to set as accepted by logger.setLevel
|
|
"""
|
|
def log_set_level(self, lvl):
|
|
self._logger.info("Setting log level to %s", lvl)
|
|
self._logger.setLevel(lvl)
|
|
self._logger.info("Log level set to %s", lvl)
|
|
|
|
def close(self):
|
|
if getattr(self, "sock", None):
|
|
self.sock.shutdown(socket.SHUT_RDWR)
|
|
self.sock.close()
|
|
self.sock = None
|
|
|
|
def add_request(self, method, params):
|
|
self._request_id += 1
|
|
req = {
|
|
'jsonrpc': '2.0',
|
|
'method': method,
|
|
'id': self._request_id
|
|
}
|
|
|
|
if params:
|
|
req['params'] = copy.deepcopy(params)
|
|
|
|
self._logger.debug("append request:\n%s\n", json.dumps(req))
|
|
self._reqs.append(req)
|
|
return self._request_id
|
|
|
|
def flush(self):
|
|
self._logger.debug("Flushing buffer")
|
|
# TODO: We can drop indent parameter
|
|
reqstr = "\n".join(json.dumps(req, indent=2) for req in self._reqs)
|
|
self._reqs = []
|
|
self._logger.info("Requests:\n%s\n", reqstr)
|
|
self.sock.sendall(reqstr.encode("utf-8"))
|
|
|
|
def send(self, method, params=None):
|
|
id = self.add_request(method, params)
|
|
self.flush()
|
|
return id
|
|
|
|
def decode_one_response(self):
|
|
try:
|
|
self._logger.debug("Trying to decode response '%s'", self._recv_buf)
|
|
buf = self._recv_buf.lstrip()
|
|
obj, idx = json.JSONDecoder().raw_decode(buf)
|
|
self._recv_buf = buf[idx:]
|
|
return obj
|
|
except ValueError:
|
|
self._logger.debug("Partial response")
|
|
return None
|
|
|
|
def recv(self):
|
|
start_time = time.process_time()
|
|
response = self.decode_one_response()
|
|
while not response:
|
|
try:
|
|
timeout = self.timeout - (time.process_time() - start_time)
|
|
self.sock.settimeout(timeout)
|
|
newdata = self.sock.recv(4096)
|
|
if not newdata:
|
|
self.sock.close()
|
|
self.sock = None
|
|
raise JSONRPCException("Connection closed with partial response:\n%s\n" % self._recv_buf)
|
|
self._recv_buf += newdata.decode("utf-8")
|
|
response = self.decode_one_response()
|
|
except socket.timeout:
|
|
break # throw exception after loop to avoid Python freaking out about nested exceptions
|
|
except ValueError:
|
|
continue # incomplete response; keep buffering
|
|
|
|
if not response:
|
|
raise JSONRPCException("Timeout while waiting for response:\n%s\n" % self._recv_buf)
|
|
|
|
self._logger.info("response:\n%s\n", json.dumps(response, indent=2))
|
|
return response
|
|
|
|
def call(self, method, params={}):
|
|
self._logger.debug("call('%s')" % method)
|
|
req_id = self.send(method, params)
|
|
try:
|
|
response = self.recv()
|
|
except JSONRPCException as e:
|
|
""" Don't expect response to kill """
|
|
if not self.sock and method == "spdk_kill_instance":
|
|
self._logger.info("Connection terminated but ignoring since method is '%s'" % method)
|
|
return {}
|
|
else:
|
|
raise e
|
|
|
|
if 'error' in response:
|
|
params["method"] = method
|
|
params["req_id"] = req_id
|
|
msg = "\n".join(["request:", "%s" % json.dumps(params, indent=2),
|
|
"Got JSON-RPC error response",
|
|
"response:",
|
|
json.dumps(response['error'], indent=2)])
|
|
raise JSONRPCException(msg)
|
|
|
|
return response['result']
|