Source code for pedal.command_line.modes

"""
Simple enumeration of the Modes available for Pedal's command line.
"""
import json
import os
import sys
import traceback
from contextlib import redirect_stdout
from io import StringIO
import unittest
from textwrap import indent
from unittest.mock import patch
from pprint import pprint
import warnings
import argparse

from pedal.command_line.report import StatReport
from pedal.command_line.verify import generate_report_out, ReportVerifier
from pedal.core.report import MAIN_REPORT
from pedal.core.submission import Submission
from pedal.utilities.files import normalize_path, find_possible_filenames
from pedal.utilities.progsnap import SqlProgSnap2
from pedal.utilities.text import chomp
from pedal.resolvers.export import PedalJSONEncoder, clean_json

try:
    from tqdm import tqdm
except ImportError:
    def tqdm(values):
        """ Trivial helper function to replace TQDM's behavior """
        yield from values


# Disable all warnings from student files
# TODO: Move this strictly into ICS and Submission execution
warnings.filterwarnings("ignore")


def is_progsnap(path):
    """ Determines if this is a ProgSnap file or folder. """
    if os.path.isfile(path):
        name, extension = os.path.splitext(path)
        return extension in ('.zip', '.db')
    else:
        # Does it have all the necessary components?
        return os.path.isfile(os.path.join(path, 'MainTable.csv'))


def get_python_files(paths):
    result = {}
    for path in paths:
        name, ext = os.path.splitext(path)
        if ext != ".py":
            continue
        with open(path, 'r') as submission_file:
            contents = submission_file.read()
        result[path] = contents
    return result


class BundleResult:
    """
    Represents the result of running an instructor control script on a submission.
    This includes not only the output and error, but also the resolution of the feedback (aka
    the final feedback). Also includes the data that was generated during the execution.
    """
    def __init__(self, data, output, error, resolution):
        self.data = data
        self.output = output
        self.error = error
        self.resolution = resolution

    def to_json(self):
        resolution = self.resolution.copy() if self.resolution else {}
        #if 'considered' in resolution:
        #    for c in resolution['considered']:
        #        if 'fields' in c:
        #            del c['fields']
        return dict(
            output=self.output,
            error=self.error,
            **resolution
        )

class Bundle:
    """
    Represents the combination of an instructor control script and a submission that it is
    being run on. Also includes the environment that the script is being run in, and the
    result of the execution (once the execution is finished). Finally, also includes the configuration
    that was used to run the bundle.
    """
    def __init__(self, config, script, submission):
        self.config = config
        self.script = script
        self.submission = submission
        self.environment = None
        self.result = None

    def to_json(self):
        return dict(
            script=self.script,
            submission=self.submission.to_json(),
            environment=self.environment,
            result=self.result.to_json()
        )

    def run_ics_bundle(self, resolver='resolve', skip_tifa=False, skip_run=False):
        """
        Runs the given instructor control script on the given submission, with the
        accompany contextualizations.
        """
        ics_args = [self.submission, self.environment]
        captured_output = StringIO()
        global_data = {}
        error = None
        resolution = None
        if self.environment:
            env = __import__('pedal.environments.' + self.environment,
                             fromlist=[''])
            global_data.update(env.setup_environment(self.submission,
                                                     skip_tifa=skip_tifa,
                                                     skip_run=skip_run,
                                                     threaded=self.config.threaded).fields)
        else:
            MAIN_REPORT.contextualize(self.submission)
        with redirect_stdout(captured_output):
            with patch.object(sys, 'argv', ics_args):
                try:
                    grader_exec = compile(self.script,
                                          self.submission.instructor_file, 'exec')
                    exec(grader_exec, global_data)
                    if 'MAIN_REPORT' in global_data:
                        if not global_data['MAIN_REPORT'].resolves:
                            if resolver in global_data:
                                resolution = global_data[resolver]()
                            # TODO: Need more elegance/configurability here
                            elif self.config.resolver == 'resolve':
                                exec("from pedal.resolvers import print_resolve", global_data)
                                resolution = global_data["print_resolve"]()
                        else:
                            resolution = global_data['MAIN_REPORT'].resolves[-1]
                except Exception as e:
                    error = e
        actual_output = captured_output.getvalue()
        self.result = BundleResult(global_data, actual_output, error, resolution)


