Skip to content
Snippets Groups Projects
Unverified Commit f9b3c843 authored by Muhammad Ammar's avatar Muhammad Ammar Committed by GitHub
Browse files

Merge pull request #17378 from edx/ammar/add-video-pipeline-integration-management-command

add video pipeline integration management command
parents 4d7c9113 27f023f3
No related merge requests found
"""
Management command `create_video_pipeline_integration` is used to create video pipeline integration record.
"""
from openedx.core.djangoapps.video_pipeline.models import VideoPipelineIntegration
from django.core.management.base import BaseCommand
class Command(BaseCommand):
# pylint: disable=missing-docstring
help = 'Creates the video pipeline integration record.'
def add_arguments(self, parser):
parser.add_argument('client_name')
parser.add_argument('api_url')
parser.add_argument('service_username')
parser.add_argument('--enabled', dest='enabled', action='store_true')
def handle(self, **fields):
VideoPipelineIntegration.objects.get_or_create(
client_name=fields['client_name'],
api_url=fields['api_url'],
service_username=fields['service_username'],
enabled=fields['enabled'],
)
"""
Tests for create_video_pipeline_integration management command.
"""
import ddt
from django.core.management import call_command
from django.test import TestCase
from openedx.core.djangoapps.video_pipeline.models import VideoPipelineIntegration
@ddt.ddt
class CreateVideoPipelineIntegration(TestCase):
"""
Management command test class.
"""
def setUp(self):
super(CreateVideoPipelineIntegration, self).setUp()
def assert_integration_created(self, args, options):
"""
Verify that the integration record was created.
"""
integration = VideoPipelineIntegration.current()
for index, attr in enumerate(('client_name', 'api_url', 'service_username')):
self.assertEqual(args[index], getattr(integration, attr))
self.assertEqual(integration.enabled, options.get('enabled'))
@ddt.data(
(
[
'veda',
'http://veda.edx.org/api/',
'veda_service_user',
],
{'enabled': False}
),
(
[
'veda',
'http://veda.edx.org/api/',
'veda_service_user',
],
{'enabled': True}
),
)
@ddt.unpack
def test_integration_creation(self, args, options):
"""
Verify that the create_video_pipeline_integration command works as expected.
"""
call_command('create_video_pipeline_integration', *args, **options)
self.assert_integration_created(args, options)
def test_idempotency(self):
"""
Verify that the command can be run repeatedly with the same args and options without any ill effects.
"""
args = [
'veda',
'http://veda.edx.org/api/',
'veda_service_user',
]
options = {'enabled': False}
call_command('create_video_pipeline_integration', *args, **options)
self.assert_integration_created(args, options)
# Verify that the command is idempotent
call_command('create_video_pipeline_integration', *args, **options)
self.assert_integration_created(args, options)
# Verify that only one record exists
self.assertEqual(VideoPipelineIntegration.objects.count(), 1)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment