blob: cd17bccee62adc5aadd699cc5851851e20c05a10 [file] [log] [blame]
#!/usr/bin/python
"""Signal Generator Tool.
This module uses the GNU Radio framework to read one or more signals specified
on the command line, add them together, and either broadcast the sum with a
USRP software-defined radio or write it to a file.
"""
from collections import namedtuple
import ConfigParser
import math
import os
import re
import sys
import tempfile
from gnuradio import blocks
from gnuradio import gr
from gnuradio import uhd
import numpy as np
from scipy.interpolate import interp1d
import options
# Option specification handed to options.Options() in main(): a usage line,
# a '--' separator, then one line per flag in the form
# "short,long= description"; a trailing "[value]" appears to be the default.
# The '=' after the long name marks flags that take a value.
# NOTE(review): the heading lines ("Radio Parameters:", "Signal Types:") have
# no leading whitespace here -- confirm the options parser treats them as
# headings rather than as flag definitions.
optspec = """
signal_generator [options...]
--
Radio Parameters:
F,frequency= Specify the frequency of the signal generated in KHz [2412000]
G,gain= Specify the Db gain of the radio when it broadcasts. [50.0]
C,config= Specify the config file for signal parameters.
o,filename= Specify filename to save signal instead of broadcasting.
Signal Types:
f,file= e.g "-f foo.dat^10.0" to play foo.dat at 10x signal strength
p,periodic= e.g "-p 1000%0.5@1000000^10.0" to repeat a sine signal every 1000us with 50% duty centered at 1000000Hz and 10x amplification
b,bluetooth= e.g "-b 100" to broadcast a bluetooth-like signal at specified power.
w,wifi= e.g "-w 100" to broadcast a wifi-like signal at specified power.
"""
# Rate handed to the USRP sink as both its sample rate and its bandwidth.
SAMPLE_RATE = 30e6
# NOTE(review): not referenced in the visible code; presumably the expected
# number of points in a stored frequency profile -- confirm.
PROFILE_SIZE = 30000

# Record returned by parse_signal(): the text before the first special symbol
# plus the numeric values attached to '^', '@' and '%'.
SignalParameters = namedtuple(
    'ParseSignal', 'prefix amplification frequency duty')
def parse_signal(parameters):
    """Parse one command line signal specification.

    The specification is a prefix followed by any number of
    "<symbol><number>" pairs, where the symbol selects the parameter:
      ^: amplification
      %: duty cycle
      @: frequency

    Surrounding single or double quotes are stripped first. Parameters that
    are not given default to 0.0; if a symbol is repeated the last value wins.

    Args:
      parameters: parameter string to be parsed, e.g. '1000%0.5@1000000^10.0'

    Returns:
      SignalParameters tuple (prefix, amplification, frequency, duty) where
      prefix is the untouched text before the first symbol.

    Raises:
      ValueError: a symbol is followed by text that is not a number.
    """
    # Splitting on a capturing group keeps the symbols, giving
    # [prefix, symbol, value, symbol, value, ...].
    tokens = re.split(r'([%@^])', parameters.strip('"\''))
    values = {'%': 0.0, '@': 0.0, '^': 0.0}
    for symbol, text in zip(tokens[1::2], tokens[2::2]):
        try:
            values[symbol] = float(text)
        except ValueError:
            raise ValueError('Invalid value %r after %r in signal parameters %r'
                             % (text, symbol, parameters))
    return SignalParameters(tokens[0], values['^'], values['@'], values['%'])
def interpolate(data, factor):
    """Linearly upsample `data` by an integer factor.

    The input samples are placed at x = 0, factor, 2 * factor, ... and the
    result is evaluated at every integer x from the first sample to the last
    one inclusive, so the output has (len(data) - 1) * factor + 1 points and
    never leaves the interpolation range.

    Args:
      data: 1-D sequence or array of samples.
      factor: integer spacing, in output points, between consecutive inputs.

    Returns:
      numpy array with the interpolated samples. Inputs with fewer than two
      samples cannot be interpolated and are returned as an array copy.

    Raises:
      ValueError: factor is smaller than 1.
    """
    if factor < 1:
        raise ValueError('factor must be a positive integer, got %r' % (factor,))
    count = len(data)
    if count < 2:
        return np.array(data)
    x_old = np.arange(count) * factor
    f = interp1d(x_old, data)
    # Stop at the last known sample: interp1d rejects points outside
    # [x_old[0], x_old[-1]] by default.
    x_new = np.arange(x_old[-1] + 1)
    return f(x_new)
