/*
 * Copyright 2012-2014 Google Inc. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/*
 * A program that reads log messages from stdin, processes them, and writes
 * them to /dev/kmsg (usually) or stdout (if LOGOS_DEBUG=1).
 *
 * Features:
 *  - limits the number of log message bytes per second.
 *  - writes only entire lines at a time in a single syscall, to keep the
 *    kernel from overlapping messages from other threads/instances.
 *  - cleans up control characters (ie. chars < 32).
 *  - makes sure output lines are in "facility: message" format.
 *  - doesn't rely on syslogd.
 */
#include <assert.h>
#include <ctype.h>
#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <stdio.h>
#include <string.h>
#include <stdint.h>
#include <stdlib.h>
#include <sys/uio.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
#ifndef COMPILE_FOR_HOST
#include <stacktrace.h>
#endif  // COMPILE_FOR_HOST

#include "utils.h"


// Total size of kernel log buffer.
// We use CONFIG_PRINTK_PERSIST in the kernel to keep our log buffer across
// reboots, then configure the kernel buffer to be extra large, then dump
// *both* kernel and userspace messages into it.  This gives us a clearly
// timestamped log of all events across the whole system.
// The kernel log buffer size is actually set by the log_buf_len kernel
// parameter; if you change it to be <= BURST_LOG_SIZE, please change it
// here too.
#define BURST_LOG_SIZE     (10*1000LL*1000LL)

// Maximum bytes to log per day.
// This limit reflects our server-side quota (and is also enforced server
// side).  We need to know it client-side in order to calculate the right
// default bucket size so we never run into the server-side quota
// unexpectedly.
#define DAILY_LOG_SIZE     (100*1000LL*1000LL)

// Amount of time between system-wide log uploads.
// (The system might actually upload more often than this, which is harmless.
// If it uploads less often, we risk an overflow, because we're calculating
// our bucket sizes based on this amount.)
#define SECS_PER_BURST     300

// Amount of time in daily bucket.
// (That is, DAILY_LOG_SIZE is a limit reflecting this many seconds.)
#define SECS_PER_DAY       (24*60*60)

// Worst-case number of programs bursting out of control at once
#define MAX_BURSTING_APPS  10

// Worst-case number of programs maxing out the daily byte counter
#define MAX_DAILY_APPS     20

// Default bytes per burst period
#define DEFAULT_BYTES_PER_BURST  (BURST_LOG_SIZE / MAX_BURSTING_APPS)

// Default bytes per day
#define DEFAULT_BYTES_PER_DAY  (DAILY_LOG_SIZE / MAX_DAILY_APPS)

// This is arbitrary.  It matters more when using syslogd (which
// has pretty strict limits) but we could make this arbitrarily large
// if we really wanted to allow obscenely long lines.  Anything larger
// than the minimum bucket size makes no sense, of course.
#define MAX_LINE_LENGTH    768


// Indexes into the buckets[] table, one per token bucket used by the rate
// limiter.  NUM_BUCKETS is the count of real buckets and is used as the
// array size and loop bound, so it must remain the last enumerator.
enum BucketIds {
  B_BURST = 0,     // fast, small bucket (per-cycle limit; allows bursts)
  B_DAILY,         // slow, big bucket (per-day limit)
  B_WARNING,       // slow, small bucket (warns if you've made a burst)
  NUM_BUCKETS
};


// What happens to a message when a bucket does not have room for it.
enum BucketType {
  // The bucket only emits its start/end notices; an empty bucket of this
  // type never causes a message to be dropped.
  BT_INFORMATIONAL = 0,
  // An empty bucket of this type causes the message to be dropped.
  BT_MANDATORY = 1,
};


