| #!/usr/bin/python |
| # Copyright 2012 Google Inc. All Rights Reserved. |
| # |
| # gpylint function naming pretty much sucks for appengine apps; disable the |
| # offending complaints: |
| #gpylint: disable-msg=C6409 |
| # |
| """A simple app to let users upload files, then other users download them. |
| |
This service is used by the upload-logs script in Google Fiber's set-top box.
| """ |
| |
| __author__ = 'apenwarr@google.com (Avery Pennarun)' |
| |
| import datetime |
| import logging |
| import os |
| import re |
| import time |
| import urllib |
| import zlib |
| from google.appengine.api import memcache |
| from google.appengine.api import taskqueue |
| from google.appengine.api import users |
| from google.appengine.ext import blobstore |
| from google.appengine.ext import deferred |
| from google.appengine.ext import ndb |
| from google.appengine.ext import webapp |
| from google.appengine.ext.webapp import blobstore_handlers |
| from google.appengine.ext.webapp import template |
| |
| |
| MAX_UNCOMPRESSED_BYTES = 5*1024*1024 # max bytes in a single download |
| MAX_MEM_PER_DOWNLOAD = 10*1024*1024 # max memory for buffering a download |
| MAX_PARALLEL_BLOBS = 64 # max blobstore requests at once |
| |
| BUFSIZE = min(blobstore.MAX_BLOB_FETCH_SIZE, |
| MAX_MEM_PER_DOWNLOAD / MAX_PARALLEL_BLOBS) |
| PARALLEL_BLOBS = min(MAX_MEM_PER_DOWNLOAD / BUFSIZE, MAX_PARALLEL_BLOBS) |
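# For example, assuming the SDK's MAX_BLOB_FETCH_SIZE of roughly 1MB:
#   BUFSIZE        = min(~1MB, 10MB / 64)   = 160KB per fetch
#   PARALLEL_BLOBS = min(10MB / 160KB, 64)  = 64 concurrent fetches
# i.e. at most 64 outstanding 160KB buffers, about 10MB of download memory.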
| |
| |
| _path = os.path.dirname(__file__) |
| _memcache = memcache.Client() |
| |
| |
| class _Model(ndb.Model): |
| |
  @property
  def key_name(self):
    """The primary key's id accessor, which subclasses can alias to a member.

    Note this returns the key's bound id method, not its value, so aliases
    like Machine.keymeta are invoked as self.keymeta().
    """
    return self.key.id
| |
| |
| class Machine(_Model): |
| """Represents a machine that uploads files. |
| |
| Eventually we can have user-defined labels etc. for each machine. For now, |
| it's mostly useful as an efficient way of getting a list of distinct |
| machines. |
| """ |
| keymeta = _Model.key_name # key=value string to match in File list |
| modified_time = ndb.DateTimeProperty(auto_now=True) |
| |
| def __repr__(self): |
| return 'Machine(%s)' % self.keymeta() |
| |
| def files(self): |
| return File.query().filter(File.meta == self.keymeta()) |
| |
| @property |
| def nice_modified_time(self): |
| return self.modified_time.strftime('%Y-%m-%d %H:%M:%S UTC') |
| |
| def most_recent_meta(self, sets=None): |
| f = self.files().order(-File.create_time).get() |
| meta = f and f.meta or [] |
| if sets is not None: |
| sets['meta-%s' % self.keymeta()] = meta |
| else: |
| _memcache.set('meta-%s' % self.keymeta(), meta) |
| return meta |
| |
| |
| class File(_Model): |
| """Represents an uploaded file.""" |
| name = ndb.StringProperty() # filename |
| blobid = ndb.BlobKeyProperty() # pointer to gzip'd file content |
| size = ndb.IntegerProperty() # uncompressed file size |
| create_time = ndb.DateTimeProperty(auto_now_add=True) # when it was uploaded |
| meta = ndb.StringProperty(repeated=True) # metadata: key=value strings |
| |
  # We split keys and values here so the datastore indexes by key, letting
  # us search for all rows with key='panic', ordered by date, etc.
| flag_keys = ndb.StringProperty(repeated=True) # list of flag names |
| flag_values = ndb.StringProperty(repeated=True) # list of flag values |
| |
| def _get_flags(self): |
| return zip(self.flag_keys, self.flag_values) |
| |
| def _set_flags(self, flags): |
| self.flag_keys = [i[0] for i in flags] |
| self.flag_values = [i[1] for i in flags] |
| |
| flags = property(fget=_get_flags, fset=_set_flags) |
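  # Example round trip (values are illustrative):
  #   f.flags = [('version', '1.0'), ('reboot', 'soft')]
  #   f.flag_keys   -> ['version', 'reboot']
  #   f.flag_values -> ['1.0', 'soft']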
| |
| @property |
| def nice_create_time(self): |
| return self.create_time.strftime('%Y-%m-%d %H:%M:%S UTC') |
| |
| @property |
| def create_time_code(self): |
| return self.create_time.strftime('%Y%m%d-%H%M%S') |
| |
| def __repr__(self): |
| return 'File(%s)' % self.name |
| |
| @classmethod |
| def New(cls, name, blobid, size, meta=None, flags=None): |
| meta = [('%s=%s' % i) for i in (meta or [])] |
    flags = flags or []  # tolerate the default of None
    flag_keys = [i[0] for i in flags]
    flag_values = [i[1] for i in flags]
| return cls(name=name, blobid=blobid, size=size, meta=meta, |
| flag_keys=flag_keys, |
| flag_values=flag_values) |
| |
| def machine_key(self): |
| """Return a key=value string that uniquely identifies the Machine.""" |
| for kv in self.meta: |
| if kv.startswith('hw='): |
| return kv |
| for kv in self.meta: |
| if kv.startswith('serial='): |
| return kv |
| if self.meta: |
| return self.meta[0] |
| return 'nometa' |
| |
| def machine(self): |
| """Return a Machine object that owns this file; may create one.""" |
| for machine in ndb.get_multi(ndb.Key(Machine, i) for i in self.meta): |
| if machine: |
| return machine |
| # still here? need to create it then. |
| return Machine(id=self.machine_key()) |
| |
| |
| def Render(filename, **kwargs): |
| """A wrapper for template.render that handles some boilerplate. |
| |
| Args: |
| filename: the basename (not the full path) of the template file. |
| **kwargs: additional contents for the rendering dictionary. |
| Returns: |
| The rendered template text. |
| """ |
  return template.render(os.path.join(_path, filename), kwargs)
| |
| |
| class _Handler(webapp.RequestHandler): |
| """A wrapper for webapp.RequestHandler providing some helper functions.""" |
| Redirect = webapp.RequestHandler.redirect |
| |
| def __init__(self, *args, **kwargs): |
| super(_Handler, self).__init__(*args, **kwargs) |
| self.futures_queue = [] |
| |
| def Write(self, *args): |
| self.response.out.write(*args) |
| |
| def Render(self, filename, **kwargs): |
| self.Write(Render(filename, **kwargs)) |
| |
| def ValidateUser(self): |
| user = users.get_current_user() |
| if not user or not user.email().endswith('@google.com'): |
| raise Exception('invalid user') |
| |
| def PutAsync(self, model): |
| """model.put() in the background. Call self.Sync() to let it finish.""" |
    f = model.put_async()
| self.futures_queue.append(f) |
| return f |
| |
| def Sync(self): |
| while self.futures_queue: |
| f = self.futures_queue.pop() |
| f.get_result() # may throw exception |
| |
| |
def _MachinesHelper(machines):
  """Yields (machine, meta) pairs, preferring memcached metadata."""
  found = _memcache.get_multi((str(m.keymeta()) for m in machines),
                              key_prefix='meta-')
| sets = {} |
| for m in machines: |
| key = str(m.keymeta()) |
| if key in found: |
| yield m, found[key] |
| else: |
| yield m, m.most_recent_meta(sets=sets) |
| _memcache.set_multi(sets) |
| |
| |
| class ListMachines(_Handler): |
| """Returns a list of available machines.""" |
| |
| def get(self): |
| """HTTP GET handler.""" |
| self.ValidateUser() |
| machines = Machine.query().order(-Machine.modified_time).fetch(1000) |
| self.Render('machines.djt', machines=_MachinesHelper(machines)) |
| |
| |
def _FilesHelper(files):
  """Yields (file, running size, end time, splitter) tuples for the template.

  The running size accumulates across files and resets at each reboot
  (detected via a 'version' flag); 'splitter' marks that boundary.
  """
  running_size = 0
  end_time = None
  splitter = False
| for f in files: |
| if running_size is not None and f.size is not None: |
| running_size += f.size |
| else: # file created before we had a 'size' attribute |
| running_size = None |
| if running_size: |
| yield f, '%.2fk' % (running_size/1024.), end_time, splitter |
| else: |
| yield f, '', end_time, splitter |
| splitter = False |
| for key, _ in f.flags: |
| if key == 'version': # reboot detected |
| end_time = f.create_time_code |
| running_size = f.size |
| splitter = True |
| |
| |
| def FilesForMachine(machine): |
| #gpylint: disable-msg=E1103 |
| files = machine.files().order(-File.create_time).fetch(1000) |
| cached = list(_FilesHelper(files)) |
| _memcache.set('files-%s' % machine.keymeta(), cached) |
| return cached |
| |
| |
| def CachedFilesForMachine(machine): |
| cached = _memcache.get('files-%s' % machine.keymeta()) |
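  # Upload.post() deletes this entry whenever a new file arrives, so a
  # stale list only survives until the machine's next upload.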
| if not cached: |
| logging.info('Regenerating files list for %r', machine) |
| cached = FilesForMachine(machine) |
| return cached |
| |
| |
| class ListFiles(_Handler): |
| """Returns a list of available files.""" |
| |
| #gpylint: disable-msg=W0221 |
| def get(self, machineid): |
| """HTTP GET handler.""" |
| self.ValidateUser() |
| machine = ndb.Key(Machine, machineid).get() |
| if not machine: |
| return self.error(404) |
| cached = CachedFilesForMachine(machine) |
| self.Render('files.djt', |
| machineid=machineid, |
| files=cached, |
| firstfile=cached and cached[0][0]) |
| |
| |
def _AsyncBlobReader(blobkey):
  """Yield a series of content chunks for the given blob.

  The first yielded entry is just ''; before yielding it, we start a
  background blobstore request to get the next block.  This means you can
  construct multiple _AsyncBlobReaders, call .next() on each one, and then
  iterate through them, thus reducing latency.

  Args:
    blobkey: the key of the desired blob.
  Yields:
    A sequence of compressed data chunks (the raw contents of the blob).
  """
| ofs = 0 |
| future = blobstore.fetch_data_async(blobkey, ofs, ofs + BUFSIZE - 1) |
| yield '' |
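  # Only blobs that fit in a single fetch get memcached below, so the
  # cached value is always a single small chunk.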
| cacheable = True |
| while future: |
| data = future.get_result() |
| ofs += len(data) |
| if len(data) < BUFSIZE: |
| future = None |
| else: |
| future = blobstore.fetch_data_async(blobkey, ofs, ofs + BUFSIZE - 1) |
| cacheable = False |
| yield data |
| if cacheable: |
| _memcache.set('zblob-%s' % blobkey, data) |
| |
| |
| def _TrivialReader(data): |
| """Works like _AsyncBlobReader, if you already have the data.""" |
| yield '' |
| yield data |
| |
| |
| def _Decompress(dataiter): |
| """Yield a series of un-gzipped content chunks for the given data iterator. |
| |
| This is a bit complicated because we need to make sure "gzip bombs" don't |
| suck up all our RAM. A gzip bomb is a very small file that expands to |
| a very large file (eg. a long series of zeroes). Any single chunk of |
| the file can be a bomb, so we have to decompress carefully. |
| |
| Args: |
| dataiter: an iterator that retrieves the data, one chunk at a time. |
| Yields: |
| A series of uncompressed data chunks. |
| """ |
| dataiter.next() # get it started downloading in the background |
| decomp = zlib.decompressobj() |
| yield '' |
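  # decompress(data, BUFSIZE) returns at most BUFSIZE bytes of output;
  # whatever it couldn't expand yet stays in decomp.unconsumed_tail, which
  # we drain in BUFSIZE-sized pieces so no one chunk can balloon in RAM.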
| for data in dataiter: |
| #logging.debug('received %d compressed bytes', len(data)) |
| yield decomp.decompress(data, BUFSIZE) |
| while decomp.unconsumed_tail: |
| yield decomp.decompress(decomp.unconsumed_tail, BUFSIZE) |
| yield decomp.flush(BUFSIZE) |
| while decomp.unconsumed_tail: |
| yield decomp.flush(BUFSIZE) |
| |
| |
| def _DecompressBlob(blobkey): |
| """Yield a series of un-gzipped content chunks for the given blobid.""" |
| return _Decompress(_AsyncBlobReader(blobkey)) |
| |
| |
| def _DecompressBlobs(blobkeys): |
| """Like _DecompressBlob(), but for a sequence of blobkeys. |
| |
| We start prefetching up to PARALLEL_BLOBS blobs at a time for better |
| pipelining. |
| |
| Args: |
| blobkeys: a list of blobid |
| Yields: |
| A sequence of uncompressed data chunks, from each of the blobs in order. |
| """ |
  iters = []
  blobkeys = list(str(i) for i in blobkeys)

  while blobkeys or iters:
    # Top up the window of in-flight readers.  (Without this cap we'd start
    # a fetch for every blob at once and blow past MAX_MEM_PER_DOWNLOAD.)
    needed = PARALLEL_BLOBS - len(iters)
    want = blobkeys[:needed]
    blobkeys[:needed] = []
    if want:
      logging.debug('fetching %d more blobs', len(want))
      found = _memcache.get_multi(want, key_prefix='zblob-')
      for blobkey in want:
        zblob = found.get(blobkey)
        if zblob:
          logging.debug('cached: %r', blobkey)
          dataiter = _Decompress(_TrivialReader(zblob))
        else:
          logging.debug('not cached: %r', blobkey)
          dataiter = _DecompressBlob(blobkey)
        dataiter.next()  # get it started downloading
        iters.append(dataiter)
    # Drain the oldest reader; the others keep prefetching in the background.
    dataiter = iters.pop(0)
    for data in dataiter:
      yield data
| |
| |
def _AllArgs(req, keys):
  """Returns a list of (key, value) pairs for the given request arg names."""
  query = []
| if keys: |
| for key in keys: |
| for val in req.get_all(key): |
| query.append((key, val)) |
| return query |
| |
| |
| def _ParseTime(s): |
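  """Parses 'YYYYMMDD-HHMMSS' or 'YYYYMMDD' (e.g. '20120607-235959')."""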
| if not s: |
| return None |
| try: |
| return datetime.datetime.strptime(s, '%Y%m%d-%H%M%S') |
| except ValueError: |
| return datetime.datetime.strptime(s, '%Y%m%d') |
| |
| |
| class Download(_Handler): |
| """Retrieves a given file (and all its successors) from the blobstore.""" |
| |
| #pylint: disable-msg=W0221 |
| def get(self, metakey): |
| """HTTP GET handler.""" |
| self.ValidateUser() |
| start_time = time.time() |
| machine = ndb.Key(Machine, metakey).get() |
| if not machine: |
| return self.error(404) |
| |
| start = _ParseTime(self.request.get('start')) |
| end = _ParseTime(self.request.get('end')) |
| |
| self.response.headers.add_header('Content-Type', 'text/plain') |
| |
    # App Engine won't just let us serve pre-compressed zlib files, so
    # decompress here and let App Engine recompress the response if the
    # client supports it.
| #gpylint: disable-msg=E1103 |
| q = machine.files().order(File.create_time) |
| if start: |
| q = q.filter(File.create_time >= start) |
| if end: |
| q = q.filter(File.create_time < end) |
| |
| nbytes = 0 |
| blobids = (f.blobid for f in q.fetch(1000)) |
| for data in _DecompressBlobs(blobids): |
| self.Write(data.replace('\0', '')) |
| nbytes += len(data) |
| if nbytes > MAX_UNCOMPRESSED_BYTES: |
| self.Write('\n(stopping after %d bytes)\n' % nbytes) |
| break |
| end_time = time.time() |
| logging.info('Download: %d bytes in %.2f seconds', |
| nbytes, end_time - start_time) |
| |
| |
def _ScanBlob(blobid):
  """Decompresses a blob and returns (size, flags) scanned from its text."""
  flags = []
| size = 0 |
| last_chunk = '' |
| for chunk in _DecompressBlob(blobid): |
| size += len(chunk) |
| for panic in re.findall(r'(?:Kernel panic[^:]*|BUG):\s*(.*)', chunk): |
| flags.append(('panic', panic)) |
| if 'Restarting system.' in chunk: |
| flags.append(('reboot', 'soft')) |
| for ver in re.findall(r'SOFTWARE_VERSION=(\S+)', chunk): |
| flags.append(('version', ver)) |
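    # A kernel timestamp like '[   42.123456]' followed on a later line by
    # a near-zero timestamp like '[    0.000000]' means the counter
    # restarted, i.e. the box rebooted; record the last uptime before it.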
| for uptime in re.findall(r'\[([\s\d]+\.\d+)\].*\n[^\[]*\[[\s0.]+\]', |
| chunk): |
| uptimev = float(uptime.strip()) |
| if uptimev > 0: |
| flags.append(('uptime_end', str(int(uptimev)))) |
| if chunk: |
| last_chunk = chunk |
| # the last chunk should have the most recent uptime in it |
| uptimes = re.findall(r'\[([\s\d]+\.\d+)\]', last_chunk) |
| if uptimes: |
| uptimev = float(uptimes[-1].strip()) |
| flags.append(('uptime', str(int(uptimev)))) |
| return size, flags |
| |
| |
| class Upload(_Handler, blobstore_handlers.BlobstoreUploadHandler): |
| """Allows the user to upload a file.""" |
| |
| #pylint: disable-msg=W0221 |
| def get(self, filename): |
| """HTTP GET handler. Returns an URL that can be used for uploads.""" |
| path = '/upload/%s' % filename |
| query = _AllArgs(self.request, self.request.arguments()) |
| if query: |
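      # Record each distinct key under '_' so post() can tell which of the
      # forwarded query args are our metadata.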
| for key in dict(query).iterkeys(): |
| query.append(('_', key)) |
| path += '?' + urllib.urlencode(query) |
| url = blobstore.create_upload_url(path) |
| self.Write(url) |
| |
| #pylint: disable-msg=W0221 |
| def post(self, filename): |
| """HTTP POST handler.""" |
    uploads = list(self.get_uploads('file'))
    if len(uploads) != 1:
      logging.error('Exactly one attachment expected; got %d.', len(uploads))
      return self.error(500)
| meta = _AllArgs(self.request, self.request.get_all('_')) |
| blobid = uploads[0].key() |
| size, flags = _ScanBlob(blobid) |
| f = File.New(name=filename, blobid=blobid, size=size, meta=meta, |
| flags=flags) |
| self.PutAsync(f) |
| machine = f.machine() |
| self.PutAsync(machine) |
| self.Sync() |
| machine.most_recent_meta() # populate the memcache pre-emptively |
| _memcache.delete('files-%s' % machine.keymeta()) |
| self.Redirect('/') |
| |
| |
| class Regen(_Handler): |
| """Make sure all objects are indexed correctly for latest schema.""" |
| |
| def get(self): |
| """HTTP GET handler.""" |
| self.response.headers.add_header('Content-Type', 'text/plain') |
| |
| def _handle(machinekeys): |
| count = 0 |
| keys = machinekeys.keys() |
| machines = ndb.get_multi(ndb.Key(Machine, key) for key in keys) |
| for key, machine in zip(keys, machines): |
| if not machine: |
| self.PutAsync(Machine(id=key)) |
| count += 1 |
| machinekeys.clear() |
| self.Write('created %d\n' % count) |
| self.Sync() |
| |
| machinekeys = {} |
| for f in File.query().order(-File.create_time): |
| f.size, f.flags = _ScanBlob(f.blobid) |
| self.PutAsync(f) |
| machinekeys[f.machine_key()] = 1 |
| if len(machinekeys) > 500: |
| _handle(machinekeys) |
| _handle(machinekeys) |
| self.Write('ok\n') |
| self.Sync() |
| |
| |
| class StartRegen(_Handler): |
| """Start a Regen operation using the TaskQueue, which has a long timeout.""" |
| |
| def get(self): |
| """HTTP GET handler.""" |
| taskqueue.add(url='/_regen', method='GET') |
| |
| |
| def _Query(meta): |
| return (File.query() |
| .filter(File.meta == meta) |
| .order(-File.create_time) |
| .iter(keys_only=True, batch_size=1000)) |
| |
| |
| class Query(_Handler): |
| """Return a list of all File keys containing the given metadata.""" |
| |
| def get(self, meta): |
| """HTTP GET handler.""" |
| self.response.headers.add_header('Content-Type', 'text/plain') |
| for k in _Query(meta): |
| self.Write('%s\n' % k) |
| |
| |
| class DeleteAll(_Handler): |
| """Delete all File keys containing the given metadata.""" |
| |
| def get(self, meta): |
| """HTTP GET handler.""" |
| ndb.delete_multi(_Query(meta)) |
| self.Write('ok\n') |
| |
| |
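# Routes are matched in order, so the fixed-prefix handlers ('/upload/...',
# '/_regen', etc.) must come before the catch-all '/([^/]+)/' machine pages.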
| wsgi_app = webapp.WSGIApplication([ |
| ('/', ListMachines), |
| ('/_regen', Regen), |
| ('/_start_regen', StartRegen), |
| ('/_query/(.+)', Query), |
| ('/_deleteall/(.+)', DeleteAll), |
| ('/upload/(.+)', Upload), |
| ('/([^/]+)/', ListFiles), |
| ('/([^/]+)/log', Download), |
| ], debug=True) |