Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
from re import findall, sub, search
from json import dump
from time import sleep, time
from pprint import pformat
import mysql.connector
from pymongo import MongoClient, DESCENDING
from parse import parse
# Get the root password of the MySQL database
# config.yml is scanned line by line for the first line containing the key;
# whatever follows the key on that line (whitespace-trimmed) is the password.
# Iterating the file stops at EOF, so a config without the key is reported
# with an error instead of looping forever on empty reads.
mysql_password = None
with open("config.yml") as file:
    for line in file:
        if "MYSQL_ROOT_PASSWORD: " in line:
            # partition() takes the text after the key wherever the key sits on the line
            mysql_password = line.partition("MYSQL_ROOT_PASSWORD: ")[2].strip()
            break
if mysql_password is None:
    raise RuntimeError("MYSQL_ROOT_PASSWORD not found in config.yml")
# Connect to the databases
# MySQL: one connection and one cursor, reused by every SQL query below
cnx = mysql.connector.connect(
    host="mysql",
    database="openedx",
    user="root",
    password=mysql_password,
)
mysql_cursor = cnx.cursor()
# MongoDB: the collection that is read below to find the latest course document
client = MongoClient("mongodb")
openedx_mongo_db = client["openedx"]
mongodb_collection = openedx_mongo_db["modulestore.structures"]
# Main loop: each pass re-queries both databases and rewrites the JSON export
while True:
# Remember when this pass started so its duration can be printed after the export is written
start_time = time()
# Get emails, student IDs, and usernames of students (excludes default users and instructors)
# The query keeps only accounts that have logged in at least once and are not superusers.
query = "SELECT id, username, email FROM auth_user WHERE last_login IS NOT NULL AND is_superuser = 0"
mysql_cursor.execute(query)
# Maps student ID -> [username, email]; later sections append more slots to each list.
# Built with a comprehension so no module-level name shadows the builtin `id`.
student_dict = {
    student_id: [username, email]
    for (student_id, username, email) in mysql_cursor
}
# Get students overall grades
# After this section every student record has the shape
# [username, email, overall grade, {quiz ID -> quiz grade data}].
query = "SELECT user_id, percent_grade FROM grades_persistentcoursegrade"
mysql_cursor.execute(query)
for (user_id, percent_grade) in mysql_cursor:
    if user_id in student_dict:  # In case an instructor took quizzes to test course functionality
        student_dict[user_id].append(percent_grade)
        student_dict[user_id].append({})  # This is for storing the grades later
# A student with no row in the course-grade table would otherwise keep a
# two-element record, and the later sections (which read slots 2 and 3 for
# every student) would raise IndexError. Give such students an overall grade
# of 0.0 and an empty quiz dictionary instead.
for student_record in student_dict.values():
    if len(student_record) < 4:
        student_record.append(0.0)
        student_record.append({})
# Fetch the last document of the MongoDB collection (natural order, descending),
# which is treated as the most recent version of the course
newest_first = [("$natural", DESCENDING)]
document = mongodb_collection.find_one(sort=newest_first)
# Pretty-print it to text so the regular expressions below can scan it line by line
formatted_document = pformat(document, indent=4)
# Get names of quizzes and their module IDs
# The pattern is a raw string so the \s escapes reach the regex engine unchanged,
# and [qQ] matches exactly "quiz" or "Quiz" (the former [q|Q] also accepted a literal "|").
quiz_names_matches = findall(
    r"'fields': {.*\s+.*\s+'display_name': '.*[qQ]uiz.*',\s+'format': .*\s+'graded': .*\s+'xml_attributes'[^}]+}}",
    formatted_document
)
quiz_name_to_quiz_id = {}
quiz_id_to_quiz_name = {}
for match in quiz_names_matches:
    # Each match spans several lines of the pretty-printed document
    elems = match.split('\n')
    # The third line of the match is expected to be the display_name entry
    display_name = elems[2].strip()
    quiz_name = parse("'display_name': '{}',", display_name)[0]
    # The seventh line is expected to end with the sequential's filename;
    # the last two characters are the "}}" that closes the match
    filename = elems[6].strip()[:-2]
    quiz_id = parse("'sequential/{}.xml']", filename)[0]
    # Keep both directions of the mapping for the sections below
    quiz_name_to_quiz_id[quiz_name] = quiz_id
    quiz_id_to_quiz_name[quiz_id] = quiz_name