// State and messages for one token bucket of the rate limiter, together with
// the global table of all buckets (indexed by enum BucketIds).
//
// msg_start and msg_end are printf-style formats containing a single %d:
// msg_start is formatted with the bucket's fill rate, msg_end with the
// number of skipped messages.
struct Bucket {
  char *name;           // short name of this bucket
  char *msg_start;      // message when bucket is first exceeded
  char *msg_end;        // message when bucket has some space again
  enum BucketType type; // controls whether this bucket causes drops
  ssize_t max_bytes;    // maximum bytes in this bucket when it's full
  ssize_t fill_rate;    // bytes added to this bucket per sec when not full
  ssize_t available;    // bytes currently in this bucket (<= max_bytes)
  int num_skipped;      // number of messages skipped because of this bucket
} buckets[NUM_BUCKETS] = {
  // Entries are listed in enum BucketIds order.  The byte limits and the
  // skip counter are left out of the initializers and therefore start at
  // zero; init_buckets() assigns the real limits.

  // B_BURST
  {
    .name = "burst",
    .msg_start =
        "W: burst limit: dropping messages to prevent overflow (%d bytes/sec).",
    .msg_end = "W: burst limit: %d messages were dropped.",
    .type = BT_MANDATORY,
  },
  // B_DAILY
  {
    .name = "daily",
    .msg_start = "W: daily limit: dropping messages (%d bytes/sec).",
    .msg_end = "W: daily limit: %d messages were dropped.",
    .type = BT_MANDATORY,
  },
  // B_WARNING
  {
    .name = "warning",
    .msg_start =
        "I: burst notice: this log rate is unsustainable (%d bytes/sec).",
    .msg_end = "I: burst notice: %d messages would have been dropped.",
    .type = BT_INFORMATIONAL,
  },
};


// debug: value of the LOGOS_DEBUG environment variable (0 if unset); when
//   nonzero, bucket state is traced to stderr and output stays on stdout.
// unlimited_mode: the rate-limit bypass setting currently in effect; only
//   changed from the main loop.
static int debug = 0, unlimited_mode = 0;

// Requested rate-limit bypass setting.  It is written asynchronously by the
// SIGUSR1/SIGUSR2 handlers and polled by the main loop, so it has to be a
// volatile sig_atomic_t for those accesses to be well defined.
static volatile sig_atomic_t want_unlimited_mode = 0;

// Copy of main()'s argv, kept so the fatal-signal handler can re-exec logos.
static char **g_argv = NULL;


// Case-insensitive prefix test.  Yields 1 when the bytes at 's' begin with
// the NUL-terminated string 'contains', and 0 otherwise.  The comparison
// looks at up to strlen(contains) bytes of 's' (stopping early at a NUL).
static int startswith(const void *s, const char *contains) {
  const size_t prefix_len = strlen(contains);
  if (strncasecmp(s, contains, prefix_len) != 0) {
    return 0;
  }
  return 1;
}


// However, we want to allow short-term bursts of more bytes, with a lower
// average when taken over the course of a longer time period.  So we
// actually need two token buckets: a "burst" bucket (to control short term
// burstiness so we don't overflow the local buffer) and a "daily" bucket
// (to control the long term average so we don't overflow the remote
// server's quota).
static void init_buckets(ssize_t bytes_per_burst, ssize_t bytes_per_day) {
  // Divide by 2 is just in case we go two cycles between successful log
  // uploads; we want to allow for 2x the buffer usage in that case.
  // Note that this algorithm still isn't perfect: if your program times
  // things exactly right, it could have a full bucket at the beginning
  // of a cycle, empty it out, then it would refill at fill_rate throughout
  // the cycle, allowing more than max_bytes to be written during a given
  // cycle.  I hope this is sufficiently rare that we don't have to pessimize
  // the bucket sizes just to deal with this almost-never occurrence, but it's
  // still worrisome that the condition can exist at all.
  //
  // We initialize buckets with available > 0 to allow for bursts
  // of messages at startup time (which is a common time to want to log
  // logs of stuff).
  buckets[B_BURST].max_bytes = bytes_per_burst / 2;
  buckets[B_BURST].fill_rate = buckets[B_BURST].max_bytes / SECS_PER_BURST;
  buckets[B_BURST].available = buckets[B_BURST].max_bytes / 2;

  // max_bytes divide by 2 not needed here because not affected by uploads.
  buckets[B_DAILY].max_bytes = bytes_per_day;
  buckets[B_DAILY].fill_rate = buckets[B_DAILY].max_bytes / SECS_PER_DAY;
  buckets[B_DAILY].available = buckets[B_DAILY].max_bytes / 2;

  // The warning bucket goes off if you would have emptied the slow (daily)
  // bucket, had it been as small as the burst bucket.  Basically, this
  // triggers a message when you are relying on the short term "burst"
  // feature, giving you early warning that if you keep this up, you will
  // eventually exceed the daily bucket and your bandwidth will be cut.
  // It doesn't actually prevent you from writing anything though.
  buckets[B_WARNING].max_bytes = buckets[B_BURST].max_bytes;
  buckets[B_WARNING].fill_rate = buckets[B_DAILY].fill_rate;
  buckets[B_WARNING].available = buckets[B_BURST].available;
}


