优秀的编程知识分享平台

网站首页 > 技术文章 正文

深入理解子进程(续)(子进程和子线程)

nanyue 2024-09-09 04:53:20 技术文章 6 ℃

sh 的实现

sh 为 Shell 操作提供了一个非常好用的接口,功能也非常强大。

  1. from sh import ls

  2. print(ls('-l'))

源码:https://github.com/amoffat/sh/blob/1.12.14/sh.py

代码总共 3500 多行,这里只介绍它实现中的关键部分,有兴趣可以详细阅读它的源码。 读代码之前最好先看一下它的文档 Reference,了解各个对象间的逻辑关系以及代码执行流程,避免陷入细节中。

从 sh 模块导入任意命令

我们可以从 sh 模块导入任意命令,而且这些命令都是动态生成的。

下面解析它的实现:

  1. # magicmodule.py

  2. import sys

  3. from types import ModuleType

  4. class MagicModule(ModuleType):

  5. def __init__(self, name):

  6. super.__init__(name)

  7. def __getattr__(self, name):

  8. return name

  9. sys.modules[__name__] = MagicModule(__name__)

当导入一个模块时,Python 的导入机制会创建一个模块对象存到 sys.modules[__name__]中,而from module import attr就会获取该模块对象的属性。 把系统创建的模块对象替换成实现了__getattr__方法的模块对象,就能导入任意命令了。

sh 中相关代码在 L3333 SelfWrapper类 和 L3581 修改sys.modules

优雅的异常处理

sh 支持这样的用法:

  1. from sh import ErrorReturnCode_12

  2. from sh import SignalException_9

  3. from sh import SignalException_SIGHUP

  4. try:

  5. some_cmd

  6. except ErrorReturnCode_12:

  7. print("couldn't do X")

这些异常类都是动态生成的(L433):

  1. def get_exc_from_name(name):

  2. """takes an exception name, like:

  3. ErrorReturnCode_1

  4. SignalException_9

  5. SignalException_SIGHUP

  6. and returns the corresponding exception. this is primarily used for

  7. importing exceptions from sh into user code, for instance, to capture those

  8. exceptions

  9. """

它用下面这个正则表达式(L424) 匹配异常类的名称,取出返回码或者信号码:

  1. rc_exc_regex = re.compile("(ErrorReturnCode|SignalException)_((d )|SIG[a-zA-Z] )")

然后动态生成一个类:

  1. exc = ErrorReturnCodeMeta(name, (base,), {"exit_code": rc})

ErrorReturnCodeMeta(L325) 是一个元类,动态生成的异常类是元类的实例。

which 命令

sh 用 Python 实现了 which命令的功能(L522):

实现原理就是遍历 PATH环境变量中所有的路径,找到符合要求的可执行文件:

  1. # L528

  2. def is_exe(fpath):

  3. # 存在 & 可执行 & 是个文件

  4. return (os.path.exists(fpath) and

  5. os.access(fpath, os.X_OK) and

  6. os.path.isfile(os.path.realpath(fpath)))

  7. # L554

  8. for path in paths_to_search:

  9. exe_file = os.path.join(path, program)

  10. if is_exe(exe_file):

  11. found_path = exe_file

  12. break

后面启动子进程需要用到这个函数。

  1. def resolve_command_path(program):

  2. # 查找可执行文件

  3. path = which(program)

  4. if not path:

  5. # 替换下划线为横杠,再次尝试

  6. if "_" in program:

  7. path = which(program.replace("_", "-"))

  8. if not path:

  9. return None

  10. return path

  11. # 导入命令时调用此函数,创建 Command 对象

  12. def resolve_command(name, baked_args=None):

  13. path = resolve_command_path(name)

  14. cmd = None

  15. if path:

  16. cmd = Command(path)

  17. if baked_args:

  18. cmd = cmd.bake(**baked_args)

  19. return cmd

Command 类的实现

接下来先看 Command类的实现(L1054),所有命令都是这个类的实例。

  1. class Command(object):

  2. """ represents an un-run system program, like "ls" or "cd". """

  3. # L1065

  4. _call_args = {

  5. "fg": False, # run command in foreground

  6. # run a command in the background. commands run in the background

  7. # ignore SIGHUP and do not automatically exit when the parent process

  8. # ends

  9. "bg": False,

  10. # ...一堆参数

  11. }

  12. # L1188

  13. def __init__(self, path, search_paths=None):

  14. found = which(path, search_paths)

  15. # L1209

  16. self._path = encode_to_py3bytes_or_py2str(found)

