| #!/usr/bin/python |
| # Copyright 2015 Google Inc. All Rights Reserved. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| """Signal Analysis Program. |
| |
| This program takes in a recording of an interference pattern and extracts the |
| frequency, duty-cycle, and square of the signal so that the signal can be |
| regenerated using parameters. |
| """ |
| import sys |
| |
| import matplotlib.pyplot as plt |
| import numpy as np |
| from scipy.signal import decimate |
| from scipy.signal import hilbert |
| from scipy.signal import symiirorder1 |
| |
| import options |
| |
| SAMPLE_RATE = 30e6 |
| DOWNSAMPLE_FACTOR = 1000 |
| MAX_DUTY_PERIOD_USEC = 10000.0 |
| MIN_DUTY_PERIOD = SAMPLE_RATE / DOWNSAMPLE_FACTOR / MAX_DUTY_PERIOD_USEC |
| optspec = """ |
| signal_analyzer [options...] filename |
| -- |
| w,window= Specify the number of samples to be analyzed. [100000000] |
| d,display Display the raw data. |
| o,output= Specify the output filename for the profile. |
| """ |
| |
| |
| def get_active_rms(raw_data, active_indices): |
| """Calculates the rms of the signal when its on.""" |
| square_data = np.square(np.absolute(raw_data)) |
| raw_active_indices = np.repeat(active_indices, DOWNSAMPLE_FACTOR) |
| active_values = square_data[raw_active_indices] |
| |
| # TODO(sunnyd): There's a lot of values here that are not part of the active |
| # signal, need to find a way to parse them out in the future |
| return np.sqrt(np.average(active_values)) |
| |
| |
| def get_period(switch_indices): |
| """Calculates the frequency of the signal peaks.""" |
| period_samples = [] |
| for i in range(0, len(switch_indices) - 1): |
| period_samples.append(switch_indices[i + 1] - switch_indices[i]) |
| return np.median(period_samples) * DOWNSAMPLE_FACTOR / SAMPLE_RATE |
| |
| |
| def get_switch_indices(active_indices): |
| # Locate the indices where the signal is on |
| off_switch_indices = [] |
| on_switch_indices = [] |
| for i in range(len(active_indices) - 1): |
| if active_indices[i + 1] - active_indices[i] > MIN_DUTY_PERIOD: |
| off_switch_indices.append(active_indices[i]) |
| on_switch_indices.append(active_indices[i+1]) |
| return (off_switch_indices, on_switch_indices) |
| |
| |
| def save_profile(raw_data, off_switch_indices, on_switch_indices, filename): |
| """Extract the fourier transform of the signal.""" |
| # TODO(sunnyd): Find an average of many peaks rather than choosing an |
| # arbitrary one. |
| cur_slice = raw_data[off_switch_indices[3] * DOWNSAMPLE_FACTOR : |
| on_switch_indices[3] * DOWNSAMPLE_FACTOR] |
| fourier = np.fft.fft(cur_slice, int(SAMPLE_RATE)) |
| fourier_down = decimate(fourier, DOWNSAMPLE_FACTOR, 2, ftype='fir') |
| |
| # Filter out some of the noise |
| fourier_mean = np.average(np.absolute(fourier_down)) |
| |
| # TODO(sunnyd): 1.5 is a bit arbitrary, need a good metric for this. STD Dev |
| # might work but could filter out a lot of useful signal if the entire band |
| # is high |
| fourier_down[np.absolute(fourier_down) < fourier_mean * 1.5] = 0 |
| np.save(filename, np.array(fourier_down)) |
| |
| |
| def get_duty_cycle(active_bitmap, switch_indices): |
| """Calculate the Duty cycle of the signal.""" |
| duty_cycles = [] |
| for i in range(len(switch_indices) - 1): |
| duty_cycles.append(np.average(active_bitmap[switch_indices[i] : |
| switch_indices[i + 1]])) |
| return np.median(duty_cycles) |
| |
| |
| def main(): |
| o = options.Options(optspec) |
| opt, _, extra = o.parse(sys.argv[1:]) |
| |
| window_size = int(opt.window) |
| if len(extra) != 1: |
| o.fatal('Missing filename or extra arguments') |
| if not opt.output: |
| o.fatal('Missing output option required') |
| |
| f = open(extra[0], 'rb') |
| raw_data = np.fromfile(f, dtype=np.complex64, count=window_size) |
| downsampled_data = decimate(raw_data, DOWNSAMPLE_FACTOR, 2, ftype='fir') |
| |
| hilbert_data = hilbert(np.absolute(downsampled_data)) |
| envelope = symiirorder1(hilbert_data, 1, 0.5) |
| abs_envelope = abs(envelope) |
| |
| if opt.display: |
| display_size = min(opt.window, 10e6) |
| if opt.window > 10e6: |
| sys.stderr.write('Warning: window size too large to display. Using ' |
| 'default value of 10e6\n') |
| plt.plot(downsampled_data[0: display_size].real, color='red', hold=True) |
| plt.plot(abs_envelope) |
| plt.show() |
| |
| # Calculate useful values for processing. |
| mean = np.average(abs_envelope) |
| active_bitmap = abs_envelope > mean |
| active_indices = active_bitmap.nonzero()[0] |
| |
| off_switch_indices, on_switch_indices = get_switch_indices(active_indices) |
| duty = get_duty_cycle(active_bitmap, off_switch_indices) |
| period = get_period(off_switch_indices) |
| rms = get_active_rms(raw_data, active_bitmap) |
| save_profile(raw_data, off_switch_indices, on_switch_indices, opt.output) |
| print 'Duty-Cycle: %f, Period: %fs, Active RMS: %f' % (duty, period, rms) |
| |
| |
| if __name__ == '__main__': |
| main() |