멀티 프로세싱 도입 계기
지금 다니는 랩실에서 몇 십만개나 되는 파일에서 feature를 추출해야 하는 일이 있었는데, linear하게 그냥 차례대로 처리하다보니 너무 오래 걸렸다. 사실 몇 시간 정도만 걸렸으면 딱히 멀티 프로세싱 생각을 못했을 것 같은데, 1~2일 지켜보면서 파일 처리 속도를 계산해보니 거의 일주일이 걸리는 걸로 결론이 나서 도저히 안되겠다 싶어서 멀티 프로세싱을 도입했다.
파일이 몇 십만 개라는 점, 우리 랩실 서버에는 CPU가 64개나 있다는 점을 고려하면 진짜 무조건 멀티 프로세싱을 도입했어야 하는 건데, 도대체 이 생각을 왜 못했지? 싶어서 너무 후회스러웠다. AIML에서 파일 전처리하고 feature 추출하는 상황 및 환경이 멀티 프로세싱을 딱 쓰기 좋은 단계인데, 앞으로는 이런 방황을 하지 않기 위해서 과정을 정리해보고자 한다.
멀티 프로세싱 사용하기(glob, multiprocessing)
멀티 프로세싱은 파이썬 내장 라이브러리이기 때문에 따로 pip install
또는 conda install
해줄 게 없다. 바로 import 해서 사용하면 된다. 서버가 가지고 있는 CPU를 pool로 사용하고, 그 pool에다가 처리할 파일 목록과 파일을 처리할 로직이 있는 메소드를 전달해주면 끝이다.
이때 처리할 파일 목록을 파이썬 내장 라이브러리인 glob
를 사용하면 매우 간편하다. 파일을 읽어올 루트 디렉토리 안에 있는 하위 디렉토리를 recursive=True
옵션만 주면 재귀적으로 모두 읽어올 수 있고, 와일드 카드를 사용해서 읽어올 파일 규칙을 쉽게 명시할 수 있다.
import multiprocessing
import os
import glob
def process_file(file):
print("feature extracting in: ", file)
...(feature 추출 로직)...
def extract_in_files_parallel(input):
# input 디렉토리에서 재귀적으로 모든 하위 디렉토리를 뒤지면서 JSON 파일만 모두 찾아 list로 반환
files = glob.glob(os.path.join(input, '**/*.json'), recursive=True)
# CPU 숫자 카운트
num_processes = multiprocessing.cpu_count()
# CPU에 나눠서 파일 전처리 수행
with multiprocessing.Pool(num_processes) as pool:
pool.map(process_file, files)
가끔 파일을 읽는 로직을 os.walk()
로 for문 여러 개를 쓰거나, 파일명을 규칙적으로 명시해(예시: 0-0.txt, 0-1.txt, …
) 파일까지 가는 경로를 문자열 조합으로 만드는 방식을 쓰는 경우가 있는데, 둘 다 매우 유지보수가 힘들어서 데이터 세트의 파일 규칙이나 디렉토리 깊이 모두 항상 잘 맞춰줘야 해서 힘들다. 다만 여러 스케일로 데이터 세트를 사용하는데, 항상 동일한 파일을 읽어오고 싶다면 문자열 조합 방식을 써도 좋은 것 같다. (예를 들어, 100개, 200개, 300개 class 스케일로 테스트할 건데, 100개 스케일은 항상 0~100번까지의 class를 읽어오고, 200개 스케일은 0~200번까지의 class를 읽어오고 싶은 경우)
파일 전처리 진행 정도 보기(tqdm, parmap)
위에서 사용한 pool.map()
은 iterator를 반환하는 게 아니라서 진행 정도를 볼 수 없다. 그래서 위의 코드에서는 print로 로그를 찍었지만, 몇 십만 개의 파일을 처리하는데 이게 전부 터미널에 찍히면 이런 시간도 낭비가 될 수 있다. 또한 print 문을 찍는다고 해도 전체 진행 상황을 한 눈에 보는 게 아니라 불편하다.
이때 pool.imap()
& tqdm
을 쓰거나 아니면 parmap.map()
을 쓰면 간편하게 진행 정도를 볼 수 있다. imap()
은 함수 명에서 볼 수 있듯이 imap은 map()
과 비슷하지만 iterator를 반환해주는 함수이다.
# 설치
pip install tqdm
import multiprocessing
from tqdm import tqdm
import os
import glob
def process_file(file):
...(feature 추출 로직)...
def extract_in_files_parallel(input):
# input 디렉토리에서 재귀적으로 모든 하위 디렉토리를 뒤지면서 JSON 파일만 모두 찾아 list로 반환
files = glob.glob(os.path.join(input, '**/*.json'), recursive=True)
# CPU 숫자 카운트
num_processes = multiprocessing.cpu_count()
with multiprocessing.Pool(num_processes) as pool:
with tqdm(total=len(files)) as pbar:
# 실행할 함수, 처리할 input
for _ in pool.imap_unordered(process_file, files):
pbar.update()
아니면 다른 라이브러리인 parmap
을 쓰는 방법도 있다. parmap
은 이름 그대로 map
에서 pbar를 보여주는 라이브러리로, 위의 방법보다 depth가 하나 적어서 더 직관적이다.
pip install parmap
# 또는
conda install parmap
import multiprocessing
import parmap
import os
import glob
def process_file(file):
...(feature 추출 로직)...
def extract_in_files_parallel(input):
# input 디렉토리에서 재귀적으로 모든 하위 디렉토리를 뒤지면서 JSON 파일만 모두 찾아 list로 반환
files = glob.glob(os.path.join(input, '**/*.json'), recursive=True)
# CPU 숫자 카운트
num_processes = multiprocessing.cpu_count()
with multiprocessing.Pool(num_processes) as pool:
# 실행할 함수, 처리할 input, bar 표시 여부, 활용 가능한 CPU 코어수
parmap.map(process_file, files, pm_pbar=True, pm_processes=num_processes)
멀티 프로세스를 공유 메모리에 접근 가능하게 하기(Manager)
feature 추출 시에는 추출한 값을 리스트 등에 append 해줘야 한다. 그런데 문제는 멀티 프로세스로 나누어 놓은 각 프로세스들이 하나의 list를 공유해야 한다는 점이다. 이때 해당 리스트를 manager.list()
로 공유 메모리로 만들어주면, 해당 리스트를 모든 멀티 프로세스가 공유 가능하다. 다만 X 리스트와 y 리스트를 따로 만들어서 append하면 feature-label 쌍이 맞지 않는 경우가 생길 수 있기 때문에, 리스트를 하나만 만들어서 X_y.append([feature, index])
로 추가해준 후, 멀티 프로세싱이 다 끝나면 그 후에 X, y = zip(*X_y)
로 나누어 주면 좋다.
import multiprocessing
from multiprocessing import Manager
import parmap
import os
import glob
## 설명: 공유 메모리 리스트
manager = Manager()
X_y = manager.list()
def process_file(file):
...(feature 추출 로직)...
X_y.append([feature, index])
def extract_in_files_parallel(input):
# input 디렉토리에서 재귀적으로 모든 하위 디렉토리를 뒤지면서 JSON 파일만 모두 찾아 list로 반환
files = glob.glob(os.path.join(input, '**/*.json'), recursive=True)
# CPU 숫자 카운트
num_processes = multiprocessing.cpu_count()
with multiprocessing.Pool(num_processes) as pool:
# 실행할 함수, 처리할 input, bar 표시 여부, 활용 가능한 CPU 코어수
parmap.map(process_file, files, pm_pbar=True, pm_processes=num_processes)
X, y = zip(*X_y)
X = np.array(X)
y = np.array(y)
파일 전처리 시 주의할 점(shutil)
glob 함수를 쓰면 어디까지 파일이 처리되었는지 확인하기 매우 힘들 수 있다. 하지만 파일을 전처리하다 보면 100%의 확률로 에러가 발생해서 중간에 멈출 것이므로 exception이 나면 해당 파일만 따로 다른 디렉토리로 옮기고 다른 파일로 넘어가는 에러 처리 로직을 반드시 작성해줘야 한다. 굳이 다른 디렉토리로 옮기는 이유는 나중에 에러가 난 파일만 따로 처리하고 싶어도 몇 십만 개 데이터 중에서 찾기가 매우 어렵기 때문에 그냥 복사해서 다른 디렉토리에 모아 두는 게 좋다. 따라서 try-catch
문으로 에러를 잡아서 처리한다.
파일 이동에 사용할 라이브러리인 shutil
은 내장 라이브러리이기 때문에 따로 설치하지 않아도 된다.
import multiprocessing
import parmap
import os
import glob
import shutil
EXCEPTION_DIRECTORY = "/directory/for/exception"
def process_file(file):
print("feature extracting in: ", file)
...(feature 추출 로직)...
def extract_in_files_parallel(input):
# input 디렉토리에서 재귀적으로 모든 하위 디렉토리를 뒤지면서 JSON 파일만 모두 찾아 list로 반환
files = glob.glob(os.path.join(input, '**/*.json'), recursive=True)
# CPU 숫자 카운트
num_processes = multiprocessing.cpu_count()
# try-except 처리하기
try:
with multiprocessing.Pool(num_processes) as pool:
parmap.map(process_file, files, pm_pbar=True, pm_processes=num_processes)
except Exception as e:
print("Error ", e, " error occurred during processing file ", file)
# 에러 발생한 파일만 따로 옮겨주기
if not os.path.exists(EXCEPTION_DIRECTORY):
os.makedirs(EXCEPTION_DIRECTORY)
if os.path.exists(EXCEPTION_DIRECTORY):
shutil.move(file, EXCEPTION_DIRECTORY)
참고로 호출되는 함수 process_file
안에 try-except
처리해봐도 이 함수는 콜백(callback) 함수
이기 때문에 처리가 되지 않는다. 에러를 처리하는 로직은 호출하는 쪽에 만들어줘야 한다.
위기 해결
일주일이 걸릴 것으로 예상되었던 코드가 1시간 반만에 끝났다🚀
레퍼런스
'🤖AIML' 카테고리의 다른 글
[LLM의 임베딩 이해하기] One-hot vector, Embedding vector, Word2Vec (0) | 2024.06.01 |
---|---|
Apple M3 Max 칩의 변화 및 M3 Ultra/M3 Extreme 칩에 대한 루머 (1) | 2024.03.29 |
[Keras] keras.layers.GlobalAveragePooling1D 레이어 이해하기 (0) | 2024.03.11 |
[Keras] keras.layers.Dropout 레이어 이해하기 (0) | 2024.02.21 |
[Keras] keras.layers.Flatten 레이어 이해하기 (0) | 2024.02.19 |