// Returns 1 if the message in 'buf' ('len' valid bytes, not necessarily NUL
// terminated) begins with 'prefix', ignoring case.  The length check keeps
// the comparison from reading bytes that lie beyond the message.
static int msg_has_prefix(const uint8_t *buf, ssize_t len,
                          const char *prefix) {
  return len >= (ssize_t)strlen(prefix) && startswith(buf, prefix);
}


// Writes one log line to file descriptor 1, bypassing the rate limiter.
//
//   header/headerlen: line prefix beginning with "<x>"; header[1] is
//       overwritten with a priority digit chosen from the message's leading
//       "level:" tag ('2' for weird/fatal/critical, '3' for errors, '4' for
//       warnings, '5' for notices, '6' for info, '7' otherwise).
//   buf/len: message bytes, without a trailing newline.
//
// Header, message and a newline go out in a single writev() call so the line
// is emitted as one unit.  Write failures and short writes are reported on
// stderr but are not fatal.
static void _flush_unlimited(uint8_t *header, ssize_t headerlen,
                             const uint8_t *buf, ssize_t len) {
  ssize_t total = headerlen + len + 1;
  struct iovec iov[] = {
    { header, headerlen },
    { (uint8_t *)buf, len },
    { "\n", 1 },
  };
  uint8_t lvl;

  assert(headerlen > 3);
  assert(header[0] == '<');
  assert(header[2] == '>');

  if (msg_has_prefix(buf, len, "weird:") ||
      msg_has_prefix(buf, len, "fatal:") ||
      msg_has_prefix(buf, len, "critical:")) {
    lvl = '2';
  } else if (msg_has_prefix(buf, len, "e:") ||
             msg_has_prefix(buf, len, "error:")) {
    lvl = '3';
  } else if (msg_has_prefix(buf, len, "w:") ||
             msg_has_prefix(buf, len, "warning:")) {
    lvl = '4';
  } else if (msg_has_prefix(buf, len, "n:") ||
             msg_has_prefix(buf, len, "notice:")) {
    lvl = '5';
  } else if (msg_has_prefix(buf, len, "i:") ||
             msg_has_prefix(buf, len, "info:")) {
    lvl = '6';
  } else {
    // default is debug
    lvl = '7';
  }
  header[1] = lvl;  // header starts with <x>; replace the x

  ssize_t wrote = writev(1, iov, sizeof(iov)/sizeof(iov[0]));
  if (wrote >= 0 && wrote < total) {
    // a short write is not expected here
    fprintf(stderr, "WEIRD: logos: writev(%zd) returned %zd\n", total, wrote);
    // not fatal
  } else if (wrote < 0) {
    perror("logos: writev");
    // not fatal
  }
}


// Reads CLOCK_MONOTONIC and returns it converted to whole milliseconds.
// A clock_gettime() failure is treated as unrecoverable: the error is
// printed and the process exits with status 7.
static long long mstime(void) {
  struct timespec mono;
  int rc = clock_gettime(CLOCK_MONOTONIC, &mono);
  if (rc < 0) {
    perror("logos: clock_gettime");
    exit(7); // really should never happen, so don't try to recover
  }

  long long whole_ms = mono.tv_sec * 1000LL;
  long long frac_ms = mono.tv_nsec / 1000000;
  return whole_ms + frac_ms;
}


// mstime() of the last bucket refill; 0 means "reset all buckets on the next
// call".
static long long last_add_time;
// skipping: nonzero while the most recent message found some bucket empty.
// backoff: minimum refill interval (in ms) while skipping.
static int skipping, backoff = 10*1000 / 2;