# Get module IDs of quiz questions
# The pattern is a raw string so the \s escape reaches the regex engine unchanged,
# and [qQ] matches exactly "quiz" or "Quiz" (the former [q|Q] also accepted a literal "|").
quiz_questions_matches = findall(
    r"'fields': {[^_]+_name': '.*[qQ]uiz.*',\s+'xml_attributes': [^}]+}}",
    formatted_document
)
question_id_to_quiz_id = {}
question_id_to_question_index = {}
quiz_id_to_num_questions = {}
for match in quiz_questions_matches:
    elems = match.split('\n')
    # The third line from the end of the match is expected to be the display_name entry
    display_name = elems[-3].strip()
    quiz_name = parse("'display_name': '{}',", display_name)[0]
    # Questions are tied to a quiz through the shared display name
    quiz_id = quiz_name_to_quiz_id[quiz_name]
    # Of the lines between the first two and the last three, every second one
    # (starting with the second) is read as a question ID
    id_lines = elems[2:-3][1::2]
    for question_index, id_line in enumerate(id_lines):
        # Keep only the hexadecimal characters of the line
        question_id = sub(r'[^a-f0-9]', "", id_line.strip())
        question_id_to_quiz_id[question_id] = quiz_id
        question_id_to_question_index[question_id] = question_index
    quiz_id_to_num_questions[quiz_id] = len(id_lines)
# Get overall grades for quizzes
query = "SELECT user_id, usage_key, earned_all, possible_all FROM grades_persistentsubsectiongrade"
mysql_cursor.execute(query)
for (user_id, usage_key, earned_all, possible_all) in mysql_cursor:
    if user_id not in student_dict:  # In case an instructor took quizzes to test course functionality
        continue
    # The quiz ID is everything after "block@" in the usage key
    quiz_id = search("block@.*", usage_key)[0][6:]
    if quiz_id not in quiz_id_to_num_questions:  # Check for bogus quiz grades
        continue
    if possible_all == 0:  # Divide by zero check
        continue
    quiz_grade = round(earned_all / possible_all, 2)
    # One pair of zeros per question: points earned, then points possible
    question_slots = [0] * 2 * quiz_id_to_num_questions[quiz_id]
    student_dict[user_id][3][quiz_id] = [quiz_grade, question_slots]
# Get grades for individual questions
query = "SELECT module_id, student_id, grade, max_grade FROM courseware_studentmodule WHERE grade IS NOT NULL"
mysql_cursor.execute(query)
for (module_id, student_id, grade, max_grade) in mysql_cursor:
    if student_id not in student_dict:  # In case an instructor took quizzes to test course functionality
        continue
    # The question ID is everything after "block@" in the module ID
    question_id = search("block@.*", module_id)[0][6:]
    if question_id not in question_id_to_quiz_id:  # Check for bogus question grades
        continue
    # The quiz-level section stores an entry only when it saw a row for this
    # student and quiz with non-zero possible points. Without that entry there
    # is nowhere to record the question, so skip it instead of raising KeyError.
    quiz_entry = student_dict[student_id][3].get(question_id_to_quiz_id[question_id])
    if quiz_entry is None:
        continue
    grades = quiz_entry[1]
    question_index = question_id_to_question_index[question_id]
    # Each question owns two adjacent slots: points earned, then points possible
    grades[2 * question_index] = round(grade, 9)
    grades[2 * question_index + 1] = int(max_grade)
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# Construct JSON string and write to file
# One dictionary per student; all values are written as strings
json_data = []
for student_id, student_record in student_dict.items():
    row = {
        "Email": student_record[1],
        "Username": student_record[0],
        "Overall Grade": str(student_record[2]),
    }
    grade_data = student_record[3]
    # First every quiz-level grade...
    for quiz_id, quiz_entry in grade_data.items():
        row["Grade on " + quiz_id_to_quiz_name[quiz_id]] = str(quiz_entry[0])
    # ...then, in the same quiz order, the per-question points
    for quiz_id, quiz_entry in grade_data.items():
        quiz_name = quiz_id_to_quiz_name[quiz_id]
        question_grades = quiz_entry[1]
        # Slots come in (earned, possible) pairs; questions are numbered from 1
        for slot in range(0, len(question_grades), 2):
            question_label = quiz_name + " - Q" + str(slot // 2 + 1)
            row[question_label + " (Earned)"] = str(question_grades[slot])
            row[question_label + " (Possible)"] = str(question_grades[slot + 1])
    json_data.append(row)
with open("csvToPy.json", "w") as json_file:
    dump(json_data, json_file, indent=2)
# Report how long this pass took, in seconds
end_time = time()
print(end_time - start_time)
# Ensure loop runs at a 5 second interval (above script takes less than 1 second to complete with our data)