| #!/usr/bin/python |
| """Signal Generator Tool. |
| |
| This module uses the Gnuradio framework to take in multiple signals as |
| specified on command line and adds them together and broadcasts them using a |
| USRP software defined radio. |
| |
| """ |
| from collections import namedtuple |
| import ConfigParser |
| import math |
| import os |
| import re |
| import sys |
| import tempfile |
| |
| from gnuradio import blocks |
| from gnuradio import gr |
| from gnuradio import uhd |
| import numpy as np |
| from scipy.interpolate import interp1d |
| |
| import options |
| |
| optspec = """ |
| signal_generator [options...] |
| -- |
| Radio Parameters: |
| F,frequency= Specify the frequency of the signal generated in KHz [2412000] |
| G,gain= Specify the Db gain of the radio when it broadcasts. [50.0] |
| C,config= Specify the config file for signal parameters. |
| o,filename= Specify filename to save signal instead of recording. |
| Signal Types: |
| f,file= e.g "-f foo.dat^10.0 to play foo.dat at 10x signal strength |
| p,periodic= e.g "-p 1000%0.5@1000000^10.0" to repeat a sine signal every 1000us with 50% duty centered at 1000000Hz and 10x amplification |
| b,bluetooth= e.g "-b 100" to broadcast a bluetooth-like signal at specified power. |
| w,wifi= e.g "-w 100" to broadcast a wifi-like signal at specified power. |
| """ |
# Sample rate in samples per second. SignalGenerator passes this value to the
# USRP sink as both its sample rate and its bandwidth.
SAMPLE_RATE = 30e6
# NOTE(review): not referenced anywhere else in the visible part of this file;
# presumably the expected number of points in a frequency profile -- confirm.
PROFILE_SIZE = 30000
| |
# Result type of parse_signal(). The type name 'ParseSignal' is kept as-is
# because it is what shows up in the repr of existing values.
SignalParameters = namedtuple('ParseSignal', ['prefix', 'amplification',
                                              'frequency', 'duty'])


def parse_signal(parameters):
    """Parse one command line signal specification.

    A specification is a prefix optionally followed by any number of
    <symbol><number> pairs, where the symbol selects the field:
      ^: amplification
      %: duty cycle
      @: frequency

    Surrounding single or double quotes are stripped before parsing. Fields
    that are not given default to 0.0 (this includes the amplification). If a
    symbol is repeated, the last occurrence wins.

    Args:
      parameters: parameter string to be parsed
    Returns:
      SignalParameters tuple (prefix, amplification, frequency, duty). The
      prefix is the unparsed leading text; the other fields are floats.
    Raises:
      ValueError: if the text following a symbol is not a valid number.
    """
    # Splitting on a capturing group keeps the separators, which yields
    # [prefix, symbol, value, symbol, value, ...].
    tokens = re.split(r'([%@^])', parameters.strip('"\''))
    values = {'^': 0.0, '@': 0.0, '%': 0.0}
    for symbol, text in zip(tokens[1::2], tokens[2::2]):
        try:
            values[symbol] = float(text)
        except ValueError:
            # Same exception type as before, but naming the offending input.
            raise ValueError('Invalid value %r for %r in signal parameter %r'
                             % (text, symbol, parameters))

    return SignalParameters(tokens[0], values['^'], values['@'], values['%'])
| |
| |
def interpolate(data, factor):
    """Linearly upsample data by an integer factor.

    The input samples are placed at positions 0, factor, 2 * factor, ... and
    the result is evaluated at every integer position from the first input
    sample to the last one, inclusive.

    Args:
      data: Sequence of samples to upsample (at least two are needed for
        linear interpolation).
      factor: Integer spacing between consecutive input samples on the
        output grid.
    Returns:
      Array of (len(data) - 1) * factor + 1 interpolated samples.
    """
    count = len(data)
    x_old = np.arange(count) * factor
    f = interp1d(x_old, data)

    # The last input sample sits at (count - 1) * factor; evaluating past it
    # would be out of bounds for interp1d, stopping short drops data.
    x_new = np.arange((count - 1) * factor + 1)
    return f(x_new)
