Skip to content
Snippets Groups Projects
Unverified Commit f098633c authored by Nimisha Asthagiri's avatar Nimisha Asthagiri Committed by GitHub
Browse files

Merge pull request #18681 from edx/arch/generate-keys-command

Management command: generate_jwt_signing_key
parents 3222d3fd ae9b8956
No related merge requests found
......@@ -581,6 +581,34 @@ VIDEO_TRANSCRIPTS_SETTINGS = dict(
DIRECTORY_PREFIX='video-transcripts/',
)
####################### Authentication Settings ##########################
JWT_AUTH.update({
'JWT_PUBLIC_SIGNING_JWK_SET': (
'{"keys": [{"kid": "TEST_KEY", "e": "AQAB", "kty": "RSA", "n": "smKFSYowG6nNUAdeqH1jQQnH1PmIHphzBmwJ5vRf1vu'
'48BUI5VcVtUWIPqzRK_LDSlZYh9D0YFL0ZTxIrlb6Tn3Xz7pYvpIAeYuQv3_H5p8tbz7Fb8r63c1828wXPITVTv8f7oxx5W3lFFgpFAyYMmROC'
'4Ee9qG5T38LFe8_oAuFCEntimWxN9F3P-FJQy43TL7wG54WodgiM0EgzkeLr5K6cDnyckWjTuZbWI-4ffcTgTZsL_Kq1owa_J2ngEfxMCObnzG'
'y5ZLcTUomo4rZLjghVpq6KZxfS6I1Vz79ZsMVUWEdXOYePCKKsrQG20ogQEkmTf9FT_SouC6jPcHLXw"}, {"kid": "BTZ9HA6K", "e": "A'
'QAB", "kty": "RSA", "n": "o5cn3ljSRi6FaDEKTn0PS-oL9EFyv1pI7dRgffQLD1qf5D6sprmYfWWokSsrWig8u2y0HChSygR6Jn5KXBqQ'
'n6FpM0dDJLnWQDRXHLl3Ey1iPYgDSmOIsIGrV9ZyNCQwk03wAgWbfdBTig3QSDYD-sTNOs3pc4UD_PqAvU2nz_1SS2ZiOwOn5F6gulE1L0iE3K'
'EUEvOIagfHNVhz0oxa_VRZILkzV-zr6R_TW1m97h4H8jXl_VJyQGyhMGGypuDrQ9_vaY_RLEulLCyY0INglHWQ7pckxBtI5q55-Vio2wgewe2_'
'qYcGsnBGaDNbySAsvYcWRrqDiFyzrJYivodqTQ"}]}'
),
'JWT_PRIVATE_SIGNING_JWK': (
'{"e": "AQAB", "d": "HIiV7KNjcdhVbpn3KT-I9n3JPf5YbGXsCIedmPqDH1d4QhBofuAqZ9zebQuxkRUpmqtYMv0Zi6ECSUqH387GYQF_Xv'
'FUFcjQRPycISd8TH0DAKaDpGr-AYNshnKiEtQpINhcP44I1AYNPCwyoxXA1fGTtmkKChsuWea7o8kytwU5xSejvh5-jiqu2SF4GEl0BEXIAPZs'
'gbzoPIWNxgO4_RzNnWs6nJZeszcaDD0CyezVSuH9QcI6g5QFzAC_YuykSsaaFJhZ05DocBsLczShJ9Omf6PnK9xlm26I84xrEh_7x4fVmNBg3x'
'WTLh8qOnHqGko93A1diLRCrKHOvnpvgQ", "n": "o5cn3ljSRi6FaDEKTn0PS-oL9EFyv1pI7dRgffQLD1qf5D6sprmYfWWokSsrWig8u2y0H'
'ChSygR6Jn5KXBqQn6FpM0dDJLnWQDRXHLl3Ey1iPYgDSmOIsIGrV9ZyNCQwk03wAgWbfdBTig3QSDYD-sTNOs3pc4UD_PqAvU2nz_1SS2ZiOwO'
'n5F6gulE1L0iE3KEUEvOIagfHNVhz0oxa_VRZILkzV-zr6R_TW1m97h4H8jXl_VJyQGyhMGGypuDrQ9_vaY_RLEulLCyY0INglHWQ7pckxBtI5'
'q55-Vio2wgewe2_qYcGsnBGaDNbySAsvYcWRrqDiFyzrJYivodqTQ", "q": "3T3DEtBUka7hLGdIsDlC96Uadx_q_E4Vb1cxx_4Ss_wGp1Lo'
'z3N3ZngGyInsKlmbBgLo1Ykd6T9TRvRNEWEtFSOcm2INIBoVoXk7W5RuPa8Cgq2tjQj9ziGQ08JMejrPlj3Q1wmALJr5VTfvSYBu0WkljhKNCy'
'1KB6fCby0C9WE", "p": "vUqzWPZnDG4IXyo-k5F0bHV0BNL_pVhQoLW7eyFHnw74IOEfSbdsMspNcPSFIrtgPsn7981qv3lN_staZ6JflKfH'
'ayjB_lvltHyZxfl0dvruShZOx1N6ykEo7YrAskC_qxUyrIvqmJ64zPW3jkuOYrFs7Ykj3zFx3Zq1H5568G0", "kid": "TEST_KEY", "kty"'
': "RSA"}'
),
})
####################### Plugin Settings ##########################
from openedx.core.djangoapps.plugins import plugin_settings, constants as plugin_constants
......
"""
Management command for generating an asymmetric keypair to sign JSON Web Tokens.
"""
# pylint: disable=missing-docstring
from __future__ import print_function, unicode_literals
import logging
import json
import random
import string
from argparse import RawTextHelpFormatter
from django.conf import settings
from django.core.management.base import BaseCommand
from Cryptodome.PublicKey import RSA
from jwkest import jwk
log = logging.getLogger(__name__)
class Command(BaseCommand):
"""Generates an asymmetric keypair to sign JSON Web Tokens."""
help = '''
Generates an asymmetric keypair to sign JSON Web Tokens. Outputs the
generated public and private keys in YAML format as required by Open edX
configuration settings.
This same command can be used over time to rotate keys. Simply rerun this
command and public keys configured in the past will be automatically
included in the JWK keyset in the YAML output (unless the option
not-add-previous-public-keys is provided).
New keys are identified by a "kid" value that is automatically generated of
length 'key-id-size' (unless you explicitly provide a "kid" of your own via
the 'key-id' option).
See https://github.com/edx/edx-platform/blob/master/openedx/core/djangoapps/oauth_dispatch/docs/decisions/0008-use-asymmetric-jwts.rst
'''
def create_parser(self, *args, **kwargs): # pylint: disable=arguments-differ
parser = super(Command, self).create_parser(*args, **kwargs)
parser.formatter_class = RawTextHelpFormatter
return parser
def add_arguments(self, parser):
parser.add_argument(
'--key-size',
action='store',
dest='key_size',
default=2048,
type=int,
help='Size of RSA key, in bits; defaults to 2048',
)
parser.add_argument(
'--add-previous-public-keys',
action='store_true',
dest='add_previous_public_keys',
default=True,
help='Whether to add the previous set of public keys to the new public key set',
)
parser.add_argument(
'--not-add-previous-public-keys',
action='store_false',
dest='add_previous_public_keys',
help='Whether to NOT add the previous set of public keys to the new public key set',
)
group = parser.add_mutually_exclusive_group()
group.add_argument(
'--key-id',
action='store',
dest='key_id',
help='Unique identifier ("kid") of new key; defaults to a random value',
)
group.add_argument(
'--key-id-size',
action='store',
dest='key_id_size',
default=8,
type=int,
help='Size of randomly generated unique identifier ("kid") of the new key; defaults to 8',
)
def handle(self, *args, **options):
jwk_key = self._generate_key_pair(
options['key_size'],
options['key_id'] or self._generate_key_id(options['key_id_size']),
)
self._output_public_keys(jwk_key, options['add_previous_public_keys'])
self._output_private_keys(jwk_key)
def _generate_key_id(self, size, chars=string.ascii_uppercase + string.digits):
return ''.join(random.choice(chars) for _ in range(size))
def _generate_key_pair(self, key_size, key_id):
log.info('Generating new JWT signing keypair for key id %s.', key_id)
rsa_key = RSA.generate(key_size)
rsa_jwk = jwk.RSAKey(kid=key_id, key=rsa_key)
return rsa_jwk
def _output_public_keys(self, jwk_key, add_previous):
public_keys = jwk.KEYS()
if add_previous:
self._add_previous_public_keys(public_keys)
public_keys.append(jwk_key)
serialized_public_keys = public_keys.dump_jwks()
log.info('New JWT_PUBLIC_SIGNING_JWK_SET: %s.', serialized_public_keys)
print(" ")
print(" ")
print(" *** YAML to share with ALL IDAs ***")
print(" ")
print(" # The following is the string representation of a JSON Web Key Set (JWK set)")
print(" # containing all active public keys for verifying JWT signatures.")
print(
" # See https://github.com/edx/edx-platform/blob/master/openedx/core/djangoapps/oauth_dispatch/"
"docs/decisions/0008-use-asymmetric-jwts.rst"
)
print(" ")
print(" COMMON_JWT_PUBLIC_SIGNING_JWK_SET: '{}'".format(serialized_public_keys))
def _add_previous_public_keys(self, public_keys):
previous_signing_keys = settings.JWT_AUTH.get('JWT_PUBLIC_SIGNING_JWK_SET')
if previous_signing_keys:
log.info('Old JWT_PUBLIC_SIGNING_JWK_SET: %s.', previous_signing_keys)
public_keys.load_jwks(previous_signing_keys)
def _output_private_keys(self, jwk_key):
serialized_keypair = jwk_key.serialize(private=True)
serialized_keypair_json = json.dumps(serialized_keypair)
print(" ")
print(" ")
print(" *** YAML to keep PRIVATE within a single authentication service (LMS) ***")
print(" ")
print(" # The following is the string representation of a JSON Web Key (JWK)")
print(" # containing the single active private key for signing JSON Web Tokens (JWTs).")
print(
" # See https://github.com/edx/edx-platform/blob/master/openedx/core/djangoapps/oauth_dispatch/"
"docs/decisions/0008-use-asymmetric-jwts.rst"
)
print(" ")
print(" EDXAPP_JWT_PRIVATE_SIGNING_JWK: '{}'".format(serialized_keypair_json))
print(" ")
print(" EDXAPP_JWT_SIGNING_ALGORITHM: 'RS512'")
"""
Tests the ``generate_jwt_signing_key`` management command.
"""
# pylint: disable=missing-docstring
import sys
from contextlib import contextmanager
from StringIO import StringIO
import ddt
from mock import patch
from django.core.management import call_command
from django.test import TestCase
from openedx.core.djangolib.testing.utils import skip_unless_lms
COMMAND_NAME = 'generate_jwt_signing_key'
LOGGER = 'openedx.core.djangoapps.oauth_dispatch.management.commands.generate_jwt_signing_key.log.info'
TEST_KEY_IDENTIFIER = 'some_key_identifier'
@skip_unless_lms
@ddt.ddt
class TestGenerateJwtSigningKey(TestCase):
"""
Tests the ``generate_jwt_signing_key`` management command.
"""
@contextmanager
def _captured_output(self):
new_out, new_err = StringIO(), StringIO()
old_out, old_err = sys.stdout, sys.stderr
try:
sys.stdout, sys.stderr = new_out, new_err
yield sys.stdout, sys.stderr
finally:
sys.stdout, sys.stderr = old_out, old_err
def _assert_log_message(self, mock_log, message, expected_to_exist):
log_message_exists = any(
message in log_entry[0][0]
for log_entry in mock_log.call_args_list
)
self.assertEqual(log_message_exists, expected_to_exist)
def _assert_key_output(self, output_stream):
expected_in_output = (
'EDXAPP_JWT_PRIVATE_SIGNING_JWK', 'EDXAPP_JWT_SIGNING_ALGORITHM', 'COMMON_JWT_PUBLIC_SIGNING_JWK_SET'
)
for expected in expected_in_output:
self.assertIn(expected, output_stream.getvalue())
def _assert_presence_of_old_keys(self, mock_log, add_previous_public_keys):
self._assert_log_message(mock_log, 'Old JWT_PUBLIC_SIGNING_JWK_SET', expected_to_exist=add_previous_public_keys)
def _assert_presence_of_key_id(self, mock_log, output_stream, provide_key_id, key_id_size):
if provide_key_id:
self.assertIn(TEST_KEY_IDENTIFIER, output_stream.getvalue())
else:
self.assertNotIn(TEST_KEY_IDENTIFIER, output_stream.getvalue())
key_id = mock_log.call_args_list[0][0][1]
self.assertEqual(len(key_id), key_id_size or 8)
@ddt.data(
dict(add_previous_public_keys=True, provide_key_id=False, key_id_size=None),
dict(add_previous_public_keys=True, provide_key_id=False, key_id_size=16),
dict(add_previous_public_keys=False, provide_key_id=True, key_id_size=None),
)
@ddt.unpack
def test_command(self, add_previous_public_keys, provide_key_id, key_id_size):
command_options = dict(add_previous_public_keys=add_previous_public_keys)
if provide_key_id:
command_options['key_id'] = TEST_KEY_IDENTIFIER
if key_id_size:
command_options['key_id_size'] = key_id_size
with self._captured_output() as (output_stream, _):
with patch(LOGGER) as mock_log:
call_command(COMMAND_NAME, **command_options)
self._assert_key_output(output_stream)
self._assert_presence_of_old_keys(mock_log, add_previous_public_keys)
self._assert_presence_of_key_id(mock_log, output_stream, provide_key_id, key_id_size)
""" Utils for RSA keys"""
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.serialization import Encoding, NoEncryption, PrivateFormat, PublicFormat
def generate_rsa_key_pair(key_size=2048):
""" Generates a public and private RSA PEM encoded key pair"""
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=key_size,
backend=default_backend()
)
private_key_str = private_key.private_bytes(Encoding.PEM, PrivateFormat.PKCS8, NoEncryption())
public_key_str = private_key.public_key().public_bytes(Encoding.PEM, PublicFormat.SubjectPublicKeyInfo)
# Not intented for programmatic use, so we print the keys out
print public_key_str
print private_key_str
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment