numam-spdk/scripts/rpc/client.py
Michal Berger 6c1a1a3dca scripts/rpc: Make sure address argument is properly interpreted
In case the addr argument was not an existing unix socket file the rpc
client would consider it to be an actual ip address. As a result
connect() would be called with improper set of arguments. This could
cause the rpc.py to block for undesired amount of time until connect()
finally decided to return (seen on some fedora33 builds).

This was affecting sh wrapper functions like waitforlisten() which
use rpc.py to determine if given app is ready to be talk to blocking
execution of the tests for way too long then intendent.

To avoid such a scenario determine the format of the address and use
routines proper for given address family.

Signed-off-by: Michal Berger <michalx.berger@intel.com>
Change-Id: Iaac701d72c772629fa7c6478ff4781b0c5d485d5
Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/7777
Community-CI: Mellanox Build Bot
Tested-by: SPDK CI Jenkins <sys_sgci@intel.com>
Reviewed-by: Paul Luse <paul.e.luse@intel.com>
Reviewed-by: Jim Harris <james.r.harris@intel.com>
Reviewed-by: Tomasz Zawadzki <tomasz.zawadzki@intel.com>
Reviewed-by: Karol Latecki <karol.latecki@intel.com>
2021-05-24 10:11:05 +00:00

202 lines
6.7 KiB
Python

import json
import socket
import time
import os
import logging
import copy
def print_dict(d):
print(json.dumps(d, indent=2))
def print_json(s):
print(json.dumps(s, indent=2).strip('"'))
def get_addr_type(addr):
try:
socket.inet_pton(socket.AF_INET, addr)
return socket.AF_INET
except Exception as e:
pass
try:
socket.inet_pton(socket.AF_INET6, addr)
return socket.AF_INET6
except Exception as e:
pass
if os.path.exists(addr):
return socket.AF_UNIX
return None
class JSONRPCException(Exception):
def __init__(self, message):
self.message = message
class JSONRPCClient(object):
def __init__(self, addr, port=None, timeout=60.0, **kwargs):
self.sock = None
ch = logging.StreamHandler()
ch.setFormatter(logging.Formatter('%(levelname)s: %(message)s'))
ch.setLevel(logging.DEBUG)
self._logger = logging.getLogger("JSONRPCClient(%s)" % addr)
self._logger.addHandler(ch)
self.log_set_level(kwargs.get('log_level', logging.ERROR))
connect_retries = kwargs.get('conn_retries', 0)
self.timeout = timeout
self._request_id = 0
self._recv_buf = ""
self._reqs = []
for i in range(connect_retries):
try:
self._connect(addr, port)
return
except Exception as e:
# ignore and retry in 200ms
time.sleep(0.2)
# try one last time without try/except
self._connect(addr, port)
def __enter__(self):
return self
def __exit__(self, exception_type, exception_value, traceback):
self.close()
def _connect(self, addr, port):
try:
addr_type = get_addr_type(addr)
if addr_type == socket.AF_UNIX:
self._logger.debug("Trying to connect to UNIX socket: %s", addr)
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.sock.connect(addr)
elif addr_type == socket.AF_INET6:
self._logger.debug("Trying to connect to IPv6 address addr:%s, port:%i", addr, port)
for res in socket.getaddrinfo(addr, port, socket.AF_INET6, socket.SOCK_STREAM, socket.SOL_TCP):
af, socktype, proto, canonname, sa = res
self.sock = socket.socket(af, socktype, proto)
self.sock.connect(sa)
elif addr_type == socket.AF_INET:
self._logger.debug("Trying to connect to IPv4 address addr:%s, port:%i'", addr, port)
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((addr, port))
else:
raise socket.error("Invalid or non-existing address: '%s'" % addr)
except socket.error as ex:
raise JSONRPCException("Error while connecting to %s\n"
"Is SPDK application running?\n"
"Error details: %s" % (addr, ex))
def get_logger(self):
return self._logger
"""Set logging level
Args:
lvl: Log level to set as accepted by logger.setLevel
"""
def log_set_level(self, lvl):
self._logger.info("Setting log level to %s", lvl)
self._logger.setLevel(lvl)
self._logger.info("Log level set to %s", lvl)
def close(self):
if getattr(self, "sock", None):
self.sock.shutdown(socket.SHUT_RDWR)
self.sock.close()
self.sock = None
def add_request(self, method, params):
self._request_id += 1
req = {
'jsonrpc': '2.0',
'method': method,
'id': self._request_id
}
if params:
req['params'] = copy.deepcopy(params)
self._logger.debug("append request:\n%s\n", json.dumps(req))
self._reqs.append(req)
return self._request_id
def flush(self):
self._logger.debug("Flushing buffer")
# TODO: We can drop indent parameter
reqstr = "\n".join(json.dumps(req, indent=2) for req in self._reqs)
self._reqs = []
self._logger.info("Requests:\n%s\n", reqstr)
self.sock.sendall(reqstr.encode("utf-8"))
def send(self, method, params=None):
id = self.add_request(method, params)
self.flush()
return id
def decode_one_response(self):
try:
self._logger.debug("Trying to decode response '%s'", self._recv_buf)
buf = self._recv_buf.lstrip()
obj, idx = json.JSONDecoder().raw_decode(buf)
self._recv_buf = buf[idx:]
return obj
except ValueError:
self._logger.debug("Partial response")
return None
def recv(self):
start_time = time.process_time()
response = self.decode_one_response()
while not response:
try:
timeout = self.timeout - (time.process_time() - start_time)
self.sock.settimeout(timeout)
newdata = self.sock.recv(4096)
if not newdata:
self.sock.close()
self.sock = None
raise JSONRPCException("Connection closed with partial response:\n%s\n" % self._recv_buf)
self._recv_buf += newdata.decode("utf-8")
response = self.decode_one_response()
except socket.timeout:
break # throw exception after loop to avoid Python freaking out about nested exceptions
except ValueError:
continue # incomplete response; keep buffering
if not response:
raise JSONRPCException("Timeout while waiting for response:\n%s\n" % self._recv_buf)
self._logger.info("response:\n%s\n", json.dumps(response, indent=2))
return response
def call(self, method, params={}):
self._logger.debug("call('%s')" % method)
req_id = self.send(method, params)
try:
response = self.recv()
except JSONRPCException as e:
""" Don't expect response to kill """
if not self.sock and method == "spdk_kill_instance":
self._logger.info("Connection terminated but ignoring since method is '%s'" % method)
return {}
else:
raise e
if 'error' in response:
params["method"] = method
params["req_id"] = req_id
msg = "\n".join(["request:", "%s" % json.dumps(params, indent=2),
"Got JSON-RPC error response",
"response:",
json.dumps(response['error'], indent=2)])
raise JSONRPCException(msg)
return response['result']