| #!/usr/bin/python -S |
| # Copyright 2012 Google Inc. All Rights Reserved. |
| |
| """A script for linearizing log messages produced during parallel operations. |
| |
| The idea is when you start a subprocess, instead of executing, say, |
| make -C subdir all |
| you would run this: |
| loglinear make -C subdir all |
| |
| This works recursively and in parallel, so you can spawn multiple |
| subprocesses that use loglinear, and they can spawn subprocesses, and so on. |
| As much as possible, the logs will be sorted "as if" all the programs had |
| been run sequentially instead of in parallel. |
| """ |
| |
| import errno |
| import os |
| import select |
| import signal |
| import socket |
| import sys |
| import traceback |
| |
| __author__ = 'apenwarr@google.com (Avery Pennarun)' |
| |
| |
# Registry of sessions that have connected but not yet finished, keyed by
# Session.id. A Session adds itself on creation and removes itself in Done().
sessions = dict()
# One-element list holding the most recently assigned session id; it is a
# list so that Session.__init__ can increment it in place.
magic = [0]
| |
| |
| def _Write(fd, buf): |
| """Like os.write(), but keeps trying if write() returns < len(buf).""" |
| i = 0 |
| while i < len(buf): |
| wrote = os.write(fd, buf[i:]) |
| if wrote == 0: |
| raise IOError('unexpected EOF on write()') |
| i += wrote |
| return i |
| |
| |
class Session(object):
    """Describes a server-side session talking to a client.

    A session buffers everything its client sends and prints it, prefixed
    with the session's dotted name, only after the client has reported an
    exit code.

    Attributes:
      conn: the connected socket for this client.
      id: the session id; it is sent to the client when the session starts.
      buf: data received from the client that has not been printed yet.
      live_children: sessions that named this one as parent and have not
        finished.
      errored_children: finished children with a nonzero exit code whose
        output has not been printed yet.
      parent: the parent Session, or None.
      parent_id: the parent id announced by the client (None until then).
      nicename: short display name of this session.
      fullname: nicename prefixed with the parent's fullname, if any.
      got_init_line: True once the client's first line has been parsed.
      exitcode: the reported exit code, or None while still running.
    """

    def __init__(self, conn):
        """Registers a new session and tells the client its session id.

        Args:
          conn: the connected socket for this client.
        """
        self.conn = conn
        magic[0] += 1
        self.id = magic[0]
        self.buf = ''
        self.live_children = []
        self.errored_children = []
        self.parent = None
        self.parent_id = None
        self.nicename = self.fullname = '(starting)'
        self.got_init_line = False
        self.exitcode = None
        sessions[self.id] = self
        # The client reads this line to learn its own session id.
        _Write(self.fileno(), '%d\n' % self.id)

    def __repr__(self):
        return 'Session(%d)' % self.id

    def fileno(self):  #gpylint: disable-msg=C6409
        """Returns self.conn.fileno(), so you can select() on this object."""
        return self.conn.fileno()

    def SetParent(self, parent_id, nicename):
        """Records the parent and display names, then prints the status line.

        Args:
          parent_id: id of the parent session; an id that is not in the
            sessions registry leaves this session without a parent.
          nicename: short display name; the session id is used if it is empty.
        """
        self.parent_id = parent_id
        self.parent = sessions.get(parent_id, None)
        self.nicename = nicename or str(self.id)
        if self.parent:
            self.fullname = '%s.%s' % (self.parent.fullname, self.nicename)
            self.parent.live_children.append(self)
        else:
            self.fullname = self.nicename
        self.PrintStatus()

    def Received(self, data):
        """Buffers data from the client and handles the protocol lines in it.

        Args:
          data: a chunk of data read from the client socket.
        """
        self.buf += data
        # The first line of the stream is '<parent_id> <nicename>'.
        if not self.got_init_line and '\n' in self.buf:
            init_line, self.buf = self.buf.split('\n', 1)
            parent_id, nicename = init_line.split(' ', 1)
            self.SetParent(int(parent_id), nicename)
            self.got_init_line = True
        # The stream ends with a '\n\0EXIT=<code>\n' trailer.
        marker = '\n\0EXIT='
        if marker in self.buf:
            output, trailer = self.buf.split(marker, 1)
            # Only parse the code once its terminating newline has arrived;
            # a read boundary in the middle of the trailer would otherwise
            # yield a truncated (or empty) exit code.
            if '\n' in trailer:
                self.buf = output
                self.Done(int(trailer.split('\n')[0].strip()))

    def PrintStatus(self):
        """Writes the names of all unfinished sessions to stderr, if any."""
        if sessions:
            names = sorted(i.nicename for i in sessions.values())
            # Flush stdout first so the status line on stderr does not
            # overtake log output that is still buffered.
            sys.stdout.flush()
            sys.stderr.write('waiting(%d): %s\n'
                             % (len(sessions), ', '.join(names)))

    def PrintLn(self, s):
        """Writes one line to stdout, prefixed with this session's fullname."""
        sys.stdout.write('%s: %s\n' % (self.fullname, s))

    def Flush(self):
        """Prints the buffered output of this session if it has finished.

        Does nothing while the session is still running. Otherwise the parent
        is flushed first, then this session's non-empty lines are printed.
        Once no children are still running, the deferred output of errored
        children is printed, followed by this session's exit line.
        """
        if self.exitcode is not None:
            if self.parent:
                self.parent.Flush()
            lines = self.buf.split('\n')
            self.buf = ''
            for line in lines:
                line = line.split('\r')[-1]  # \r blanks everything before it
                if line:
                    self.PrintLn(line)
            if not self.live_children:
                # NOTE(review): an errored child's Flush() re-enters this
                # session's Flush() through its parent link, so the exit line
                # below can be printed more than once - confirm if intended.
                while self.errored_children:
                    i = self.errored_children.pop(0)
                    i.Flush()
                self.PrintLn('exit: %d' % self.exitcode)

    def Done(self, exitcode):
        """Marks the session finished and prints or defers its output.

        Args:
          exitcode: the exit code to record for this session.
        """
        assert self.exitcode is None
        self.exitcode = exitcode
        assert self.exitcode is not None
        del sessions[self.id]
        if self.parent:
            self.parent.live_children.remove(self)
        if self.parent and self.exitcode:
            # A failed child is parked on its parent; its output is printed
            # later from the parent's Flush().
            assert self.parent.id in sessions
            self.parent.errored_children.append(self)
        else:
            self.Flush()
        self.PrintStatus()

    def Run(self):
        """Reads one chunk from the client; EOF counts as exit code 999."""
        data = os.read(self.fileno(), 4096)
        if not data:
            self.Done(999)
        else:
            self.Received(data)
