一尘不染

子流程中有多个管道

python

我正在尝试使用Ruffus管道中的多个Sailq文件作为参数的Sailfish。我使用python<()中的子流程模块执行Sailfish,但即使设置,在子流程调用中也不起作用shell=True

这是我要使用python执行的命令:

sailfish quant [options] -1 <(cat sample1a.fastq sample1b.fastq) -2 <(cat sample2a.fastq sample2b.fastq) -o [output_file]

或(最好):

sailfish quant [options] -1 <(gunzip sample1a.fastq.gz sample1b.fastq.gz) -2 <(gunzip sample2a.fastq.gz sample2b.fastq.gz) -o [output_file]

概括:

someprogram <(someprocess) <(someprocess)

我将如何在python中执行此操作?子过程正确吗?


阅读 265

收藏
2021-01-20

共1个答案

一尘不染

模拟bash进程替换

#!/usr/bin/env python
from subprocess import check_call

check_call('someprogram <(someprocess) <(anotherprocess)',
           shell=True, executable='/bin/bash')

在Python中,您可以使用命名管道:

#!/usr/bin/env python
from subprocess import Popen

with named_pipes(n=2) as paths:
    someprogram = Popen(['someprogram'] + paths)
    processes = []
    for path, command in zip(paths, ['someprocess', 'anotherprocess']):
        with open(path, 'wb', 0) as pipe:
            processes.append(Popen(command, stdout=pipe, close_fds=True))
    for p in [someprogram] + processes:
        p.wait()

在哪里named_pipes(n)

import os
import shutil
import tempfile
from contextlib import contextmanager

@contextmanager
def named_pipes(n=1):
    dirname = tempfile.mkdtemp()
    try:
        paths = [os.path.join(dirname, 'named_pipe' + str(i)) for i in range(n)]
        for path in paths:
            os.mkfifo(path)
        yield paths
    finally:
        shutil.rmtree(dirname)

实现bash进程替换的另一种更可取的方式(无需在磁盘上创建命名条目)是使用@Dunes建议的/dev/fd/N文件名(如果可用)。在FreeBSD上,)为进程打开的所有文件描述符创建条目。要测试可用性,请运行:
fdescfs(5)/dev/fd/#

$ test -r /dev/fd/3 3</dev/null && echo /dev/fd is available

如果失败;尝试进行符号链接/dev/fdproc(5)就像在某些Linux上所做的那样:

$ ln -s /proc/self/fd /dev/fd

这是/dev/fd基于someprogram <(someprocess) <(anotherprocess)bash命令的实现:

#!/usr/bin/env python3
from contextlib import ExitStack
from subprocess import CalledProcessError, Popen, PIPE

def kill(process):
    if process.poll() is None: # still running
        process.kill()

with ExitStack() as stack: # for proper cleanup
    processes = []
    for command in [['someprocess'], ['anotherprocess']]:  # start child processes
        processes.append(stack.enter_context(Popen(command, stdout=PIPE)))
        stack.callback(kill, processes[-1]) # kill on someprogram exit

    fds = [p.stdout.fileno() for p in processes]
    someprogram = stack.enter_context(
        Popen(['someprogram'] + ['/dev/fd/%d' % fd for fd in fds], pass_fds=fds))
    for p in processes: # close pipes in the parent
        p.stdout.close()
# exit stack: wait for processes
if someprogram.returncode != 0: # errors shouldn't go unnoticed
   raise CalledProcessError(someprogram.returncode, someprogram.args)

注意:在我的Ubuntu机器上,该subprocess代码仅在Python 3.4+中有效,尽管pass_fds自Python 3.2起可用。

2021-01-20