// Adds tokens to every bucket according to the time elapsed since the last
// refill, or resets every bucket to half full when last_add_time is 0.
static void maybe_fill_buckets(void) {
  const long long now = mstime();

  if (last_add_time == 0) {
    // Buckets (re)start half full, particularly because programs tend to
    // spew a lot of content at startup.  last_add_time is also zeroed when
    // unlimited_mode is switched on or off, which lands us here again.
    last_add_time = now;
    for (int b = 0; b < NUM_BUCKETS; b++) {
      buckets[b].available = buckets[b].max_bytes / 2;
    }
    return;
  }

  // last_add_time only advances when tokens are actually added; otherwise a
  // fill rate below one byte per millisecond combined with a message every
  // millisecond would never add anything.
  //
  // While messages are being dropped, wait for at least 'backoff' ms of
  // refill so output doesn't flip between empty and non-empty constantly:
  // fewer uninterrupted bursts are more useful than a stray message here
  // and there.
  const long long elapsed = now - last_add_time;
  const long long min_wait = skipping ? backoff : 1000;
  if (elapsed < min_wait) {
    return;
  }

  for (int b = 0; b < NUM_BUCKETS; b++) {
    long long add = elapsed * buckets[b].fill_rate / 1000;
    assert(add >= 0);
    buckets[b].available += add;
    if (buckets[b].available > buckets[b].max_bytes) {
      buckets[b].available = buckets[b].max_bytes;
    }
  }
  last_add_time = now;
}


// Charges a message of 'total' bytes against every bucket and decides
// whether it may be written.
//
//   header/headerlen: line prefix used for any notice emitted here.
//   total: size of the message being considered, in bytes.
//
// For each bucket with room (or for every bucket in unlimited_mode) the
// bytes are deducted, and if messages had been skipped the bucket's msg_end
// notice is written first.  For a bucket without room, the msg_start notice
// is written the first time, the bucket is emptied and the skip counter is
// bumped.  'skipping' is updated to say whether any bucket was empty.
//
// Returns 1 if no BT_MANDATORY bucket was empty, 0 if the message must be
// dropped.
static int all_buckets_have_room(uint8_t *header, ssize_t headerlen,
                                 ssize_t total) {
  int all_ok = 1, now_skipping = 0;
  for (int i = 0; i < NUM_BUCKETS; i++) {
    if (buckets[i].available >= total || unlimited_mode) {
      if (buckets[i].num_skipped) {
        char tmp[1024];
        ssize_t n = snprintf(tmp, sizeof(tmp),
                             buckets[i].msg_end, buckets[i].num_skipped);
        _flush_unlimited(header, headerlen, (uint8_t *)tmp, n);
        buckets[i].num_skipped = 0;
      }
      // in unlimited_mode this could go negative; that's ok
      buckets[i].available -= total;
    } else {
      if (!buckets[i].num_skipped) {
        char tmp[1024];
        // msg_start uses a %d conversion, which requires an int argument;
        // fill_rate is an ssize_t, so convert it explicitly.
        ssize_t n = snprintf(tmp, sizeof(tmp),
                             buckets[i].msg_start, (int)buckets[i].fill_rate);
        _flush_unlimited(header, headerlen, (uint8_t *)tmp, n);
        buckets[i].available = 0;
        // lengthen the refill wait each time we newly start dropping,
        // up to a ceiling of two minutes
        if (!now_skipping && !skipping) backoff *= 2;
        if (backoff > 120*1000) backoff = 120*1000;
      }
      now_skipping = 1;
      buckets[i].num_skipped++;
      switch (buckets[i].type) {
        case BT_MANDATORY:
          all_ok = 0;
          break;
        case BT_INFORMATIONAL:
          break;
      }
    }
  }
  skipping = now_skipping;
  return all_ok;
}


// Rate-limited output path, built on the token buckets: the buckets are
// topped up, the message is charged against them, and it is written only if
// every mandatory bucket had room.  With debug enabled, the bucket levels as
// they stand before the top-up are traced to stderr along with the number
// of bytes wanted.
static void _flush_ratelimited(uint8_t *header, ssize_t headerlen,
                               uint8_t *buf, ssize_t len) {
  const ssize_t total = headerlen + len + 1;

  if (debug) {
    char report[1024];
    char *cursor = report;

    // each bucket is budgeted 100 bytes of the trace line
    assert(sizeof(report) >= 100 * NUM_BUCKETS);
    cursor += sprintf(cursor, "logos: ");
    for (int b = 0; b < NUM_BUCKETS; b++) {
      cursor += sprintf(cursor, "%s=%zd ",
                        buckets[b].name, buckets[b].available);
      assert(cursor < report + sizeof(report));
      assert(cursor < report + 100*(b+1));
    }
    sprintf(cursor, "want=%zd\n", total);
    fputs(report, stderr);
  }

  maybe_fill_buckets();

  if (!all_buckets_have_room(header, headerlen, total)) {
    return;
  }
  _flush_unlimited(header, headerlen, buf, len);
}


