blob: a3a4ce58c020406f30eeff94bd0d64907cf145af [file] [log] [blame]
 #!/usr/bin/python """Signal Analysis Program. This program takes in a recording of an interference pattern and extracts the frequency, duty-cycle, and square of the signal so that the signal can be regenerated using parameters. """ import sys import matplotlib.pyplot as plt import numpy as np from scipy.signal import decimate from scipy.signal import hilbert from scipy.signal import symiirorder1 import options SAMPLE_RATE = 30e6 DOWNSAMPLE_FACTOR = 1000 MAX_DUTY_PERIOD_USEC = 10000.0 MIN_DUTY_PERIOD = SAMPLE_RATE / DOWNSAMPLE_FACTOR / MAX_DUTY_PERIOD_USEC optspec = """ signal_analyzer [options...] filename -- w,window= Specify the number of samples to be analyzed. [100000000] d,display Display the raw data. o,output= Specify the output filename for the profile. """ def get_active_rms(raw_data, active_indices): """Calculates the rms of the signal when its on.""" square_data = np.square(np.absolute(raw_data)) raw_active_indices = np.repeat(active_indices, DOWNSAMPLE_FACTOR) active_values = square_data[raw_active_indices] # TODO(sunnyd): There's a lot of values here that are not part of the active # signal, need to find a way to parse them out in the future return np.sqrt(np.average(active_values)) def get_period(switch_indices): """Calculates the frequency of the signal peaks.""" period_samples = [] for i in range(0, len(switch_indices) - 1): period_samples.append(switch_indices[i + 1] - switch_indices[i]) return np.median(period_samples) * DOWNSAMPLE_FACTOR / SAMPLE_RATE def get_switch_indices(active_indices): # Locate the indices where the signal is on off_switch_indices = [] on_switch_indices = [] for i in range(len(active_indices) - 1): if active_indices[i + 1] - active_indices[i] > MIN_DUTY_PERIOD: off_switch_indices.append(active_indices[i]) on_switch_indices.append(active_indices[i+1]) return (off_switch_indices, on_switch_indices) def save_profile(raw_data, off_switch_indices, on_switch_indices, filename): """Extract the fourier transform of the signal.""" # TODO(sunnyd): Find an average of many peaks rather than choosing an # arbitrary one. cur_slice = raw_data[off_switch_indices[3] * DOWNSAMPLE_FACTOR : on_switch_indices[3] * DOWNSAMPLE_FACTOR] fourier = np.fft.fft(cur_slice, int(SAMPLE_RATE)) fourier_down = decimate(fourier, DOWNSAMPLE_FACTOR, 2, ftype='fir') # Filter out some of the noise fourier_mean = np.average(np.absolute(fourier_down)) # TODO(sunnyd): 1.5 is a bit arbitrary, need a good metric for this. STD Dev # might work but could filter out a lot of useful signal if the entire band # is high fourier_down[np.absolute(fourier_down) < fourier_mean * 1.5] = 0 np.save(filename, np.array(fourier_down)) def get_duty_cycle(active_bitmap, switch_indices): """Calculate the Duty cycle of the signal.""" duty_cycles = [] for i in range(len(switch_indices) - 1): duty_cycles.append(np.average(active_bitmap[switch_indices[i] : switch_indices[i + 1]])) return np.median(duty_cycles) def main(): o = options.Options(optspec) opt, _, extra = o.parse(sys.argv[1:]) window_size = int(opt.window) if len(extra) != 1: o.fatal('Missing filename or extra arguments') if not opt.output: o.fatal('Missing output option required') f = open(extra[0], 'rb') raw_data = np.fromfile(f, dtype=np.complex64, count=window_size) downsampled_data = decimate(raw_data, DOWNSAMPLE_FACTOR, 2, ftype='fir') hilbert_data = hilbert(np.absolute(downsampled_data)) envelope = symiirorder1(hilbert_data, 1, 0.5) abs_envelope = abs(envelope) if opt.display: display_size = min(opt.window, 10e6) if opt.window > 10e6: sys.stderr.write('Warning: window size too large to display. Using ' 'default value of 10e6\n') plt.plot(downsampled_data[0: display_size].real, color='red', hold=True) plt.plot(abs_envelope) plt.show() # Calculate useful values for processing. mean = np.average(abs_envelope) active_bitmap = abs_envelope > mean active_indices = active_bitmap.nonzero()[0] off_switch_indices, on_switch_indices = get_switch_indices(active_indices) duty = get_duty_cycle(active_bitmap, off_switch_indices) period = get_period(off_switch_indices) rms = get_active_rms(raw_data, active_bitmap) save_profile(raw_data, off_switch_indices, on_switch_indices, opt.output) print 'Duty-Cycle: %f, Period: %fs, Active RMS: %f' % (duty, period, rms) if __name__ == '__main__': main()