diff --git a/docs/datasets.rst b/docs/datasets.rst index 903634f..465c22b 100644 --- a/docs/datasets.rst +++ b/docs/datasets.rst @@ -19,7 +19,7 @@ All the datasets have almost similar API. They all have a common argument: ``transform`` to transform the input data. -.. currentmodule:: torchsig.datasets +.. currentmodule:: torchsig.datasets.sig53 Sig53 @@ -28,24 +28,32 @@ Sig53 .. autoclass:: Sig53 +.. currentmodule:: torchsig.datasets.wideband_sig53 + WidebandSig53 ~~~~~~~~~~~~~~ .. autoclass:: WidebandSig53 +.. currentmodule:: torchsig.datasets.modulations + ModulationsDataset ~~~~~~~~~~~~~~~~~~~~ .. autoclass:: ModulationsDataset +.. currentmodule:: torchsig.datasets.wideband + WidebandModulationsDataset ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ .. autoclass:: WidebandModulationsDataset +.. currentmodule:: torchsig.datasets.synthetic + DigitalModulationDataset ~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -82,6 +90,8 @@ FMDataset .. autoclass:: FMDataset +.. currentmodule:: torchsig.datasets.wideband + WidebandDataset ~~~~~~~~~~~~~~~~~~ @@ -94,12 +104,17 @@ SyntheticBurstSourceDataset .. autoclass:: SyntheticBurstSourceDataset +.. currentmodule:: torchsig.datasets.file_datasets + + FileBurstSourceDataset ~~~~~~~~~~~~~~~~~~~~~~~~ .. autoclass:: FileBurstSourceDataset +.. currentmodule:: torchsig.datasets.radioml + RadioML2016 ~~~~~~~~~~~~~~ diff --git a/docs/transforms.rst b/docs/transforms.rst index 62d41ef..e9e83be 100644 --- a/docs/transforms.rst +++ b/docs/transforms.rst @@ -11,9 +11,9 @@ This is useful if you have to build a more complex transformation pipeline .. contents:: Transforms :local: -General Transforms ------------------- -.. currentmodule:: torchsig.transforms +Transforms +---------- +.. currentmodule:: torchsig.transforms.transforms Transform ^^^^^^^^^ @@ -23,9 +23,9 @@ Compose ^^^^^^^^^ .. autoclass:: Compose -NoTransform -^^^^^^^^^^^^^ -.. autoclass:: NoTransform +Identity +^^^^^^^^^ +.. autoclass:: Identity Lambda ^^^^^^^^^ @@ -51,48 +51,6 @@ TargetConcatenate ^^^^^^^^^^^^^^^^^^^^^ .. autoclass:: TargetConcatenate -RandAugment -^^^^^^^^^^^^^ -.. autoclass:: RandAugment - - -Deep Learning Techniques ------------------------- -.. currentmodule:: torchsig.transforms.deep_learning_techniques.dlt - -DatasetBasebandMixUp -^^^^^^^^^^^^^^^^^^^^^ -.. autoclass:: DatasetBasebandMixUp - -DatasetBasebandCutMix -^^^^^^^^^^^^^^^^^^^^^^^^^ -.. autoclass:: DatasetBasebandCutMix - -CutOut -^^^^^^^^^ -.. autoclass:: CutOut - -PatchShuffle -^^^^^^^^^^^^^ -.. autoclass:: PatchShuffle - -DatasetWidebandMixUp -^^^^^^^^^^^^^^^^^^^^^ -.. autoclass:: DatasetWidebandMixUp - -DatasetWidebandCutMix -^^^^^^^^^^^^^^^^^^^^^^^^^ -.. autoclass:: DatasetWidebandCutMix - -SpectrogramRandomResizeCrop -^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -.. autoclass:: SpectrogramRandomResizeCrop - - -Expert Feature Transforms -------------------------- -.. currentmodule:: torchsig.transforms.expert_feature.eft - InterleaveComplex ^^^^^^^^^^^^^^^^^ .. autoclass:: InterleaveComplex @@ -137,24 +95,51 @@ ReshapeTransform ^^^^^^^^^^^^^^^^^ .. autoclass:: ReshapeTransform - -Signal Processing Transforms ----------------------------- -.. currentmodule:: torchsig.transforms.signal_processing.sp +RandAugment +^^^^^^^^^^^^^ +.. autoclass:: RandAugment Normalize ^^^^^^^^^ .. autoclass:: Normalize + +Augmentations +------------- +.. currentmodule:: torchsig.transforms.transforms + +DatasetBasebandMixUp +^^^^^^^^^^^^^^^^^^^^^ +.. autoclass:: DatasetBasebandMixUp + +DatasetBasebandCutMix +^^^^^^^^^^^^^^^^^^^^^^^^^ +.. 
autoclass:: DatasetBasebandCutMix + +CutOut +^^^^^^^^^ +.. autoclass:: CutOut + +PatchShuffle +^^^^^^^^^^^^^ +.. autoclass:: PatchShuffle + +DatasetWidebandMixUp +^^^^^^^^^^^^^^^^^^^^^ +.. autoclass:: DatasetWidebandMixUp + +DatasetWidebandCutMix +^^^^^^^^^^^^^^^^^^^^^^^^^ +.. autoclass:: DatasetWidebandCutMix + +SpectrogramRandomResizeCrop +^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +.. autoclass:: SpectrogramRandomResizeCrop + RandomResample ^^^^^^^^^^^^^^^^^ .. autoclass:: RandomResample - -System Impairment Transforms ------------------------------ -.. currentmodule:: torchsig.transforms.system_impairment.si - RandomTimeShift ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ .. autoclass:: RandomTimeShift @@ -231,11 +216,6 @@ RandomConvolve ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ .. autoclass:: RandomConvolve - -Wireless Channel Transforms ----------------------------- -.. currentmodule:: torchsig.transforms.wireless_channel.wce - TargetSNR ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ .. autoclass:: TargetSNR @@ -260,15 +240,6 @@ RandomPhaseShift ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ .. autoclass:: RandomPhaseShift - -Spectrogram Transforms ----------------------------- -.. currentmodule:: torchsig.transforms.spectrogram_transforms.spec - -SpectrogramResize -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -.. autoclass:: SpectrogramResize - SpectrogramDropSamples ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ .. autoclass:: SpectrogramDropSamples @@ -288,3 +259,113 @@ SpectrogramMosaicCrop SpectrogramMosaicDownsample ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ .. autoclass:: SpectrogramMosaicDownsample + + +Target Transforms +----------------- +.. currentmodule:: torchsig.transforms.target_transforms + +DescToClassName +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +.. autoclass:: DescToClassName + +DescToClassNameSNR +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +.. autoclass:: DescToClassNameSNR + +DescToClassIndex +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +.. autoclass:: DescToClassIndex + +DescToClassIndexSNR +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +.. autoclass:: DescToClassIndexSNR + +DescToMask +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +.. autoclass:: DescToMask + +DescToMaskSignal +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +.. autoclass:: DescToMaskSignal + +DescToMaskFamily +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +.. autoclass:: DescToMaskFamily + +DescToMaskClass +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +.. autoclass:: DescToMaskClass + +DescToSemanticClass +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +.. autoclass:: DescToSemanticClass + +DescToBBox +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +.. autoclass:: DescToBBox + +DescToAnchorBoxes +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +.. autoclass:: DescToAnchorBoxes + +DescPassThrough +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +.. autoclass:: DescPassThrough + +DescToBinary +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +.. autoclass:: DescToBinary + +DescToCustom +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +.. autoclass:: DescToCustom + +DescToClassEncoding +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +.. autoclass:: DescToClassEncoding + +DescToWeightedMixUp +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +.. autoclass:: DescToWeightedMixUp + +DescToWeightedCutMix +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +.. autoclass:: DescToWeightedCutMix + +DescToBBoxDict +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +.. autoclass:: DescToBBoxDict + +DescToBBoxSignalDict +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +.. autoclass:: DescToBBoxSignalDict + +DescToBBoxFamilyDict +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +.. autoclass:: DescToBBoxFamilyDict + +DescToInstMaskDict +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +.. autoclass:: DescToInstMaskDict + +DescToSignalInstMaskDict +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +.. 
autoclass:: DescToSignalInstMaskDict
+
+DescToSignalFamilyInstMaskDict
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+.. autoclass:: DescToSignalFamilyInstMaskDict
+
+DescToListTuple
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+.. autoclass:: DescToListTuple
+
+ListTupleToDesc
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+.. autoclass:: ListTupleToDesc
+
+LabelSmoothing
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+.. autoclass:: LabelSmoothing
+
diff --git a/tests/test_transforms_benchmark.py b/tests/test_transforms_benchmark.py
index cdecb1c..60ce01b 100644
--- a/tests/test_transforms_benchmark.py
+++ b/tests/test_transforms_benchmark.py
@@ -74,8 +74,8 @@ def generate_data():
     ("magnitude_rescale", RandomMagRescale(0.5, 3), RandomMagRescale(0.5, 3)),
     (
         "drop_samples",
-        RandomDropSamples(0.3, 50, ["zero"]),
-        RandomDropSamples(0.3, 50, ["zero"]),
+        RandomDropSamples(0.01, 50, ["zero"]),
+        RandomDropSamples(0.01, 50, ["zero"]),
     ),
     ("quantize", Quantize(32, ["floor"]), Quantize(32, ["floor"])),
     ("clip", Clip(0.85), Clip(0.85)),
diff --git a/tests/test_transforms_figures.py b/tests/test_transforms_figures.py
index a70d435..faff928 100644
--- a/tests/test_transforms_figures.py
+++ b/tests/test_transforms_figures.py
@@ -101,7 +101,12 @@ def generate_data(modulation_name):
 @pytest.mark.parametrize(
-    "transform, modulation_name", itertools.product(transforms_list, modulations)
+    "transform, modulation_name",
+    itertools.product(transforms_list, modulations),
+    ids=[
+        "{}-{}".format(x[0], y)
+        for x, y in itertools.product(transforms_list, modulations)
+    ],
 )
 def test_transform_figures(transform, modulation_name):
     short_data, long_data = generate_data(modulation_name)
diff --git a/torchsig/datasets/file_datasets.py b/torchsig/datasets/file_datasets.py
index c6d5cb7..eb5d951 100644
--- a/torchsig/datasets/file_datasets.py
+++ b/torchsig/datasets/file_datasets.py
@@ -1,21 +1,9 @@
-import json
-import os
-import xml
-import xml.etree.ElementTree as ET
+from torchsig.datasets.wideband import BurstSourceDataset, SignalBurst
 from typing import Any, List, Optional
-
 import numpy as np
 import pandas as pd
-
-from torchsig.datasets.wideband import BurstSourceDataset, SignalBurst
-from torchsig.transforms.functional import (
-    FloatParameter,
-    NumericParameter,
-    to_distribution,
-    uniform_continuous_distribution,
-    uniform_discrete_distribution,
-)
-from torchsig.utils.types import SignalDescription
+import json
+import os


class WidebandFileSignalBurst(SignalBurst):
@@ -70,7 +58,9 @@ def generate_iq(self):
            # Read desired number of samples from file
            iq_data = (
                np.frombuffer(
-                    file_object.read(int(self.num_iq_samples) * self.bytes_per_sample),
+                    file_object.read(
+                        int(self.num_iq_samples) * self.bytes_per_sample
+                    ),
                    dtype=self.capture_type,
                )
                .astype(np.float64)
@@ -82,8 +72,8 @@
            # file repetitively and summing with itself
            iq_data = np.zeros(self.num_iq_samples, dtype=np.complex128)
        return iq_data[: self.num_iq_samples]
-
-
+
+
class TargetInterpreter:
    """The TargetInterpreter base class is meant to be inherited and modified
    for specific interpreters such that each sub-class implements a transform
@@ -168,13 +158,16 @@ def convert_to_signalburst(
        for label in self.detections_df.iloc[df_indicies].itertuples():
            # Determine cut vs full capture relationship
            startInWindow = bool(
-                label.start >= start_sample and label.start < start_sample + self.num_iq_samples
+                label.start >= start_sample
+                and label.start < start_sample + self.num_iq_samples
            )
            stopInWindow = bool(
-                label.stop > start_sample and label.stop <= start_sample + self.num_iq_samples
+                label.stop > start_sample
+                and label.stop <= start_sample + self.num_iq_samples
            )
            spansFullWindow = bool(
-                label.start <= start_sample and label.stop >= start_sample + self.num_iq_samples
+                label.start <= start_sample
+                and label.stop >= start_sample + self.num_iq_samples
            )
            fullyContainedInWindow = bool(startInWindow and stopInWindow)
@@ -337,7 +330,9 @@ def __init__(
        self.class_column = class_column
        # Generate dataframe
        self.detections_df = self._convert_to_dataframe()
-        self.detections_df = self.detections_df.sort_values(by=["start"]).reset_index(drop=True)
+        self.detections_df = self.detections_df.sort_values(by=["start"]).reset_index(
+            drop=True
+        )
        self.num_labels = len(self.detections_df)
        self.detections_df = self._convert_class_name_to_index()
@@ -400,7 +395,9 @@ def __init__(
        self.class_target = class_target
        # Generate dataframe
        self.detections_df = self._convert_to_dataframe()
-        self.detections_df = self.detections_df.sort_values(by=["start"]).reset_index(drop=True)
+        self.detections_df = self.detections_df.sort_values(by=["start"]).reset_index(
+            drop=True
+        )
        self.num_labels = len(self.detections_df)
        self.detections_df = self._convert_class_name_to_index()
@@ -561,7 +558,10 @@ def __init__(
        # Distribute randomness evenly over labels, rather than files then labels
        # If more than 10,000 files, omit this step for speed
-        if self.sample_policy == "random_labels" and len(self.target_files) < 10_000:
+        if (
+            self.sample_policy == "random_labels"
+            and len(self.target_files) < 10_000
+        ):
            annotations_per_file = []
            for file_index, target_file in enumerate(self.target_files):
                # Read total file size
@@ -581,11 +581,14 @@
                # Track number of annotations
                annotations_per_file.append(len(annotations))
            total_annotations = sum(annotations_per_file)
-            self.file_probabilities = np.asarray(annotations_per_file) / total_annotations
+            self.file_probabilities = (
+                np.asarray(annotations_per_file) / total_annotations
+            )

        # Generate the index by creating a set of bursts.
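# --- Editor's aside (not part of the patch): the window/label relationship
# computed in convert_to_signalburst above is a plain interval-overlap
# classification. A self-contained sketch with hypothetical names, for
# sanity-checking the four cases:
def classify_overlap(start: int, stop: int, win_start: int, win_len: int):
    win_stop = win_start + win_len
    start_in_window = win_start <= start < win_stop
    stop_in_window = win_start < stop <= win_stop
    spans_full_window = start <= win_start and stop >= win_stop
    fully_contained = start_in_window and stop_in_window
    return start_in_window, stop_in_window, spans_full_window, fully_contained

# A label spanning [100, 5000) against a 4096-sample window at offset 0
# starts inside but runs past the end:
print(classify_overlap(100, 5000, 0, 4096))  # (True, False, False, False)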
self.index = [ - (collection, idx) for idx, collection in enumerate(self._generate_burst_collections()) + (collection, idx) + for idx, collection in enumerate(self._generate_burst_collections()) ] def _generate_burst_collections(self) -> List[List[SignalBurst]]: @@ -639,7 +642,9 @@ def _generate_burst_collections(self) -> List[List[SignalBurst]]: self.capture_type.itemsize * 2 ) else: - sample_burst_collection[0].bytes_per_sample = self.capture_type.itemsize + sample_burst_collection[ + 0 + ].bytes_per_sample = self.capture_type.itemsize else: # Create invalid SignalBurst for data file information only sample_burst_collection = [] @@ -703,22 +708,32 @@ def _generate_burst_collections(self) -> List[List[SignalBurst]]: while null_interval < self.num_iq_samples: # Randomly sample label index to search around label_index = np.random.randint(interpreter.num_labels) - if interpreter.num_labels > 1 and label_index + 1 <= interpreter.num_labels - 1: + if ( + interpreter.num_labels > 1 + and label_index + 1 <= interpreter.num_labels - 1 + ): # Max over previous annotation stop and previous null start to handle cases of long signals - null_start_index = max(annotations.iloc[label_index].stop, null_start_index) + null_start_index = max( + annotations.iloc[label_index].stop, null_start_index + ) null_stop_index = annotations.iloc[label_index + 1].start elif ( - interpreter.num_labels > 1 and label_index + 1 > interpreter.num_labels - 1 + interpreter.num_labels > 1 + and label_index + 1 > interpreter.num_labels - 1 ): # Start start index at end of final label - null_start_index = max(annotations.iloc[label_index].stop, null_start_index) + null_start_index = max( + annotations.iloc[label_index].stop, null_start_index + ) null_stop_index = capture_duration_samples elif interpreter.num_labels == 1: # Sample from before or after the only label before = True if np.random.rand() >= 0.5 else False null_start_index = 0 if before else annotations.iloc[0].stop null_stop_index = ( - annotations.iloc[0].start if before else capture_duration_samples + annotations.iloc[0].start + if before + else capture_duration_samples ) else: # Sample from anywhere in file @@ -771,7 +786,9 @@ def _generate_burst_collections(self) -> List[List[SignalBurst]]: for sample_idx in range(self.num_valid_samples): if self.sample_policy == "random_labels": # Sample random file, weighted by number of annotations - file_index = np.random.choice(len(self.data_files), p=self.file_probabilities) + file_index = np.random.choice( + len(self.data_files), p=self.file_probabilities + ) # Read total file size capture_duration_samples = ( os.path.getsize(os.path.join(self.data_files[file_index])) @@ -810,11 +827,15 @@ def _generate_burst_collections(self) -> List[List[SignalBurst]]: latest_sample_index = burst_start_index + burst_duration / 2 else: # Long burst: Ensure at least a quarter of the window is occupied - earliest_sample_index = burst_start_index - (0.75 * self.num_iq_samples) + earliest_sample_index = burst_start_index - ( + 0.75 * self.num_iq_samples + ) latest_sample_index = annotations.iloc[label_index].stop - ( 0.25 * self.num_iq_samples ) - data_index = max(0, np.random.randint(earliest_sample_index, latest_sample_index)) + data_index = max( + 0, np.random.randint(earliest_sample_index, latest_sample_index) + ) # Check duration if capture_duration_samples - data_index < self.num_iq_samples: @@ -838,7 +859,9 @@ def _generate_burst_collections(self) -> List[List[SignalBurst]]: self.capture_type.itemsize * 2 ) else: - 
sample_burst_collection[0].bytes_per_sample = self.capture_type.itemsize + sample_burst_collection[ + 0 + ].bytes_per_sample = self.capture_type.itemsize # If sequentially sampling, increment if self.sample_policy == "sequential_labels": diff --git a/torchsig/datasets/wideband.py b/torchsig/datasets/wideband.py index 5e15b29..b58fdc2 100644 --- a/torchsig/datasets/wideband.py +++ b/torchsig/datasets/wideband.py @@ -34,13 +34,10 @@ from torchsig.transforms.functional import ( FloatParameter, NumericParameter, - to_distribution, - uniform_continuous_distribution, - uniform_discrete_distribution, ) from torchsig.utils.dataset import SignalDataset from torchsig.utils.dsp import low_pass -from torchsig.utils.types import SignalData, SignalDescription +from torchsig.utils.types import SignalData, SignalDescription, RandomDistribution class SignalBurst(SignalDescription): @@ -226,9 +223,8 @@ def __init__( modulation = self.class_list else: modulation = [modulation] if isinstance(modulation, str) else modulation - self.classes = to_distribution( + self.classes = RandomDistribution.to_distribution( modulation, - random_generator=self.random_generator, ) # Update freq values @@ -526,9 +522,8 @@ def __init__( **kwargs, ): super(FileSignalBurst, self).__init__(**kwargs) - self.file_path = to_distribution( + self.file_path = RandomDistribution.to_distribution( file_path, - random_generator=self.random_generator, ) self.file_reader = file_reader self.class_list = class_list @@ -674,20 +669,12 @@ def __init__( self.num_iq_samples = num_iq_samples self.num_samples = num_samples self.burst_class = burst_class - self.bandwidths = to_distribution( - bandwidths, random_generator=self.random_generator - ) - self.center_frequencies = to_distribution( - center_frequencies, random_generator=self.random_generator - ) - self.burst_durations = to_distribution( - burst_durations, random_generator=self.random_generator - ) - self.silence_durations = to_distribution( - silence_durations, random_generator=self.random_generator - ) - self.snrs_db = to_distribution(snrs_db, random_generator=self.random_generator) - self.start = to_distribution(start, random_generator=self.random_generator) + self.bandwidths = RandomDistribution.to_distribution(bandwidths) + self.center_frequencies = RandomDistribution.to_distribution(center_frequencies) + self.burst_durations = RandomDistribution.to_distribution(burst_durations) + self.silence_durations = RandomDistribution.to_distribution(silence_durations) + self.snrs_db = RandomDistribution.to_distribution(snrs_db) + self.start = RandomDistribution.to_distribution(start) # Generate the index by creating a set of bursts. 
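# --- Editor's aside (not part of the patch): torchsig.utils.types is not
# shown in this diff, so the exact semantics of RandomDistribution are an
# assumption inferred from the call sites above: scalars, (low, high) tuples,
# and lists are all coerced into zero-argument samplers that are later
# called, e.g. self.snrs_db().
from torchsig.utils.types import RandomDistribution

snrs_db = RandomDistribution.to_distribution((10.0, 20.0))  # uniform over a range
start = RandomDistribution.to_distribution(0.0)             # fixed value
print(snrs_db(), start())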
self.index = [ @@ -720,7 +707,6 @@ def _generate_burst_collections(self) -> List[List[SignalBurst]]: center_frequency=center_frequency, bandwidth=bandwidth, snr=snr, - random_generator=self.random_generator, ) ) start = start + burst_duration + silence_duration @@ -1024,10 +1010,8 @@ def __init__( ) self.target_transform = target_transform - self.num_signals = to_distribution( - num_signals, random_generator=self.random_generator - ) - self.snrs = to_distribution(snrs, random_generator=self.random_generator) + self.num_signals = RandomDistribution.to_distribution(num_signals) + self.snrs = RandomDistribution.to_distribution(snrs) def __gen_metadata__(self, modulation_list: List) -> pd.DataFrame: """This method defines the parameters of the modulations to be inserted @@ -1135,13 +1119,11 @@ def __getitem__(self, item: int) -> Tuple[np.ndarray, Any]: ): # Signal is bursty bursty = True - burst_duration = to_distribution( + burst_duration = RandomDistribution.to_distribution( literal_eval(self.metadata.iloc[meta_idx].burst_duration), - random_generator=self.random_generator, )() - silence_multiple = to_distribution( + silence_multiple = RandomDistribution.to_distribution( literal_eval(self.metadata.iloc[meta_idx].silence_multiple), - random_generator=self.random_generator, )() stops_in_frame = False if hop_random_var < self.metadata.iloc[meta_idx].freq_hopping_prob: @@ -1152,11 +1134,10 @@ def __getitem__(self, item: int) -> Tuple[np.ndarray, Any]: bandwidth = self.random_generator.uniform(0.025, 0.05) silence_duration = burst_duration * (silence_multiple - 1) - freq_channels = to_distribution( + freq_channels = RandomDistribution.to_distribution( literal_eval( self.metadata.iloc[meta_idx].freq_hopping_channels ), - random_generator=self.random_generator, )() # Convert channel count to list of center frequencies @@ -1486,7 +1467,9 @@ def __call__(self, data: Any) -> Any: (x + bandwidth / 2, y - bandwidth / 2) for x, y in unoccupied_bands ] rand_band_idx = np.random.randint(len(center_freqs)) - center_freqs_dist = to_distribution(center_freqs[rand_band_idx]) + center_freqs_dist = RandomDistribution.to_distribution( + center_freqs[rand_band_idx] + ) center_freq = center_freqs_dist() bursty = True if np.random.rand() < 0.5 else False burst_duration = np.random.uniform(0.05, 1.0) if bursty else 1.0 diff --git a/torchsig/transforms/functional.py b/torchsig/transforms/functional.py index 1211fb7..14306af 100644 --- a/torchsig/transforms/functional.py +++ b/torchsig/transforms/functional.py @@ -1,13 +1,13 @@ -from functools import partial from typing import Callable, List, Literal, Optional, Tuple, Union - -import numpy as np -import pywt from numba import complex64, float64, int64, njit +from torchsig.utils.types import RandomDistribution +from torchsig.utils.dsp import low_pass from scipy import interpolate from scipy import signal as sp +import numpy as np +import pywt + -from torchsig.utils.dsp import low_pass import cv2 @@ -15,9 +15,6 @@ "FloatParameter", "IntParameter", "NumericParameter", - "uniform_discrete_distribution", - "uniform_continuous_distribution", - "to_distribution", "normalize", "resample", "make_sinc_filter", @@ -62,76 +59,11 @@ ] -FloatParameter = Union[Callable[[int], float], float, Tuple[float, float], List] -IntParameter = Union[Callable[[int], int], int, Tuple[int, int], List] +FloatParameter = Union[RandomDistribution, float, Tuple[float, float], List] +IntParameter = Union[RandomDistribution, int, Tuple[int, int], List] NumericParameter = Union[FloatParameter, 
IntParameter] -def uniform_discrete_distribution( - choices: List, random_generator: Optional[np.random.RandomState] = None -): - random_generator = random_generator if random_generator else np.random.RandomState() - return partial(random_generator.choice, choices) - - -def uniform_continuous_distribution( - lower: Union[int, float], - upper: Union[int, float], - random_generator: Optional[np.random.RandomState] = None, -): - random_generator = random_generator if random_generator else np.random.RandomState() - return partial(random_generator.uniform, lower, upper) - - -def to_distribution( - param: Union[ - int, - float, - str, - Callable, - List[int], - List[float], - List[str], - Tuple[int, int], - Tuple[float, float], - ], - random_generator: Optional[np.random.RandomState] = None, -): - random_generator = random_generator if random_generator else np.random.RandomState() - if isinstance(param, Callable): # type: ignore - return param - - if isinstance(param, list): - ####################################################################### - # [BUG ALERT]: Nested tuples within lists does not function as desired. - # Below will instantiate a random distribution from the list; however, - # each call will only come from the previously randomized selection, - # but the desired behavior would be for this to randomly select each - # region at call time. Commenting out for now, but should revisit in - # the future to add back the functionality. - ####################################################################### - # if isinstance(param[0], tuple): - # tuple_from_list = param[random_generator.randint(len(param))] - # return uniform_continuous_distribution( - # tuple_from_list[0], - # tuple_from_list[1], - # random_generator, - # ) - return uniform_discrete_distribution(param, random_generator) - - if isinstance(param, tuple): - return uniform_continuous_distribution( - param[0], - param[1], - random_generator, - ) - - if isinstance(param, int) or isinstance(param, float): - return uniform_discrete_distribution([param], random_generator) - - return param - - def normalize( tensor: np.ndarray, norm_order: Optional[Union[float, int, Literal["fro", "nuc"]]] = 2, @@ -252,7 +184,9 @@ def awgn(tensor: np.ndarray, noise_power_db: float) -> np.ndarray: """ real_noise = np.random.randn(*tensor.shape) imag_noise = np.random.randn(*tensor.shape) - return tensor + (10.0 ** (noise_power_db / 20.0)) * (real_noise + 1j * imag_noise) / np.sqrt(2) + return tensor + (10.0 ** (noise_power_db / 20.0)) * ( + real_noise + 1j * imag_noise + ) / np.sqrt(2) def time_varying_awgn( @@ -312,9 +246,13 @@ def time_varying_awgn( duration = stop_idx - start_idx start_power = noise_power_db_low if idx % 2 == 0 else noise_power_db_high stop_power = noise_power_db_high if idx % 2 == 0 else noise_power_db_low - noise_power_db[start_idx:stop_idx] = np.linspace(start_power, stop_power, duration) + noise_power_db[start_idx:stop_idx] = np.linspace( + start_power, stop_power, duration + ) - return tensor + (10.0 ** (noise_power_db / 20.0)) * (real_noise + 1j * imag_noise) / np.sqrt(2) + return tensor + (10.0 ** (noise_power_db / 20.0)) * ( + real_noise + 1j * imag_noise + ) / np.sqrt(2) @njit(cache=False) @@ -380,7 +318,9 @@ def rayleigh_fading( ) ) # Generate initial taps - rayleigh_taps = np.random.randn(num_taps) + 1j * np.random.randn(num_taps) # multi-path channel + rayleigh_taps = np.random.randn(num_taps) + 1j * np.random.randn( + num_taps + ) # multi-path channel # Linear interpolate taps by a factor of 100 -- so we can get 
accurate coherence bandwidths old_time = np.linspace(0, 1.0, num_taps, endpoint=True) @@ -611,7 +551,9 @@ def continuous_wavelet_transform( Scalogram of tensor along time dimension """ scales = np.arange(1, nscales) - cwtmatr, _ = pywt.cwt(tensor, scales=scales, wavelet=wavelet, sampling_period=1.0 / sample_rate) + cwtmatr, _ = pywt.cwt( + tensor, scales=scales, wavelet=wavelet, sampling_period=1.0 / sample_rate + ) # if the dtype is complex then return the magnitude if np.iscomplexobj(cwtmatr): @@ -695,7 +637,9 @@ def freq_shift(tensor: np.ndarray, f_shift: float) -> np.ndarray: transformed (:class:`numpy.ndarray`): Tensor that has been frequency shifted along time dimension of size tensor.shape """ - sinusoid = np.exp(2j * np.pi * f_shift * np.arange(tensor.shape[0], dtype=np.float64)) + sinusoid = np.exp( + 2j * np.pi * f_shift * np.arange(tensor.shape[0], dtype=np.float64) + ) return np.multiply(tensor, np.asarray(sinusoid)) @@ -735,7 +679,9 @@ def freq_shift_avoid_aliasing( # Filter to remove out-of-band regions taps = low_pass(cutoff=1 / 4, transition_bandwidth=(0.5 - 1 / 4) / 4) tensor = sp.convolve(tensor, taps, mode="same") - tensor = tensor[: int(num_iq_samples * up)] # prune to be correct size out of filter + tensor = tensor[ + : int(num_iq_samples * up) + ] # prune to be correct size out of filter # Decimate back down to correct sample rate tensor = sp.resample_poly(tensor, down, up) @@ -775,7 +721,9 @@ def _fractional_shift_helper( return output -def fractional_shift(tensor: np.ndarray, taps: np.ndarray, stride: int, delay: float) -> np.ndarray: +def fractional_shift( + tensor: np.ndarray, taps: np.ndarray, stride: int, delay: float +) -> np.ndarray: """Applies fractional sample delay of delay using a polyphase interpolator Args: @@ -795,8 +743,12 @@ def fractional_shift(tensor: np.ndarray, taps: np.ndarray, stride: int, delay: f transformed (:class:`numpy.ndarray`): Tensor that has been fractionally-shifted along time dimension of size tensor.shape """ - real_part = _fractional_shift_helper(taps, tensor.real, stride, int(stride * float(delay))) - imag_part = _fractional_shift_helper(taps, tensor.imag, stride, int(stride * float(delay))) + real_part = _fractional_shift_helper( + taps, tensor.real, stride, int(stride * float(delay)) + ) + imag_part = _fractional_shift_helper( + taps, tensor.imag, stride, int(stride * float(delay)) + ) tensor = real_part[: tensor.shape[0]] + 1j * imag_part[: tensor.shape[0]] zero_idx = -1 if delay < 0 else 0 # do not extrapolate, zero-pad. 
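# --- Editor's aside (not part of the patch): the mixer-based freq_shift above
# can be sanity-checked with plain numpy; multiplying by a complex exponential
# moves a tone's FFT peak by exactly the shift amount.
import numpy as np

n = np.arange(1024)
tone = np.exp(2j * np.pi * 0.125 * n)             # tone at 0.125 cycles/sample
shifted = tone * np.exp(2j * np.pi * 0.0625 * n)  # mix up by 0.0625
peak_bin = np.argmax(np.abs(np.fft.fft(shifted)))
print(peak_bin / 1024)  # 0.1875 = 0.125 + 0.0625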
tensor[zero_idx] = 0 @@ -838,9 +790,11 @@ def iq_imbalance( 1j * (np.pi / 2.0 + iq_phase_imbalance / 2.0) ) * np.imag(tensor) - tensor += 10 ** (iq_dc_offset_db / 10.0) * np.real(tensor) + 1j * 10 ** ( - iq_dc_offset_db / 10.0 - ) * np.imag(tensor) + tensor = ( + tensor + + 10 ** (iq_dc_offset_db / 10.0) * np.real(tensor) + + 1j * 10 ** (iq_dc_offset_db / 10.0) * np.imag(tensor) + ) return tensor @@ -944,8 +898,12 @@ def roll_off( fltorder += 1 bandwidth = uppercutfreq - lowercutfreq center_freq = lowercutfreq - 0.5 + bandwidth / 2 - taps = low_pass(cutoff=bandwidth / 2, transition_bandwidth=(0.5 - bandwidth / 2) / 4) - sinusoid = np.exp(2j * np.pi * center_freq * np.linspace(0, len(taps) - 1, len(taps))) + taps = low_pass( + cutoff=bandwidth / 2, transition_bandwidth=(0.5 - bandwidth / 2) / 4 + ) + sinusoid = np.exp( + 2j * np.pi * center_freq * np.linspace(0, len(taps) - 1, len(taps)) + ) taps = taps * sinusoid return sp.convolve(tensor, taps, mode="same") @@ -1022,17 +980,22 @@ def drop_samples( """ for idx, drop_start in enumerate(drop_starts): if fill == "ffill": - drop_region = np.ones(drop_sizes[idx], dtype=np.complex64) * tensor[drop_start - 1] + drop_region = ( + np.ones(drop_sizes[idx], dtype=np.complex64) * tensor[drop_start - 1] + ) elif fill == "bfill": drop_region = ( - np.ones(drop_sizes[idx], dtype=np.complex64) * tensor[drop_start + drop_sizes[idx]] + np.ones(drop_sizes[idx], dtype=np.complex64) + * tensor[drop_start + drop_sizes[idx]] ) elif fill == "mean": drop_region = np.ones(drop_sizes[idx], dtype=np.complex64) * np.mean(tensor) elif fill == "zero": drop_region = np.zeros(drop_sizes[idx], dtype=np.complex64) else: - raise ValueError("fill expects ffill, bfill, mean, or zero. Found {}".format(fill)) + raise ValueError( + "fill expects ffill, bfill, mean, or zero. Found {}".format(fill) + ) # Update drop region tensor[drop_start : drop_start + drop_sizes[idx]] = drop_region @@ -1224,7 +1187,9 @@ def agc( elif sample_idx == 0: # first sample, no smoothing level_db = np.log(np.abs(sample)) else: - level_db = level_db * alpha_smooth + np.log(np.abs(sample)) * (1 - alpha_smooth) + level_db = level_db * alpha_smooth + np.log(np.abs(sample)) * ( + 1 - alpha_smooth + ) output_db = level_db + gain_db diff_db = ref_level_db - output_db @@ -1284,7 +1249,11 @@ def cut_out( real_noise = np.random.randn(cut_mask_length) imag_noise = np.random.randn(cut_mask_length) noise_power_db = -100 - cut_mask = (10.0 ** (noise_power_db / 20.0)) * (real_noise + 1j * imag_noise) / np.sqrt(2) + cut_mask = ( + (10.0 ** (noise_power_db / 20.0)) + * (real_noise + 1j * imag_noise) + / np.sqrt(2) + ) elif cut_type == "avg_noise": real_noise = np.random.randn(cut_mask_length) imag_noise = np.random.randn(cut_mask_length) @@ -1294,7 +1263,11 @@ def cut_out( real_noise = np.random.randn(cut_mask_length) imag_noise = np.random.randn(cut_mask_length) noise_power_db = 40 - cut_mask = (10.0 ** (noise_power_db / 20.0)) * (real_noise + 1j * imag_noise) / np.sqrt(2) + cut_mask = ( + (10.0 ** (noise_power_db / 20.0)) + * (real_noise + 1j * imag_noise) + / np.sqrt(2) + ) else: raise ValueError( "cut_type must be: zeros, ones, low_noise, avg_noise, or high_noise. 
Found: {}".format( @@ -1377,34 +1350,52 @@ def drop_spec_samples( for idx, drop_start in enumerate(drop_starts): if fill == "ffill": drop_region_real = np.ones(drop_sizes[idx]) * flat_spec[0, drop_start - 1] - drop_region_complex = np.ones(drop_sizes[idx]) * flat_spec[1, drop_start - 1] + drop_region_complex = ( + np.ones(drop_sizes[idx]) * flat_spec[1, drop_start - 1] + ) flat_spec[0, drop_start : drop_start + drop_sizes[idx]] = drop_region_real - flat_spec[1, drop_start : drop_start + drop_sizes[idx]] = drop_region_complex + flat_spec[ + 1, drop_start : drop_start + drop_sizes[idx] + ] = drop_region_complex elif fill == "bfill": - drop_region_real = np.ones(drop_sizes[idx]) * flat_spec[0, drop_start + drop_sizes[idx]] + drop_region_real = ( + np.ones(drop_sizes[idx]) * flat_spec[0, drop_start + drop_sizes[idx]] + ) drop_region_complex = ( np.ones(drop_sizes[idx]) * flat_spec[1, drop_start + drop_sizes[idx]] ) flat_spec[0, drop_start : drop_start + drop_sizes[idx]] = drop_region_real - flat_spec[1, drop_start : drop_start + drop_sizes[idx]] = drop_region_complex + flat_spec[ + 1, drop_start : drop_start + drop_sizes[idx] + ] = drop_region_complex elif fill == "mean": drop_region_real = np.ones(drop_sizes[idx]) * np.mean(flat_spec[0]) drop_region_complex = np.ones(drop_sizes[idx]) * np.mean(flat_spec[1]) flat_spec[0, drop_start : drop_start + drop_sizes[idx]] = drop_region_real - flat_spec[1, drop_start : drop_start + drop_sizes[idx]] = drop_region_complex + flat_spec[ + 1, drop_start : drop_start + drop_sizes[idx] + ] = drop_region_complex elif fill == "zero": drop_region = np.zeros(drop_sizes[idx]) flat_spec[:, drop_start : drop_start + drop_sizes[idx]] = drop_region elif fill == "min": drop_region_real = np.ones(drop_sizes[idx]) * np.min(np.abs(flat_spec[0])) - drop_region_complex = np.ones(drop_sizes[idx]) * np.min(np.abs(flat_spec[1])) + drop_region_complex = np.ones(drop_sizes[idx]) * np.min( + np.abs(flat_spec[1]) + ) flat_spec[0, drop_start : drop_start + drop_sizes[idx]] = drop_region_real - flat_spec[1, drop_start : drop_start + drop_sizes[idx]] = drop_region_complex + flat_spec[ + 1, drop_start : drop_start + drop_sizes[idx] + ] = drop_region_complex elif fill == "max": drop_region_real = np.ones(drop_sizes[idx]) * np.max(np.abs(flat_spec[0])) - drop_region_complex = np.ones(drop_sizes[idx]) * np.max(np.abs(flat_spec[1])) + drop_region_complex = np.ones(drop_sizes[idx]) * np.max( + np.abs(flat_spec[1]) + ) flat_spec[0, drop_start : drop_start + drop_sizes[idx]] = drop_region_real - flat_spec[1, drop_start : drop_start + drop_sizes[idx]] = drop_region_complex + flat_spec[ + 1, drop_start : drop_start + drop_sizes[idx] + ] = drop_region_complex elif fill == "low": drop_region = np.ones(drop_sizes[idx]) * 1e-3 flat_spec[:, drop_start : drop_start + drop_sizes[idx]] = drop_region @@ -1413,7 +1404,9 @@ def drop_spec_samples( flat_spec[:, drop_start : drop_start + drop_sizes[idx]] = drop_region else: raise ValueError( - "fill expects ffill, bfill, mean, zero, min, max, low, ones. Found {}".format(fill) + "fill expects ffill, bfill, mean, zero, min, max, low, ones. 
Found {}".format(
+                fill
+            )
        )
    new_tensor = flat_spec.reshape(tensor.shape[0], tensor.shape[1], tensor.shape[2])
    return new_tensor
@@ -1517,13 +1510,14 @@ def spec_translate(
    return new_tensor

+
def spectrogram_image(
-    tensor: np.ndarray,
-    nperseg=512,
-    noverlap=0,
-    nfft=None,
-    mode='psd',
-    colormap='viridis',
+    tensor: np.ndarray,
+    nperseg=512,
+    noverlap=0,
+    nfft=None,
+    mode="psd",
+    colormap="viridis",
) -> np.ndarray:
    """Computes spectrogram of complex IQ vector

    Args:
    tensor (:class:`numpy.ndarray`):
        (batch_size, vector_length, ...)-sized tensor.

    nperseg (:obj:`int`):
-        Length of each segment.
+        Length of each segment. Default 512

    noverlap (:obj:`int`):
        Number of points to overlap between segments.
        Default 0

    nfft (:obj:`int`):
        Length of the FFT used, if a zero padded FFT is desired.
        Default same as nperseg
-
+
    mode (:obj:`str`):
        Mode of the spectrogram to be computed.
        Default psd
-
+
-    colormap (:obj:'str'):
+    colormap (:obj:`str`):
        Define OpenCV colormap to use for spectrogram image
-        Default twilight
+        Default viridis
-
+
    Returns:
        transformed (:class:`numpy.ndarray`):
            Spectrogram of tensor along time dimension
    """
    if nfft is None:
        nfft = nperseg

-    spectrogram_np = spectrogram(
+    spec_data = spectrogram(
        tensor,
        nperseg=nperseg,
        noverlap=noverlap,
        nfft=nfft,
        window_fcn=np.blackman,
-        mode=mode
+        mode=mode,
    )
-    spec_data = spectrogram_np
-    spec_data = spec_data * (1 / np.linalg.norm(spec_data.flatten(), ord=float('inf'), keepdim=True))
-    spec = 20*np.log10(spec_data.numpy())
+    flattened = spec_data.flatten()
+    spec_data = spec_data / np.linalg.norm(flattened, ord=np.inf, keepdims=True)
+    spec = 20 * np.log10(spec_data)
    img = np.zeros((spec.shape[0], spec.shape[1], 3), dtype=np.float32)
    img = cv2.normalize(spec, img, 0, 255, cv2.NORM_MINMAX)
    colormap = colormap.upper()
-    img_new = cv2.applyColorMap(img.astype(np.uint8), cv2.COLORMAP_+colormap)
-
-    return img_new
\ No newline at end of file
+    return cv2.applyColorMap(img.astype(np.uint8), getattr(cv2, "COLORMAP_" + colormap))
diff --git a/torchsig/transforms/transforms.py b/torchsig/transforms/transforms.py
index e945190..ed595f6 100644
--- a/torchsig/transforms/transforms.py
+++ b/torchsig/transforms/transforms.py
@@ -1,21 +1,22 @@
-import warnings
-from copy import deepcopy
 from typing import Any, Callable, List, Literal, Optional, Tuple, Union
-
-import numpy as np
-from scipy import signal
-
 from torchsig.transforms import functional as F
 from torchsig.transforms.functional import (
     FloatParameter,
     IntParameter,
     NumericParameter,
-    to_distribution,
-    uniform_continuous_distribution,
-    uniform_discrete_distribution,
 )
 from torchsig.utils.dataset import SignalDataset
-from torchsig.utils.types import SignalData, SignalDescription
+from torchsig.utils.types import (
+    SignalData,
+    SignalDescription,
+    RandomDistribution,
+    UniformContinuousRD,
+    UniformDiscreteRD,
+)
+from copy import deepcopy
+from scipy import signal
+import numpy as np
+

__all__ = [
    "Transform",
@@ -89,14 +90,6 @@ class Transform:
    """

-    def __init__(self, seed: Optional[int] = None) -> None:
-        if seed is not None:
-            warnings.warn(
-                "Seeding transforms is deprecated and does nothing", DeprecationWarning
-            )
-
-        self.random_generator = np.random.RandomState()
-
    def __call__(self, data: Any) -> Any:
        raise NotImplementedError

@@ -187,7 +180,7 @@ class FixedRandom(Transform):
    def __init__(self, transform: Transform, num_seeds: int, **kwargs) -> None:
        super(FixedRandom, self).__init__(**kwargs)
        self.transform = transform
-        self.num_seeds = num_seeds
+        self.seeds = UniformDiscreteRD(np.asarray(range(num_seeds)))
        self.string: str = (
            self.__class__.__name__
            + "("
@@ -200,7 +193,7 @@ def __repr__(self) -> str:
        return self.string

    def __call__(self, data: Any) -> Any:
-        seed = self.random_generator.choice(self.num_seeds)
+        seed = self.seeds()
        orig_state = (
            np.random.get_state()
        )  # we do not want to somehow fix other random number generation processes.
@@ -249,7 +242,7 @@ def __repr__(self) -> str:
    def __call__(self, data: Any) -> Any:
        return (
            self.transform(data)
-            if self.random_generator.rand() < self.probability
+            if RandomDistribution.rng.random() < self.probability
            else data
        )
@@ -403,7 +396,7 @@ def __repr__(self) -> str:
        return self.string

    def __call__(self, data: Any) -> Any:
-        transforms = self.random_generator.choice(
+        transforms = RandomDistribution.rng.choice(
            self.transforms,  # type: ignore
            size=self.num_transforms,
            replace=self.allow_multiple_same,
@@ -456,7 +449,7 @@ def __repr__(self) -> str:
        return self.string

    def __call__(self, data: Any) -> Any:
-        t: SignalTransform = self.random_generator.choice(
+        t: SignalTransform = RandomDistribution.rng.choice(
            self.transforms,  # type: ignore
            p=self.probabilities,
        )
@@ -516,14 +509,9 @@ class RandomResample(SignalTransform):
    """Resample using poly-phase rational resampling technique.

    Args:
-        rate_ratio (:py:class:`~Callable`, :obj:`int`, :obj:`float`, :obj:`list`, :obj:`tuple`):
+        rate_ratio (:py:class:`~torchsig.utils.types.RandomDistribution`):
            new_rate = rate_ratio*old_rate
-            * If Callable, resamples to new_rate by calling rate_ratio()
-            * If int or float, rate_ratio is fixed by value provided
-            * If list, rate_ratio is any element in the list
-            * If tuple, rate_ratio is in range of (tuple[0], tuple[1])
-
        num_iq_samples (:obj:`int`):
            Since resampling changes the number of points in a tensor, it is
            necessary to designate how many samples should be returned. In the
            case more samples are produced, the last num_iq_samples of
@@ -553,12 +541,12 @@ def __init__(
        self,
-        rate_ratio: NumericParameter = (1.5, 3.0),
+        rate_ratio: FloatParameter = UniformContinuousRD(1.5, 3.0),
        num_iq_samples: int = 4096,
        keep_samples: bool = False,
    ) -> None:
        super(RandomResample, self).__init__()
-        self.rate_ratio: Callable = to_distribution(rate_ratio, self.random_generator)
+        self.rate_ratio: Callable = RandomDistribution.to_distribution(rate_ratio)
        self.num_iq_samples = num_iq_samples
        self.keep_samples = keep_samples
        self.string: str = (
@@ -729,15 +717,10 @@ class TargetSNR(SignalTransform):
    warning.

    Args:
-        target_snr (:py:class:`~Callable`, :obj:`int`, :obj:`float`, :obj:`list`, :obj:`tuple`):
+        target_snr (:py:class:`~torchsig.utils.types.RandomDistribution`):
            Defined as 10*log10(np.mean(np.abs(x)**2)/np.mean(np.abs(n)**2)) if in dB,
            np.mean(np.abs(x)**2)/np.mean(np.abs(n)**2) if linear.
-            * If Callable, produces a sample by calling target_snr()
-            * If int or float, target_snr is fixed at the value provided
-            * If list, target_snr is any element in the list
-            * If tuple, target_snr is in range of (tuple[0], tuple[1])
-
        eb_no (:obj:`bool`):
            Defines SNR as 10*log10(np.mean(np.abs(x)**2)/np.mean(np.abs(n)**2))*samples_per_symbol/bits_per_symbol.
            Defining SNR this way effectively normalized the noise level with respect to spectral efficiency and
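# --- Editor's aside (not part of the patch): a sketch of the new parameter
# style for call sites being migrated. Constructor signatures are taken from
# the hunks above; UniformContinuousRD/UniformDiscreteRD live in
# torchsig.utils.types, which this diff does not show.
import numpy as np
import torchsig.transforms as ST
from torchsig.utils.types import UniformContinuousRD, UniformDiscreteRD

# Explicit distribution objects replace the old bare tuples and lists.
resample = ST.RandomResample(rate_ratio=UniformContinuousRD(1.5, 3.0))
phase = ST.RandomPhaseShift(
    phase_offset=UniformDiscreteRD(np.asarray([-0.5, 0.0, 0.5]))
)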
Defining SNR this way effectively normalized the noise level with respect to spectral efficiency and @@ -752,13 +735,13 @@ class TargetSNR(SignalTransform): def __init__( self, - target_snr: NumericParameter = uniform_continuous_distribution(-10, 10), + target_snr: FloatParameter = UniformContinuousRD(-10, 10), eb_no: bool = False, linear: bool = False, **kwargs, ) -> None: super(TargetSNR, self).__init__(**kwargs) - self.target_snr = to_distribution(target_snr, self.random_generator) + self.target_snr = RandomDistribution.to_distribution(target_snr) self.eb_no = eb_no self.linear = linear self.string = ( @@ -831,15 +814,10 @@ class AddNoise(SignalTransform): level of noise to either a narrowband or wideband input. Args: - noise_power_db (:py:class:`~Callable`, :obj:`int`, :obj:`float`, :obj:`list`, :obj:`tuple`): + noise_power_db (:py:class:`~torchsig.types.RandomDistribution`): Defined as 10*log10(np.mean(np.abs(x)**2)/np.mean(np.abs(n)**2)) if in dB, np.mean(np.abs(x)**2)/np.mean(np.abs(n)**2) if linear. - * If Callable, produces a sample by calling target_snr() - * If int or float, target_snr is fixed at the value provided - * If list, target_snr is any element in the list - * If tuple, target_snr is in range of (tuple[0], tuple[1]) - input_noise_floor_db (:obj:`float`): The noise floor of the input data in dB @@ -855,13 +833,13 @@ class AddNoise(SignalTransform): def __init__( self, - noise_power_db: NumericParameter = uniform_continuous_distribution(-80, -60), + noise_power_db: FloatParameter = UniformContinuousRD(-80, -60), input_noise_floor_db: float = 0.0, linear: bool = False, **kwargs, ) -> None: super(AddNoise, self).__init__(**kwargs) - self.noise_power_db = to_distribution(noise_power_db, self.random_generator) + self.noise_power_db = RandomDistribution.to_distribution(noise_power_db) self.input_noise_floor_db = input_noise_floor_db self.linear = linear self.string = ( @@ -936,36 +914,21 @@ class TimeVaryingNoise(SignalTransform): """Add time-varying random AWGN at specified input parameters Args: - noise_power_db_low (:py:class:`~Callable`, :obj:`int`, :obj:`float`, :obj:`list`, :obj:`tuple`): + noise_power_db_low (:py:class:`~torchsig.types.RandomDistribution`): Defined as 10*log10(np.mean(np.abs(x)**2)/np.mean(np.abs(n)**2)) if in dB, np.mean(np.abs(x)**2)/np.mean(np.abs(n)**2) if linear. - * If Callable, produces a sample by calling noise_power_db_low() - * If int or float, noise_power_db_low is fixed at the value provided - * If list, noise_power_db_low is any element in the list - * If tuple, noise_power_db_low is in range of (tuple[0], tuple[1]) - noise_power_db_high (:py:class:`~Callable`, :obj:`int`, :obj:`float`, :obj:`list`, :obj:`tuple`): + noise_power_db_high (:py:class:`~torchsig.types.RandomDistribution`): Defined as 10*log10(np.mean(np.abs(x)**2)/np.mean(np.abs(n)**2)) if in dB, np.mean(np.abs(x)**2)/np.mean(np.abs(n)**2) if linear. 
- * If Callable, produces a sample by calling noise_power_db_low() - * If int or float, noise_power_db_low is fixed at the value provided - * If list, noise_power_db_low is any element in the list - * If tuple, noise_power_db_low is in range of (tuple[0], tuple[1]) - inflections (:py:class:`~Callable`, :obj:`int`, :obj:`float`, :obj:`list`, :obj:`tuple`): + inflections (:py:class:`~torchsig.types.RandomDistribution`): Number of inflection points in time-varying noise - * If Callable, produces a sample by calling inflections() - * If int or float, inflections is fixed at the value provided - * If list, inflections is any element in the list - * If tuple, inflections is in range of (tuple[0], tuple[1]) - random_regions (:py:class:`~Callable`, :obj:`bool`, :obj:`list`, :obj:`tuple`): + random_regions (:py:class:`~torchsig.types.RandomDistribution`): If inflections > 0, random_regions specifies whether each inflection point should be randomly selected or evenly divided among input data - * If Callable, produces a sample by calling random_regions() - * If bool, random_regions is fixed at the value provided - * If list, random_regions is any element in the list linear (:obj:`bool`): If True, powers input are on linear scale not dB. @@ -974,22 +937,20 @@ class TimeVaryingNoise(SignalTransform): def __init__( self, - noise_power_db_low: NumericParameter = uniform_continuous_distribution( - -80, -60 - ), - noise_power_db_high: NumericParameter = uniform_continuous_distribution( - -40, -20 - ), - inflections: IntParameter = uniform_continuous_distribution(0, 10), + noise_power_db_low: FloatParameter = UniformContinuousRD(-80, -60), + noise_power_db_high: FloatParameter = UniformContinuousRD(-40, -20), + inflections: IntParameter = UniformDiscreteRD(np.arange(0, 10, dtype=int)), random_regions: Union[List, bool] = True, linear: bool = False, **kwargs, ) -> None: super(TimeVaryingNoise, self).__init__(**kwargs) - self.noise_power_db_low = to_distribution(noise_power_db_low) - self.noise_power_db_high = to_distribution(noise_power_db_high) - self.inflections = to_distribution(inflections) - self.random_regions = to_distribution(random_regions) + self.noise_power_db_low = RandomDistribution.to_distribution(noise_power_db_low) + self.noise_power_db_high = RandomDistribution.to_distribution( + noise_power_db_high + ) + self.inflections = RandomDistribution.to_distribution(inflections) + self.random_regions = RandomDistribution.to_distribution(random_regions) self.linear = linear self.string = ( self.__class__.__name__ @@ -1078,11 +1039,7 @@ class RayleighFadingChannel(SignalTransform): inversely proportional to the maximum Doppler spread. This time variance is not included in this model. Args: - coherence_bandwidth (:py:class:`~Callable`, :obj:`int`, :obj:`float`, :obj:`list`, :obj:`tuple`): - * If Callable, produces a sample by calling coherence_bandwidth() - * If int or float, coherence_bandwidth is fixed at the value provided - * If list, coherence_bandwidth is any element in the list - * If tuple, coherence_bandwidth is in range of (tuple[0], tuple[1]) + coherence_bandwidth (:py:class:`~torchsig.types.RandomDistribution`): power_delay_profile (:obj:`list`, :obj:`tuple`): A list of positive values assigning power to taps of the channel model. 
When the number of taps @@ -1107,15 +1064,13 @@ class RayleighFadingChannel(SignalTransform): def __init__( self, - coherence_bandwidth: FloatParameter = uniform_continuous_distribution( - 0.01, 0.1 - ), + coherence_bandwidth: FloatParameter = UniformContinuousRD(0.01, 0.1), power_delay_profile: Union[Tuple, List, np.ndarray] = (1, 1), **kwargs, ) -> None: super(RayleighFadingChannel, self).__init__(**kwargs) - self.coherence_bandwidth = to_distribution( - coherence_bandwidth, self.random_generator + self.coherence_bandwidth = RandomDistribution.to_distribution( + coherence_bandwidth ) self.power_delay_profile = np.asarray(power_delay_profile) self.string = ( @@ -1147,29 +1102,21 @@ class ImpulseInterferer(SignalTransform): """Applies an impulse interferer Args: - amp (:py:class:`~Callable`, :obj:`int`, :obj:`float`, :obj:`list`, :obj:`tuple`): - * If Callable, produces a sample by calling amp() - * If int or float, amp is fixed at the value provided - * If list, amp is any element in the list - * If tuple, amp is in range of (tuple[0], tuple[1]) - - pulse_offset (:py:class:`~Callable`, :obj:`int`, :obj:`float`, :obj:`list`, :obj:`tuple`): - * If Callable, produces a sample by calling phase_offset() - * If int or float, pulse_offset is fixed at the value provided - * If list, phase_offset is any element in the list - * If tuple, phase_offset is in range of (tuple[0], tuple[1]) + amp (:py:class:`~torchsig.types.RandomDistribution`): + + pulse_offset (:py:class:`~torchsig.types.RandomDistribution`): """ def __init__( self, - amp: FloatParameter = uniform_continuous_distribution(0.1, 100.0), - pulse_offset: FloatParameter = uniform_continuous_distribution(0.0, 1), + amp: FloatParameter = UniformContinuousRD(0.1, 100.0), + pulse_offset: FloatParameter = UniformContinuousRD(0.0, 1), **kwargs, ) -> None: super(ImpulseInterferer, self).__init__(**kwargs) - self.amp = to_distribution(amp, self.random_generator) - self.pulse_offset = to_distribution(pulse_offset, self.random_generator) + self.amp = RandomDistribution.to_distribution(amp) + self.pulse_offset = RandomDistribution.to_distribution(pulse_offset) self.string = ( self.__class__.__name__ + "(" @@ -1199,16 +1146,12 @@ class RandomPhaseShift(SignalTransform): """Applies a random phase offset to tensor Args: - phase_offset (:py:class:`~Callable`, :obj:`int`, :obj:`float`, :obj:`list`, :obj:`tuple`): - * If Callable, produces a sample by calling phase_offset() - * If int or float, phase_offset is fixed at the value provided - * If list, phase_offset is any element in the list - * If tuple, phase_offset is in range of (tuple[0], tuple[1]) + phase_offset (:py:class:`~torchsig.types.RandomDistribution`): Example: >>> import torchsig.transforms as ST >>> # Phase Offset in range [-pi, pi] - >>> transform = ST.RandomPhaseShift(uniform_continuous_distribution(-1, 1)) + >>> transform = ST.RandomPhaseShift(UniformContinuousRD(-1, 1)) >>> # Phase Offset from [-pi/2, 0, and pi/2] >>> transform = ST.RandomPhaseShift(uniform_discrete_distribution([-.5, 0, .5])) >>> # Phase Offset in range [-pi, pi] @@ -1221,11 +1164,11 @@ class RandomPhaseShift(SignalTransform): def __init__( self, - phase_offset: FloatParameter = uniform_continuous_distribution(-1, 1), + phase_offset: FloatParameter = UniformContinuousRD(-1, 1), **kwargs, ) -> None: super(RandomPhaseShift, self).__init__(**kwargs) - self.phase_offset = to_distribution(phase_offset, self.random_generator) + self.phase_offset = RandomDistribution.to_distribution(phase_offset) self.string = ( 
self.__class__.__name__ + "(" + "phase_offset={}".format(phase_offset) + ")" ) @@ -1638,11 +1581,7 @@ class RandomTimeShift(SignalTransform): """Shifts tensor in the time dimension by shift samples. Zero-padding is applied to maintain input size. Args: - shift (:py:class:`~Callable`, :obj:`int`, :obj:`float`, :obj:`list`, :obj:`tuple`): - * If Callable, produces a sample by calling shift() - * If int or float, shift is fixed at the value provided - * If list, shift is any element in the list - * If tuple, shift is in range of (tuple[0], tuple[1]) + shift (:py:class:`~torchsig.types.RandomDistribution`): interp_rate (:obj:`int`): Interpolation rate used by internal interpolation filter @@ -1669,12 +1608,12 @@ class RandomTimeShift(SignalTransform): def __init__( self, - shift: NumericParameter = (-10, 10), + shift: FloatParameter = UniformContinuousRD(-10, 10), interp_rate: int = 100, taps_per_arm: int = 24, ) -> None: super(RandomTimeShift, self).__init__() - self.shift = to_distribution(shift, self.random_generator) + self.shift = RandomDistribution.to_distribution(shift) self.interp_rate = interp_rate num_taps = int(taps_per_arm * interp_rate) self.taps = ( @@ -2015,11 +1954,7 @@ class RandomFrequencyShift(SignalTransform): """Shifts each tensor in freq by freq_shift along the time dimension. Args: - freq_shift (:py:class:`~Callable`, :obj:`int`, :obj:`float`, :obj:`list`, :obj:`tuple`): - * If Callable, produces a sample by calling freq_shift() - * If int or float, freq_shift is fixed at the value provided - * If list, freq_shift is any element in the list - * If tuple, freq_shift is in range of (tuple[0], tuple[1]) + freq_shift (:py:class:`~torchsig.types.RandomDistribution`): Example: >>> import torchsig.transforms as ST @@ -2034,9 +1969,11 @@ class RandomFrequencyShift(SignalTransform): """ - def __init__(self, freq_shift: NumericParameter = (-0.5, 0.5)) -> None: + def __init__( + self, freq_shift: FloatParameter = UniformContinuousRD(-0.5, 0.5) + ) -> None: super(RandomFrequencyShift, self).__init__() - self.freq_shift = to_distribution(freq_shift, self.random_generator) + self.freq_shift = RandomDistribution.to_distribution(freq_shift) self.string = ( self.__class__.__name__ + "(" + "freq_shift={}".format(freq_shift) + ")" ) @@ -2147,30 +2084,18 @@ class RandomDelayedFrequencyShift(SignalTransform): """Apply a delayed frequency shift to the input data Args: - start_shift (:py:class:`~Callable`, :obj:`int`, :obj:`float`, :obj:`list`, :obj:`tuple`): - start_shift sets the start time of the delayed shift - * If Callable, produces a sample by calling start_shift() - * If int, start_shift is fixed at the value provided - * If list, start_shift is any element in the list - * If tuple, start_shift is in range of (tuple[0], tuple[1]) - - freq_shift (:py:class:`~Callable`, :obj:`int`, :obj:`float`, :obj:`list`, :obj:`tuple`): - freq_shift sets the translation along the freq-axis - * If Callable, produces a sample by calling freq_shift() - * If int, freq_shift is fixed at the value provided - * If list, freq_shift is any element in the list - * If tuple, freq_shift is in range of (tuple[0], tuple[1]) - + start_shift (:py:class:`~RandomDistribution`): + freq_shift (:py:class:`~RandomDistribution`): """ def __init__( self, - start_shift: FloatParameter = (0.1, 0.9), - freq_shift: FloatParameter = (-0.2, 0.2), + start_shift: FloatParameter = UniformContinuousRD(0.1, 0.9), + freq_shift: FloatParameter = UniformContinuousRD(-0.2, 0.2), ) -> None: super(RandomDelayedFrequencyShift, 
self).__init__() - self.start_shift = to_distribution(start_shift, self.random_generator) - self.freq_shift = to_distribution(freq_shift, self.random_generator) + self.start_shift = RandomDistribution.to_distribution(start_shift) + self.freq_shift = RandomDistribution.to_distribution(freq_shift) self.string = ( self.__class__.__name__ + "(" @@ -2183,9 +2108,9 @@ def __repr__(self) -> str: return self.string def __call__(self, data: Any) -> Any: - start_shift = self.start_shift() + start_shift: float = self.start_shift() # Randomly generate a freq shift that is not near the original fc - freq_shift = 0 + freq_shift: float = 0.0 while freq_shift < 0.05 and freq_shift > -0.05: freq_shift = self.freq_shift() @@ -2355,21 +2280,21 @@ class LocalOscillatorDrift(SignalTransform): Args: max_drift (FloatParameter, optional): - [description]. Defaults to uniform_continuous_distribution(0.005,0.015). + [description]. Defaults to UniformContinuousRD(0.005,0.015). max_drift_rate (FloatParameter, optional): - [description]. Defaults to uniform_continuous_distribution(0.001,0.01). + [description]. Defaults to UniformContinuousRD(0.001,0.01). """ def __init__( self, - max_drift: FloatParameter = uniform_continuous_distribution(0.005, 0.015), - max_drift_rate: FloatParameter = uniform_continuous_distribution(0.001, 0.01), + max_drift: FloatParameter = UniformContinuousRD(0.005, 0.015), + max_drift_rate: FloatParameter = UniformContinuousRD(0.001, 0.01), **kwargs, ) -> None: super(LocalOscillatorDrift, self).__init__(**kwargs) - self.max_drift = to_distribution(max_drift, self.random_generator) - self.max_drift_rate = to_distribution(max_drift_rate, self.random_generator) + self.max_drift = RandomDistribution.to_distribution(max_drift) + self.max_drift_rate = RandomDistribution.to_distribution(max_drift_rate) self.string = ( self.__class__.__name__ + "(" @@ -2389,7 +2314,7 @@ def __call__(self, data: Any) -> Any: assert iq_data is not None # Apply drift as a random walk. - random_walk = self.random_generator.choice([-1, 1], size=iq_data.shape[0]) + random_walk = RandomDistribution.rng.choice([-1, 1], size=iq_data.shape[0]) # limit rate of change to at most 1/max_drift_rate times the length of the data sample frequency = np.cumsum(random_walk) * max_drift_rate / np.sqrt(iq_data.shape[0]) @@ -2450,25 +2375,25 @@ class GainDrift(SignalTransform): Args: max_drift (FloatParameter, optional): - [description]. Defaults to uniform_continuous_distribution(0.005,0.015). + [description]. Defaults to UniformContinuousRD(0.005,0.015). min_drift (FloatParameter, optional): - [description]. Defaults to uniform_continuous_distribution(0.005,0.015). + [description]. Defaults to UniformContinuousRD(0.005,0.015). drift_rate (FloatParameter, optional): - [description]. Defaults to uniform_continuous_distribution(0.001,0.01). + [description]. Defaults to UniformContinuousRD(0.001,0.01). 
""" def __init__( self, - max_drift: FloatParameter = uniform_continuous_distribution(0.005, 0.015), - min_drift: FloatParameter = uniform_continuous_distribution(0.005, 0.015), - drift_rate: FloatParameter = uniform_continuous_distribution(0.001, 0.01), + max_drift: FloatParameter = UniformContinuousRD(0.005, 0.015), + min_drift: FloatParameter = UniformContinuousRD(0.005, 0.015), + drift_rate: FloatParameter = UniformContinuousRD(0.001, 0.01), **kwargs, ) -> None: super(GainDrift, self).__init__(**kwargs) - self.max_drift = to_distribution(max_drift, self.random_generator) - self.min_drift = to_distribution(min_drift, self.random_generator) - self.drift_rate = to_distribution(drift_rate, self.random_generator) + self.max_drift = RandomDistribution.to_distribution(max_drift) + self.min_drift = RandomDistribution.to_distribution(min_drift) + self.drift_rate = RandomDistribution.to_distribution(drift_rate) self.string = ( self.__class__.__name__ + "(" @@ -2490,7 +2415,7 @@ def __call__(self, data: Any) -> Any: assert iq_data is not None # Apply drift as a random walk. - random_walk = self.random_generator.choice([-1, 1], size=iq_data.shape[0]) + random_walk = RandomDistribution.rng.choice([-1, 1], size=iq_data.shape[0]) # limit rate of change to at most 1/max_drift_rate times the length of the data sample gain = np.cumsum(random_walk) * drift_rate / np.sqrt(iq_data.shape[0]) @@ -2526,12 +2451,8 @@ class AutomaticGainControl(SignalTransform): """Automatic gain control (AGC) implementation Args: - rand_scale (:py:class:`~Callable`, :obj:`int`, :obj:`float`, :obj:`list`, :obj:`tuple`): + rand_scale (:py:class:`~torchsig.types.RandomDistribution`): Random scaling of alpha values - * If Callable, produces a sample by calling rand_scale() - * If int or float, rand_scale is fixed at the value provided - * If list, rand_scale is any element in the list - * If tuple, rand_scale is in range of (tuple[0], tuple[1]) initial_gain_db (:obj:`float`): Initial gain value in linear units @@ -2580,7 +2501,7 @@ def __init__( high_level_db: float = 6.0, ) -> None: super(AutomaticGainControl, self).__init__() - self.rand_scale = to_distribution(rand_scale, self.random_generator) + self.rand_scale = RandomDistribution.to_distribution(rand_scale) self.initial_gain_db = initial_gain_db self.alpha_smooth = alpha_smooth self.alpha_overflow = alpha_overflow @@ -2662,23 +2583,11 @@ class IQImbalance(SignalTransform): """Applies various types of IQ imbalance to a tensor Args: - iq_amplitude_imbalance_db (:py:class:`~Callable`, :obj:`int`, :obj:`float`, :obj:`list`, :obj:`tuple`): - * If Callable, produces a sample by calling iq_amplitude_imbalance() - * If int or float, iq_amplitude_imbalance is fixed at the value provided - * If list, iq_amplitude_imbalance is any element in the list - * If tuple, iq_amplitude_imbalance is in range of (tuple[0], tuple[1]) - - iq_phase_imbalance (:py:class:`~Callable`, :obj:`int`, :obj:`float`, :obj:`list`, :obj:`tuple`): - * If Callable, produces a sample by calling iq_phase_imbalance() - * If int or float, iq_phase_imbalance is fixed at the value provided - * If list, iq_phase_imbalance is any element in the list - * If tuple, iq_phase_imbalance is in range of (tuple[0], tuple[1]) - - iq_dc_offset_db (:py:class:`~Callable`, :obj:`int`, :obj:`float`, :obj:`list`, :obj:`tuple`): - * If Callable, produces a sample by calling iq_dc_offset() - * If int or float, iq_dc_offset_db is fixed at the value provided - * If list, iq_dc_offset is any element in the list - * If tuple, iq_dc_offset is 
in range of (tuple[0], tuple[1]) + iq_amplitude_imbalance_db (:py:class:`~torchsig.types.RandomDistribution`): + + iq_phase_imbalance (:py:class:`~torchsig.types.RandomDistribution`): + + iq_dc_offset_db (:py:class:`~torchsig.types.RandomDistribution`): Note: For more information about IQ imbalance in RF systems, check out @@ -2693,21 +2602,18 @@ class IQImbalance(SignalTransform): def __init__( self, - iq_amplitude_imbalance_db: NumericParameter = (0, 3), - iq_phase_imbalance: NumericParameter = ( - -np.pi * 1.0 / 180.0, - np.pi * 1.0 / 180.0, + iq_amplitude_imbalance_db=UniformContinuousRD(0, 3), + iq_phase_imbalance=UniformContinuousRD( + -np.pi * 1.0 / 180.0, np.pi * 1.0 / 180.0 ), - iq_dc_offset_db: NumericParameter = (-0.1, 0.1), + iq_dc_offset_db=UniformContinuousRD(-0.1, 0.1), ) -> None: super(IQImbalance, self).__init__() - self.amp_imbalance = to_distribution( - iq_amplitude_imbalance_db, self.random_generator - ) - self.phase_imbalance = to_distribution( - iq_phase_imbalance, self.random_generator + self.amp_imbalance = RandomDistribution.to_distribution( + iq_amplitude_imbalance_db ) - self.dc_offset = to_distribution(iq_dc_offset_db, self.random_generator) + self.phase_imbalance = RandomDistribution.to_distribution(iq_phase_imbalance) + self.dc_offset = RandomDistribution.to_distribution(iq_dc_offset_db) self.string = ( self.__class__.__name__ + "(" @@ -2739,17 +2645,9 @@ class RollOff(SignalTransform): """Applies a band-edge RF roll-off effect simulating front end filtering Args: - low_freq (:py:class:`~Callable`, :obj:`int`, :obj:`float`, :obj:`list`, :obj:`tuple`): - * If Callable, produces a sample by calling low_freq() - * If int or float, low_freq is fixed at the value provided - * If list, low_freq is any element in the list - * If tuple, low_freq is in range of (tuple[0], tuple[1]) - - upper_freq (:py:class:`~Callable`, :obj:`int`, :obj:`float`, :obj:`list`, :obj:`tuple`): - * If Callable, produces a sample by calling upper_freq() - * If int or float, upper_freq is fixed at the value provided - * If list, upper_freq is any element in the list - * If tuple, upper_freq is in range of (tuple[0], tuple[1]) + low_freq (:py:class:`~torchsig.types.RandomDistribution`): + + upper_freq (:py:class:`~torchsig.types.RandomDistribution`): low_cut_apply (:obj:`float`): Probability that the low frequency provided above is applied @@ -2757,28 +2655,24 @@ class RollOff(SignalTransform): upper_cut_apply (:obj:`float`): Probability that the upper frequency provided above is applied - order (:py:class:`~Callable`, :obj:`int`, :obj:`float`, :obj:`list`, :obj:`tuple`): - * If Callable, produces a sample by calling order() - * If int or float, order is fixed at the value provided - * If list, order is any element in the list - * If tuple, order is in range of (tuple[0], tuple[1]) + order (:py:class:`~torchsig.types.RandomDistribution`): """ def __init__( self, - low_freq: NumericParameter = (0.00, 0.05), - upper_freq: NumericParameter = (0.95, 1.00), + low_freq: FloatParameter = UniformContinuousRD(0.00, 0.05), + upper_freq: FloatParameter = UniformContinuousRD(0.95, 1.00), low_cut_apply: float = 0.5, upper_cut_apply: float = 0.5, - order: NumericParameter = (6, 20), + order: FloatParameter = UniformContinuousRD(6, 20), ) -> None: super(RollOff, self).__init__() - self.low_freq = to_distribution(low_freq, self.random_generator) - self.upper_freq = to_distribution(upper_freq, self.random_generator) + self.low_freq = RandomDistribution.to_distribution(low_freq) + self.upper_freq = 
         self.low_cut_apply = low_cut_apply
         self.upper_cut_apply = upper_cut_apply
-        self.order = to_distribution(order, self.random_generator)
+        self.order = RandomDistribution.to_distribution(order)
         self.string = (
             self.__class__.__name__
             + "("
@@ -2942,30 +2836,22 @@ class RandomMagRescale(SignalTransform):
     gain control
 
     Args:
-        start (:py:class:`~Callable`, :obj:`int`, :obj:`float`, :obj:`list`, :obj:`tuple`):
+        start (:py:class:`~torchsig.types.RandomDistribution`):
             start sets the time when the rescaling kicks in
-            * If Callable, produces a sample by calling start()
-            * If int or float, start is fixed at the value provided
-            * If list, start is any element in the list
-            * If tuple, start is in range of (tuple[0], tuple[1])
 
-        scale (:py:class:`~Callable`, :obj:`int`, :obj:`float`, :obj:`list`, :obj:`tuple`):
+        scale (:py:class:`~torchsig.types.RandomDistribution`):
             scale sets the magnitude of the rescale
-            * If Callable, produces a sample by calling scale()
-            * If int or float, scale is fixed at the value provided
-            * If list, scale is any element in the list
-            * If tuple, scale is in range of (tuple[0], tuple[1])
     """
 
     def __init__(
         self,
-        start: NumericParameter = (0.0, 0.9),
-        scale: NumericParameter = (-4.0, 4.0),
+        start: FloatParameter = UniformContinuousRD(0.0, 0.9),
+        scale: FloatParameter = UniformContinuousRD(-4.0, 4.0),
     ) -> None:
         super(RandomMagRescale, self).__init__()
-        self.start = to_distribution(start, self.random_generator)
-        self.scale = to_distribution(scale, self.random_generator)
+        self.start = RandomDistribution.to_distribution(start)
+        self.scale = RandomDistribution.to_distribution(scale)
         self.string = (
             self.__class__.__name__
             + "("
@@ -3011,38 +2897,27 @@ class RandomDropSamples(SignalTransform):
     `TSAug Dropout Transform `_.
 
     Args:
-        drop_rate (:py:class:`~Callable`, :obj:`int`, :obj:`float`, :obj:`list`, :obj:`tuple`):
+        drop_rate (:py:class:`~torchsig.types.RandomDistribution`):
            drop_rate sets the rate at which to drop samples
-            * If Callable, produces a sample by calling drop_rate()
-            * If int or float, drop_rate is fixed at the value provided
-            * If list, drop_rate is any element in the list
-            * If tuple, drop_rate is in range of (tuple[0], tuple[1])
 
-        size (:py:class:`~Callable`, :obj:`int`, :obj:`float`, :obj:`list`, :obj:`tuple`):
+        size (:py:class:`~torchsig.types.RandomDistribution`):
             size sets the size of each instance of dropped samples
-            * If Callable, produces a sample by calling size()
-            * If int or float, size is fixed at the value provided
-            * If list, size is any element in the list
-            * If tuple, size is in range of (tuple[0], tuple[1])
 
         fill (:py:class:`~Callable`, :obj:`list`, :obj:`str`):
             fill sets the method of how the dropped samples should be filled
-            * If Callable, produces a sample by calling fill()
-            * If list, fill is any element in the list
-            * If str, fill is fixed at the method provided
     """
 
     def __init__(
         self,
-        drop_rate: NumericParameter = (0.01, 0.05),
-        size: NumericParameter = (1, 10),
+        drop_rate: FloatParameter = UniformContinuousRD(0.01, 0.05),
+        size: IntParameter = UniformDiscreteRD(np.arange(1, 10, dtype=int)),
         fill: List[str] = (["ffill", "bfill", "mean", "zero"]),
     ) -> None:
         super(RandomDropSamples, self).__init__()
-        self.drop_rate = to_distribution(drop_rate, self.random_generator)
-        self.size = to_distribution(size, self.random_generator)
-        self.fill = to_distribution(fill, self.random_generator)
+        self.drop_rate = RandomDistribution.to_distribution(drop_rate)
+        self.size = RandomDistribution.to_distribution(size)
+        self.fill = RandomDistribution.to_distribution(fill)
         self.string = (
             self.__class__.__name__
             + "("
@@ -3096,29 +2971,25 @@ class Quantize(SignalTransform):
     """Quantize the input to the number of levels specified
 
     Args:
-        num_levels (:py:class:`~Callable`, :obj:`int`, :obj:`float`, :obj:`list`, :obj:`tuple`):
+        num_levels (:py:class:`~torchsig.types.RandomDistribution`):
             num_levels sets the number of quantization levels
-            * If Callable, produces a sample by calling num_levels()
-            * If int or float, num_levels is fixed at the value provided
-            * If list, num_levels is any element in the list
-            * If tuple, num_levels is in range of (tuple[0], tuple[1])
 
         round_type (:py:class:`~Callable`, :obj:`str`, :obj:`list`):
             round_type sets the rounding direction of the quantization.
             Options include: 'floor', 'middle', & 'ceiling'
-            * If Callable, produces a sample by calling round_type()
-            * If str, round_type is fixed at the value provided
-            * If list, round_type is any element in the list
+
     """
 
     def __init__(
         self,
-        num_levels: NumericParameter = ([16, 24, 32, 40, 48, 56, 64]),
+        num_levels: IntParameter = UniformDiscreteRD(
+            np.asarray([16, 24, 32, 40, 48, 56, 64], dtype=int)
+        ),
         round_type: List[str] = (["floor", "middle", "ceiling"]),
     ) -> None:
         super(Quantize, self).__init__()
-        self.num_levels = to_distribution(num_levels, self.random_generator)
-        self.round_type = to_distribution(round_type, self.random_generator)
+        self.num_levels = RandomDistribution.to_distribution(num_levels)
+        self.round_type = RandomDistribution.to_distribution(round_type)
         self.string = (
             self.__class__.__name__
             + "("
@@ -3156,22 +3027,19 @@ class Clip(SignalTransform):
     """Clips the input values to a percentage of the max/min values
 
     Args:
-        clip_percentage (:py:class:`~Callable`, :obj:`int`, :obj:`float`, :obj:`list`, :obj:`tuple`):
+        clip_percentage (:py:class:`~torchsig.types.RandomDistribution`):
             Specifies the percentage of the max/min values to clip
-            * If Callable, produces a sample by calling clip_percentage()
-            * If int or float, clip_percentage is fixed at the value provided
-            * If list, clip_percentage is any element in the list
-            * If tuple, clip_percentage is in range of (tuple[0], tuple[1])
+
     """
 
     def __init__(
         self,
-        clip_percentage: NumericParameter = (0.75, 0.95),
+        clip_percentage: FloatParameter = UniformContinuousRD(0.75, 0.95),
         **kwargs,
     ) -> None:
         super(Clip, self).__init__(**kwargs)
-        self.clip_percentage = to_distribution(clip_percentage)
+        self.clip_percentage = RandomDistribution.to_distribution(clip_percentage)
         self.string = (
             self.__class__.__name__
             + "("
@@ -3208,35 +3076,28 @@ class RandomConvolve(SignalTransform):
     """Convolve a random complex filter with the input data
 
     Args:
-        num_taps (:py:class:`~Callable`, :obj:`int`, :obj:`float`, :obj:`list`, :obj:`tuple`):
+        num_taps (:py:class:`~torchsig.types.RandomDistribution`):
            Number of taps for the random filter
-            * If Callable, produces a sample by calling num_taps()
-            * If int or float, num_taps is fixed at the value provided
-            * If list, num_taps is any element in the list
-            * If tuple, num_taps is in range of (tuple[0], tuple[1])
-        alpha (:py:class:`~Callable`, :obj:`int`, :obj:`float`, :obj:`list`, :obj:`tuple`):
+
+        alpha (:py:class:`~torchsig.types.RandomDistribution`):
             The effect of the filtered data is dampened using an alpha factor
             that determines the weightings for the summing of the filtered data
             and the original data. `alpha` should be in range `[0,1]` where a
             value of 0 applies all of the weight to the original data, and a
             value of 1 applies all of the weight to the filtered data
-            * If Callable, produces a sample by calling alpha()
-            * If int or float, alpha is fixed at the value provided
-            * If list, alpha is any element in the list
-            * If tuple, alpha is in range of (tuple[0], tuple[1])
     """
 
     def __init__(
         self,
-        num_taps: IntParameter = (2, 5),
-        alpha: FloatParameter = (0.1, 0.5),
+        num_taps: IntParameter = UniformDiscreteRD(np.arange(2, 5, dtype=int)),
+        alpha: FloatParameter = UniformContinuousRD(0.1, 0.5),
        **kwargs,
     ) -> None:
         super(RandomConvolve, self).__init__(**kwargs)
-        self.num_taps = to_distribution(num_taps, self.random_generator)
-        self.alpha = to_distribution(alpha, self.random_generator)
+        self.num_taps = RandomDistribution.to_distribution(num_taps)
+        self.alpha = RandomDistribution.to_distribution(alpha)
         self.string = (
             self.__class__.__name__
             + "("
@@ -3291,13 +3152,10 @@ class DatasetBasebandMixUp(SignalTransform):
             A SignalDataset of complex-valued examples to be used as a source for
             the synthetic insertion/mixup
 
-        alpha (:py:class:`~Callable`, :obj:`int`, :obj:`float`, :obj:`list`, :obj:`tuple`):
+        alpha (:py:class:`~torchsig.types.RandomDistribution`):
             alpha sets the difference in power level between the main dataset
             example and the inserted example
-            * If Callable, produces a sample by calling target_snr()
-            * If int or float, target_snr is fixed at the value provided
-            * If list, target_snr is any element in the list
-            * If tuple, target_snr is in range of (tuple[0], tuple[1])
+
 
     Example:
         >>> import torchsig.transforms as ST
@@ -3318,10 +3176,10 @@ def __init__(
         self,
         dataset: SignalDataset,
-        alpha: NumericParameter = (-5, -3),
+        alpha: FloatParameter = UniformContinuousRD(-5, -3),
     ) -> None:
         super(DatasetBasebandMixUp, self).__init__()
-        self.alpha = to_distribution(alpha, self.random_generator)
+        self.alpha = RandomDistribution.to_distribution(alpha)
         self.dataset = dataset
         self.dataset_num_samples = len(dataset)
         self.string = (
@@ -3437,13 +3295,10 @@ class DatasetBasebandCutMix(SignalTransform):
             An SignalDataset of complex-valued examples to be used as a source for
             the synthetic insertion/mixup
 
-        alpha (:py:class:`~Callable`, :obj:`int`, :obj:`float`, :obj:`list`, :obj:`tuple`):
+        alpha (:py:class:`~torchsig.types.RandomDistribution`):
            alpha sets the difference in power level between the main dataset
            example and the inserted example
-            * If Callable, produces a sample by calling target_snr()
-            * If int or float, target_snr is fixed at the value provided
-            * If list, target_snr is any element in the list
-            * If tuple, target_snr is in range of (tuple[0], tuple[1])
+
 
     Example:
         >>> import torchsig.transforms as ST
@@ -3464,10 +3319,10 @@ def __init__(
         self,
         dataset: SignalDataset,
-        alpha: NumericParameter = (0.2, 0.5),
+        alpha: FloatParameter = UniformContinuousRD(0.2, 0.5),
     ) -> None:
         super(DatasetBasebandCutMix, self).__init__()
-        self.alpha = to_distribution(alpha, self.random_generator)
+        self.alpha = RandomDistribution.to_distribution(alpha)
         self.dataset = dataset
         self.dataset_num_samples = len(dataset)
         self.string = (
@@ -3620,33 +3475,25 @@ class CutOut(SignalTransform):
     `"Improved Regularization of Convolutional Neural Networks with Cutout" `_.
 
     Args:
-        cut_dur (:py:class:`~Callable`, :obj:`int`, :obj:`float`, :obj:`list`, :obj:`tuple`):
+        cut_dur (:py:class:`~torchsig.types.RandomDistribution`):
            cut_dur sets the duration of the region to cut out
-            * If Callable, produces a sample by calling cut_dur()
-            * If int or float, cut_dur is fixed at the value provided
-            * If list, cut_dur is any element in the list
-            * If tuple, cut_dur is in range of (tuple[0], tuple[1])
 
         cut_type (:py:class:`~Callable`, :obj:`list`, :obj:`str`):
             cut_type sets the type of data to fill in the cut region with from
             the options: `zeros`, `ones`, `low_noise`, `avg_noise`, and `high_noise`
-            * If Callable, produces a sample by calling cut_type()
-            * If list, cut_type is any element in the list
-            * If str, cut_type is fixed at the method provided
-
     """
 
     def __init__(
         self,
-        cut_dur: NumericParameter = (0.01, 0.2),
+        cut_dur: FloatParameter = UniformContinuousRD(0.01, 0.2),
         cut_type: List[str] = (
             ["zeros", "ones", "low_noise", "avg_noise", "high_noise"]
         ),
     ) -> None:
         super(CutOut, self).__init__()
-        self.cut_dur = to_distribution(cut_dur, self.random_generator)
-        self.cut_type = to_distribution(cut_type, self.random_generator)
+        self.cut_dur = RandomDistribution.to_distribution(cut_dur)
+        self.cut_type = RandomDistribution.to_distribution(cut_type)
         self.string = (
             self.__class__.__name__
             + "("
@@ -3739,30 +3586,23 @@ class PatchShuffle(SignalTransform):
     `"PatchShuffle Regularization" `_.
 
     Args:
-        patch_size (:py:class:`~Callable`, :obj:`int`, :obj:`float`, :obj:`list`, :obj:`tuple`):
+        patch_size (:py:class:`~torchsig.types.RandomDistribution`):
            patch_size sets the size of each patch to shuffle
-            * If Callable, produces a sample by calling patch_size()
-            * If int or float, patch_size is fixed at the value provided
-            * If list, patch_size is any element in the list
-            * If tuple, patch_size is in range of (tuple[0], tuple[1])
 
-        shuffle_ratio (:py:class:`~Callable`, :obj:`int`, :obj:`float`, :obj:`list`, :obj:`tuple`):
+        shuffle_ratio (:py:class:`~torchsig.types.RandomDistribution`):
            shuffle_ratio sets the ratio of the patches to shuffle
-            * If Callable, produces a sample by calling shuffle_ratio()
-            * If int or float, shuffle_ratio is fixed at the value provided
-            * If list, shuffle_ratio is any element in the list
-            * If tuple, shuffle_ratio is in range of (tuple[0], tuple[1])
+
     """
 
     def __init__(
         self,
-        patch_size: NumericParameter = (3, 10),
-        shuffle_ratio: FloatParameter = (0.01, 0.05),
+        patch_size: IntParameter = UniformDiscreteRD(np.arange(3, 10, dtype=int)),
+        shuffle_ratio: FloatParameter = UniformContinuousRD(0.01, 0.05),
     ) -> None:
         super(PatchShuffle, self).__init__()
-        self.patch_size = to_distribution(patch_size, self.random_generator)
-        self.shuffle_ratio = to_distribution(shuffle_ratio, self.random_generator)
+        self.patch_size = RandomDistribution.to_distribution(patch_size)
+        self.shuffle_ratio = RandomDistribution.to_distribution(shuffle_ratio)
         self.string = (
             self.__class__.__name__
             + "("
@@ -3812,13 +3652,9 @@ class DatasetWidebandCutMix(SignalTransform):
             An SignalDataset of complex-valued examples to be used as a source for
             the synthetic insertion/mixup
 
-        alpha (:py:class:`~Callable`, :obj:`int`, :obj:`float`, :obj:`list`, :obj:`tuple`):
+        alpha (:py:class:`~torchsig.types.RandomDistribution`):
            alpha sets the difference in durations between the main dataset
            example and the inserted example
-            * If Callable, produces a sample by calling alpha()
-            * If int or float, alpha is fixed at the value provided
-            * If list, alpha is any element in the list
-            * If tuple, alpha is in range of (tuple[0], tuple[1])
 
     Example:
         >>> import torchsig.transforms as ST
@@ -3832,10 +3668,10 @@ def __init__(
         self,
         dataset: SignalDataset,
-        alpha: NumericParameter = (0.2, 0.7),
+        alpha: FloatParameter = UniformContinuousRD(0.2, 0.7),
     ) -> None:
         super(DatasetWidebandCutMix, self).__init__()
-        self.alpha = to_distribution(alpha, self.random_generator)
+        self.alpha = RandomDistribution.to_distribution(alpha)
         self.dataset = dataset
         self.dataset_num_samples = len(dataset)
         self.string = (
@@ -4002,13 +3838,9 @@ class DatasetWidebandMixUp(SignalTransform):
             An SignalDataset of complex-valued examples to be used as a source for
             the synthetic insertion/mixup
 
-        alpha (:py:class:`~Callable`, :obj:`int`, :obj:`float`, :obj:`list`, :obj:`tuple`):
+        alpha (:py:class:`~torchsig.types.RandomDistribution`):
            alpha sets the difference in power level between the main dataset
            example and the inserted example
-            * If Callable, produces a sample by calling alpha()
-            * If int or float, alpha is fixed at the value provided
-            * If list, alpha is any element in the list
-            * If tuple, alpha is in range of (tuple[0], tuple[1])
 
     Example:
        >>> import torchsig.transforms as ST
@@ -4022,10 +3854,10 @@ def __init__(
         self,
         dataset: SignalDataset,
-        alpha: NumericParameter = (0.4, 0.6),
+        alpha: FloatParameter = UniformContinuousRD(0.4, 0.6),
     ) -> None:
         super(DatasetWidebandMixUp, self).__init__()
-        self.alpha = to_distribution(alpha, self.random_generator)
+        self.alpha = RandomDistribution.to_distribution(alpha)
        self.dataset = dataset
         self.dataset_num_samples = len(dataset)
         self.string = (
@@ -4104,18 +3936,12 @@ class SpectrogramRandomResizeCrop(SignalTransform):
     Args:
         nfft (:py:class:`~Callable`, :obj:`int`, :obj:`list`, :obj:`tuple`):
             The number of FFT bins for the random spectrogram.
-            * If Callable, nfft is set by calling nfft()
-            * If int, nfft is fixed by value provided
-            * If list, nfft is any element in the list
-            * If tuple, nfft is in range of (tuple[0], tuple[1])
+
         overlap_ratio (:py:class:`~Callable`, :obj:`int`, :obj:`list`, :obj:`tuple`):
             The ratio of the (nfft-1) value to use as the overlap parameter for
             the spectrogram operation. Setting as ratio ensures the overlap is
             a lower value than the bin size.
-            * If Callable, nfft is set by calling overlap_ratio()
-            * If float, overlap_ratio is fixed by value provided
-            * If list, overlap_ratio is any element in the list
-            * If tuple, overlap_ratio is in range of (tuple[0], tuple[1])
+
         window_fcn (:obj:`str`):
             Window to be used in spectrogram operation.
             Default value is 'np.blackman'.
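(Illustrative usage -- not part of the patch. A minimal sketch of the migrated parameter API, assuming `RandomDistribution`, `ConstantRD`, `UniformContinuousRD`, and `UniformDiscreteRD` are imported from `torchsig.utils.types` as defined in the hunk further below; the docstring cross-references use `torchsig.types`, which is assumed here to resolve to the same classes.)

    import numpy as np
    import torchsig.transforms as ST
    from torchsig.utils.types import (
        RandomDistribution,
        ConstantRD,
        UniformContinuousRD,
        UniformDiscreteRD,
    )

    # Explicit distribution objects replace the old tuple/list/scalar shorthand
    time_shift = ST.RandomTimeShift(shift=UniformContinuousRD(-10, 10))
    convolve = ST.RandomConvolve(num_taps=UniformDiscreteRD(np.arange(2, 5, dtype=int)))

    # Legacy shorthand still converts through to_distribution()
    assert isinstance(RandomDistribution.to_distribution(3.0), ConstantRD)
    assert isinstance(RandomDistribution.to_distribution((0.0, 1.0)), UniformContinuousRD)
    assert isinstance(RandomDistribution.to_distribution([1, 2, 4]), UniformDiscreteRD)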
@@ -4136,16 +3962,16 @@ class SpectrogramRandomResizeCrop(SignalTransform):
 
     def __init__(
         self,
-        nfft: IntParameter = (256, 1024),
-        overlap_ratio: FloatParameter = (0.0, 0.2),
+        nfft: IntParameter = UniformDiscreteRD(np.arange(256, 1024, dtype=int)),
+        overlap_ratio: FloatParameter = UniformContinuousRD(0.0, 0.2),
         window_fcn: Callable[[int], np.ndarray] = np.blackman,
         mode: str = "complex",
         width: int = 512,
         height: int = 512,
     ) -> None:
         super(SpectrogramRandomResizeCrop, self).__init__()
-        self.nfft = to_distribution(nfft, self.random_generator)
-        self.overlap_ratio = to_distribution(overlap_ratio, self.random_generator)
+        self.nfft = RandomDistribution.to_distribution(nfft)
+        self.overlap_ratio = RandomDistribution.to_distribution(overlap_ratio)
         self.window_fcn = window_fcn
         self.mode = mode
         self.width = width
@@ -4427,40 +4253,29 @@ class SpectrogramDropSamples(SignalTransform):
     `TSAug Dropout Transform `_.
 
     Args:
-        drop_rate (:py:class:`~Callable`, :obj:`int`, :obj:`float`, :obj:`list`, :obj:`tuple`):
+        drop_rate (:py:class:`~torchsig.types.RandomDistribution`):
            drop_rate sets the rate at which to drop samples
-            * If Callable, produces a sample by calling drop_rate()
-            * If int or float, drop_rate is fixed at the value provided
-            * If list, drop_rate is any element in the list
-            * If tuple, drop_rate is in range of (tuple[0], tuple[1])
 
-        size (:py:class:`~Callable`, :obj:`int`, :obj:`float`, :obj:`list`, :obj:`tuple`):
+        size (:py:class:`~torchsig.types.RandomDistribution`):
            size sets the size of each instance of dropped samples
-            * If Callable, produces a sample by calling size()
-            * If int or float, size is fixed at the value provided
-            * If list, size is any element in the list
-            * If tuple, size is in range of (tuple[0], tuple[1])
 
         fill (:py:class:`~Callable`, :obj:`list`, :obj:`str`):
            fill sets the method of how the dropped samples should be filled
-            * If Callable, produces a sample by calling fill()
-            * If list, fill is any element in the list
-            * If str, fill is fixed at the method provided
     """
 
     def __init__(
         self,
-        drop_rate: NumericParameter = (0.001, 0.005),
-        size: NumericParameter = (1, 10),
+        drop_rate: FloatParameter = UniformContinuousRD(0.001, 0.005),
+        size: IntParameter = UniformDiscreteRD(np.arange(1, 10, dtype=int)),
         fill: List[str] = (
             ["ffill", "bfill", "mean", "zero", "low", "min", "max", "ones"]
         ),
     ) -> None:
         super(SpectrogramDropSamples, self).__init__()
-        self.drop_rate = to_distribution(drop_rate, self.random_generator)
-        self.size = to_distribution(size, self.random_generator)
-        self.fill = to_distribution(fill, self.random_generator)
+        self.drop_rate = RandomDistribution.to_distribution(drop_rate)
+        self.size = RandomDistribution.to_distribution(size)
+        self.fill = RandomDistribution.to_distribution(fill)
         self.string = (
             self.__class__.__name__
             + "("
@@ -4524,30 +4339,22 @@ class SpectrogramPatchShuffle(SignalTransform):
     `PatchShuffle Regularization `_.
 
     Args:
-        patch_size (:py:class:`~Callable`, :obj:`int`, :obj:`float`, :obj:`list`, :obj:`tuple`):
+        patch_size (:py:class:`~torchsig.types.RandomDistribution`):
            patch_size sets the size of each patch to shuffle
-            * If Callable, produces a sample by calling patch_size()
-            * If int or float, patch_size is fixed at the value provided
-            * If list, patch_size is any element in the list
-            * If tuple, patch_size is in range of (tuple[0], tuple[1])
 
-        shuffle_ratio (:py:class:`~Callable`, :obj:`int`, :obj:`float`, :obj:`list`, :obj:`tuple`):
+        shuffle_ratio (:py:class:`~torchsig.types.RandomDistribution`):
            shuffle_ratio sets the ratio of the patches to shuffle
-            * If Callable, produces a sample by calling shuffle_ratio()
-            * If int or float, shuffle_ratio is fixed at the value provided
-            * If list, shuffle_ratio is any element in the list
-            * If tuple, shuffle_ratio is in range of (tuple[0], tuple[1])
     """
 
     def __init__(
         self,
-        patch_size: NumericParameter = (2, 16),
-        shuffle_ratio: FloatParameter = (0.01, 0.10),
+        patch_size: IntParameter = UniformDiscreteRD(np.arange(2, 16, dtype=int)),
+        shuffle_ratio: FloatParameter = UniformContinuousRD(0.01, 0.10),
     ) -> None:
         super(SpectrogramPatchShuffle, self).__init__()
-        self.patch_size = to_distribution(patch_size, self.random_generator)
-        self.shuffle_ratio = to_distribution(shuffle_ratio, self.random_generator)
+        self.patch_size = RandomDistribution.to_distribution(patch_size)
+        self.shuffle_ratio = RandomDistribution.to_distribution(shuffle_ratio)
         self.string = (
             self.__class__.__name__
             + "("
@@ -4592,30 +4399,22 @@ class SpectrogramTranslation(SignalTransform):
     translation
 
     Args:
-        time_shift (:py:class:`~Callable`, :obj:`int`, :obj:`float`, :obj:`list`, :obj:`tuple`):
+        time_shift (:py:class:`~torchsig.types.RandomDistribution`):
            time_shift sets the translation along the time-axis
-            * If Callable, produces a sample by calling time_shift()
-            * If int, time_shift is fixed at the value provided
-            * If list, time_shift is any element in the list
-            * If tuple, time_shift is in range of (tuple[0], tuple[1])
 
-        freq_shift (:py:class:`~Callable`, :obj:`int`, :obj:`float`, :obj:`list`, :obj:`tuple`):
+        freq_shift (:py:class:`~torchsig.types.RandomDistribution`):
            freq_shift sets the translation along the freq-axis
-            * If Callable, produces a sample by calling freq_shift()
-            * If int, freq_shift is fixed at the value provided
-            * If list, freq_shift is any element in the list
-            * If tuple, freq_shift is in range of (tuple[0], tuple[1])
     """
 
     def __init__(
         self,
-        time_shift: IntParameter = (-128, 128),
-        freq_shift: IntParameter = (-128, 128),
+        time_shift: IntParameter = UniformDiscreteRD(np.arange(-128, 128, dtype=int)),
+        freq_shift: IntParameter = UniformDiscreteRD(np.arange(-128, 128, dtype=int)),
     ) -> None:
         super(SpectrogramTranslation, self).__init__()
-        self.time_shift = to_distribution(time_shift, self.random_generator)
-        self.freq_shift = to_distribution(freq_shift, self.random_generator)
+        self.time_shift = RandomDistribution.to_distribution(time_shift)
+        self.freq_shift = RandomDistribution.to_distribution(freq_shift)
         self.string = (
             self.__class__.__name__
             + "("
@@ -5340,7 +5139,7 @@ class SpectrogramImage(SignalTransform):
     """
 
-    def __init__(self, size=512, colormap="viridis") -> np.ndarray:
+    def __init__(self, size=512, colormap="viridis"):
         super(SpectrogramImage, self).__init__()
         self.size = size
         self.colormap = colormap
@@ -5349,14 +5148,24 @@ def __init__(self, size=512, colormap="viridis") -> np.ndarray:
         self.nfft = self.size
         self.mode = "psd"
 
-    def __call__(self, data: SignalData) -> SignalData:
-        data.iq_data = F.spectrogram_image(
-            data.iq_data,
-            nperseg=self.nperseg,
-            noverlap=self.noverlap,
-            nfft=self.nfft,
-            mode=self.mode,
-            colormap=self.colormap,
-        )
+    def __call__(self, data) -> SignalData:
+        if isinstance(data, SignalData):
+            data.iq_data = F.spectrogram_image(
+                data.iq_data,  # type: ignore
+                nperseg=self.nperseg,
+                noverlap=self.noverlap,
+                nfft=self.nfft,
+                mode=self.mode,
+                colormap=self.colormap,
+            )
+        else:
+            data = F.spectrogram_image(
+                data,
+                nperseg=self.nperseg,
+                noverlap=self.noverlap,
+                nfft=self.nfft,
+                mode=self.mode,
+                colormap=self.colormap,
+            )
         return data
diff --git a/torchsig/utils/dsp.py b/torchsig/utils/dsp.py
index eb69d10..aaf7177 100644
--- a/torchsig/utils/dsp.py
+++ b/torchsig/utils/dsp.py
@@ -1,5 +1,5 @@
-import numpy as np
 from scipy import signal as sp
+import numpy as np
 
 
 def convolve(signal: np.ndarray, taps: np.ndarray) -> np.ndarray:
@@ -33,7 +33,9 @@ def estimate_filter_length(
     # N ~= (sampling rate/transition bandwidth)*(sidelobe attenuation in dB / 22)
     # fred harris, Multirate Signal Processing for Communication Systems,
     # Second Edition, p.59
-    filter_length = int(np.round((sample_rate / transition_bandwidth) * (attenuation_db / 22)))
+    filter_length = int(
+        np.round((sample_rate / transition_bandwidth) * (attenuation_db / 22))
+    )
 
     # odd-length filters are desirable because they do not introduce a half-sample delay
     if np.mod(filter_length, 2) == 0:
@@ -42,7 +44,9 @@
     return filter_length
 
 
-def rrc_taps(iq_samples_per_symbol: int, size_in_symbols: int, alpha: float = 0.35) -> np.ndarray:
+def rrc_taps(
+    iq_samples_per_symbol: int, size_in_symbols: int, alpha: float = 0.35
+) -> np.ndarray:
     # this could be made into a transform
     M = size_in_symbols
     Ns = float(iq_samples_per_symbol)
@@ -73,6 +77,8 @@ def gaussian_taps(samples_per_symbol: int, BT: float = 0.35) -> np.ndarray:
     # pre-modulation Bb*T product which sets the bandwidth of the Gaussian lowpass filter
     M = 4  # duration in symbols
     n = np.arange(-M * samples_per_symbol, M * samples_per_symbol + 1)
-    p = np.exp(-2 * np.pi**2 * BT**2 / np.log(2) * (n / float(samples_per_symbol)) ** 2)
+    p = np.exp(
+        -2 * np.pi**2 * BT**2 / np.log(2) * (n / float(samples_per_symbol)) ** 2
+    )
     p = p / np.sum(p)
     return p
diff --git a/torchsig/utils/types.py b/torchsig/utils/types.py
index b84fcd5..6c334e4 100644
--- a/torchsig/utils/types.py
+++ b/torchsig/utils/types.py
@@ -1,8 +1,67 @@
 from typing import List, Optional, Union
-
 import numpy as np
 
 
+class RandomDistribution:
+    rng = np.random.default_rng()
+
+    @staticmethod
+    def to_distribution(dist):
+        if isinstance(dist, RandomDistribution):
+            return dist
+
+        if isinstance(dist, (int, float)):
+            return ConstantRD(dist)
+
+        if isinstance(dist, tuple):
+            return UniformContinuousRD(dist[0], dist[1])
+
+        if isinstance(dist, list):
+            return UniformDiscreteRD(dist)
+
+        # Fail loudly rather than silently returning None for unsupported inputs
+        raise ValueError("Cannot convert {} to a RandomDistribution".format(dist))
+
+    def __call__(self, num: int = 1):
+        raise NotImplementedError
+
+
+class ConstantRD(RandomDistribution):
+    def __init__(self, constant: float) -> None:
+        super(ConstantRD, self).__init__()
+        self.constant = constant
+
+    def __call__(self, num: int = 1):
+        if num == 1:
+            return self.constant
+
+        return np.repeat(self.constant, repeats=num)
+
+
+class UniformContinuousRD(RandomDistribution):
+    def __init__(self, low: float, high: float) -> None:
+        super(UniformContinuousRD, self).__init__()
+        self.low = low
+        self.high = high
+
+    def __call__(self, num: int = 1):
+        # Match ConstantRD: return a scalar for single draws, an array otherwise
+        if num == 1:
+            return RandomDistribution.rng.uniform(low=self.low, high=self.high)
+        return RandomDistribution.rng.uniform(low=self.low, high=self.high, size=num)
+
+
+class UniformDiscreteRD(RandomDistribution):
+    def __init__(self, choices: np.ndarray) -> None:
+        super(UniformDiscreteRD, self).__init__()
+        self.choices = choices
+
+    def __call__(self, num: int = 1):
+        # Match ConstantRD: return a scalar for single draws, an array otherwise
+        if num == 1:
+            return RandomDistribution.rng.choice(self.choices)
+        return RandomDistribution.rng.choice(self.choices, size=num)
+
+
 class SignalDescription:
     """A class containing typically necessary details to understand the data
@@ -75,7 +126,9 @@ def __init__(
             bandwidth if bandwidth else upper_frequency - lower_frequency
         )
         self.center_frequency: Optional[float] = (
-            center_frequency if center_frequency else lower_frequency + self.bandwidth / 2
+            center_frequency
+            if center_frequency
+            else lower_frequency + self.bandwidth / 2
         )
     else:
         self.bandwidth = bandwidth
@@ -113,7 +166,9 @@ def __init__(
         data: Optional[bytes],
         item_type: np.dtype,
         data_type: np.dtype,
-        signal_description: Optional[Union[List[SignalDescription], SignalDescription]] = None,
+        signal_description: Optional[
+            Union[List[SignalDescription], SignalDescription]
+        ] = None,
     ) -> None:
         self.iq_data: Optional[np.ndarray] = None
         self.signal_description: Optional[
@@ -121,7 +176,9 @@ def __init__(
             Union[List[SignalDescription], SignalDescription]
         ] = signal_description
         if data is not None:
             # No matter the underlying item type, we convert to double-precision
-            self.iq_data = np.frombuffer(data, dtype=item_type).astype(np.float64).view(data_type)
+            self.iq_data = (
+                np.frombuffer(data, dtype=item_type).astype(np.float64).view(data_type)
+            )
             self.signal_description = (
                 [signal_description]
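(Illustrative usage -- not part of the patch. A short sketch of the sampling behavior of the distribution types defined above, under the convention that a single draw returns a scalar and `num > 1` returns an array.)

    import numpy as np
    from torchsig.utils.types import ConstantRD, UniformContinuousRD, UniformDiscreteRD

    const = ConstantRD(0.5)
    const()         # 0.5 (scalar)
    const(num=4)    # array([0.5, 0.5, 0.5, 0.5])

    uniform = UniformContinuousRD(-1.0, 1.0)
    uniform()       # single float drawn from [-1.0, 1.0)
    uniform(num=3)  # array of three draws

    discrete = UniformDiscreteRD(np.asarray([16, 32, 64]))
    discrete()      # one of 16, 32, 64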