// SIGHUP handler.  Zeroing last_add_time makes the next refill pass reset
// every bucket.  The unit test needs this, and it can also help in real life
// when rate limiting has kicked in and you want to see what's going on this
// instant.
static void refill_ratelimiter(int sig) {
  (void)sig;
  last_add_time = 0;
}


// SIGUSR1 handler: requests that rate limiting be switched off entirely
// (for debugging on test devices).  Only the request flag is set here.
static void disable_ratelimit(int sig) {
  (void)sig;
  want_unlimited_mode = 1;
}


// SIGUSR2 handler: the inverse of SIGUSR1, requesting that rate limiting be
// switched back on.  SIGUSR1 could have been a toggle instead, but with two
// signals 'pkill -USR1 logos' reliably leaves every process with limits
// disabled, whereas a toggle would leave you uncertain.
static void enable_ratelimit(int sig) {
  (void)sig;
  want_unlimited_mode = 0;
}


// Minimal strlen() replacement for use inside signal handlers, so the
// handler does not depend on the library's strlen() being async-signal-safe.
// Returns the number of bytes before the terminating NUL.
static size_t my_strlen(const char *string) {
  const char *end = string;
  while (*end != '\0') {
    end++;
  }
  return (size_t)(end - string);
}

// Fatal-signal handler.  We don't have a way to babysit logos externally, as
// it is in a pipe from some other process, so on a fatal signal it tries to
// replace itself with a fresh copy using the original argv.
//
// Only async-signal-safe calls are made here: write(), execve() and
// _exit().  In particular exit() must not be used from a signal handler,
// because it runs atexit handlers and stdio cleanup.
static void rejuvinate_process(int sig) {
  char *restart = "<2>logos: restarting on fatal signal\n";
  char *giveup = "<2>logos: Cannot find logos binary to exec\n";
  // write() results are deliberately ignored; there is nothing useful to do
  // about a failed write at this point.
  ssize_t unused __attribute__((unused));
  (void)sig;
  unused = write(1, restart, my_strlen(restart));

  // execvp is not async-signal safe, so check likely paths.  Each execve()
  // only returns if it failed.
  execve("/bin/logos", g_argv, environ);
  execve("/usr/bin/logos", g_argv, environ);
  execve("/sbin/logos", g_argv, environ);
  execve("/usr/sbin/logos", g_argv, environ);
  unused = write(1, giveup, my_strlen(giveup));
  _exit(99);
}

// Builds a sanitized, NUL-terminated copy of the 'len' bytes at 'buf':
//   - bytes >= 32 and newlines are copied unchanged,
//   - a tab becomes spaces up to the next multiple-of-8 output offset,
//   - carriage returns are dropped,
//   - any other control byte becomes the four characters \xNN (hex).
// The result is malloc()ed and owned by the caller.  Returns NULL (after
// printing an error) if the allocation fails.
static uint8_t *fix_buf(uint8_t *buf, ssize_t len) {
  // Worst case is a tab expanding to 8 spaces, hence 8 bytes per input byte.
  uint8_t *cleaned = malloc(len * 8 + 1);
  if (cleaned == NULL) {
    perror("logos: allocating memory");
    return NULL;
  }

  uint8_t *dst = cleaned;
  for (ssize_t i = 0; i < len; i++) {
    const uint8_t c = buf[i];

    if (c >= 32 || c == '\n') {
      *dst++ = c;
      continue;
    }

    switch (c) {
      case '\t':
        // align tabs (ignoring prefixes etc) for nicer-looking output;
        // always emits at least one space
        do {
          *dst++ = ' ';
        } while ((dst - cleaned) % 8 != 0);
        break;
      case '\r':
        // CR characters are simply discarded
        break;
      default:
        // snprintf also writes a NUL after the escape; the next output
        // byte overwrites it
        snprintf((char *)dst, 5, "\\x%02x", (int)c);
        dst += 4;
        break;
    }
  }
  *dst = '\0';
  return cleaned;
}