class AbstractPipeline:
    """
    Generic pipeline for handling all the phases of executing instructor
    control scripts on submissions, and reformating the output.
    Should be subclassed instead of used directly.
    """

    def __init__(self, config):
        if isinstance(config, dict):
            # TODO: Include default argument automatically
            config = argparse.Namespace(**config)
        self.config = config
        self.submissions = []
        self.result = None

    def execute(self):
        self.load_submissions()
        self.setup_execution()
        self.run_control_scripts()
        return self.process_output()

    def load_file_submissions(self, scripts):
        # Get instructor control scripts
        all_scripts = []
        for script in scripts:
            script_file_name, script_file_extension = os.path.splitext(script)
            # Single Python file
            if script_file_extension in ('.py',):
                with open(script, 'r') as scripts_file:
                    scripts_contents = scripts_file.read()
                all_scripts.append((script, scripts_contents))
        given_submissions = self.config.submissions
        # If submission is a directory, use it as a directory adjacent to each ics
        if os.path.isdir(given_submissions):
            for script, scripts_contents in all_scripts:
                directory_pattern = given_submissions
                submission_dir = normalize_path(directory_pattern, script)
                submission_files = [
                    os.path.join(submission_dir, sub)
                    for sub in os.listdir(submission_dir)
                ]
                subs = get_python_files(submission_files)
                for main_file, main_code in subs.items():
                    new_submission = Submission(
                        main_file=main_file, main_code=main_code,
                        instructor_file=script
                    )
                    self.submissions.append(Bundle(self.config, scripts_contents, new_submission))
        # Otherwise, if the submission is a single file:
        # Maybe it's a Progsnap DB file?
        elif given_submissions.endswith('.db'):
            for script, scripts_contents in all_scripts:
                self.load_progsnap(given_submissions, instructor_code=scripts_contents)
        # Otherwise, must just be a single python file.
        else:
            main_file = given_submissions
            load_error, possible_load_error = None, None
            alternatives = [given_submissions]
            # if alternative filenames given, we'll queue them up
            if self.config.alternate_filenames:
                alternatives.extend(find_possible_filenames(self.config.alternate_filenames))
            # Run through all possible filenames
            for possible in alternatives:
                try:
                    with open(possible, 'r') as single_submission_file:
                        main_code = single_submission_file.read()
                        main_file = possible
                    break
                except OSError as e:
                    # Only capture the first possible load error
                    if possible_load_error is None:
                        possible_load_error = e
            else:
                # Okay, file does not exist. Load error gets triggered.
                main_code = None
                load_error = possible_load_error
            for script, scripts_contents in all_scripts:
                new_submission = Submission(
                    main_file=main_file, main_code=main_code,
                    instructor_file=script, load_error=load_error
                )
                self.submissions.append(Bundle(self.config, scripts_contents, new_submission))
            return load_error

    progsnap_events_map = {
        'run': 'Run.Program',
        'compile': 'Compile',
        'edit': 'File.Edit',
        'last': 'File.Edit'
    }

    def load_progsnap(self, path, instructor_code=None):
        script_file_name, script_file_extension = os.path.splitext(path)
        if script_file_extension in ('.db',):
            with SqlProgSnap2(path, cache=self.config.cache) as progsnap:
                if self.config.progsnap_profile:
                    progsnap.set_profile(self.config.progsnap_profile)
                link_filters = {}
                include_scripts = self.config.include_scripts
                if include_scripts:
                    link_filters['Assignment'] = {}
                    if include_scripts.startswith('name='):
                        link_filters['Assignment']['X-Name'] = include_scripts[5:]
                    elif include_scripts.startswith('id='):
                        link_filters['Assignment']['AssignmentId'] = include_scripts[3:]
                    elif include_scripts.startswith('url='):
                        link_filters['Assignment']['X-URL'] = include_scripts[4:]
                    else:
                        link_filters['Assignment']['X-URL'] = include_scripts
                event_type = self.progsnap_events_map[self.config.progsnap_events]
                events = progsnap.get_events(event_filter={'EventType': event_type},
                                             link_filters=link_filters, limit=self.config.limit)
            if self.config.progsnap_events == 'last':
                runs_by_user_assign = {}
                for event in sorted(events, key=lambda e: e['event_id']):
                    key = (event[progsnap.PROFILES[progsnap.profile]['link_primary']['user']], event['assignment_name'])
                    runs_by_user_assign[key] = event
                events = list(runs_by_user_assign.values())
            for event in events:
                if instructor_code is None:
                    instructor_code_for_this_run = event['on_run']
                else:
                    instructor_code_for_this_run = instructor_code
                link_selections = progsnap._merge('link_selections', {})
                new_submission = Submission(
                    main_file='answer.py',
                    main_code=event['submission_code'] if isinstance(event['submission_code'], str) else
                        event['submission_code'].decode('utf-8'),
                    instructor_file='instructor.py',
                    #files={'cisc106.py': 'from bakery import *'},
                    execution=dict(client_timestamp=event['client_timestamp'],
                                   event_id=event['event_id']),
                    #user=dict(email=event['student_email'],
                    #          first=event['student_first'],
                    #          last=event['student_last']),
                    user={key: event[key] for key in link_selections['Subject'].values()}
                    if 'Subject' in link_selections else {'id': event['subject_id']},
                    assignment=dict(name=event['assignment_name'],
                                    url=event['assignment_url']),
                )
                self.submissions.append(Bundle(self.config, instructor_code_for_this_run, new_submission))
            # raise ValueError("TODO: ProgSnap DB files not yet supported")
            # Progsnap Zip
        elif script_file_extension in ('.zip',):
            raise ValueError("TODO: Zip files not yet supported")

    def load_submissions(self):
        given_script = self.config.instructor
        if self.config.ics_direct:
            # TODO: Allow non-progsnap ics_direct
            self.load_progsnap(self.config.submissions, instructor_code=given_script)
        elif is_progsnap(given_script):
            self.load_progsnap(given_script)
        elif os.path.isfile(given_script):
            self.load_file_submissions([given_script])
        elif os.path.isdir(given_script):
            python_files = os.listdir(given_script)
            self.load_file_submissions(python_files)
        else:
            potential_filenames = list(find_possible_filenames(given_script))
            for filename in potential_filenames:
                load_error = self.load_file_submissions([filename])
                if load_error is None:
                    return
            from pedal.source.feedbacks import source_file_not_found
            source_file_not_found(potential_filenames[0], False)

    def setup_execution(self):
        for bundle in self.submissions:
            bundle.environment = self.config.environment

    def run_control_scripts(self):
        for bundle in self.submissions:
            bundle.run_ics_bundle(resolver=self.config.resolver,
                                  skip_tifa=self.config.skip_tifa,
                                  skip_run=self.config.skip_run)

    def process_output(self):
        for bundle in self.submissions:
            print(bundle.submission.instructor_file,
                  bundle.submission.main_file,
                  bool(bundle.result.error))


