멀티 프로세싱 도입 계기
지금 다니는 랩실에서 몇 십만개나 되는 파일에서 feature를 추출해야 하는 일이 있었는데, linear하게 그냥 차례대로 처리하다보니 너무 오래 걸렸다. 사실 몇 시간 정도만 걸렸으면 딱히 멀티 프로세싱 생각을 못했을 것 같은데, 1~2일 지켜보면서 파일 처리 속도를 계산해보니 거의 일주일이 걸리는 걸로 결론이 나서 도저히 안되겠다 싶어서 멀티 프로세싱을 도입했다.
파일이 몇 십만 개라는 점, 우리 랩실 서버에는 CPU가 64개나 있다는 점을 고려하면 진짜 무조건 멀티 프로세싱을 도입했어야 하는 건데, 도대체 이 생각을 왜 못했지? 싶어서 너무 후회스러웠다. AIML에서 파일 전처리하고 feature 추출하는 상황 및 환경이 멀티 프로세싱을 딱 쓰기 좋은 단계인데, 앞으로는 이런 방황을 하지 않기 위해서 과정을 정리해보고자 한다.
멀티 프로세싱 사용하기(glob, multiprocessing)
멀티 프로세싱은 파이썬 내장 라이브러리이기 때문에 따로 pip install 또는 conda install 해줄 게 없다. 바로 import 해서 사용하면 된다. 서버가 가지고 있는 CPU를 pool로 사용하고, 그 pool에다가 처리할 파일 목록과 파일을 처리할 로직이 있는 메소드를 전달해주면 끝이다.
이때 처리할 파일 목록을 파이썬 내장 라이브러리인 glob를 사용하면 매우 간편하다. 파일을 읽어올 루트 디렉토리 안에 있는 하위 디렉토리를 recursive=True 옵션만 주면 재귀적으로 모두 읽어올 수 있고, 와일드 카드를 사용해서 읽어올 파일 규칙을 쉽게 명시할 수 있다.
import multiprocessing
import os
import glob
def process_file(file):
print("feature extracting in: ", file)
...(feature 추출 로직)...
def extract_in_files_parallel(input):
# input 디렉토리에서 재귀적으로 모든 하위 디렉토리를 뒤지면서 JSON 파일만 모두 찾아 list로 반환
files = glob.glob(os.path.join(input, '**/*.json'), recursive=True)
# CPU 숫자 카운트
num_processes = multiprocessing.cpu_count()
# CPU에 나눠서 파일 전처리 수행
with multiprocessing.Pool(num_processes) as pool:
pool.map(process_file, files)
가끔 파일을 읽는 로직을 os.walk()로 for문 여러 개를 쓰거나, 파일명을 규칙적으로 명시해(예시: 0-0.txt, 0-1.txt, …) 파일까지 가는 경로를 문자열 조합으로 만드는 방식을 쓰는 경우가 있는데, 둘 다 매우 유지보수가 힘들어서 데이터 세트의 파일 규칙이나 디렉토리 깊이 모두 항상 잘 맞춰줘야 해서 힘들다. 다만 여러 스케일로 데이터 세트를 사용하는데, 항상 동일한 파일을 읽어오고 싶다면 문자열 조합 방식을 써도 좋은 것 같다. (예를 들어, 100개, 200개, 300개 class 스케일로 테스트할 건데, 100개 스케일은 항상 0~100번까지의 class를 읽어오고, 200개 스케일은 0~200번까지의 class를 읽어오고 싶은 경우)
파일 전처리 진행 정도 보기(tqdm, parmap)
위에서 사용한 pool.map()은 iterator를 반환하는 게 아니라서 진행 정도를 볼 수 없다. 그래서 위의 코드에서는 print로 로그를 찍었지만, 몇 십만 개의 파일을 처리하는데 이게 전부 터미널에 찍히면 이런 시간도 낭비가 될 수 있다. 또한 print 문을 찍는다고 해도 전체 진행 상황을 한 눈에 보는 게 아니라 불편하다.
이때 pool.imap() & tqdm을 쓰거나 아니면 parmap.map()을 쓰면 간편하게 진행 정도를 볼 수 있다. imap()은 함수 명에서 볼 수 있듯이 imap은 map()과 비슷하지만 iterator를 반환해주는 함수이다.
# 설치
pip install tqdm
import multiprocessing
from tqdm import tqdm
import os
import glob
def process_file(file):
...(feature 추출 로직)...
def extract_in_files_parallel(input):
# input 디렉토리에서 재귀적으로 모든 하위 디렉토리를 뒤지면서 JSON 파일만 모두 찾아 list로 반환
files = glob.glob(os.path.join(input, '**/*.json'), recursive=True)
# CPU 숫자 카운트
num_processes = multiprocessing.cpu_count()
with multiprocessing.Pool(num_processes) as pool:
with tqdm(total=len(files)) as pbar:
# 실행할 함수, 처리할 input
for _ in pool.imap_unordered(process_file, files):
pbar.update()
아니면 다른 라이브러리인 parmap을 쓰는 방법도 있다. parmap은 이름 그대로 map에서 pbar를 보여주는 라이브러리로, 위의 방법보다 depth가 하나 적어서 더 직관적이다.
pip install parmap
# 또는
conda install parmap
import multiprocessing
import parmap
import os
import glob
def process_file(file):
...(feature 추출 로직)...
def extract_in_files_parallel(input):
# input 디렉토리에서 재귀적으로 모든 하위 디렉토리를 뒤지면서 JSON 파일만 모두 찾아 list로 반환
files = glob.glob(os.path.join(input, '**/*.json'), recursive=True)
# CPU 숫자 카운트
num_processes = multiprocessing.cpu_count()
with multiprocessing.Pool(num_processes) as pool:
# 실행할 함수, 처리할 input, bar 표시 여부, 활용 가능한 CPU 코어수
parmap.map(process_file, files, pm_pbar=True, pm_processes=num_processes)
멀티 프로세스를 공유 메모리에 접근 가능하게 하기(Manager)
feature 추출 시에는 추출한 값을 리스트 등에 append 해줘야 한다. 그런데 문제는 멀티 프로세스로 나누어 놓은 각 프로세스들이 하나의 list를 공유해야 한다는 점이다. 이때 해당 리스트를 manager.list()로 공유 메모리로 만들어주면, 해당 리스트를 모든 멀티 프로세스가 공유 가능하다. 다만 X 리스트와 y 리스트를 따로 만들어서 append하면 feature-label 쌍이 맞지 않는 경우가 생길 수 있기 때문에, 리스트를 하나만 만들어서 X_y.append([feature, index])로 추가해준 후, 멀티 프로세싱이 다 끝나면 그 후에 X, y = zip(*X_y)로 나누어 주면 좋다.
import multiprocessing
from multiprocessing import Manager
import parmap
import os
import glob
## 설명: 공유 메모리 리스트
manager = Manager()
X_y = manager.list()
def process_file(file):
...(feature 추출 로직)...
X_y.append([feature, index])
def extract_in_files_parallel(input):
# input 디렉토리에서 재귀적으로 모든 하위 디렉토리를 뒤지면서 JSON 파일만 모두 찾아 list로 반환
files = glob.glob(os.path.join(input, '**/*.json'), recursive=True)
# CPU 숫자 카운트
num_processes = multiprocessing.cpu_count()
with multiprocessing.Pool(num_processes) as pool:
# 실행할 함수, 처리할 input, bar 표시 여부, 활용 가능한 CPU 코어수
parmap.map(process_file, files, pm_pbar=True, pm_processes=num_processes)
X, y = zip(*X_y)
X = np.array(X)
y = np.array(y)
파일 전처리 시 주의할 점(shutil)
glob 함수를 쓰면 어디까지 파일이 처리되었는지 확인하기 매우 힘들 수 있다. 하지만 파일을 전처리하다 보면 100%의 확률로 에러가 발생해서 중간에 멈출 것이므로 exception이 나면 해당 파일만 따로 다른 디렉토리로 옮기고 다른 파일로 넘어가는 에러 처리 로직을 반드시 작성해줘야 한다. 굳이 다른 디렉토리로 옮기는 이유는 나중에 에러가 난 파일만 따로 처리하고 싶어도 몇 십만 개 데이터 중에서 찾기가 매우 어렵기 때문에 그냥 복사해서 다른 디렉토리에 모아 두는 게 좋다. 따라서 try-catch 문으로 에러를 잡아서 처리한다.
파일 이동에 사용할 라이브러리인 shutil은 내장 라이브러리이기 때문에 따로 설치하지 않아도 된다.
import multiprocessing
import parmap
import os
import glob
import shutil
EXCEPTION_DIRECTORY = "/directory/for/exception"
def process_file(file):
print("feature extracting in: ", file)
...(feature 추출 로직)...
def extract_in_files_parallel(input):
# input 디렉토리에서 재귀적으로 모든 하위 디렉토리를 뒤지면서 JSON 파일만 모두 찾아 list로 반환
files = glob.glob(os.path.join(input, '**/*.json'), recursive=True)
# CPU 숫자 카운트
num_processes = multiprocessing.cpu_count()
# try-except 처리하기
try:
with multiprocessing.Pool(num_processes) as pool:
parmap.map(process_file, files, pm_pbar=True, pm_processes=num_processes)
except Exception as e:
print("Error ", e, " error occurred during processing file ", file)
# 에러 발생한 파일만 따로 옮겨주기
if not os.path.exists(EXCEPTION_DIRECTORY):
os.makedirs(EXCEPTION_DIRECTORY)
if os.path.exists(EXCEPTION_DIRECTORY):
shutil.move(file, EXCEPTION_DIRECTORY)
참고로 호출되는 함수 process_file안에 try-except 처리해봐도 이 함수는 콜백(callback) 함수이기 때문에 처리가 되지 않는다. 에러를 처리하는 로직은 호출하는 쪽에 만들어줘야 한다.
위기 해결
일주일이 걸릴 것으로 예상되었던 코드가 1시간 반만에 끝났다🚀
레퍼런스
Multiprocessing : use tqdm to display a progress bar
To make my code more "pythonic" and faster, I use multiprocessing and a map function to send it a) the function and b) the range of iterations. The implanted solution (i.e., calling tqdm
stackoverflow.com
파이썬 Multiprocessing + tqdm 활용
보통 수천~수만건의 API를 호출하거나 많은 양의 반복문을 처리할 때는 multiprocessing에서 pool.map() 함수를 활용한다.
john-analyst.medium.com
try-except command does not catch exception with multiprocessing.Pool
I'm trying to catch the exceptions I raise in a function called callback. Here's a simplification of the code. try: logger.info("This message gets logged!!!") ...
stackoverflow.com
python 멀티 프로세싱은 parmap 으로 하자.
[2020.07.24 에 남기는 글] 이 글은 레거시적인 성격이 있습니다. 파이썬에서 멀티프로세스, 쓰레드 이용하기 가장 좋은 법은 concurrent.futrues 에서의 ThreadPoolExecutor 와 ProcessPoolExecutor 를 사용하거나 as
dailyheumsi.tistory.com
'🤖AIML' 카테고리의 다른 글
| numpy에서 자주 쓰이는 함수 및 기능들 외우기 (creation, manipulation, search, math, sorting) (0) | 2025.04.18 |
|---|---|
| [LLM의 임베딩 이해하기] One-hot vector, Embedding vector, Word2Vec (0) | 2024.06.01 |
| Apple M3 Max 칩의 변화 및 M3 Ultra/M3 Extreme 칩에 대한 루머 (1) | 2024.03.29 |
| [Keras] keras.layers.GlobalAveragePooling1D 레이어 이해하기 (0) | 2024.03.11 |
| [Keras] keras.layers.Dropout 레이어 이해하기 (0) | 2024.02.21 |