Skip to content
Snippets Groups Projects
Commit a01ba762 authored by muhammad-ammar's avatar muhammad-ammar
Browse files

transcript util

EDUCATOR-2131
parent b57b813f
No related branches found
No related tags found
No related merge requests found
# -*- coding: utf-8 -*-
""" Tests for transcripts_utils. """
import copy
import tempfile
import ddt
import json
import textwrap
......@@ -19,7 +20,8 @@ from xmodule.contentstore.content import StaticContent
from xmodule.contentstore.django import contentstore
from xmodule.exceptions import NotFoundError
from xmodule.modulestore.tests.django_utils import SharedModuleStoreTestCase
from xmodule.modulestore.tests.factories import CourseFactory
from xmodule.modulestore.tests.factories import CourseFactory, ItemFactory
from student.tests.factories import UserFactory
from xmodule.video_module import transcripts_utils
# Independent deep copy of the configured contentstore settings: changes made
# to this copy do not touch `settings.CONTENTSTORE` itself.
TEST_DATA_CONTENTSTORE = copy.deepcopy(settings.CONTENTSTORE)
......@@ -721,3 +723,220 @@ class TestVideoIdsInfo(unittest.TestCase):
"""
actual_result = transcripts_utils.get_video_ids_info(edx_video_id, youtube_id_1_0, html5_sources)
self.assertEqual(actual_result, expected_result)
@ddt.ddt
class TestGetTranscript(SharedModuleStoreTestCase):
    """Tests for `get_transcript` function."""

    def setUp(self):
        """
        Create a course with one vertical and one video, plus the reference
        transcript data (sjson and its srt conversion) shared by the tests.
        """
        super(TestGetTranscript, self).setUp()

        self.course = CourseFactory.create()

        self.subs_id = 'video_101'

        self.subs_sjson = {
            'start': [100, 200, 240, 390, 1000],
            'end': [200, 240, 380, 1000, 1500],
            'text': [
                'subs #1',
                'subs #2',
                'subs #3',
                'subs #4',
                'subs #5'
            ]
        }

        self.subs_srt = transcripts_utils.Transcript.convert(json.dumps(self.subs_sjson), 'sjson', 'srt')

        # Expected srt content keyed by language; both languages use the same subs.
        self.subs = {
            u'en': self.subs_srt,
            u'ur': transcripts_utils.Transcript.convert(json.dumps(self.subs_sjson), 'sjson', 'srt'),
        }

        self.srt_mime_type = transcripts_utils.Transcript.mime_types[transcripts_utils.Transcript.SRT]
        self.sjson_mime_type = transcripts_utils.Transcript.mime_types[transcripts_utils.Transcript.SJSON]

        self.user = UserFactory.create()
        self.vertical = ItemFactory.create(category='vertical', parent_location=self.course.location)
        self.video = ItemFactory.create(category='video', parent_location=self.vertical.location)

    def create_transcript(self, subs_id, language=u'en', filename='video.srt'):
        """
        Replace `self.video` with a new video item carrying transcript metadata.

        Arguments:
            subs_id (unicode): value for the video's `sub` field; when truthy,
                `self.subs_sjson` is also saved to the store under this id
            language (unicode): transcript language
            filename (unicode): transcript file name registered in the video's
                `transcripts` field (only for non-`en` languages)
        """
        transcripts = {}
        if language != u'en':
            transcripts = {language: filename}

        self.video = ItemFactory.create(
            category='video',
            parent_location=self.vertical.location,
            sub=subs_id,
            transcripts=transcripts
        )

        if subs_id:
            transcripts_utils.save_subs_to_store(
                self.subs_sjson,
                subs_id,
                self.video,
                language=language,
            )

    def create_srt_file(self, content):
        """
        Create a temporary srt file holding `content`, rewound to the start.

        The file is closed (and therefore removed) automatically when the
        test finishes.
        """
        srt_file = tempfile.NamedTemporaryFile(suffix=".srt")
        # Make sure the handle does not outlive the test that requested it.
        self.addCleanup(srt_file.close)
        srt_file.content_type = transcripts_utils.Transcript.SRT
        srt_file.write(content)
        srt_file.seek(0)
        return srt_file

    def upload_file(self, subs_file, location, filename):
        """
        Upload a file in content store.

        Arguments:
            subs_file (File): pointer to file to be uploaded
            location (Locator): Item location
            filename (unicode): Name of file to be uploaded
        """
        mime_type = subs_file.content_type
        content_location = StaticContent.compute_location(
            location.course_key, filename
        )
        content = StaticContent(content_location, filename, mime_type, subs_file.read())
        contentstore().save(content)

    @ddt.data(
        # en lang does not exist so NotFoundError will be raised
        (u'en',),
        # ur lang does not exist so KeyError and then NotFoundError will be raised
        (u'ur',),
    )
    @ddt.unpack
    def test_get_transcript_not_found(self, lang):
        """
        Verify that `NotFoundError` exception is raised when transcript is not found in both the content store and val.
        """
        with self.assertRaises(NotFoundError):
            transcripts_utils.get_transcript(self.course.id, self.video.location.block_id, lang=lang)

    @ddt.data(
        {
            'language': u'en',
            'subs_id': 'video_101',
            'filename': 'en_video_101.srt',
        },
        {
            'language': u'ur',
            'subs_id': '',
            'filename': 'ur_video_101.srt',
        },
    )
    @ddt.unpack
    def test_get_transcript_from_content_store(self, language, subs_id, filename):
        """
        Verify that `get_transcript` function returns correct data when transcript is in content store.
        """
        self.upload_file(self.create_srt_file(self.subs_srt), self.video.location, filename)
        self.create_transcript(subs_id, language, filename)
        # Use a distinct name for the returned file name so the `filename`
        # test parameter is not shadowed.
        content, transcript_filename, mimetype = transcripts_utils.get_transcript(
            self.course.id,
            self.video.location.block_id,
            language
        )

        self.assertEqual(content, self.subs[language])
        # NOTE(review): this used to compare the returned name with itself,
        # which could never fail. The content store path names the file
        # `<language>_<base name>.<output format>`; the base name depends on
        # which asset lookup succeeded, so only the stable parts are checked.
        # Tighten to an exact name once the expected base name is confirmed.
        self.assertTrue(transcript_filename.startswith(u'{}_'.format(language)))
        self.assertTrue(transcript_filename.endswith(u'.srt'))
        self.assertEqual(mimetype, self.srt_mime_type)

    def test_get_transcript_from_content_store_for_ur(self):
        """
        Verify that `get_transcript` function returns correct data for non-english when transcript is in content store.
        """
        language = u'ur'
        self.create_transcript(self.subs_id, language)
        content, filename, mimetype = transcripts_utils.get_transcript(
            self.course.id,
            self.video.location.block_id,
            language,
            output_format=transcripts_utils.Transcript.SJSON
        )

        self.assertEqual(json.loads(content), self.subs_sjson)
        self.assertEqual(filename, 'ur_video_101.sjson')
        self.assertEqual(mimetype, self.sjson_mime_type)

    @patch(
        'openedx.core.djangoapps.video_config.models.VideoTranscriptEnabledFlag.feature_enabled',
        Mock(return_value=True),
    )
    @patch('xmodule.video_module.transcripts_utils.get_video_transcript_content')
    def test_get_transcript_from_val(self, mock_get_video_transcript_content):
        """
        Verify that `get_transcript` function returns correct data when transcript is in val.
        """
        mock_get_video_transcript_content.return_value = {
            'content': json.dumps(self.subs_sjson),
            'file_name': 'edx.sjson'
        }

        content, filename, mimetype = transcripts_utils.get_transcript(
            self.course.id,
            self.video.location.block_id,
        )
        self.assertEqual(content, self.subs_srt)
        self.assertEqual(filename, 'edx.srt')
        self.assertEqual(mimetype, self.srt_mime_type)

    def test_get_transcript_invalid_format(self):
        """
        Verify that `get_transcript` raises correct exception if transcript format is invalid.
        """
        with self.assertRaises(NotFoundError) as invalid_format_exception:
            transcripts_utils.get_transcript(
                self.course.id,
                self.video.location.block_id,
                'ur',
                output_format='mpeg'
            )

        exception_message = text_type(invalid_format_exception.exception)
        self.assertEqual(exception_message, 'Invalid transcript format `mpeg`')

    def test_get_transcript_no_content(self):
        """
        Verify that `get_transcript` function returns correct exception when transcript content is empty.
        """
        self.upload_file(self.create_srt_file(''), self.video.location, 'ur_video_101.srt')
        self.create_transcript('', 'ur', 'ur_video_101.srt')

        with self.assertRaises(NotFoundError) as no_content_exception:
            transcripts_utils.get_transcript(
                self.course.id,
                self.video.location.block_id,
                'ur'
            )

        exception_message = text_type(no_content_exception.exception)
        self.assertEqual(exception_message, 'No transcript content')

    def test_get_transcript_no_en_transcript(self):
        """
        Verify that `get_transcript` function returns correct exception when no transcript exists for `en`.
        """
        # With no youtube id there is nothing left to use as the `en` subs id.
        self.video.youtube_id_1_0 = ''
        self.store.update_item(self.video, self.user.id)

        with self.assertRaises(NotFoundError) as no_en_transcript_exception:
            transcripts_utils.get_transcript(
                self.course.id,
                self.video.location.block_id,
                'en'
            )

        exception_message = text_type(no_en_transcript_exception.exception)
        self.assertEqual(exception_message, 'No transcript for `en` language')
......@@ -11,9 +11,11 @@ import logging
from pysrt import SubRipTime, SubRipItem, SubRipFile
from pysrt.srtexc import Error
from lxml import etree
from opaque_keys.edx.locator import BlockUsageLocator
from HTMLParser import HTMLParser
from six import text_type
from xmodule.modulestore.django import modulestore
from xmodule.exceptions import NotFoundError
from xmodule.contentstore.content import StaticContent
from xmodule.contentstore.django import contentstore
......@@ -863,3 +865,137 @@ class VideoTranscriptsMixin(object):
"sub": sub,
"transcripts": transcripts,
}
def get_transcript_from_val(edx_video_id, lang=None, output_format=Transcript.SRT):
    """
    Fetch a video transcript from edx-val and convert it to `output_format`.

    Arguments:
        edx_video_id (unicode): id of the video whose transcript is looked up in edx-val
        lang (unicode): transcript language
        output_format (unicode): transcript output format

    Returns:
        tuple containing content, filename, mimetype

    Raises:
        NotFoundError: if edx-val returns no transcript for the video and language
    """
    val_transcript = get_video_transcript_content(edx_video_id, lang)
    if not val_transcript:
        raise NotFoundError(u'Transcript not found for {}, lang: {}'.format(edx_video_id, lang))

    # Pass the stored transcript fields through, forcing the requested output format.
    conversion_kwargs = dict(val_transcript, output_format=output_format)
    converted = convert_video_transcript(**conversion_kwargs)

    filename = converted['filename']
    content = converted['content']
    mimetype = Transcript.mime_types[output_format]
    return content, filename, mimetype
def get_transcript_for_video(video_location, subs_id, file_name, language):
    """
    Load a video transcript from the content store.

    The asset addressed by `subs_id` is tried first and reported as SJSON.
    If that lookup raises `NotFoundError`, the asset addressed by `file_name`
    is loaded instead and reported as SRT.

    Arguments:
        video_location (Locator): Video location
        subs_id (unicode): id for a transcript in content store
        file_name (unicode): file_name for a transcript in content store
        language (unicode): transcript language

    Returns:
        tuple containing transcript input_format, basename, content
    """
    try:
        transcript_data = Transcript.asset(video_location, subs_id, language).data
    except NotFoundError:
        # Fall back to the named transcript file; a NotFoundError raised here
        # propagates to the caller.
        transcript_data = Transcript.asset(video_location, None, language, file_name).data
        name_without_ext = os.path.splitext(file_name)[0]
        return Transcript.SRT, name_without_ext, transcript_data

    return Transcript.SJSON, subs_id, transcript_data
def get_transcript_from_contentstore(video, language, output_format, youtube_id=None, is_bumper=False):
    """
    Get video transcript from content store.

    Arguments:
        video (Video Descriptor): Video descriptor
        language (unicode): transcript language
        output_format (unicode): transcript output format
        youtube_id (unicode): youtube video id
        is_bumper (bool): indicates bumper video

    Returns:
        tuple containing content, filename, mimetype

    Raises:
        NotFoundError: if `output_format` is not SRT, SJSON or TXT; if no subs
            id can be determined for an `en` request; if a required entry is
            missing from the transcripts mapping; or if the converted
            transcript is empty
    """
    if output_format not in (Transcript.SRT, Transcript.SJSON, Transcript.TXT):
        raise NotFoundError('Invalid transcript format `{output_format}`'.format(output_format=output_format))

    transcripts_info = video.get_transcripts_info(is_bumper=is_bumper)
    sub, other_languages = transcripts_info['sub'], transcripts_info['transcripts']
    # Work on a copy so the `en` entry added below does not leak into `transcripts_info`.
    transcripts = dict(other_languages)

    # this is sent in case of a translation dispatch and we need to use it as our subs_id.
    if youtube_id:
        transcripts['en'] = youtube_id
    elif sub:
        transcripts['en'] = sub
    elif video.youtube_id_1_0:
        transcripts['en'] = video.youtube_id_1_0
    elif language == u'en':
        raise NotFoundError('No transcript for `en` language')

    try:
        input_format, base_name, transcript_content = get_transcript_for_video(
            video.location,
            subs_id=transcripts['en'],
            # A falsy language is passed through as-is instead of being looked up.
            file_name=language and transcripts[language],
            language=language
        )
    except KeyError:
        # Either the `en` subs id or the requested language entry is missing.
        # Report it with a message, like the other failure paths do.
        raise NotFoundError(u'No transcript for `{lang}` language'.format(lang=language))

    # add language prefix to transcript file only if language is not None
    language_prefix = '{}_'.format(language) if language else ''
    transcript_name = u'{}{}.{}'.format(language_prefix, base_name, output_format)
    transcript_content = Transcript.convert(transcript_content, input_format=input_format, output_format=output_format)
    if not transcript_content.strip():
        raise NotFoundError('No transcript content')

    if youtube_id:
        # Regenerate the subs for the speed that `youtube_speed_dict` maps this
        # youtube id to (1 when the id is not in the mapping).
        youtube_ids = youtube_speed_dict(video)
        transcript_content = json.dumps(
            generate_subs(youtube_ids.get(youtube_id, 1), 1, json.loads(transcript_content))
        )

    return transcript_content, transcript_name, Transcript.mime_types[output_format]
def get_transcript(course_id, block_id, lang=None, output_format=Transcript.SRT, is_bumper=False):
    """
    Return a video transcript, trying edx-val first and the content store second.

    Arguments:
        course_id (CourseLocator): course identifier
        block_id (unicode): a unique identifier for an item in modulestore
        lang (unicode): transcript language
        output_format (unicode): transcript output format
        is_bumper (bool): indicates bumper video

    Returns:
        tuple containing content, filename, mimetype
    """
    video_key = BlockUsageLocator(course_id, block_type='video', block_id=block_id)
    video = modulestore().get_item(video_key)

    try:
        transcript = get_transcript_from_val(video.edx_video_id, lang, output_format)
    except NotFoundError:
        # Nothing usable in edx-val; fall back to the content store.
        transcript = get_transcript_from_contentstore(
            video,
            lang,
            output_format=output_format,
            is_bumper=is_bumper
        )

    return transcript
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment