効果的なPython その3

python

 5章の読解です。ほぼ個人のメモなので詳細は本を読んでください。

5章 並行性と並列性

 並行は1つのCPUで複数の仕事を並行して動かすこと。時間を分割して仕事をとっかえひっかえします。

 並列は複数のCPUで複数の仕事を並列して動かすこと。実際に同時に複数の仕事ができるので、高速動作します。

 Pythonで書いた並行プログラムを並列に動かすのはそれなりに難儀なので、そのコツを記しています。相変わらず小難しいので自分の理解が正しいかどうかは自信がありません。

subprocessを使って子プロセスを管理する

 シェルスクリプトが複雑になってきたらPython使ってきれいにしようぜ。ここはsubprocess使おうぜ。だそうです。

 sbuprocessをググるとcall関数を使ってシリアルに動かす例が多いですが、ここではPopen使って「並列」に動かしています。そしてcommunicateを使って結果を待って(受け取って)います。

 Popenによって作られたプロセスは親プロセスとは独立に動きます。(以下のサンプルはwindows環境で動くように本文のコードをいじっています。)

import subprocess
from time import time

def run_sleep(period):
    proc = subprocess.Popen(['powershell', 'sleep', str(period)], shell=True)
#    proc = subprocess.call(['powershell', 'sleep', str(period)], shell=True)
    return proc

start = time()
procs = []

for _ in range(10):
    proc = run_sleep(0.1)
    procs.append(proc)

for proc in procs:
    proc.communicate()

end = time()

print(end - start, "sec")

 10回呼んでるので、1秒はかかりそうですが、結果は0.3秒ほど。複数のCPU使って並列に処理してます。

 コメントしているcallを使い、communicateをコメントアウトすると、かかる時間は1.5秒ほど。オーバヘッドがなければ並行処理して1秒なので、callでは並列に処理できていない(終了を待つ)ことがわかります。

 プロセスが何らかの理由で死んでしまっているかもしれません。心配ならtimeoutつけましょう。

for _ in range(10):
    proc = run_sleep(2)
    procs.append(proc)

for proc in procs:
    try:
        proc.communicate(timeout=1)
    except subprocess.TimeoutExpired:
        print("time out")
        proc.terminate()
        proc.wait()

 2秒スリープするプロセスを10個並列に起こして、直後に1秒のタイムアウトで待ち構えます。1秒ずつ待ち構えるので、2個のプロセスはタイムアウトしていますが、残りの8個は通り抜けました。ほんと並列で動いてますね。

スレッドはブロッキングI/Oに使い、並列性に使うのは避ける

 残念。Threadは並列に動いていないそうです…。見かけ上並列に動いて見えますが、処理は逐次、時分割で動いているようです。この検証コードです。

import time
import threading

def func():
    time.sleep(1)

def func2():
    sum = 0
    for n in range(17000000):
        sum += n
    return sum

start = time.time()    
threads = []

for i in range(4):
    thread = threading.Thread(target=func2)
    threads.append(thread)

for thread in threads:
    thread.start()

for thread in threads:
    thread.join()

end = time.time()
print(end - start, " sec")

 func2は自分の環境で調整した約1秒の関数です。この並列演算では期待されるのは4秒より早く、CPUが2つなら2倍の速さで処理が終了することですが、残念ながら4秒以上かかりました。

4.354208707809448 sec

 理由もこの章に書かれていますが、よくわかりません。じゃあなんでスレッドなんてあるのよ。ってことですが、こっちはまぁなんとなくわかりました。

 これがこの章のタイトルにもなってるブロッキングI/Oを使う時のため。やれヘリコプターを制御する例が載ってましたがややこしいので、上のサンプルのsleepを使っている方がそのブロッキングI/Oの例になります。

 ブロッキングI/Oってのは何ぞどこかに処理を投げてる間終わるのをボーっと待ってます。ファイルを読むのならそのファイルにアクセスして読み込んでっていう一連のOSの処理を待って、続きをすることになります。そんなイメージ。

 待つのも暇だから続きの処理しとこう。って時にThreadを使うとよいよ。ってことのようです。ディスプレイに対する描画とか、ファイルのIOとかそんな時ですかね。このsleepも処理依頼したら暇なので続きの処理をするので、こちらは4秒もかからず

1.0021929740905762 sec

 で終わってます。「1秒寝といて」ってお願いをしてから待たずに次の「1秒寝といて」をお願いしているので、結果として1秒ちょいで終わっています。これは期待通り。

 というわけで、Threadは任意の関数に対して複数CPU並列動作をさせてくれないよ。ブロッキングIOとかのシステムコールをするときに使うといいよ。だそうです。

スレッドでのデータ競合を防ぐためにLockを使う

 これはあまり難しい話ではないですね。変数へのアクセスに排他制御しましょう。という話です。前述のようにスレッドが同時に並列に走るわけではないが、排他制御しなくていいわけではない。ということを言っているようです。