class SignalGenerator(gr.top_block):
    """Signal Generation Class.

    GNU Radio top block that sums any number of complex sample streams and
    feeds the sum to a single sink: a file when a filename is given, otherwise
    a USRP. Sources are registered with the add_* methods and are only wired
    to the adder when run() is called.
    """

    def __init__(self, filename):
        """Initialize the top block and create the sink block.

        Args:
          filename: Path of a file to write the summed samples to. When empty
            or None a USRP sink running at SAMPLE_RATE is created instead.
        """
        super(SignalGenerator, self).__init__('Signal Generator')
        if filename:
            self.sink = blocks.file_sink(gr.sizeof_gr_complex, filename, False)
        else:
            stream_args = uhd.stream_args(cpu_format='fc32', otw_format='sc8',
                                          channels=[0])
            self.sink = uhd.usrp_sink(',', stream_args)
            self.sink.set_samp_rate(SAMPLE_RATE)
            self.sink.set_bandwidth(SAMPLE_RATE)
        # Amplified source blocks, collected here as arguments are read and
        # connected to the adder in run().
        self.sources = []
        # Open handles of the temporary sample files; holding them keeps the
        # files on disk while the flow graph reads them.
        self.temp_files = []
        self.add_block = blocks.add_vcc(1)
        self.connect((self.add_block, 0), (self.sink, 0))

    def add_file_source(self, filename, factor):
        """Create and add a file source to the sources list.

        Args:
          filename: Path to the file of complex samples to be broadcast
          factor: Amplitude is multiplied by this
        """
        # Third argument True asks the file source to repeat the file.
        src = blocks.file_source(gr.sizeof_gr_complex, filename, True)
        multiply_block = blocks.multiply_const_vcc((factor,))
        self.connect((src, 0), (multiply_block, 0))
        self.sources.append(multiply_block)

    @staticmethod
    def _period_sample_counts(length, duty):
        """Return (samples in one period, samples during which it is high)."""
        if not 0.0 <= duty <= 1.0:
            raise ValueError('duty must be between 0 and 1, got %r' % (duty,))
        samples_per_us = int(SAMPLE_RATE // 1e6)
        return samples_per_us * length, int(length * duty * samples_per_us)

    def _add_samples(self, samples, factor):
        """Write samples to a temporary file and add it as a file source."""
        # The data is written through the file name, not the handle, so the
        # handle needs no particular buffering mode.
        f = tempfile.NamedTemporaryFile(mode='w+b')
        samples.tofile(f.name)
        self.temp_files.append(f)
        self.add_file_source(f.name, factor)

    def add_periodic_signal(self, freq, length, duty, factor):
        """Add an oscillating parameterized source.

        One period holds a complex tone for the first `duty` fraction and
        zeros for the rest.

        Args:
          freq: Tone frequency in Hz (not kHz); the phase advances by
            2 * pi * freq / SAMPLE_RATE per sample
          length: Duration in micro-seconds of one period (integer)
          duty: Proportion of time that the signal is high (0-1)
          factor: Amplitude is multiplied by this value

        Raises:
          ValueError: duty is outside 0-1.
        """
        total, up_time = self._period_sample_counts(length, duty)
        signal = np.zeros(shape=total, dtype=np.complex64)
        up_sig = (2.0 * math.pi * freq / SAMPLE_RATE) * np.arange(up_time)
        signal.imag[0:up_time] = np.sin(up_sig)
        signal.real[0:up_time] = np.cos(up_sig)
        self._add_samples(signal, factor)

    def add_periodic_profile(self, length, duty, factor, profile):
        """Add an oscillating parameterized source with a frequency profile.

        The stored profile is interpolated, turned into time-domain samples
        with an inverse FFT of int(SAMPLE_RATE) points, and the first part of
        that result fills the high portion of each period.

        Args:
          length: Duration in micro-seconds of one period (integer)
          duty: Proportion of time that the signal is high (0-1)
          factor: Amplitude is multiplied by this value
          profile: Path (optionally quoted, '~' allowed) of a file readable by
            numpy.load that holds the frequency profile

        Raises:
          ValueError: duty is outside 0-1, or the high portion needs more
            samples than the inverse FFT provides.
        """
        total, up_time = self._period_sample_counts(length, duty)
        fft_size = int(SAMPLE_RATE)
        if up_time > fft_size:
            raise ValueError('high time of %d samples exceeds the %d samples '
                             'available from the profile'
                             % (up_time, fft_size))
        signal = np.zeros(shape=total, dtype=np.complex64)
        profile_path = os.path.abspath(os.path.expanduser(profile.strip('\'"')))
        fourier_down = np.load(profile_path)
        # Floor division keeps the factor an integer on every Python version.
        fourier = interpolate(fourier_down, fft_size // len(fourier_down))
        interference_sample = np.fft.ifft(fourier, fft_size)
        signal[0:up_time] = interference_sample[0:up_time]
        self._add_samples(signal, factor)

    def run(self):
        """Connect all the sources to the adder and run the system."""
        for port, source in enumerate(self.sources):
            self.connect((source, 0), (self.add_block, port))
        gr.top_block.run(self)
def parse_config(filename):
    """Read the per-signal settings from a configuration file.

    Every section of the file must provide 'duty' and 'amplitude' (floats),
    'length' (integer) and 'profile' (string).

    Args:
      filename: The filename of the configuration file.

    Returns:
      Dictionary keyed by section name; each value is a dictionary with the
      keys 'duty', 'amplitude', 'length' and 'profile'.
    """
    parser = ConfigParser.ConfigParser()
    parser.read(filename)
    # Option name paired with the typed getter used to read it.
    readers = (
        ('duty', parser.getfloat),
        ('amplitude', parser.getfloat),
        ('length', parser.getint),
        ('profile', parser.get),
    )
    return dict(
        (name, dict((option, read(name, option)) for option, read in readers))
        for name in parser.sections())
def _add_profile_signal(o, signal_gen_block, signal_params, section, value):
    """Add the config-defined periodic profile `section`, scaled by `value`.

    Args:
      o: options object; its fatal() is used to report problems and is
        expected not to return.
      signal_gen_block: SignalGenerator that receives the signal.
      signal_params: dictionary returned by parse_config (empty if no config).
      section: config section name, 'bluetooth' or 'wifi'.
      value: raw flag value; its prefix is the signal power multiplier.
    """
    if not signal_params:
        o.fatal('Config file required in order to use %s option' % section)
    params = signal_params.get(section)
    if params is None:
        o.fatal('Config file has no [%s] section' % section)
    signal_power_fraction = float(parse_signal(value).prefix)
    signal_gen_block.add_periodic_profile(
        params['length'], params['duty'],
        params['amplitude'] * signal_power_fraction, params['profile'])


def main():
    """Build the flow graph from the command line and run it until stopped."""
    o = options.Options(optspec)
    opt, flags, _ = o.parse(sys.argv[1:])
    signal_gen_block = SignalGenerator(opt.filename)
    # Tuning only applies to the radio sink, not to a file sink.
    if not opt.filename:
        signal_gen_block.sink.set_center_freq(float(opt.frequency) * 1000.0)
        signal_gen_block.sink.set_gain(float(opt.gain))
    signal_params = parse_config(opt.config) if opt.config else {}
    # 'signal_count' is used to keep track of the number of signals being
    # broadcasted for error checking.
    signal_count = 0
    # Parenthesized so the line prints the same text on Python 2 and 3.
    print('Processing Signals...')
    for flag, value in flags:
        # Only signal flags use the prefix/^/@/% syntax; other flag values
        # (paths, frequency, gain) must not be run through parse_signal.
        if flag in ('--file', '-f'):
            parameter = parse_signal(value)
            signal_gen_block.add_file_source(
                os.path.expanduser(parameter.prefix), parameter.amplification)
        elif flag in ('--periodic', '-p'):
            parameter = parse_signal(value)
            signal_gen_block.add_periodic_signal(
                parameter.frequency, int(parameter.prefix), parameter.duty,
                parameter.amplification)
        elif flag in ('--bluetooth', '-b'):
            _add_profile_signal(o, signal_gen_block, signal_params,
                                'bluetooth', value)
        elif flag in ('--wifi', '-w'):
            _add_profile_signal(o, signal_gen_block, signal_params,
                                'wifi', value)
        else:
            continue
        signal_count += 1
    # If there are no inputted signals throw an error.
    if signal_count == 0:
        o.fatal('At least 1 signal is required to run')
    try:
        print('Running...')
        signal_gen_block.run()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop broadcasting; exit quietly.
        pass


if __name__ == '__main__':
    main()