Test the AES-CCM test vectors from the NIST Known Answer Tests.
The CCM test vectors use a slightly different file format in that there are global key-value pairs as well as section key-value pairs that need to be used in each test. In addition, the sections can set multiple key-value pairs in the section name. The CCM KAT parser class is an iterator that returns a dictionary once per test where the dictionary contains all of the relevant key-value pairs for a given test (global, section name, section, test-specific). Note that all of the CCM decrypt tests use nonce and tag lengths that are not supported by OCF (OCF only supports a 12 byte nonce and 16 byte tag), so none of the decryption vectors are actually tested. Reviewed by: ngie MFC after: 1 month Sponsored by: Chelsio Communications Differential Revision: https://reviews.freebsd.org/D19978
This commit is contained in:
parent
5ca25aadd9
commit
84609db994
@ -381,6 +381,112 @@ class KATParser:
|
||||
|
||||
yield values
|
||||
|
||||
# The CCM files use a bit of a different syntax that doesn't quite fit
|
||||
# the generic KATParser. In particular, some keys are set globally at
|
||||
# the start of the file, and some are set globally at the start of a
|
||||
# section.
|
||||
class KATCCMParser:
|
||||
def __init__(self, fname):
|
||||
self.fp = open(fname)
|
||||
self._pending = None
|
||||
self.read_globals()
|
||||
|
||||
def read_globals(self):
|
||||
self.global_values = {}
|
||||
while True:
|
||||
line = self.fp.readline()
|
||||
if not line:
|
||||
return
|
||||
if line[0] == '#' or not line.strip():
|
||||
continue
|
||||
if line[0] == '[':
|
||||
self._pending = line
|
||||
return
|
||||
|
||||
try:
|
||||
f, v = line.split(' =')
|
||||
except:
|
||||
print('line:', repr(line))
|
||||
raise
|
||||
|
||||
v = v.strip()
|
||||
|
||||
if f in self.global_values:
|
||||
raise ValueError('already present: %r' % repr(f))
|
||||
self.global_values[f] = v
|
||||
|
||||
def read_section_values(self, kwpairs):
|
||||
self.section_values = self.global_values.copy()
|
||||
for pair in kwpairs.split(', '):
|
||||
f, v = pair.split(' = ')
|
||||
if f in self.section_values:
|
||||
raise ValueError('already present: %r' % repr(f))
|
||||
self.section_values[f] = v
|
||||
|
||||
while True:
|
||||
line = self.fp.readline()
|
||||
if not line:
|
||||
return
|
||||
if line[0] == '#' or not line.strip():
|
||||
continue
|
||||
if line[0] == '[':
|
||||
self._pending = line
|
||||
return
|
||||
|
||||
try:
|
||||
f, v = line.split(' =')
|
||||
except:
|
||||
print('line:', repr(line))
|
||||
raise
|
||||
|
||||
if f == 'Count':
|
||||
self._pending = line
|
||||
return
|
||||
|
||||
v = v.strip()
|
||||
|
||||
if f in self.section_values:
|
||||
raise ValueError('already present: %r' % repr(f))
|
||||
self.section_values[f] = v
|
||||
|
||||
def __iter__(self):
|
||||
while True:
|
||||
if self._pending:
|
||||
line = self._pending
|
||||
self._pending = None
|
||||
else:
|
||||
line = self.fp.readline()
|
||||
if not line:
|
||||
return
|
||||
|
||||
if (line and line[0] == '#') or not line.strip():
|
||||
continue
|
||||
|
||||
if line[0] == '[':
|
||||
section = line[1:].split(']', 1)[0]
|
||||
self.read_section_values(section)
|
||||
continue
|
||||
|
||||
values = self.section_values.copy()
|
||||
|
||||
while True:
|
||||
try:
|
||||
f, v = line.split(' =')
|
||||
except:
|
||||
print('line:', repr(line))
|
||||
raise
|
||||
v = v.strip()
|
||||
|
||||
if f in values:
|
||||
raise ValueError('already present: %r' % repr(f))
|
||||
values[f] = v
|
||||
line = self.fp.readline().strip()
|
||||
if not line:
|
||||
break
|
||||
|
||||
yield values
|
||||
|
||||
|
||||
def _spdechex(s):
|
||||
return ''.join(s.split()).decode('hex')
|
||||
|
||||
|
@ -71,6 +71,14 @@ def GenTestCase(cname):
|
||||
for i in katg('KAT_AES', 'CBC[GKV]*.rsp'):
|
||||
self.runCBC(i)
|
||||
|
||||
@unittest.skipIf(cname not in aesmodules, 'skipping AES-CCM on %s' % (cname))
|
||||
def test_ccm(self):
|
||||
for i in katg('ccmtestvectors', 'V*.rsp'):
|
||||
self.runCCMEncrypt(i)
|
||||
|
||||
for i in katg('ccmtestvectors', 'D*.rsp'):
|
||||
self.runCCMDecrypt(i)
|
||||
|
||||
@unittest.skipIf(cname not in aesmodules, 'skipping AES-GCM on %s' % (cname))
|
||||
def test_gcm(self):
|
||||
for i in katg('gcmtestvectors', 'gcmEncrypt*'):
|
||||
@ -220,6 +228,93 @@ def GenTestCase(cname):
|
||||
continue
|
||||
self.assertEqual(r, ct)
|
||||
|
||||
def runCCMEncrypt(self, fname):
|
||||
for data in cryptodev.KATCCMParser(fname):
|
||||
Nlen = int(data['Nlen'])
|
||||
if Nlen != 12:
|
||||
# OCF only supports 12 byte IVs
|
||||
continue
|
||||
key = data['Key'].decode('hex')
|
||||
nonce = data['Nonce'].decode('hex')
|
||||
Alen = int(data['Alen'])
|
||||
if Alen != 0:
|
||||
aad = data['Adata'].decode('hex')
|
||||
else:
|
||||
aad = None
|
||||
payload = data['Payload'].decode('hex')
|
||||
ct = data['CT'].decode('hex')
|
||||
|
||||
try:
|
||||
c = Crypto(crid=crid,
|
||||
cipher=cryptodev.CRYPTO_AES_CCM_16,
|
||||
key=key,
|
||||
mac=cryptodev.CRYPTO_AES_CCM_CBC_MAC,
|
||||
mackey=key, maclen=16)
|
||||
r, tag = Crypto.encrypt(c, payload,
|
||||
nonce, aad)
|
||||
except EnvironmentError, e:
|
||||
if e.errno != errno.EOPNOTSUPP:
|
||||
raise
|
||||
continue
|
||||
|
||||
out = r + tag
|
||||
self.assertEqual(out, ct,
|
||||
"Count " + data['Count'] + " Actual: " + \
|
||||
repr(out.encode("hex")) + " Expected: " + \
|
||||
repr(data) + " on " + cname)
|
||||
|
||||
def runCCMDecrypt(self, fname):
|
||||
# XXX: Note that all of the current CCM
|
||||
# decryption test vectors use IV and tag sizes
|
||||
# that aren't supported by OCF none of the
|
||||
# tests are actually ran.
|
||||
for data in cryptodev.KATCCMParser(fname):
|
||||
Nlen = int(data['Nlen'])
|
||||
if Nlen != 12:
|
||||
# OCF only supports 12 byte IVs
|
||||
continue
|
||||
Tlen = int(data['Tlen'])
|
||||
if Tlen != 16:
|
||||
# OCF only supports 16 byte tags
|
||||
continue
|
||||
key = data['Key'].decode('hex')
|
||||
nonce = data['Nonce'].decode('hex')
|
||||
Alen = int(data['Alen'])
|
||||
if Alen != 0:
|
||||
aad = data['Adata'].decode('hex')
|
||||
else:
|
||||
aad = None
|
||||
ct = data['CT'].decode('hex')
|
||||
tag = ct[-16:]
|
||||
ct = ct[:-16]
|
||||
|
||||
try:
|
||||
c = Crypto(crid=crid,
|
||||
cipher=cryptodev.CRYPTO_AES_CCM_16,
|
||||
key=key,
|
||||
mac=cryptodev.CRYPTO_AES_CCM_CBC_MAC,
|
||||
mackey=key, maclen=16)
|
||||
except EnvironmentError, e:
|
||||
if e.errno != errno.EOPNOTSUPP:
|
||||
raise
|
||||
continue
|
||||
|
||||
if data['Result'] == 'Fail':
|
||||
self.assertRaises(IOError,
|
||||
c.decrypt, payload, nonce, aad, tag)
|
||||
else:
|
||||
r = Crypto.decrypt(c, payload, nonce,
|
||||
aad, tag)
|
||||
|
||||
payload = data['Payload'].decode('hex')
|
||||
Plen = int(data('Plen'))
|
||||
payload = payload[:plen]
|
||||
self.assertEqual(r, payload,
|
||||
"Count " + data['Count'] + \
|
||||
" Actual: " + repr(r.encode("hex")) + \
|
||||
" Expected: " + repr(data) + \
|
||||
" on " + cname)
|
||||
|
||||
###############
|
||||
##### DES #####
|
||||
###############
|
||||
|
Loading…
x
Reference in New Issue
Block a user