Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TUKRの実装@德永 #17

Open
wants to merge 18 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,5 @@ dmypy.json
# for Mac
*.DS_Store

.idea/
.idea/
Lecture_UKR/tokunaga/datastore/
37 changes: 0 additions & 37 deletions Lecture_TUKR/data_scratch.py

This file was deleted.

46 changes: 46 additions & 0 deletions Lecture_TUKR/tokunaga/cross_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#sampleフォルダのukr_torchとクロステストするプログラム

import numpy as np
from Lecture_UKR.tokunaga.ukr import UKR as ukr
from Lecture_UKR.sample.ukr_torch import UKR as ukr_sample
from Lecture_UKR.tokunaga.data import create_kura

class TestUKR():
def test_UKR(self):
epoch = 100
sigma = 0.2
eta = 100
latent_dim = 2
nb_samples = 100
seed = 4
np.random.seed(seed)

X = create_kura(nb_samples)
Zinit = 0.2 * sigma * np.random.rand(nb_samples, latent_dim) - 0.1 * sigma

#サンプルプログラム
ukr_sam = ukr_sample(X, latent_dim, sigma, prior='random', Zinit=Zinit)
ukr_sam.fit(epoch, eta)

#作ったプログラム
ukr_mine = ukr(X, latent_dim, sigma, prior='random', Zinit=Zinit)
ukr_mine.fit(epoch, eta)

print("You are executing ", __file__)

np.testing.assert_allclose(
ukr_sam.history['z'],
ukr_mine.history['z'],
rtol=1e-05,
atol=1e-08)
np.testing.assert_allclose(
ukr_sam.history['f'],
ukr_mine.history['f'],
rtol=1e-05,
atol=1e-08)


if __name__ == "__main__":
test_ukr = TestUKR()
test_ukr.test_UKR()
#unittest.main()
67 changes: 67 additions & 0 deletions Lecture_TUKR/tokunaga/data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import numpy as np
import matplotlib.pyplot as plt

def load_kura_tsom(xsamples, ysamples, missing_rate=None,retz=None):
z1 = np.linspace(-1, 1, xsamples)
z2 = np.linspace(-1, 1, ysamples)

#z1 = np.random.uniform(-1, 1, xsamples)
#z2 = np.random.uniform(-1, 1, ysamples)

z1_repeated, z2_repeated = np.meshgrid(z1, z2, indexing='ij')
x1 = z1_repeated
x2 = z2_repeated
x3 = 1 * (x1**2 - x2**2) + + np.random.uniform(-0.5, 0.5, xsamples*ysamples).reshape(xsamples, ysamples)
YukiTokunaga marked this conversation as resolved.
Show resolved Hide resolved
#ノイズを加えたい時はここをいじる,locがガウス分布の平均、scaleが分散,size何個ノイズを作るか
#このノイズを加えることによって三次元空間のデータ点は上下に動く

x = np.concatenate([x1[:, :, None], x2[:, :, None], x3[:, :, None]], axis=2)
truez = np.concatenate([x1[:, None], x2[:, None]], axis=2)

if missing_rate == 0 or missing_rate == None:
if retz:
return x, truez
else:
return x

def load_kura_list(xsamples, ysamples, missing_rate=None, retz=None):
z1 = np.random.uniform(-1, 1, xsamples)
z2 = np.random.uniform(-1, 1, ysamples)
z1_num = np.arange(xsamples)
z2_num = np.arange(ysamples)

z1_repeated, z2_repeated = np.meshgrid(z1, z2)
z1_num_repeated, z2_num_repeated = np.meshgrid(z1_num, z2_num)
x1 = z1_repeated.reshape(-1)
x2 = z2_repeated.reshape(-1)
x1_num = z1_num_repeated.reshape(-1)
x2_num = z2_num_repeated.reshape(-1)
x3 = x1**2 - x2**2
#ノイズを加えたい時はここをいじる,locがガウス分布の平均、scaleが分散,size何個ノイズを作るか
#このノイズを加えることによって三次元空間のデータ点は上下に動く

