From 4581113e9ab3a048296508ef69b6c1a417c625c8 Mon Sep 17 00:00:00 2001 From: Matt Dixon Date: Wed, 4 Dec 2024 17:55:26 -0500 Subject: [PATCH] Add initial project structure with harmonic balancer implementation, requirements, and demo interface --- .gitignore | 6 +- app.py | 49 ++++ css/style.css | 66 +++++ index.html | 41 +++ js/demo.js | 33 +++ requirements.txt | 3 + src/__init__.py | 1 + src/circuit.py | 73 +++++ src/harmonic_balancer.py | 310 +++++++++++--------- src/system.py | 55 ++++ src/utils/helpers.py | 34 +++ src/utils/utils.py | 37 +++ tests/test_harmonic_balancer.py | 67 +++++ tests/test_integration_harmonic_balancer.py | 126 ++++++++ 14 files changed, 757 insertions(+), 144 deletions(-) create mode 100644 app.py create mode 100644 css/style.css create mode 100644 index.html create mode 100644 js/demo.js create mode 100644 requirements.txt create mode 100644 src/__init__.py create mode 100644 src/circuit.py create mode 100644 src/system.py create mode 100644 src/utils/helpers.py create mode 100644 src/utils/utils.py create mode 100644 tests/test_harmonic_balancer.py create mode 100644 tests/test_integration_harmonic_balancer.py diff --git a/.gitignore b/.gitignore index 03e98ee..4190ed7 100644 --- a/.gitignore +++ b/.gitignore @@ -133,10 +133,10 @@ cython_debug/ snippitlib.md ethics.md docsnippitlib.md -AMI.code-worckspace +*.code-worckspace desktop.ini implimentation_list.md AMI.md -AMI.code-workspace -.gitHub/ + +# Working Documents themathupgrade.txt \ No newline at end of file diff --git a/app.py b/app.py new file mode 100644 index 0000000..e9eb8ef --- /dev/null +++ b/app.py @@ -0,0 +1,49 @@ +from flask import Flask, render_template, request, jsonify +from src.harmonic_balancer import EnhancedHarmonicBalancer +import numpy as np + +app = Flask(__name__) + +@app.route('/') +def index(): + return render_template('index.html') + +@app.route('/api/balance', methods=['POST']) +def balance(): + data = request.json + base_freq = data['baseFreq'] + harmonic_level = data['harmonicLevel'] + + # Generate a sample signal + t = np.linspace(0, 1, 1000) + signal = np.sin(2 * np.pi * base_freq * t) + (harmonic_level / 100) * np.sin(2 * np.pi * 2 * base_freq * t) + + # Process the signal + balancer = EnhancedHarmonicBalancer(base_freq, num_harmonics=5, application='power') + balanced_signal = balancer.balance_signal(signal, sample_rate=1000) + + # Calculate THD before and after + thd_before = balancer.calculate_thd(signal, 1000) + thd_after = balancer.calculate_thd(balanced_signal, 1000) + + # Prepare data for plotting + return jsonify({ + 'signals': [ + {'x': t.tolist(), 'y': signal.tolist(), 'type': 'scatter', 'name': 'Original'}, + {'x': t.tolist(), 'y': balanced_signal.tolist(), 'type': 'scatter', 'name': 'Balanced'} + ], + 'layout': {'title': 'Signal Comparison'}, + 'spectrum': [ + {'x': np.fft.fftfreq(1000, 1/1000)[:500].tolist(), 'y': np.abs(np.fft.fft(signal))[:500].tolist(), 'type': 'scatter', 'name': 'Original Spectrum'}, + {'x': np.fft.fftfreq(1000, 1/1000)[:500].tolist(), 'y': np.abs(np.fft.fft(balanced_signal))[:500].tolist(), 'type': 'scatter', 'name': 'Balanced Spectrum'} + ], + 'spectrumLayout': {'title': 'Frequency Spectrum'}, + 'metrics': { + 'thdBefore': f"{thd_before:.2%}", + 'thdAfter': f"{thd_after:.2%}", + 'improvement': f"{(thd_before - thd_after) / thd_before:.2%}" + } + }) + +if __name__ == '__main__': + app.run(debug=True) \ No newline at end of file diff --git a/css/style.css b/css/style.css new file mode 100644 index 0000000..f616d0a --- /dev/null +++ b/css/style.css @@ -0,0 +1,66 @@ +body { + font-family: Arial, sans-serif; + margin: 0; + padding: 20px; + background-color: #f0f0f0; +} + +.container { + max-width: 1200px; + margin: 0 auto; + background-color: white; + padding: 20px; + border-radius: 10px; + box-shadow: 0 0 10px rgba(0,0,0,0.1); +} + +h1 { + text-align: center; + color: #333; +} + +.control-panel { + display: flex; + justify-content: space-between; + align-items: center; + margin-bottom: 20px; +} + +.parameters { + display: flex; + gap: 20px; +} + +.visualization { + display: flex; + gap: 20px; + margin-bottom: 20px; +} + +#signalPlot, #spectrumPlot { + flex: 1; + height: 400px; +} + +.metrics { + display: flex; + justify-content: space-around; +} + +.metric-box { + text-align: center; + background-color: #f9f9f9; + padding: 10px; + border-radius: 5px; +} + +.metric-box h3 { + margin: 0; + color: #666; +} + +.metric-box span { + font-size: 24px; + font-weight: bold; + color: #333; +} \ No newline at end of file diff --git a/index.html b/index.html new file mode 100644 index 0000000..4a3713c --- /dev/null +++ b/index.html @@ -0,0 +1,41 @@ + + + + + + Harmonic Balancer Demo + + + + +
+

