blob: a3a4ce58c020406f30eeff94bd0d64907cf145af [file] [log] [blame]
"""Signal Analysis Program.
This program takes in a recording of an interference pattern and extracts the
frequency, duty-cycle, and square of the signal so that the signal can be
regenerated using parameters.
import sys
import matplotlib.pyplot as plt
import numpy as np
from scipy.signal import decimate
from scipy.signal import hilbert
from scipy.signal import symiirorder1
import options
optspec = """
signal_analyzer [options...] filename
w,window= Specify the number of samples to be analyzed. [100000000]
d,display Display the raw data.
o,output= Specify the output filename for the profile.
def get_active_rms(raw_data, active_indices):
"""Calculates the rms of the signal when its on."""
square_data = np.square(np.absolute(raw_data))
raw_active_indices = np.repeat(active_indices, DOWNSAMPLE_FACTOR)
active_values = square_data[raw_active_indices]
# TODO(sunnyd): There's a lot of values here that are not part of the active
# signal, need to find a way to parse them out in the future
return np.sqrt(np.average(active_values))
def get_period(switch_indices):
"""Calculates the frequency of the signal peaks."""
period_samples = []
for i in range(0, len(switch_indices) - 1):
period_samples.append(switch_indices[i + 1] - switch_indices[i])
return np.median(period_samples) * DOWNSAMPLE_FACTOR / SAMPLE_RATE
def get_switch_indices(active_indices):
# Locate the indices where the signal is on
off_switch_indices = []
on_switch_indices = []
for i in range(len(active_indices) - 1):
if active_indices[i + 1] - active_indices[i] > MIN_DUTY_PERIOD:
return (off_switch_indices, on_switch_indices)
def save_profile(raw_data, off_switch_indices, on_switch_indices, filename):
"""Extract the fourier transform of the signal."""
# TODO(sunnyd): Find an average of many peaks rather than choosing an
# arbitrary one.
cur_slice = raw_data[off_switch_indices[3] * DOWNSAMPLE_FACTOR :
on_switch_indices[3] * DOWNSAMPLE_FACTOR]
fourier = np.fft.fft(cur_slice, int(SAMPLE_RATE))
fourier_down = decimate(fourier, DOWNSAMPLE_FACTOR, 2, ftype='fir')
# Filter out some of the noise
fourier_mean = np.average(np.absolute(fourier_down))
# TODO(sunnyd): 1.5 is a bit arbitrary, need a good metric for this. STD Dev
# might work but could filter out a lot of useful signal if the entire band
# is high
fourier_down[np.absolute(fourier_down) < fourier_mean * 1.5] = 0, np.array(fourier_down))
def get_duty_cycle(active_bitmap, switch_indices):
"""Calculate the Duty cycle of the signal."""
duty_cycles = []
for i in range(len(switch_indices) - 1):
duty_cycles.append(np.average(active_bitmap[switch_indices[i] :
switch_indices[i + 1]]))
return np.median(duty_cycles)
def main():
o = options.Options(optspec)
opt, _, extra = o.parse(sys.argv[1:])
window_size = int(opt.window)
if len(extra) != 1:
o.fatal('Missing filename or extra arguments')
if not opt.output:
o.fatal('Missing output option required')
f = open(extra[0], 'rb')
raw_data = np.fromfile(f, dtype=np.complex64, count=window_size)
downsampled_data = decimate(raw_data, DOWNSAMPLE_FACTOR, 2, ftype='fir')
hilbert_data = hilbert(np.absolute(downsampled_data))
envelope = symiirorder1(hilbert_data, 1, 0.5)
abs_envelope = abs(envelope)
if opt.display:
display_size = min(opt.window, 10e6)
if opt.window > 10e6:
sys.stderr.write('Warning: window size too large to display. Using '
'default value of 10e6\n')
plt.plot(downsampled_data[0: display_size].real, color='red', hold=True)
# Calculate useful values for processing.
mean = np.average(abs_envelope)
active_bitmap = abs_envelope > mean
active_indices = active_bitmap.nonzero()[0]
off_switch_indices, on_switch_indices = get_switch_indices(active_indices)
duty = get_duty_cycle(active_bitmap, off_switch_indices)
period = get_period(off_switch_indices)
rms = get_active_rms(raw_data, active_bitmap)
save_profile(raw_data, off_switch_indices, on_switch_indices, opt.output)
print 'Duty-Cycle: %f, Period: %fs, Active RMS: %f' % (duty, period, rms)
if __name__ == '__main__':