-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathDynamicsModel.py
72 lines (53 loc) · 1.89 KB
/
DynamicsModel.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
from collections import deque, namedtuple
from typing import Tuple

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from blitz.modules import BayesianLinear
from blitz.utils import variational_estimator
HIDDEN_SIZE = 64
REPLAY_SIZE = 100000
SAMPLE_SIZE = 2048
Experience = namedtuple(
"Experience",
field_names=["state", "action", "new_state"],
)
class ReplayBuffer:
"""Replay Buffer for storing past experiences allowing the agent to learn from them.
Args:
capacity: size of the buffer
"""
def __init__(self, capacity: int) -> None:
self.buffer = deque(maxlen=capacity)
def __len__(self) -> None:
return len(self.buffer)
def append(self, experience: Experience) -> None:
"""Add experience to the buffer.
Args:
experience: tuple (state, action, reward, done, new_state)
"""
self.buffer.append(experience)
def sample(self, batch_size: int) -> Tuple:
indices = np.random.choice(len(self.buffer), batch_size, replace=False)
states, actions, rewards, dones, next_states = zip(*(self.buffer[idx] for idx in indices))
return (
np.array(states),
np.array(actions),
np.array(rewards, dtype=np.float32),
np.array(dones, dtype=np.bool),
np.array(next_states),
)
class DynamicsModel(nn.Module):
def __init__(self, input_dim, output_dim):
super().__init__()
#self.linear = nn.Linear(input_dim, output_dim)
self.blinear1 = BayesianLinear(input_dim, HIDDEN_SIZE)
self.blinear2 = BayesianLinear(HIDDEN_SIZE, output_dim)
def forward(self, x):
x_ = self.blinear1(x)
x_ = F.relu(x_)
return self.blinear2(x_)
class Model():
def __init__(self, input_dim, output_dim):
self.net = DynamicsModel(input_dim, output_dim)
self.buffer = ReplayBuffer(REPLAY_SIZE)