Harmonic Balancer Demonstration

+
+ +
+ + +
+
+
+
+
+
+
+
+

THD Before

+ 0% +
+
+

THD After

+ 0% +
+
+

Improvement

+ 0% +
+
+
+ + + \ No newline at end of file diff --git a/js/demo.js b/js/demo.js new file mode 100644 index 0000000..bdde1be --- /dev/null +++ b/js/demo.js @@ -0,0 +1,33 @@ +document.addEventListener('DOMContentLoaded', function() { + const startButton = document.getElementById('startDemo'); + const baseFreqInput = document.getElementById('baseFreq'); + const harmonicLevelInput = document.getElementById('harmonicLevel'); + + function updatePlots() { + fetch('/api/balance', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + baseFreq: parseFloat(baseFreqInput.value), + harmonicLevel: parseFloat(harmonicLevelInput.value) + }), + }) + .then(response => response.json()) + .then(data => { + // Update plots + Plotly.newPlot('signalPlot', data.signals, data.layout); + Plotly.newPlot('spectrumPlot', data.spectrum, data.spectrumLayout); + + // Update metrics + document.getElementById('thdBefore').textContent = data.metrics.thdBefore; + document.getElementById('thdAfter').textContent = data.metrics.thdAfter; + document.getElementById('improvement').textContent = data.metrics.improvement; + }); + } + + startButton.addEventListener('click', updatePlots); + baseFreqInput.addEventListener('change', updatePlots); + harmonicLevelInput.addEventListener('input', updatePlots); +}); \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..becac98 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +numpy>=1.18.0 +scipy>=1.4.0 +matplotlib>=3.1.0 \ No newline at end of file diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..a7e3071 --- /dev/null +++ b/src/__init__.py @@ -0,0 +1 @@ +from .harmonic_balancer import EnhancedHarmonicBalancer \ No newline at end of file diff --git a/src/circuit.py b/src/circuit.py new file mode 100644 index 0000000..9218e16 --- /dev/null +++ b/src/circuit.py @@ -0,0 +1,73 @@ + +import numpy as np +from qiskit import QuantumCircuit, QuantumRegister, ClassicalRegister + +class QuantumResonanceCircuit: + def __init__(self, resonance_freq=4.40e9, coupling_strength=0.1): + self.resonance_freq = resonance_freq + self.coupling_strength = coupling_strength + self.num_qubits = 4 + + def initialize_state(self): + """Initialize the quantum state""" + return np.array([1.0] + [0.0] * (2**self.num_qubits - 1), dtype=complex) + + def get_hamiltonian(self): + """Calculate the Hamiltonian of the system""" + dim = 2**self.num_qubits + H = np.zeros((dim, dim), dtype=complex) + # Add resonant coupling terms + for i in range(self.num_qubits-1): + H[i,i+1] = self.coupling_strength + H[i+1,i] = self.coupling_strength + # Add energy terms + for i in range(dim): + H[i,i] = self.resonance_freq * bin(i).count('1') + return H + + def evolve_state(self, state, time): + """Evolve the quantum state over time""" + H = self.get_hamiltonian() + U = np.exp(-1j * H * time) + return U @ state + + def calculate_resonance_function(self, x, y): + """ + Calculate the resonance function f(x,y) for a pair of qubits + """ + return np.exp(-((x - y)**2) / (2 * self.coupling_strength)) + + def calculate_evolutionary_potential(self, x, y): + """ + Calculate the evolutionary potential P(x,y) for a pair of qubits + """ + resonance = self.calculate_resonance_function(x, y) + return resonance * np.exp(-1j * self.resonance_freq * (x + y)) + + def get_total_possibilities(self): + """ + Calculate the total number of possibilities using the combined formula: + T = ∑(i=1 to n)∑(j=i+1 to n) f(x_i, y_j) · P(x_i, y_j) + """ + total = 0 + for i in range(self.num_qubits): + for j in range(i + 1, self.num_qubits): + f_xy = self.calculate_resonance_function(i, j) + p_xy = self.calculate_evolutionary_potential(i, j) + total += f_xy * np.abs(p_xy) + return total + + def create_entangled_circuit(self): + """ + Create a quantum circuit with entangled pairs + """ + qr = QuantumRegister(self.num_qubits, 'q') + cr = ClassicalRegister(self.num_qubits, 'c') + qc = QuantumCircuit(qr, cr) + + # Create entangled pairs + for i in range(0, self.num_qubits - 1, 2): + qc.h(qr[i]) + qc.cx(qr[i], qr[i+1]) + + return qc diff --git a/src/harmonic_balancer.py b/src/harmonic_balancer.py index 2ee19a8..b04f4e5 100644 --- a/src/harmonic_balancer.py +++ b/src/harmonic_balancer.py @@ -1,165 +1,193 @@ import numpy as np -from scipy.optimize import minimize -from typing import Dict, List, Tuple, Optional import logging +from .circuit import QuantumResonanceCircuit +from .utils.utils import generate_harmony_vector +# from .utils.helpers import plot_convergence +from .system import System +from scipy.signal import find_peaks, butter, filtfilt, iirnotch +from scipy.optimize import minimize class EnhancedHarmonicBalancer: - """ - Advanced harmonic balancing system for power and vibration applications. - - This class implements an enhanced version of the harmonic balancer that uses - adaptive frequency detection, optimization-based Psi adjustment, and - application-specific processing for power systems and mechanical vibrations. - - Attributes: - base_frequency (float): The fundamental frequency of the system - num_harmonics (int): Number of harmonics to consider - application (str): Type of application ('power' or 'vibration') - psi (np.ndarray): Phase adjustment values for each harmonic - frequencies (np.ndarray): Array of harmonic frequencies - history (dict): Historical performance metrics - - Example: - >>> balancer = EnhancedHarmonicBalancer(60, num_harmonics=5, application='power') - >>> balanced_signal = balancer.balance_signal(input_signal, sample_rate) - """ - - def __init__(self, base_frequency: float, num_harmonics: int = 5, - application: str = 'power') -> None: - """ - Initialize the harmonic balancer. - - Args: - base_frequency: Fundamental frequency of the system - num_harmonics: Number of harmonics to consider - application: Type of application ('power' or 'vibration') - """ + def __init__(self, base_frequency: float, num_harmonics: int = 5, application: str = 'power'): self.base_frequency = base_frequency self.num_harmonics = num_harmonics + self.application = application self.golden_ratio = (1 + np.sqrt(5)) / 2 self.psi = np.random.uniform(0, 2*np.pi, num_harmonics) self.frequencies = np.array([base_frequency * (i + 1) for i in range(num_harmonics)]) - self.application = application - self.learning_rate = 0.01 - self.history = {'thd': [], 'psi': [], 'frequency_drift': []} - + # Setup logging logging.basicConfig(level=logging.INFO) self.logger = logging.getLogger(__name__) - - def detect_base_frequency(self, signal_data: np.ndarray, - sample_rate: float) -> float: - """ - Detect the fundamental frequency in the signal. - - Uses FFT-based analysis to identify the dominant frequency component - and updates the internal frequency tracking if drift is detected. - - Args: - signal_data: Input time series data - sample_rate: Sampling rate of the signal - - Returns: - float: Detected fundamental frequency - """ + + # Initialize quantum circuit + self.quantum_circuit = QuantumResonanceCircuit(num_harmonics) + + # Initialize history + self.history = {'scores': [], 'states': []} + + # Set convergence threshold + self.convergence_threshold = 1e-6 + + # Initialize other attributes + self.best_score = float('-inf') + self.best_solution = None + self.harmony_memory = [] + self.harmony_memory_size = 20 + self.max_iterations = 100 + self.num_qubits = num_harmonics # Assuming num_qubits is equal to num_harmonics + def check_convergence(self): + # Check if the algorithm has converged + if len(self.history['scores']) < 2: + return False + return abs(self.history['scores'][-1] - self.history['scores'][-2]) < self.convergence_threshold + + def generate_new_harmony(self, transition_constant): + # Generate a new harmony vector based on the transition constant and quantum circuit + base_harmony = generate_harmony_vector(self.num_qubits) + quantum_state = self.quantum_circuit.initialize_state() + evolved_state = self.quantum_circuit.evolve_state(quantum_state, transition_constant) + quantum_influence = np.abs(evolved_state)**2 + return np.where(np.random.rand(self.num_qubits) < quantum_influence[:self.num_qubits], 1, base_harmony) + + def update_harmony_memory(self, new_vector, evolved_state, score): + # Update the harmony memory with the new vector and score + if score > self.best_score: + self.best_score = score + self.best_solution = new_vector + self.harmony_memory.append(new_vector) + if len(self.harmony_memory) > self.harmony_memory_size: + self.harmony_memory.pop(0) + + def run_experiment(self): + for iteration in range(self.max_iterations): + new_harmony_vector = self.generate_new_harmony(transition_constant=0.1) + evolved_state = self.quantum_circuit.evolve_state(new_harmony_vector, time=0.1) + score = self.objective_function(evolved_state) + self.update_harmony_memory(new_harmony_vector, evolved_state, score) + self.history['scores'].append(score) + self.history['states'].append(evolved_state) + if self.check_convergence(): + break + return self.best_solution, self.best_score + + def apply_quantum_resonance(self): + total_possibilities = self.quantum_circuit.get_total_possibilities() + return total_possibilities + + def resonance_condition(self, F0: float, k: float, m: float, omega: float, b: float) -> float: + """Calculate amplitude at resonance condition.""" + return F0 / np.sqrt((k - m * omega**2)**2 + (b * omega)**2) + + def wave_interference(self, y1: np.ndarray, y2: np.ndarray) -> np.ndarray: + """Calculate wave interference between two signals.""" + return y1 + y2 + + def golden_harmony(self, R: float, F: float, E: float) -> float: + """Calculate golden harmony metric.""" + return np.sqrt((R * F**2) + E**2) + + def detect_base_frequency(self, signal_data: np.ndarray, sample_rate: float) -> float: spectrum = np.abs(np.fft.fft(signal_data)) freqs = np.fft.fftfreq(len(signal_data), 1/sample_rate) - positive_freqs = freqs[:len(freqs)//2] positive_spectrum = spectrum[:len(spectrum)//2] - - main_freq_idx = np.argmax(positive_spectrum) - detected_freq = np.abs(positive_freqs[main_freq_idx]) - - if np.abs(detected_freq - self.base_frequency) > 0.5: - self.logger.info(f"Frequency drift detected: {detected_freq - self.base_frequency:.2f} Hz") - self.base_frequency = detected_freq - self.frequencies = np.array([detected_freq * (i + 1) for i in range(self.num_harmonics)]) - - return detected_freq - - def optimize_psi(self, signal_data: np.ndarray, - sample_rate: float) -> float: - """ - Optimize Psi values to minimize Total Harmonic Distortion. - - Uses Nelder-Mead optimization to find optimal phase adjustments. - - Args: - signal_data: Input time series data - sample_rate: Sampling rate of the signal - - Returns: - float: Final THD value after optimization - """ - def objective(psi_values): - self.psi = psi_values - balanced = self.balance_signal(signal_data, sample_rate) - return self.calculate_thd(balanced, sample_rate) - - result = minimize(objective, self.psi, method='Nelder-Mead') + positive_freqs = freqs[:len(freqs)//2] + + peaks, _ = find_peaks(positive_spectrum, height=max(positive_spectrum)/10) + + if len(peaks) > 0: + sorted_peaks = sorted(peaks, key=lambda x: positive_spectrum[x], reverse=True) + for peak in sorted_peaks[:3]: + detected_freq = positive_freqs[peak] + if 0.8 * self.base_frequency <= detected_freq <= 1.2 * self.base_frequency: + if abs(detected_freq - self.base_frequency) > 0.1: + self.base_frequency = detected_freq + self.frequencies = np.array([self.base_frequency * (i + 1) for i in range(self.num_harmonics)]) + return detected_freq + + return self.base_frequency + + def optimize_psi(self, signal_data: np.ndarray, sample_rate: float) -> float: + def objective(psi): + balanced = self.apply_psi(signal_data, psi, sample_rate) + thd = self.calculate_thd(balanced, sample_rate) + harmony = self.golden_harmony(thd, self.base_frequency, np.mean(np.abs(balanced))) + return abs(harmony - self.golden_ratio) + + result = minimize(objective, self.psi, method='BFGS') self.psi = result.x return result.fun - - def calculate_thd(self, signal_data: np.ndarray, - sample_rate: float) -> float: - """ - Calculate Total Harmonic Distortion of the signal. - - Args: - signal_data: Input time series data - sample_rate: Sampling rate of the signal - - Returns: - float: THD value - """ + + def apply_psi(self, signal_data: np.ndarray, psi: np.ndarray, sample_rate: float) -> np.ndarray: + t = np.arange(len(signal_data)) / sample_rate + correction = np.zeros_like(signal_data) + for i, freq in enumerate(self.frequencies): + correction += self.resonance_condition(1, 1, 1, 2*np.pi*freq, 0.1) * np.sin(2 * np.pi * freq * t + psi[i]) + return self.wave_interference(signal_data, -correction) + + def calculate_thd(self, signal_data: np.ndarray, sample_rate: float) -> float: spectrum = np.abs(np.fft.fft(signal_data)) freqs = np.fft.fftfreq(len(signal_data), 1/sample_rate) - fundamental_idx = np.argmax(spectrum[:len(spectrum)//2]) - harmonics = spectrum[fundamental_idx*2:fundamental_idx*5] - - thd = np.sqrt(np.sum(harmonics**2)) / spectrum[fundamental_idx] - return thd - - def balance_signal(self, signal_data: np.ndarray, - sample_rate: float) -> np.ndarray: - """ - Apply harmonic balancing to the input signal. - - Implements application-specific processing for power systems - and mechanical vibrations. - - Args: - signal_data: Input time series data - sample_rate: Sampling rate of the signal - - Returns: - np.ndarray: Balanced signal - """ + harmonics = spectrum[fundamental_idx*2:fundamental_idx*(self.num_harmonics+1)] + return np.sqrt(np.sum(harmonics**2)) / spectrum[fundamental_idx] + + def balance_signal(self, signal_data: np.ndarray, sample_rate: float) -> np.ndarray: detected_freq = self.detect_base_frequency(signal_data, sample_rate) - - freq_domain = np.fft.fft(signal_data) - freqs = np.fft.fftfreq(len(signal_data), 1/sample_rate) - + + # Update base_frequency if the detected frequency is significantly different + if abs(detected_freq - self.base_frequency) > 0.1: # You can adjust this threshold + self.base_frequency = detected_freq + self.frequencies = np.array([self.base_frequency * (i + 1) for i in range(self.num_harmonics)]) + self.logger.info(f"Base frequency updated to {self.base_frequency} Hz") + + self.optimize_psi(signal_data, sample_rate) + balanced = self.apply_psi(signal_data, self.psi, sample_rate) + + # Apply application-specific processing if self.application == 'power': - # Focus on odd harmonics for power systems - for i, freq in enumerate(self.frequencies): - if i % 2 == 0: # Odd harmonics - mask = (np.abs(freqs) >= freq-1) & (np.abs(freqs) <= freq+1) - freq_domain[mask] *= np.exp(1j * self.psi[i]) - + balanced = self.power_specific_processing(balanced, sample_rate) elif self.application == 'vibration': - # Consider all harmonics with progressive damping - for i, freq in enumerate(self.frequencies): - mask = (np.abs(freqs) >= freq-2) & (np.abs(freqs) <= freq+2) - damping = 1 / (i + 1) - freq_domain[mask] *= np.exp(1j * self.psi[i]) * damping - - balanced = np.real(np.fft.ifft(freq_domain)) - - # Update history - self.history['thd'].append(self.calculate_thd(balanced, sample_rate)) - self.history['psi'].append(self.psi.copy()) - self.history['frequency_drift'].append(detected_freq - self.base_frequency) - + balanced = self.vibration_specific_processing(balanced, sample_rate) + return balanced + + def power_specific_processing(self, signal_data: np.ndarray, sample_rate: float) -> np.ndarray: + # Apply quantum entanglement simulation + entanglement_effect = self.quantum_entanglement_simulation(self.num_harmonics) + # Apply a series of notch filters to remove specific harmonics + for harmonic in range(2, self.num_harmonics + 1): + notch_freq = harmonic * self.base_frequency + q = 30.0 # Quality factor + w0 = notch_freq / (sample_rate / 2) + b, a = iirnotch(w0, q) + signal_data = filtfilt(b, a, signal_data) + + # Apply quantum influence + quantum_influence = self.apply_quantum_resonance() + signal_data *= (1 + 0.1 * quantum_influence) # Adjust the scaling factor as needed + + return signal_data + + + def vibration_specific_processing(self, signal_data: np.ndarray, sample_rate: float) -> np.ndarray: + # Implement a simple low-pass filter to reduce high-frequency components + cutoff_freq = 2 * self.base_frequency # Adjust as needed + nyquist = 0.5 * sample_rate + normal_cutoff = cutoff_freq / nyquist + b, a = butter(4, normal_cutoff, btype='low', analog=False) + return filtfilt(b, a, signal_data) + + def quantum_entanglement_simulation(self, num_harmonics): + # Implement a simple quantum entanglement simulation + # This is a placeholder implementation and should be replaced with actual quantum simulation logic + return np.random.rand(num_harmonics) + + def mri_harmonic_suppression(self, signal_data: np.ndarray, sample_rate: float) -> np.ndarray: + """Suppress specific harmonics for MRI application.""" + for harmonic in [3, 5, 7]: # Suppress 3rd, 5th, and 7th harmonics + notch_freq = harmonic * self.base_frequency + w0 = notch_freq / (sample_rate / 2) + b, a = iirnotch(w0, 30) + signal_data = filtfilt(b, a, signal_data) + return signal_data diff --git a/src/system.py b/src/system.py new file mode 100644 index 0000000..80ed1b7 --- /dev/null +++ b/src/system.py @@ -0,0 +1,55 @@ +import numpy as np + +class System: + def __init__(self, num_qubits): + self.num_qubits = num_qubits + self.parameters = np.zeros(num_qubits) + self.state = np.zeros((num_qubits, num_qubits)) + self.initialize_circuit() + + def initialize_circuit(self): + # Initialize the hybrid graphene-silicon-diamond quantum circuit + self.graphene_layer = np.random.rand(self.num_qubits) + self.silicon_interface = np.random.rand(self.num_qubits, self.num_qubits) + self.diamond_layer = np.random.rand(self.num_qubits) + + def evolve_state(self, state, steps=1): + if state.shape[0] != self.num_qubits: + raise ValueError("Invalid state vector shape") + evolved_state = state + for _ in range(steps): + evolved_state = self.apply_quantum_operations(evolved_state) + return evolved_state / np.linalg.norm(evolved_state) + + def apply_quantum_operations(self, state): + # Apply quantum operations based on the hybrid circuit + state = np.dot(self.silicon_interface, state) + state = state * self.graphene_layer + state = state + self.diamond_layer + return state + + def update_parameters(self, state, score, transition_constant): + self.parameters += transition_constant * score * state + + def get_parameters(self): + """ + Get the current parameters of the quantum system. + + Returns: + np.ndarray: The current parameters. + """ + return self.parameters + + def reset_parameters(self): + """ + Reset the parameters of the quantum system to zero. + """ + self.parameters = np.zeros(self.num_qubits) + + def encode_dna_sequence(self, sequence): + # Encode a DNA sequence into a quantum state + dna_mapping = {'A': 0, 'T': 1, 'C': 2, 'G': 3} + encoded_state = np.zeros(self.num_qubits) + for i, base in enumerate(sequence): + encoded_state[i % self.num_qubits] += dna_mapping[base] + return encoded_state / np.linalg.norm(encoded_state) \ No newline at end of file diff --git a/src/utils/helpers.py b/src/utils/helpers.py new file mode 100644 index 0000000..2d94c10 --- /dev/null +++ b/src/utils/helpers.py @@ -0,0 +1,34 @@ +import numpy as np # type: ignore + +def phi_pi_transition(): + # Implement the phi-pi transition logic + return state * transition_constant # type: ignore + pass + +def generate_harmony_vector(num_qubits): + return np.random.rand(num_qubits) + +def state_to_dna(state): + # Convert state to DNA sequence + return ''.join(['A' if x < 0.25 else 'C' if x < 0.5 else 'G' if x < 0.75 else 'T' for x in state]) + pass + +def count_valid_codons(dna_sequence): + # Count valid codons in DNA sequence + return sum(1 for i in range(0, len(dna_sequence), 3) if dna_sequence[i:i+3] in ['ATG', 'TAA', 'TAG', 'TGA']) + pass + +def calculate_gc_content(dna_sequence): + # Calculate GC content of DNA sequence + return (dna_sequence.count('G') + dna_sequence.count('C')) / len(dna_sequence) + pass + +def calculate_base_balance(dna_sequence): + # Calculate base balance of DNA sequence + return { + 'A': dna_sequence.count('A'), + 'C': dna_sequence.count('C'), + 'G': dna_sequence.count('G'), + 'T': dna_sequence.count('T') + } + pass \ No newline at end of file diff --git a/src/utils/utils.py b/src/utils/utils.py new file mode 100644 index 0000000..b2218b1 --- /dev/null +++ b/src/utils/utils.py @@ -0,0 +1,37 @@ +import math +import numpy as np # type: ignore + +BASE_PAIRS = ['A', 'T', 'G', 'C'] +START_CODON = 'ATG' +STOP_CODONS = ['TAA', 'TAG', 'TGA'] + +PHI = (1 + math.sqrt(5)) / 2 # Golden ratio + +def phi_pi_transition(state, transition_constant): + # Example implementation of phi_pi_transition + return state * transition_constant + +def generate_harmony_vector(size): + # Example implementation of generating a harmony vector + return np.random.rand(size) + +def state_to_dna(state): + # Example implementation of converting state to DNA sequence + return ''.join(['A' if x < 0.25 else 'C' if x < 0.5 else 'G' if x < 0.75 else 'T' for x in state]) + +def count_valid_codons(dna_sequence): + # Example implementation of counting valid codons + return sum(1 for i in range(0, len(dna_sequence), 3) if dna_sequence[i:i+3] in ['ATG', 'TAA', 'TAG', 'TGA']) + +def calculate_gc_content(dna_sequence): + # Example implementation of calculating GC content + return (dna_sequence.count('G') + dna_sequence.count('C')) / len(dna_sequence) + +def calculate_base_balance(dna_sequence): + # Example implementation of calculating base balance + return { + 'A': dna_sequence.count('A'), + 'C': dna_sequence.count('C'), + 'G': dna_sequence.count('G'), + 'T': dna_sequence.count('T') + } \ No newline at end of file diff --git a/tests/test_harmonic_balancer.py b/tests/test_harmonic_balancer.py new file mode 100644 index 0000000..c489adb --- /dev/null +++ b/tests/test_harmonic_balancer.py @@ -0,0 +1,67 @@ +import unittest +import numpy as np +from src.harmonic_balancer import EnhancedHarmonicBalancer + +class TestEnhancedHarmonicBalancer(unittest.TestCase): + + def setUp(self): + self.balancer = EnhancedHarmonicBalancer(base_frequency=60, num_harmonics=5, application='power') + + def test_initialization(self): + self.assertEqual(self.balancer.base_frequency, 60) + self.assertEqual(self.balancer.num_harmonics, 5) + self.assertEqual(self.balancer.application, 'power') + self.assertEqual(len(self.balancer.psi), 5) + + def test_detect_base_frequency(self): + t = np.linspace(0, 1, 1000) + signal = np.sin(2 * np.pi * 60 * t) + detected_freq = self.balancer.detect_base_frequency(signal, sample_rate=1000) + self.assertAlmostEqual(detected_freq, 60, delta=1) + + def test_calculate_thd(self): + t = np.linspace(0, 1, 1000) + signal = np.sin(2 * np.pi * 60 * t) + 0.1 * np.sin(2 * np.pi * 120 * t) + thd = self.balancer.calculate_thd(signal, sample_rate=1000) + self.assertGreater(thd, 0) + self.assertLess(thd, 1) + + def test_balance_signal(self): + t = np.linspace(0, 1, 1000) + signal = np.sin(2 * np.pi * 60 * t) + 0.5 * np.sin(2 * np.pi * 120 * t) + balanced = self.balancer.balance_signal(signal, sample_rate=1000) + self.assertEqual(len(balanced), len(signal)) + self.assertNotEqual(np.sum(np.abs(balanced - signal)), 0) + + def test_power_specific_processing(self): + t = np.linspace(0, 1, 1000) + signal = np.sin(2 * np.pi * 60 * t) + 0.5 * np.sin(2 * np.pi * 180 * t) + processed = self.balancer.power_specific_processing(signal, sample_rate=1000) + self.assertEqual(len(processed), len(signal)) + + # Test if harmonics are reduced + fft_original = np.fft.fft(signal) + fft_processed = np.fft.fft(processed) + self.assertLess(np.abs(fft_processed[180]), np.abs(fft_original[180])) + + def test_combined_approach(self): + t = np.linspace(0, 1, 1000) + signal = np.sin(2 * np.pi * 60 * t) + 0.5 * np.sin(2 * np.pi * 120 * t) + 0.3 * np.sin(2 * np.pi * 180 * t) + processed = self.balancer.power_specific_processing(signal, sample_rate=1000) + + # Check if the signal is changed + self.assertFalse(np.array_equal(signal, processed)) + + # Check if harmonics are reduced + fft_original = np.fft.fft(signal) + fft_processed = np.fft.fft(processed) + self.assertLess(np.abs(fft_processed[120]), np.abs(fft_original[120])) + self.assertLess(np.abs(fft_processed[180]), np.abs(fft_original[180])) + + # Check if THD is improved + thd_original = self.balancer.calculate_thd(signal, 1000) + thd_processed = self.balancer.calculate_thd(processed, 1000) + self.assertLess(thd_processed, thd_original) + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_integration_harmonic_balancer.py b/tests/test_integration_harmonic_balancer.py new file mode 100644 index 0000000..5efb908 --- /dev/null +++ b/tests/test_integration_harmonic_balancer.py @@ -0,0 +1,126 @@ +import unittest +import numpy as np +from src.harmonic_balancer import EnhancedHarmonicBalancer +import time + +class TestEnhancedHarmonicBalancerIntegration(unittest.TestCase): + + def setUp(self): + self.balancer = EnhancedHarmonicBalancer(base_frequency=60, num_harmonics=5, application='power') + + def generate_signal(self, duration, sample_rate, base_freq, harmonics): + t = np.linspace(0, duration, int(duration * sample_rate)) + signal = np.zeros_like(t) + for i, amplitude in enumerate(harmonics): + signal += amplitude * np.sin(2 * np.pi * base_freq * (i + 1) * t) + return signal + + def test_complex_signal_balancing(self): + # Generate a complex signal with multiple harmonics + signal = self.generate_signal(duration=1, sample_rate=1000, base_freq=60, + harmonics=[1, 0.5, 0.3, 0.2, 0.1]) + + balanced = self.balancer.balance_signal(signal, sample_rate=1000) + + # Check if THD is improved + original_thd = self.balancer.calculate_thd(signal, 1000) + balanced_thd = self.balancer.calculate_thd(balanced, 1000) + self.assertLess(balanced_thd, original_thd) + + def test_frequency_drift_adaptation(self): + # Generate a signal with drifting frequency + t = np.linspace(0, 1, 1000) + drifting_freq = 60 + 2 * np.sin(2 * np.pi * 0.5 * t) # Frequency drifts between 58 and 62 Hz + signal = np.sin(2 * np.pi * drifting_freq * t) + + initial_frequency = self.balancer.base_frequency + balanced = self.balancer.balance_signal(signal, sample_rate=1000) + + # Check if the balancer detected any frequency change + self.assertNotEqual(self.balancer.base_frequency, initial_frequency) + self.assertNotEqual(self.balancer.base_frequency, 60) + + def test_noise_robustness(self): + # Generate a signal with added noise + signal = self.generate_signal(duration=1, sample_rate=1000, base_freq=60, + harmonics=[1, 0.5, 0.3]) + noise = np.random.normal(0, 0.1, signal.shape) + noisy_signal = signal + noise + + balanced = self.balancer.balance_signal(noisy_signal, sample_rate=1000) + + # Check if the balancer still improves THD despite noise + original_thd = self.balancer.calculate_thd(noisy_signal, 1000) + balanced_thd = self.balancer.calculate_thd(balanced, 1000) + self.assertLess(balanced_thd, original_thd) + + def test_performance_scaling(self): + durations = [0.1, 1, 10, 100] # Logarithmic scale + execution_times = [] + + for duration in durations: + signal = self.generate_signal(duration=duration, sample_rate=1000, base_freq=60, + harmonics=[1, 0.5, 0.3, 0.2, 0.1]) + + # Run the balance_signal method multiple times and take the average + num_runs = 5 + total_time = 0 + for _ in range(num_runs): + start_time = time.time() + self.balancer.balance_signal(signal, sample_rate=1000) + end_time = time.time() + total_time += end_time - start_time + + average_time = total_time / num_runs + execution_times.append(average_time) + + # Print execution times for debugging + for duration, exec_time in zip(durations, execution_times): + print(f"Duration: {duration}s, Execution time: {exec_time:.6f}s") + + # Check if there's a general increasing trend + # We'll allow for small fluctuations by checking if at least 2 out of 3 transitions show an increase + increases = sum(execution_times[i] < execution_times[i+1] for i in range(len(execution_times)-1)) + self.assertGreaterEqual(increases, 2, "Execution time should generally increase with signal duration") + + # Check if the execution time for the longest duration is significantly larger than for the shortest + self.assertGreater(execution_times[-1], execution_times[0] * 1.5, + "Execution time for the longest duration should be at least 1.5 times that of the shortest duration") + + def test_edge_case_very_low_frequency(self): + signal = self.generate_signal(duration=10, sample_rate=1000, base_freq=1, + harmonics=[1, 0.5, 0.3]) + + balanced = self.balancer.balance_signal(signal, sample_rate=1000) + + # Check if the balancer can handle very low frequencies + self.assertIsNotNone(balanced) + self.assertEqual(len(balanced), len(signal)) + + def test_edge_case_very_high_frequency(self): + signal = self.generate_signal(duration=0.1, sample_rate=100000, base_freq=10000, + harmonics=[1, 0.5, 0.3]) + + balanced = self.balancer.balance_signal(signal, sample_rate=100000) + + # Check if the balancer can handle very high frequencies + self.assertIsNotNone(balanced) + self.assertEqual(len(balanced), len(signal)) + + def test_application_specific_processing(self): + signal = self.generate_signal(duration=1, sample_rate=1000, base_freq=60, + harmonics=[1, 0, 0.3, 0, 0.1]) # Only odd harmonics for power systems + + # Test power application + self.balancer.application = 'power' + power_balanced = self.balancer.balance_signal(signal, sample_rate=1000) + + # Test vibration application + self.balancer = EnhancedHarmonicBalancer(base_frequency=60, num_harmonics=5, application='vibration') + vibration_balanced = self.balancer.balance_signal(signal, sample_rate=1000) + + # Check if the results are different for different applications + self.assertFalse(np.array_equal(power_balanced, vibration_balanced)) + +if __name__ == '__main__': + unittest.main() \ No newline at end of file