它把大量的参数定义放在 _call_args中,这样有几个好处:

  1. 很方便处理大量参数

  2. 很方便写注释

  3. 可读性更好

因为要允许用户直接创建 Command 对象,所以又调用了一次 which

接着看它怎么处理参数的(L1236):

  1. @staticmethod

  2. def _extract_call_args(kwargs):

  3. """ takes kwargs that were passed to a command's __call__ and extracts

  4. out the special keyword arguments, we return a tuple of special keyword

  5. args, and kwargs that will go to the execd command """

  6. kwargs = kwargs.copy

  7. call_args = {}

  8. for parg, default in Command._call_args.items:

  9. key = "_" parg

  10. if key in kwargs:

  11. call_args[parg] = kwargs[key]

  12. del kwargs[key]

  13. invalid_kwargs = special_kwarg_validator(call_args,

  14. Command._kwarg_validators)

  15. if invalid_kwargs:

  16. exc_msg =

  17. for args, error_msg in invalid_kwargs:

  18. exc_msg.append(" %r: %s" % (args, error_msg))

  19. exc_msg = "n".join(exc_msg)

  20. raise TypeError("Invalid special arguments:nn%sn" % exc_msg)

  21. return call_args, kwargs

它将参数分成特殊参数(下划线开头) 和普通参数,特殊参数能够控制命令的执行过程, 还能看到它对特殊参数进行了统一校验,出错提示也非常清晰。

Command 对象的 bake方法,功能类似于functools.partial:

  1. >>> from sh import ls

  2. >>> lslh = ls.bake('-l', '-h')

  3. >>> lslh

  4. total 56K

  5. -rw-r--r-- 1 guyskk guyskk 155 May 12 13:00 aaa.json

  6. -rw-r--r-- 1 guyskk guyskk 162 May 12 13:00 bbb.py

  7. ...

  8. >>> ls

  9. <Command '/usr/bin/ls'>

  10. >>> lslh

  11. <Command '/usr/bin/ls -l -h'>

  12. >>>

bake方法的实现(L1265):

  1. def bake(self, *args, **kwargs):

  2. fn = type(self)(self._path)

  3. fn._partial = True

  4. call_args, kwargs = self._extract_call_args(kwargs)

  5. pruned_call_args = call_args

  6. for k, v in Command._call_args.items:

  7. try:

  8. if pruned_call_args[k] == v:

  9. del pruned_call_args[k]

  10. except KeyError:

  11. continue

  12. fn._partial_call_args.update(self._partial_call_args)

  13. fn._partial_call_args.update(pruned_call_args)

  14. fn._partial_baked_args.extend(self._partial_baked_args)

  15. sep = pruned_call_args.get("long_sep", self._call_args["long_sep"])

  16. prefix = pruned_call_args.get("long_prefix",

  17. self._call_args["long_prefix"])

  18. fn._partial_baked_args.extend(compile_args(args, kwargs, sep, prefix))

  19. return fn

_partial_call_args属性稍后会用到。

Command 是 callable 对象,它的 __call__方法实现比较复杂(L1324):

  1. def __call__(self, *args, **kwargs):

  2. # ...中间的具体实现不太好理解,先看最后一行

  3. return RunningCommand(cmd, call_args, stdin, stdout, stderr)

RunningCommand 是创建子进程执行命令,所以这里主要是处理参数和三个标准 IO。

处理 管道命令, 如果第一个参数是正在运行的命令,就复用它的标准输入:

  1. # L1373

  2. # check if we're piping via composition

  3. stdin = call_args["in"]

  4. if args:

  5. first_arg = args.pop(0)

  6. if isinstance(first_arg, RunningCommand):

  7. if first_arg.call_args["piped"]:

  8. stdin = first_arg.process

  9. else:

  10. stdin = first_arg.process._pipe_queue

  11. else:

  12. args.insert(0, first_arg)

处理 fg(foreground) 参数:

  1. if call_args["fg"]:

  2. if call_args["env"] is None:

  3. launch = lambda: os.spawnv(os.P_WAIT, cmd[0], cmd)

  4. else:

  5. launch = lambda: os.spawnve(os.P_WAIT, cmd[0], cmd, call_args["env"])

  6. exit_code = launch