x = np.concatenate([x1[:, None], x2[:, None], x3[:, None]], axis=1)
x = x + np.random.normal(0, 0.2, (xsamples * ysamples, 3))
x_num = np.concatenate([x1_num[:, None], x2_num[:, None]], axis=1)

if missing_rate == 0 or missing_rate == None:
if retz:
return x
else:
return x, x_num

if __name__ == '__main__':
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

xsamples = 10
ysamples = 15

#x = load_kura_tsom(xsamples, ysamples)
x = load_kura_list(xsamples, ysamples)
#print(plt.rcParams['image.cmap'])
fig = plt.figure(figsize=[5, 5])
ax_x = fig.add_subplot(projection='3d')
ax_x.scatter(x[:, 0], x[:, 1], x[:, 2])
ax_x.set_title('Generated three-dimensional data')
plt.show()
Binary file added Lecture_TUKR/tokunaga/iikanzi.gif
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added Lecture_TUKR/tokunaga/iikanzi.mp4
Binary file not shown.
57 changes: 57 additions & 0 deletions Lecture_TUKR/tokunaga/load.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import numpy as np
import matplotlib.pyplot as plt
import os
import cv2


def load_animal_data(retlabel_animal=True, retlabel_feature=False):
datastore_name = 'datastore/animal'
file_name = 'features.txt'

directory_path = os.path.join(os.path.dirname(__file__), datastore_name)
file_path = os.path.join(directory_path, file_name)

x = np.loadtxt(file_path)

return_objects = [x]

if retlabel_animal:
label_name = 'labels_animal.txt'
label_path = os.path.join(directory_path, label_name)
label_animal = np.genfromtxt(label_path, dtype=str)
return_objects.append(label_animal)

if retlabel_feature:
label_name = 'labels_feature.txt'
label_path = os.path.join(directory_path, label_name)
label_feature = np.genfromtxt(label_path, dtype=str)
return_objects.append(label_feature)

return return_objects

# def load_angle_resized_data():
# datastore_name = 'datastore/Angle_resized/'
# dir_list = os.listdir(datastore_name)
# #file_name = '/-5/A_01_-05.jpg'
#
# # directory_path = os.path.join(os.path.dirname(__file__), datastore_name)
# # file_path = os.path.join(directory_path, file_name)
#
# dir_name = dir_list[0]
# img = []
# file_list = os.listdir(datastore_name + dir_name)
# for file_name in file_list:
# image = cv2.imread(datastore_name + dir_name + '/' + file_name)
# img.append(image)
# # img = cv2.imread(datastore_name + file_name)
# #
# # print(img)
# # plt.imshow(img)
# # plt.show()
#
# return np.array(img)


if __name__ == '__main__':
img = load_angle_resized_data()
print(img)
157 changes: 157 additions & 0 deletions Lecture_TUKR/tokunaga/lukr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
import numpy as np
from tqdm import tqdm #プログ
import numpy as np
from tqdm import tqdm
import jax
import jax.numpy as jnp
import matplotlib.pyplot as plt
from jax import jit

class List_UKR:
def __init__(self, X, X_num, nb_samles1, nb_samples2, latent_dim1, latent_dim2, sigma1, sigma2, prior='random', Uinit=None, Vinit=None):
#--------初期値を設定する.---------
self.X, X_num = X, X_num
#ここから下は書き換えてね
self.nb_samples1, self.nb_samples2 = nb_samles1, nb_samples2
self.ob_dim = X.shape[1]
self.sigma1, self.sigma2 = sigma1, sigma2
self.latent_dim1, self.latent_dim2 = latent_dim1, latent_dim2

if Uinit is None:
if prior == 'normal': #一様事前分布のとき
U = np.random.uniform(-0.01, 0.01, self.nb_samples1 * self.latent_dim1).reshape(self.nb_samples1, self.latent_dim1)
else: #ガウス事前分布のとき
U = np.random.normal(0, 0.001, self.nb_samples1 * self.latent_dim1).reshape(self.nb_samples1, self.latent_dim1)
else: #Zの初期値が与えられた時
U = Uinit

