Bất đồng bộ trong Python - Event loop | Phần 2

Thứ hai, ngày 29 tháng 3 năm 2021

Một trong những khái niệm quan trọng trong lập trình là event loop hay vòng lặp sự kiện. Thật không quá khi nói rằng, vòng lặp sự kiện là trái tim của lập trình bất đồng bộ trong các ngôn ngữ như Python hay Javascript, ...

Đây là bài viết tiếp theo của mình trong chuỗi bài viết về lập trình bất đồng bộ trong Python. Các bạn có thể theo dõi bài viết trước của mình về coroutine trước khi đọc bài viết này nhé. Giờ thì cùng bắt đầu thôi, Let's go

Một event loop (hay một vòng lặp sự kiện) là một vòng lặp (vô hạn hoặc hữu hạn), nó duyệt qua tất cảc các task và thực thi chúng. Vòng lặp sự kiện cũng có vai trò duy trì các task này trong một hàng đợi (queue). Sau khi bạn tạo một task, nó sẽ được event loop đẩy vào hàng đợi này để được lập lịch và thực thi.

Hình bên dưới cho ta thấy cái nhìn cơ bản của một vòng lặp sự kiện làm việc như thế nào. Nó rất đơn giản để hiểu phải không nào :smile:

Event loop

Nhớ lại trong bài viết trước, mình nói là các coroutine có thể ghi nhớ vị trí mà nó trả quyền điều khiển chương trình cho lời gọi coroutine đó và nó có thể tái khởi động tại vị trí đó trong lần gọi tiếp theo.

Một hàng đợi sẽ chứa (maintain) các coroutine và với mỗi coroutine sẽ được thực thi đoạn code nằm giữa 2 lời gọi yield (đây chính là trap trong OS) bởi vòng lặp sự kiện.

Trạng thái của các coroutine sẽ được lưu lại trong chính không gian bộ nhớ tĩnh của coroutine đó và sau đó coroutine sẽ được đẩy vào queue (chờ đến lượt tiếp theo nếu nó chưa kết thúc).

Vòng lặp sự kiện sẽ dừng chỉ khi hàng đợi này rỗng hoặc nó bị terminate giữa chừng, việc này sẽ phụ thuộc người lập trình cũng như policy của loại vòng lặp sự kiện cụ thể.

Đến đây các bạn có thể hỏi policy của vòng lặp sự kiện là gì? Để đơn giản, mình sẽ giải thích khái niệm này trong bài viết tiếp theo :smile:

Làm thế nào mà vòng lặp sự kiện có thể thực thi và duy trì các task?

Vòng lặp sự kiện với một task (coroutine)

Sau khi task thực thi xong một phần (chưa kết thúc), nó sẽ trả quyền điều khiển về cho người gọi (vòng lặp sự kiện) sau khi đã thay đổi và lưu trạng thái nội tại của nó. Lúc này, vòng lặp sự kiện đã có quyền điều khiển chương trình và sẽ đẩy task vào trong hàng đợi.

OK, bay giờ chúng ta thử viết một chương trình bất đồng bằng các sử dụng một generator (dạng đơn giản của coroutine).

Để phục vụ cho những yêu cầu phức tạp say này, chúng ta sẽ bọc coroutine trong một class như sau:

class Task:
    def __init__(self, waiter=None):
        self._task = iter(self)
        self._waiter = waiter

    def __iter__(self):
        yield self

    def execute(self):
        try:
            ret = next(self._task)
        except StopIteration:
            return self._waiter
        else:
            return ret

Bạn có thể thấy rằng, mỗi khi gọi exceute task, coroutine sẽ được thực thi (ret = next(self._task).

Nêú task thực sự kết thúc, nó sẽ nói với event loop rằng "Tôi đã thực thi xong rồi, anh hãy lập lịch cho những người đang chờ tôi nhé" (return self._waiter)

Nếu task đó chưa thực sự xong (chờ kết quả từ socket chảng hạn), nó sẽ nói là "Tôi chưa xong đâu, tôi còn phải chờ lấy dữ liệu, anh hay lập lịch cho tôi để tôi thực thi sau nhé"

OK, task sẽ chỉ làm những việc đơn giản như vậy thôi :smile:

Bây giờ, chúng ta sẽ định nghĩa một EventLoop class, nó nên đơn giản để chúng ta có thể hiểu một cách dễ dàng

import queue

class EventLoop:
    def __init__(self):
        self._queue = queue.Queue()

    def push_task(self, *tasks):
        for task in tasks:
            self._queue.put(task)

    def _run_by(self, stop_fn):
        _queue = self._queue
        while not stop_fn():
            task = _queue.get()
            next_task = task.execute()
            if next_task:
                self.push_task(next_task)

    def run_until_complete(self):
        self._run_by(self._queue.empty)

    def run_forever(self):
        self._run_by(lambda: False)

Công việc của EventLoop cũng cực kì đơn giản.

push_task là một hàm lập lịch và đơn giản nó chỉ đẩy task cần lập lịch vào hàng đợi.

_run_by cũng khá dễ hiểu, nó sẽ lấy các task từ hàng đợi và thực thi, nếu có task cần lập lịch nó sẽ lập lịch cho task đó đế khi gặp điều kiện dừng (stop_fn) thì dừng lại hay trong Python sẽ là closed.

Sau khi đã chuẩn bị xong, chúng ta có thể viết một chương trình bất đồng bộ đơn giản như sau:

import time

current_loop = None

class Sleep(Task):
    def __init__(self, *args, n=0, **kwargs):
        super().__init__(*args, **kwargs)
        self._n = n
        self._start = time.time()

    def __iter__(self):
        while 1:
            if time.time() - self._start > self._n:
                return self._waiter
            yield self

class Ping(Task):
    def __iter__(self):
        print('Start ping')
        yield Sleep(self, n=3)
        print('End ping')

class Pong(Task):
    def __iter__(self):
        print('Start pong')
        yield Sleep(self, n=2)
        print('End pong')

current_loop = EventLoop()
current_loop.push_task(Ping(), Pong())
current_loop.run_until_complete()

Các bạn có thể thấy, đoạn code trên sẽ tương đương với đoạn code async/await ở dưới đây :smile:

import asyncio

def ping():
    print('Start ping')
    await asyncio.sleep(3)
    print('End ping')

def pong():
    print('Start pong')
    await asyncio.sleep(2)
    print('End pong')

task = asyncio.gather(ping(), pong())
asyncio.run(task)

Và đây là kết quả

Start ping
Start pong
End pong
End ping

Bài tập nhỏ nhưng hiệu quả lớn:

  • Bạn hãy so sánh awaityield
  • Trong đoạn code viết bằng async/await, đâu là waiter?

Trả lời được hai câu hỏi đó là bạn đã hiểu khá kĩ về vòng lặp sự kiện cũng như bản chất của từ khoá async/await rồi đó.

Hẹn gặp lại các bạn trong các bài viết sau nha :smile:, bye bye.