From a01ba7621ed735c4ca3652c3b03378df28ca5c12 Mon Sep 17 00:00:00 2001 From: muhammad-ammar <mammar@gmail.com> Date: Mon, 26 Feb 2018 20:40:50 +0500 Subject: [PATCH] transcript util EDUCATOR-2131 --- .../tests/test_transcripts_utils.py | 221 +++++++++++++++++- .../xmodule/video_module/transcripts_utils.py | 136 +++++++++++ 2 files changed, 356 insertions(+), 1 deletion(-) diff --git a/cms/djangoapps/contentstore/tests/test_transcripts_utils.py b/cms/djangoapps/contentstore/tests/test_transcripts_utils.py index ae2db05e313..2a0823a0858 100644 --- a/cms/djangoapps/contentstore/tests/test_transcripts_utils.py +++ b/cms/djangoapps/contentstore/tests/test_transcripts_utils.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- """ Tests for transcripts_utils. """ import copy +import tempfile import ddt import json import textwrap @@ -19,7 +20,8 @@ from xmodule.contentstore.content import StaticContent from xmodule.contentstore.django import contentstore from xmodule.exceptions import NotFoundError from xmodule.modulestore.tests.django_utils import SharedModuleStoreTestCase -from xmodule.modulestore.tests.factories import CourseFactory +from xmodule.modulestore.tests.factories import CourseFactory, ItemFactory +from student.tests.factories import UserFactory from xmodule.video_module import transcripts_utils TEST_DATA_CONTENTSTORE = copy.deepcopy(settings.CONTENTSTORE) @@ -721,3 +723,220 @@ class TestVideoIdsInfo(unittest.TestCase): """ actual_result = transcripts_utils.get_video_ids_info(edx_video_id, youtube_id_1_0, html5_sources) self.assertEqual(actual_result, expected_result) + + +@ddt.ddt +class TestGetTranscript(SharedModuleStoreTestCase): + """Tests for `get_transcript` function.""" + + def setUp(self): + super(TestGetTranscript, self).setUp() + + self.course = CourseFactory.create() + + self.subs_id = 'video_101' + + self.subs_sjson = { + 'start': [100, 200, 240, 390, 1000], + 'end': [200, 240, 380, 1000, 1500], + 'text': [ + 'subs #1', + 'subs #2', + 'subs #3', + 'subs #4', + 'subs #5' + ] + } + + self.subs_srt = transcripts_utils.Transcript.convert(json.dumps(self.subs_sjson), 'sjson', 'srt') + + self.subs = { + u'en': self.subs_srt, + u'ur': transcripts_utils.Transcript.convert(json.dumps(self.subs_sjson), 'sjson', 'srt'), + } + + self.srt_mime_type = transcripts_utils.Transcript.mime_types[transcripts_utils.Transcript.SRT] + self.sjson_mime_type = transcripts_utils.Transcript.mime_types[transcripts_utils.Transcript.SJSON] + + self.user = UserFactory.create() + self.vertical = ItemFactory.create(category='vertical', parent_location=self.course.location) + self.video = ItemFactory.create(category='video', parent_location=self.vertical.location) + + def create_transcript(self, subs_id, language=u'en', filename='video.srt'): + """ + create transcript. + """ + transcripts = {} + if language != u'en': + transcripts = {language: filename} + + self.video = ItemFactory.create( + category='video', + parent_location=self.vertical.location, + sub=subs_id, + transcripts=transcripts + ) + + if subs_id: + transcripts_utils.save_subs_to_store( + self.subs_sjson, + subs_id, + self.video, + language=language, + ) + + def create_srt_file(self, content): + """ + Create srt file. + """ + srt_file = tempfile.NamedTemporaryFile(suffix=".srt") + srt_file.content_type = transcripts_utils.Transcript.SRT + srt_file.write(content) + srt_file.seek(0) + return srt_file + + def upload_file(self, subs_file, location, filename): + """ + Upload a file in content store. + + Arguments: + subs_file (File): pointer to file to be uploaded + location (Locator): Item location + filename (unicode): Name of file to be uploaded + """ + mime_type = subs_file.content_type + content_location = StaticContent.compute_location( + location.course_key, filename + ) + content = StaticContent(content_location, filename, mime_type, subs_file.read()) + contentstore().save(content) + + @ddt.data( + # en lang does not exist so NotFoundError will be raised + (u'en',), + # ur lang does not exist so KeyError and then NotFoundError will be raised + (u'ur',), + ) + @ddt.unpack + def test_get_transcript_not_found(self, lang): + """ + Verify that `NotFoundError` exception is raised when transcript is not found in both the content store and val. + """ + with self.assertRaises(NotFoundError): + transcripts_utils.get_transcript(self.course.id, self.video.location.block_id, lang=lang) + + @ddt.data( + { + 'language': u'en', + 'subs_id': 'video_101', + 'filename': 'en_video_101.srt', + }, + { + 'language': u'ur', + 'subs_id': '', + 'filename': 'ur_video_101.srt', + }, + ) + @ddt.unpack + def test_get_transcript_from_content_store(self, language, subs_id, filename): + """ + Verify that `get_transcript` function returns correct data when transcript is in content store. + """ + self.upload_file(self.create_srt_file(self.subs_srt), self.video.location, filename) + self.create_transcript(subs_id, language, filename) + content, filename, mimetype = transcripts_utils.get_transcript( + self.course.id, + self.video.location.block_id, + language + ) + + self.assertEqual(content, self.subs[language]) + self.assertEqual(filename, filename) + self.assertEqual(mimetype, self.srt_mime_type) + + def test_get_transcript_from_content_store_for_ur(self): + """ + Verify that `get_transcript` function returns correct data for non-english when transcript is in content store. + """ + language = u'ur' + self.create_transcript(self.subs_id, language) + content, filename, mimetype = transcripts_utils.get_transcript( + self.course.id, + self.video.location.block_id, + language, + output_format=transcripts_utils.Transcript.SJSON + ) + + self.assertEqual(json.loads(content), self.subs_sjson) + self.assertEqual(filename, 'ur_video_101.sjson') + self.assertEqual(mimetype, self.sjson_mime_type) + + @patch( + 'openedx.core.djangoapps.video_config.models.VideoTranscriptEnabledFlag.feature_enabled', + Mock(return_value=True), + ) + @patch('xmodule.video_module.transcripts_utils.get_video_transcript_content') + def test_get_transcript_from_val(self, mock_get_video_transcript_content): + """ + Verify that `get_transcript` function returns correct data when transcript is in val. + """ + mock_get_video_transcript_content.return_value = { + 'content': json.dumps(self.subs_sjson), + 'file_name': 'edx.sjson' + } + + content, filename, mimetype = transcripts_utils.get_transcript( + self.course.id, + self.video.location.block_id, + ) + self.assertEqual(content, self.subs_srt) + self.assertEqual(filename, 'edx.srt') + self.assertEqual(mimetype, self.srt_mime_type) + + def test_get_transcript_invalid_format(self): + """ + Verify that `get_transcript` raises correct exception if transcript format is invalid. + """ + with self.assertRaises(NotFoundError) as invalid_format_exception: + transcripts_utils.get_transcript( + self.course.id, + self.video.location.block_id, + 'ur', + output_format='mpeg' + ) + + exception_message = text_type(invalid_format_exception.exception) + self.assertEqual(exception_message, 'Invalid transcript format `mpeg`') + + def test_get_transcript_no_content(self): + """ + Verify that `get_transcript` function returns correct exception when transcript content is empty. + """ + self.upload_file(self.create_srt_file(''), self.video.location, 'ur_video_101.srt') + self.create_transcript('', 'ur', 'ur_video_101.srt') + + with self.assertRaises(NotFoundError) as no_content_exception: + transcripts_utils.get_transcript( + self.course.id, + self.video.location.block_id, + 'ur' + ) + + exception_message = text_type(no_content_exception.exception) + self.assertEqual(exception_message, 'No transcript content') + + def test_get_transcript_no_en_transcript(self): + """ + Verify that `get_transcript` function returns correct exception when no transcript exists for `en`. + """ + self.video.youtube_id_1_0 = '' + self.store.update_item(self.video, self.user.id) + with self.assertRaises(NotFoundError) as no_en_transcript_exception: + transcripts_utils.get_transcript( + self.course.id, + self.video.location.block_id, + 'en' + ) + + exception_message = text_type(no_en_transcript_exception.exception) + self.assertEqual(exception_message, 'No transcript for `en` language') diff --git a/common/lib/xmodule/xmodule/video_module/transcripts_utils.py b/common/lib/xmodule/xmodule/video_module/transcripts_utils.py index e831a67e932..f09cd0a22d5 100644 --- a/common/lib/xmodule/xmodule/video_module/transcripts_utils.py +++ b/common/lib/xmodule/xmodule/video_module/transcripts_utils.py @@ -11,9 +11,11 @@ import logging from pysrt import SubRipTime, SubRipItem, SubRipFile from pysrt.srtexc import Error from lxml import etree +from opaque_keys.edx.locator import BlockUsageLocator from HTMLParser import HTMLParser from six import text_type +from xmodule.modulestore.django import modulestore from xmodule.exceptions import NotFoundError from xmodule.contentstore.content import StaticContent from xmodule.contentstore.django import contentstore @@ -863,3 +865,137 @@ class VideoTranscriptsMixin(object): "sub": sub, "transcripts": transcripts, } + + +def get_transcript_from_val(edx_video_id, lang=None, output_format=Transcript.SRT): + """ + Get video transcript from edx-val. + Arguments: + edx_video_id (unicode): course identifier + lang (unicode): transcript language + output_format (unicode): transcript output format + Returns: + tuple containing content, filename, mimetype + """ + transcript = get_video_transcript_content(edx_video_id, lang) + if not transcript: + raise NotFoundError(u'Transcript not found for {}, lang: {}'.format(edx_video_id, lang)) + + transcript_conversion_props = dict(transcript, output_format=output_format) + transcript = convert_video_transcript(**transcript_conversion_props) + filename = transcript['filename'] + content = transcript['content'] + mimetype = Transcript.mime_types[output_format] + + return content, filename, mimetype + + +def get_transcript_for_video(video_location, subs_id, file_name, language): + """ + Get video transcript from content store. + + Arguments: + video_location (Locator): Video location + subs_id (unicode): id for a transcript in content store + file_name (unicode): file_name for a transcript in content store + language (unicode): transcript language + + Returns: + tuple containing transcript input_format, basename, content + """ + try: + content = Transcript.asset(video_location, subs_id, language).data + base_name = subs_id + input_format = Transcript.SJSON + except NotFoundError: + content = Transcript.asset(video_location, None, language, file_name).data + base_name = os.path.splitext(file_name)[0] + input_format = Transcript.SRT + + return input_format, base_name, content + + +def get_transcript_from_contentstore(video, language, output_format, youtube_id=None, is_bumper=False): + """ + Get video transcript from content store. + + Arguments: + video (Video Descriptor): Video descriptor + language (unicode): transcript language + output_format (unicode): transcript output format + youtube_id (unicode): youtube video id + is_bumper (bool): indicates bumper video + + Returns: + tuple containing content, filename, mimetype + """ + if output_format not in (Transcript.SRT, Transcript.SJSON, Transcript.TXT): + raise NotFoundError('Invalid transcript format `{output_format}`'.format(output_format=output_format)) + + transcripts_info = video.get_transcripts_info(is_bumper=is_bumper) + sub, other_languages = transcripts_info['sub'], transcripts_info['transcripts'] + transcripts = dict(other_languages) + + # this is sent in case of a translation dispatch and we need to use it as our subs_id. + if youtube_id: + transcripts['en'] = youtube_id + elif sub: + transcripts['en'] = sub + elif video.youtube_id_1_0: + transcripts['en'] = video.youtube_id_1_0 + elif language == u'en': + raise NotFoundError('No transcript for `en` language') + + try: + input_format, base_name, transcript_content = get_transcript_for_video( + video.location, + subs_id=transcripts['en'], + file_name=language and transcripts[language], + language=language + ) + except KeyError: + raise NotFoundError + + # add language prefix to transcript file only if language is not None + language_prefix = '{}_'.format(language) if language else '' + transcript_name = u'{}{}.{}'.format(language_prefix, base_name, output_format) + transcript_content = Transcript.convert(transcript_content, input_format=input_format, output_format=output_format) + + if not transcript_content.strip(): + raise NotFoundError('No transcript content') + + if youtube_id: + youtube_ids = youtube_speed_dict(video) + transcript_content = json.dumps( + generate_subs(youtube_ids.get(youtube_id, 1), 1, json.loads(transcript_content)) + ) + + return transcript_content, transcript_name, Transcript.mime_types[output_format] + + +def get_transcript(course_id, block_id, lang=None, output_format=Transcript.SRT, is_bumper=False): + """ + Get video transcript from edx-val or content store. + + Arguments: + course_id (CourseLocator): course identifier + block_id (unicode): a unique identifier for an item in modulestore + lang (unicode): transcript language + output_format (unicode): transcript output format + is_bumper (bool): indicates bumper video + + Returns: + tuple containing content, filename, mimetype + """ + usage_key = BlockUsageLocator(course_id, block_type='video', block_id=block_id) + video_descriptor = modulestore().get_item(usage_key) + + try: + return get_transcript_from_val(video_descriptor.edx_video_id, lang, output_format) + except NotFoundError: + return get_transcript_from_contentstore( + video_descriptor, + lang, + output_format=output_format, + is_bumper=is_bumper + ) -- GitLab