if Vinit is None:
if prior == 'normal':
V = np.random.uniform(-0.01, 0.01, self.nb_samples2 * self.latent_dim2).reshape(self.nb_samples2, self.latent_dim2)
else:
V = np.random.normal(0, 0.001, self.nb_samples2 * self.latent_dim2).reshape(self.nb_samples2, self.latent_dim2)
else:
V = Vinit

self.U = U
self.V = V
self.history = {}

def f(self, U, V): #写像の計算
NU = U[X_num[:, 0]]
NV = V[X_num[:, 1]]
du = jnp.sum((NU[:, None, :]-NU[None, :, :])**2, axis=2)
dv = jnp.sum((NV[:, None, :]-NV[None, :, :])**2, axis=2)
ku = jnp.exp(-1 * du / (2 * self.sigma1 ** 2))
kv = jnp.exp(-1 * dv / (2 * self.sigma2 ** 2))
f = ku * kv @ self.X / np.sum(ku * kv, axis=1, keepdims=True)
return f

def zetaf(self, X, zetaU, U, zetaV, V):
# NU = U[X_num[:, 0]]
# NV = V[X_num[:, 1]]
# zetaNU = zetaU[X_num[:, 0]]
# zetaNV = zetaV[X_num[:, 1]]
# du = jnp.sum((zetaNU[:, None, :] - NU[None, :, :]) ** 2, axis=2)
# dv = jnp.sum((zetaNV[:, None, :] - NV[None, :, :]) ** 2, axis=2)
# ku = jnp.exp(-1 * du / (2 * self.sigma1 ** 2))
# kv = jnp.exp(-1 * dv / (2 * self.sigma2 ** 2))
# f = ku * kv @ self.X / np.sum(ku * kv, axis=1, keepdims=True)
du = np.sum((zetaU[:, None, :] - U[None, :, :])**2, axis=2)
dv = np.sum((zetaV[:, None, :] - V[None, :, :])**2, axis=2)
ku = np.exp(-1 * du / (2 * self.sigma1 ** 2))
kv = np.exp(-1 * dv / (2 * self.sigma2 ** 2))
f = np.einsum('li,kj,ijd->lkd', ku, kv, X) / np.einsum('li,kj->lk', ku, kv)[:, :, None]
return f

def E(self, U, V, X, alpha=0.01, norm=2): #目的関数の計算
d = ((X-self.f(U, V))**2)/(self.nb_samples1*self.nb_samples2)
E = jnp.sum(d)+alpha*(jnp.sum(U**norm)+jnp.sum(V**norm))
return E

def fit(self, nb_epoch: int, ueta: float, veta: float):
# 学習過程記録用
self.history['u'] = np.zeros((nb_epoch, self.nb_samples1, self.latent_dim1))
self.history['v'] = np.zeros((nb_epoch, self.nb_samples2, self.latent_dim2))
self.history['f'] = np.zeros((nb_epoch, self.nb_samples1 * self.nb_samples2, self.ob_dim))
self.history['error'] = np.zeros(nb_epoch)

for epoch in tqdm(range(nb_epoch)):
# U, Vの更新
self.U = self.U - ueta*jax.grad(self.E, argnums=0)(self.U, self.V, self.X)/self.nb_samples2
self.V = self.V - veta*jax.grad(self.E, argnums=1)(self.U, self.V, self.X)/self.nb_samples1
# 学習過程記録用
self.history['u'][epoch] = self.U
self.history['v'][epoch] = self.V
self.history['f'][epoch] = self.f(self.U, self.V)
self.history['error'][epoch] = self.E(self.U, self.V, self.X)

