Post

python asynico

Python async

  • 동기 함수

python 에서 일반적인 함수를 실행하면 작업을 처리하다 return 문을 만나면 함수는 종료되고 제어권이 다시 호출자에게 주어진다. 그래서 호출자는 실행한 함수가 return 될 때 까지 기다려야 하고 이런 함수를 동기함수 라고한다.

  • 비동기 함수

동기함수와 반대로, 비동기 함수는 실행이 완료되지 않더라도 즉 return 값을 받지 않았더라도 제어권을 호출자에 넘기고 백그라운드에서 작업을 처리한다. 그리고 작업이 완료되면 호출자에게 작업이 완료되었음을 통보 한다.

비동기 처리가 필요한 경우

전통적인 Concurrency 작업은 여러개의 thread를 활용해서 처리되었지만, thread-safe한 프로그램을 만드는 것은 상당히 어려운 일이다. 그래서 하나의 쓰레드로 동시 처리를 하는 asynchronous programming이 등장했다.

용량이 매우 큰 파일을 읽어와야 할 때, 대부분의 작업은 I/O 관련 하드웨어가 하고 CPU는 대기 상태에서 입출력 이 완료되기를 기다린다. 파일의 용량이 클 수록 대기 상태에 있는 시간도 길어지게 되고 그 만큼 작업 시간이 길어지게 된다.

이럴 때 CPU가 입출력 완료를 기다리지 않고 다른 작업을 하다가, 입출력이 완료 됬다는 알림을 받고 다시 해당 파일을 처리하는 것. 이것이 비동기 처리이다.

즉, CPU가 쉬는 시간을 줄여 생산성을 높이는 것이 비동기 처리의 목적

python asynchronous

1
2
async def do_something():
	pass

def 앞에 async 를 선언하면 해당 함수는 비동기 처리되고 이런 함수를 Coroutine 이라고 부른다.

이런 비동기 함수를 일반적인 함수를 호출하듯 호출하면 coroutine 객체가 리턴됨

<coroutine object do_async at 0x1038de710>

그래서 비동기 함수는 async 가 선언된 다른 비동기 함수에서 await 키워드를 붙여서 호출해야 제대로 실행된다.

  • await : 다음으로 바로 진행하지 않고 완료 되었다는 통보가 오기 전까지 기다리며, 이벤트 루프에 다른 작업이 있다면 해당 작업을 처리하면서 대기한다,
1
2
async def another():
	await do_something()

async 가 선언되지 않은 일반 함수에서 비동기 함수를 호출하려면 아래와 같이 이벤트 루프를 이용해야 한다

1
2
3
loop = asyncio.get_event_loop()
loop.run_until_complete(main_async())
loop.close()

runloop

런루프는 무한루프라고 보면 되고, 특정한 이벤트나 콜이 발생하면 런루프에 해당 작업이 등록된다. 그리고 루프가 끝날 때 쯤 처리해야 할 함수들을 차례대로 호출해준다.

웹서버를 예로 들어보자면, 특정 웹 서버에 접속해보면 실행 직후엔 아무것도 안하고 사용자가 마우스나 키보드로 입력하기 전까지 그냥 기다린다. 이때 아무것도 안하고 기다리는 부분을 구현해 주는것이 런루프이고, 특정 이벤트가 발생하면 런루프에 해당 이벤트를 처리할 핸들러 함수가 등록되었다가 처리된다.

asynico 에서는 ensure_future() 를 실행하려면 코루틴을 걸어둘 런루프가 필요하고 비동기 작업을 처리하기 전에 꼭 런루프를 돌려야한다. 위 예시와 같이 get_event_loop 로 런루프를 얻고 실행하는 것이다.

  • asyncio.run(coroutin, debug) : 전달받은 코루틴을 실행하고, asyncio 이벤트 루프를 관리함. 이 함수는 실행될 때 마다 항상 새로운 이벤트 루프를 만들고 마지막에 이벤트 루프를 닫는다.

Example

비동기 작업 처리 순서

  1. 비동기로 처리될 작업을 코루틴으로 정의 async
  2. 런루프 생성, asyncio.get_event_loop()
  3. 런루프에 스케쥴링, asyncio.ensure_future()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import asyncio
import random

async def lazy_greet(msg, delay=1):
  print(msg, "will be displayed in", delay, "seconds")
  await asyncio.sleep(delay)
  return msg.upper()

