# attentionRNN.py
import math
import numpy as np
import torch
from torch import nn
from torch.autograd import Variable
import torch.nn.functional as F
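
# `cuda_` is used below but is defined elsewhere in the original repository.
# A minimal stand-in (assumption): move a tensor to the GPU when one is available.
def cuda_(var):
    return var.cuda() if torch.cuda.is_available() else var
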
class EncoderRNN(nn.Module):
def __init__(self, input_size, embed_size, hidden_size, n_layers=1, dropout=0.5):
super(EncoderRNN, self).__init__()
self.input_size = input_size
self.hidden_size = hidden_size
self.embed_size = embed_size
self.n_layers = n_layers
self.dropout = dropout
        self.embedding = nn.Embedding(input_size, embed_size)
self.gru = nn.GRU(embed_size, hidden_size, n_layers, dropout=self.dropout, bidirectional=True)
def forward(self, input_seqs, input_lengths, hidden=None):
        '''
        :param input_seqs:
            Variable of shape (num_step(T), batch_size(B)), sorted by decreasing length (for packing)
        :param input_lengths:
            list of sequence lengths
        :param hidden:
            initial hidden state of the GRU
        :returns:
            GRU outputs in shape (T, B, hidden_size(H))
            last hidden state of the GRU (i.e. its final output)
        '''
embedded = self.embedding(input_seqs)
packed = torch.nn.utils.rnn.pack_padded_sequence(embedded, input_lengths)
outputs, hidden = self.gru(packed, hidden)
outputs, output_lengths = torch.nn.utils.rnn.pad_packed_sequence(outputs) # unpack (back to padded)
outputs = outputs[:, :, :self.hidden_size] + outputs[:, :, self.hidden_size:] # Sum bidirectional outputs
return outputs, hidden
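
# Usage sketch (illustrative, not part of the original file): the batch is a padded (T,B)
# LongTensor sorted by decreasing length, and lengths are passed as a plain list, e.g.
#   enc = EncoderRNN(input_size=1000, embed_size=64, hidden_size=128)
#   seqs = torch.LongTensor([[5, 7, 2], [6, 3, 0], [4, 0, 0]])  # (T=3, B=3), 0 = padding
#   outputs, hidden = enc(seqs, [3, 2, 1])                      # outputs: (3, 3, 128)
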
class DynamicEncoder(nn.Module):
def __init__(self, input_size, embed_size, hidden_size, n_layers=1, dropout=0.5):
super().__init__()
self.input_size = input_size
self.hidden_size = hidden_size
self.embed_size = embed_size
self.n_layers = n_layers
self.dropout = dropout
self.embedding = nn.Embedding(input_size, embed_size)
self.gru = nn.GRU(embed_size, hidden_size, n_layers, bidirectional=True)
def forward(self, input_seqs, input_lens, hidden=None):
"""
forward procedure. **No need for inputs to be sorted**
:param input_seqs: Variable of [T,B]
:param hidden:
:param input_lens: *numpy array* of len for each input sequence
:return:
"""
batch_size = input_seqs.size(1)
embedded = self.embedding(input_seqs)
embedded = embedded.transpose(0, 1) # [B,T,E]
        sort_idx = np.argsort(-input_lens)                          # order that sorts the batch by decreasing length
        unsort_idx = cuda_(torch.LongTensor(np.argsort(sort_idx)))  # order that undoes the sort afterwards
        input_lens = input_lens[sort_idx]
        sort_idx = cuda_(torch.LongTensor(sort_idx))
        embedded = embedded[sort_idx].transpose(0, 1)  # [T,B,E]
        packed = torch.nn.utils.rnn.pack_padded_sequence(embedded, input_lens)
        outputs, hidden = self.gru(packed, hidden)
        outputs, _ = torch.nn.utils.rnn.pad_packed_sequence(outputs)
        outputs = outputs[:, :, :self.hidden_size] + outputs[:, :, self.hidden_size:]  # sum bidirectional outputs
        outputs = outputs.transpose(0, 1)[unsort_idx].transpose(0, 1).contiguous()  # restore the original batch order
        hidden = hidden.transpose(0, 1)[unsort_idx].transpose(0, 1).contiguous()
return outputs, hidden
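
# Usage sketch (illustrative): unlike EncoderRNN above, the batch does not need to be sorted
# by length; lengths are passed as a numpy array and sorting/unsorting is handled internally, e.g.
#   enc = DynamicEncoder(input_size=1000, embed_size=64, hidden_size=128)
#   seqs = torch.LongTensor([[5, 7, 2], [0, 3, 6], [0, 0, 4]])  # (T=3, B=3), unsorted lengths
#   outputs, hidden = enc(seqs, np.array([1, 2, 3]))            # outputs: (3, 3, 128)
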
class Attn(nn.Module):
def __init__(self, method, hidden_size):
super(Attn, self).__init__()
self.method = method
self.hidden_size = hidden_size
self.attn = nn.Linear(self.hidden_size * 2, hidden_size)
self.v = nn.Parameter(torch.rand(hidden_size))
stdv = 1. / math.sqrt(self.v.size(0))
self.v.data.normal_(mean=0, std=stdv)
def forward(self, hidden, encoder_outputs, src_len=None):
        '''
        :param hidden:
            previous hidden state of the decoder, in shape (B,H)
            (the decoder below passes the last layer of its hidden state)
        :param encoder_outputs:
            encoder outputs from the Encoder, in shape (T,B,H)
        :param src_len:
            used for masking. NoneType, or tensor of shape (B) giving each source sequence length
        :return
            normalized attention weights in shape (B,1,T)
        '''
max_len = encoder_outputs.size(0)
this_batch_size = encoder_outputs.size(1)
        H = hidden.repeat(max_len, 1, 1).transpose(0, 1)   # [B,T,H]
        encoder_outputs = encoder_outputs.transpose(0, 1)  # [B,T,H]
        attn_energies = self.score(H, encoder_outputs)     # compute attention scores -> [B,T]
        if src_len is not None:
            # build a boolean mask that is True at padded positions
            mask = []
            for b in range(src_len.size(0)):
                mask.append([0] * src_len[b].item() + [1] * (encoder_outputs.size(1) - src_len[b].item()))
            mask = cuda_(torch.tensor(mask, dtype=torch.bool))  # [B,T]
            attn_energies = attn_energies.masked_fill(mask, -1e18)
        return F.softmax(attn_energies, dim=1).unsqueeze(1)  # normalize with softmax -> [B,1,T]
def score(self, hidden, encoder_outputs):
        energy = torch.tanh(self.attn(torch.cat([hidden, encoder_outputs], 2)))  # [B,T,2H] -> [B,T,H]
        energy = energy.transpose(2, 1)  # [B,H,T]
        v = self.v.repeat(encoder_outputs.size(0), 1).unsqueeze(1)  # [B,1,H]
        energy = torch.bmm(v, energy)  # [B,1,T]
        return energy.squeeze(1)  # [B,T]
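
# Shape sketch (illustrative): given a decoder hidden state of shape (B,H) and encoder outputs
# of shape (T,B,H), Attn.forward returns attention weights of shape (B,1,T); the decoder below
# bmm's them with the (B,T,H) encoder outputs to obtain a (B,1,H) context vector, e.g.
#   attn = Attn('concat', 128)
#   weights = attn(torch.zeros(3, 128), torch.zeros(5, 3, 128))  # -> (3, 1, 5)
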
class BahdanauAttnDecoderRNN(nn.Module):
def __init__(self, hidden_size, embed_size, output_size, n_layers=1, dropout_p=0.1):
super(BahdanauAttnDecoderRNN, self).__init__()
# Define parameters
self.hidden_size = hidden_size
self.embed_size = embed_size
self.output_size = output_size
self.n_layers = n_layers
self.dropout_p = dropout_p
# Define layers
self.embedding = nn.Embedding(output_size, embed_size)
self.dropout = nn.Dropout(dropout_p)
self.attn = Attn('concat', hidden_size)
self.gru = nn.GRU(hidden_size + embed_size, hidden_size, n_layers, dropout=dropout_p)
#self.attn_combine = nn.Linear(hidden_size + embed_size, hidden_size)
self.out = nn.Linear(hidden_size, output_size)
def forward(self, word_input, last_hidden, encoder_outputs):
        '''
        :param word_input:
            word input for the current time step, in shape (B)
        :param last_hidden:
            last hidden state of the decoder, in shape (layers*directions, B, H)
        :param encoder_outputs:
            encoder outputs, in shape (T, B, H)
        :return
            decoder output
        Note: we run this one step at a time, i.e. you should use an outer loop
            to process the whole sequence
        Tip (update):
            EncoderRNN may be bidirectional or have multiple layers, so the shape of its hidden
            state can differ from that of the DecoderRNN.
            You may have to manually guarantee that they have the same dimension outside this
            function, e.g. select the encoder hidden state of the forward/backward pass.
        '''
        # Get the embedding of the current input word (last output word)
        word_embedded = self.embedding(word_input).view(1, word_input.size(0), -1)  # (1,B,E)
        word_embedded = self.dropout(word_embedded)
        # Calculate attention weights and apply them to the encoder outputs
        attn_weights = self.attn(last_hidden[-1], encoder_outputs)  # (B,1,T)
        context = attn_weights.bmm(encoder_outputs.transpose(0, 1))  # (B,1,H)
        context = context.transpose(0, 1)  # (1,B,H)
# Combine embedded input word and attended context, run through RNN
rnn_input = torch.cat((word_embedded, context), 2)
#rnn_input = self.attn_combine(rnn_input) # use it in case your size of rnn_input is different
output, hidden = self.gru(rnn_input, last_hidden)
        output = output.squeeze(0)  # (1,B,H) -> (B,H)
        # context = context.squeeze(0)
        # update: feeding the "context" into the final layer as well can be problematic:
        # output = F.log_softmax(self.out(torch.cat((output, context), 1)), dim=1)
        output = F.log_softmax(self.out(output), dim=1)
# Return final output, hidden state
return output, hidden
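
# A minimal end-to-end sketch (not part of the original file): encode a toy batch, then decode
# one step at a time with an outer loop, as the decoder docstring suggests. The vocabulary sizes,
# the SOS index 1 and the greedy argmax decoding below are illustrative assumptions only.
if __name__ == '__main__':
    INPUT_VOCAB, OUTPUT_VOCAB, EMBED, HIDDEN, MAX_LEN = 1000, 1200, 64, 128, 5
    encoder = EncoderRNN(INPUT_VOCAB, EMBED, HIDDEN, n_layers=1, dropout=0.0)
    decoder = BahdanauAttnDecoderRNN(HIDDEN, EMBED, OUTPUT_VOCAB, n_layers=1, dropout_p=0.0)
    # padded (T=4, B=2) batch, sorted by decreasing length; 0 is the padding index
    input_seqs = torch.LongTensor([[5, 7], [6, 3], [4, 2], [8, 0]])
    encoder_outputs, encoder_hidden = encoder(input_seqs, [4, 3])
    # the encoder is bidirectional; keep only the forward-direction hidden state for the decoder
    decoder_hidden = encoder_hidden[:1]                # (layers=1, B, H)
    decoder_input = torch.LongTensor([1, 1])           # assumed SOS token index, shape (B)
    for _ in range(MAX_LEN):
        decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden, encoder_outputs)
        decoder_input = decoder_output.argmax(dim=1)   # greedy choice of the next input word
    print(decoder_output.shape)                        # torch.Size([2, 1200]) = (B, OUTPUT_VOCAB)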