| |
| |
def RunServer(listensock):
    """Serve client sessions until no unfinished session remains.

    Blocks until the first client connects, then multiplexes the listening
    socket and every registered session with select().

    Args:
      listensock: a listening socket to accept client connections from.
    """
    Session(listensock.accept()[0])
    while sessions:
        # list() keeps the concatenation valid whether values() returns a
        # list or a dict view.
        watched = [listensock] + list(sessions.values())
        readable, _, _ = select.select(watched, [], [])
        for ready in readable:
            if ready is listensock:
                # Session.__init__ adds the new session to the registry.
                Session(listensock.accept()[0])
            else:
                ready.Run()
| |
| |
def ForkServer():
    """Start the log server in a forked child and connect to it.

    Binds a listening socket on an auto-assigned port of 127.0.0.1 and forks.
    The child runs RunServer() on that socket and never returns from this
    function: it leaves through os._exit() with status 0, or 5 if anything
    was raised. The parent connects to the listener, exports the port as
    LOGLINEAR_PORT and closes its copy of the listening socket.

    Returns:
      A (conn, pid) tuple: the parent's connected client socket and the pid
      of the server child.
    """
    listener = socket.socket(socket.AF_INET)
    listener.bind(('127.0.0.1', 0))  # port 0: let the kernel pick one
    listener.listen(100)
    server_pid = os.fork()
    if server_pid == 0:
        # child: becomes the server
        status = 0
        try:
            # Ignore termination signals; the processes being logged are the
            # ones that should die from them, and the server has to outlive
            # them or their last messages would be lost.
            for signum in (signal.SIGTERM, signal.SIGINT):
                signal.signal(signum, signal.SIG_IGN)
            RunServer(listener)
        except:
            status = 5
            traceback.print_exc()
            raise
        finally:
            try:
                if status:
                    sys.stdout.write(
                        'loglinear server: exiting with code %d\n' % status)
                # os._exit() skips the normal interpreter cleanup, so flush
                # buffered log output explicitly.
                sys.stdout.flush()
            finally:
                os._exit(status)  #gpylint: disable-msg=W0212
    # parent
    address = listener.getsockname()
    client = socket.socket(socket.AF_INET)
    client.connect(address)
    os.environ['LOGLINEAR_PORT'] = str(address[1])
    listener.close()
    return client, server_pid
| |
| |
def WaitPid(pid):
    """Like os.waitpid, but returns -signalnumber or +retcode, and retries.

    Args:
      pid: the process id to wait for.

    Returns:
      The exit code (status >> 8) if the low byte of the wait status is zero,
      otherwise the negated low byte of the wait status.

    Raises:
      OSError: if os.waitpid() fails with anything but EAGAIN or EINTR.
    """
    while True:
        try:
            rpid, rv = os.waitpid(pid, 0)
        except OSError as e:
            if e.errno not in (errno.EAGAIN, errno.EINTR):
                raise
            # Interrupted before a status was collected: wait again instead
            # of falling through with rpid/rv unset.
            continue
        assert rpid == pid
        if rv & 0xff:
            # killed by signal
            # NOTE(review): the whole low byte is negated, so any flag bit
            # the platform stores next to the signal number (e.g. a core-dump
            # bit) ends up in the result too - confirm callers expect that.
            return -(rv & 0xff)
        # died normally
        return rv >> 8
