pythonで並列処理
python2.7で実装した際のメモ。
実装イメージ
- csvファイルの行ごとに重たい処理をするので、行ごとに並列処理したい
- 行の先頭列にはIDがあり、IDをキーに並列処理の結果を受け取って、出力したい
- csv上、IDには重複があるが、同一IDは2回処理したくない
- 各プロセスで共通して参照するデータがある。このデータはIDをキーに参照
- csvのどこまで処理が進んだか進捗を見たい
以下のサイトを参考にした。
メモ: multiprocessingを使うまで(Part 2) – Momentum
まずは、上記サイトのmultiprocess_with_instance_method.py をありがたくimportする。
from multiprocessing import Pool, Manager import multiprocess_with_instance_method
以下のようにして並列処理できる
def process(i, line, list, common_result): # lineを元にした処理 print "i=" + str(i) # 進捗を出力 cols = line.rstrip().split(",") id = cols[0] if common_result.has_key(id) == False: # 同一IDは一回のみ処理 common_result[id] = heavy_func(list[id]) return cols[1] def heavy_func(data): # 重たい処理 return data # 各プロセスの処理結果を代入する変数。各プロセスで共通参照される common_result = Manager().dict() # 各プロセスの中で参照するデータ list = {"1":"a", "2":"b", "3":"c"} #lines = open(csv_file, "r").readlines() lines = [] lines.append("1,aaa") lines.append("2,bbb") lines.append("3,ccc") p = Pool(3) result = [p.apply_async(process, args=(i, line, list, common_result)) for i, line in enumerate(lines)] p.close() p.join() # output for id, val in common_result.items(): print id + "," + val # process()でreturnした結果をresultで受け取って出力してもよい print [r.get(timeout=1) for r in result]
ちなみに、進捗を出力しても、python test.py > log & のように実行すると、バッファリングされてしまい、すぐにはファイルに出力されない。 この場合は python -u test.py > log & とすれば、すぐに出力される。
注意
並列処理するプロセスで参照するデータサイズが大きいと 並列処理のためのオーバーヘッドが大きくて、逆に遅くなるので注意。
KBレベルならいいが、100MBレベルだと厳しい。 その場合、参照データはDBにおいて、SQLでアクセスした方がよい。