import threading
import time

class Counter(object):
    def __init__(self):
        self.count = 0
        
    def increment(self, offset):
        self.count += offset

def worker (indx, how_many, counter):
    for _ in range(how_many):
        time.sleep(0.01)
        counter.increment(1)

def run_thread(func, how_many, counter):
    threads = []
    for i in range(5):
        args = (i, how_many, counter)
        thread = threading.Thread(target=func, args=args)
        threads.append(thread)
        thread.start()
    
    for thread in threads:
        thread.join()

how_many = 10**3
counter = Counter()
run_thread(worker, how_many, counter)
print('counter should be', 5 * how_many, ' found', counter.count)

 排他制御していない例です。がうまくいってしまったので、この例では効果が確認できませんでしたが、どのスレッドも同じCounterに対してインクリメントしているので、危険なコードになっています。

 mutexですね。Lockクラスでそれを実現させるのがオーソドックスなやり方のようです。with文でLockをかけるようです。

import threading
import time

class LockingCounter(object):
    def __init__(self):
        self.count = 0
        self.lock = threading.Lock()
        
    def increment(self, offset):
        with self.lock:            
            self.count += offset

def worker (indx, how_many, counter):
    for _ in range(how_many):
        time.sleep(0.01)
        counter.increment(1)

def run_thread(func, how_many, counter):
    threads = []
    for i in range(5):
        args = (i, how_many, counter)
        thread = threading.Thread(target=func, args=args)
        threads.append(thread)
        thread.start()
    
    for thread in threads:
        thread.join()

how_many = 10**3
counter = LockingCounter()
run_thread(worker, how_many, counter)
print('counter should be', 5 * how_many, ' found', counter.count)

 微小変化です。これで排他制御されるみたいです。効果確認できませんでしたが。

スレッド間の協調作業にはQueueを使う

 パイプラインでふん詰まりを防ぐためにはQueueを使いましょう。ということです。Queue最高。って話です。

 カメラで撮影、画像サイズ変更、画像の送信 という一連の処理をスレッドで非同期に処理させ結果を自前のQueueに置いておくようなことを考えてみます。

 その場合処理が遅い人の入力が溜まりにたまってメモリの枯渇とか、処理の早い人が前の処理終了をポーリングしまくってCPU資源を無駄に食ってしまったり、とかそんな問題を解決してくれるのがQueueクラスになります。

from queue import Queue
from threading import Thread
import time

queue = Queue()

def consumer():
    print('consumer waiting')
    queue.get()
    print('consumer done')
    
thread = Thread(target=consumer)
thread.start()

time.sleep(0.1) # これを入れないと waitingをプリントしている間にputtingがプリントされる。

print('producer putting')
queue.put(object())
thread.join()
print('producer done')

 この例は本文中のコードそのままですが、putされない限りgetが動かないことを確認できます。

consumer waiting
producer putting
consumer done
producer done

 と実行されます。Queueのバッファサイズの指定もできます。

from queue import Queue
from threading import Thread
import time

queue = Queue(2)

def consumer():
    time.sleep(0.5)
    print('consumer waiting')
    queue.get()    
    print('consumer waiting 2nd')
    queue.get()
    print('consumer done')
    
thread = Thread(target=consumer)
thread.start()

print('producer putting')
queue.put(object())
print('producer putting 2nd')
queue.put(object())
print('producer done')

thread.join()

 バッファが2つあるので、2つのputが終わってから2つのgetが走ってます

producer putting
producer putting 2nd
producer done
consumer waiting
consumer waiting 2nd
consumer done

 バッファを1つにすると、2回目のputが待たされて、

producer putting
producer putting 2nd
consumer waiting
consumer waiting 2nd
producer done (2回目のputがここで完了)
consumer done

 この順になります。

 Queueでjoinする方法もあるようです。threadで終わりをポーリングするのではなく、Queueのtask_doneを待つパターンです。

from queue import Queue
from threading import Thread
import time

queue = Queue()

def consumer():
    time.sleep(0.5)
    print('consumer waiting')
    queue.get()    
    print('consumer working')
    time.sleep(0.5)
    print('consumer done')
    queue.task_done()
    
Thread(target=consumer).start()

print('producer putting')
queue.put(object())
print('producer done')

queue.join()
print('queue done')

 一回はまったのは、queueにput/getした分だけ、task_doneを呼ばないといけないので上のコードのように2回putしたら2回task_doneしないといけません。

 最後にこの章ではQueueの派生クラスでcloseができるクラスを作り、そのQueueの中身をすべて処理するthreadの派生クラスの例が載っています。理解はできましたが、疲れたのでQueueによるパイプラインを作ることになったときに再度読み込みたいと思います…。抽象コードでしたが、一応動くようにできたのでコードだけは載せておきます。

