Source code for embci.utils._looptask

#!/usr/bin/env python3
# coding=utf-8
#
# File: EmBCI/embci/utils/_looptask.py
# Authors: Hank <hankso1106@gmail.com>
# Create: 2019-09-24 12:52:14

# built-in
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import re
import time
import atexit
import functools
import threading
import traceback

from . import logger, get_boolean, ensure_unicode

__all__ = [
    'LoopTaskMixin', 'LoopTaskInThread', 'SkipIteration',
    'find_task_by_name', 'find_tasks_by_class'
]

_tasks = []


def _ensure_tasks_closed():
    '''
    In case of exiting Python without calling `LoopTaskMixin.close`, this
    function will be called by `atexit`. NOT for runtime usage.
    '''
    while _tasks:
        try:
            task = _tasks.pop()
            if task.started:
                logger.debug('close %s at exit' % task)
                task.close()
        except Exception:
            pass
            traceback.print_exc()
atexit.register(_ensure_tasks_closed)                              # noqa: E305


def find_task_by_name(name, cls=None, arr=None):
    if cls is not None:
        arr = find_tasks_by_class(cls, arr)
    arr = arr if arr is not None else _tasks
    maybe = set()
    for task in arr:
        for attr in ['name', '__name__']:
            if not hasattr(task, attr):
                continue
            n = getattr(task, attr)
            if n == name:
                return task
            elif name in n:
                maybe.add(task)
    return list(maybe)[0] if maybe else None


def find_tasks_by_class(cls, arr=None):
    arr = arr if arr is not None else _tasks
    return list(filter(lambda task: issubclass(task.__class__, cls), arr))


class SkipIteration(Exception):
    '''Exception used inside LoopTask to skip current iteration/loop.'''
    pass


class LoopTaskMixin(object):
    '''
    Establish a task to execute a function looply. Stream control methods are
    integrated, such as `start`, `pause`, `resume`, `close`, etc.

    Attributes
    ----------
    __flag_pause__ : Event
    __flag_close__ : Event
    __started__ : bool
        Read-only attribute by `self.started`
    __status__ : bytes
        Read-only attribute by `self.status`

    Examples
    --------
    >>> class Clock(LoopTaskMixin):
    ...     def loop_before(self):  # optional function
    ...         print('this is a simple clock')
    ...     def loop_after(self):  # optional function
    ...         print('clock end')
    ...     def loop_display_time(self, name):
    ...         print('{}: {}'.format(name, time.time()))
    ...         time.sleep(1)
    ...     def start(self):
    ...         if LoopTaskMixin.start(self) is False:
    ...             return 'this clock is already started'
    ...         self.loop(
    ...             func=self.loop_display_time, args=('MyClock', ),
    ...             before=self.loop_before, after=self.loop_after)
    >>> c = Clock()
    >>> c.start()
    this is a simple clock
    MyClock: 1556458048.83
    MyClock: 1556458049.83
    MyClock: 1556458050.83
    ^C (KeyboardInterrupt Ctrl-C)
    clock end

    Notes
    -----
    A mixin class should not be used directly. The `LoopTaskMixin.__init__`
    is used primarily for testing. But if you need to subclass this Mixin and
    use your own __init__, remember to call `LoopTaskMixin.__init__(self)` or
    `super(YOUR_CLASS, self).__init__()`. Or simply config the attributes
    correctly.

    See Also
    --------
    embci.utils.LoopTaskInThread
    embci.io.readers.BaseReader
    '''
    __name_pattern__ = re.compile(r'^LoopTask_(\d+)')

    def __init__(self):
        self.__flag_pause__ = threading.Event()
        self.__flag_close__ = threading.Event()
        self.__started__ = False
        self.__status__ = b'closed'  # use bytes for c_char_p compatiable

    def __new__(cls, *a, **k):
        obj = object.__new__(cls)
        ids = {
            int(cls.__name_pattern__.findall(task.__name__)[0])
            for task in _tasks if cls.__name_pattern__.match(task.__name__)
        }
        obj.__name__ = 'LoopTask_{:d}'.format(
            list(set(range(len(ids) + 1)).difference(ids))[0])
        _tasks.append(obj)
        return obj

    @property
    def name(self):
        return self.__name__

    @property
    def status(self):
        '''`status` of the loopTask is read-only'''
        return ensure_unicode(self.__status__)

    @property
    def started(self):
        '''`started` of the loopTask is read-only'''
        return self.__started__

    def start(self, *a, **k):
        if self.started:
            if self.status == 'paused':
                self.resume()
            return False
        self.__flag_pause__.set()
        self.__flag_close__.clear()
        self.__started__ = True
        self.__status__ = b'started'
        self.start_time = time.time()
        try:
            self.hook_before()
        except Exception:
            logger.error(traceback.format_exc())
            self.close()
            return False
        return True

    def close(self):
        if not self.started:
            return False
        try:
            self.hook_after()
        except Exception:
            logger.error(traceback.format_exc())
        self.__flag_close__.set()
        self.__flag_pause__.clear()
        # you can restart this task now
        self.__started__ = False
        self.__status__ = b'closed'
        return True

    def restart(self, *args, **kwargs):
        if self.started:
            self.close()
        return self.start(*args, **kwargs)

    def pause(self):
        if not self.started or self.status == 'paused':
            return False
        self.__flag_pause__.clear()
        self.__status__ = b'paused'
        return True

    def resume(self):
        if self.status != 'paused':
            return False
        self.__flag_pause__.set()
        self.__status__ = b'resumed'
        return True

    def hook_before(self):
        '''Hook function executed outside self.loop after start.'''
        pass

    def hook_after(self):
        '''Hook function executed outside self.loop before close.'''
        pass

    def loop_before(self):
        '''Hook function executed inside self.loop before loop task.'''
        pass

    def loop_after(self):
        '''Hook function executed inside self.loop after loop task.'''
        pass

    def loop(self, func, args=(), kwargs={}):
        try:
            assert callable(func), 'Loop function `%s` is not callable' % func
            self.loop_before()
        except Exception:
            logger.error(traceback.format_exc())
            return self.close()
        try:
            while not self.__flag_close__.is_set():
                if self.__flag_pause__.wait(2):
                    try:
                        func(*args, **kwargs)
                    except SkipIteration as e:
                        logger.warning(e)
                self.loop_actions()
        except KeyboardInterrupt:
            logger.info('KeyboardInterrupt detected.')
        except Exception:
            logger.error(traceback.format_exc())
        try:
            self.loop_after()
        except Exception:
            logger.error(traceback.format_exc())
        if self.started:
            self.close()

    def loop_actions(self):
        '''
        Hook function called inside self.loop in each iteration. May be
        overridden by a subclass / Mixin to implement any code that needs
        to be run even SkipIteration error is raised.
        '''
        pass


