# Theano imports
import theano
import theano.tensor as T
from theano.tensor.nnet import conv2d as conv
import warnings
from theano.sandbox.rng_mrg import MRG_RandomStreams as RS
import scipy
# numpy and python classics
import numpy as np
import time
import joblib
import pprint
"""
This is the actual implementation of our convolutional RBM.
The class implements only contrastive divergence learning so far
but the number of runs for Gibbs Sampling can be varied.
Furthermore, the beforementioned implementation of probabilistic max pooling
computes probabilities for and samples of the hidden layer distribution.
"""
[docs]class CRBM:
"""CRBM class.
The class :class:`CRBM` implements functionality for
a *convolutional restricted Boltzmann machine* (cRBM) that
extracts redundant DNA sequence features from a provided set
of sequences.
The model can subsequently be used to study the sequence content
of (e.g. regulatory) sequences, by visualizing the features in terms
of sequence logos or in order to cluster the sequences based
on sequence content.
Parameters
-----------
num_motifs : int
Number of motifs.
motif_length : int
Motif length.
epochs : int
Number of epochs to train (Default: 100).
input_dims :int
Input dimensions aka alphabet size (Default: 4 for DNA).
doublestranded : bool
Single strand or both strands. If set to True,
both strands are scanned. (Default: True).
batchsize : int
Batch size (Default: 20).
learning_rate : float)
Learning rate (Default: 0.1).
momentum : float
Momentum term (Default: 0.95).
pooling : int
Pooling factor (not relevant for
cRBM, but for future work) (Default: 1).
cd_k : int
Number of Gibbs sampling iterations in
each persistent contrastive divergence step (Default: 5).
rho : float
Target frequency of motif occurrences (Default: 0.01).
lambda_rate : float
Sparsity enforcement aka penality term (Default: 0.1).
"""
def __init__(self, num_motifs, motif_length, epochs = 100, input_dims=4, \
doublestranded = True, batchsize = 20, learning_rate = 0.1, \
momentum = 0.95, pooling = 1, cd_k = 5,
rho = 0.01, lambda_rate = 0.1):
# sanity checks:
if num_motifs <= 0:
raise Exception("Number of motifs must be positive.")
if motif_length <= 0:
raise Exception("Motif length must be positive.")
if epochs < 0:
raise Exception("Epochs must be non-negative.")
if input_dims <= 0:
raise Exception("input_dims must be positive.")
elif input_dims != 4:
warnings.warn("input_dims != 4 was not comprehensively \
tested yet. Be careful when interpreting the results.",
UserWarning)
if batchsize <= 0:
raise Exception("batchsize must be positive.")
if learning_rate <= 0.0:
raise Exception("learning_rate must be positive.")
if not (momentum >= 0.0 and momentum < 1.):
raise Exception("momentum must be between zero and one.")
if pooling <= 0:
raise Exception("pooling must be positive.")
if cd_k <= 0:
raise Exception("cd_k must be positive.")
if not (rho >= 0.0 and rho < 1.):
raise Exception("rho must be between zero and one.")
if lambda_rate < 0.:
raise Exception("lambda_rate must be non-negative.")
# parameters for the motifs
self.num_motifs = num_motifs
self.motif_length = motif_length
self.input_dims = input_dims
self.doublestranded = doublestranded
self.batchsize = batchsize
self.learning_rate = learning_rate
self.momentum = momentum
self.rho = rho
self.lambda_rate = lambda_rate
self.pooling = pooling
self.cd_k = cd_k
self.epochs = epochs
self.spmethod = 'entropy'
self._gradientSparsityConstraint = \
self._gradientSparsityConstraintEntropy
x = np.random.randn(self.num_motifs,
1,
self.input_dims,
self.motif_length
).astype(theano.config.floatX)
self.motifs = theano.shared(value=x, name='W', borrow=True)
# determine the parameter rho for the model if not given
if not rho:
rho = 1. / (self.num_motifs * self.motif_length)
if self.doublestranded:
rho=rho/2.
self.rho = rho
# cRBM parameters (2*x to respect both strands of the DNA)
b = np.zeros((1, self.num_motifs)).astype(theano.config.floatX)
# adapt the bias such that it will initially have rho motif hits in H
# That is, we want to have rho percent of the samples positive
# randn draws from 'standard normal', this is why we have 0 and 1
b = b + scipy.stats.norm.ppf(self.rho, 0, np.sqrt(self.motif_length))
self.bias = theano.shared(value=b, name='bias', borrow=True)
c = np.zeros((1, self.input_dims)).astype(theano.config.floatX)
self.c = theano.shared(value=c, name='c', borrow=True)
# infrastructural parameters
self.theano_rng = RS(seed=int(time.time()))
self.rng_data_permut = theano.tensor.shared_randomstreams.RandomStreams()
self.motif_velocity = theano.shared(value=np.zeros(self.motifs.get_value().shape).astype(theano.config.floatX),
name='velocity_of_W',
borrow=True)
self.bias_velocity = theano.shared(value=np.zeros(b.shape).astype(theano.config.floatX),
name='velocity_of_bias',
borrow=True)
self.c_velocity = theano.shared(value=np.zeros(c.shape).astype(theano.config.floatX),
name='velocity_of_c',
borrow=True)
val = np.zeros((self.batchsize, self.num_motifs, 1, 200)).astype(theano.config.floatX)
self.fantasy_h = theano.shared(value=val, name='fantasy_h', borrow=True)
if self.doublestranded:
self.fantasy_h_prime = theano.shared(value=\
np.zeros((self.batchsize, self.num_motifs, 1, 200)).astype(theano.config.floatX), \
name='fantasy_h_prime', borrow=True)
self._compileTheanoFunctions()
[docs] def saveModel(self, filename):
"""Save the model parameters and additional hyper-parameters.
Parameters
-----------
filename : str
Pickle filename where the model parameters are stored.
"""
numpyParams = (self.motifs.get_value(),
self.bias.get_value(),
self.c.get_value())
hyperparams = ( self.num_motifs,
self.motif_length,
self.input_dims,
self.doublestranded,
self.batchsize,
self.learning_rate,
self.momentum,
self.rho,
self.lambda_rate,
self.pooling,
self.cd_k,
self.epochs, self.spmethod)
pickleObject = (numpyParams, hyperparams)
joblib.dump(pickleObject, filename, protocol= 2)
@classmethod
[docs] def loadModel(cls, filename):
"""Load a model from a given pickle file.
Parameters
-----------
filename : str
Pickle file containing the model parameters.
returns : :class:`CRBM` object
An instance of CRBM with reloaded parameters.
"""
numpyParams, hyperparams =joblib.load(filename)
(num_motifs, motif_length, input_dims, \
doublestranded, batchsize, learning_rate, \
momentum, rho, lambda_rate,
pooling, cd_k,
epochs, spmethod) = hyperparams
obj = cls(num_motifs, motif_length, epochs, input_dims, \
doublestranded, batchsize, learning_rate, \
momentum, pooling, cd_k,
rho, lambda_rate)
motifs, bias, c = numpyParams
obj.motifs.set_value(motifs)
obj.bias.set_value(bias)
obj.c.set_value(c)
return obj
def _bottomUpActivity(self, data, flip_motif=False):
"""Theano function for computing bottom up activity."""
out = conv(data, self.motifs, filter_flip=flip_motif)
out = out + self.bias.dimshuffle('x', 1, 0, 'x')
return out
def _bottomUpProbability(self,activities):
"""Theano function for computing bottom up Probability."""
pool = self.pooling
x = activities.reshape((activities.shape[0], \
activities.shape[1], activities.shape[2], \
activities.shape[3]//pool, pool))
norm=T.sum(1. +T.exp(x), axis=4,keepdims=True)
x=T.exp(x)/norm
x=x.reshape((activities.shape[0], \
activities.shape[1], activities.shape[2], \
activities.shape[3]))
return x
def _bottomUpSample(self,probs):
"""Theano function for bottom up sampling."""
pool = self.pooling
_probs=probs.reshape((probs.shape[0], probs.shape[1], probs.shape[2], probs.shape[3]//pool, pool))
_probs_reshape=_probs.reshape((_probs.shape[0]*_probs.shape[1]*_probs.shape[2]*_probs.shape[3],pool))
samples=self.theano_rng.multinomial(pvals=_probs_reshape)
samples=samples.reshape((probs.shape[0],probs.shape[1],probs.shape[2],probs.shape[3]))
return T.cast(samples,theano.config.floatX)
def _computeHgivenV(self, data, flip_motif=False):
"""Theano function for complete bottom up pass."""
activity=self._bottomUpActivity(data,flip_motif)
probability=self._bottomUpProbability(activity)
sample=self._bottomUpSample(probability)
return [probability, sample]
def _topDownActivity(self, h, hprime):
"""Theano function for top down activity."""
W = self.motifs.dimshuffle(1, 0, 2, 3)
C = conv(h, W, border_mode='full', filter_flip=True)
out = T.sum(C, axis=1, keepdims=True) # sum over all K
if hprime:
C = conv(hprime, W[:,:,::-1,::-1], \
border_mode='full', filter_flip=True)
out = out+ T.sum(C, axis=1, keepdims=True) # sum over all K
c_bc = self.c
c_bc = c_bc.dimshuffle('x', 0, 1, 'x')
activity = out + c_bc
return activity
def _topDownProbability(self, activity, softmaxdown = True):
"""Theano function for top down probability."""
if softmaxdown:
return self._softmax(activity)
else:
return 1./(1.-T.exp(-activity))
def _topDownSample(self, probability, softmaxdown = True):
"""Theano function for top down sample."""
if softmaxdown:
pV_ = probability.dimshuffle(0, 1, 3, 2).reshape( \
(probability.shape[0]*probability.shape[3],
probability.shape[2]))
V_ = self.theano_rng.multinomial(n=1, pvals=pV_).astype(
theano.config.floatX)
V = V_.reshape((probability.shape[0], 1, probability.shape[3],
probability.shape[2])).dimshuffle(0, 1, 3, 2)
else:
V=self.theano_rng.multinomial(n=1,\
pvals=probability).astype(theano.config.floatX)
return V
def _computeVgivenH(self, H_sample, H_sample_prime, softmaxdown=True):
"""Theano function for complete top down pass."""
activity = self._topDownActivity(H_sample, H_sample_prime)
prob = self._topDownProbability(activity, softmaxdown)
sample = self._topDownSample(prob, softmaxdown)
return [prob, sample]
def _collectVHStatistics(self, prob_of_H, data):
"""Theano function for collecting V*H statistics."""
# reshape input
data = data.dimshuffle(1, 0, 2, 3)
prob_of_H = prob_of_H.dimshuffle(1, 0, 2, 3)
avh = conv(data, prob_of_H, border_mode="valid", filter_flip=False)
avh = avh / T.prod(prob_of_H.shape[1:])
avh = avh.dimshuffle(1, 0, 2, 3).astype(theano.config.floatX)
return avh
def _collectVStatistics(self, data):
"""Theano function for collecting V statistics."""
# reshape input
a = T.mean(data, axis=(0, 1, 3)).astype(theano.config.floatX)
a = a.dimshuffle('x', 0)
a = T.inc_subtensor(a[:, :], a[:, ::-1]) # match a-t and c-g occurances
return a
def _collectHStatistics(self, data):
"""Theano function for collecting H statistics."""
# reshape input
a = T.mean(data, axis=(0, 2, 3)).astype(theano.config.floatX)
a = a.dimshuffle('x', 0)
return a
def _collectUpdateStatistics(self, prob_of_H, prob_of_H_prime, data):
"""Theano function for collecting the complete update statistics."""
average_VH = self._collectVHStatistics(prob_of_H, data)
average_H = self._collectHStatistics(prob_of_H)
if prob_of_H_prime:
average_VH_prime=self._collectVHStatistics(prob_of_H_prime, data)
average_H_prime=self._collectHStatistics(prob_of_H_prime)
average_VH=(average_VH+average_VH_prime[:,:,::-1,::-1])/2.
average_H=(average_H+average_H_prime)/2.
average_V = self._collectVStatistics(data)
return average_VH, average_H, average_V
def _updateWeightsOnMinibatch(self, D, gibbs_chain_length):
"""Theano function that defines an SGD update step with momentum."""
# calculate the data gradient for weights (motifs), bias and c
[prob_of_H_given_data,H_given_data] = self._computeHgivenV(D)
if self.doublestranded:
[prob_of_H_given_data_prime,H_given_data_prime] = \
self._computeHgivenV(D, True)
else:
[prob_of_H_given_data_prime,H_given_data_prime] = [None, None]
# calculate data gradients
G_motif_data, G_bias_data, G_c_data = \
self._collectUpdateStatistics(prob_of_H_given_data, \
prob_of_H_given_data_prime, D)
# calculate model probs
H_given_model = self.fantasy_h
if self.doublestranded:
H_given_model_prime = self.fantasy_h_prime
else:
H_given_model_prime = None
for i in range(gibbs_chain_length):
prob_of_V_given_model, V_given_model = \
self._computeVgivenH(H_given_model, H_given_model_prime)
#sample up
prob_of_H_given_model, H_given_model = \
self._computeHgivenV(V_given_model)
if self.doublestranded:
prob_of_H_given_model_prime, H_given_model_prime = \
self._computeHgivenV(V_given_model, True)
else:
prob_of_H_given_model_prime, H_given_model_prime = None, None
# compute the model gradients
G_motif_model, G_bias_model, G_c_model = \
self._collectUpdateStatistics(prob_of_H_given_model, \
prob_of_H_given_model_prime, V_given_model)
mu = self.momentum
alpha = self.learning_rate
sp = self.lambda_rate
reg_motif, reg_bias = self._gradientSparsityConstraint(D)
vmotifs = mu * self.motif_velocity + \
alpha * (G_motif_data - G_motif_model-sp*reg_motif)
vbias = mu * self.bias_velocity + \
alpha * (G_bias_data - G_bias_model-sp*reg_bias)
vc = mu*self.c_velocity + \
alpha * (G_c_data - G_c_model)
new_motifs = self.motifs + vmotifs
new_bias = self.bias + vbias
new_c = self.c + vc
updates = [(self.motifs, new_motifs), (self.bias, new_bias), (self.c, new_c),
(self.motif_velocity, vmotifs), (self.bias_velocity, vbias), (self.c_velocity, vc),
(self.fantasy_h, H_given_model)]
if self.doublestranded:
updates.append((self.fantasy_h_prime, H_given_model_prime))
return updates
def _gradientSparsityConstraintEntropy(self, data):
"""Theano function that defines the entropy-based sparsity constraint."""
# get expected[H|V]
[prob_of_H, _] = self._computeHgivenV(data)
q = self.rho
p = T.mean(prob_of_H, axis=(0, 2, 3))
gradKernels = - T.grad(T.mean(q*T.log(p) + (1-q)*T.log(1-p)),
self.motifs)
gradBias = - T.grad(T.mean(q*T.log(p) + (1-q)*T.log(1-p)),
self.bias)
return gradKernels, gradBias
def _compileTheanoFunctions(self):
"""This methods compiles all theano functions."""
print("Start compiling Theano training function...")
D = T.tensor4('data')
updates = self._updateWeightsOnMinibatch(D, self.cd_k)
self.theano_trainingFct = theano.function(
[D],
None,
updates=updates,
name='train_CRBM'
)
#compute mean free energy
mfe_ = self._meanFreeEnergy(D)
#compute number of motif hits
[_, H] = self._computeHgivenV(D)
#H = self.bottomUpProbability(self.bottomUpActivity(D))
nmh_=T.mean(H) # mean over samples (K x 1 x N_h)
#compute norm of the motif parameters
twn_=T.sqrt(T.mean(self.motifs**2))
#compute information content
pwm = self._softmax(self.motifs)
entropy = -pwm * T.log2(pwm)
entropy = T.sum(entropy, axis=2) # sum over letters
ic_= T.log2(self.motifs.shape[2]) - \
T.mean(entropy) # log is possible information due to length of sequence
medic_= T.log2(self.motifs.shape[2]) - \
T.mean(T.sort(entropy, axis=2)[:, :, entropy.shape[2] // 2])
self.theano_evaluateData = theano.function(
[D],
[mfe_, nmh_],
name='evaluationData'
)
W=T.tensor4("W")
self.theano_evaluateParams = theano.function(
[],
[twn_,ic_,medic_],
givens={W:self.motifs},
name='evaluationParams'
)
fed=self._freeEnergyForData(D)
self.theano_freeEnergy=theano.function( [D],fed,name='fe_per_datapoint')
fed=self._freeEnergyPerMotif(D)
self.theano_fePerMotif=theano.function( [D],fed,name='fe_per_motif')
if self.doublestranded:
self.theano_getHitProbs = theano.function([D], \
self._bottomUpProbability(self._bottomUpActivity(D)))
else:
self.theano_getHitProbs = theano.function([D], \
#self.bottomUpProbability( T.maximum(self.bottomUpActivity(D),
self._bottomUpProbability( self._bottomUpActivity(D) +
self._bottomUpActivity(D, True)))
print("Compilation of Theano training function finished")
def _evaluateData(self, data):
"""Evaluate performance on given numpy array.
This is used to monitor training progress.
"""
return self.theano_evaluateData(data)
def _trainingFct(self, data):
"""Train on mini-batch given numpy array."""
return self.theano_trainingFct(data)
def _evaluateParams(self):
"""Evaluate parameters.
This is used to monitor training progress.
"""
return self.theano_evaluateParams()
[docs] def motifHitProbs(self, data):
"""Motif match probabilities.
Parameters
-----------
data : numpy-array
4D numpy array representing a DNA sequence in one-hot encoding.
See :meth:`crbm.sequences.seqToOneHot`.
returns : numpy-array
Per-position motif match probabilities of all motifs as numpy array.
"""
return self.theano_getHitProbs(data)
[docs] def freeEnergy(self, data):
"""Free energy determined on the given dataset.
Parameters
-----------
data : numpy-array
4D numpy array representing a DNA sequence in one-hot encoding.
See :meth:`crbm.sequences.seqToOneHot`.
returns : numpy-array
Free energy per sequence.
"""
return self.theano_freeEnergy(data)
[docs] def fit(self, training_data, test_data = None):
"""Fits the cRBM to the provided training sequences.
Parameters
-----------
training_data : numpy-array
4D-Numpy array representing the training sequence in one-hot encoding.
See :meth:`crbm.sequences.seqToOneHot`.
test_data : numpy-array
4D-Numpy array representing the validation sequence in one-hot encoding.
If no test_data is provided, the training progress will be reported
on the training set itself. See :meth:`crbm.sequences.seqToOneHot`.
"""
# assert that pooling can be done without rest to the division
# compute sequence length
nseq=int((training_data.shape[3]-\
self.motif_length + 1)/\
self.pooling)*\
self.pooling+ \
self.motif_length -1
training_data=training_data[:,:,:,:nseq]
if type(test_data) != type(None):
nseq=int((test_data.shape[3]-\
self.motif_length + 1)/\
self.pooling)*\
self.pooling+ \
self.motif_length -1
test_data=test_data[:,:,:,:nseq]
else:
test_data = training_data
# some debug printing
numTrainingBatches = training_data.shape[0] / self.batchsize
numTestBatches = test_data.shape[0] / self.batchsize
print(("BatchSize: " + str(self.batchsize)))
print(("Num of iterations per epoch: " + str(numTrainingBatches)))
start = time.time()
# compile training function
# now perform training
print("Start training the model...")
starttime = time.time()
for epoch in range(self.epochs):
for [start,end] in self._iterateBatchIndices(\
training_data.shape[0],self.batchsize):
self._trainingFct(training_data[start:end,:,:,:])
meanfe=0.0
meannmh=0.0
nb=0
for [start,end] in self._iterateBatchIndices(\
test_data.shape[0],self.batchsize):
[mfe_,nmh_]=self._evaluateData(test_data[start:end,:,:,:])
meanfe=meanfe+mfe_
meannmh=meannmh+nmh_
nb=nb+1
[twn_,ic_,medic_]=self._evaluateParams()
#for batchIdx in range(numTestBatches):
print(("Epoch {:d}: ".format(epoch) + \
"FE={:1.3f} ".format(meanfe/nb) + \
"NumH={:1.4f} ".format(meannmh/nb) + \
"WNorm={:2.2f} ".format(float(twn_)) + \
"IC={:1.3f} medIC={:1.3f}".format(float(ic_), float(medic_))))
# done with training
print(("Training finished after: {:5.2f} seconds!".format(\
time.time()-starttime)))
def _meanFreeEnergy(self, D):
"""Theano function for computing the mean free energy."""
return T.sum(self._freeEnergyForData(D))/D.shape[0]
[docs] def getPFMs(self):
"""Returns the weight matrices converted to *position frequency matrices*.
Parameters
-----------
returns: numpy-array
List of position frequency matrices as numpy arrays.
"""
def softmax_(x):
x_exp = np.exp(x)
y = np.zeros(x.shape)
for i in range(x.shape[1]):
y[:,i] = x_exp[:,i] / np.sum(x_exp[:,i])
return y
return [ softmax_(m[0, :, :]) for m in self.motifs.get_value() ]
def _freeEnergyForData(self, D):
"""Theano function for computing the free energy (per position)."""
pool = self.pooling
x=self._bottomUpActivity(D)
x = x.reshape((x.shape[0], x.shape[1], x.shape[2], x.shape[3]//pool, pool))
free_energy = -T.sum(T.log(1.+T.sum(T.exp(x), axis=4)), axis=(1, 2, 3))
if self.doublestranded:
x=self._bottomUpActivity(D,True)
x = x.reshape((x.shape[0], x.shape[1], x.shape[2], x.shape[3]//pool, pool))
free_energy = free_energy -T.sum(T.log(1.+T.sum(T.exp(x), axis=4)), axis=(1, 2, 3))
cMod = self.c
cMod = cMod.dimshuffle('x', 0, 1, 'x') # make it 4D and broadcastable there
free_energy = free_energy - T.sum(D * cMod, axis=(1, 2, 3))
return free_energy/D.shape[3]
def _freeEnergyPerMotif(self, D):
"""Theano function for computing the free energy (per motif)."""
pool = self.pooling
x=self._bottomUpActivity(D)
x = x.reshape((x.shape[0], x.shape[1], x.shape[2], x.shape[3]//pool, pool))
free_energy = -T.sum(T.log(1.+T.sum(T.exp(x), axis=4)), axis=(2, 3))
if self.doublestranded:
x=self._bottomUpActivity(D,True)
x = x.reshape((x.shape[0], x.shape[1], x.shape[2], x.shape[3]//pool, pool))
free_energy = free_energy -T.sum(T.log(1.+T.sum(T.exp(x), axis=4)), axis=(2, 3))
cMod = self.c
cMod = cMod.dimshuffle('x', 0, 1, 'x') # make it 4D and broadcastable there
free_energy = free_energy - T.sum(D * cMod, axis=(1, 2, 3)).dimshuffle(0, 'x')
return free_energy
def _softmax(self, x):
"""Softmax operation."""
return T.exp(x) / T.exp(x).sum(axis=2, keepdims=True)
def __repr__(self):
st = "Parameters:\n\n"
st += "Number of motifs: {}\n".format(self.num_motifs)
st += "Motif length: {}\n".format(self.motif_length)
st += "\n"
st += "Hyper-parameters:\n\n"
st += "input dims: {:d}".format(self.input_dims)
st += "doublestranded: {}".format(self.doublestranded)
st += "batchsize: {:d}".format(self.batchsize)
st += "learning rate: {:1.3f}".format(self.learning_rate)
st += "momentum: {:1.3f}".format(self.momentum)
st += "rho: {:1.4f}".format(self.rho)
st += "lambda: {:1.3f}".format(self.lambda_rate)
st += "pooling: {:d}".format(self.pooling)
st += "cd_k: {:d}".format(self.cd_k)
st += "epochs: {:d}".format(self.epochs)
return st
def _iterateBatchIndices(self, totalsize,nbatchsize):
"""Returns indices in batches."""
return [ [i,i+nbatchsize] if i+nbatchsize<=totalsize \
else [i,totalsize] for i in range(totalsize)[0::nbatchsize] ]