[docs] class FeedbackPipeline(AbstractPipeline): """ ``feedback``: Pipeline for running the instructor control script on a submission and then printing the resolver output to the console. Often the most useful if you are trying to deliver the feedback without a grade. """ def process_output(self): for bundle in self.submissions: #print(bundle.submission.instructor_file, # bundle.submission.main_file) if bundle.result.error: print(bundle.result.error) elif bundle.result.resolution: # report = bundle.result.data['MAIN_REPORT'] #print(bundle.result.resolution.title) #print(bundle.result.resolution.message) print(bundle.result.output) else: print("Error: No feedback determined.")
[docs] class RunPipeline(AbstractPipeline): """ ``run``: Pipeline for running the instructor control script on a submission and generating a report file in the `ini` file format. This is a simple file format that has a lot of the interesting fields. The file is not actually dumped to the filesystem, but instead printed directly. So this is a good way to run students' code in a sandbox and see what comes out. """ def process_output(self): for bundle in self.submissions: #print(bundle.submission.instructor_file, # bundle.submission.main_file) if bundle.result.error: print(bundle.result.error) elif bundle.result.resolution: report = bundle.result.data['MAIN_REPORT'] generate_report_out(None, self.config.environment, report) else: print("No feedback determined.")
[docs] class StatsPipeline(AbstractPipeline): """ ``stats``: Pipeline for running the instructor control script on a submission and then dumping a JSON report with all the feedback objects. This is useful for analyzing the feedback objects in a more programmatic way. """ def run_control_scripts(self): for bundle in tqdm(self.submissions): bundle.run_ics_bundle(resolver='stats_resolve', skip_tifa=self.config.skip_tifa, skip_run=self.config.skip_run) def process_output(self): total = 0 errors = 0 final = [] for bundle in self.submissions: if bundle.result.error: print(bundle.result.error) errors += 1 else: final.append(bundle.to_json()) total += 1 final = clean_json(final) if self.config.output is not None: #print(final) print("Total Processed:", total) print("Errors:", errors) pedal_json_encoder = PedalJSONEncoder(indent=2, skipkeys=True) if self.config.output == 'stdout': print(pedal_json_encoder.encode(final)) else: with open(self.config.output, 'w') as output_file: print(pedal_json_encoder.encode(final), file=output_file) return StatReport(final)
[docs] class VerifyPipeline(AbstractPipeline): """ ``verify``: Pipeline for running the instructor control script on a submission and then comparing the output to an expected output file. This is useful for verifying that the feedback is correct (at least, as correct as the expected output). You can also use this pipeline to generate the output files, to quickly create regression "tests" of your feedback scripts. """ def process_output(self): for bundle in self.submissions: bundle.run_ics_bundle(resolver=self.config.resolver, skip_tifa=self.config.skip_tifa, skip_run=self.config.skip_run) if bundle.result.error: raise bundle.result.error # Now get the expected output base = os.path.splitext(bundle.submission.main_file)[0] test_name = os.path.basename(base) if os.path.exists(base + ".out"): verifier = ReportVerifier(base + ".out", self.config.environment) self.add_case(test_name, bundle.result, verifier) else: if self.config.create_output: report = bundle.result.data['MAIN_REPORT'] generate_report_out(base + ".out", self.config.environment, report) self.skip_case(test_name) else: self.error_case(test_name, bundle.result) self.run_cases() def __init__(self, config): super().__init__(config) class TestReferenceSolutions(unittest.TestCase): __module__ = 'VerifyTestCase' __qualname__ = config.instructor_name maxDiff = None def __str__(self): return f"{config.instructor}, using {self._testMethodName[5:]}.py" # return f"{self.id().__name__} (Test{config.instructor})" def __repr__(self): return str(self) self.TestReferenceSolutions = TestReferenceSolutions self.tests = [] def add_case(self, name, bundle_result, verifier): """ Add this ``name`` as a new test. """ if bundle_result.output.strip(): print(bundle_result.output) error = bundle_result.error if bundle_result.error else None if error: print(error) if 'MAIN_REPORT' not in bundle_result.data: # TODO: Better error message here raise Exception("No MAIN_REPORT found; did you import Pedal?") actual = bundle_result.data['MAIN_REPORT'].result.to_json() error = bundle_result.error expected_fields = verifier.get_final() def new_test(self): if error: raise error entire_actual, entire_expected = "", "" differing_fields = [] for field, value in expected_fields: self.assertIn(field, actual, msg="Field was not in feedback.") actual_value = chomp(str(actual[field]).strip()) expected_value = chomp(str(value)) entire_actual += field + ": " + actual_value + "\n" entire_expected += field + ": " + expected_value + "\n" if expected_value != actual_value: differing_fields.append(field) differing_fields = ', '.join(f"'{field}'" for field in differing_fields) self.assertEqual(entire_expected, entire_actual, msg=f"Wrong value for {differing_fields} in {name}.") setattr(self.TestReferenceSolutions, "test_" + name, new_test) self.tests.append("test_" + name) def skip_case(self, name): def new_test(self): self.skipTest("Output did not exist; created.") setattr(self.TestReferenceSolutions, "test_" + name, new_test) self.tests.append("test_" + name) def error_case(self, name, bundle_result): def new_test(self): self.fail( "Expected output file was not found next to the instructor file. Perhaps you meant to use --create_output?") setattr(self.TestReferenceSolutions, "test_" + name, new_test) self.tests.append("test_" + name) def run_cases(self): suite = unittest.defaultTestLoader.loadTestsFromTestCase(self.TestReferenceSolutions) runner = unittest.TextTestRunner() runner.run(suite)
[docs] class GradePipeline(AbstractPipeline): """ ``grade``: Pipeline for running the instructor control script on a submission and then outputing the grade to the console. This is useful for quickly grading a set of submissions. The instructor file, student data, and assignment are also all printed out in the following CSV format: instructor_file, student_file, student_email, assignment_name, score, correct """ def process_output(self): if self.config.output == 'stdout': self.print_bundles(sys.stdout) else: with open(self.config.output, 'w') as output_file: self.print_bundles(output_file) def print_bundles(self, target): #print(len(self.submissions)) for bundle in self.submissions: print(bundle.result.output, file=target) if bundle.result.error: raise bundle.result.error # This info is not sent to the output target, just to stdout print(bundle.submission.instructor_file, bundle.submission.main_file, bundle.submission.user.get('student_email') if bundle.submission.user else 'Unknown User', bundle.submission.assignment.get('name') if bundle.submission.assignment else 'Unknown Assignment', 1 if bundle.result.resolution.correct else bundle.result.resolution.score, sep=", ", end="")
[docs] class SandboxPipeline(AbstractPipeline): """ ``sandbox``: Pipeline for running ONLY the student's code, and then outputing the results to the console. There is no instructor control script logic, although the Source tool does check that the student's code is syntactically correct. Otherwise, the students' code is run in a Sandbox mode. This is useful if you just want to safely execute student code and observe their output. """ ICS = """from pedal import * verify() run()""" def load_submissions(self): # Use first argument as student submissions given_script = self.config.instructor # ... either a single file if os.path.isfile(given_script): scripts = [given_script] # ... or a directory of files else: scripts = os.listdir(given_script) # Then create submission to run for script in scripts: script_file_name, script_file_extension = os.path.splitext(script) if script_file_extension in ('.py',): load_error = None try: with open(script, 'r') as scripts_file: scripts_contents = scripts_file.read() except Exception as e: load_error = e new_submission = Submission( main_file="answer.py", main_code=scripts_contents, instructor_file=script, load_error=load_error ) self.submissions.append(Bundle(self.config, self.ICS, new_submission)) def process_output(self): # Print output # Print runtime exceptions, if any for bundle in self.submissions: if bundle.result.error: traceback.print_tb(bundle.result.error.__traceback__) print(bundle.result.error) continue sandbox = bundle.result.data['MAIN_REPORT']['sandbox']['sandbox'] output = [sandbox.raw_output] if sandbox.feedback: output.append(sandbox.feedback.message) print("\n".join(output))
[docs] class DebugPipeline(AbstractPipeline): """ ``debug``: Pipeline for running the instructor control script on a submission and then outputing the full results to the console. This is useful for debugging the instructor control script, as it will show the full output, error, all of the feedback objects considered, and the final feedback. """ def process_output(self): for bundle in self.submissions: print(bundle.submission.instructor_file, bundle.submission.main_file) print("****** Student Code:") print(indent(bundle.submission.main_code, " ")) print("****** Results") if bundle.result.error: print(bundle.result.error) else: print("Output:") print(indent(bundle.result.output, " " * 4)) report = bundle.result.data['MAIN_REPORT'] print("Feedback:") for feedback in report.feedback: print("\t", feedback, repr(feedback.fields)) print("Final Feedback") pprint(report.result.to_json())
class MODES: """ The possible modes that Pedal can be run in. """ # Runs the instructor control script and outputs all feedback RUN = 'run' # Get just the feedback for submissions FEEDBACK = 'feedback' # Get just the score/correctness GRADE = 'grade' # Check against expected output VERIFY = 'verify' # Output all the feedback objects STATS = 'stats' # Run the file as student code using the sandbox SANDBOX = 'sandbox' # Output as much information as possible DEBUG = 'debug' PIPELINES = { RUN: RunPipeline, FEEDBACK: FeedbackPipeline, GRADE: GradePipeline, STATS: StatsPipeline, SANDBOX: SandboxPipeline, VERIFY: VerifyPipeline, DEBUG: DebugPipeline }