"""Contains the symbolic regression neural network architecture."""
import tensorflow as tf
from utils import functions
# Constants for L0 Regularization
BETA = 2 / 3
GAMMA = -0.1
ZETA = 1.1
EPSILON = 1e-6


class SymbolicLayer:  # want to be able to merge layers and change the W accordingly
    """Neural network layer for symbolic regression where activation functions correspond to primitive functions.
    Can take multi-input activation functions (like multiplication)."""

    def __init__(self, funcs=None, initial_weight=None, variable=False, init_stddev=0.1):
        """
        funcs: List of activation functions, using utils.functions
        initial_weight: (Optional) Initial value for the weight matrix
        variable: Boolean for whether initial_weight is already a TensorFlow variable
        init_stddev: (Optional) If initial_weight isn't passed in, this is the standard deviation of the initial weights
        """
        if funcs is None:
            funcs = functions.default_func
        self.initial_weight = initial_weight
        self.W = None       # Weight matrix
        self.built = False  # Whether the weights have been initialized
        if self.initial_weight is not None:  # use the given initial weight
            with tf.name_scope("symbolic_layer"):
                if not variable:
                    self.W = tf.Variable(self.initial_weight)
                else:
                    self.W = self.initial_weight
            self.built = True
        self.output = None  # TensorFlow tensor for the layer output
        self.init_stddev = init_stddev
        self.n_funcs = len(funcs)                      # Number of activation functions (and number of layer outputs)
        self.funcs = [func.tf for func in funcs]       # Convert functions to a list of TensorFlow functions
        self.n_double = functions.count_double(funcs)  # Number of activation functions that take 2 inputs
        self.n_single = self.n_funcs - self.n_double   # Number of activation functions that take 1 input
        self.out_dim = self.n_funcs + self.n_double

    def build(self, in_dim):
        """Initialize the weight matrix"""
        # tf.random_normal is TF1-only; tf.random.normal works in both TF1 and TF2
        self.W = tf.compat.v1.Variable(tf.random.normal(shape=[in_dim, self.out_dim], stddev=self.init_stddev))
        self.built = True

    def __call__(self, x):
        """Multiply by the weight matrix and apply the activation units"""
        with tf.name_scope("symbolic_layer"):
            if not self.built:
                self.build(x.shape[1].value)  # x.shape[1] is the feature dimension (shape[0] is the batch size)
            g = tf.matmul(x, self.W)  # shape = (?, self.out_dim)
            self.output = []
            in_i = 0   # input index
            out_i = 0  # output index
            # Apply functions that take a single input
            while out_i < self.n_single:
                self.output.append(self.funcs[out_i](g[:, in_i]))
                in_i += 1
                out_i += 1
            # Apply functions that take 2 inputs and produce 1 output
            while out_i < self.n_funcs:
                self.output.append(self.funcs[out_i](g[:, in_i], g[:, in_i + 1]))
                in_i += 2
                out_i += 1
            self.output = tf.stack(self.output, axis=1)
            return self.output

    def get_weight(self):
        return self.W
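

# A minimal usage sketch of SymbolicLayer, added for illustration. It assumes
# TF1-style graph execution (tf.compat.v1 with v2 behavior disabled), which is
# what the `.value` call in build() above implies, and that
# functions.default_func supplies the primitive set.
def _demo_symbolic_layer():
    tf.compat.v1.disable_v2_behavior()
    x = tf.compat.v1.placeholder(tf.float32, shape=(None, 2))  # 2 input features
    layer = SymbolicLayer()   # W is built lazily on the first call
    y = layer(x)              # shape (batch, layer.n_funcs): one column per primitive
    with tf.compat.v1.Session() as sess:
        sess.run(tf.compat.v1.global_variables_initializer())
        return sess.run(y, feed_dict={x: [[0.5, -1.0], [2.0, 0.0]]})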


class SymbolicLayerBias(SymbolicLayer):
    """SymbolicLayer with a bias term"""

    def __init__(self, funcs=None, initial_weight=None, variable=False, init_stddev=0.1):
        super().__init__(funcs, initial_weight, variable, init_stddev)
        self.b = None

    def build(self, in_dim):
        super().build(in_dim)
        self.b = tf.compat.v1.Variable(tf.ones(shape=self.n_funcs) * 0.01)

    def __call__(self, x):
        """Multiply by the weight matrix, apply the activation units, then add the bias"""
        super().__call__(x)
        self.output += self.b
        return self.output


class SymbolicNet:
    """Symbolic regression network with multiple layers. Produces one output."""

    def __init__(self, symbolic_depth, funcs=None, initial_weights=None, initial_bias=None,
                 variable=False, init_stddev=0.1):
        self.depth = symbolic_depth  # Number of hidden layers
        self.funcs = funcs
        self.shape = (None, 1)
        if initial_weights is not None:
            self.symbolic_layers = [SymbolicLayer(funcs=funcs, initial_weight=initial_weights[i], variable=variable)
                                    for i in range(self.depth)]
            if not variable:
                self.output_weight = tf.compat.v1.Variable(initial_weights[-1])
            else:
                self.output_weight = initial_weights[-1]
        else:
            # Each layer initializes its own weights
            if isinstance(init_stddev, list):
                self.symbolic_layers = [SymbolicLayer(funcs=funcs, init_stddev=init_stddev[i])
                                        for i in range(self.depth)]
            else:
                self.symbolic_layers = [SymbolicLayer(funcs=funcs, init_stddev=init_stddev)
                                        for _ in range(self.depth)]
            # Initialize weights for the last layer (which has no activation functions).
            # tf.random_uniform is TF1-only; tf.random.uniform works in both TF1 and TF2.
            self.output_weight = tf.compat.v1.Variable(tf.random.uniform(shape=(self.symbolic_layers[-1].n_funcs, 1)))

    def build(self, input_dim):
        in_dim = input_dim
        for i in range(self.depth):
            self.symbolic_layers[i].build(in_dim)
            in_dim = self.symbolic_layers[i].n_funcs

    def __call__(self, input):
        self.shape = (int(input.shape[1]), 1)  # Dimensionality of the input
        h = input
        # Apply the hidden layers in sequence
        for i in range(self.depth):
            h = self.symbolic_layers[i](h)
        # Final output of the network (no activation units)
        h = tf.matmul(h, self.output_weight)
        return h

    def get_weights(self):
        """Return the list of weight matrices: the hidden-layer weights followed by the output weight."""
        return [self.symbolic_layers[i].W for i in range(self.depth)] + [self.output_weight]
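

# A minimal usage sketch of SymbolicNet, added for illustration under the same
# assumptions as the SymbolicLayer demo above: TF1-style graph execution and
# the default primitive set from utils.functions.
def _demo_symbolic_net():
    tf.compat.v1.disable_v2_behavior()
    x = tf.compat.v1.placeholder(tf.float32, shape=(None, 2))
    net = SymbolicNet(symbolic_depth=2)  # 2 hidden symbolic layers
    y_hat = net(x)                       # scalar prediction, shape (batch, 1)
    with tf.compat.v1.Session() as sess:
        sess.run(tf.compat.v1.global_variables_initializer())
        return sess.run(y_hat, feed_dict={x: [[1.0, 2.0]]})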


class MaskedSymbolicNet(SymbolicNet):
    """Symbolic regression network where weights below a threshold are set to 0 and frozen. In other words, we apply
    a mask to the symbolic network and fine-tune the remaining non-zero weights."""

    def __init__(self, sess, sr_unit, threshold=0.01):
        weights = sr_unit.get_weights()
        masked_weights = []
        for w_i in weights:
            # Multiplying by a constant 0/1 mask zeroes both the weight and its gradient
            mask = tf.constant(sess.run(tf.math.abs(w_i) > threshold), dtype=tf.float32)
            masked_weights.append(tf.math.multiply(w_i, mask))
        super().__init__(sr_unit.depth, funcs=sr_unit.funcs, initial_weights=masked_weights, variable=True)
        self.sr_unit = sr_unit
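

# A hedged sketch of the intended mask-and-fine-tune workflow, added for
# illustration: `sess` is the TF1 session in which `trained_net` (a SymbolicNet)
# was trained, and `x` is the same input placeholder. Weights whose magnitude
# falls below the threshold are zeroed and receive no gradient, so further
# training only adjusts the surviving weights.
def _demo_masked_fine_tune(sess, trained_net, x):
    masked_net = MaskedSymbolicNet(sess, trained_net, threshold=0.01)
    y_hat = masked_net(x)  # predictions from the pruned network
    return masked_net, y_hat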


class SymbolicLayerL0(SymbolicLayer):
    """SymbolicLayer with L0 regularization on the weights via hard concrete gates."""

    def __init__(self, funcs=None, initial_weight=None, variable=False, init_stddev=0.1,
                 bias=False, droprate_init=0.5, lamba=1.):
        super().__init__(funcs, initial_weight, variable, init_stddev)
        self.droprate_init = droprate_init if droprate_init != 0 else 0.5
        self.use_bias = bias
        self.lamba = lamba
        self.bias = None
        self.qz_log_alpha = None
        self.in_dim = None
        self.eps = None

    def build(self, in_dim):
        with tf.name_scope("symbolic_layer"):
            self.in_dim = in_dim
            if self.W is None:
                self.W = tf.Variable(tf.random.normal(shape=[in_dim, self.out_dim], stddev=self.init_stddev))
            if self.use_bias:
                self.bias = tf.Variable(0.1 * tf.ones((1, self.out_dim)))
            self.qz_log_alpha = tf.Variable(tf.random.normal(shape=(in_dim, self.out_dim),
                                                             mean=tf.math.log(1 - self.droprate_init) - tf.math.log(self.droprate_init),
                                                             stddev=1e-2))

    def quantile_concrete(self, u):
        """Quantile, aka inverse CDF, of the 'stretched' concrete distribution"""
        y = tf.sigmoid((tf.math.log(u) - tf.math.log(1.0 - u) + self.qz_log_alpha) / BETA)
        return y * (ZETA - GAMMA) + GAMMA

    def sample_u(self, shape, reuse_u=False):
        """Uniform random numbers for the concrete distribution"""
        if self.eps is None or not reuse_u:
            self.eps = tf.random.uniform(shape=shape, minval=EPSILON, maxval=1.0 - EPSILON)
        return self.eps

    def sample_z(self, batch_size, sample=True):
        """Sample from the hard concrete distribution described in https://arxiv.org/abs/1712.01312"""
        if sample:
            eps = self.sample_u((batch_size, self.in_dim, self.out_dim))
            z = self.quantile_concrete(eps)
            return tf.clip_by_value(z, 0, 1)
        else:  # Mean of the hard concrete distribution
            pi = tf.sigmoid(self.qz_log_alpha)
            return tf.clip_by_value(pi * (ZETA - GAMMA) + GAMMA, clip_value_min=0.0, clip_value_max=1.0)

    def get_z_mean(self):
        """Mean of the hard concrete distribution"""
        pi = tf.sigmoid(self.qz_log_alpha)
        return tf.clip_by_value(pi * (ZETA - GAMMA) + GAMMA, clip_value_min=0.0, clip_value_max=1.0)

    def sample_weights(self, reuse_u=False):
        z = self.quantile_concrete(self.sample_u((self.in_dim, self.out_dim), reuse_u=reuse_u))
        mask = tf.clip_by_value(z, clip_value_min=0.0, clip_value_max=1.0)
        return mask * self.W

    def get_weight(self):
        """Deterministic value of the weights based on the mean of z"""
        return self.W * self.get_z_mean()

    def loss(self):
        """L0 regularization loss term: the expected number of non-zero gates"""
        return tf.reduce_sum(tf.sigmoid(self.qz_log_alpha - BETA * tf.math.log(-GAMMA / ZETA)))

    def __call__(self, x, sample=True, reuse_u=False):
        """Multiply by the (gated) weight matrix and apply the activation units"""
        with tf.name_scope("symbolic_layer"):
            if self.W is None or self.qz_log_alpha is None:
                self.build(x.shape[1])
            if sample:
                h = tf.matmul(x, self.sample_weights(reuse_u=reuse_u))
            else:
                h = tf.matmul(x, self.get_weight())
            if self.use_bias:
                h = h + self.bias
            # shape of h = (?, self.out_dim)
            self.output = []
            # Apply a different activation unit to each column of h
            in_i = 0   # input index
            out_i = 0  # output index
            # Apply functions that take a single input
            while out_i < self.n_single:
                self.output.append(self.funcs[out_i](h[:, in_i]))
                in_i += 1
                out_i += 1
            # Apply functions that take 2 inputs and produce 1 output
            while out_i < self.n_funcs:
                self.output.append(self.funcs[out_i](h[:, in_i], h[:, in_i + 1]))
                in_i += 2
                out_i += 1
            self.output = tf.stack(self.output, axis=1)
            return self.output
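

# A hedged sketch, added for illustration, of the L0 layer's two modes:
# stochastic hard concrete gates during training (sample=True) and the
# deterministic gate mean at test time (sample=False). layer.loss() is the
# differentiable surrogate for the L0 norm that gets added to the training
# objective. Written for eager execution for brevity; under TF1 graph mode the
# returned tensors would be evaluated in a session instead.
def _demo_l0_layer():
    x = tf.random.uniform(shape=(4, 2))  # batch of 4, 2 input features
    layer = SymbolicLayerL0()
    y_train = layer(x, sample=True)      # weights gated by sampled hard concrete z
    y_test = layer(x, sample=False)      # weights gated by the mean of z
    reg = layer.loss()                   # expected number of active gates
    return y_train, y_test, reg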


class SymbolicNetL0(SymbolicNet):
    """Symbolic regression network with multiple layers, L0 regularization, and skip connections
    between the hidden layers. Produces one output."""

    def __init__(self, symbolic_depth, funcs=None, initial_weights=None, initial_bias=None,
                 variable=False, init_stddev=0.1):
        super().__init__(symbolic_depth, funcs, initial_weights, initial_bias, variable, init_stddev)
        if initial_weights is not None:
            self.symbolic_layers = [SymbolicLayerL0(funcs=funcs, initial_weight=initial_weights[i], variable=variable)
                                    for i in range(self.depth)]
            if not variable:
                self.output_weight = tf.Variable(initial_weights[-1])
            else:
                self.output_weight = initial_weights[-1]
        else:
            # Each layer initializes its own weights
            if isinstance(init_stddev, list):
                self.symbolic_layers = [SymbolicLayerL0(funcs=funcs, init_stddev=init_stddev[i])
                                        for i in range(self.depth)]
            else:
                self.symbolic_layers = [SymbolicLayerL0(funcs=funcs, init_stddev=init_stddev)
                                        for _ in range(self.depth)]
            # Initialize weights for the last layer (which has no activation functions)
            self.output_weight = tf.Variable(tf.random.uniform(shape=(self.symbolic_layers[-1].n_funcs, 1)))

    def __call__(self, input, sample=True, reuse_u=False):
        self.shape = (int(input.shape[1]), 1)  # Dimensionality of the input
        # Connect the output of each layer to the input of the next
        h = input
        saved_h = [h]
        for i in range(self.depth):
            h = self.symbolic_layers[i](h, sample=sample, reuse_u=reuse_u)
            if 0 < i < self.depth - 1:
                # Skip connection: concatenate the previous layer's output along the
                # feature axis (axis=1), so each intermediate layer also sees the
                # features that fed into it
                h = tf.concat([h, saved_h[-1]], axis=1)
            saved_h.append(h)
        # Final output of the network (no activation units)
        h = tf.matmul(h, self.output_weight)
        return h

    def get_loss(self):
        """Sum of the L0 regularization losses of all hidden layers"""
        return tf.reduce_sum([self.symbolic_layers[i].loss() for i in range(self.depth)])

    def get_weights(self):
        return self.get_symbolic_weights() + [self.get_output_weight()]

    def get_symbolic_weights(self):
        return [self.symbolic_layers[i].get_weight() for i in range(self.depth)]

    def get_output_weight(self):
        return self.output_weight
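

# A hedged sketch, added for illustration, of how the L0 network's training
# objective is typically assembled: mean squared error plus the summed L0
# penalty of the hidden layers. `reg_weight` is a hypothetical hyperparameter,
# not defined elsewhere in this file; `x` and `y` are tensors of inputs and
# matching targets of shape (batch, 1).
def _demo_l0_training_loss(x, y, reg_weight=5e-3):
    net = SymbolicNetL0(symbolic_depth=2)
    y_hat = net(x, sample=True)  # stochastic gates during training
    loss = tf.reduce_mean(tf.square(y_hat - y)) + reg_weight * net.get_loss()
    return net, loss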


class SymbolicCell(tf.keras.layers.SimpleRNNCell):
    """Cell for use with tf.keras.layers.RNN, allowing us to build a recurrent network with the EQL network.
    This is used for the propagating decoder in the dynamics architecture.
    Assumes two state variables: position and velocity."""
    state_size = 2
    output_size = 2

    def __init__(self, sym1, sym2):
        # units: dimensionality of the output space
        super().__init__(units=self.output_size)
        self.sym1 = sym1
        self.sym2 = sym2

    def call(self, inputs, state, training=None):
        """
        Arguments:
            inputs (at time t): shape (batch, feature)
            state: [x], where shape(x) = (batch, feature)
            training: ignored
        """
        full_input = state[0] + inputs[:, :2]
        full_input = tf.concat([full_input, inputs[:, 2:4]], axis=1)
        output = tf.concat([self.sym1(full_input), self.sym2(full_input)], axis=1)
        next_state = output
        return output, next_state
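

# A hedged usage sketch, added for illustration: wrapping SymbolicCell in
# tf.keras.layers.RNN to propagate a two-variable state (position, velocity)
# through time. `sym1` and `sym2` are assumed to be callables mapping a
# (batch, 4) tensor to a (batch, 1) tensor, e.g. trained symbolic networks;
# `sequence` is assumed to have shape (batch, time, 4).
def _demo_symbolic_cell(sym1, sym2, sequence):
    cell = SymbolicCell(sym1, sym2)
    rnn = tf.keras.layers.RNN(cell, return_sequences=True)
    init_state = tf.zeros((tf.shape(sequence)[0], 2))  # start state at the origin
    return rnn(sequence, initial_state=[init_state])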