// Sends one message through the rate limiter, sanitizing it first if needed.
//
// The header is assumed to be free of invalid bytes since it tends to be a
// hardcoded string.  Bytes >= 128 are passed through without checking that
// they form valid utf-8, in case seeing the verbatim values helps someone.
// If the message contains any control byte other than newline, a cleaned-up
// copy from fix_buf() is sent instead; if that copy can't be allocated, the
// message is dropped.
static void flush(uint8_t *header, ssize_t headerlen,
                  uint8_t *buf, ssize_t len) {
  int needs_cleanup = 0;
  for (ssize_t i = 0; i < len; i++) {
    if (buf[i] < 32 && buf[i] != '\n') {
      needs_cleanup = 1;
      break;
    }
  }

  if (!needs_cleanup) {
    // no special characters: send the caller's bytes as they are
    _flush_ratelimited(header, headerlen, buf, len);
    return;
  }

  uint8_t *cleaned = fix_buf(buf, len);
  if (cleaned) {
    _flush_ratelimited(header, headerlen, cleaned, strlen((char *)cleaned));
    free(cleaned);
  }
}


// Prints the command-line help, including the compiled-in default limits,
// to stderr and terminates the process with exit status 99.
static void usage(void) {
  const long default_burst = (long)DEFAULT_BYTES_PER_BURST;
  const long default_daily = (long)DEFAULT_BYTES_PER_DAY;

  fprintf(stderr,
      "Usage: [LOGOS_DEBUG=1] logos <facilityname> [bytes/burst] [bytes/day]\n"
      "  Copies logs from stdin to /dev/kmsg, formatting them to be\n"
      "  suitable for /dev/kmsg. If LOGOS_DEBUG is >= 1, writes to\n"
      "  stdout instead.\n"
      "  \n"
      "  Default bytes/burst = %ld - use 0 (for default) if possible.\n"
      "  Default bytes/day = %ld - use 0 (for default) if possible.\n"
      "  Signals:\n"
      "    SIGHUP: refill the token buckets once.\n"
      "    SIGUSR1: disable rate limiting.\n"
      "    SIGUSR2: re-enable rate limiting.\n"
      "    Example: pkill -USR1 logos  -- disables rate limit on all logos.\n",
      default_burst, default_daily);
  exit(99);
}

