[컴][파이썬] python 의 Future 가 동작하는 방법




python 3 에서 Future 의 동작 정리


python 3 의 Future 라는 녀석이 있다. 이녀석은 그냥 data 를 나르는 class 정도로 여기면 된다. c 에서 이야기하는 structure 같은 느낌으로 말이다.

그런데 python 의 code 에서 이녀석이 동작하는 방식이 library 안으로 들어가 있어서 언뜻 잘 이해가 안되다.

그래서 동작하는 순서대로 소스를 정리해봤다.


thread 와 future 의 flow


간단한 예제

간단한 아래 예제 소스를 한 번 보자.(출처 : Python: A quick introduction to the concurrent.futures module | Abu Ashraf Masnun)

아래 소스는 간단하다. pool.submit 을 하면 thread 가 만들어지고, thread 가 만들어지면서 future 를 return 해준다. 그러면 thread 가 동작을 하다가 동작을 완료하면 state 를 변경하고, future.set_result() 를 통해 결과를 future 에 넣어준다. 그러면 그 결과를 future.result() 를 통해 가져오게 되는 것이다.

future 를 share 해서 다른 thread 의 결과값을 가져온다고 생각하면 될 것 같다. 자세한 동작은 다음을 보자.

from concurrent.futures import ThreadPoolExecutor
from time import sleep
 
def return_after_5_secs(message):
    sleep(5)
    return message


 
pool = ThreadPoolExecutor(3)
 
future = pool.submit(return_after_5_secs, ("hello"))
print(future.done())
sleep(5)
print(future.done())
print(future.result())



동작

아래 소스를 쭉 따라가보면 최종적으로 _WorkItem.run() 에서 self.future.set_result(result) 를 해서 future 에 result 를 넣어주게 된다.

그래서 우리는 이 future 로 현재 동작하고 있는 thread 의 결과를 쉽게 얻어오게 된다. 이것은 특별한 방법은 아니지만, worker thread 와 main thread 와의 data 전달을 하는 방법을 future 라는 것으로 규격화했다. 그래서 개념만 잘 익힌다면, programming 을 쉽게(?) 할 수 있다.

이런 pattern 이전에 썼던 방법들---queue 를 이용하던지, 아니면 공유되는 variable 을 사용하던지 하는 방법등---을 생각해 보면 이해가 쉬울 듯 하다.


# thread.py

class ThreadPoolExecutor(_base.Executor):
    def __init__(self, max_workers):
        ...
        self._work_queue = queue.Queue()
        ...

    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._work_queue.put(w)
            self._adjust_thread_count()
            return f

    def _adjust_thread_count(self):
        # When the executor gets lost, the weakref callback will wake up
        # the worker threads.
        def weakref_cb(_, q=self._work_queue):
            q.put(None)
        # TODO(bquinlan): Should avoid creating new threads if there are more
        # idle threads than items in the work queue.
        if len(self._threads) < self._max_workers:
            t = threading.Thread(target=_worker,
                                 args=(weakref.ref(self, weakref_cb),
                                       self._work_queue))
            t.daemon = True
            t.start()
            self._threads.add(t)
            _threads_queues[t] = self._work_queue

class _WorkItem(object):
    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def run(self):
        if not self.future.set_running_or_notify_cancel():
            return

        try:
            result = self.fn(*self.args, **self.kwargs)
        except BaseException as e:
            self.future.set_exception(e)
        else:
            self.future.set_result(result)

            
def _worker(executor_reference, work_queue):
    try:
        while True:
            work_item = work_queue.get(block=True)
            if work_item is not None:
                work_item.run()
                # Delete references to object. See issue16284
                del work_item
                continue
            executor = executor_reference()
            # Exit if:
            #   - The interpreter is shutting down OR
            #   - The executor that owns the worker has been collected OR
            #   - The executor that owns the worker has been shutdown.
            if _shutdown or executor is None or executor._shutdown:
                # Notice other workers
                work_queue.put(None)
                return
            del executor
    except BaseException:
        _base.LOGGER.critical('Exception in worker', exc_info=True)






# threading.py
class Thread:
    ...
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, *, daemon=None):
        ...
        self._target = target
        ...
    def start(self):
        ...
           _start_new_thread(self._bootstrap, ())
        ...
        self._started.wait()

    def _bootstrap(self):
        try:
            self._bootstrap_inner()
        except:
            if self._daemonic and _sys is None:
                return
            raise

    def _bootstrap_inner(self):
        try:
            ...
            try:
                self.run()
            except SystemExit:
                ...

    def run(self):
        try:
            if self._target:
                self._target(*self._args, **self._kwargs)
        finally:
            del self._target, self._args, self._kwargs


# thread.py




댓글 없음:

댓글 쓰기