os.spawn*os.system运行的效果差不多,区别在于它不需要通过 sh 进程执行命令。

out参数(err参数也差不多):

  1. # stdout redirection

  2. stdout = call_args["out"]

  3. if output_redirect_is_filename(stdout):

  4. stdout = open(str(stdout), "wb")

RunningCommand 的实现

先看接口(L649):

  1. class RunningCommand(object):

  2. """this represents an executing Command object."""

  3. def __init__(self, cmd, call_args, stdin, stdout, stderr):

  4. """

  5. cmd is an array, where each element is encoded as bytes (PY3) or str

  6. (PY2)

  7. """

其实和 Popen 的接口差不多,只是它把一堆参数放在 call_args里面了。

其中有个 iter参数允许迭代获取输出,而不是等子进程结束后再一次性获取。

  1. # set up which stream should write to the pipe

  2. # TODO, make pipe None by default and limit the size of the Queue

  3. # in oproc.OProc

  4. pipe = OProc.STDOUT

  5. if call_args["iter"] == "out" or call_args["iter"] is True:

  6. pipe = OProc.STDOUT

  7. elif call_args["iter"] == "err":

  8. pipe = OProc.STDERR

通过 OProc创建进程执行命令(L750),之后等待进程结束:

  1. if spawn_process:

  2. # this lock is needed because of a race condition where a background

  3. # thread, created in the OProc constructor, may try to access

  4. # self.process, but it has not been assigned yet

  5. process_assign_lock = threading.Lock

  6. with process_assign_lock:

  7. self.process = OProc(self, self.log, cmd, stdin, stdout, stderr,

  8. self.call_args, pipe, process_assign_lock)

  9. if should_wait:

  10. self.wait

其实 RunningCommand实现了__str____repr__方法, 所以它看上去像字符串,它也实现了__iter__方法,也就能迭代获取输出。

  1. >>> ret = sh.ls('-l')

  2. >>> type(ret)

  3. <class 'sh.RunningCommand'>

  4. >>> for line in ret:

  5. ... print(line)

  6. total 28392

  7. -rwxr-xr-x 1 guyskk guyskk 8464 Jul 1 18:34 a.out

  8. -rw-r--r-- 1 guyskk guyskk 421 Jun 4 22:15 app.py

OProc 的实现

OProc(L1678) 封装了创建进程以及进程通信的逻辑,绝大部分特殊参数都是在这处理的。 它的构造函数特别特别长,逻辑太多了。

特殊参数这么多,估计作者也很无奈:

  1. # convenience

  2. ca = self.call_args

这里主要看一下 伪终端相关的参数:

  • _tty_inDefault value: False, meaning a os.pipe will be used.

  • _tty_outDefault value: True

If True, sh creates a TTY for STDOUT, otherwise use a os.pipe.

子进程的输入默认是管道,输出默认是伪终端。 伪终端是行缓存模式,所以能不停地取到输出,对比一下前面用 Popen 运行 hello.py 的效果:

sh 把 _tty_out默认值设为 True 使得它在兼容性方面比 Popen 好很多, Why isttyout=True the default?

大致看一下实现:

  1. # L1770

  2. elif ca["tty_in"]:

  3. self._stdin_read_fd, self._stdin_write_fd = pty.openpty

  4. # tty_in=False is the default

  5. else:

  6. self._stdin_write_fd, self._stdin_read_fd = os.pipe

  7. # L1782

  8. # tty_out=True is the default

  9. elif ca["tty_out"]:

  10. self._stdout_read_fd, self._stdout_write_fd = pty.openpty

  11. else:

  12. self._stdout_read_fd, self._stdout_write_fd = os.pipe

后面的 fork, exec 和 Popen 几乎一样,就不重复介绍了。

读后感

sh 的代码我看了大约一周才理清其中的逻辑,代码太复杂了。

大致有几个原因:

  • Unix 进程本身的复杂性,概念和暗坑很多

  • 用了一些不为人知的 Python 特性(黑魔法)

  • 源码只有一个文件,3500 代码,代码结构不清晰, bottle 项目也是这样的问题

  • 功能太强大了

源码中还有非常多细节我没有提到,其实很多我也不明白,所以只能大致地介绍几个要点, 梳理一下命令的执行过程,希望能有所帮助 ╮( ̄▽ ̄")╭

Tags:

最近发表
标签列表