Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

echo watermark #1

Merged
merged 7 commits into from
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions example/example_echo_watermark.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,34 @@
from hide_info.echo_watermark import EchoWatermark, get_error_rate, get_snr
from hide_info import utils

ori_file = "音乐.wav"
embedded_file = "打入水印_音乐.wav" # 嵌入了水印的文件
wm_str = "回声水印算法测试用水印"
ori_file = "sounds.wav"
embedded_file = "sounds_with_watermark.wav"
wm_str = "回声水印算法,欢迎 star!"

wm_bits = [int(i) for i in utils.bytes2bin(wm_str.encode('utf-8'))]
wm_bits = utils.bytes2bin(wm_str.encode('utf-8'))

len_wm_bits = len(wm_bits)

# embed:
echo_wm = EchoWatermark(pwd=111001, algo_type=2)
echo_wm = EchoWatermark(pwd=111001)
echo_wm.embed(origin_filename=ori_file, wm_bits=wm_bits, embed_filename=embedded_file)

# extract:
echo_wm = EchoWatermark(pwd=111001, algo_type=2)
echo_wm = EchoWatermark(pwd=111001)
wm_extract = echo_wm.extract(embed_filename=embedded_file, len_wm_bits=len_wm_bits)

wm_str_extract = utils.bin2bytes(''.join([str(int(i)) for i in wm_extract])).decode('utf-8')
wm_str_extract = utils.bin2bytes(wm_extract).decode('utf-8', errors='replace')
print("解出水印:", wm_str_extract)
get_error_rate(wm_extract, wm_bits)

# %%
# %% There are 3 algorithms:
wm_bits = np.random.randint(2, size=200).tolist()
len_wm_bits = len(wm_bits)

for algo_type in [1, 2, 3]:
echo_wm = EchoWatermark(pwd=111001, algo_type=algo_type)
echo_wm.embed(origin_filename=ori_file, wm_bits=wm_bits, embed_filename=embedded_file)
wm_extract = echo_wm.extract(embed_filename=embedded_file, len_wm_bits=len_wm_bits)
get_error_rate(wm_extract, wm_bits)
error_rate = get_error_rate(wm_extract, wm_bits)
get_snr(embedded_file, ori_file)
assert error_rate <= 0.03
4 changes: 2 additions & 2 deletions example/example_hide_in_music.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@

text = "待嵌入到音乐文件的文本,下面的代码中,会把这段文本以二进制形式隐藏到一个音乐文件中"

hide_in_music.encode(text.encode('utf-8'), music_filename="音乐.wav", music_filename_new="藏文于音.wav")
hide_in_music.encode(text.encode('utf-8'), music_filename="sounds.wav", music_filename_new="藏文于音.wav")

text_encode = hide_in_music.decode(music_filename="藏文于音.wav")

print(text_encode.decode('utf-8'))

# %%
# 把文件隐藏到某个音乐中
hide_in_music.file_encode(filename='要隐藏的文件.zip', music_filename="音乐.wav", music_filename_new="藏物于音.wav")
hide_in_music.file_encode(filename='要隐藏的文件.zip', music_filename="sounds.wav", music_filename_new="藏物于音.wav")
# 从音乐中提取文件
hide_in_music.file_decode(filename="藏物于音-解出的文件.zip", music_filename="藏物于音.wav")

Expand Down
File renamed without changes.
230 changes: 92 additions & 138 deletions hide_info/echo_watermark.py
Original file line number Diff line number Diff line change
@@ -1,45 +1,43 @@
# 回声音频水印
import numpy as np
from scipy.io import wavfile
from scipy.signal import windows
from scipy.signal.windows import hann
from numpy.fft import ifft, fft


class EchoWatermark:
def __init__(self, pwd, algo_type=2, verbose=False):
def __init__(self, pwd, algo_type=3, verbose=False):
self.pwd = pwd
self.algo_type = algo_type
self.verbose = verbose