class LoopTaskInThread(threading.Thread, LoopTaskMixin):
    '''
    Execute a function looply in a Thread, which can be paused, resumed, even
    restarted. This is an example usage of class `embci.utils.LoopTaskMixin`.

    Examples
    --------
    >>> task = LoopTaskInThread(lambda: time.sleep(1) or print(time.time()))
    >>> repr(task)
    <LoopTaskInThread(LoopFunc: <lambda>, initial daemon 139679343671040)>
    >>> task.start()
    True
    1556458048.83
    1556458049.83
    1556458050.83
    >>> task.pause(), task.pause()
    (True, False)  # can not pause an already paused task
    >>> task.close()

    Notes
    -----
    KeyboardInterrupt is raised when SIGINT(2) is detected in default signal
    handler. But this only happens in the main thread, according to Python doc
    on module `signal`:
        Python signal handlers are always executed in the main Python thread,
        even if the signal was received in another thread. This means that
        signals can’t be used as a means of inter-thread communication.

    That means one can NOT stop a LoopTaskInThread by Ctrl-C when it is set as
    non-daemon no matter whether the main thread has stopped or not. It even
    can't be closed by LoopTask GC :func:`_ensure_tasks_closed` because python
    will never reach the end point to call exit functions. **So you MUST
    remember to close(stop) the loop task manually.**

    See Also
    --------
    embci.utils.LoopTaskMixin
    embci.io.readers.BaseReader
    '''

    def __init__(self, func, before=None, after=None, args=(), kwargs={}, **k):
        '''
        Parameters
        ----------
        func : callable object
            The function to be looply executed. But note that the return
            value of function will be omitted.
        before : callable object, optional
            Hook function executed before loop task.
        after : callable object, optional
            Hook function executed after loop task.
        args : tuple, optional
            Positional arguemnts for invocation of loop function.
        kwargs : dict, optional
            Keyword arguments for loop function invocation.
        name : str, optional
            User specified task name. Defaults to function's name.
        daemon : bool, optional
            Whether to mark the task thread as daemonic.
        '''
        if callable(before):
            self.loop_before = before
        if callable(after):
            self.loop_after = after
        self._floop_ = func
        self._fargs_, self._fkwargs_ = args, kwargs
        k.setdefault('name', 'LoopFunc(%s)' % getattr(func, '__name__', None))
        k.setdefault('daemon', True)
        self._init_thread_ = functools.partial(self._init_thread_, **k)
        self._init_thread_()
        LoopTaskMixin.__init__(self)

    def _init_thread_(self, daemon, **kwargs):
        '''Call this function to re-init thread'''
        threading.Thread.__init__(self, **kwargs)
        self.daemon = get_boolean(daemon)  # python 2 & 3 compatiable
        self._thread_inited_ = True

    def __repr__(self):
        extra = ''
        if self.daemon:
            extra += ' daemon'
        if self.ident is not None:
            extra += ' %s' % self.ident
        return '<{name} {status}{extra}>'.format(
            name=self.name, status=self.status, extra=extra)

    def start(self, *a, **k):
        if not LoopTaskMixin.start(self):
            return False
        if not self._thread_inited_:
            self._init_thread_()
        threading.Thread.start(self)
        return True

    def close(self, *a, **k):
        if not LoopTaskMixin.close(self):
            return False
        self._thread_inited_ = False
        return True

    def run(self):
        self.loop(self._floop_, self._fargs_, self._fkwargs_)


# THE END