// Entry point: logos <facilityname> [bytes/burst] [bytes/day]
//
// Reads stdin, splits it into lines of at most MAX_LINE_LENGTH bytes, and
// writes each line (prefixed with "<level>facility: ") through the rate
// limiter to stdout, which is redirected to /dev/kmsg unless LOGOS_DEBUG is
// nonzero.
//
// Exit status: 0 on end of input, 1 on an empty facility name or a read
// error, 3 if /dev/kmsg can't be opened or chdir("/") fails, 5 on
// allocation failure, 6 on an invalid byte limit, 99 on bad usage.
int main(int argc, char **argv) {
  static uint8_t overlong_warning[] =
      "W: previous log line was split. Use shorter lines.";
  static uint8_t now_unlimited[] =
      "W: SIGUSR1: rate limit disabled.";
  static uint8_t now_limited[] =
      "W: SIGUSR2: rate limit re-enabled.";
  const char *disable_limits_file = "/fiber/config/disable-log-limits";
  uint8_t buf[MAX_LINE_LENGTH], *header;
  ssize_t used = 0, got, headerlen;
  int overlong = 0;

  {
    char *p = getenv("LOGOS_DEBUG");
    if (p) {
      debug = atoi(p);
    }
  }

  if (argc < 2 || argc > 4) {
    usage();
  }

  // remove underscores from the facility name
  strip_underscores(argv[1]);
  if (strlen(argv[1]) == 0) {
    fprintf(stderr, "logos: facility name was empty, or all underscores.\n");
    return 1;
  }

#ifndef COMPILE_FOR_HOST
  stacktrace_setup();
#endif  // COMPILE_FOR_HOST
  g_argv = argv;
  signal(SIGHUP, refill_ratelimiter);
  signal(SIGUSR1, disable_ratelimit);
  signal(SIGUSR2, enable_ratelimit);
  signal(SIGILL, rejuvinate_process);
  signal(SIGBUS, rejuvinate_process);
  signal(SIGSEGV, rejuvinate_process);

  headerlen = 3 + strlen(argv[1]) + 1 + 1; // <x>, fac, :, space
  header = malloc(headerlen + 1);
  if (!header) {
    perror("logos: allocating memory");
    return 5;
  }
  snprintf((char *)header, headerlen + 1, "<x>%s: ", argv[1]);

  // A missing or zero argument selects the compiled-in default.
  ssize_t bytes_per_burst = DEFAULT_BYTES_PER_BURST;
  if (argc > 2) {
    bytes_per_burst = atoll(argv[2]);
  }
  if (!bytes_per_burst) {
    bytes_per_burst = DEFAULT_BYTES_PER_BURST;
  }
  if (bytes_per_burst < SECS_PER_BURST * 2) {
    fprintf(stderr, "logos: bytes-per-burst (%s) must be an int >= %d\n",
            argv[2], (int)SECS_PER_BURST * 2);
    return 6;
  }

  ssize_t bytes_per_day = 0;
  if (argc > 3) {
    bytes_per_day = atoll(argv[3]);
  }
  if (!bytes_per_day) {
    bytes_per_day = DEFAULT_BYTES_PER_DAY;
  }
  if (bytes_per_day < SECS_PER_DAY) {
    // This can only trigger when argv[3] was supplied, since the default
    // passes the check; report the argument that was actually rejected.
    fprintf(stderr, "logos: bytes-per-day (%s) must be an int >= %d\n",
            argv[3], (int)SECS_PER_DAY);
    return 6;
  }
  init_buckets(bytes_per_burst, bytes_per_day);

  // The mere existence of this file turns rate limiting off at startup.
  struct stat fst;
  if (stat(disable_limits_file, &fst) == 0) {
    want_unlimited_mode = 1;
  }

  if (!debug) {
    int fd = open("/dev/kmsg", O_WRONLY);
    if (fd < 0) {
      perror("logos: /dev/kmsg");
      return 3;
    }
    dup2(fd, 1);  // make it stdout
    dup2(fd, 2);  // and stderr too
    close(fd);

    // Chdir to / so that we don't prevent filesystems from unmounting just
    // because we happened to be in that directory while starting a long-running
    // task.
    if (chdir("/") != 0) {
      perror("logos: chdir /");
      return 3;
    }
  }

  while (1) {
    if (unlimited_mode != want_unlimited_mode) {
      // we delay setting these variables until this point, in order to avoid
      // race conditions caused by changing unlimited_mode and last_add_time
      // inside a signal handler.
      unlimited_mode = want_unlimited_mode;
      last_add_time = 0;
      if (unlimited_mode) {
        _flush_unlimited(header, headerlen,
                         now_unlimited, strlen((char *)now_unlimited));
      } else {
        _flush_unlimited(header, headerlen,
                         now_limited, strlen((char *)now_limited));
      }
    }
    if (used == sizeof(buf)) {
      // The buffer filled up without a newline: emit what we have as its own
      // line and remember to warn once the real end of line shows up.
      flush(header, headerlen, buf, used);
      overlong = 1;
      used = 0;
    }
    got = read(0, buf + used, sizeof(buf) - used);
    if (got == 0) {
      if (used > 0) {
        /* Only output if there is text in the buffer, avoid
         * printing a blank line when a process exits. */
        flush(header, headerlen, buf, used);
      }
      goto done;
    } else if (got < 0) {
      // EINTR and EAGAIN just retry the read on the next loop iteration
      if (errno != EINTR && errno != EAGAIN) {
        flush(header, headerlen, buf, used);
        return 1;
      }
    } else {
      // Emit every complete line now in the buffer.  Bytes before 'next'
      // were already searched for newlines on earlier iterations.
      uint8_t *start = buf, *next = buf + used, *end = buf + used + got, *p;
      while ((p = memchr(next, '\n', end - next)) != NULL) {
        ssize_t linelen = p - start;
        flush(header, headerlen, start, linelen);
        if (overlong) {
          // that flush() was the first newline after buffer length
          // exceeded, which means the end of the overly long line.  Let's
          // print a warning about it.
          flush(header, headerlen,
                overlong_warning, strlen((char *)overlong_warning));
          overlong = 0;
        }
        start = next = p + 1;
      }
      // Keep the trailing partial line at the front of the buffer.
      used = end - start;
      memmove(buf, start, used);
    }
  }

done:
  free(header);
  return 0;
}
