Reposted from

https://www.mnstory.net/2017/04/16/design-is-self-imposed/

https://www.mnstory.net/2017/04/16/locate-problem-of-python-child-zombie/

Many thanks! A complete walkthrough, from writing the code to chasing the bug; the approach is excellent and eye-opening.


There is a long-standing paradox: if you have no design skill, nobody will hand you a module to design; yet design skill has to be forged through actual design work. If you are never given a module to practice on, where is the skill supposed to come from?

It looks that way, but not entirely. I once bragged to my colleagues: design is something you go looking for yourself.

You can find design in your everyday bug-fixing. I previously gave an example of how I introduced redis into HCI while fixing a bug; today, let's look at how an ordinary API invites design.

V1

Let's write a Python API that executes a shell command. It looks trivial. My requirements: optionally feed it some input (no interaction); above all, capture STDOUT and STDERR separately, plus the exit code, so callers can easily tell whether the command succeeded. (What I fear most is the colleague who never checks the return value at all; then nothing below matters.)

Generally speaking, code at this level already passes muster:


def run(cmd, input=None):
    try:
        p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        (out, err) = p.communicate(input)
    except Exception as e:
        return (127, "", str(e))
    
    return (p.returncode, out, err)
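The article's code is Python 2.7 throughout (note the `except Exception, e` syntax). For readers on a modern interpreter, a minimal Python 3 port of V1, keeping the same return convention, might look like this:

```python
import subprocess

def run(cmd, input=None):
    """Run a shell command; return (exit_code, stdout, stderr)."""
    try:
        p = subprocess.Popen(cmd, shell=True,
                             stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                             universal_newlines=True)  # str instead of bytes
        out, err = p.communicate(input)
    except Exception as e:
        return (127, "", str(e))
    return (p.returncode, out, err)
```

Callers then branch on the first element, e.g. `run("exit 3")[0] == 3`.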

V2

Of course, as a veteran coder, my code should differ from a new hire's: read the API docs closely, understand what every parameter does, test and verify, and read the source where in doubt. Doing so, I found a few more issues:

  1. Should we record the command's execution time? So often, that timing is all you have when chasing a performance problem. (experience)
  2. Should the output be normalized? For example, some out data carries a trailing newline and some doesn't. A truly generic API should return it untouched, but I'm lazy: I don't want every caller to trim out themselves, and I have yet to see a command invocation whose result analysis depends on leading or trailing whitespace in out. So I believe the API should normalize internally. (personal preference)
  3. Should this API emit normal, non-error logs? Error logs are a given: I always emit and return those. Normal logs I usually refuse, but here I think they are warranted. I am against completing tasks by shelling out from a program, so this run function should be used rarely, and it should be very clear which logic uses it. Logging every call, first, warns users if commands are being invoked too often; second, shelling out is the most error-prone kind of logic, so it deserves thorough logging. (design trade-off)
  4. Experience tells us that an unrelated child process should close every handle inherited from the parent. (experience)

So the code becomes:


_lastOutDict={}
def run(cmd, input=None):
    # 1. record execution time
    timeStart = time.time()
    try:
        #4. close_fds=True closes all handles inherited from the parent
        p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True)
        (out, err) = p.communicate(input)
    except Exception as e:
        timeEnd = time.time()
        # 3. error log
        l.error("<EXE>(%ds):%s failed(%s)" % ((timeEnd-timeStart), cmd, str(e)))
        return (127, "", str(e))
    timeEnd = time.time()

    # 2. trim out and err
    if out:
        out = out.strip()
    else:
        out = ""
    if err:
        err = err.strip()
    exitCode = p.returncode

    # 3. normal logs can get noisy: identical outputs over 256 bytes are suppressed from the second occurrence on, to keep the log readable
    debugSupressOut = out
    if out and len(out) > 256:
        if _lastOutDict.get(cmd, "") == out:
            debugSupressOut = "<equal last...>"
        else:
            _lastOutDict[cmd] = out
    # 3. normal log
    l.debug("<EXE>(%d,%ds):%s%s%s%s" % (exitCode, (timeEnd-timeStart), cmd, (" <IN>:%s" % input) if input else "", (" <OUT>:%s" % debugSupressOut) if debugSupressOut else "", (" <ERR>:%s" % err) if err else ""))
    return (exitCode, out, err)

V3

My respect for (and fear of) shelling out to commands runs far deeper than most people's, so I still felt something was missing.

Yes: a TIMEOUT.

Experience tells us that a common risk of depending on external commands is hanging, and that is a headache. Python 2.7 has no API for running a command with a timeout, so we build one on top of a thread's join timeout.

Hence the third version:


# Kill the process tree, not just the child: killing only the child leaves the grandchildren running, with nobody to clean up their leftovers
def killTree(rootPid, killRoot):
    try:
        rootProcess = psutil.Process(rootPid)
        children = rootProcess.get_children(recursive=True)
        for child in children:
            l.info("kill tree child %d:%s (parent %d:%s)" % (child.pid, child.cmdline, rootProcess.pid, rootProcess.cmdline))
            child.kill()
        psutil.wait_procs(children, timeout=7)

        if killRoot:
            l.info("kill tree root %d:%s" % (rootProcess.pid, rootProcess.cmdline))
            rootProcess.kill()
            rootProcess.wait(5)
    except Exception as e:
        l.warning("kill tree %d found exception, %s" % (rootPid, str(e)))

class Command(object):
    def __init__(self, cmd, input=None):
        self.cmd = cmd
        self.input = input
        self.process = None
        self.out = ""
        self.err = ""
        self.errDesc = ""

    def _target(self):
        try:
            self.process = subprocess.Popen(self.cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True)
            (self.out, self.err) = self.process.communicate(self.input)
        except Exception as e:
            if self.errDesc:
                self.errDesc += ", "
            self.errDesc += str(e)

    def _run(self):
        self._target()

    def _runTimeout(self, timeout):
        thread = threading.Thread(target=self._target)
        thread.start()
        thread.join(timeout)

        if thread.is_alive(): # still alive after the timeout means the command is stuck; time to kill it ourselves
            if self.errDesc:
                self.errDesc += ", "
            self.errDesc += "timeout(%ds)" % timeout
            if None == self.process:
                self.errDesc += ", no process object"
            else:
                self.errDesc += ", kill process tree(%d)" % self.process.pid
                killTree(self.process.pid, True) # kill the whole tree
            thread.join(1) # give it one more chance

    def run(self, timeout=-1):
        global _lastOutDict

        timeStart = time.time()
        if timeout <= 0:
            self._run() # no timeout requested, no need to spawn a thread
        else:
            self._runTimeout(timeout) # wait in a separate thread
        timeEscape = (time.time()-timeStart)

        if self.out:
            self.out = self.out.strip()
        else:
            self.out = ""

        if self.err:
            self.err = self.err.strip()
        else:
            self.err = ""

        if self.errDesc:
            exitCode = -1
            if self.err:
                self.err += " "
            self.err += "<EXCEPTION>:" +self.errDesc
        else:
            exitCode = self.process.returncode

        debugSupressOut = self.out
        if self.out and len(self.out) > 256:
            if _lastOutDict.get(self.cmd, "") == self.out:
                debugSupressOut = "<equal last...>"
            else:
                _lastOutDict[self.cmd] = self.out

        l.debug("<EXE>(%d,%ds):%s%s%s%s" % (exitCode, timeEscape, self.cmd, (" <IN>:%s" % self.input) if self.input else "", (" <OUT>:%s" % debugSupressOut) if debugSupressOut else "", (" <ERR>:%s" % self.err) if self.err else ""))
        return (exitCode, self.out, self.err)

def run(cmd, input=None, timeout=-1):
    command = Command(cmd, input)
    return command.run(timeout)

Compared with the initial version, the TIMEOUT logic adds a lot of code, and I'm quite pleased with it. Along the way, didn't you have to learn quite a bit? For instance, why kill the process tree rather than just the process? How to coordinate threads with join? All these small pieces of knowledge, accumulated, become real skill.
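As an aside for readers on newer interpreters: Python 3 later grew exactly this feature, a timeout parameter on subprocess.run that raises TimeoutExpired. Note it still only kills the immediate child, so the process-tree concern above remains. A sketch mimicking V3's return convention:

```python
import subprocess

def run_with_timeout(cmd, timeout):
    """Return (exit_code, out, err); -1 as the exit code on timeout, like V3."""
    try:
        r = subprocess.run(cmd, shell=True, capture_output=True,
                           text=True, timeout=timeout)
        return (r.returncode, r.stdout.strip(), r.stderr.strip())
    except subprocess.TimeoutExpired:
        return (-1, "", "timeout(%ds)" % timeout)
```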


Read this together with "Design is self-imposed": above, we designed a Python wrapper for invoking shell commands. I usually put a lot of effort into self-testing, so, luckily or not, testing did turn up a problem.

Building a reliably reproducible case

When starting mysql, the process hangs until the TIMEOUT fires, and the child is a zombie (defunct), as shown:


root     28058  0.4  0.0 120068 15648 pts/4    S+   11:41   0:00  \_ python test.py
root     28455  0.0  0.0      0     0 pts/4    Z+   11:41   0:00      \_ [sh] <defunct>
root     28459  0.0  0.0  22416  1328 pts/4    S+   11:41   0:00 /bin/sh /var/lib/mysql/bin/mysqld_safe --datadir=/var/lib/mysql/data --pid-file=/var/lib/mysql/data/host-a0369f033dcb.pid
mysql    28678  1.4  0.5 727476 175156 pts/4   Sl+  11:41   0:00  \_ /var/lib/mysql/bin/mysqld --basedir=/var/lib/mysql --datadir=/var/lib/mysql/data --plugin-dir=/var/lib/mysql/lib/plugin --user=mysql --log-error=/var/log/mysql/mysql-error.log --pid-file=/var/lib/mysql

A zombie child means the parent never called waitpid on it. The problem is not universal; other commands don't trigger it. So first, let's strip the command down and build a case that reproduces every time. test.py:


p=subprocess.Popen("./b.sh", shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=False)
(out, err) = p.communicate(None)
print out, err

b.sh:


#!/bin/sh
pkill mysqld_safe
pkill mysqld
/var/lib/mysql/bin/mysqld_safe --datadir=/var/lib/mysql/data --pid-file=/var/lib/mysql/data/host-a0369f033dcb.pid &
echo "start ok"
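Before reaching for strace, note that the zombie state itself is easy to reproduce in isolation: a child that has exited stays in state Z until its parent calls waitpid. A minimal, Linux-only sketch (it reads /proc):

```python
import os
import time

pid = os.fork()
if pid == 0:
    os._exit(0)          # child exits immediately

time.sleep(0.2)          # the child is dead, but we have not reaped it yet
with open("/proc/%d/stat" % pid) as f:
    # the state is the field right after the ")" that closes the comm field
    state = f.read().rsplit(")", 1)[1].split()[0]
# state is now "Z": defunct, just like the [sh] in the listing above

os.waitpid(pid, 0)       # reaping the child removes the zombie
```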

Locating with strace

First, strace what test.py is doing.


pipe([3, 4])                            = 0
fcntl(3, F_GETFD)                       = 0
fcntl(3, F_SETFD, FD_CLOEXEC)           = 0
fstat(3, {st_mode=S_IFIFO|0600, st_size=0, ...}) = 0
fcntl(3, F_GETFL)                       = 0 (flags O_RDONLY)
fstat(3, {st_mode=S_IFIFO|0600, st_size=0, ...}) = 0
mmap(NULL, 4096, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0) = 0x7fb1b3282000
lseek(3, 0, SEEK_CUR)                   = -1 ESPIPE (Illegal seek)
munmap(0x7fb1b3282000, 4096)            = 0
fstat(3, {st_mode=S_IFIFO|0600, st_size=0, ...}) = 0
lseek(3, 0, SEEK_CUR)                   = -1 ESPIPE (Illegal seek)
read(3, "start ", 6)                    = 6
fstat(3, {st_mode=S_IFIFO|0600, st_size=0, ...}) = 0
lseek(3, 0, SEEK_CUR)                   = -1 ESPIPE (Illegal seek)
read(3, "ok\n", 6)                      = 3
--- SIGCHLD (Child exited) @ 0 (0) ---
read(3, "201", 3)                       = 3
fstat(3, {st_mode=S_IFIFO|0600, st_size=0, ...}) = 0
read(3, "31T03:47:18.655324Z mysqld_sa", 29) = 29
fstat(3, {st_mode=S_IFIFO|0600, st_size=0, ...}) = 0
lseek(3, 0, SEEK_CUR)                   = -1 ESPIPE (Illegal seek)
read(3, "fe Starting mysqld daemon with d"..., 33) = 33
fstat(3, {st_mode=S_IFIFO|0600, st_size=0, ...}) = 0
lseek(3, 0, SEEK_CUR)                   = -1 ESPIPE (Illegal seek)
read(3, "tabases from /var/lib/mysql/data"..., 37) = 33
read(3,

It is stuck in read. From the strace output, fd 3 is the read end of a pipe: the child has already exited, yet the parent is still reading the pipe.
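The underlying rule: read on a pipe returns EOF only after every copy of the write end, in every process, has been closed. A sketch, with a duplicated write fd standing in for the one the grandchild inherited:

```python
import os

r, w = os.pipe()
w2 = os.dup(w)             # a second copy of the write end, like fd 12/13 above
os.write(w, b"start ok\n")
os.close(w)                # the original writer is gone...

os.set_blocking(r, False)  # non-blocking mode, so this demo cannot hang
assert os.read(r, 64) == b"start ok\n"

try:
    os.read(r, 64)         # no EOF yet: w2 is still open somewhere
    would_block = False
except BlockingIOError:
    would_block = True     # a blocking read would be stuck right here

os.close(w2)               # close the last write end (our gdb `call close(12)`)
at_eof = os.read(r, 64) == b""   # now read returns EOF immediately
```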

Locating via the proc filesystem

From the process listing above, 28459 is a grandchild of 28058. Since 28058 is stuck reading a pipe, might the grandchild be holding an unclosed write end of that same pipe? Let's check:


# lh /proc/28058/fd
total 0
lrwx------ 1 root root 64 Mar 31 14:31 0 -> /dev/pts/4
lrwx------ 1 root root 64 Mar 31 14:31 1 -> /dev/pts/4
lr-x------ 1 root root 64 Mar 31 14:33 11 -> /dev/urandom
lrwx------ 1 root root 64 Mar 31 14:31 2 -> /dev/pts/4
lr-x------ 1 root root 64 Mar 31 14:31 3 -> pipe:[44876238]

# lh /proc/28459/fd
total 0
lr-x------ 1 root root 64 Mar 31 14:33 0 -> /dev/null
l-wx------ 1 root root 64 Mar 31 14:33 1 -> /dev/null
lr-x------ 1 root root 64 Mar 31 14:33 10 -> /var/lib/mysql/bin/mysqld_safe*
lr-x------ 1 root root 64 Mar 31 14:33 11 -> /dev/null
l-wx------ 1 root root 64 Mar 31 14:33 12 -> pipe:[44876238]
l-wx------ 1 root root 64 Mar 31 14:33 13 -> pipe:[44876238]
l-wx------ 1 root root 64 Mar 31 14:31 2 -> /dev/null

Verifying with GDB

Indeed, the grandchild inherited our handles. Let's try closing the inherited pipe fds from inside the grandchild:


# gdb -p 28459
(gdb) call close(12)
$1 = 0
(gdb) call close(13)
$2 = 0

Process 28058 finally moves on. From the fork to our closing of the pipe's other end, it took 690s:


#strace -Ttt python test.py
14:31:25.203008 fstat(3, {st_mode=S_IFIFO|0600, st_size=0, ...}) = 0 <0.000004>
14:31:25.203040 lseek(3, 0, SEEK_CUR)   = -1 ESPIPE (Illegal seek) <0.000003>
14:31:25.203064 read(3, "tabases from /var/lib/mysql/data"..., 37) = 33 <0.000005>
14:31:25.203092 read(3, 
"", 4)          = 0 <690.316968>

This is where it sat for a long time without progressing; now it finally finishes:


14:42:55.520112 close(3)                = 0 <0.000010>
14:42:55.520203 wait4(28455, [{WIFEXITED(s) && WEXITSTATUS(s) == 0}], 0, NULL) = 28455 <0.000020>
14:42:55.520275 write(1, "start ok\n2017-03-31T06:31:25.182"..., 282start ok

Problem confirmed: the Python command execution hangs because the pipe's read and write handles were inherited, and the inheriting side never closed its end of the pipe.
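In fact, the whole failure shrinks to one line, no mysql required: background any long-lived process under shell=True, and communicate() will not return until that process exits, because it inherited the stdout pipe's write end as its own stdout. A sketch in which a 0.5-second sleep plays the role of mysqld_safe:

```python
import subprocess
import time

t0 = time.time()
p = subprocess.Popen("sleep 0.5 & echo start ok", shell=True,
                     stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = p.communicate()
elapsed = time.time() - t0
# echo finished instantly, yet communicate() blocked until the backgrounded
# sleep exited and the last write end of the pipe was closed along with it
```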

Root cause analysis

As we know, fd inheritance on Linux is both a blessing and a headache; hardly anyone remembers to set FD_CLOEXEC or SOCK_CLOEXEC. When I wrote the supervisor module I handled this class of problem explicitly, and the approach was brute force: close the fds after fork, before exec:


int closeAllfds(int bIngoreDftFD) {
    struct rlimit rl;
    int closeCnt = 0;

    if(-1 == getrlimit(RLIMIT_NOFILE, &rl)) {
        lerror("getrlimit RLIMIT_NOFILE failed %d:%s\n", errno, strerror(errno));
        return -1;
    }
    if(rl.rlim_max == RLIM_INFINITY) {
        //If many files were opened and then this limit was reduced to 1024, 
        //we may not close all file descriptors.
        rl.rlim_max = 1024;
    }

    int fd = 0;
    while(fd < (int)rl.rlim_max) {
        if(!bIngoreDftFD || (fd != STDIN_FILENO && fd != STDOUT_FILENO && fd != STDERR_FILENO)) {
            if(-1 == close(fd)) {
                if(EINTR == errno) {
                    continue; //try again
                }
                if(EBADF != errno) {
                    lerror("close fd %d failed %d:%s\n", fd, errno, strerror(errno));
                }
            } else {
                ++closeCnt;
                lerror("close fd %d, total count %d\n", fd, closeCnt);
            }
        }
        ++fd;
    }

    return closeCnt;
}
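In Python, the per-fd equivalent of that C loop is the FD_CLOEXEC flag, set through the fcntl module (Unix only). Since PEP 446 (Python 3.4), newly created fds are non-inheritable by default. A sketch:

```python
import fcntl
import os

def set_cloexec(fd, enable=True):
    """Set or clear FD_CLOEXEC, so the fd does (not) survive an exec."""
    flags = fcntl.fcntl(fd, fcntl.F_GETFD)
    if enable:
        flags |= fcntl.FD_CLOEXEC
    else:
        flags &= ~fcntl.FD_CLOEXEC
    fcntl.fcntl(fd, fcntl.F_SETFD, flags)

r, w = os.pipe()
set_cloexec(w, False)    # deliberately inheritable, like the leaked write end
leaky = not (fcntl.fcntl(w, fcntl.F_GETFD) & fcntl.FD_CLOEXEC)
set_cloexec(w, True)     # and now it would be closed across an exec
safe = bool(fcntl.fcntl(w, fcntl.F_GETFD) & fcntl.FD_CLOEXEC)
```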

Since subprocess.Popen accepts a close_fds flag, why doesn't it take effect? Look at the subprocess source:


try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except:
    MAXFD = 256

errpipe_read, errpipe_write = self.pipe_cloexec()

# Close all other fds, if asked for - after
# preexec_fn(), which may open FDs.
if close_fds:
    self._close_fds(but=errpipe_write)

def _close_fds(self, but):
    if hasattr(os, 'closerange'):
        os.closerange(3, but)
        os.closerange(but + 1, MAXFD)
    else:
        for i in xrange(3, MAXFD):
            if i == but:
                continue
            try:
                os.close(i)
            except:
                pass

The closing method is as brute-force as mine, but note the but parameter exempting the error pipe's write end, and that _close_fds starts at fd 3: stdin/stdout/stderr are always kept. The child really did inherit no other handles; the problem struck precisely on the exempted ones, because the child's own stdout/stderr are the pipe's write ends, and the backgrounded grandchild inherited them in turn. Hard to guard against.

Let's also take a look at the library's communicate code, to see whether it matches the analysis:


def _readerthread(self, fh, buffer):
    buffer.append(fh.read())

def _communicate(self, input):
    stdout = None  # Return
    stderr = None  # Return

    # when stdout is a PIPE, a thread is spawned to read it
    if self.stdout:
        stdout = []
        stdout_thread = threading.Thread(target=self._readerthread,
                                         args=(self.stdout, stdout))
        stdout_thread.setDaemon(True)
        stdout_thread.start()
    if self.stderr:
        stderr = []
        stderr_thread = threading.Thread(target=self._readerthread,
                                         args=(self.stderr, stderr))
        stderr_thread.setDaemon(True)
        stderr_thread.start()

    if self.stdin:
        if input is not None:
            try:
                self.stdin.write(input)
            except IOError as e:
                if e.errno == errno.EPIPE:
                    # communicate() should ignore broken pipe error
                    pass
                elif (e.errno == errno.EINVAL
                      and self.poll() is not None):
                    # Issue #19612: stdin.write() fails with EINVAL
                    # if the process already exited before the write
                    pass
                else:
                    raise
        self.stdin.close()

    # the main thread joins the reader threads
    if self.stdout:
        stdout_thread.join()
    if self.stderr:
        stderr_thread.join()

    # All data exchanged.  Translate lists into strings.
    if stdout is not None:
        stdout = stdout[0]
    if stderr is not None:
        stderr = stderr[0]

    # Translate newlines, if requested.  We cannot let the file
    # object do the translation: It is based on stdio, which is
    # impossible to combine with select (unless forcing no
    # buffering).
    if self.universal_newlines and hasattr(file, 'newlines'):
        if stdout:
            stdout = self._translate_newlines(stdout)
        if stderr:
            stderr = self._translate_newlines(stderr)

    # waitpid happens only after the PIPEs have been read to EOF
    self.wait()
    return (stdout, stderr)

Although the child has already exited, test.py never gets to call wait, because it is stuck reading the PIPE. That is why the child stays defunct while the parent never reaps it: a perfect match with the observed behavior.

Fix

The fix is fairly simple. The problem lies in the script: you can't just background the process with & and call it a day; use the daemon command instead. I've read the daemon command's source before, and it specifically goes and closes fds. So change:


/var/lib/mysql/bin/mysqld_safe --datadir=/var/lib/mysql/data --pid-file=/var/lib/mysql/data/host-a0369f033dcb.pid &

to:


daemon -U -- /var/lib/mysql/bin/mysqld_safe --datadir=/var/lib/mysql/data --pid-file=/var/lib/mysql/data/host-a0369f033dcb.pid

Done.

Alternatively, don't use a PIPE to capture STDOUT on the Python side.
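The stripped-down reproduction confirms the same fix: redirect the backgrounded process's stdout/stderr away from the inherited pipe (which is, in effect, what the daemon command achieves by closing fds), and communicate() returns as soon as echo is done. A sketch:

```python
import subprocess
import time

t0 = time.time()
# ">/dev/null 2>&1" detaches the background job from our stdout pipe, so the
# pipe's last write end closes when echo finishes, not when sleep does
out, err = subprocess.Popen("sleep 5 >/dev/null 2>&1 & echo start ok",
                            shell=True, stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE).communicate()
elapsed = time.time() - t0   # well under a second, despite the 5s sleep
```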

Looking back over this debugging record, it comes down to applying some knowledge and some tools. There aren't many tricks; it's mostly accumulation.