| |
| |
def SimplifyCmdline(cmdline):
    """Derive a short display name from an argv array.

    Once a 'make' word (bare, or a path ending in '/make') has been seen,
    every later argument that is neither an option ('-...') nor an assignment
    ('a=b') contributes its last path component. If there is no 'make' word
    at all, the name is '<basename of cwd>-<basename of argv[0]>'.

    Args:
      cmdline: the argv list of the command.

    Returns:
      The collected words joined with '+', with newlines replaced by spaces,
      or None if 'make' was found but no word was collected.
    """
    seen_make = False
    targets = []
    for arg in cmdline:
        if arg == 'make' or arg.endswith('/make'):
            seen_make = True
            continue
        if not seen_make or arg.startswith('-') or '=' in arg:
            continue
        # Drop trailing slashes so 'dir/sub/' yields 'sub' rather than ''.
        targets.append(arg.rstrip('/').split('/')[-1])
    if not (seen_make or targets):
        here = os.path.basename(os.getcwd())
        program = os.path.basename(cmdline[0])
        targets.append('%s-%s' % (here, program))
    if not targets:
        return None
    return '+'.join(targets).replace('\n', ' ')
| |
| |
def RunClient(conn, cmdline):
    """Run the client side of a loglinear session.

    Reads this session's id from the server, then forks. The child announces
    itself to the server, redirects stdout and stderr into the connection and
    executes cmdline; it never returns from this function. The parent waits
    for the child and reports its exit status to the server.

    Args:
      conn: a socket connected to the loglinear server.
      cmdline: the argv list of the command to run.

    Returns:
      The child's status as returned by WaitPid().
    """
    session_id = int(os.read(conn.fileno(), 4096).strip())
    parent_id = os.environ.get('LOGLINEAR_PARENT', '0')
    short_name = SimplifyCmdline(cmdline)
    child_pid = os.fork()
    if child_pid == 0:
        # child
        try:
            # The first line tells the server our parent session and name;
            # the rest is ordinary log output.
            header = ('%s %s\nPWD: %s\nStarting: %s (%s)\n'
                      % (parent_id, short_name or '',
                         os.getcwd(), cmdline, short_name))
            _Write(conn.fileno(), header)
            for target_fd in (1, 2):
                os.dup2(conn.fileno(), target_fd)
            conn.close()
            # Sessions started by the command will name us as their parent.
            os.environ['LOGLINEAR_PARENT'] = str(session_id)
            os.execvp(cmdline[0], cmdline)
        except:
            traceback.print_exc()
            raise
        finally:
            # Only reached if exec did not replace this process.
            os._exit(99)  # unrecoverable error #gpylint: disable-msg=W0212
    # parent
    status = WaitPid(child_pid)
    _Write(conn.fileno(), '\n\0EXIT=%d\n' % status)
    conn.close()
    return status
| |
| |
def main():
    """Run the command given in sys.argv[1:] under a loglinear session.

    If LOGLINEAR_PORT is unset (or 0), a new log server is forked and this
    process also waits for it to finish; otherwise the existing server on
    that port is used. Standard input is redirected from /dev/null.

    Returns:
      The process exit status: 1 if no command was given, otherwise the
      absolute value of the command's status, or of the server's status if
      this process started the server and it exited nonzero.
    """
    if len(sys.argv) < 2:
        sys.stderr.write('Usage: %s <command line>\n' % sys.argv[0])
        # Stop here: without a command there is nothing to run, and carrying
        # on would start a server that then waits for a client forever.
        return 1
    with open('/dev/null') as new_stdin:
        os.dup2(new_stdin.fileno(), 0)
    port = int(os.environ.get('LOGLINEAR_PORT', 0))
    if not port:
        conn, pid = ForkServer()
    else:
        pid = 0
        conn = socket.socket(socket.AF_INET)
        # The server binds to 127.0.0.1, so connect to that same address.
        conn.connect(('127.0.0.1', port))

    rv = 0
    try:
        rv = RunClient(conn, sys.argv[1:])
    finally:
        # Make sure the server sees EOF even if RunClient() raised before
        # closing the connection; otherwise waiting for the server below
        # could block forever. Closing an already closed socket is harmless.
        conn.close()
        if pid:
            rv_server = WaitPid(pid)
            if rv_server:
                rv = rv_server
    return abs(rv)
| |
# Script entry point: the value returned by main() becomes the process exit
# status.
if __name__ == '__main__':
    exit_status = main()
    sys.exit(exit_status)