-
Notifications
You must be signed in to change notification settings - Fork 0
/
environments.py
77 lines (57 loc) · 2.09 KB
/
environments.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import numpy as np
from scipy.stats import norm,t,gamma,ncx2,chi2,bernoulli
############## Environments for simulations ############
# Contextual Bandit
class ContextualBandit(object):
    """Simulated contextual bandit with a linear treatment effect.

    The reward at time ``t`` under action ``a`` is::

        a * sum(Z[t] * delta) + g[t] + sig * N(0, 1)

    with independent standard-normal noise at every draw.

    Parameters
    ----------
    g : array
        Baseline reward, indexed by time step.
    Z : array
        Context features; ``Z[t]`` is multiplied element-wise with ``delta``
        and ``np.dot(Z, delta)`` must be one-dimensional (one value per step).
    delta : array
        Treatment-effect coefficients; ``len(delta)`` is stored as ``df``.
    sig : float, optional
        Scale applied to the standard-normal noise (default 0.5).
    """

    def __init__(self, g, Z, delta, sig=0.5):
        self.g = g
        self.Z = Z
        self.delta = delta
        self.df = len(delta)
        self.sig = sig

    def re_init(self):
        """Do nothing: this environment keeps no state between pulls."""
        pass

    def pull(self, curr_t, a, size=1):
        """Draw reward(s) for action(s) ``a``.

        Parameters
        ----------
        curr_t : int
            Time index; only used when ``a`` is a scalar.
        a : scalar or 2-D array
            A scalar action yields the single reward at ``curr_t``.  A 2-D
            array (first axis = replications, second axis = time steps)
            yields rewards for every entry at once.
        size : int, optional
            Unused; retained for interface compatibility.

        Returns
        -------
        A scalar reward for a scalar action, otherwise an array with the
        same shape as the broadcast of ``a`` against the per-step effects.
        """
        # Dispatch on dimensionality rather than ``isinstance(a, int)`` so
        # NumPy integer scalars (e.g. np.int64 from a sampler) take the
        # scalar path instead of crashing on ``a.shape[0]``.
        if np.ndim(a) == 0:
            effect = np.sum(self.Z[curr_t] * self.delta)
            return a * effect + self.g[curr_t] + np.random.randn() * self.sig

        a = np.asarray(a)
        n_rep, n_steps = a.shape[0], a.shape[1]
        # Broadcasting a length-T row against the (N, T) actions gives the
        # same values as explicitly tiling the row N times.
        effect = np.dot(self.Z, self.delta)[np.newaxis, :]
        baseline = self.g[np.newaxis, :]
        noise = np.random.randn(n_rep, n_steps) * self.sig
        return a * effect + baseline + noise
# Mobile Health Simulator
class HeartStep(object):
    """Mobile-health reward simulator with AR(1) correlated noise.

    The reward at time ``t`` under action ``a`` is::

        a * sum(Z[t] * delta) + gamma[t] + eps_t * sig_t / sqrt(2)

    where ``eps_t = eps_{t-1} / sqrt(2) + N(0, 1)`` and the initial state is
    drawn as ``sqrt(2) * N(0, 1)``.

    Parameters
    ----------
    gamma : array
        Baseline reward, indexed by time step.
    Z : array
        Context features; ``Z[t]`` is multiplied element-wise with ``delta``
        and ``np.dot(Z, delta)`` must be one-dimensional (one value per step).
    delta : array
        Treatment-effect coefficients; ``len(delta)`` is stored as ``df``.
    sig : float or array, optional
        Noise scale.  Either a single value shared by all time steps
        (default 30) or one value per time step.
    """

    def __init__(self, gamma, Z, delta, sig=30):
        self.gamma = gamma
        self.Z = Z
        self.delta = delta
        self.df = len(delta)
        self.sig = sig
        # Previous AR(1) noise state used by scalar pulls.
        self.pre_eps = np.random.randn() * np.sqrt(2)

    def re_init(self):
        """Redraw the AR(1) noise state used by scalar pulls."""
        self.pre_eps = np.random.randn() * np.sqrt(2)

    def pull(self, t_, a, size=1):
        """Draw reward(s) for action(s) ``a``.

        Parameters
        ----------
        t_ : int
            Time index; only used when ``a`` is a scalar.
        a : scalar or 2-D array
            A scalar action advances the stored noise state by one step and
            returns the single reward at ``t_``.  A 2-D array (first axis =
            replications, second axis = time steps) simulates a fresh noise
            path per row and leaves the stored state untouched.
        size : int, optional
            Unused; retained for interface compatibility.

        Returns
        -------
        A scalar reward for a scalar action, otherwise a 2-D array of rewards.
        """
        root2 = np.sqrt(2)

        # Dispatch on dimensionality rather than ``isinstance(a, int)`` so
        # NumPy integer scalars take the scalar path instead of crashing on
        # ``a.shape``.
        if np.ndim(a) == 0:
            eps = self.pre_eps / root2 + np.random.randn()
            # ``sig`` may be a single number (the constructor default) or a
            # per-step sequence; only index it in the latter case.
            sig_t = self.sig if np.ndim(self.sig) == 0 else self.sig[t_]
            r = (a * np.sum(self.Z[t_] * self.delta)
                 + self.gamma[t_]
                 + eps * sig_t / root2)
            self.pre_eps = eps
            return r

        a = np.asarray(a)
        n_rep, n_steps = a.shape[0], a.shape[1]
        # Draw order (innovations first, then initial state) is kept so that
        # seeded simulations reproduce earlier results.
        innovations = np.random.randn(n_rep, n_steps)
        # Noise follows an AR(1) process; column 0 holds the initial state.
        eps = np.zeros([n_rep, n_steps + 1])
        eps[:, 0] = np.random.randn(n_rep) * root2
        for i in range(n_steps):
            eps[:, i + 1] = eps[:, i] / root2 + innovations[:, i]
        eps = eps[:, 1:]

        effect = np.dot(self.Z, self.delta)[np.newaxis, :]
        baseline = self.gamma[np.newaxis, :]
        return a * effect + baseline + eps * self.sig / root2