blob: c11f65b0fb4831db8d553886c7d4503090299543 [file] [log] [blame]
#!/usr/bin/python
# Copyright 2015 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Signal Analysis Program.
This program takes in a recording of an interference pattern and extracts the
frequency, duty-cycle, and square of the signal so that the signal can be
regenerated using parameters.
"""
import sys
import matplotlib.pyplot as plt
import numpy as np
from scipy.signal import decimate
from scipy.signal import hilbert
from scipy.signal import symiirorder1
import options
SAMPLE_RATE = 30e6
DOWNSAMPLE_FACTOR = 1000
MAX_DUTY_PERIOD_USEC = 10000.0
MIN_DUTY_PERIOD = SAMPLE_RATE / DOWNSAMPLE_FACTOR / MAX_DUTY_PERIOD_USEC
optspec = """
signal_analyzer [options...] filename
--
w,window= Specify the number of samples to be analyzed. [100000000]
d,display Display the raw data.
o,output= Specify the output filename for the profile.
"""
def get_active_rms(raw_data, active_indices):
"""Calculates the rms of the signal when its on."""
square_data = np.square(np.absolute(raw_data))
raw_active_indices = np.repeat(active_indices, DOWNSAMPLE_FACTOR)
active_values = square_data[raw_active_indices]
# TODO(sunnyd): There's a lot of values here that are not part of the active
# signal, need to find a way to parse them out in the future
return np.sqrt(np.average(active_values))
def get_period(switch_indices):
"""Calculates the frequency of the signal peaks."""
period_samples = []
for i in range(0, len(switch_indices) - 1):
period_samples.append(switch_indices[i + 1] - switch_indices[i])
return np.median(period_samples) * DOWNSAMPLE_FACTOR / SAMPLE_RATE
def get_switch_indices(active_indices):
# Locate the indices where the signal is on
off_switch_indices = []
on_switch_indices = []
for i in range(len(active_indices) - 1):
if active_indices[i + 1] - active_indices[i] > MIN_DUTY_PERIOD:
off_switch_indices.append(active_indices[i])
on_switch_indices.append(active_indices[i+1])
return (off_switch_indices, on_switch_indices)
def save_profile(raw_data, off_switch_indices, on_switch_indices, filename):
"""Extract the fourier transform of the signal."""
# TODO(sunnyd): Find an average of many peaks rather than choosing an
# arbitrary one.
cur_slice = raw_data[off_switch_indices[3] * DOWNSAMPLE_FACTOR :
on_switch_indices[3] * DOWNSAMPLE_FACTOR]
fourier = np.fft.fft(cur_slice, int(SAMPLE_RATE))
fourier_down = decimate(fourier, DOWNSAMPLE_FACTOR, 2, ftype='fir')
# Filter out some of the noise
fourier_mean = np.average(np.absolute(fourier_down))
# TODO(sunnyd): 1.5 is a bit arbitrary, need a good metric for this. STD Dev
# might work but could filter out a lot of useful signal if the entire band
# is high
fourier_down[np.absolute(fourier_down) < fourier_mean * 1.5] = 0
np.save(filename, np.array(fourier_down))
def get_duty_cycle(active_bitmap, switch_indices):
"""Calculate the Duty cycle of the signal."""
duty_cycles = []
for i in range(len(switch_indices) - 1):
duty_cycles.append(np.average(active_bitmap[switch_indices[i] :
switch_indices[i + 1]]))
return np.median(duty_cycles)
def main():
o = options.Options(optspec)
opt, _, extra = o.parse(sys.argv[1:])
window_size = int(opt.window)
if len(extra) != 1:
o.fatal('Missing filename or extra arguments')
if not opt.output:
o.fatal('Missing output option required')
f = open(extra[0], 'rb')
raw_data = np.fromfile(f, dtype=np.complex64, count=window_size)
downsampled_data = decimate(raw_data, DOWNSAMPLE_FACTOR, 2, ftype='fir')
hilbert_data = hilbert(np.absolute(downsampled_data))
envelope = symiirorder1(hilbert_data, 1, 0.5)
abs_envelope = abs(envelope)
if opt.display:
display_size = min(opt.window, 10e6)
if opt.window > 10e6:
sys.stderr.write('Warning: window size too large to display. Using '
'default value of 10e6\n')
plt.plot(downsampled_data[0: display_size].real, color='red', hold=True)
plt.plot(abs_envelope)
plt.show()
# Calculate useful values for processing.
mean = np.average(abs_envelope)
active_bitmap = abs_envelope > mean
active_indices = active_bitmap.nonzero()[0]
off_switch_indices, on_switch_indices = get_switch_indices(active_indices)
duty = get_duty_cycle(active_bitmap, off_switch_indices)
period = get_period(off_switch_indices)
rms = get_active_rms(raw_data, active_bitmap)
save_profile(raw_data, off_switch_indices, on_switch_indices, opt.output)
print 'Duty-Cycle: %f, Period: %fs, Active RMS: %f' % (duty, period, rms)
if __name__ == '__main__':
main()