Python keras.backend module: l2_normalize() usage examples
We collected the following 36 code examples from open-source Python projects to illustrate how to use keras.backend.l2_normalize().
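Before the project snippets, here is a minimal, self-contained sketch of what K.l2_normalize does, assuming a TensorFlow backend: each vector is rescaled so that its L2 norm along the chosen axis is 1.

import numpy as np
import keras.backend as K

# Two row vectors; axis=1 normalizes each row independently.
x = K.constant(np.array([[3.0, 4.0],
                         [1.0, 1.0]]))
print(K.eval(K.l2_normalize(x, axis=1)))
# [[0.6, 0.8], [0.7071..., 0.7071...]] -- every row now has unit L2 norm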
def call(self, inputs):
    x1 = inputs[0]
    x2 = inputs[1]
    if self.match_type in ['dot']:
        if self.normalize:
            x1 = K.l2_normalize(x1, axis=2)
            x2 = K.l2_normalize(x2, axis=2)
        output = K.tf.einsum('abd,acd->abc', x1, x2)
        output = K.tf.expand_dims(output, 3)
    elif self.match_type in ['mul', 'plus', 'minus']:
        x1_exp = K.tf.stack([x1] * self.shape2[1], 2)
        x2_exp = K.tf.stack([x2] * self.shape1[1], 1)
        if self.match_type == 'mul':
            output = x1_exp * x2_exp
        elif self.match_type == 'plus':
            output = x1_exp + x2_exp
        elif self.match_type == 'minus':
            output = x1_exp - x2_exp
    elif self.match_type in ['concat']:
        x1_exp = K.tf.stack([x1] * self.shape2[1], axis=2)
        x2_exp = K.tf.stack([x2] * self.shape1[1], axis=1)
        output = K.tf.concat([x1_exp, x2_exp], axis=3)
    return output
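A note on the 'dot' branch above: the einsum subscripts 'abd,acd->abc' take a dot product between every position of the first text and every position of the second within each sample, and with normalize=True these dot products are cosine similarities. A small NumPy check of the same subscripts (the shapes here are illustrative):

import numpy as np

x1 = np.random.rand(2, 5, 8)   # (batch, len1, dim)
x2 = np.random.rand(2, 7, 8)   # (batch, len2, dim)
sim = np.einsum('abd,acd->abc', x1, x2)
print(sim.shape)               # (2, 5, 7): one score per position pair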
def define_network(vector_size, loss):
    base_model = InceptionV3(weights='imagenet', include_top=True)
    for layer in base_model.layers:  # freeze layers in pretrained model
        layer.trainable = False

    # fully-connected layers to predict
    x = Dense(4096, activation='relu', name='fc1')(base_model.layers[-2].output)
    x = Dense(8096, name='fc2')(x)
    x = Dropout(0.5)(x)
    x = Dense(2048, activation='relu', name='fc3')(x)
    predictions = Dense(vector_size, activation='relu')(x)
    l2 = Lambda(lambda x: K.l2_normalize(x, axis=1))(predictions)

    model = Model(inputs=base_model.inputs, outputs=l2)
    optimizer = 'adam'
    if loss == 'euclidean':
        model.compile(optimizer=optimizer, loss=euclidean_distance)
    else:
        model.compile(optimizer=optimizer, loss=loss)
    return model
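The snippet above references a euclidean_distance loss that is not shown. A hedged sketch of what such a loss could look like (an assumption for illustration, not the original author's definition):

import keras.backend as K

def euclidean_distance(y_true, y_pred):
    # mean L2 distance between target and predicted (unit-normalized) embeddings;
    # the epsilon keeps the square root differentiable at zero
    return K.mean(K.sqrt(K.sum(K.square(y_true - y_pred), axis=-1) + K.epsilon()))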
def create_full_matching_layer_f(self, input_dim_a, input_dim_b):
    """Create a full-matching layer of a model."""
    inp_a = Input(shape=(input_dim_a, self.hidden_dim,))
    inp_b = Input(shape=(input_dim_b, self.hidden_dim,))
    W = []
    for i in range(self.perspective_num):
        wi = K.random_uniform_variable((1, self.hidden_dim), -1.0, 1.0,
                                       seed=self.seed if self.seed is not None else 243)
        W.append(wi)

    # selector that picks the last time step of the (permuted) second input
    val = np.concatenate((np.zeros((self.max_sequence_length - 1, 1)), np.ones((1, 1))), axis=0)
    kcon = K.constant(value=val, dtype='float32')
    inp_b_perm = Lambda(lambda x: K.permute_dimensions(x, (0, 2, 1)))(inp_b)
    last_state = Lambda(lambda x: K.permute_dimensions(K.dot(x, kcon), (0, 2, 1)))(inp_b_perm)

    m = []
    for i in range(self.perspective_num):
        outp_a = Lambda(lambda x: x * W[i])(inp_a)
        outp_last = Lambda(lambda x: x * W[i])(last_state)
        outp_a = Lambda(lambda x: K.l2_normalize(x, -1))(outp_a)
        outp_last = Lambda(lambda x: K.l2_normalize(x, -1))(outp_last)
        outp_last = Lambda(lambda x: K.permute_dimensions(x, (0, 2, 1)))(outp_last)
        outp = Lambda(lambda x: K.batch_dot(x[0], x[1], axes=[1, 2]))([outp_last, outp_a])
        outp = Lambda(lambda x: K.permute_dimensions(x, (0, 2, 1)))(outp)
        m.append(outp)

    if self.perspective_num > 1:
        persp = Lambda(lambda x: K.concatenate(x, 2))(m)
    else:
        persp = m
    model = Model(inputs=[inp_a, inp_b], outputs=persp)
    return model
def create_full_matching_layer_b(self, input_dim_a, input_dim_b):
    """Create a backward full-matching layer of a model."""
    inp_a = Input(shape=(input_dim_a, self.hidden_dim,))
    inp_b = Input(shape=(input_dim_b, self.hidden_dim,))
    W = []
    for i in range(self.perspective_num):
        wi = K.random_uniform_variable((1, self.hidden_dim), -1.0, 1.0,
                                       seed=self.seed if self.seed is not None else 243)
        W.append(wi)

    # selector that picks the first time step of the (permuted) second input
    val = np.concatenate((np.ones((1, 1)), np.zeros((self.max_sequence_length - 1, 1))), axis=0)
    kcon = K.constant(value=val, dtype='float32')
    inp_b_perm = Lambda(lambda x: K.permute_dimensions(x, (0, 2, 1)))(inp_b)
    last_state = Lambda(lambda x: K.permute_dimensions(K.dot(x, kcon), (0, 2, 1)))(inp_b_perm)

    m = []
    for i in range(self.perspective_num):
        outp_a = Lambda(lambda x: x * W[i])(inp_a)
        outp_last = Lambda(lambda x: x * W[i])(last_state)
        outp_a = Lambda(lambda x: K.l2_normalize(x, -1))(outp_a)
        outp_last = Lambda(lambda x: K.l2_normalize(x, -1))(outp_last)
        outp_last = Lambda(lambda x: K.permute_dimensions(x, (0, 2, 1)))(outp_last)
        outp = Lambda(lambda x: K.batch_dot(x[0], x[1], axes=[1, 2]))([outp_last, outp_a])
        outp = Lambda(lambda x: K.permute_dimensions(x, (0, 2, 1)))(outp)
        m.append(outp)

    if self.perspective_num > 1:
        persp = Lambda(lambda x: K.concatenate(x, 2))(m)
    else:
        persp = m
    model = Model(inputs=[inp_a, inp_b], outputs=persp)
    return model
def create_maxpool_matching_layer(self, input_dim_a, input_dim_b):
    """Create a maxpooling-matching layer of a model."""
    inp_a = Input(shape=(input_dim_a, self.hidden_dim,))
    inp_b = Input(shape=(input_dim_b, self.hidden_dim,))
    W = []
    for i in range(self.perspective_num):
        wi = K.random_uniform_variable((1, self.hidden_dim), -1.0, 1.0,
                                       seed=self.seed if self.seed is not None else 243)
        W.append(wi)

    m = []
    for i in range(self.perspective_num):
        outp_a = Lambda(lambda x: x * W[i])(inp_a)
        outp_b = Lambda(lambda x: x * W[i])(inp_b)
        outp_a = Lambda(lambda x: K.l2_normalize(x, -1))(outp_a)
        outp_b = Lambda(lambda x: K.l2_normalize(x, -1))(outp_b)
        outp_b = Lambda(lambda x: K.permute_dimensions(x, (0, 2, 1)))(outp_b)
        outp = Lambda(lambda x: K.batch_dot(x[0], x[1], axes=[1, 2]))([outp_b, outp_a])
        outp = Lambda(lambda x: K.permute_dimensions(x, (0, 2, 1)))(outp)
        outp = Lambda(lambda x: K.max(x, -1, keepdims=True))(outp)
        m.append(outp)

    if self.perspective_num > 1:
        persp = Lambda(lambda x: K.concatenate(x, 2))(m)
    else:
        persp = m
    model = Model(inputs=[inp_a, inp_b], outputs=persp)
    return model
def create_maxatt_matching_layer(self, input_dim_a, input_dim_b):
    """Create a max-attentive-matching layer of a model."""
    inp_a = Input(shape=(input_dim_a, self.hidden_dim,))
    inp_b = Input(shape=(input_dim_b, self.hidden_dim,))
    W = []
    for i in range(self.perspective_num):
        wi = K.random_uniform_variable((1, self.hidden_dim), -1.0, 1.0,
                                       seed=self.seed if self.seed is not None else 243)
        W.append(wi)

    # attention weights: pick, for each position of a, the most similar position of b
    outp_a = Lambda(lambda x: K.l2_normalize(x, -1))(inp_a)
    outp_b = Lambda(lambda x: K.l2_normalize(x, -1))(inp_b)
    outp_b = Lambda(lambda x: K.permute_dimensions(x, (0, 2, 1)))(outp_b)
    alpha = Lambda(lambda x: K.batch_dot(x[0], x[1], axes=[1, 2]))([outp_b, outp_a])
    alpha = Lambda(lambda x: K.one_hot(K.argmax(x, 1), self.max_sequence_length))(alpha)
    hmax = Lambda(lambda x: K.batch_dot(x[0], x[1], axes=[1, 2]))([alpha, outp_b])

    m = []
    for i in range(self.perspective_num):
        outp_a = Lambda(lambda x: x * W[i])(inp_a)
        outp_hmax = Lambda(lambda x: x * W[i])(hmax)
        outp_a = Lambda(lambda x: K.l2_normalize(x, -1))(outp_a)
        outp_hmax = Lambda(lambda x: K.l2_normalize(x, -1))(outp_hmax)
        outp_hmax = Lambda(lambda x: K.permute_dimensions(x, (0, 2, 1)))(outp_hmax)
        outp = Lambda(lambda x: K.batch_dot(x[0], x[1], axes=[1, 2]))([outp_hmax, outp_a])
        val = np.eye(self.max_sequence_length)
        kcon = K.constant(value=val, dtype='float32')
        outp = Lambda(lambda x: K.sum(x * kcon, -1, keepdims=True))(outp)
        m.append(outp)

    if self.perspective_num > 1:
        persp = Lambda(lambda x: K.concatenate(x, 2))(m)
    else:
        persp = m
    model = Model(inputs=[inp_a, inp_b], outputs=persp)
    return model
def call(self, inputs):
    x1 = inputs[0]
    x2 = inputs[1]
    if self.normalize:
        x1 = K.l2_normalize(x1, axis=2)
        x2 = K.l2_normalize(x2, axis=2)
    output = K.tf.einsum('abd,fde,ace->afbc', x1, self.M, x2)
    return output
def call(self, x, mask=None):
    x -= K.mean(x, axis=1, keepdims=True)
    x = K.l2_normalize(x, axis=1)
    pos = K.relu(x)
    neg = K.relu(-x)
    return K.concatenate([pos, neg], axis=1)
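For this "antirectifier" layer, the matching output-shape computation looks like the sketch below, mirroring the commented-out reference implementation further down this page: concatenating the positive and negative parts doubles the feature dimension.

def compute_output_shape(self, input_shape):
    shape = list(input_shape)
    assert len(shape) == 2  # only valid for 2D tensors
    shape[-1] *= 2
    return tuple(shape)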
# global parameters
def call(self, inputs):
    inputs -= K.mean(inputs, axis=1, keepdims=True)
    inputs = K.l2_normalize(inputs, axis=1)
    pos = K.relu(inputs)
    neg = K.relu(-inputs)
    return K.concatenate([pos, neg], axis=1)
# global parameters
def cosine(a, b):
    """Cosine similarity. Maximum is 1 (a == b), minimum is -1 (a == -b)."""
    a = K.l2_normalize(a)
    b = K.l2_normalize(b)
    return 1 - K.mean(a * b, axis=-1)
def _initialize_centers(self, n_labels, output_dim):
    trigger = Input(shape=(n_labels, ), name="trigger")
    x = Dense(output_dim, activation='linear', name='dense')(trigger)
    centers = Lambda(lambda x: K.l2_normalize(x, axis=-1),
                     output_shape=(output_dim, ),
                     name="centers")(x)
    model = Model(inputs=trigger, outputs=centers)
    model.compile(optimizer=SSMORMS3(), loss=self.loss)
    return model
def call(self, x, mask=None):
    # thanks to L2 normalization, mask actually has no effect
    return K.l2_normalize(K.sum(x, axis=1), axis=-1)
def on_train_begin(self, logs=None):
    # number of classes
    n_classes = logs['n_classes']

    if logs['restart']:
        weights_h5 = self.WEIGHTS_H5.format(log_dir=logs['log_dir'],
                                            epoch=logs['epoch'])
        self.centers_ = keras.models.load_model(
            weights_h5, custom_objects=CUSTOM_OBJECTS,
            compile=True)
    else:
        # dimension of embedding space
        output_dim = self.model.output_shape[-1]

        # centers model
        trigger = Input(shape=(n_classes, ), name="trigger")
        x = Dense(output_dim, name='dense')(trigger)
        centers = Lambda(lambda x: K.l2_normalize(x, axis=-1),
                         output_shape=(output_dim, ),
                         name="centers")(x)
        self.centers_ = Model(inputs=trigger, outputs=centers)
        self.centers_.compile(optimizer=SSMORMS3(),
                              loss=precomputed_gradient_loss)

    # save list of classes
    centers_txt = self.CENTERS_TXT.format(**logs)
    with open(centers_txt, mode='w') as fp:
        for label in logs['classes']:
            fp.write('{label}\n'.format(label=label.decode('utf8')))

    self.trigger_ = np.eye(n_classes)
    self.fC_ = self.centers_.predict(self.trigger_).astype(self.float_autograd_)
def call(self, x, mask=None):
    output = K.l2_normalize(x, self.axis)
    output *= self.gamma
    return output
def l2_normalize(x):
    return K.l2_normalize(x, 0)
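The model code further down this page passes an l2_normalize_output_shape helper alongside this function. It is not shown here, but since L2 normalization leaves tensor shapes unchanged, it presumably reduces to something like this sketch:

def l2_normalize_output_shape(input_shape):
    # normalization only rescales values, so the shape passes through untouched
    return input_shape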
def squared_root_normalization(x):
    """
    Squared root normalization for convolution layers' output:
    first apply global average pooling, followed by squared root on all elements,
    then l2-normalize the vector.
    :param x: input tensor, output of convolution layer
    :return:
    """
    x = GlobalAveragePooling2D()(x)
    # output shape = (None, nc)
    # x = K.sqrt(x)
    # x = K.l2_normalize(x, axis=0)
    return x
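The docstring describes a three-step pipeline (global average pooling, element-wise square root, L2 normalization) while the body keeps only the pooling step. A hedged sketch of the full pipeline, with the backend ops wrapped in Lambda layers so they stay part of the functional graph; the helper name and the axis choice are assumptions for illustration:

from keras.layers import GlobalAveragePooling2D, Lambda
import keras.backend as K

def squared_root_normalization_full(x):
    x = GlobalAveragePooling2D()(x)                       # (None, nc)
    x = Lambda(lambda t: K.sqrt(K.relu(t)))(x)            # element-wise sqrt, guarded against negatives
    x = Lambda(lambda t: K.l2_normalize(t, axis=-1))(x)   # unit L2 norm per sample
    return x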
def call(self, x, mask=None):
    output = K.l2_normalize(x, self.axis)
    output *= self.gamma
    return output
def get_model_3(params):
    # metadata
    inputs2 = Input(shape=(params["n_metafeatures"],))
    x2 = Dropout(params["dropout_factor"])(inputs2)

    if params["n_dense"] > 0:
        dense2 = Dense(output_dim=params["n_dense"], init="uniform", activation='relu')
        x2 = dense2(x2)
        logging.debug("Output CNN: %s" % str(dense2.output_shape))
        x2 = Dropout(params["dropout_factor"])(x2)

    if params["n_dense_2"] > 0:
        dense3 = Dense(output_dim=params["n_dense_2"], activation='relu')
        x2 = dense3(x2)
        logging.debug("Output CNN: %s" % str(dense3.output_shape))
        x2 = Dropout(params["dropout_factor"])(x2)

    dense4 = Dense(output_dim=params["n_out"], activation=params['final_activation'])
    xout = dense4(x2)
    logging.debug("Output CNN: %s" % str(dense4.output_shape))

    if params['final_activation'] == 'linear':
        reg = Lambda(lambda x: K.l2_normalize(x, axis=1))
        xout = reg(xout)

    model = Model(input=inputs2, output=xout)
    return model

# Metadata 2 inputs, post-merge with dense layers
def get_model_32(params):
    # metadata
    inputs = Input(shape=(params["n_metafeatures"],))
    reg = Lambda(lambda x: K.l2_normalize(x, axis=1))
    x1 = reg(inputs)

    inputs2 = Input(shape=(params["n_metafeatures2"],))
    reg2 = Lambda(lambda x: K.l2_normalize(x, axis=1))
    x2 = reg2(inputs2)

    # merge
    x = merge([x1, x2], mode='concat', concat_axis=1)
    x = Dropout(params["dropout_factor"])(x)

    if params['n_dense'] > 0:
        dense2 = Dense(output_dim=params["n_dense"], activation='relu')
        x = dense2(x)
        logging.debug("Output CNN: %s" % str(dense2.output_shape))

    dense4 = Dense(output_dim=params["n_out"], activation=params['final_activation'])
    xout = dense4(x)
    logging.debug("Output CNN: %s" % str(dense4.output_shape))

    if params['final_activation'] == 'linear':
        reg = Lambda(lambda x: K.l2_normalize(x, axis=1))
        xout = reg(xout)

    model = Model(input=[inputs, inputs2], output=xout)
    return model

# Metadata 3 inputs, pre-merge and l2
def get_model_33(params):
    # metadata
    inputs = Input(shape=(params["n_metafeatures"],))
    reg = Lambda(lambda x: K.l2_normalize(x, axis=1))
    x1 = reg(inputs)

    inputs2 = Input(shape=(params["n_metafeatures2"],))
    reg2 = Lambda(lambda x: K.l2_normalize(x, axis=1))
    x2 = reg2(inputs2)

    inputs3 = Input(shape=(params["n_metafeatures3"],))
    reg3 = Lambda(lambda x: K.l2_normalize(x, axis=1))
    x3 = reg3(inputs3)

    # merge
    x = merge([x1, x2, x3], mode='concat', concat_axis=1)
    x = Dropout(params["dropout_factor"])(x)

    dense4 = Dense(output_dim=params["n_out"], activation=params['final_activation'])
    xout = dense4(x)

    if params['final_activation'] == 'linear':
        xout = Lambda(lambda x: K.l2_normalize(x, axis=1))(xout)

    model = Model(input=[inputs, inputs2, inputs3], output=xout)
    return model
def _cosine_distance(M, k):
    # this is equation (6), or as I like to call it: The NaN factory.
    # TODO: Find it in a library (keras cosine loss?)
    # normalizing first as it is better conditioned.
    nk = K.l2_normalize(k, axis=-1)
    nM = K.l2_normalize(M, axis=-1)
    cosine_distance = K.batch_dot(nM, nk)
    # TODO: Do successful error handling
    # cosine_distance_error_handling = tf.Print(cosine_distance, [cosine_distance],
    #                                           message="NaN occurred in _cosine_distance")
    # cosine_distance_error_handling = K.ones(cosine_distance_error_handling.shape)
    # cosine_distance = tf.case({K.any(tf.is_nan(cosine_distance)): (lambda: cosine_distance_error_handling)},
    #                           default=lambda: cosine_distance, strict=True)
    return cosine_distance
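If the NaN case the comments worry about (an all-zero key or memory row) needs handling, one hedged alternative, not the original author's fix, is to normalize manually with a small epsilon in the denominator:

import keras.backend as K

def _cosine_distance_safe(M, k):
    # same cosine computation, but zero rows divide by epsilon instead of zero
    nk = k / (K.sqrt(K.sum(K.square(k), axis=-1, keepdims=True)) + K.epsilon())
    nM = M / (K.sqrt(K.sum(K.square(M), axis=-1, keepdims=True)) + K.epsilon())
    return K.batch_dot(nM, nk)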
def call(self, x, mask=None):
    output = K.l2_normalize(x, self.axis)
    output *= self.gamma
    return output
def call(self, x, mask=None):
    output = K.l2_normalize(x, self.axis)
    output *= self.gamma
    return output
def cosine_similarity(x, y):
    x = K.l2_normalize(x, axis=-1)
    y = K.l2_normalize(y, axis=-1)
    return K.sum(x * y, axis=-1)
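A quick check of the helper above, assuming a TensorFlow backend: identical vectors give a similarity of 1, orthogonal vectors give 0.

import numpy as np
import keras.backend as K

a = K.constant(np.array([[3.0, 4.0]]))
b = K.constant(np.array([[4.0, -3.0]]))
print(K.eval(cosine_similarity(a, a)))  # ~[1.0]
print(K.eval(cosine_similarity(a, b)))  # ~[0.0]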
def get_metric():
    def return_metric():
        import keras.backend as K

        def cosine_proximity(y_true, y_pred):
            y_true = K.l2_normalize(y_true, axis=-1)
            y_pred = K.l2_normalize(y_pred, axis=-1)
            return -K.mean(y_true * y_pred)

        return cosine_proximity
    return return_metric
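A hedged usage sketch for the factory above: the inner function it returns can be handed to compile() as a metric. The surrounding model and loss here are placeholders for illustration only.

cosine_metric = get_metric()()          # unwrap the two factory levels
model.compile(optimizer='adam', loss='mse', metrics=[cosine_metric])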
def build_tpe(n_in, n_out, W_pca=None):
    a = Input(shape=(n_in,))
    p = Input(shape=(n_in,))
    n = Input(shape=(n_in,))

    if W_pca is None:
        W_pca = np.zeros((n_in, n_out))

    base_model = Sequential()
    base_model.add(Dense(n_out, input_dim=n_in, bias=False, weights=[W_pca], activation='linear'))
    base_model.add(Lambda(lambda x: K.l2_normalize(x, axis=1)))

    # base_model = Sequential()
    # base_model.add(Dense(178, input_dim=n_in, bias=True, activation='relu'))
    # base_model.add(Dense(n_out, activation='tanh'))
    # base_model.add(Lambda(lambda x: K.l2_normalize(x, axis=1)))

    a_emb = base_model(a)
    p_emb = base_model(p)
    n_emb = base_model(n)

    e = merge([a_emb, p_emb, n_emb], mode=triplet_merge, output_shape=triplet_merge_shape)

    model = Model(input=[a, p, n], output=e)
    predict = Model(input=a, output=a_emb)

    model.compile(loss=triplet_loss, optimizer='rmsprop')
    return model, predict
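The helpers triplet_merge, triplet_merge_shape and triplet_loss are not shown above. A hedged sketch of one common formulation (an assumption, not the original definitions): with L2-normalized embeddings the dot products are cosine similarities, and the loss pushes the anchor-positive similarity above the anchor-negative one.

import keras.backend as K

def triplet_merge(inputs):
    # similarity margin between anchor-positive and anchor-negative pairs
    a, p, n = inputs
    return K.sum(a * (p - n), axis=-1, keepdims=True)

def triplet_merge_shape(input_shapes):
    return (input_shapes[0][0], 1)

def triplet_loss(y_true, y_pred):
    # maximize the margin by minimizing its negative log-sigmoid
    return -K.mean(K.log(K.sigmoid(y_pred)))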
def create_att_matching_layer(self, input_dim_a, input_dim_b):
    """Create an attentive-matching layer of a model."""
    inp_a = Input(shape=(input_dim_a, self.hidden_dim,))
    inp_b = Input(shape=(input_dim_b, self.hidden_dim,))
    w = []
    for i in range(self.perspective_num):
        wi = K.random_uniform_variable((1, self.hidden_dim), -1.0, 1.0,
                                       seed=self.seed if self.seed is not None else 243)
        w.append(wi)

    # attention-weighted mean of the second input
    outp_a = Lambda(lambda x: K.l2_normalize(x, -1))(inp_a)
    outp_b = Lambda(lambda x: K.l2_normalize(x, -1))(inp_b)
    outp_b = Lambda(lambda x: K.permute_dimensions(x, (0, 2, 1)))(outp_b)
    alpha = Lambda(lambda x: K.batch_dot(x[0], x[1], axes=[1, 2]))([outp_b, outp_a])
    alpha = Lambda(lambda x: K.l2_normalize(x, 1))(alpha)
    hmean = Lambda(lambda x: K.batch_dot(x[0], x[1], axes=[1, 2]))([alpha, outp_b])

    m = []
    for i in range(self.perspective_num):
        outp_a = Lambda(lambda x: x * w[i])(inp_a)
        outp_hmean = Lambda(lambda x: x * w[i])(hmean)
        outp_a = Lambda(lambda x: K.l2_normalize(x, -1))(outp_a)
        outp_hmean = Lambda(lambda x: K.l2_normalize(x, -1))(outp_hmean)
        outp_hmean = Lambda(lambda x: K.permute_dimensions(x, (0, 2, 1)))(outp_hmean)
        outp = Lambda(lambda x: K.batch_dot(x[0], x[1], axes=[1, 2]))([outp_hmean, outp_a])
        outp = Lambda(lambda x: K.permute_dimensions(x, (0, 2, 1)))(outp)
        m.append(outp)

    if self.perspective_num > 1:
        persp = Lambda(lambda x: K.concatenate(x, 2))(m)
    else:
        persp = m
    model = Model(inputs=[inp_a, inp_b], outputs=persp)
    return model
def model2(weights_path=None):
    '''
    Creates a model by concatenating the features from lower layers
    with high-level convolution features for all aesthetic attributes along
    with the overall aesthetic score.
    :param weights_path: path of the weight file
    :return: Keras model instance
    This is the model used in the paper
    '''
    _input = Input(shape=(299, 299, 3))
    resnet = ResNet50(include_top=False, weights='imagenet', input_tensor=_input)

    activation_layers = []
    layers = resnet.layers
    for layer in layers:
        # print layer.name, layer.input_shape, layer.output_shape
        if 'activation' in layer.name:
            activation_layers.append(layer)

    activations = 0
    activation_plus_squared_outputs = []
    # Remove the last activation layer so
    # it can be used with a spatial pooling layer if required
    nlayers = len(activation_layers) - 1
    for i in range(1, nlayers):
        layer = activation_layers[i]
        if layer.output_shape[-1] > activation_layers[i - 1].output_shape[-1]:
            # print layer.name, layer.output_shape
            activations += layer.output_shape[-1]
            _out = Lambda(squared_root_normalization,
                          output_shape=squared_root_normalization_output_shape,
                          name=layer.name + '_normalized')(layer.output)
            activation_plus_squared_outputs.append(_out)
    # print "sum of all activations should be {}".format(activations)

    last_layer_output = GlobalAveragePooling2D()(activation_layers[-1].output)
    # last_layer_output = Lambda(K.sqrt, output_shape=squared_root_normalization_output_shape)(last_layer_output)
    last_layer_output = Lambda(l2_normalize, output_shape=l2_normalize_output_shape,
                               name=activation_layers[-1].name + '_normalized')(last_layer_output)
    activation_plus_squared_outputs.append(last_layer_output)

    merged = merge(activation_plus_squared_outputs, mode='concat', concat_axis=1)
    merged = Lambda(l2_normalize, name='merge')(merged)

    # outputs of the model
    outputs = []
    attrs = ['BalacingElements', 'ColorHarmony', 'Content', 'DoF',
             'Light', 'MotionBlur', 'Object', 'RuleOfThirds', 'VividColor']
    for attribute in attrs:
        outputs.append(Dense(1, init='glorot_uniform', activation='tanh', name=attribute)(merged))

    non_negative_attrs = ['Repetition', 'Symmetry', 'score']
    for attribute in non_negative_attrs:
        outputs.append(Dense(1, activation='sigmoid', name=attribute)(merged))

    model = Model(input=_input, output=outputs)
    if weights_path:
        model.load_weights(weights_path)
    return model
def get_model_31(params):
    # metadata
    inputs = Input(shape=(params["n_metafeatures"],))
    norm = BatchNormalization()
    x = norm(inputs)
    x = Dropout(params["dropout_factor"])(x)

    dense = Dense(output_dim=params["n_dense"], activation='relu')
    x = dense(x)
    logging.debug("Output CNN: %s" % str(dense.output_shape))
    x = Dropout(params["dropout_factor"])(x)

    inputs2 = Input(shape=(params["n_metafeatures2"],))
    norm2 = BatchNormalization()
    x2 = norm2(inputs2)
    x2 = Dropout(params["dropout_factor"])(x2)

    dense2 = Dense(output_dim=params["n_dense"], activation='relu')
    x2 = dense2(x2)
    logging.debug("Output CNN: %s" % str(dense2.output_shape))
    x2 = Dropout(params["dropout_factor"])(x2)

    # merge
    xout = merge([x, x2], mode='concat', concat_axis=1)

    dense4 = Dense(output_dim=params["n_out"], activation=params['final_activation'])
    xout = dense4(xout)
    logging.debug("Output CNN: %s" % str(dense4.output_shape))

    if params['final_activation'] == 'linear':
        reg = Lambda(lambda x: K.l2_normalize(x, axis=1))
        xout = reg(xout)

    model = Model(input=[inputs, inputs2], output=xout)
    return model

# Metadata 2 inputs, pre-merge and l2
def get_model_4(params):
    embedding_weights = pickle.load(open(common.TRAINDATA_DIR + "/embedding_weights_w2v_%s.pk" % params['embeddings_suffix'], "rb"))
    graph_in = Input(shape=(params['sequence_length'], params['embedding_dim']))
    convs = []
    for fsz in params['filter_sizes']:
        conv = Convolution1D(nb_filter=params['num_filters'],
                             filter_length=fsz,
                             border_mode='valid',
                             activation='relu',
                             subsample_length=1)
        x = conv(graph_in)
        logging.debug("Filter size: %s" % fsz)
        logging.debug("Output CNN: %s" % str(conv.output_shape))

        pool = GlobalMaxPooling1D()
        x = pool(x)
        logging.debug("Output Pooling: %s" % str(pool.output_shape))
        convs.append(x)

    if len(params['filter_sizes']) > 1:
        merge = Merge(mode='concat')
        out = merge(convs)
        logging.debug("Merge: %s" % str(merge.output_shape))
    else:
        out = convs[0]

    graph = Model(input=graph_in, output=out)

    # main sequential model
    model = Sequential()
    if not params['model_variation'] == 'CNN-static':
        model.add(Embedding(len(embedding_weights[0]), params['embedding_dim'],
                            input_length=params['sequence_length'],
                            weights=embedding_weights))
    model.add(Dropout(params['dropout_prob'][0], input_shape=(params['sequence_length'], params['embedding_dim'])))
    model.add(graph)
    model.add(Dense(params['n_dense']))
    model.add(Dropout(params['dropout_prob'][1]))
    model.add(Activation('relu'))

    model.add(Dense(output_dim=params["n_out"], init="uniform"))
    model.add(Activation(params['final_activation']))
    logging.debug("Output CNN: %s" % str(model.output_shape))

    if params['final_activation'] == 'linear':
        model.add(Lambda(lambda x: K.l2_normalize(x, axis=1)))

    return model

# word2vec ARCH with LSTM
def __prepare_model(self):
    print('Build model...')
    model = Sequential()
    model.add(TimeDistributedDense(output_dim=self.hidden_cnt,
                                   input_dim=self.input_dim,
                                   input_length=self.input_length,
                                   activation='sigmoid'))
    # model.add(TimeDistributed(Dense(output_dim=self.hidden_cnt,
    #                                 input_dim=self.input_dim,
    #                                 input_length=self.input_length,
    #                                 activation='sigmoid')))

    # my modification since import error from keras.layers.core import TimeDistributedMerge
    # model.add(TimeDistributedMerge(mode='ave'))  # comment by me

    ##################### my ref #########################################################
    # # add a layer that returns the concatenation
    # # of the positive part of the input and
    # # the opposite of the negative part
    #
    # def antirectifier(x):
    #     x -= K.mean(x, axis=1, keepdims=True)
    #     x = K.l2_normalize(x, axis=1)
    #     pos = K.relu(x)
    #     neg = K.relu(-x)
    #     return K.concatenate([pos, neg], axis=1)
    #
    # def antirectifier_output_shape(input_shape):
    #     shape = list(input_shape)
    #     assert len(shape) == 2  # only valid for 2D tensors
    #     shape[-1] *= 2
    #     return tuple(shape)
    #
    # model.add(Lambda(antirectifier, output_shape=antirectifier_output_shape))
    #############################################################################

    model.add(Lambda(function=lambda x: K.mean(x, axis=1),
                     output_shape=lambda shape: (shape[0],) + shape[2:]))
    # model.add(Dropout(0.5))
    model.add(Dropout(0.93755))
    model.add(Dense(self.hidden_cnt, activation='tanh'))
    model.add(Dense(self.output_dim, activation='softmax'))

    # try using different optimizers and different optimizer configs
    print('Compile model...')
    # sgd = SGD(lr=0.1, decay=1e-6, momentum=0.9, nesterov=True)
    # model.compile(loss='categorical_crossentropy', optimizer=sgd)
    # return model

    ## my add
    adagrad = keras.optimizers.Adagrad(lr=0.01, epsilon=1e-08, decay=0.0)
    model.compile(loss='categorical_crossentropy', optimizer=adagrad)
    return model