[PATCH v2] tools: add jwt renewal function to acvp_tool
jspewock at iol.unh.edu
jspewock at iol.unh.edu
Mon Jun 26 23:36:01 CEST 2023
From: Jeremy Spewock <jspewock at iol.unh.edu>
Add a method that renews the JWT by following the process described in
the NIST API documentation. This way, if the server is under load and it
takes too long to get vectors back during multi-algorithm testing, the
script handles the JWT expiring instead of failing.
Also add a maximum number of renewals so that the script cannot run
indefinitely.
Signed-off-by: Jeremy Spewock <jspewock at iol.unh.edu>
---
tools/acvp/acvp_tool.py | 71 ++++++++++++++++++++++++++++++++++++-----
1 file changed, 63 insertions(+), 8 deletions(-)
mode change 100755 => 100644 tools/acvp/acvp_tool.py
diff --git a/tools/acvp/acvp_tool.py b/tools/acvp/acvp_tool.py
old mode 100755
new mode 100644
index 40d2f2f..a28760d
--- a/tools/acvp/acvp_tool.py
+++ b/tools/acvp/acvp_tool.py
@@ -1,7 +1,7 @@
#!/usr/bin/env python3
# SPDX-License-Identifier: BSD-3-Clause
-# Copyright 2022 The University of New Hampshire
+# Copyright 2023 The University of New Hampshire
import hashlib
import sys
@@ -36,6 +36,8 @@ class ACVPProxy:
self.totp_path: str = totp_path
self.login_data: Optional[Dict[str, Any]] = None
self.session_data: Optional[Dict[str, Any]] = None
+ self.retries: int = 0
+ self.max_retries: int = 2
with open(config_path, 'r') as f:
self.config: Any = json.load(f)
@@ -70,7 +72,13 @@ class ACVPProxy:
cert=self.cert,
headers={'Authorization': f'Bearer {token}'}
)
- if not response.ok:
+ if response.status_code == 401:
+ if self.__renew_jwt():
+ token = self.session_data['jwt']
+ continue
+ logging.error("Failed to renew expired jwt")
+ return None
+ elif not response.ok:
logging.error(f'Failed to fetch vector set {url}')
logging.error(json.dumps(response.json(), indent=4))
return None
@@ -85,6 +93,35 @@ class ACVPProxy:
logging.info(f'Downloaded vector set {url}')
return vector_set_json
+ def __renew_jwt(self) -> bool:
+ """Renews the jwt in session_data.
+
+ JWTs provided by the NIST API last 30 minutes which can cause this
+ script to fail even with good data. This method renews the jwt using
+ the login endpoint.
+
+ @return: True if successfully renewed token
+ """
+ if self.retries >= self.max_retries:
+ logging.error("Maximum number of jwt renewals has been reached.")
+ return False
+ response = requests.post(
+ url=f'{self.config["url"]}/acvp/v1/login',
+ json=[
+ {'acvVersion': '1.0'},
+ {
+ 'password': self.__get_totp(),
+ 'accessToken': self.session_data["jwt"]
+ }
+ ],
+ cert=self.cert,
+ )
+ if response.ok:
+ self.retries += 1
+ self.session_data["jwt"] = response.json()[1].pop("accessToken")
+ return True
+ return False
+
def login(self) -> bool:
"""Log into the API server.
@@ -178,6 +215,12 @@ class ACVPProxy:
'Authorization': f'Bearer {self.session_data["jwt"]}'
}
)
+ if result.status_code == 401:
+ if self.__renew_jwt():
+ write_data[0]["jwt"] = self.session_data["jwt"]
+ continue
+ logging.error("Failed to renew jwt")
+ return None
version, result_json = result.json()
if 'retry' in result_json:
duration = result_json['retry']
@@ -208,7 +251,19 @@ class ACVPProxy:
headers={'Authorization': f'Bearer {self.session_data["jwt"]}'}
)
- if not response.ok:
+ if response.status_code == 401:
+ if self.__renew_jwt():
+ response = requests.post(
+ f'{self.config["url"]}{session_url}/vectorSets/'
+ f'{vector_set["vsId"]}/results',
+ json=[version, vector_set],
+ cert=self.cert,
+ headers={'Authorization': f'Bearer {self.session_data["jwt"]}'}
+ )
+ else:
+ logging.error("Failed to renew jwt")
+ return False
+ elif not response.ok:
has_error = True
logging.error(f'Could not upload vector set response for '
f'vector set ID {vector_set["vsId"]}.')
@@ -239,13 +294,13 @@ def main(request_path: Optional[str],
config_path=config_path,
)
- logging.info('Attempting to log in...')
- if not proxy.login():
- logging.error('Could not log in.')
- sys.exit(1)
- logging.info('Successfully logged in.')
if request_path:
+ logging.info('Attempting to log in...')
+ if not proxy.login():
+ logging.error('Could not log in.')
+ sys.exit(1)
+ logging.info('Successfully logged in.')
logging.info('Creating a new test session and downloading vectors...')
test_session = proxy.register()
if not test_session:
--
2.41.0
More information about the ci
mailing list