async def main():
  messages = ['hello', 'world', 'apple', 'banana', 'cherry']
  fts = [asyncio.ensure_future(lazy_greet(m,
                random.randrange(1, 5)))
           for m in messages]
  for f in asyncio.as_completed(fts):
  x = await f
  print(x)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
  1. 런루프에 main() 이 스케줄링되고 즉시 실행됨
  2. main() 함수 안에서는 fts 를 생성하면서 5개의 코루틴을 스케줄링 한다.(아직 실행 X)
  3. for 문에서 await 키워드를 만나 main() 코루틴은 잠시 멈추고 fts 내의 첫번째 코루틴인 lazy_greet('hello') 가 실행된다.
  4. 실행된 후 다음 for문에서 또 await 을 만났으니 현재 코루틴을 일시정지하고 다음 코루틴 lazt_greet('world' 로 넘어간다
  5. 3,4와 같은 방식으로 5개의 코루틴 모두 await 에서 기다리게 된다.
  6. await asyncio.sleep(delay) 때문에 처음 1초간은 모든 코루틴이 await 에서 멈춰있다ㅣ.
  7. 5개의 코루틴 중 지연시간이 가장 짧은 코루틴의 asyncio.sleep() 이 끝이나고, 해당 코루틴은 await 이 선언된 라인의 그 다음 라인부터 진행한 후 리턴한다.
  8. 이렇게 런루프로부터 먼저 끝난 코루틴이 발생할 때 마다 print(x) 가 출력된다.
    • python 3.7 +
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import asyncio
import random

async def lazy_greet(msg, delay=1):
    print(f'{msg!r} will be displayed in {delay} seconds.')
    await asyncio.sleep(delay)
    return msg.upper()

async def main():
    messages = 'hello world apple banana cherry'.split()
    cos = [lazy_greet(m, random.randrange(1, 5)) for m in messages]
    for f in asyncio.as_completed(cos):
        print(await f)

asyncio.run(main())

as_completed 를 이용하면 코루틴의 결과를 개별적으로 꺼낼 수도 있고 ,asynico.gather() 를 이용해서 한꺼번에 받을 수도 있다.

위의 예시에서 asnico.gather() 로 바꾸면 모든 작업이 끝나야 한번에 종료되고 결과를 반환한다.

Sqlalchemy async

1
2
3
4
5
6
7
8
9
10
11
12
13
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, create_async_engine
from sqlalchemy.orm import declarative_base
from typing import Optional

Base = declarative_base()
engine = create_async_engine('postgresql+asyncpg://user:pass@server_addr/database', echo=True, pool_pre_ping=True)

async def get_db_session() -> AsyncSession:
    sess = AsyncSession(bind=engine)
    try:
        yield sess
    finally:
        await sess.close()
  • engin , Session 이 async 버전으로 따로 존재
  • DDL을 처리할 때는 동기처리로 진행해야 하기 때문에 run_sync() 사용

database I/O 비동기 작업은 각 db 드라이버에서 지원을 해줘야 함!

  • mysql - aiomysql
  • postgresql - asyncpg

oracle, mssql은 따로 지원하는 라이브러리가 없고 아래와 같이 aioodbc사용

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import asyncio
import aioodbc

async def main():
    dsn = 'Driver={ODBC Driver 17 for SQL Server};Server=server_name;Database=database_name;Uid=username;Pwd=password;'
    conn = await aioodbc.connect(dsn=dsn)

    async with conn.cursor() as cursor:
        # Insert query
        insert_query = "INSERT INTO table_name (column1, column2) VALUES (?, ?)"
        values = ('Value 1', 'Value 2')
        await cursor.execute(insert_query, values)
        await conn.commit()

        # Update query
        update_query = "UPDATE table_name SET column1 = ? WHERE column2 = ?"
        new_value = 'New Value'
        condition = 'Value 2'
        await cursor.execute(update_query, (new_value, condition))
        await conn.commit()

        # Select query
        select_query = "SELECT * FROM table_name"
        await cursor.execute(select_query)
        rows = await cursor.fetchall()
        for row in rows:
            print(row)

    conn.close()

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

scoped_session

SQLAlchemy에서의 Session은 db와의 상호작용을 추적하고 트랜잭션 관리를 담당하는 객체로, 개발자가 정의한 모델과 데이터베이스 사이의 인터페이스 역활을 한다.

  • DB session과의 차이점
    1. 추상화 수준: SQLAlchemy 세션은 ORM(객체-관계 매핑) 계층에서 동작하며, 데이터베이스와 객체 간의 매핑을 관리합니다. 반면에 DB 세션은 데이터베이스 서버의 세션을 직접 다루며, 로우 데이터를 주고받습니다.
    2. 기능: SQLAlchemy 세션은 ORM의 개념에 따라 객체의 생성, 수정, 삭제 등의 작업을 수행합니다. 이를 통해 객체와 데이터베이스 간의 변환, 일관된 트랜잭션 처리, 캐싱, 지연 로딩 등을 제공합니다. 반면에 DB 세션은 데이터베이스 서버에 대한 접근과 쿼리 실행, 트랜잭션 제어, 리소스 관리 등을 담당합니다.
    3. 관리: SQLAlchemy 세션은 개발자가 생성하고 관리합니다. 개발자는 세션을 사용하여 ORM 작업을 수행하고, 세션을 커밋(commit)하거나 롤백(rollback)하여 트랜잭션을 제어합니다. DB 세션은 데이터베이스 서버에서 자동으로 생성되며, 일반적으로 데이터베이스 연결 풀링과 같은 기능을 통해 관리됩니다.
    4. 범위: SQLAlchemy 세션은 일반적으로 애플리케이션의 논리적인 작업 단위나 요청의 수명 주기에 따라 생성되고 사용됩니다. DB 세션은 데이터베이스 서버의 세션으로서 클라이언트와 데이터베이스 서버 간의 연결을 나타냅니다.

Untitled

  • SQLAlchemy의 session은 하나의 스레드에서 비동기 방식으로 사용하기 위한 것이기 때문에, multi-thread 간에 session이 공유되는 경우에는 thread-safe 하지 못하다
  • multi-thread 환경에서 여러 스레드가 하나의 session을 공유하게 되면 많은 에러를 유발
  • 이 문제를 해결하기 위해서는 각 스레드 마다 session을 연결해주면 되는데 이 작업을 자동으로 해준ㄴ 것이 scoped_session 이다.

scoped_session 은 스레드 고유의 데이터 영역에 Session을 저장해서 trhead-safe를 보장해준다. 즉, 스레드마다 session을 만들기 때문에, 스레드끼리 session을 공유하지 않는다

1
2
3
engine = create_engine(...)
session_maker = sessionmaker(autocommit=False, autoflush=False, bind=engine)
session = scoped_session(session_factory=session_maker)

참고 블로그

https://www.daleseo.com/python-asyncio/

https://soooprmx.com/python-asycnio-에서-런루프를-기반으로-non-blocking-코드-작성하기

https://kukuta.tistory.com/345

https://cosmosproject.tistory.com/474

This post is licensed under CC BY 4.0 by the author.