self.FRAME_LENGTH = 2048 # 帧长度
self.CONTROL_STRENGTH = 0.2 # 嵌入强度
self.OVERLAP = 0.5 # 帧分析的重叠率
self.NEGATIVE_DELAY = 4 # negative delay, for negative echo
self.LOG_FLOOR = 0.00001
self.frame_len = 2048 # 帧长度
self.echo_amplitude = 0.2 # 回声幅度
self.overlap = 0.5 # 帧分析的重叠率
self.neg_delay = 4 # negative delay, for negative echo

# 回声参数
# for key 1
# pwd[i] = 1
self.delay11, self.delay10 = 100, 110
# for key 0
# pwd[i] = 0
self.delay01, self.delay00 = 120, 130

def embed(self, origin_filename,wm_bits, embed_filename):
pwd = self.pwd
algo_type = self.algo_type
FRAME_LENGTH = self.FRAME_LENGTH
CONTROL_STRENGTH = self.CONTROL_STRENGTH
OVERLAP = self.OVERLAP
NEGATIVE_DELAY = self.NEGATIVE_DELAY
LOG_FLOOR = self.LOG_FLOOR
delay11, delay10, delay01, delay00 = self.delay11, self.delay10, self.delay01, self.delay00
def embed(self, origin_filename, wm_bits, embed_filename):
frame_len = self.frame_len
echo_amplitude = self.echo_amplitude
overlap = self.overlap
neg_delay = self.neg_delay

delay_matrix = [[self.delay00, self.delay01], [self.delay10, self.delay11]]

sr, host_signal = wavfile.read(origin_filename)
signal_len = len(host_signal)
sr, ori_signal = wavfile.read(origin_filename)
signal_len = len(ori_signal)

# 帧的移动量
frame_shift = int(FRAME_LENGTH * (1 - OVERLAP))
frame_shift = int(frame_len * (1 - overlap))

# 和相邻帧的重叠长度
overlap_length = int(FRAME_LENGTH * OVERLAP)
overlap_length = int(frame_len * overlap)

# 可嵌入总比特数
embed_nbit_ = (signal_len - overlap_length) // frame_shift
Expand All @@ -57,81 +55,62 @@ def embed(self, origin_filename,wm_bits, embed_filename):
f"可以嵌入的总比特数为: {embed_nbit_},水印长度为{len(wm_bits)},重复嵌入 {n_repeat} 次, 实际嵌入{embed_nbit}")

# 扩展水印信号
wmark_extended = np.repeat(wm_bits, n_repeat)
wm_repeat = np.repeat(wm_bits, n_repeat)

# 生成密钥
np.random.seed(pwd)
np.random.seed(self.pwd)
secret_key = np.random.randint(2, size=int(len_wm_bits))
secret_key_extended = np.repeat(secret_key, n_repeat)

pointer = 0
echoed_signal1 = np.zeros((frame_shift * embed_nbit))
prev1 = np.zeros((FRAME_LENGTH))
de = NEGATIVE_DELAY #
echoed_signal = np.zeros((frame_shift * embed_nbit))
prev1 = np.zeros(frame_len)

for i in range(embed_nbit):
frame = host_signal[pointer: (pointer + FRAME_LENGTH)]
frame = ori_signal[pointer: (pointer + frame_len)]

if secret_key_extended[i] == 1:
if wmark_extended[i] == 1:
delay = delay11
else:
delay = delay10
else:
if wmark_extended[i] == 1:
delay = delay01
else:
delay = delay00

echo_positive = CONTROL_STRENGTH \
* np.concatenate((np.zeros(delay),
frame[0:FRAME_LENGTH - delay]))

echo_negative = - CONTROL_STRENGTH \
* np.concatenate((np.zeros(delay + de),
frame[0:FRAME_LENGTH - delay - de]))

echo_forward = CONTROL_STRENGTH \
* np.concatenate((frame[delay:FRAME_LENGTH], np.zeros(delay)))

if algo_type == 1:
echoed_frame1 = frame + echo_positive
elif algo_type == 2:
echoed_frame1 = frame + echo_positive + echo_negative
else: # algo_type == 3
echoed_frame1 = frame + echo_positive + echo_forward
delay = delay_matrix[secret_key_extended[i]][wm_repeat[i]]

