| #!/usr/bin/python |
| |
| # |
| # Talk to a GwInstek GPD-4303s benchtop power supply |
| # |
| |
| import time |
| import serial |
| import getopt |
| import sys |
| import subprocess |
| import re |
| import curses |
| |
| def findDevice(): |
| p = subprocess.Popen("dmesg", stdout = subprocess.PIPE, stderr = subprocess.PIPE) |
| pout, perr = p.communicate() |
| device = None |
| for line in pout.split("\n"): |
| if 'FTDI USB Serial Device converter now attached' in line: |
| device = '/dev/' + line.split()[-1] |
| return device |
| |
| def parseArgs(cfg): |
| try: |
| opts, args = getopt.getopt(sys.argv[1:], "hD:v", ["help", "device=", "port1=", "port2=", "port3=", "port4="]) |
| except getopt.GetoptError, err: |
| print str(err) |
| print usage() |
| sys.exit(2) |
| |
| device = None |
| for o, a in opts: |
| if o in ("-h", "--help"): |
| print usage() |
| sys.exit(0) |
| elif o in ("-D", "--device"): |
| device = a |
| elif o in ("--port1", "--port2", "--port3", "--port4"): |
| port = int(o[6:]) |
| cfg['port'][port]['name'] = a |
| else: |
| assert False, "unhandled option" |
| |
| if device == None: |
| device = findDevice() |
| if device == None: |
| print "Could not find serial device. Try -D argument" |
| sys.exit(2) |
| |
| return device |
| |
| def numQuery(ser, string): |
| ser.write(string + "\n") |
| l = ser.readline() |
| # return only the number |
| return float(re.compile('[^\\d\\.]').sub('', l)) |
| |
| def getCurrent(ser, port): |
| return numQuery(ser, "IOUT" + str(port) + "?") |
| |
| def getVoltage(ser, port): |
| return numQuery(ser, "VOUT" + str(port) + "?") |
| |
| def setup(ser, port, voltage): |
| ser.write("VSET" + str(port) + ":" + str(voltage) + "\n") |
| |
| def enableOutput(ser): |
| ser.write("OUT1\n") |
| |
| def getavg(samples, now, seconds): |
| r = [w for (t, w) in samples if t + seconds >= now] |
| return (seconds, sum(r), len(r)) |
| |
| def getPort(cfg, ser, port): |
| voltage = cfg['port'][port]['voltage'] |
| samples = cfg['port'][port]['samples'] |
| history = cfg['history'] |
| try: |
| current = getCurrent(ser, port) |
| watts = voltage * current |
| now = time.time() |
| samples.append((now, watts)) |
| samples = [(t, w) for (t, w) in samples if t + max(history) >= now] |
| timeaverages = [(0, watts, 1)] + [getavg(samples, now, s) for s in history] |
| cfg['port'][port]['samples'] = samples |
| return ', '.join(["%ds: %1.4fW (%d)" % (s, w / l, l) for (s, w, l) in timeaverages]) |
| except: |
| return '' |
| |
| |
| def runLoop(cfg, ser, stdscr): |
| for port in cfg['ports']: |
| cfg['port'][port]['samples'] = [] |
| running = True |
| while running: |
| stdscr.addstr(0, 0, cfg['id'] + ' (press \'q\' to quit)') |
| for port in cfg['ports']: |
| portname = cfg['port'][port]['name'] |
| s = getPort(cfg, ser, port) |
| stdscr.addstr(port, 0, portname + ": " + s) |
| c = stdscr.getch() |
| if c in [27, ord('q')]: |
| running = False |
| stdscr.refresh() |
| |
| def initCurses(): |
| stdscr = curses.initscr() |
| curses.noecho() |
| curses.cbreak() |
| curses.noqiflush() |
| stdscr.nodelay(True) |
| return stdscr |
| |
| def exitCurses(): |
| curses.endwin() |
| |
| def run(cfg, ser): |
| stdscr = initCurses() |
| for port in cfg['ports']: |
| setup(ser, port, cfg['port'][port]['voltage']) |
| enableOutput(ser) |
| runLoop(cfg, ser, stdscr) |
| exitCurses() |
| |
| def runSerial(cfg, device): |
| ser = serial.Serial( |
| port = device, |
| baudrate = 9600, |
| parity = serial.PARITY_NONE, |
| stopbits = serial.STOPBITS_ONE, |
| bytesize = serial.EIGHTBITS, |
| timeout = 2) |
| ser.open() |
| if ser.isOpen(): |
| ser.write("*IDN?\n") |
| id = ser.readline() |
| if "GW INSTEK" in id: |
| print "Connected to: " + id, |
| cfg['id'] = id.strip() |
| run(cfg, ser) |
| else: |
| print "Power supply not found on " + device |
| ser.close() |
| |
| if __name__ == '__main__': |
| cfg = {} |
| cfg['port'] = {} |
| cfg['ports'] = [1, 2] |
| cfg['history'] = [1, 10, 60] #seconds |
| for port in cfg['ports']: |
| cfg['port'][port] = {} |
| cfg['port'][port]['voltage'] = 5.000 |
| cfg['port'][port]['name'] = 'port' + str(port) |
| device = parseArgs(cfg) |
| runSerial(cfg, device) |
| |