from queue import Queue
from threading import Thread
import time

class ClosableQueue(Queue):
    SENTINEL = object()
    
    def close(self):
        self.put(self.SENTINEL)

    def __iter__(self):
        while True:
            item = self.get()
            try:
                if item is self.SENTINEL:
                    return
                yield item
            finally:
                self.task_done()

class StoppableWorker(Thread):
    def __init__(self, func, in_queue, out_queue):
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue
        super().__init__()
        
    def run(self):
        for item in self.in_queue:
            result = self.func(item)
            self.out_queue.put(result)

def put(queue):
    for i in range(10):
        time.sleep(0.01) # working
        print("set ", i)
        queue.put(i)
    
def func(i):
    print("calc ", i)
    time.sleep(0.02) # working
    return i * 2

def func2(queue):
    for q in queue:
        print(q)
 
in_queue = ClosableQueue()
out_queue = ClosableQueue()

Thread(target = put, args=(in_queue, )).start()
StoppableWorker(func, in_queue, out_queue).start()
Thread(target = func2, args=(out_queue, )).start()

time.sleep(3)
in_queue.close()
in_queue.join()
out_queue.close()
out_queue.join()

 処理の重さが微妙に違う2つのスレッドで速度差を調節すると処理順がいろいろ動きます。setが速ければ後ろが詰まるし、遅ければ後ろが待つ様子がわかりました。

多くの関数を並列に実行するにはコルーチンを考える

 コルーチンはスレッドより実行コスト(メモリ、開始時間)が削減できる。だからコルーチンを使おう。とのことです。

 相変わらず小難しいのでいろいろ調べてみると、本書に乗っているgeneratorベースのコルーチンは3.10から非サポートのようです。もちろん代替手段は用意されているので、概念だけ理解しようと思います。

 コルーチンは何もPython特有の記述ではないので、他の言語でも使われています。

 ググるとIOなどの時間のかかる処理を投げた後にその関数は一時停止しつつ呼び元の関数に制御を戻すような、そんな例が出てきます。一時停止と再開がコルーチンのポイントのようです。

 ただ、このサンプルだと「実態として結局シーケンシャルに動いてるんだけど、コードの記述にそれを意識させないようにできる(隠蔽できている)ことがうれしいよ。」と言ってる気がします。多分。少なくともこのサンプルは非同期並列処理ではないです。

 本章の最後にも「コルーチンは、取り巻く環境との相互作用からプログラムの中核となるロジックを分離する強力なツールである」とあります。少し哲学的です。

 書籍のコードをそのままコピーして動作するようにしたコードが以下です。コピペで動きます。

 ライフゲームというものらしいです。5×9のグリッドの中にはALIVEかEMPTYが入っており、ある注目セルの周囲8セルの状態によって次の世代での注目セルの状態が確定するというものです。

 正直for文で普通に書けばよさそうなもんですが、for文を記述する関数がえらいシンプルで周りから絶縁されていることがこの書き方のポイントのようです。よくわかりませんが。

 generatorのyieldと、yield fromの書式、あとはnamedtupleあたりをググりながら読めばなんとなく雰囲気はつかめました。

from collections import namedtuple

ALIVE = '*'
EMPTY = '_'
TICK = object()

Query = namedtuple('Query', ('y', 'x'))
Transition = namedtuple('Transition', ('y', 'x', 'state'))

def count_neighbors(y, x):
    # 8回Queryをyieldして周りの生存数をカウントして終わるコルーチン
    # 確かにすっきりして見える。
    n_ = yield Query(y + 1, x + 0) # north
    ne = yield Query(y + 1, x + 1) # 
    e_ = yield Query(y + 0, x + 1) # 
    se = yield Query(y - 1, x + 1) # 
    s_ = yield Query(y - 1, x + 0) # 
    sw = yield Query(y - 1, x - 1) # 
    w_ = yield Query(y + 0, x - 1) # 
    nw = yield Query(y + 1, x - 1) # 
    
    neighbor_states = [n_, ne, e_, se, s_, sw, w_, nw]
    count = 0
    for state in neighbor_states:
        if state == ALIVE:
            count += 1

        # returnはyield fromで呼び出された時の返り値
    return count


def game_logic(state, neighbors):
    # 周りが3人生きてれば生きる。これはコルーチンではない。
    if state == ALIVE:
        if neighbors < 2:
            return EMPTY
        elif neighbors > 3:
            return EMPTY
    else:
        if neighbors == 3:
            return ALIVE
    return state


