From b92cdf1c475777299b97e1ad34ec6e8eadc39fd9 Mon Sep 17 00:00:00 2001 From: guofei9987 Date: Sat, 10 Dec 2022 16:33:42 +0800 Subject: [PATCH 1/7] add: echo watermark for wav --- example/example_echo_watermark.py | 17 +++-- example/example_hide_in_music.py | 4 +- .../sounds.wav | Bin hide_info/echo_watermark.py | 69 ++++++++---------- hide_info/utils.py | 20 ++++- 5 files changed, 60 insertions(+), 50 deletions(-) rename "example/\351\237\263\344\271\220.wav" => example/sounds.wav (100%) diff --git a/example/example_echo_watermark.py b/example/example_echo_watermark.py index eeafbd9..e58b942 100644 --- a/example/example_echo_watermark.py +++ b/example/example_echo_watermark.py @@ -2,27 +2,27 @@ from hide_info.echo_watermark import EchoWatermark, get_error_rate, get_snr from hide_info import utils -ori_file = "音乐.wav" -embedded_file = "打入水印_音乐.wav" # 嵌入了水印的文件 +ori_file = "sounds.wav" +embedded_file = "sounds_with_watermark.wav" wm_str = "回声水印算法测试用水印" -wm_bits = [int(i) for i in utils.bytes2bin(wm_str.encode('utf-8'))] +wm_bits = utils.bytes2bin(wm_str.encode('utf-8')) len_wm_bits = len(wm_bits) # embed: -echo_wm = EchoWatermark(pwd=111001, algo_type=2) +echo_wm = EchoWatermark(pwd=111001) echo_wm.embed(origin_filename=ori_file, wm_bits=wm_bits, embed_filename=embedded_file) # extract: -echo_wm = EchoWatermark(pwd=111001, algo_type=2) +echo_wm = EchoWatermark(pwd=111001) wm_extract = echo_wm.extract(embed_filename=embedded_file, len_wm_bits=len_wm_bits) -wm_str_extract = utils.bin2bytes(''.join([str(int(i)) for i in wm_extract])).decode('utf-8') +wm_str_extract = utils.bin2bytes(wm_extract).decode('utf-8') print("解出水印:", wm_str_extract) get_error_rate(wm_extract, wm_bits) -# %% +# %% There are 3 algorithms: wm_bits = np.random.randint(2, size=200).tolist() len_wm_bits = len(wm_bits) @@ -30,5 +30,6 @@ echo_wm = EchoWatermark(pwd=111001, algo_type=algo_type) echo_wm.embed(origin_filename=ori_file, wm_bits=wm_bits, embed_filename=embedded_file) wm_extract = echo_wm.extract(embed_filename=embedded_file, len_wm_bits=len_wm_bits) - get_error_rate(wm_extract, wm_bits) + error_rate = get_error_rate(wm_extract, wm_bits) get_snr(embedded_file, ori_file) + assert error_rate < 0.01 diff --git a/example/example_hide_in_music.py b/example/example_hide_in_music.py index 218bfe8..7047d82 100644 --- a/example/example_hide_in_music.py +++ b/example/example_hide_in_music.py @@ -2,7 +2,7 @@ text = "待嵌入到音乐文件的文本,下面的代码中,会把这段文本以二进制形式隐藏到一个音乐文件中" -hide_in_music.encode(text.encode('utf-8'), music_filename="音乐.wav", music_filename_new="藏文于音.wav") +hide_in_music.encode(text.encode('utf-8'), music_filename="sounds.wav", music_filename_new="藏文于音.wav") text_encode = hide_in_music.decode(music_filename="藏文于音.wav") @@ -10,7 +10,7 @@ # %% # 把文件隐藏到某个音乐中 -hide_in_music.file_encode(filename='要隐藏的文件.zip', music_filename="音乐.wav", music_filename_new="藏物于音.wav") +hide_in_music.file_encode(filename='要隐藏的文件.zip', music_filename="sounds.wav", music_filename_new="藏物于音.wav") # 从音乐中提取文件 hide_in_music.file_decode(filename="藏物于音-解出的文件.zip", music_filename="藏物于音.wav") diff --git "a/example/\351\237\263\344\271\220.wav" b/example/sounds.wav similarity index 100% rename from "example/\351\237\263\344\271\220.wav" rename to example/sounds.wav diff --git a/hide_info/echo_watermark.py b/hide_info/echo_watermark.py index b48701e..399c71a 100644 --- a/hide_info/echo_watermark.py +++ b/hide_info/echo_watermark.py @@ -10,23 +10,23 @@ def __init__(self, pwd, algo_type=2, verbose=False): self.algo_type = algo_type self.verbose = verbose - self.FRAME_LENGTH = 2048 # 帧长度 - self.CONTROL_STRENGTH = 0.2 # 嵌入强度 + self.frame_len = 2048 # 帧长度 + self.echo_amplitude = 0.2 # 回声幅度 self.OVERLAP = 0.5 # 帧分析的重叠率 self.NEGATIVE_DELAY = 4 # negative delay, for negative echo self.LOG_FLOOR = 0.00001 # 回声参数 - # for key 1 + # keyword = 1 self.delay11, self.delay10 = 100, 110 - # for key 0 + # keyword = 0 self.delay01, self.delay00 = 120, 130 - def embed(self, origin_filename,wm_bits, embed_filename): + def embed(self, origin_filename, wm_bits, embed_filename): pwd = self.pwd algo_type = self.algo_type - FRAME_LENGTH = self.FRAME_LENGTH - CONTROL_STRENGTH = self.CONTROL_STRENGTH + frame_len = self.frame_len + echo_amplitude = self.echo_amplitude OVERLAP = self.OVERLAP NEGATIVE_DELAY = self.NEGATIVE_DELAY LOG_FLOOR = self.LOG_FLOOR @@ -36,10 +36,10 @@ def embed(self, origin_filename,wm_bits, embed_filename): signal_len = len(host_signal) # 帧的移动量 - frame_shift = int(FRAME_LENGTH * (1 - OVERLAP)) + frame_shift = int(frame_len * (1 - OVERLAP)) # 和相邻帧的重叠长度 - overlap_length = int(FRAME_LENGTH * OVERLAP) + overlap_length = int(frame_len * OVERLAP) # 可嵌入总比特数 embed_nbit_ = (signal_len - overlap_length) // frame_shift @@ -57,7 +57,7 @@ def embed(self, origin_filename,wm_bits, embed_filename): f"可以嵌入的总比特数为: {embed_nbit_},水印长度为{len(wm_bits)},重复嵌入 {n_repeat} 次, 实际嵌入{embed_nbit}") # 扩展水印信号 - wmark_extended = np.repeat(wm_bits, n_repeat) + wm_repeat = np.repeat(wm_bits, n_repeat) # 生成密钥 np.random.seed(pwd) @@ -66,32 +66,32 @@ def embed(self, origin_filename,wm_bits, embed_filename): pointer = 0 echoed_signal1 = np.zeros((frame_shift * embed_nbit)) - prev1 = np.zeros((FRAME_LENGTH)) + prev1 = np.zeros((frame_len)) de = NEGATIVE_DELAY # for i in range(embed_nbit): - frame = host_signal[pointer: (pointer + FRAME_LENGTH)] + frame = host_signal[pointer: (pointer + frame_len)] if secret_key_extended[i] == 1: - if wmark_extended[i] == 1: + if wm_repeat[i] == 1: delay = delay11 else: delay = delay10 else: - if wmark_extended[i] == 1: + if wm_repeat[i] == 1: delay = delay01 else: delay = delay00 - echo_positive = CONTROL_STRENGTH \ + echo_positive = echo_amplitude \ * np.concatenate((np.zeros(delay), - frame[0:FRAME_LENGTH - delay])) + frame[0:frame_len - delay])) - echo_negative = - CONTROL_STRENGTH \ + echo_negative = - echo_amplitude \ * np.concatenate((np.zeros(delay + de), - frame[0:FRAME_LENGTH - delay - de])) + frame[0:frame_len - delay - de])) - echo_forward = CONTROL_STRENGTH \ - * np.concatenate((frame[delay:FRAME_LENGTH], np.zeros(delay))) + echo_forward = echo_amplitude \ + * np.concatenate((frame[delay:frame_len], np.zeros(delay))) if algo_type == 1: echoed_frame1 = frame + echo_positive @@ -100,9 +100,9 @@ def embed(self, origin_filename,wm_bits, embed_filename): else: # algo_type == 3 echoed_frame1 = frame + echo_positive + echo_forward - echoed_frame1 = echoed_frame1 * windows.hann(FRAME_LENGTH) + echoed_frame1 = echoed_frame1 * windows.hann(frame_len) echoed_signal1[frame_shift * i: frame_shift * (i + 1)] = \ - np.concatenate((prev1[frame_shift:FRAME_LENGTH] + + np.concatenate((prev1[frame_shift:frame_len] + echoed_frame1[0:overlap_length], echoed_frame1[overlap_length:frame_shift])) @@ -119,8 +119,8 @@ def embed(self, origin_filename,wm_bits, embed_filename): def extract(self, embed_filename, len_wm_bits): pwd = self.pwd algo_type = self.algo_type - FRAME_LENGTH = self.FRAME_LENGTH - CONTROL_STRENGTH = self.CONTROL_STRENGTH + frame_len = self.frame_len + echo_amplitude = self.echo_amplitude OVERLAP = self.OVERLAP NEGATIVE_DELAY = self.NEGATIVE_DELAY LOG_FLOOR = self.LOG_FLOOR @@ -130,8 +130,8 @@ def extract(self, embed_filename, len_wm_bits): _, eval_signal1 = wavfile.read(embed_filename) signal_len = len(eval_signal1) - frame_shift = int(FRAME_LENGTH * (1 - OVERLAP)) - embed_nbit_ = (signal_len - int(FRAME_LENGTH * OVERLAP)) // frame_shift + frame_shift = int(frame_len * (1 - OVERLAP)) + embed_nbit_ = (signal_len - int(frame_len * OVERLAP)) // frame_shift # 重复次数 n_repeat = embed_nbit_ // len_wm_bits @@ -151,7 +151,7 @@ def extract(self, embed_filename, len_wm_bits): pointer = 0 detected_bit1 = np.zeros(embed_nbit) for i in range(embed_nbit): - wmarked_frame1 = eval_signal1[pointer: pointer + FRAME_LENGTH] + wmarked_frame1 = eval_signal1[pointer: pointer + frame_len] ceps1 = np.fft.ifft( np.log(np.square(np.fft.fft(wmarked_frame1)) + LOG_FLOOR)).real @@ -195,12 +195,9 @@ def extract(self, embed_filename, len_wm_bits): count = 0 wmark_recovered1 = np.zeros(len_wm_bits) - # wmark_recovered2 = np.zeros(len_wm_bits) - # wmark_recovered3 = np.zeros(len_wm_bits) for i in range(len_wm_bits): - - # 汇总比特值(平均值) + # 汇总比特值(按平均值) ave = np.sum(detected_bit1[count:count + n_repeat]) / n_repeat if ave >= 0.5: wmark_recovered1[i] = 1 @@ -215,11 +212,10 @@ def extract(self, embed_filename, len_wm_bits): def get_error_rate(wmark_recovered, wm_bits): # 计算错误率 len_wm_bits = len(wm_bits) - denom = np.sum(np.abs(wmark_recovered - wm_bits)) - BER = np.sum(np.abs(wmark_recovered - wm_bits)) / \ - len_wm_bits * 100 - print(f'bit error rate = {BER:.2f}% ({denom} / {len_wm_bits})') - return BER + error_num = np.sum(np.abs(wmark_recovered - wm_bits)) + error_rate = error_num / len_wm_bits + print(f'bit error rate = {error_rate:.2%} ({error_num} / {len_wm_bits})') + return error_rate def get_snr(wav_with_wm, orig_file): @@ -232,4 +228,3 @@ def get_snr(wav_with_wm, orig_file): - eval_signal1.astype(np.float32)))) print(f'SNR = {SNR:.2f} dB') return SNR - diff --git a/hide_info/utils.py b/hide_info/utils.py index 5b4c069..1d234ea 100644 --- a/hide_info/utils.py +++ b/hide_info/utils.py @@ -17,15 +17,29 @@ def deserialization(serialized_data: bytes): return serialized_data[4:4 + int.from_bytes(serialized_data[:4], byteorder="big")] -def bytes2bin(bytes1: bytes) -> str: +def bytes2bin_(bytes1: bytes) -> str: """ 把 bytes 转化为 "10110" 这种形式的二进制 """ return ''.join([format(i, '08b') for i in bytes1]) -def bin2bytes(bin1: str) -> bytes: +def bin2bytes_(bin1: str) -> bytes: """ - bytes2bin 的相反操作 + bytes2bin_ 的相反操作 """ return b''.join([struct.pack('>B', int(bin1[i * 8:i * 8 + 8], base=2)) for i in range(len(bin1) // 8)]) + + +def bytes2bin(bytes1: bytes) -> list: + """ + 把 bytes 转化为 [1, 0, 1, 1, 0] 这种形式的二进制 + """ + return [int(i) for i in bytes2bin_(bytes1)] + + +def bin2bytes(bin1: list) -> bytes: + """ + bytes2bin 的相反操作 + """ + return bin2bytes_(''.join([str(int(i)) for i in bin1])) From 393fbaa28b683a2327a0f9514a068846f7173f4e Mon Sep 17 00:00:00 2001 From: guofei9987 Date: Sat, 10 Dec 2022 16:38:35 +0800 Subject: [PATCH 2/7] add: echo watermark for wav --- hide_info/echo_watermark.py | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/hide_info/echo_watermark.py b/hide_info/echo_watermark.py index 399c71a..aa30bf4 100644 --- a/hide_info/echo_watermark.py +++ b/hide_info/echo_watermark.py @@ -12,9 +12,8 @@ def __init__(self, pwd, algo_type=2, verbose=False): self.frame_len = 2048 # 帧长度 self.echo_amplitude = 0.2 # 回声幅度 - self.OVERLAP = 0.5 # 帧分析的重叠率 + self.overlap = 0.5 # 帧分析的重叠率 self.NEGATIVE_DELAY = 4 # negative delay, for negative echo - self.LOG_FLOOR = 0.00001 # 回声参数 # keyword = 1 @@ -27,19 +26,18 @@ def embed(self, origin_filename, wm_bits, embed_filename): algo_type = self.algo_type frame_len = self.frame_len echo_amplitude = self.echo_amplitude - OVERLAP = self.OVERLAP + overlap = self.overlap NEGATIVE_DELAY = self.NEGATIVE_DELAY - LOG_FLOOR = self.LOG_FLOOR delay11, delay10, delay01, delay00 = self.delay11, self.delay10, self.delay01, self.delay00 sr, host_signal = wavfile.read(origin_filename) signal_len = len(host_signal) # 帧的移动量 - frame_shift = int(frame_len * (1 - OVERLAP)) + frame_shift = int(frame_len * (1 - overlap)) # 和相邻帧的重叠长度 - overlap_length = int(frame_len * OVERLAP) + overlap_length = int(frame_len * overlap) # 可嵌入总比特数 embed_nbit_ = (signal_len - overlap_length) // frame_shift @@ -120,18 +118,17 @@ def extract(self, embed_filename, len_wm_bits): pwd = self.pwd algo_type = self.algo_type frame_len = self.frame_len - echo_amplitude = self.echo_amplitude - OVERLAP = self.OVERLAP + overlap = self.overlap NEGATIVE_DELAY = self.NEGATIVE_DELAY - LOG_FLOOR = self.LOG_FLOOR delay11, delay10, delay01, delay00 = self.delay11, self.delay10, self.delay01, self.delay00 + log_floor = 0.00001 # 取对数时的最小值 # 打开已嵌入水印的音频文件 _, eval_signal1 = wavfile.read(embed_filename) signal_len = len(eval_signal1) - frame_shift = int(frame_len * (1 - OVERLAP)) - embed_nbit_ = (signal_len - int(frame_len * OVERLAP)) // frame_shift + frame_shift = int(frame_len * (1 - overlap)) + embed_nbit_ = (signal_len - int(frame_len * overlap)) // frame_shift # 重复次数 n_repeat = embed_nbit_ // len_wm_bits @@ -153,7 +150,7 @@ def extract(self, embed_filename, len_wm_bits): for i in range(embed_nbit): wmarked_frame1 = eval_signal1[pointer: pointer + frame_len] ceps1 = np.fft.ifft( - np.log(np.square(np.fft.fft(wmarked_frame1)) + LOG_FLOOR)).real + np.log(np.square(np.fft.fft(wmarked_frame1)) + log_floor)).real if secret_key[i] == 1: if algo_type == 1: From 03c340f048b1fd81d455a564a45376ab9c9fba85 Mon Sep 17 00:00:00 2001 From: guofei9987 Date: Sat, 10 Dec 2022 16:48:54 +0800 Subject: [PATCH 3/7] add: echo watermark for wav --- hide_info/echo_watermark.py | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/hide_info/echo_watermark.py b/hide_info/echo_watermark.py index aa30bf4..0f0b584 100644 --- a/hide_info/echo_watermark.py +++ b/hide_info/echo_watermark.py @@ -2,6 +2,7 @@ import numpy as np from scipy.io import wavfile from scipy.signal import windows +from numpy.fft import ifft class EchoWatermark: @@ -13,7 +14,7 @@ def __init__(self, pwd, algo_type=2, verbose=False): self.frame_len = 2048 # 帧长度 self.echo_amplitude = 0.2 # 回声幅度 self.overlap = 0.5 # 帧分析的重叠率 - self.NEGATIVE_DELAY = 4 # negative delay, for negative echo + self.neg_delay = 4 # negative delay, for negative echo # 回声参数 # keyword = 1 @@ -27,7 +28,7 @@ def embed(self, origin_filename, wm_bits, embed_filename): frame_len = self.frame_len echo_amplitude = self.echo_amplitude overlap = self.overlap - NEGATIVE_DELAY = self.NEGATIVE_DELAY + neg_delay = self.neg_delay delay11, delay10, delay01, delay00 = self.delay11, self.delay10, self.delay01, self.delay00 sr, host_signal = wavfile.read(origin_filename) @@ -64,8 +65,7 @@ def embed(self, origin_filename, wm_bits, embed_filename): pointer = 0 echoed_signal1 = np.zeros((frame_shift * embed_nbit)) - prev1 = np.zeros((frame_len)) - de = NEGATIVE_DELAY # + prev1 = np.zeros(frame_len) for i in range(embed_nbit): frame = host_signal[pointer: (pointer + frame_len)] @@ -81,12 +81,12 @@ def embed(self, origin_filename, wm_bits, embed_filename): delay = delay00 echo_positive = echo_amplitude \ - * np.concatenate((np.zeros(delay), - frame[0:frame_len - delay])) + * np.concatenate( + (np.zeros(delay), frame[0:frame_len - delay])) echo_negative = - echo_amplitude \ - * np.concatenate((np.zeros(delay + de), - frame[0:frame_len - delay - de])) + * np.concatenate((np.zeros(delay + neg_delay), + frame[0:frame_len - delay - neg_delay])) echo_forward = echo_amplitude \ * np.concatenate((frame[delay:frame_len], np.zeros(delay))) @@ -119,7 +119,7 @@ def extract(self, embed_filename, len_wm_bits): algo_type = self.algo_type frame_len = self.frame_len overlap = self.overlap - NEGATIVE_DELAY = self.NEGATIVE_DELAY + neg_delay = self.neg_delay delay11, delay10, delay01, delay00 = self.delay11, self.delay10, self.delay01, self.delay00 log_floor = 0.00001 # 取对数时的最小值 @@ -149,7 +149,7 @@ def extract(self, embed_filename, len_wm_bits): detected_bit1 = np.zeros(embed_nbit) for i in range(embed_nbit): wmarked_frame1 = eval_signal1[pointer: pointer + frame_len] - ceps1 = np.fft.ifft( + ceps1 = ifft( np.log(np.square(np.fft.fft(wmarked_frame1)) + log_floor)).real if secret_key[i] == 1: @@ -159,8 +159,8 @@ def extract(self, embed_filename, len_wm_bits): else: detected_bit1[i] = 0 elif algo_type == 2: - if (ceps1[delay11] - ceps1[delay11 + NEGATIVE_DELAY]) > \ - (ceps1[delay10] - ceps1[delay10 + NEGATIVE_DELAY]): + if (ceps1[delay11] - ceps1[delay11 + neg_delay]) > \ + (ceps1[delay10] - ceps1[delay10 + neg_delay]): detected_bit1[i] = 1 else: detected_bit1[i] = 0 @@ -177,8 +177,8 @@ def extract(self, embed_filename, len_wm_bits): else: detected_bit1[i] = 0 elif algo_type == 2: - if (ceps1[delay01] - ceps1[delay01 + NEGATIVE_DELAY]) > \ - (ceps1[delay00] - ceps1[delay00 + NEGATIVE_DELAY]): + if (ceps1[delay01] - ceps1[delay01 + neg_delay]) > \ + (ceps1[delay00] - ceps1[delay00 + neg_delay]): detected_bit1[i] = 1 else: detected_bit1[i] = 0 @@ -206,18 +206,18 @@ def extract(self, embed_filename, len_wm_bits): return wmark_recovered1 -def get_error_rate(wmark_recovered, wm_bits): +def get_error_rate(wm_extract, wm_bits): # 计算错误率 len_wm_bits = len(wm_bits) - error_num = np.sum(np.abs(wmark_recovered - wm_bits)) + error_num = np.sum(np.abs(wm_extract - wm_bits)) error_rate = error_num / len_wm_bits print(f'bit error rate = {error_rate:.2%} ({error_num} / {len_wm_bits})') return error_rate -def get_snr(wav_with_wm, orig_file): +def get_snr(file_with_wm, orig_file): sr, host_signal = wavfile.read(orig_file) - _, eval_signal1 = wavfile.read(wav_with_wm) + _, eval_signal1 = wavfile.read(file_with_wm) SNR = 10 * np.log10( np.sum(np.square(host_signal.astype(np.float32))) From c55e72c61b3277a57c80eeab592cb4705e69bfae Mon Sep 17 00:00:00 2001 From: guofei9987 Date: Sat, 10 Dec 2022 16:59:40 +0800 Subject: [PATCH 4/7] add: echo watermark for wav --- hide_info/echo_watermark.py | 70 ++++++++++++------------------------- 1 file changed, 22 insertions(+), 48 deletions(-) diff --git a/hide_info/echo_watermark.py b/hide_info/echo_watermark.py index 0f0b584..6e7e84f 100644 --- a/hide_info/echo_watermark.py +++ b/hide_info/echo_watermark.py @@ -29,7 +29,9 @@ def embed(self, origin_filename, wm_bits, embed_filename): echo_amplitude = self.echo_amplitude overlap = self.overlap neg_delay = self.neg_delay + delay11, delay10, delay01, delay00 = self.delay11, self.delay10, self.delay01, self.delay00 + delay_matrix = [[delay00, delay01], [delay10, delay11]] sr, host_signal = wavfile.read(origin_filename) signal_len = len(host_signal) @@ -66,19 +68,11 @@ def embed(self, origin_filename, wm_bits, embed_filename): pointer = 0 echoed_signal1 = np.zeros((frame_shift * embed_nbit)) prev1 = np.zeros(frame_len) + for i in range(embed_nbit): frame = host_signal[pointer: (pointer + frame_len)] - if secret_key_extended[i] == 1: - if wm_repeat[i] == 1: - delay = delay11 - else: - delay = delay10 - else: - if wm_repeat[i] == 1: - delay = delay01 - else: - delay = delay00 + delay = delay_matrix[secret_key_extended[i]][wm_repeat[i]] echo_positive = echo_amplitude \ * np.concatenate( @@ -105,7 +99,7 @@ def embed(self, origin_filename, wm_bits, embed_filename): echoed_frame1[overlap_length:frame_shift])) prev1 = echoed_frame1 - pointer = pointer + frame_shift + pointer += frame_shift echoed_signal1 = np.concatenate( (echoed_signal1, host_signal[len(echoed_signal1): signal_len])) @@ -153,57 +147,37 @@ def extract(self, embed_filename, len_wm_bits): np.log(np.square(np.fft.fft(wmarked_frame1)) + log_floor)).real if secret_key[i] == 1: - if algo_type == 1: - if ceps1[delay11] > ceps1[delay10]: - detected_bit1[i] = 1 - else: - detected_bit1[i] = 0 - elif algo_type == 2: - if (ceps1[delay11] - ceps1[delay11 + neg_delay]) > \ - (ceps1[delay10] - ceps1[delay10 + neg_delay]): - detected_bit1[i] = 1 - else: - detected_bit1[i] = 0 - else: # algo_type == 3 - if ceps1[delay11] > ceps1[delay10]: - detected_bit1[i] = 1 - else: - detected_bit1[i] = 0 - + delay0, delay1 = delay10, delay11 else: - if algo_type == 1: - if ceps1[delay01] > ceps1[delay00]: - detected_bit1[i] = 1 - else: - detected_bit1[i] = 0 - elif algo_type == 2: - if (ceps1[delay01] - ceps1[delay01 + neg_delay]) > \ - (ceps1[delay00] - ceps1[delay00 + neg_delay]): - detected_bit1[i] = 1 - else: - detected_bit1[i] = 0 - else: - if ceps1[delay01] > ceps1[delay00]: - detected_bit1[i] = 1 - else: - detected_bit1[i] = 0 + delay0, delay1 = delay00, delay01 + + if algo_type == 1: + if ceps1[delay1] > ceps1[delay0]: + detected_bit1[i] = 1 + elif algo_type == 2: + if (ceps1[delay1] - ceps1[delay1 + neg_delay]) > \ + (ceps1[delay0] - ceps1[delay0 + neg_delay]): + detected_bit1[i] = 1 + else: # algo_type == 3 + if ceps1[delay1] > ceps1[delay0]: + detected_bit1[i] = 1 pointer = pointer + frame_shift count = 0 - wmark_recovered1 = np.zeros(len_wm_bits) + wm_extract = np.zeros(len_wm_bits) for i in range(len_wm_bits): # 汇总比特值(按平均值) ave = np.sum(detected_bit1[count:count + n_repeat]) / n_repeat if ave >= 0.5: - wmark_recovered1[i] = 1 + wm_extract[i] = 1 else: - wmark_recovered1[i] = 0 + wm_extract[i] = 0 count = count + n_repeat - return wmark_recovered1 + return wm_extract def get_error_rate(wm_extract, wm_bits): From 70f361f075ef9e0ec3a98b02475160812de67ab3 Mon Sep 17 00:00:00 2001 From: guofei9987 Date: Sat, 10 Dec 2022 17:04:18 +0800 Subject: [PATCH 5/7] add: echo watermark for wav --- hide_info/echo_watermark.py | 55 ++++++++++++++++++------------------- 1 file changed, 27 insertions(+), 28 deletions(-) diff --git a/hide_info/echo_watermark.py b/hide_info/echo_watermark.py index 6e7e84f..b6ceccd 100644 --- a/hide_info/echo_watermark.py +++ b/hide_info/echo_watermark.py @@ -17,9 +17,9 @@ def __init__(self, pwd, algo_type=2, verbose=False): self.neg_delay = 4 # negative delay, for negative echo # 回声参数 - # keyword = 1 + # pwd[i] = 1 self.delay11, self.delay10 = 100, 110 - # keyword = 0 + # pwd[i] = 0 self.delay01, self.delay00 = 120, 130 def embed(self, origin_filename, wm_bits, embed_filename): @@ -30,11 +30,10 @@ def embed(self, origin_filename, wm_bits, embed_filename): overlap = self.overlap neg_delay = self.neg_delay - delay11, delay10, delay01, delay00 = self.delay11, self.delay10, self.delay01, self.delay00 - delay_matrix = [[delay00, delay01], [delay10, delay11]] + delay_matrix = [[self.delay00, self.delay01], [self.delay10, self.delay11]] - sr, host_signal = wavfile.read(origin_filename) - signal_len = len(host_signal) + sr, ori_signal = wavfile.read(origin_filename) + signal_len = len(ori_signal) # 帧的移动量 frame_shift = int(frame_len * (1 - overlap)) @@ -66,11 +65,11 @@ def embed(self, origin_filename, wm_bits, embed_filename): secret_key_extended = np.repeat(secret_key, n_repeat) pointer = 0 - echoed_signal1 = np.zeros((frame_shift * embed_nbit)) + echoed_signal = np.zeros((frame_shift * embed_nbit)) prev1 = np.zeros(frame_len) for i in range(embed_nbit): - frame = host_signal[pointer: (pointer + frame_len)] + frame = ori_signal[pointer: (pointer + frame_len)] delay = delay_matrix[secret_key_extended[i]][wm_repeat[i]] @@ -86,27 +85,27 @@ def embed(self, origin_filename, wm_bits, embed_filename): * np.concatenate((frame[delay:frame_len], np.zeros(delay))) if algo_type == 1: - echoed_frame1 = frame + echo_positive + echoed_frame = frame + echo_positive elif algo_type == 2: - echoed_frame1 = frame + echo_positive + echo_negative + echoed_frame = frame + echo_positive + echo_negative else: # algo_type == 3 - echoed_frame1 = frame + echo_positive + echo_forward + echoed_frame = frame + echo_positive + echo_forward - echoed_frame1 = echoed_frame1 * windows.hann(frame_len) - echoed_signal1[frame_shift * i: frame_shift * (i + 1)] = \ + echoed_frame = echoed_frame * windows.hann(frame_len) + echoed_signal[frame_shift * i: frame_shift * (i + 1)] = \ np.concatenate((prev1[frame_shift:frame_len] + - echoed_frame1[0:overlap_length], - echoed_frame1[overlap_length:frame_shift])) + echoed_frame[0:overlap_length], + echoed_frame[overlap_length:frame_shift])) - prev1 = echoed_frame1 + prev1 = echoed_frame pointer += frame_shift - echoed_signal1 = np.concatenate( - (echoed_signal1, host_signal[len(echoed_signal1): signal_len])) + echoed_signal = np.concatenate( + (echoed_signal, ori_signal[len(echoed_signal): signal_len])) # 将保存为wav格式 - echoed_signal1 = echoed_signal1.astype(np.int16) - wavfile.write(embed_filename, sr, echoed_signal1) + echoed_signal = echoed_signal.astype(np.int16) + wavfile.write(embed_filename, sr, echoed_signal) def extract(self, embed_filename, len_wm_bits): pwd = self.pwd @@ -118,8 +117,8 @@ def extract(self, embed_filename, len_wm_bits): log_floor = 0.00001 # 取对数时的最小值 # 打开已嵌入水印的音频文件 - _, eval_signal1 = wavfile.read(embed_filename) - signal_len = len(eval_signal1) + _, wm_signal = wavfile.read(embed_filename) + signal_len = len(wm_signal) frame_shift = int(frame_len * (1 - overlap)) embed_nbit_ = (signal_len - int(frame_len * overlap)) // frame_shift @@ -142,7 +141,7 @@ def extract(self, embed_filename, len_wm_bits): pointer = 0 detected_bit1 = np.zeros(embed_nbit) for i in range(embed_nbit): - wmarked_frame1 = eval_signal1[pointer: pointer + frame_len] + wmarked_frame1 = wm_signal[pointer: pointer + frame_len] ceps1 = ifft( np.log(np.square(np.fft.fft(wmarked_frame1)) + log_floor)).real @@ -190,12 +189,12 @@ def get_error_rate(wm_extract, wm_bits): def get_snr(file_with_wm, orig_file): - sr, host_signal = wavfile.read(orig_file) - _, eval_signal1 = wavfile.read(file_with_wm) + sr, ori_signal = wavfile.read(orig_file) + _, wm_signal = wavfile.read(file_with_wm) SNR = 10 * np.log10( - np.sum(np.square(host_signal.astype(np.float32))) - / np.sum(np.square(host_signal.astype(np.float32) - - eval_signal1.astype(np.float32)))) + np.sum(np.square(ori_signal.astype(np.float32))) + / np.sum(np.square(ori_signal.astype(np.float32) + - wm_signal.astype(np.float32)))) print(f'SNR = {SNR:.2f} dB') return SNR From 8471090752e6f669de20104e9fa9bf81de43f943 Mon Sep 17 00:00:00 2001 From: guofei9987 Date: Sat, 10 Dec 2022 17:22:59 +0800 Subject: [PATCH 6/7] add: echo watermark for wav --- example/example_echo_watermark.py | 6 +++--- hide_info/echo_watermark.py | 21 +++++++-------------- 2 files changed, 10 insertions(+), 17 deletions(-) diff --git a/example/example_echo_watermark.py b/example/example_echo_watermark.py index e58b942..a086669 100644 --- a/example/example_echo_watermark.py +++ b/example/example_echo_watermark.py @@ -4,7 +4,7 @@ ori_file = "sounds.wav" embedded_file = "sounds_with_watermark.wav" -wm_str = "回声水印算法测试用水印" +wm_str = "回声水印算法欢迎 star" wm_bits = utils.bytes2bin(wm_str.encode('utf-8')) @@ -18,7 +18,7 @@ echo_wm = EchoWatermark(pwd=111001) wm_extract = echo_wm.extract(embed_filename=embedded_file, len_wm_bits=len_wm_bits) -wm_str_extract = utils.bin2bytes(wm_extract).decode('utf-8') +wm_str_extract = utils.bin2bytes(wm_extract).decode('utf-8', errors='replace') print("解出水印:", wm_str_extract) get_error_rate(wm_extract, wm_bits) @@ -32,4 +32,4 @@ wm_extract = echo_wm.extract(embed_filename=embedded_file, len_wm_bits=len_wm_bits) error_rate = get_error_rate(wm_extract, wm_bits) get_snr(embedded_file, ori_file) - assert error_rate < 0.01 + assert error_rate <= 0.01 diff --git a/hide_info/echo_watermark.py b/hide_info/echo_watermark.py index b6ceccd..1838bd3 100644 --- a/hide_info/echo_watermark.py +++ b/hide_info/echo_watermark.py @@ -23,8 +23,6 @@ def __init__(self, pwd, algo_type=2, verbose=False): self.delay01, self.delay00 = 120, 130 def embed(self, origin_filename, wm_bits, embed_filename): - pwd = self.pwd - algo_type = self.algo_type frame_len = self.frame_len echo_amplitude = self.echo_amplitude overlap = self.overlap @@ -60,7 +58,7 @@ def embed(self, origin_filename, wm_bits, embed_filename): wm_repeat = np.repeat(wm_bits, n_repeat) # 生成密钥 - np.random.seed(pwd) + np.random.seed(self.pwd) secret_key = np.random.randint(2, size=int(len_wm_bits)) secret_key_extended = np.repeat(secret_key, n_repeat) @@ -84,9 +82,9 @@ def embed(self, origin_filename, wm_bits, embed_filename): echo_forward = echo_amplitude \ * np.concatenate((frame[delay:frame_len], np.zeros(delay))) - if algo_type == 1: + if self.algo_type == 1: echoed_frame = frame + echo_positive - elif algo_type == 2: + elif self.algo_type == 2: echoed_frame = frame + echo_positive + echo_negative else: # algo_type == 3 echoed_frame = frame + echo_positive + echo_forward @@ -100,16 +98,11 @@ def embed(self, origin_filename, wm_bits, embed_filename): prev1 = echoed_frame pointer += frame_shift - echoed_signal = np.concatenate( - (echoed_signal, ori_signal[len(echoed_signal): signal_len])) - + echoed_signal = np.concatenate((echoed_signal, ori_signal[len(echoed_signal):])).astype(np.int16) # 将保存为wav格式 - echoed_signal = echoed_signal.astype(np.int16) wavfile.write(embed_filename, sr, echoed_signal) def extract(self, embed_filename, len_wm_bits): - pwd = self.pwd - algo_type = self.algo_type frame_len = self.frame_len overlap = self.overlap neg_delay = self.neg_delay @@ -134,7 +127,7 @@ def extract(self, embed_filename, len_wm_bits): f"可以嵌入的总比特数为: {embed_nbit_},水印长度为{len_wm_bits},重复嵌入 {n_repeat} 次, 实际嵌入{embed_nbit}") # 加载密钥 - np.random.seed(pwd) + np.random.seed(self.pwd) secret_key = np.random.randint(2, size=int(len_wm_bits)) secret_key = np.repeat(secret_key, n_repeat) @@ -150,10 +143,10 @@ def extract(self, embed_filename, len_wm_bits): else: delay0, delay1 = delay00, delay01 - if algo_type == 1: + if self.algo_type == 1: if ceps1[delay1] > ceps1[delay0]: detected_bit1[i] = 1 - elif algo_type == 2: + elif self.algo_type == 2: if (ceps1[delay1] - ceps1[delay1 + neg_delay]) > \ (ceps1[delay0] - ceps1[delay0 + neg_delay]): detected_bit1[i] = 1 From 171566f009f6fe2d8d790c67d2f33fb5df7f125c Mon Sep 17 00:00:00 2001 From: guofei9987 Date: Sun, 10 Dec 2023 17:50:03 +0800 Subject: [PATCH 7/7] add: echo watermark for wav --- example/example_echo_watermark.py | 4 ++-- hide_info/echo_watermark.py | 30 +++++++++++++----------------- 2 files changed, 15 insertions(+), 19 deletions(-) diff --git a/example/example_echo_watermark.py b/example/example_echo_watermark.py index a086669..9eda6c6 100644 --- a/example/example_echo_watermark.py +++ b/example/example_echo_watermark.py @@ -4,7 +4,7 @@ ori_file = "sounds.wav" embedded_file = "sounds_with_watermark.wav" -wm_str = "回声水印算法欢迎 star" +wm_str = "回声水印算法,欢迎 star!" wm_bits = utils.bytes2bin(wm_str.encode('utf-8')) @@ -32,4 +32,4 @@ wm_extract = echo_wm.extract(embed_filename=embedded_file, len_wm_bits=len_wm_bits) error_rate = get_error_rate(wm_extract, wm_bits) get_snr(embedded_file, ori_file) - assert error_rate <= 0.01 + assert error_rate <= 0.03 diff --git a/hide_info/echo_watermark.py b/hide_info/echo_watermark.py index 1838bd3..d98c831 100644 --- a/hide_info/echo_watermark.py +++ b/hide_info/echo_watermark.py @@ -1,12 +1,12 @@ # 回声音频水印 import numpy as np from scipy.io import wavfile -from scipy.signal import windows -from numpy.fft import ifft +from scipy.signal.windows import hann +from numpy.fft import ifft, fft class EchoWatermark: - def __init__(self, pwd, algo_type=2, verbose=False): + def __init__(self, pwd, algo_type=3, verbose=False): self.pwd = pwd self.algo_type = algo_type self.verbose = verbose @@ -71,25 +71,21 @@ def embed(self, origin_filename, wm_bits, embed_filename): delay = delay_matrix[secret_key_extended[i]][wm_repeat[i]] - echo_positive = echo_amplitude \ - * np.concatenate( - (np.zeros(delay), frame[0:frame_len - delay])) + echo_positive = np.concatenate((np.zeros(delay), frame[0:frame_len - delay])) - echo_negative = - echo_amplitude \ - * np.concatenate((np.zeros(delay + neg_delay), + echo_negative = - np.concatenate((np.zeros(delay + neg_delay), frame[0:frame_len - delay - neg_delay])) - echo_forward = echo_amplitude \ - * np.concatenate((frame[delay:frame_len], np.zeros(delay))) + echo_forward = np.concatenate((frame[delay:frame_len], np.zeros(delay))) if self.algo_type == 1: - echoed_frame = frame + echo_positive + echoed_frame = frame + echo_amplitude * echo_positive elif self.algo_type == 2: - echoed_frame = frame + echo_positive + echo_negative + echoed_frame = frame + echo_amplitude * (echo_positive + echo_negative) else: # algo_type == 3 - echoed_frame = frame + echo_positive + echo_forward + echoed_frame = frame + echo_amplitude * (echo_positive + echo_forward) - echoed_frame = echoed_frame * windows.hann(frame_len) + echoed_frame = echoed_frame * hann(frame_len) echoed_signal[frame_shift * i: frame_shift * (i + 1)] = \ np.concatenate((prev1[frame_shift:frame_len] + echoed_frame[0:overlap_length], @@ -99,7 +95,7 @@ def embed(self, origin_filename, wm_bits, embed_filename): pointer += frame_shift echoed_signal = np.concatenate((echoed_signal, ori_signal[len(echoed_signal):])).astype(np.int16) - # 将保存为wav格式 + # 保存为wav格式 wavfile.write(embed_filename, sr, echoed_signal) def extract(self, embed_filename, len_wm_bits): @@ -136,7 +132,7 @@ def extract(self, embed_filename, len_wm_bits): for i in range(embed_nbit): wmarked_frame1 = wm_signal[pointer: pointer + frame_len] ceps1 = ifft( - np.log(np.square(np.fft.fft(wmarked_frame1)) + log_floor)).real + np.log(np.square(fft(wmarked_frame1)) + log_floor)).real if secret_key[i] == 1: delay0, delay1 = delay10, delay11 @@ -161,7 +157,7 @@ def extract(self, embed_filename, len_wm_bits): for i in range(len_wm_bits): # 汇总比特值(按平均值) - ave = np.sum(detected_bit1[count:count + n_repeat]) / n_repeat + ave = np.average(detected_bit1[count:count + n_repeat]) if ave >= 0.5: wm_extract[i] = 1 else: