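# NOTE: the following lines are page chrome from the web code viewer this file
# was copied from; they are kept as comments so the module parses.
# Skip to content
# Snippets Groups Projects
# pipeline.py 6.59 KiB
# Newer Older
# Chris Lattman's avatar
# Chris Lattman committed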
from re import findall, sub, search
from json import dump
from time import sleep, time
from pprint import pformat

import mysql.connector
from pymongo import MongoClient, DESCENDING
from parse import parse

# Get the root password of the MySQL database: the text following the first
# "MYSQL_ROOT_PASSWORD: " found in config.yml, with surrounding whitespace removed
mysql_password = None
with open("config.yml") as file:
    for line in file:
        # Take whatever follows the key instead of slicing at a fixed column,
        # so leading indentation or a key prefix cannot shift the value
        _, key, value = line.partition("MYSQL_ROOT_PASSWORD: ")
        if key:
            mysql_password = value.strip()
            break
if mysql_password is None:
    # Iteration stops at end-of-file, so a config without the key is reported
    # here rather than being searched for indefinitely
    raise RuntimeError("MYSQL_ROOT_PASSWORD not found in config.yml")

# Open the MySQL connection to the "openedx" database as root, plus the
# cursor that every SQL query below is executed on
cnx = mysql.connector.connect(
    host="mysql",
    database="openedx",
    user="root",
    password=mysql_password,
)
mysql_cursor = cnx.cursor()

# Open MongoDB and select the collection the course structure is read from
client = MongoClient("mongodb")
mongodb_collection = client["openedx"]["modulestore.structures"]

# Seconds from the start of one export pass to the start of the next
POLL_INTERVAL_SECONDS = 5

# Patterns applied to the pprint-formatted course structure document.
# They are raw strings so that "\s" is a regex escape, not a string escape.
# NOTE(review): both patterns depend on pformat's exact line layout - confirm
# after any pprint or modulestore schema change.
QUIZ_PATTERN = r"'fields': {.*\s+.*\s+'display_name': '.*[qQ]uiz.*',\s+'format': .*\s+'graded': .*\s+'xml_attributes'[^}]+}}"
QUIZ_QUESTIONS_PATTERN = r"'fields': {[^_]+_name': '.*[qQ]uiz.*',\s+'xml_attributes': [^}]+}}"


def _block_id(usage_key):
    """Return the text after the first "block@" in usage_key, or None if it has no "block@"."""
    _, marker, block_id = usage_key.partition("block@")
    return block_id if marker else None


def _load_students(cursor):
    """Return {user_id: record} for non-superuser accounts that have logged in.

    Every record has "username", "email" and an empty "quizzes" dict.  The
    "overall" key is added only for users that have a row in
    grades_persistentcoursegrade; when a user has several rows the first one
    returned is kept.
    """
    cursor.execute("SELECT id, username, email FROM auth_user WHERE last_login IS NOT NULL AND is_superuser = 0")
    students = {}
    for (user_id, username, email) in cursor:
        students[user_id] = {"username": username, "email": email, "quizzes": {}}

    cursor.execute("SELECT user_id, percent_grade FROM grades_persistentcoursegrade")
    for (user_id, percent_grade) in cursor:
        record = students.get(user_id)
        # Grade rows may belong to accounts excluded above (e.g. an instructor
        # who took quizzes to test course functionality)
        if record is not None and "overall" not in record:
            record["overall"] = percent_grade
    return students


def _parse_course_structure(collection):
    """Extract quiz and question module IDs from the newest document in collection.

    Returns a 4-tuple of dicts:
    (quiz_id -> quiz name, question_id -> quiz_id,
     question_id -> zero-based question index, quiz_id -> number of questions).
    """
    # The last record in natural order is treated as the most recent course version
    document = collection.find_one(sort=[("$natural", DESCENDING)])
    formatted_document = pformat(document, indent=4)

    # Names of quizzes and their module IDs
    quiz_name_to_quiz_id = {}
    quiz_id_to_quiz_name = {}
    for match in findall(QUIZ_PATTERN, formatted_document):
        lines = match.split("\n")
        # Line 2 of a match is the display name, line 6 the sequential file name
        quiz_name = parse("'display_name': '{}',", lines[2].strip())[0]
        quiz_id = parse("'sequential/{}.xml']", lines[6].strip()[:-2])[0]
        quiz_name_to_quiz_id[quiz_name] = quiz_id
        quiz_id_to_quiz_name[quiz_id] = quiz_name

    # Module IDs of the questions inside each quiz
    question_id_to_quiz_id = {}
    question_id_to_question_index = {}
    quiz_id_to_num_questions = {}
    for match in findall(QUIZ_QUESTIONS_PATTERN, formatted_document):
        lines = match.split("\n")
        quiz_name = parse("'display_name': '{}',", lines[-3].strip())[0]
        quiz_id = quiz_name_to_quiz_id.get(quiz_name)
        if quiz_id is None:
            # No quiz subsection with this name was found above; nothing to attach to
            continue
        # Between the header and the trailing three lines, every second line
        # (starting with the second) carries a question's module ID
        id_lines = lines[2:-3][1::2]
        for question_index, id_line in enumerate(id_lines):
            question_id = sub(r"[^a-f0-9]", "", id_line.strip())
            question_id_to_quiz_id[question_id] = quiz_id
            question_id_to_question_index[question_id] = question_index
        quiz_id_to_num_questions[quiz_id] = len(id_lines)

    return (
        quiz_id_to_quiz_name,
        question_id_to_quiz_id,
        question_id_to_question_index,
        quiz_id_to_num_questions,
    )


def _load_quiz_grades(cursor, students, quiz_id_to_num_questions):
    """Store each known student's overall grade for each known quiz.

    An entry is [quiz_grade, points], where points is a flat list of zeros with
    two slots per question: points earned, then points possible.
    """
    cursor.execute("SELECT user_id, usage_key, earned_all, possible_all FROM grades_persistentsubsectiongrade")
    for (user_id, usage_key, earned_all, possible_all) in cursor:
        quiz_id = _block_id(usage_key)
        if quiz_id not in quiz_id_to_num_questions:  # Not one of the parsed quizzes
            continue
        if possible_all == 0:  # Divide by zero check
            continue
        record = students.get(user_id)
        if record is None:  # Grade belongs to an account that was filtered out
            continue
        quiz_grade = round(earned_all / possible_all, 2)
        record["quizzes"][quiz_id] = [quiz_grade, [0] * 2 * quiz_id_to_num_questions[quiz_id]]


def _load_question_grades(cursor, students, question_id_to_quiz_id, question_id_to_question_index):
    """Fill in points earned/possible for individual quiz questions."""
    cursor.execute("SELECT module_id, student_id, grade, max_grade FROM courseware_studentmodule WHERE grade IS NOT NULL")
    for (module_id, student_id, grade, max_grade) in cursor:
        question_id = _block_id(module_id)
        quiz_id = question_id_to_quiz_id.get(question_id)
        if quiz_id is None:  # Not a question of one of the parsed quizzes
            continue
        record = students.get(student_id)
        if record is None:  # Grade belongs to an account that was filtered out
            continue
        quiz_entry = record["quizzes"].get(quiz_id)
        if quiz_entry is None:  # No overall quiz grade was recorded for this student
            continue
        points = quiz_entry[1]
        slot = 2 * question_id_to_question_index[question_id]
        points[slot] = round(grade, 9)
        points[slot + 1] = int(max_grade)


def _build_report(students, quiz_id_to_quiz_name):
    """Return the list of per-student dicts that is written out as JSON.

    Key order per student: "Email", "Username", "Overall Grade", one
    "Grade on <quiz>" per quiz, then "<quiz> - Q<n> (Earned)" and
    "<quiz> - Q<n> (Possible)" per question.  All values are strings.
    Students without an overall course grade are left out.
    """
    report = []
    for record in students.values():
        if "overall" not in record:
            continue
        row = {
            "Email": record["email"],
            "Username": record["username"],
            "Overall Grade": str(record["overall"]),
        }
        quizzes = record["quizzes"]
        # All quiz totals come first, the per-question details after them
        for quiz_id, quiz_entry in quizzes.items():
            row["Grade on " + quiz_id_to_quiz_name[quiz_id]] = str(quiz_entry[0])
        for quiz_id, quiz_entry in quizzes.items():
            quiz_name = quiz_id_to_quiz_name[quiz_id]
            points = quiz_entry[1]
            for number, (earned, possible) in enumerate(zip(points[0::2], points[1::2]), start=1):
                label = quiz_name + " - Q" + str(number)
                row[label + " (Earned)"] = str(earned)
                row[label + " (Possible)"] = str(possible)
        report.append(row)
    return report


# Export the grades to csvToPy.json over and over, one pass per poll interval
while True:
    start_time = time()

    # End the read transaction left open by the previous pass.  Without this,
    # a connection with autocommit off keeps reading the snapshot taken by its
    # first SELECT (InnoDB REPEATABLE READ) and never sees new grades.
    cnx.commit()

    student_dict = _load_students(mysql_cursor)
    (
        quiz_id_to_quiz_name,
        question_id_to_quiz_id,
        question_id_to_question_index,
        quiz_id_to_num_questions,
    ) = _parse_course_structure(mongodb_collection)
    _load_quiz_grades(mysql_cursor, student_dict, quiz_id_to_num_questions)
    _load_question_grades(mysql_cursor, student_dict, question_id_to_quiz_id, question_id_to_question_index)

    # Construct JSON data and write to file
    json_data = _build_report(student_dict, quiz_id_to_quiz_name)
    with open("csvToPy.json", "w") as json_file:
        dump(json_data, json_file, indent=2)

    end_time = time()

    print(end_time - start_time)

    # Keep the passes one interval apart; a pass that overran the interval
    # starts the next one immediately (sleep() rejects negative durations)
    sleep(max(0.0, POLL_INTERVAL_SECONDS - (end_time - start_time)))