echoed_frame1 = echoed_frame1 * windows.hann(FRAME_LENGTH)
echoed_signal1[frame_shift * i: frame_shift * (i + 1)] = \
np.concatenate((prev1[frame_shift:FRAME_LENGTH] +
echoed_frame1[0:overlap_length],
echoed_frame1[overlap_length:frame_shift]))
echo_positive = np.concatenate((np.zeros(delay), frame[0:frame_len - delay]))

prev1 = echoed_frame1
pointer = pointer + frame_shift
echo_negative = - np.concatenate((np.zeros(delay + neg_delay),
frame[0:frame_len - delay - neg_delay]))

echo_forward = np.concatenate((frame[delay:frame_len], np.zeros(delay)))

echoed_signal1 = np.concatenate(
(echoed_signal1, host_signal[len(echoed_signal1): signal_len]))
if self.algo_type == 1:
echoed_frame = frame + echo_amplitude * echo_positive
elif self.algo_type == 2:
echoed_frame = frame + echo_amplitude * (echo_positive + echo_negative)
else: # algo_type == 3
echoed_frame = frame + echo_amplitude * (echo_positive + echo_forward)

echoed_frame = echoed_frame * hann(frame_len)
echoed_signal[frame_shift * i: frame_shift * (i + 1)] = \
np.concatenate((prev1[frame_shift:frame_len] +
echoed_frame[0:overlap_length],
echoed_frame[overlap_length:frame_shift]))

# 将保存为wav格式
echoed_signal1 = echoed_signal1.astype(np.int16)
wavfile.write(embed_filename, sr, echoed_signal1)
prev1 = echoed_frame
pointer += frame_shift

echoed_signal = np.concatenate((echoed_signal, ori_signal[len(echoed_signal):])).astype(np.int16)
# 保存为wav格式
wavfile.write(embed_filename, sr, echoed_signal)

def extract(self, embed_filename, len_wm_bits):
pwd = self.pwd
algo_type = self.algo_type
FRAME_LENGTH = self.FRAME_LENGTH
CONTROL_STRENGTH = self.CONTROL_STRENGTH
OVERLAP = self.OVERLAP
NEGATIVE_DELAY = self.NEGATIVE_DELAY
LOG_FLOOR = self.LOG_FLOOR
frame_len = self.frame_len
overlap = self.overlap
neg_delay = self.neg_delay
delay11, delay10, delay01, delay00 = self.delay11, self.delay10, self.delay01, self.delay00
log_floor = 0.00001 # 取对数时的最小值

# 打开已嵌入水印的音频文件
_, eval_signal1 = wavfile.read(embed_filename)
signal_len = len(eval_signal1)
_, wm_signal = wavfile.read(embed_filename)
signal_len = len(wm_signal)

frame_shift = int(FRAME_LENGTH * (1 - OVERLAP))
embed_nbit_ = (signal_len - int(FRAME_LENGTH * OVERLAP)) // frame_shift
frame_shift = int(frame_len * (1 - overlap))
embed_nbit_ = (signal_len - int(frame_len * overlap)) // frame_shift

# 重复次数
n_repeat = embed_nbit_ // len_wm_bits
Expand All @@ -144,92 +123,67 @@ def extract(self, embed_filename, len_wm_bits):
f"可以嵌入的总比特数为: {embed_nbit_},水印长度为{len_wm_bits},重复嵌入 {n_repeat} 次, 实际嵌入{embed_nbit}")

# 加载密钥
np.random.seed(pwd)
np.random.seed(self.pwd)
secret_key = np.random.randint(2, size=int(len_wm_bits))
secret_key = np.repeat(secret_key, n_repeat)

pointer = 0
detected_bit1 = np.zeros(embed_nbit)
for i in range(embed_nbit):
wmarked_frame1 = eval_signal1[pointer: pointer + FRAME_LENGTH]
ceps1 = np.fft.ifft(
np.log(np.square(np.fft.fft(wmarked_frame1)) + LOG_FLOOR)).real
wmarked_frame1 = wm_signal[pointer: pointer + frame_len]
ceps1 = ifft(
np.log(np.square(fft(wmarked_frame1)) + log_floor)).real