def step_cell(y, x):
    # 9回Queryをyieldしてゲームのルールに従い次へ遷移してたものをyieldするコルーチン
    state = yield Query(y, x) # まずは自分を生成
    neighbors = yield from count_neighbors(y, x) # 周りを順に生成し合計を得る
    next_state = game_logic(state, neighbors)
    yield Transition(y, x, next_state) # 最後に自分を更新したものをyield


def simulate(height, width):
    # grid内を全部更新終わったらTickをyield
    # この関数がすごいらしいです。
    # 周囲の環境と絶縁されていて、全セルがstep_cellを実行していることが明白だから。
    while True:
        for y in range(height):
            for x in range(width):
                yield from step_cell(y, x)
        yield TICK


class Grid(object):
    # Gridクラスセットとゲットがあるだけ
    def __init__(self, height, width):
        self.height = height
        self.width = width
        self.rows = []
        for _ in range(self.height):
            self.rows.append([EMPTY] * self.width)
    
    def __str__(self):
        tmp = ''
        for y in range(self.height):
            for x in range(self.width):
                tmp += self.rows[y][x].__str__()
            tmp += '\n'
        return tmp
    
    def query(self, y, x):
        # 剰余で配列範囲外のアクセスを抑制
        return self.rows[y % self.height][x % self.width]
        
    def assign(self, y, x, state):
        self.rows[y % self.height][x % self.width] = state


def live_a_generation(grid, sim):
    # 今の世代のグリッドを引数に、次の世代のグリッドを返す
    progeny = Grid(grid.height, grid.width)
    item = next(sim)
    while item is not TICK:
        if isinstance(item, Query): # Queryの場合
            state = grid.query(item.y, item.x) # 受け取った座標の値をとってきて
            item = sim.send(state) # sendするだけ
        else: # Transitionの場合(周りから自分が確定)
            progeny.assign(item.y, item.x, item.state) # 次世代のgridを更新する
            item = next(sim)
    return progeny

# 動作確認サンプル
grid = Grid(5, 9)
grid.assign(0, 3, ALIVE)
grid.assign(1, 4, ALIVE)
grid.assign(2, 2, ALIVE)
grid.assign(2, 3, ALIVE)
grid.assign(2, 4, ALIVE)
print(grid)

for _ in range(4):
    sim = simulate(grid.height, grid.width)
    grid = live_a_generation(grid, sim)
    print(grid)

実行結果。ちゃんと書籍通りの結果になりました。

___*_____
____*____
__***____
_________
_________

_________
__*_*____
___**____
___*_____
_________

_________
____*____
__*_*____
___**____
_________

_________
___*_____
____**___
___**____
_________

_________
____*____
_____*___
___***___
_________

 前述したようにこのgeneratorベースのコルーチンはすでに非サポートのようなので、今後はasync/awaitを使うことになりそうです。調べるとサンプルはいろいろ出てきて、書き換えもできそうな気がしましたが、疲れたのでやめにします。

 ここに公式の説明があります。

本当の並列性のためにconcurrent.futuresを考える

 PythonではGIL(グローバルインタプリタロック)のおかげで、マルチコアCPUを活用するのは難儀です。

 C記述による拡張が手っ取り早いのですが、Cへの移植はコードが難読になる上に、テストの必要性やバグのリスクが増大するのでお勧めしません。というのが本書の見解。

 Python単独でCPU資源を活用するためには組み込みモジュールであるconcurrent.futuresからアクセスできるmultiprocessingがお勧めとのことです。これを使えばちゃんとCPUコアを複数使った並列処理が可能になるようです。

 紹介、比較されている関数は2つでThreadPoolExecutorProcessPoolExecutor

 ThreadPoolExecutor:いわゆるGILによるPythonスレッドの関数
 ProcessPoolExecutro:こいつがCPU並列してくれるPythonスレッド関数

 ということのようです。なので、前者ではCPU負荷のかかる処理の並列において処理時間の短縮は果たせない(むしろスレッドのオーバーヘッドにより遅くなる)が、後者では早くなったよ。って例が紹介されています。

 ThreadPoolExecutorは何のため?というとこれまでのスレッドと同様にIOへのアクセスなどCPU負荷の低い処理をするときに使うのだと思います。

def heavy():
    :

results = list(map(heavy, args)) # 普通に処理

pool = ThreadPoolExecutor(max_workers = 2)
results = list(pool.map(heavy, args)) # GILスレッド むしろ遅い

pool = ProcessPoolExecutor(max_workers = 2)
results = list(pool.map(heavy, args)) # マルチコアだと早くなる。

 こんな感じ。というわけでまとめは

 CPUボトルネックをC拡張するのは効果的だがリスクだよ。concurrent.futuresとその単純なProcessPoolExecutorを使うといいよ。だけどあまり高度な機能は複雑なので使わない方がいいよ。

 という感じ。

つづく

 6章に続く。予定

python
スポンサーリンク
キャンプ工学

コメント

タイトルとURLをコピーしました