Python のsubprocess

やりたいこと

pythonのコードから、コマンドラインツールを呼び出したい。日本語版Windowsでもちゃんと走って欲しい。実行状態をリアルタイムにpython側から見たい。となるとかなりハマるのでメモ。

非Windows/実行待ちの場合

import subprocess
import sys
result=subprocess.run(
    ['some.exe','argument1','argument2"],
    capture_output=True,encoding=sys.stdin.encode,errors='replace')
print("result out:",result.stdout)
print("result err:",result.stderr)

とても簡単です。実行ファイルとそれに渡す引数をrun関数に引数に渡すだけです。出力結果が欲しいときecaputer_outputを指定した上でencodingで普通はsys.stdin.encoding (=utf-8)を指定すれば良いです。

ここで’utf-8’とハードコードせずににsys.stdin.encodingとしているのは、UNIX系システムでは管理者の方針でutf-8じゃなくてEUCとか使っている事があります。そういうシステムではきっと管理者が気を利かせてPYTHONIOENCODING環境変数を設定してくれてるはずです。この環境変数の通りにsys.stdin.encodingが設定されます。

注意:シェル組込関数のforだとかシェル変数設定とかを使いたい場合にrun関数の引数にshell=Trueを使ってcmd.exe経由で実行したくなるかもしれません。その際はセキュリティ上のリスクがあります。必ず実行コマンドをハードコードしてください。あなたがcmd.exeに対する完全なエスケープ処理を実装できるならこんなサイトを見ているはずはありません。

Windowsでの文字コード

windows標準のコマンドラインツールの文字コードは、utf-8ではなく各言語固有の文字コードで実行されています。なのにsys.stdin.encodingの値は’utf-8’です。実際にstdinでは何も考える事無くutf-8で文字列が入力されてきます。しかしsubprocessでの入出力ではこのオシャレなトリックは動きません。日本語では’cp932’という文字コードを設定する必要があります。

result=subprocess.run(
    ['some.exe','argument1','argument2"],
    capture_output=True,encoding="cp932",errors='replace')

当然ながら、他の言語のwindowsでは上のコードは動きません。Pythonの標準ライブラリにもwindowsの端末文字コードを調べる機能は無いようです。

Windowsの文字コードを調べる

趣味のコードは国内よりも下手すれば国外のほうが再利用される時代ですね。ということで、そこそこの国数をカバーしてみました。win32consoleでAPIをたたけば、文字コード(コードページ番号)が得られるので、pythonが標準的に対応している文字コードのうちcpから始まる番号があるものを片端から登録しています。

import sys
TERMINAL_ENCODING=sys.stdin.encoding 
if sys.platform == "win32": # In case of windows, you have to get it via Win32API
    import win32console
    #https://docs.python.org/3/library/codecs.html#standard-encodings
    cp_to_enc={37:"cp037",273:"cp273",424:"cp424",437:"cp437",500:"cp500",720:"cp720",
        737:"cp737",775:"cp775",850:"cp850",852:"cp852",855:"cp855",856:"cp856",
        857:"cp857",858:"cp858",860:"cp860",861:"cp861",862:"cp862",863:"cp863",
        864:"cp864",865:"cp865",866:"cp866",869:"cp869",874:"cp874",875:"cp875",
        932:"cp932",949:"cp949",950:"cp950",1006:"cp1006",1026:"cp1026",1125:"cp1125",
        1140:"cp1140",1250:"cp1250",1251:"cp1251",1252:"cp1252",1253:"cp1253",
        1254:"cp1254",1255:"cp1255",1256:"cp1256",1257:"cp1257",1258:"cp1258",
        936:"gbk",819:"latin_1",1361:"johab",154:"ptcp154",65001:"utf-8",20127:"ascii",
        }
    if win32console.GetConsoleCP() in cp_to_enc:
        TERMINAL_ENCODING=cp_to_enc[win32console.GetConsoleCP()] 

あとは以下のようにして得られた文字コード(TERMINAL_ENCODING)を使えばOKです。当然ながらrunを使うと実行完了までの出力を全てメモリに蓄えるので相応のメモリ量が必要になります。

cmd="""
import sys
for i in range(1000): 
    print(list(range(30)) ) 
    print(sys.argv,file=sys.stderr) 
"""
result=subprocess.run(['python.exe','-c',cmd,'ほげ & " ',"'"],
    capture_output=True,encoding=TERMINAL_ENCODING)
print("result out #:", len(result.stdout.split("\n")) )
print("result err #:", len(result.stderr.split("\n")) )

リアルタイムに状態を確認したい

進捗状況が出てくるタイプのコマンドラインツールは何処まで処理したか確認したかったり、そもそも大量のログをメモリに保持しておきたくないといった事ありますね。標準出力のみで標準エラー出力がなければ簡単です。Popenでstdout=PIPEととして、stdoutからreadlineなりで読み込めばOKです。

cmd="""
import sys
import time
for i in range(10):
    print(list(range(30)) ) 
    time.sleep(0.0001)
"""

with subprocess.Popen(['python.exe','-c',cmd,'ほげ & " ',"'"],
    encoding=TERMINAL_ENCODING,stdout=subprocess.PIPE) as proc:
    c=0
    while(proc.stdout.readline()): c+=1
    print("result out #:", c )

問題は標準エラーも有る時

以下のコードは動きません。デフォルトではwindowsは512byte Linuxは4096byteのバッファがpipeに設定されています(うろ覚え)。下のコードでは先に標準出力だけを読んでいます。なので標準エラー出力がバッファサイズ以上貯まると、popen先のプログラムが停止してしまいます。このバッファ詰まりの時はCtrl-Cでも止めれないし、標準エラー出力に出てくる文字列のサイズによって再現したりしなかったりで面倒です。Unix系の環境では、ノンブロッキングIOをつかってビジーループでstdoutとstderrを交互に読む方法もありますが、windows版pythonにはノンブロッキングIOのサポートが有りません。

cmd="""
import sys
import time
for i in range(10000):
    print(list(range(30)) ) 
    print(sys.argv,file=sys.stderr) 
    time.sleep(0.0001)
"""
with subprocess.Popen(['python.exe','-c',cmd,'ほげ & " ',"'"],
    encoding=TERMINAL_ENCODING,stdout=subprocess.PIPE,stderr=subprocess.PIPE) as proc:
    c=0
    while(proc.stdout.readline()): c+=1
    print("result out #:", c )
    #BUG---------Never reach HERE because of stderr buffer stucks---------
    c=0
    while(proc.stderr.readline()): c+=1
    print("result err #:", c )

回避策1:標準出力に混ぜる

標準出力と標準エラー出力の2つを同時にIO待ちできないのが問題なので、以下のように標準エラーを標準出力にリダイレクトして一本化すれば走ります。表示用途なら大きな問題にならないでしょう。

#Workaroud 1:redirect stderr to stdout (outputs are mixed)
cmd="""
import sys
import time
for i in range(1000): 
    print(list(range(30)) ) 
    print(sys.argv,file=sys.stderr) 
"""
with subprocess.Popen(['python.exe','-u','-c',cmd,'ほげ & " ',"'"],
    encoding=TERMINAL_ENCODING,stdout=subprocess.PIPE,stderr=subprocess.STDOUT) as proc:
    c=0
    while(proc.stdout.readline()): c+=1
    print("result out #:", c )

回避策2:標準エラーのみ一括処理

標準エラーをtempfileに溜めておき、後から処理します。ファイルという名はついてますがOSによってはわざわざファイルを作る事はなく、オンメモリで済ませてくれます。

進捗表示がのようなリアルタイム処理が必要なものは標準出力で、標準エラーはエラーメッセージにすぎず、リアルタイム性が要らない場合に適しています。多くの用途はこれで十分だと思います。

#workaround2: real-time output on stdout, batch output on stderr
import tempfile
cmd="""
import sys
import time
for i in range(1000): 
    print(list(range(30)) ) 
    print(sys.argv,file=sys.stderr) 
"""

with tempfile.TemporaryFile() as stderr_io:
    with subprocess.Popen(['python.exe','-u','-c',cmd,'ほげ & " ',"'"],
        encoding=TERMINAL_ENCODING,stdout=subprocess.PIPE,stderr=stderr_io) as proc:
        c=0
        while(proc.stdout.readline()): c+=1
        print("result out #:", c )

    while(stderr_io.readline()): c+=1
    print("result err #:", c )

最終手段:asyncioで真面目に処理する

両方の出力をリアルタイム処理したい場合です。普通はないと思うけど・・・。

既存ソースコードにバッファ詰まりデッドロックが有る時の修正案としては受け入れられやすいかと思います。(多分こっちだね)

一気に処理が複雑になるうえに、readline()が直接は使えないのでお勧めしません。

import asyncio
#Jupyter や spyderなどのipython環境ではnest_asyncioを有効にしておかないとasyncioはつかえない
#import nest_asyncio
#nest_asyncio.apply()
cmd="""
import sys
import time
for i in range(1000):
    print(list(range(30)) ) 
    print(list(reversed(range(30))),file=sys.stderr) 
    time.sleep(0.0001)
"""
import codecs
class MyProtocol(asyncio.SubprocessProtocol):
    def __init__(self, exit_future):
        self.exit_future=exit_future
        Decoder=codecs.getincrementaldecoder(TERMINAL_ENCODING)
        #you have to prepare the decoder for each output(stdout/stderr)
        #because decoder have some states
        self.decoder=[Decoder(errors="ignore") for i in range(2)]
        self.outputs=["" for i in range(2)]
        self.count=[0]*2
    def pipe_data_received(self, fd, data):
        s=self.decoder[fd-1].decode(data) #byte列を文字列に変換 (改行単位であることは保証されてない)
        self.count[fd-1]+=s.count("\n")
        self.outputs[fd-1]+=s
        sys.stdout.write(s)  #real-time dumping
    def process_exited(self):#sub process exited
        self.exit_future.set_result(True) 


async def get_date(loop):
    exit_future = asyncio.Future(loop=loop) #Flag for waiting sub process exit
    tranport,protocol = await loop.subprocess_exec(lambda: MyProtocol(exit_future),
                                "python.exe", '-u','-c', cmd,
                                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # Wait for exit_futer==True
    await exit_future
    return protocol #Myprotocol have some processed data

if sys.platform == "win32":
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

res = loop.run_until_complete(get_date(loop)) #wait until the exit of subprocess
print("Line counts:",res.count)#process result is in res(MyProtocol)
loop.close()#必要に応じて

asyncioをつかって、標準出力(fd=1)、標準エラー(fd=2)の処理をProtocolハンドラ(MyProtocol)で処理します。出力/エラーが入ってくる度にpipe_data_received関数がfd引数付きで呼ばれるので、fd値でどちらかを判別してdataを処理します。

asyncio.subprocessではencodingの処理をしてくれないので、getincrementaldecoderを使って自前でエンコードします(emptypagesさん解説記事)。byte.encode()は利用できません。マルチバイト文字列のちょうど良い境界でハンドラが呼ばれるとは限らないからです。readline的な動作をさせたければ、さらに自前で改行文字(\n)を見つけ次第、各行を処理するといった処理が必要になります。

最終手段2:疑似non-blockIO

スレッドとキューを使って擬似的にnon-blockIOを作ります。non-blockIOではeofか単にデータが来てないだけか判別出来ないので、is_eof関数を追加しています。

def proc_readline_ext(proc):
    from threading import Thread
    from queue import Queue, Empty
    from time import sleep
    qout = Queue()
    qerr = Queue()
    
    #Function of the thread : readline loop (push lines to queue)
    def _enqueue_output(out, queue):
        def _r():
            for line in iter(out.readline,''): #it means loop until EOF=''
                queue.put(line)
            out.close()
        return _r
    def _readline_nowait(q):
        def _r():
            try:
                return q.get_nowait()
            except Empty:
                return ''
        return _r
    #start thread for stdout/stderr
    to = Thread(target=_enqueue_output(proc.stdout,qout))
    to.daemon = True  # thread dies with the program
    to.start()
    te = Thread(target=_enqueue_output(proc.stderr,qerr ))
    te.daemon = True
    te.start()

    def no_newline():
        return qout.empty() and qerr.empty()

    def wait_newline(timeout=0.5):
        if(no_newline()):
            sleep(timeout)
            return False
        return True

    def wait_eof():
        to.join()
        te.join()
    def is_eof():
        if(no_newline and proc.stdout.closed and proc.stderr.closed):
            return True
        else:
            return False
    #
    #Add Non-block readline method/objects 
    #
    proc.is_eof=is_eof
    qout.readline = _readline_nowait(qout)
    qerr.readline = _readline_nowait(qerr)

    proc.qout = qout
    proc.qerr = qerr
    proc.wait_newline = wait_newline
    proc.wait_eof =wait_eof


print("with thread")
cmd="""
import sys
import time
for i in range(100): 
    print(list(range(30)) ) 
    print(sys.argv,file=sys.stderr) 
    time.sleep(0.0001)
"""
with subprocess.Popen(['python.exe','-u','-c',cmd,'ほげ & " ',"'"],
        encoding=TERMINAL_ENCODING,stdout=subprocess.PIPE,stderr=subprocess.PIPE) as proc:
    proc_readline_ext(proc) #add non-block readline(q*.readline) to proc object
    while not proc.is_eof()  :
        proc.wait_newline(0.01)
        o=proc.qout.readline()
        if(o is not ''):print(o)
        o=proc.qerr.readline()
        if(o is not ''):print(o)

どうせThreadを使うならばその中で処理してしまうというのも手ですが、自分以外の誰かがメンテナンスする可能性を考えるとお勧めしません。スレッドの事を知っている人が触るコードと、スレッドの事を知らない人が触るコードは分離すべきです。