if secret_key[i] == 1:
if algo_type == 1:
if ceps1[delay11] > ceps1[delay10]:
detected_bit1[i] = 1
else:
detected_bit1[i] = 0
elif algo_type == 2:
if (ceps1[delay11] - ceps1[delay11 + NEGATIVE_DELAY]) > \
(ceps1[delay10] - ceps1[delay10 + NEGATIVE_DELAY]):
detected_bit1[i] = 1
else:
detected_bit1[i] = 0
else: # algo_type == 3
if ceps1[delay11] > ceps1[delay10]:
detected_bit1[i] = 1
else:
detected_bit1[i] = 0

delay0, delay1 = delay10, delay11
else:
if algo_type == 1:
if ceps1[delay01] > ceps1[delay00]:
detected_bit1[i] = 1
else:
detected_bit1[i] = 0
elif algo_type == 2:
if (ceps1[delay01] - ceps1[delay01 + NEGATIVE_DELAY]) > \
(ceps1[delay00] - ceps1[delay00 + NEGATIVE_DELAY]):
detected_bit1[i] = 1
else:
detected_bit1[i] = 0
else:
if ceps1[delay01] > ceps1[delay00]:
detected_bit1[i] = 1
else:
detected_bit1[i] = 0
delay0, delay1 = delay00, delay01

if self.algo_type == 1:
if ceps1[delay1] > ceps1[delay0]:
detected_bit1[i] = 1
elif self.algo_type == 2:
if (ceps1[delay1] - ceps1[delay1 + neg_delay]) > \
(ceps1[delay0] - ceps1[delay0 + neg_delay]):
detected_bit1[i] = 1
else: # algo_type == 3
if ceps1[delay1] > ceps1[delay0]:
detected_bit1[i] = 1

pointer = pointer + frame_shift

count = 0
wmark_recovered1 = np.zeros(len_wm_bits)
# wmark_recovered2 = np.zeros(len_wm_bits)
# wmark_recovered3 = np.zeros(len_wm_bits)
wm_extract = np.zeros(len_wm_bits)

for i in range(len_wm_bits):

# 汇总比特值(平均值)
ave = np.sum(detected_bit1[count:count + n_repeat]) / n_repeat
# 汇总比特值(按平均值)
ave = np.average(detected_bit1[count:count + n_repeat])
if ave >= 0.5:
wmark_recovered1[i] = 1
wm_extract[i] = 1
else:
wmark_recovered1[i] = 0
wm_extract[i] = 0

count = count + n_repeat

return wmark_recovered1
return wm_extract


def get_error_rate(wmark_recovered, wm_bits):
def get_error_rate(wm_extract, wm_bits):
# 计算错误率
len_wm_bits = len(wm_bits)
denom = np.sum(np.abs(wmark_recovered - wm_bits))
BER = np.sum(np.abs(wmark_recovered - wm_bits)) / \
len_wm_bits * 100
print(f'bit error rate = {BER:.2f}% ({denom} / {len_wm_bits})')
return BER
error_num = np.sum(np.abs(wm_extract - wm_bits))
error_rate = error_num / len_wm_bits
print(f'bit error rate = {error_rate:.2%} ({error_num} / {len_wm_bits})')
return error_rate


def get_snr(wav_with_wm, orig_file):
sr, host_signal = wavfile.read(orig_file)
_, eval_signal1 = wavfile.read(wav_with_wm)
def get_snr(file_with_wm, orig_file):
sr, ori_signal = wavfile.read(orig_file)
_, wm_signal = wavfile.read(file_with_wm)

SNR = 10 * np.log10(
np.sum(np.square(host_signal.astype(np.float32)))
/ np.sum(np.square(host_signal.astype(np.float32)
- eval_signal1.astype(np.float32))))
np.sum(np.square(ori_signal.astype(np.float32)))
/ np.sum(np.square(ori_signal.astype(np.float32)
- wm_signal.astype(np.float32))))
print(f'SNR = {SNR:.2f} dB')
return SNR

Loading
Loading