| |
| |
class SignalGenerator(gr.top_block):
    """Signal Generation Class.

    GNU Radio top block which takes multiple signals and sends the sum of the
    signals to a single sink (a USRP, or a file). Different types of signals
    can be added by calling the corresponding add_* method; run() wires them
    to the adder and starts the flow graph.
    """

    def __init__(self, filename):
        """Initialize the top block and create the sink block.

        Args:
          filename: If truthy, samples are written to this file instead of
            being sent to a USRP sink.
        """
        super(SignalGenerator, self).__init__('Signal Generator')

        if filename:
            self.sink = blocks.file_sink(gr.sizeof_gr_complex, filename, False)
        else:
            # Stream arguments are only needed by the USRP sink.
            stream_args = uhd.stream_args(cpu_format='fc32', otw_format='sc8',
                                          channels=[0])
            self.sink = uhd.usrp_sink(',', stream_args)
            self.sink.set_samp_rate(SAMPLE_RATE)
            self.sink.set_bandwidth(SAMPLE_RATE)

        # Create a list of the source streams so we can add them as we read
        # arguments; they are connected to the adder in run().
        self.sources = []
        # References to the temporary files backing generated signals, kept
        # so the files are not deleted while they are still in use.
        self.temp_files = []

        self.add_block = blocks.add_vcc(1)
        self.connect((self.add_block, 0), (self.sink, 0))

    def add_file_source(self, filename, factor):
        """Create and add a file source to the sources list.

        Args:
          filename: Path to the file of complex samples to be broadcasted
          factor: Amplitude is multiplied by this
        """
        src = blocks.file_source(gr.sizeof_gr_complex, filename, True)
        multiply_block = blocks.multiply_const_vcc((factor,))
        self.connect((src, 0), (multiply_block, 0))
        self.sources.append(multiply_block)

    def _period_sample_counts(self, length, duty):
        """Return (total, high) sample counts for one period.

        Args:
          length: Duration in micro-seconds of one period
          duty: Proportion of time that the signal is high (0-1)
        Returns:
          Tuple of the number of samples in one period and the number of
          leading samples during which the signal is high.
        Raises:
          ValueError: if the high part does not fit inside one period.
        """
        # Whole samples per micro-second (30 at the 30e6 sample rate).
        samples_per_us = int(SAMPLE_RATE / 1e6)
        total = samples_per_us * length
        up_time = int(length * duty * samples_per_us)
        if not 0 <= up_time <= total:
            raise ValueError('duty %r gives %d high samples, outside 0..%d '
                             'for one period' % (duty, up_time, total))
        return total, up_time

    def _add_signal_array(self, signal, factor):
        """Write signal to a temporary file and add it as a file source.

        Args:
          signal: numpy array holding one period of samples
          factor: Amplitude is multiplied by this value
        """
        # The data is written through the file's path rather than the handle,
        # so the handle only exists to own (and eventually delete) the file.
        f = tempfile.NamedTemporaryFile(mode='w+b')
        signal.tofile(f.name)
        self.temp_files.append(f)

        self.add_file_source(f.name, factor)

    def add_periodic_signal(self, freq, length, duty, factor):
        """Add an oscillating parameterized source.

        Args:
          freq: Frequency in Hz of the generated complex tone (the phase
            advances by 2 * pi * freq / SAMPLE_RATE per sample)
          length: Duration in micro-seconds of one period
          duty: Proportion of time that the signal is high (0-1)
          factor: Amplitude is multiplied by this value
        Raises:
          ValueError: if the high part does not fit inside one period.
        """
        total, up_time = self._period_sample_counts(length, duty)
        signal = np.zeros(shape=total, dtype=np.complex64)
        up_sig = freq * math.pi / (SAMPLE_RATE / 2.0) * np.arange(up_time)
        signal.imag[:up_time] = np.sin(up_sig)
        signal.real[:up_time] = np.cos(up_sig)

        self._add_signal_array(signal, factor)

    def add_periodic_profile(self, length, duty, factor, profile):
        """Add an oscillating parameterized source with a frequency profile.

        Args:
          length: Duration in micro-seconds of one period
          duty: Proportion of time that the signal is high (0-1)
          factor: Amplitude is multiplied by this value
          profile: Path (optionally quoted, '~' allowed) to a file readable
            by numpy.load holding the vector of the frequency profile
        Raises:
          ValueError: if the high part does not fit inside one period.
        """
        total, up_time = self._period_sample_counts(length, duty)
        signal = np.zeros(shape=total, dtype=np.complex64)

        profile_path = os.path.abspath(
            os.path.expanduser(profile.strip('\'"')))
        fourier_down = np.load(profile_path)
        # Stretch the stored profile towards int(SAMPLE_RATE) points, then
        # transform it into time-domain samples.
        fourier = interpolate(fourier_down,
                              int(SAMPLE_RATE) // len(fourier_down))
        interference_sample = np.fft.ifft(fourier, int(SAMPLE_RATE))

        signal[:up_time] = interference_sample[:up_time]

        self._add_signal_array(signal, factor)

    def run(self):
        """Connect all the sources to the adder and run the system."""
        for port, source in enumerate(self.sources):
            self.connect((source, 0), (self.add_block, port))
        gr.top_block.run(self)
| |
| |
def parse_config(filename):
    """Read the per-signal-type settings from a configuration file.

    Every section of the file describes one signal type and must provide the
    options 'duty', 'amplitude', 'length' and 'profile'.

    Args:
      filename: The filename of the configuration file.
    Returns:
      Dictionary keyed by section name; each value is a dictionary with the
      keys 'duty' (float), 'amplitude' (float), 'length' (int) and 'profile'
      (string).
    """
    parser = ConfigParser.ConfigParser()
    parser.read(filename)

    def read_section(name):
        # Convert each option of one section to its expected type.
        return {
            'duty': parser.getfloat(name, 'duty'),
            'amplitude': parser.getfloat(name, 'amplitude'),
            'length': parser.getint(name, 'length'),
            'profile': parser.get(name, 'profile'),
        }

    return dict((name, read_section(name)) for name in parser.sections())
| |
| |
def main():
    """Build the signal generator from the command line and run it.

    Parses the options described by optspec, creates the SignalGenerator
    (writing to a file when a filename is given, otherwise tuning the radio
    sink), adds one source per signal flag and runs the flow graph until it
    finishes or is interrupted from the keyboard.
    """
    o = options.Options(optspec)
    opt, flags, _ = o.parse(sys.argv[1:])

    signal_gen_block = SignalGenerator(opt.filename)
    if not opt.filename:
        # The frequency option is given in KHz; the sink is passed Hz.
        signal_gen_block.sink.set_center_freq(float(opt.frequency) * 1000.0)
        signal_gen_block.sink.set_gain(float(opt.gain))

    # Always a dictionary, so the checks below work with or without a config.
    signal_params = parse_config(opt.config) if opt.config else {}

    # Flags whose settings come from the config section of the same name.
    profile_flags = {'--bluetooth': 'bluetooth', '-b': 'bluetooth',
                     '--wifi': 'wifi', '-w': 'wifi'}

    # Add the signals to the top_block
    # 'signal_count' is used to keep track of the number of signals being
    # broadcasted for error checking.
    signal_count = 0
    print('Processing Signals...')
    for flag, value in flags:
        # Only signal flags are parsed as signal specifications; values of
        # other flags (e.g. file paths) may legitimately contain '@', '^' or
        # '%' and must not be run through parse_signal.
        if flag in ('--file', '-f'):
            parameter = parse_signal(value)
            filename = os.path.expanduser(parameter.prefix)
            signal_gen_block.add_file_source(filename,
                                             parameter.amplification)
        elif flag in ('--periodic', '-p'):
            parameter = parse_signal(value)
            length = int(parameter.prefix)
            signal_gen_block.add_periodic_signal(parameter.frequency, length,
                                                 parameter.duty,
                                                 parameter.amplification)
        elif flag in profile_flags:
            kind = profile_flags[flag]
            # NOTE(review): o.fatal is assumed not to return, as the original
            # code relied on -- confirm against the options module.
            if not signal_params:
                o.fatal('Config file required in order to use %s option'
                        % kind)
            if kind not in signal_params:
                o.fatal('Config file has no [%s] section' % kind)

            parameter = parse_signal(value)
            kind_params = signal_params[kind]
            # The flag's value scales the amplitude taken from the config.
            signal_power_fraction = float(parameter.prefix)
            factor = kind_params['amplitude'] * signal_power_fraction

            signal_gen_block.add_periodic_profile(kind_params['length'],
                                                  kind_params['duty'],
                                                  factor,
                                                  kind_params['profile'])
        else:
            # Not a signal flag (radio parameters are read from opt above).
            continue
        signal_count += 1

    # If there are no inputted signals throw an error.
    if signal_count == 0:
        o.fatal('At least 1 signal is required to run')

    try:
        print('Running...')
        signal_gen_block.run()
    except KeyboardInterrupt:
        # Remove the Keyboard Interrupt Error from displaying
        pass


if __name__ == '__main__':
    main()
| |