Next Gen Bear Design

Metadata
cEP 2
Version 1.1.3
Title Next gen bear design
Authors Udayan Tandon [email protected]
Status Implementation Due
Type Process

Abstract

This cEP describes how the next generation of bears in coala will be designed and implemented. The new implementation should make bears simpler and their design more intuitive.

Introduction

The current bear design is clunky. It makes global bears dependent on local bears through its queuing mechanism: control signals are sent over a queue to indicate that the local bears have finished executing and the global bears can begin. This mechanism sometimes has synchronization problems, and if local bears fail to run, the global bears stall. In addition, a permanent logger thread consumes time and system resources that could be saved.

The new proposal aims to eliminate the queuing control signals by using asynchronous I/O and a process pool to generate results from the bears. Global bears will no longer have to wait for local bears to finish. Logging will be handled by the main thread itself instead of an extra thread.

Proposed Change

The implementation proceeds in the following steps:

  1. A ProcessPoolExecutor is created with as many worker processes as the machine has CPUs.
  2. An asynchronous event loop is obtained. It creates a future for every task and schedules the task to the process pool.
  3. Each task is given a callback that is invoked once its result is ready. The callback is responsible for processing the result.
  4. The Bear class gets two new methods, execute_task() and generate_tasks():
    • generate_tasks() is responsible for generating the arguments for the upcoming tasks.
    • execute_task() is scheduled by the event loop to the process pool with the arguments from generate_tasks().
  5. Each bear will generate the following type of tasks:
    • Global bears fire tasks that process all files.
    • Local bears fire tasks for each individual file.
  6. A bear that has dependencies is only scheduled to the process pool once all of its dependencies have been resolved. This works as follows:
    • Each bear is checked for dependencies.
    • If any of them are unresolved, the bear is not scheduled.
    • As soon as all dependencies of a bear are resolved, it is removed from the dependency dictionary and scheduled to the ProcessPoolExecutor.
  7. A special-purpose Dependencies class will be introduced to keep track of dependencies. Whenever a bear completes, it is removed from the dependency lists of the bears that depend on it.
  8. When all tasks are done, the event loop is stopped and closed.
  9. Existing bears do not need to change because coala will handle backwards compatibility. The new versions of LocalBear and GlobalBear will be called FileWiseBear and ProjectWideBear respectively.
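
Steps 1 to 3 and 8 can be sketched in isolation as follows. This is a minimal illustration, not the coala implementation: count_lines and run_tasks are hypothetical names, and count_lines merely stands in for a bear's analysis routine. The executor is passed in as a parameter so the same function works with any concurrent.futures executor.

```python
import asyncio
import concurrent.futures


def count_lines(filename, content):
    # Hypothetical stand-in for a bear's analysis routine
    return filename, len(content.splitlines())


def run_tasks(executor, tasks):
    """
    Schedule every (filename, content) task on ``executor`` and collect the
    results through done-callbacks, stopping the loop after the last one.
    """
    results = {}
    pending = set()
    event_loop = asyncio.new_event_loop()

    def on_done(future):
        # Invoked by the event loop once the task has a result ready
        filename, line_count = future.result()
        results[filename] = line_count
        pending.discard(future)
        if not pending:
            event_loop.stop()

    try:
        for filename, content in tasks:
            future = event_loop.run_in_executor(
                executor, count_lines, filename, content)
            pending.add(future)
            future.add_done_callback(on_done)
        # Without tasks nothing would ever stop the loop, so skip it
        if pending:
            event_loop.run_forever()
    finally:
        event_loop.close()
    return results
```

Passing a concurrent.futures.ProcessPoolExecutor as the executor gives the setup described above. In that case the task function has to be defined at module level so that it can be pickled and sent to the worker processes.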

The File Proxy

The new bear design also introduces a wrapper object for every file. The file proxy is an object that holds properties of the file being processed. It contains:

  • The filename, which is the starting point for every other attribute.
  • The content of the file as a string.
  • The content of the file, split into lines.

Note: Other properties can be included later as and when required.

A prototype of the file proxy:

from coala_utils.decorators import generate_eq


@generate_eq("filename")
class FileProxy:
    def __init__(self, filename):
        self.filename = filename
        with open(self.filename) as filehandle:
            self.content = filehandle.read()
        self.lines = self.content.splitlines(True)

    def __iter__(self):
        return iter(self.lines)

    def __hash__(self):
        return hash(self.filename)

    def __str__(self):
        return self.content
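
To illustrate how such a proxy behaves, here is a self-contained sketch. FileProxySketch and demo are hypothetical names. The class mirrors the prototype above but spells out the filename-based equality by hand instead of relying on the generate_eq decorator, so it runs without coala_utils.

```python
import os
import tempfile


class FileProxySketch:
    # Stand-in for the prototype; equality is written out explicitly
    def __init__(self, filename):
        self.filename = filename
        with open(filename) as filehandle:
            self.content = filehandle.read()
        self.lines = self.content.splitlines(True)

    def __eq__(self, other):
        return (isinstance(other, FileProxySketch)
                and self.filename == other.filename)

    def __hash__(self):
        return hash(self.filename)

    def __iter__(self):
        return iter(self.lines)

    def __str__(self):
        return self.content


def demo():
    with tempfile.TemporaryDirectory() as directory:
        path = os.path.join(directory, "example.py")
        with open(path, "w") as filehandle:
            filehandle.write("import os\nprint(os.name)\n")

        proxy = FileProxySketch(path)
        duplicate = FileProxySketch(path)
        # Iterating yields lines with their line endings, str() yields the
        # whole content, and two proxies of one file collapse in a set
        return list(proxy), str(proxy), len({proxy, duplicate})
```

Because the file is read once in the constructor, every bear that receives the proxy works on the same content without touching the disk again.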

Dependency Tracking

A new class will be responsible for keeping track of the dependencies of each bear and it will work in the following manner:

  • The list of all the bears is passed to the class.
  • The class runs check_circular_dependency on this list to check whether any bears in it have circular dependencies. If they do, execution stops.
  • The dependencies of each bear are added to the tracker.
  • Whenever a bear completes its execution, the resolve() method of the tracker is called with this bear as an argument.
  • This marks the bear as complete in the dependency tracker.
  • Once the marking is complete, the tracker returns all bears that have no unresolved dependencies left, so that their tasks can be scheduled to the process pool.

Here is a prototype:

from collections import defaultdict


class CircularDependencyError(Exception):

    def __init__(self, bears):
        """
        Creates the CircularDependencyError with a helpful message about the
        dependency.
        """
        bear_names = (bear.name for bear in bears)
        super(CircularDependencyError, self).__init__(
            "Circular dependency detected: " + " -> ".join(bear_names))


class Dependencies:
    def __init__(self):
        self.dependency_dict = defaultdict(list)
        self.dependency_set = set()

    def _add_dependency(self, bear_instance, dependency):
        self.dependency_dict[bear_instance].append(dependency)
        self.dependency_set.add(dependency)

    def add_bear_dependencies(self, bears):
        """
        Take the full bear list and check it for circular dependencies. Then
        continue to add the dependencies of the bears to the class to be
        resolved later.

        :param bears: List of all bears.
        """
        self.check_circular_dependency(bears)
        for bear_instance in bears:
            for bear in bear_instance.BEAR_DEPS:
                self._add_dependency(bear_instance, bear)

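    @staticmethod
    def check_circular_dependency(bears, resolved_bears=None, seen=None):
        """
        Walks the dependencies depth-first and raises a
        CircularDependencyError if a bear is reached again while its own
        dependencies are still being resolved.
        """
        # Avoid mutable default arguments, they would leak state between calls
        resolved_bears = [] if resolved_bears is None else resolved_bears
        seen = [] if seen is None else seen

        for bear in bears:
            if bear in resolved_bears:
                continue

            missing = bear.missing_dependencies(resolved_bears)
            if not missing:
                resolved_bears.append(bear)
                continue

            if bear in seen:
                raise CircularDependencyError(seen + [bear])

            # A fresh list is passed down, so ``seen`` needs no cleanup here
            resolved_bears = Dependencies.check_circular_dependency(
                missing, resolved_bears, seen + [bear])
            resolved_bears.append(bear)

        return resolved_bears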

    def resolve(self, bear_instance):
        """
        Called with the instance of a bear when that bear completes. The
        method removes the bear's type from the dependency list of every
        bear in the dependency dictionary and returns the bears that now
        have all of their dependencies resolved.

        :param bear_instance: The instance of the bear.
        :return:              Returns a list of bears whose dependencies were
                              all resolved and are ready to be scheduled.
        """
        return_bears = []
        bear_type = type(bear_instance)
        if bear_type in self.dependency_set:
            for bear in self.dependency_dict:
                if bear_type in self.dependency_dict[bear]:
                    self.dependency_dict[bear].remove(bear_type)
                    if not self.dependency_dict[bear]:
                        return_bears.append(bear)
        for bear in return_bears:
            del self.dependency_dict[bear]
        return return_bears
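
The resolve step can be traced with a small self-contained example. TrackerSketch is a condensed, hypothetical stand-in for the Dependencies prototype above (circular dependency checking is left out), and LexerBear, ParserBear and LintBear are made-up bears that exist only for this illustration.

```python
from collections import defaultdict


class TrackerSketch:
    """Condensed stand-in for the Dependencies prototype (resolve only)."""

    def __init__(self, bears):
        self.waiting = defaultdict(list)
        for bear in bears:
            for dependency in bear.BEAR_DEPS:
                self.waiting[bear].append(dependency)

    def resolve(self, finished_bear):
        ready = []
        for bear, dependencies in self.waiting.items():
            if type(finished_bear) in dependencies:
                dependencies.remove(type(finished_bear))
                if not dependencies:
                    ready.append(bear)
        # Bears that are ready no longer need to be tracked
        for bear in ready:
            del self.waiting[bear]
        return ready


class LexerBear:
    BEAR_DEPS = ()


class ParserBear:
    BEAR_DEPS = (LexerBear,)


class LintBear:
    BEAR_DEPS = (LexerBear, ParserBear)


def demo():
    lexer, parser, lint = LexerBear(), ParserBear(), LintBear()
    tracker = TrackerSketch([lexer, parser, lint])

    def names(bears):
        return [type(bear).__name__ for bear in bears]

    # Bears without an entry in the tracker can be scheduled right away
    runnable_now = names(bear for bear in (lexer, parser, lint)
                         if bear not in tracker.waiting)
    after_lexer = names(tracker.resolve(lexer))
    after_parser = names(tracker.resolve(parser))
    return runnable_now, after_lexer, after_parser
```

In this trace only LexerBear can start immediately. Completing it releases ParserBear, and LintBear is released only after ParserBear has completed as well.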

A Prototype

A rough sketch of FileWiseBear, the new implementation of the LocalBear class:

from coalib.bears.Bear import Bear


class FileWiseBear(Bear):
    def __init__(self, section, file_set):
        """
        Constructs a new bear.

        :param section:  The section object where bear settings are
                         contained.
        :param file_set: A set of FileProxy objects.
        """
        self.section = section
        self.file_set = file_set

        # May raise RuntimeError so bear doesn't get executed on invalid params
        self.kwargs = get_kwargs_for_function(self.analyze, section)

    def execute_task(self, task, kwargs):
        """
        Runs the analysis routine on one task and collects its results in a
        list, so they can be sent back from the worker process.

        :param task:   The FileProxy object to analyze.
        :param kwargs: The keyword arguments for the analysis routine.
        :return:       A list of Result objects.
        """
        return list(self.analyze(task, **kwargs))

    def analyze(self,
                file,
                *args,
                dependency_results=None,
                **kwargs):
        """
        Handles the given file.

        :param file: Object containing filename and contents.
        :return:     An iterable of Result.
        """
        raise NotImplementedError("This function has to be implemented for a "
                                  "runnable bear.")

    def generate_tasks(self):
        """
        This method is responsible for providing the files.
        """
        return ((file, self.kwargs) for file in self.file_set)

A test bear:

from coalib.results.Result import Result


class TestBear(FileWiseBear):
    def analyze(self, file):
        """
        Handles the given file.

        :param file: Object containing filename and contents.
        :return:     An iterable of Result.
        """
        # Line numbers in results are 1-based
        for line_number, line in enumerate(file, start=1):
            # This will contain the analysis routine
            yield Result.from_values(origin=self,
                                     message="Line has a problem",
                                     file=file.filename,
                                     line=line_number)
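
The proposal names ProjectWideBear as the successor of GlobalBear but only prototypes FileWiseBear. The difference between the two task shapes might look roughly like this. All class names in the sketch (BearSketch, FileWiseSketch, ProjectWideSketch, LongLineBear, FileCountBear) and the run helper are hypothetical stand-ins, and files are plain (name, lines) tuples rather than FileProxy objects.

```python
class BearSketch:
    """Hypothetical stand-in for the Bear base class."""

    def __init__(self, file_set, **settings):
        self.file_set = file_set
        self.kwargs = settings

    def execute_task(self, task, kwargs):
        return list(self.analyze(task, **kwargs))


class FileWiseSketch(BearSketch):
    def generate_tasks(self):
        # One task per file
        return ((file, self.kwargs) for file in self.file_set)


class ProjectWideSketch(BearSketch):
    def generate_tasks(self):
        # A single task that receives the whole file set
        yield self.file_set, self.kwargs


class LongLineBear(FileWiseSketch):
    def analyze(self, file, max_length=79):
        name, lines = file
        for number, line in enumerate(lines, start=1):
            if len(line) > max_length:
                yield name, number


class FileCountBear(ProjectWideSketch):
    def analyze(self, files, limit=1):
        if len(files) > limit:
            yield "project has {} files, limit is {}".format(
                len(files), limit)


def run(bear):
    # Sequential stand-in for what the process pool would do in parallel
    return [result
            for task, kwargs in bear.generate_tasks()
            for result in bear.execute_task(task, kwargs)]
```

Both kinds of bear share execute_task(). Only generate_tasks() decides whether the work is split per file or handed over as one unit, which is why the scheduler does not need to distinguish between them.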

A rough idea of how the new bear will be called:

import asyncio
import concurrent.futures
import functools
import logging
import multiprocessing
import sys

from coalib.collecting.Collectors import collect_files

from AnotherBear import AnotherBear
from AnotherDependentBear import AnotherDependentBear
from Dependencies import Dependencies
from DependentBear import DependentBear
from Fileproxy import FileProxy
from TestBear import TestBear


def get_cpu_count():
    try:
        return multiprocessing.cpu_count()
    # cpu_count is not implemented for some CPU architectures/OSes
    except NotImplementedError:  # pragma: no cover
        return 2


def schedule_bears(bears,
                   dep_resolver,
                   event_loop,
                   blocking_tasks,
                   executor):
    """
    Schedules the tasks of every bear without unresolved dependencies to the
    process pool and registers finish_task as the completion callback.

    :param bears:          A list of bears to be scheduled onto the process
                           pool.
    :param dep_resolver:   The object that keeps track of dependencies.
    :param event_loop:     The asyncio event loop.
    :param blocking_tasks: Dictionary that keeps track of the remaining tasks
                           of each bear.
    :param executor:       The process pool to which the bear tasks are
                           scheduled.
    """
    for bear in bears:
        if bear not in dep_resolver.dependency_dict:
            blocking_tasks[bear] = [
                event_loop.run_in_executor(executor,
                                           bear.execute_task,
                                           task[0],
                                           task[1])
                for task in bear.generate_tasks()]
            for task in blocking_tasks[bear]:
                task.add_done_callback(functools.partial(finish_task,
                                                         bear,
                                                         dep_resolver,
                                                         blocking_tasks,
                                                         event_loop,
                                                         executor))


def finish_task(bear,
                dep_resolver,
                blocking_tasks,
                event_loop,
                executor,
                task):
    """
    The callback for when a task of a bear completes. It is responsible for
    checking if the bear completed its execution and the handling of the
    result generated by the task.

    :param bear:           The bear that the task belongs to.
    :param dep_resolver:   The object that keeps track of dependencies.
    :param blocking_tasks: Dictionary that keeps track of the remaining tasks
                           of each bear.
    :param event_loop:     The asyncio event loop.
    :param executor:       The process pool to which the bear tasks are
                           scheduled.
    :param task:           The task that completed.
    """
    log = logging.getLogger('Consumer task')
    log.info(bear)
    for result in task.result():
        log.info(result)
    blocking_tasks[bear].remove(task)
    if not blocking_tasks[bear]:
        resolved_bears = dep_resolver.resolve(bear)
        if resolved_bears:
            schedule_bears(resolved_bears,
                           dep_resolver,
                           event_loop,
                           blocking_tasks,
                           executor)
        del blocking_tasks[bear]
    if not blocking_tasks:
        event_loop.stop()


def main(section):
    """
    Runs the example bears over a set of files.

    :param section: The section object where bear settings are contained.
    """
    # Configure logging to show the ID of the process and the name of the
    # logger where each message originates.
    logging.basicConfig(
        level=logging.INFO,
        format='%(process)5s %(name)18s: %(message)s',
        stream=sys.stderr,
    )

    # Create a process pool limited to the number of available CPUs.
    executor = concurrent.futures.ProcessPoolExecutor(
        max_workers=get_cpu_count(),
    )
    # new_event_loop() avoids relying on an implicitly created current loop.
    event_loop = asyncio.new_event_loop()

    file_list = collect_files(["~/Documents/gsoc/coala/*.py",
                               "~/Documents/gsoc/coala/coalib/*.py",
                               "~/Documents/gsoc/coala/coalib/**/*.py"], None)

    fileproxies = [FileProxy(file) for file in file_list]

    # Every bear receives the section and the shared file proxies.
    bears = [TestBear(section, fileproxies),
             AnotherBear(section, fileproxies),
             AnotherDependentBear(section, fileproxies),
             DependentBear(section, fileproxies)]

    blocking_tasks = {}
    dep_resolver = Dependencies()
    dep_resolver.add_bear_dependencies(bears)

    schedule_bears(bears,
                   dep_resolver,
                   event_loop,
                   blocking_tasks,
                   executor)
    try:
        event_loop.run_forever()
    finally:
        event_loop.close()
        executor.shutdown()
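
One case the rough driver above does not cover is a task that raises. task.result() would re-raise the exception inside finish_task, the bookkeeping after it would be skipped, and the event loop would never be stopped. A minimal sketch of a guard, assuming a helper named collect_result (hypothetical, not part of the proposal) that finish_task could call instead of task.result():

```python
import concurrent.futures
import logging

log = logging.getLogger('Consumer task')


def collect_result(task):
    """
    Return the results of a finished task, or an empty list if the task
    raised an exception.

    :param task: A finished future.
    :return:     The list of results produced by the task.
    """
    error = task.exception()
    if error is not None:
        # Log the failure and carry on, so the remaining bears still run
        log.error("Task failed: %r", error)
        return []
    return task.result()
```

With such a guard a failing bear yields no results, but its task is still removed from blocking_tasks, so bears that depend on it are not stalled. Whether dependent bears should run at all after a failed dependency is a design decision this sketch leaves open.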