Stochastic gradient descent is an extremely scalable learning algorithm, however it's sequential nature can become a bottleneck when processing large data sets. To work around this, researchers have developed parallelized versions of the algorithm. In this post, I'll briefly review stochastic gradient descent as it's applied to logistic regression, and then demonstrate how to implement a parallelized version in Python, based on a recent research paper.
In what follows, I'll be using notation introduced in a previous post.
Gradient descent is a simple, principled optimization algorithm used to choose parameter values in a variety of discriminative and generative predictive models. The algorithm works by efficiently searching a model's parameter space according to the following rule:
\[\begin{aligned} \theta := \theta -\alpha \nabla J(\theta). \end{aligned} \]
In the stochastic version, the parameters \(\theta\) are updated after analyzing each individual training example, as opposed to updating after a subset (mini-batch) or the entire set of (batch) training examples. I've written a previous post on batch gradient descent here.
The algorithm's scalability comes from only having to examine a single training example prior to updating. When the data are large, iterating over an entire training set, or even a small subset, can be an impassable bottleneck.
Logistic regression is a classification algorithm that fits a model to a set of features and targets according to the following relationship \[\begin{aligned} h_\theta(x) = \frac{1}{1+e^{-\theta^Tx}}. \end{aligned} \]
When this hypothesis function is inserted into our cost function, \[\begin{aligned} J(\theta) = \frac{1}{2}\sum_{i=1}^m (y_i - h_\theta(x_i))^2, \end{aligned} \] we obtain the following update rule \[\begin{aligned} \theta := \theta -\alpha \left(y_i-h_\theta(x_i)\right)x_i, \end{aligned} \] which is equal to \[\begin{aligned} \theta := \theta -\alpha \left(y_i-\frac{1}{1+e^{-\theta^Tx_i}}\right)x_i. \end{aligned} \]
Now that we have an update rule, we can implement the algorithm. The code below defines our hypothesis function, \(h\), and the stochastic gradient descent routine, \(sgd\). \(h\) takes as input, a single training example and the parameter vector \(\theta\), and returns the result of the hypothesis function we defined previously. The complete source code for this post can be found here.
For input, \(sgd\) takes the training set's features, \(x\), and targets, \(y\), along with a learning rate, \(a\), and a maximum number of iterations, \(max\_iter\). The algorithm first intializes the parmeters, \(\theta\) to random values and then makes \(max\_iter\) passes over the training set. At each pass, the training data is randomly permuted, after which the parameters \(\theta\) are updated after observing each individual training example. For this post, I am excluding a convergence criteria and instead iterating over the data the maximum number of iterations, as defined by the routine's input parameter.
from math import e import numpy as np def h(x,theta): return 1./(1.+e**(np.dot(x,-theta))) def sgd(x,y,a,max_iter): #initialize algorithm state m,n = x.shape theta = np.random.random(n) z = np.arange(m) #make max_iter passes over the training set for t in xrange(max_iter): #shuffle indices prior to searching z = np.random.permutation(z) #for each training example for i in z: #update parameters theta = theta + a*(y[i]-h(x[i],theta))*x[i] return theta
There are a variety of methods available that implement a parallelized version of stochastic gradient descent. The version we'll use here is based on a relatively new method that trains an ensemble of models in parallel, using stochastic gradient descent and distinct subsets of the training set, and then uses the average parameter values to make predictions. The paper describing this method along with its mathematical properties can be found here.
Implementing the parallelized version of stochastic gradient is trivial process, thanks to Python's multiprocessing library. All that is required is to partition the training set and define a function that takes a data structure containing the subset of training data and stochastic gradient descent parameters.
def train(input): x = input['x'] y = input['y'] a = input['learning_rate'] iters = input['iters'] return = sgd(x,y,a,max_iter=iters)
Then, using a process pool from multiprocessing, each subset of training data can be passed to each worker process and the results can be averaged and used to make predictions.
The code below demonstrates this by creating a syntheic data set using sci-kit learn, partitioning it into distinct subsets, training individual models in parallel, and then using the average to make predictions on the test set.
from multiprocessing import Pool from sklearn import metrics, datasets import numpy as np #learning rate a = 0.001 #create a synthetic data set, default features, 1500 examples, 2 classes x,y = datasets.make_classification(1500) #insert a 1's column to the beginning of x x = np.hstack((np.ones((x.shape[0],1)),x)) #partition the data input = [{'x':x[:250],'y':y[:250],'learning_rate':a,'iters':1000}, {'x':x[250:500],'y':y[250:500],'learning_rate':a,'iters':1000}, {'x':x[500:750],'y':y[500:750],'learning_rate':a,'iters':1000}, {'x':x[750:1000],'y':y[750:1000],'learning_rate':a,'iters':1000}] #worker pool pool = Pool(4) #estimate parameters for each model in parallel thetas = pool.map(train, input) #take the average theta = np.mean(thetas,axis=0) #make predictions on the test set pred = [h(x[i],theta) for i in xrange(1000,1500)] #get the roc curve fpr, tpr, thresholds = metrics.roc_curve(y[1000:], pred)
The figure below plots the ROC curves for parallel and serial versions of the stochastic gradient descent code, trained on the same data, indicating that they produce similar performance, just as the paper proves mathematically.
The advantage of running a parallelized version of stochastic gradient descent is it's speed. To measure the improvement that the parallel version produces I've written two test routines, one for parallel stochastic gradient descent and one for the sequential version. Measuing the run time of each method indicates the parallel version takes 4 times less execution time than the sequential version. This is expected, since I partitioned the data into 4 equal sized groups.
import timeit from multiprocessing import Pool from sklearn import metrics, datasets import numpy as np def test_parallel_sgd(): #learning rate a = 0.001 #create a synthetic data set, default features, 1500 examples, 2 classes x,y = datasets.make_classification(1500) #insert a 1's column to the beginning of x x = np.hstack((np.ones((x.shape[0],1)),x)) #partition the data input = [{'x':x[:250],'y':y[:250],'learning_rate':a,'iters':1000}, {'x':x[250:500],'y':y[250:500],'learning_rate':a,'iters':1000}, {'x':x[500:750],'y':y[500:750],'learning_rate':a,'iters':1000}, {'x':x[750:1000],'y':y[750:1000],'learning_rate':a,'iters':1000}] #worker pool pool = Pool(4) #estimate parameters for each model in parallel thetas = pool.map(train, input) def test_parallel_sgd(): #learning rate a = 0.001 #create a synthetic data set, default features, 1500 examples, 2 classes x,y = datasets.make_classification(1500) #insert a 1's column to the beginning of x x = np.hstack((np.ones((x.shape[0],1)),x)) #estimate parameters for each model in parallel thetas = sgd(x[:1000],y[:1000], a, 1000) if __name__=="__main__": t = timeit.Timer(lambda: test_parallel_sgd()) print t.timeit(number=1) #prints 2.25465488434 t = timeit.Timer(lambda: test_serial_sgd()) print t.timeit(number=1) #prints 8.18555998802
In this post I've demonstrated how to implement a parallel version of stochastic gradient descent in Python and shown that it performs nearly as well as the sequential version. The complete source code for this post can be found here.