From 0919ac5b1e1e01d6e83d6d19a9ce783051a90f43 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=83=AD=E9=A3=9E?= Date: Tue, 12 Dec 2023 18:05:31 +0800 Subject: [PATCH] echo watermark (#1) * add: echo watermark for wav --- example/example_echo_watermark.py | 19 +- example/example_hide_in_music.py | 4 +- .../sounds.wav | Bin hide_info/echo_watermark.py | 230 +++++++----------- hide_info/utils.py | 20 +- 5 files changed, 121 insertions(+), 152 deletions(-) rename "example/\351\237\263\344\271\220.wav" => example/sounds.wav (100%) diff --git a/example/example_echo_watermark.py b/example/example_echo_watermark.py index eeafbd9..9eda6c6 100644 --- a/example/example_echo_watermark.py +++ b/example/example_echo_watermark.py @@ -2,27 +2,27 @@ from hide_info.echo_watermark import EchoWatermark, get_error_rate, get_snr from hide_info import utils -ori_file = "音乐.wav" -embedded_file = "打入水印_音乐.wav" # 嵌入了水印的文件 -wm_str = "回声水印算法测试用水印" +ori_file = "sounds.wav" +embedded_file = "sounds_with_watermark.wav" +wm_str = "回声水印算法,欢迎 star!" -wm_bits = [int(i) for i in utils.bytes2bin(wm_str.encode('utf-8'))] +wm_bits = utils.bytes2bin(wm_str.encode('utf-8')) len_wm_bits = len(wm_bits) # embed: -echo_wm = EchoWatermark(pwd=111001, algo_type=2) +echo_wm = EchoWatermark(pwd=111001) echo_wm.embed(origin_filename=ori_file, wm_bits=wm_bits, embed_filename=embedded_file) # extract: -echo_wm = EchoWatermark(pwd=111001, algo_type=2) +echo_wm = EchoWatermark(pwd=111001) wm_extract = echo_wm.extract(embed_filename=embedded_file, len_wm_bits=len_wm_bits) -wm_str_extract = utils.bin2bytes(''.join([str(int(i)) for i in wm_extract])).decode('utf-8') +wm_str_extract = utils.bin2bytes(wm_extract).decode('utf-8', errors='replace') print("解出水印:", wm_str_extract) get_error_rate(wm_extract, wm_bits) -# %% +# %% There are 3 algorithms: wm_bits = np.random.randint(2, size=200).tolist() len_wm_bits = len(wm_bits) @@ -30,5 +30,6 @@ echo_wm = EchoWatermark(pwd=111001, algo_type=algo_type) echo_wm.embed(origin_filename=ori_file, wm_bits=wm_bits, embed_filename=embedded_file) wm_extract = echo_wm.extract(embed_filename=embedded_file, len_wm_bits=len_wm_bits) - get_error_rate(wm_extract, wm_bits) + error_rate = get_error_rate(wm_extract, wm_bits) get_snr(embedded_file, ori_file) + assert error_rate <= 0.03 diff --git a/example/example_hide_in_music.py b/example/example_hide_in_music.py index 218bfe8..7047d82 100644 --- a/example/example_hide_in_music.py +++ b/example/example_hide_in_music.py @@ -2,7 +2,7 @@ text = "待嵌入到音乐文件的文本,下面的代码中,会把这段文本以二进制形式隐藏到一个音乐文件中" -hide_in_music.encode(text.encode('utf-8'), music_filename="音乐.wav", music_filename_new="藏文于音.wav") +hide_in_music.encode(text.encode('utf-8'), music_filename="sounds.wav", music_filename_new="藏文于音.wav") text_encode = hide_in_music.decode(music_filename="藏文于音.wav") @@ -10,7 +10,7 @@ # %% # 把文件隐藏到某个音乐中 -hide_in_music.file_encode(filename='要隐藏的文件.zip', music_filename="音乐.wav", music_filename_new="藏物于音.wav") +hide_in_music.file_encode(filename='要隐藏的文件.zip', music_filename="sounds.wav", music_filename_new="藏物于音.wav") # 从音乐中提取文件 hide_in_music.file_decode(filename="藏物于音-解出的文件.zip", music_filename="藏物于音.wav") diff --git "a/example/\351\237\263\344\271\220.wav" b/example/sounds.wav similarity index 100% rename from "example/\351\237\263\344\271\220.wav" rename to example/sounds.wav diff --git a/hide_info/echo_watermark.py b/hide_info/echo_watermark.py index b48701e..d98c831 100644 --- a/hide_info/echo_watermark.py +++ b/hide_info/echo_watermark.py @@ -1,45 +1,43 @@ # 回声音频水印 import numpy as np from scipy.io import wavfile -from scipy.signal import windows +from scipy.signal.windows import hann +from numpy.fft import ifft, fft class EchoWatermark: - def __init__(self, pwd, algo_type=2, verbose=False): + def __init__(self, pwd, algo_type=3, verbose=False): self.pwd = pwd self.algo_type = algo_type self.verbose = verbose - self.FRAME_LENGTH = 2048 # 帧长度 - self.CONTROL_STRENGTH = 0.2 # 嵌入强度 - self.OVERLAP = 0.5 # 帧分析的重叠率 - self.NEGATIVE_DELAY = 4 # negative delay, for negative echo - self.LOG_FLOOR = 0.00001 + self.frame_len = 2048 # 帧长度 + self.echo_amplitude = 0.2 # 回声幅度 + self.overlap = 0.5 # 帧分析的重叠率 + self.neg_delay = 4 # negative delay, for negative echo # 回声参数 - # for key 1 + # pwd[i] = 1 self.delay11, self.delay10 = 100, 110 - # for key 0 + # pwd[i] = 0 self.delay01, self.delay00 = 120, 130 - def embed(self, origin_filename,wm_bits, embed_filename): - pwd = self.pwd - algo_type = self.algo_type - FRAME_LENGTH = self.FRAME_LENGTH - CONTROL_STRENGTH = self.CONTROL_STRENGTH - OVERLAP = self.OVERLAP - NEGATIVE_DELAY = self.NEGATIVE_DELAY - LOG_FLOOR = self.LOG_FLOOR - delay11, delay10, delay01, delay00 = self.delay11, self.delay10, self.delay01, self.delay00 + def embed(self, origin_filename, wm_bits, embed_filename): + frame_len = self.frame_len + echo_amplitude = self.echo_amplitude + overlap = self.overlap + neg_delay = self.neg_delay + + delay_matrix = [[self.delay00, self.delay01], [self.delay10, self.delay11]] - sr, host_signal = wavfile.read(origin_filename) - signal_len = len(host_signal) + sr, ori_signal = wavfile.read(origin_filename) + signal_len = len(ori_signal) # 帧的移动量 - frame_shift = int(FRAME_LENGTH * (1 - OVERLAP)) + frame_shift = int(frame_len * (1 - overlap)) # 和相邻帧的重叠长度 - overlap_length = int(FRAME_LENGTH * OVERLAP) + overlap_length = int(frame_len * overlap) # 可嵌入总比特数 embed_nbit_ = (signal_len - overlap_length) // frame_shift @@ -57,81 +55,62 @@ def embed(self, origin_filename,wm_bits, embed_filename): f"可以嵌入的总比特数为: {embed_nbit_},水印长度为{len(wm_bits)},重复嵌入 {n_repeat} 次, 实际嵌入{embed_nbit}") # 扩展水印信号 - wmark_extended = np.repeat(wm_bits, n_repeat) + wm_repeat = np.repeat(wm_bits, n_repeat) # 生成密钥 - np.random.seed(pwd) + np.random.seed(self.pwd) secret_key = np.random.randint(2, size=int(len_wm_bits)) secret_key_extended = np.repeat(secret_key, n_repeat) pointer = 0 - echoed_signal1 = np.zeros((frame_shift * embed_nbit)) - prev1 = np.zeros((FRAME_LENGTH)) - de = NEGATIVE_DELAY # + echoed_signal = np.zeros((frame_shift * embed_nbit)) + prev1 = np.zeros(frame_len) + for i in range(embed_nbit): - frame = host_signal[pointer: (pointer + FRAME_LENGTH)] + frame = ori_signal[pointer: (pointer + frame_len)] - if secret_key_extended[i] == 1: - if wmark_extended[i] == 1: - delay = delay11 - else: - delay = delay10 - else: - if wmark_extended[i] == 1: - delay = delay01 - else: - delay = delay00 - - echo_positive = CONTROL_STRENGTH \ - * np.concatenate((np.zeros(delay), - frame[0:FRAME_LENGTH - delay])) - - echo_negative = - CONTROL_STRENGTH \ - * np.concatenate((np.zeros(delay + de), - frame[0:FRAME_LENGTH - delay - de])) - - echo_forward = CONTROL_STRENGTH \ - * np.concatenate((frame[delay:FRAME_LENGTH], np.zeros(delay))) - - if algo_type == 1: - echoed_frame1 = frame + echo_positive - elif algo_type == 2: - echoed_frame1 = frame + echo_positive + echo_negative - else: # algo_type == 3 - echoed_frame1 = frame + echo_positive + echo_forward + delay = delay_matrix[secret_key_extended[i]][wm_repeat[i]] - echoed_frame1 = echoed_frame1 * windows.hann(FRAME_LENGTH) - echoed_signal1[frame_shift * i: frame_shift * (i + 1)] = \ - np.concatenate((prev1[frame_shift:FRAME_LENGTH] + - echoed_frame1[0:overlap_length], - echoed_frame1[overlap_length:frame_shift])) + echo_positive = np.concatenate((np.zeros(delay), frame[0:frame_len - delay])) - prev1 = echoed_frame1 - pointer = pointer + frame_shift + echo_negative = - np.concatenate((np.zeros(delay + neg_delay), + frame[0:frame_len - delay - neg_delay])) + + echo_forward = np.concatenate((frame[delay:frame_len], np.zeros(delay))) - echoed_signal1 = np.concatenate( - (echoed_signal1, host_signal[len(echoed_signal1): signal_len])) + if self.algo_type == 1: + echoed_frame = frame + echo_amplitude * echo_positive + elif self.algo_type == 2: + echoed_frame = frame + echo_amplitude * (echo_positive + echo_negative) + else: # algo_type == 3 + echoed_frame = frame + echo_amplitude * (echo_positive + echo_forward) + + echoed_frame = echoed_frame * hann(frame_len) + echoed_signal[frame_shift * i: frame_shift * (i + 1)] = \ + np.concatenate((prev1[frame_shift:frame_len] + + echoed_frame[0:overlap_length], + echoed_frame[overlap_length:frame_shift])) - # 将保存为wav格式 - echoed_signal1 = echoed_signal1.astype(np.int16) - wavfile.write(embed_filename, sr, echoed_signal1) + prev1 = echoed_frame + pointer += frame_shift + + echoed_signal = np.concatenate((echoed_signal, ori_signal[len(echoed_signal):])).astype(np.int16) + # 保存为wav格式 + wavfile.write(embed_filename, sr, echoed_signal) def extract(self, embed_filename, len_wm_bits): - pwd = self.pwd - algo_type = self.algo_type - FRAME_LENGTH = self.FRAME_LENGTH - CONTROL_STRENGTH = self.CONTROL_STRENGTH - OVERLAP = self.OVERLAP - NEGATIVE_DELAY = self.NEGATIVE_DELAY - LOG_FLOOR = self.LOG_FLOOR + frame_len = self.frame_len + overlap = self.overlap + neg_delay = self.neg_delay delay11, delay10, delay01, delay00 = self.delay11, self.delay10, self.delay01, self.delay00 + log_floor = 0.00001 # 取对数时的最小值 # 打开已嵌入水印的音频文件 - _, eval_signal1 = wavfile.read(embed_filename) - signal_len = len(eval_signal1) + _, wm_signal = wavfile.read(embed_filename) + signal_len = len(wm_signal) - frame_shift = int(FRAME_LENGTH * (1 - OVERLAP)) - embed_nbit_ = (signal_len - int(FRAME_LENGTH * OVERLAP)) // frame_shift + frame_shift = int(frame_len * (1 - overlap)) + embed_nbit_ = (signal_len - int(frame_len * overlap)) // frame_shift # 重复次数 n_repeat = embed_nbit_ // len_wm_bits @@ -144,92 +123,67 @@ def extract(self, embed_filename, len_wm_bits): f"可以嵌入的总比特数为: {embed_nbit_},水印长度为{len_wm_bits},重复嵌入 {n_repeat} 次, 实际嵌入{embed_nbit}") # 加载密钥 - np.random.seed(pwd) + np.random.seed(self.pwd) secret_key = np.random.randint(2, size=int(len_wm_bits)) secret_key = np.repeat(secret_key, n_repeat) pointer = 0 detected_bit1 = np.zeros(embed_nbit) for i in range(embed_nbit): - wmarked_frame1 = eval_signal1[pointer: pointer + FRAME_LENGTH] - ceps1 = np.fft.ifft( - np.log(np.square(np.fft.fft(wmarked_frame1)) + LOG_FLOOR)).real + wmarked_frame1 = wm_signal[pointer: pointer + frame_len] + ceps1 = ifft( + np.log(np.square(fft(wmarked_frame1)) + log_floor)).real if secret_key[i] == 1: - if algo_type == 1: - if ceps1[delay11] > ceps1[delay10]: - detected_bit1[i] = 1 - else: - detected_bit1[i] = 0 - elif algo_type == 2: - if (ceps1[delay11] - ceps1[delay11 + NEGATIVE_DELAY]) > \ - (ceps1[delay10] - ceps1[delay10 + NEGATIVE_DELAY]): - detected_bit1[i] = 1 - else: - detected_bit1[i] = 0 - else: # algo_type == 3 - if ceps1[delay11] > ceps1[delay10]: - detected_bit1[i] = 1 - else: - detected_bit1[i] = 0 - + delay0, delay1 = delay10, delay11 else: - if algo_type == 1: - if ceps1[delay01] > ceps1[delay00]: - detected_bit1[i] = 1 - else: - detected_bit1[i] = 0 - elif algo_type == 2: - if (ceps1[delay01] - ceps1[delay01 + NEGATIVE_DELAY]) > \ - (ceps1[delay00] - ceps1[delay00 + NEGATIVE_DELAY]): - detected_bit1[i] = 1 - else: - detected_bit1[i] = 0 - else: - if ceps1[delay01] > ceps1[delay00]: - detected_bit1[i] = 1 - else: - detected_bit1[i] = 0 + delay0, delay1 = delay00, delay01 + + if self.algo_type == 1: + if ceps1[delay1] > ceps1[delay0]: + detected_bit1[i] = 1 + elif self.algo_type == 2: + if (ceps1[delay1] - ceps1[delay1 + neg_delay]) > \ + (ceps1[delay0] - ceps1[delay0 + neg_delay]): + detected_bit1[i] = 1 + else: # algo_type == 3 + if ceps1[delay1] > ceps1[delay0]: + detected_bit1[i] = 1 pointer = pointer + frame_shift count = 0 - wmark_recovered1 = np.zeros(len_wm_bits) - # wmark_recovered2 = np.zeros(len_wm_bits) - # wmark_recovered3 = np.zeros(len_wm_bits) + wm_extract = np.zeros(len_wm_bits) for i in range(len_wm_bits): - - # 汇总比特值(平均值) - ave = np.sum(detected_bit1[count:count + n_repeat]) / n_repeat + # 汇总比特值(按平均值) + ave = np.average(detected_bit1[count:count + n_repeat]) if ave >= 0.5: - wmark_recovered1[i] = 1 + wm_extract[i] = 1 else: - wmark_recovered1[i] = 0 + wm_extract[i] = 0 count = count + n_repeat - return wmark_recovered1 + return wm_extract -def get_error_rate(wmark_recovered, wm_bits): +def get_error_rate(wm_extract, wm_bits): # 计算错误率 len_wm_bits = len(wm_bits) - denom = np.sum(np.abs(wmark_recovered - wm_bits)) - BER = np.sum(np.abs(wmark_recovered - wm_bits)) / \ - len_wm_bits * 100 - print(f'bit error rate = {BER:.2f}% ({denom} / {len_wm_bits})') - return BER + error_num = np.sum(np.abs(wm_extract - wm_bits)) + error_rate = error_num / len_wm_bits + print(f'bit error rate = {error_rate:.2%} ({error_num} / {len_wm_bits})') + return error_rate -def get_snr(wav_with_wm, orig_file): - sr, host_signal = wavfile.read(orig_file) - _, eval_signal1 = wavfile.read(wav_with_wm) +def get_snr(file_with_wm, orig_file): + sr, ori_signal = wavfile.read(orig_file) + _, wm_signal = wavfile.read(file_with_wm) SNR = 10 * np.log10( - np.sum(np.square(host_signal.astype(np.float32))) - / np.sum(np.square(host_signal.astype(np.float32) - - eval_signal1.astype(np.float32)))) + np.sum(np.square(ori_signal.astype(np.float32))) + / np.sum(np.square(ori_signal.astype(np.float32) + - wm_signal.astype(np.float32)))) print(f'SNR = {SNR:.2f} dB') return SNR - diff --git a/hide_info/utils.py b/hide_info/utils.py index 5b4c069..1d234ea 100644 --- a/hide_info/utils.py +++ b/hide_info/utils.py @@ -17,15 +17,29 @@ def deserialization(serialized_data: bytes): return serialized_data[4:4 + int.from_bytes(serialized_data[:4], byteorder="big")] -def bytes2bin(bytes1: bytes) -> str: +def bytes2bin_(bytes1: bytes) -> str: """ 把 bytes 转化为 "10110" 这种形式的二进制 """ return ''.join([format(i, '08b') for i in bytes1]) -def bin2bytes(bin1: str) -> bytes: +def bin2bytes_(bin1: str) -> bytes: """ - bytes2bin 的相反操作 + bytes2bin_ 的相反操作 """ return b''.join([struct.pack('>B', int(bin1[i * 8:i * 8 + 8], base=2)) for i in range(len(bin1) // 8)]) + + +def bytes2bin(bytes1: bytes) -> list: + """ + 把 bytes 转化为 [1, 0, 1, 1, 0] 这种形式的二进制 + """ + return [int(i) for i in bytes2bin_(bytes1)] + + +def bin2bytes(bin1: list) -> bytes: + """ + bytes2bin 的相反操作 + """ + return bin2bytes_(''.join([str(int(i)) for i in bin1]))