pandazx's blog

Hadoop, データ分析など雑多な技術ブログ

pythonで並列処理

python2.7で実装した際のメモ。

実装イメージ

  • csvファイルの行ごとに重たい処理をするので、行ごとに並列処理したい
  • 行の先頭列にはIDがあり、IDをキーに並列処理の結果を受け取って、出力したい
  • csv上、IDには重複があるが、同一IDは2回処理したくない
  • 各プロセスで共通して参照するデータがある。このデータはIDをキーに参照
  • csvのどこまで処理が進んだか進捗を見たい

以下のサイトを参考にした。

メモ: multiprocessingを使うまで(Part 2) – Momentum

まずは、上記サイトのmultiprocess_with_instance_method.py をありがたくimportする。

from multiprocessing import Pool, Manager
import multiprocess_with_instance_method

以下のようにして並列処理できる

def process(i, line, list, common_result):
    # lineを元にした処理
    print "i=" + str(i) # 進捗を出力
    cols = line.rstrip().split(",")
    id = cols[0]
    if common_result.has_key(id) == False:
        # 同一IDは一回のみ処理
        common_result[id] = heavy_func(list[id])
    return cols[1]

def heavy_func(data):
    # 重たい処理
    return data

# 各プロセスの処理結果を代入する変数。各プロセスで共通参照される
common_result = Manager().dict()

# 各プロセスの中で参照するデータ
list = {"1":"a", "2":"b", "3":"c"}

#lines = open(csv_file, "r").readlines()
lines = []
lines.append("1,aaa")
lines.append("2,bbb")
lines.append("3,ccc")

p = Pool(3)
result = [p.apply_async(process, args=(i, line, list, common_result)) for i, line in enumerate(lines)]
p.close()
p.join()

# output
for id, val in common_result.items():
    print id + "," + val

# process()でreturnした結果をresultで受け取って出力してもよい
print [r.get(timeout=1) for r in result]

ちなみに、進捗を出力しても、python test.py > log & のように実行すると、バッファリングされてしまい、すぐにはファイルに出力されない。 この場合は python -u test.py > log & とすれば、すぐに出力される。

注意

並列処理するプロセスで参照するデータサイズが大きいと 並列処理のためのオーバーヘッドが大きくて、逆に遅くなるので注意。

KBレベルならいいが、100MBレベルだと厳しい。 その場合、参照データはDBにおいて、SQLでアクセスした方がよい。