#--------------以下描画用(上の部分が実装できたら実装してね)---------------------
def calc_approximate_f(self, resolution): #fのメッシュ描画用,resolution:一辺の代表点の数
nb_epoch = self.history['u'].shape[0]
self.history['y'] = np.zeros((nb_epoch, self.nb_samples1, self.nb_samples2, self.ob_dim))
zetaX = np.zeros((self.nb_samples1, self.nb_samples2, self.ob_dim))
for n in range(self.nb_samples1*nb_samples2):
zetaX[X_num[n, 0], X_num[n, 1], :] = X[n, :]
#self.history['y'] = np.zeros((nb_epoch, self.X.shape[0], self.ob_dim))
for epoch in tqdm(range(nb_epoch)):
zetau = [None, create_zeta_1D(self.history['u'][epoch]), create_zeta_2D(self.history['u'], resolution)][self.latent_dim1]
zetav = [None, create_zeta_1D(self.history['v'][epoch]), create_zeta_2D(self.history['v'], resolution)][self.latent_dim2]
Y = self.zetaf(zetaX, zetau, self.history['u'][epoch], zetav, self.history['v'][epoch])
self.history['y'][epoch] = Y
return self.history['y']

def create_zeta_2D(Z, resolution): #fのメッシュの描画用に潜在空間に代表点zetaを作る.
#XX, YY = np.meshgrid(np.linspace(-Z/400, Z/400, resolution), np.linspace(-Z/400, Z/400, resolution))
zmax = np.amax(Z, axis=0)
zmin = np.amin(Z, axis=0)
XX, YY = np.meshgrid(np.linspace(zmin[0], zmax[0], resolution), np.linspace(zmin[1], zmax[1], resolution))
xx = XX.reshape(-1)
yy = YY.reshape(-1)
zeta = np.concatenate([xx[:, None], yy[:, None]], axis=1)

return zeta

def create_zeta_1D(Z):
zmax = np.amax(Z)
zmin = np.amin(Z)
zeta = np.linspace(zmin, zmax, Z.shape[0]).reshape(-1, 1)

return zeta

if __name__ == '__main__':
from Lecture_TUKR.tokunaga.data import load_kura_tsom
from Lecture_TUKR.tokunaga.data import load_kura_list
#from Lecture_TUKR.tokunaga.load import load_angle_resized_data
from Lecture_TUKR.tokunaga.visualizer import visualize_history

#各種パラメータ変えて遊んでみてね.
epoch = 200 #学習回数
#sigma1 = 2 #uのカーネルの幅
#sigma2 = 3 #vのカーネル幅
ueta = 1 #uの学習率
veta = 1 #vの学習率
latent_dim1 = 1 #潜在空間1の次元
latent_dim2 = 1 #潜在空間2の次元

seed = 4
np.random.seed(seed)

#入力データ(詳しくはdata.pyを除いてみると良い)
nb_samples1 = 10 #潜在空間1のデータ数
nb_samples2 = 20 #潜在空間2のデータ数
sigma1 = np.log(nb_samples1)/nb_samples1
sigma2 = np.log(nb_samples2)/nb_samples2
X, X_num = load_kura_list(nb_samples1, nb_samples2) #record型の蔵型データ ob_dom=3, L=2
lukr = List_UKR(X, X_num, nb_samples1, nb_samples2, latent_dim1, latent_dim2, sigma1, sigma2, prior='normal')
#print(tukr.list_f(tukr.U, tukr.V))
lukr.fit(epoch, ueta, veta)
#visualize_real_history(load_data(), ukr.history['z'], ukr.history['error'], save_gif=True, filename="seed20")
#visualize_history(X, X_num, lukr.history['f'], lukr.history['u'], lukr.history['v'], lukr.history['error'], save_gif=False, filename="recode_iikanzi")

#----------描画部分が実装されたらコメントアウト外す----------
lukr.calc_approximate_f(resolution=10)
visualize_history(X, X_num, lukr.history['y'], lukr.history['u'], lukr.history['v'], lukr.history['error'], save_gif=True, filename="record_colormap_dekitenai")



Binary file added Lecture_TUKR/tokunaga/random.mp4
Binary file not shown.
Binary file added Lecture_TUKR/tokunaga/recode_iikanzi.mp4
Binary file not shown.
Binary file not shown.
Binary file added Lecture_TUKR/tokunaga/sigma05.mp4
Binary file not shown.
Binary file added Lecture_TUKR/tokunaga/tmp.mp4
Binary file not shown.
Loading