Draft Supporting PyTorch
ElasticDL is an elastic, Kubernetes-native distributed deep learning framework. It is designed to work smoothly with any deep learning framework, such as TensorFlow or PyTorch. Considering that PyTorch is more widely used in academia, this document describes the design of how ElasticDL works with PyTorch.
The training of most neural networks can be simplified to this process:
- Define the network: define the class of the network and create an instance of it, net=Net().
- Define the optimizer: optimizer=optim.xxx(net.parameters(),lr=xxx)
- Define the loss function: compute_loss=nn.MSELoss()
- Training loop:
  - Clear the gradient information in the optimizer: optimizer.zero_grad()
  - Forward: output=net(input)
  - Calculate the loss: loss=compute_loss(output,target)
  - Backward: loss.backward()
  - Update parameters: optimizer.step()
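The steps above can be put together in a minimal sketch. The toy data (learning y = 2x + 1), the single linear layer, and the learning rate are assumptions chosen only to keep the example small; they are not part of ElasticDL.

```python
import torch
import torch.nn as nn
import torch.optim as optim

torch.manual_seed(0)  # make the run repeatable

# Toy data, made up for illustration: learn y = 2x + 1.
inputs = torch.linspace(-1.0, 1.0, 64).unsqueeze(1)
targets = 2.0 * inputs + 1.0

net = nn.Linear(1, 1)                            # define the network
optimizer = optim.SGD(net.parameters(), lr=0.1)  # define the optimizer
compute_loss = nn.MSELoss()                      # define the loss function

losses = []
for _ in range(200):                             # training loop
    optimizer.zero_grad()                        # clear old gradients
    output = net(inputs)                         # forward
    loss = compute_loss(output, targets)         # calculate the loss
    loss.backward()                              # backward
    optimizer.step()                             # update parameters
    losses.append(loss.item())
```

The recorded losses should shrink over the iterations, which is a quick way to check that the five steps are wired in the right order.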
To build a network, first define a class that inherits from nn.Module. nn is a very useful toolbox; it is imported with import torch.nn as nn. For example, the class Net below mainly contains two functions: the initializer __init__ and the forward function.
import torch.nn as nn
import torch.nn.functional as F


class Net(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 6, 5)
        self.conv2 = nn.Conv2d(6, 16, 5)

    def forward(self, x):
        x = F.max_pool2d(F.relu(self.conv1(x)), 2)
        x = F.max_pool2d(F.relu(self.conv2(x)), 2)
        return x


net = Net()
__init__ defines the convolutional layers, and super().__init__() must be called first to initialize the parent class nn.Module. forward is where the data flow actually executes. In the code above, the input x first passes through conv1 and then through the activation function F.relu(), which requires importing torch.nn.functional as F at the top of the file. F.relu() is a function provided by PyTorch. If you instead define the activation in __init__ as an attribute, for example self.myrelu = nn.ReLU(), then the first statement of forward becomes x=F.max_pool2d(self.myrelu(self.conv1(x)),2). After this series of steps, forward returns x to the caller.
PyTorch has an abstract Dataset class. This tutorial walks through a nice example of creating a custom FacialLandmarkDataset class as a subclass of Dataset. PyTorch's TensorDataset is a Dataset that wraps tensors. torch.utils.data.DataLoader is an iterable over a Dataset that provides the following features:
- Batching the data.
- Shuffling the data.
- Loading the data in parallel using multiprocessing workers.
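As a small illustration of these features, the sketch below wraps two made-up tensors in a TensorDataset and iterates over them with a DataLoader. The tensor contents and the batch size are assumptions for the example only.

```python
import torch
from torch.utils.data import DataLoader, TensorDataset

# Ten fake samples with three features each, plus integer labels.
features = torch.arange(30, dtype=torch.float32).reshape(10, 3)
labels = torch.arange(10)

dataset = TensorDataset(features, labels)

# num_workers=0 loads in the main process; a positive value would start
# that many worker processes to load batches in parallel.
loader = DataLoader(dataset, batch_size=4, shuffle=True, num_workers=0)

batch_shapes = [tuple(batch_x.shape) for batch_x, _ in loader]
seen_labels = sorted(int(y) for _, batch_y in loader for y in batch_y)
```

Shuffling changes the order of samples on every pass, but each pass still visits every sample exactly once, and the last batch is smaller when the dataset size is not a multiple of batch_size.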
import torchvision

DOWNLOAD_MNIST = True  # set to False if ./mnist/ already holds the dataset

# MNIST digits dataset
train_data = torchvision.datasets.MNIST(
    root='./mnist/',
    train=True,  # this is training data
    # Converts a PIL.Image or numpy.ndarray to a torch.FloatTensor of shape
    # (C x H x W) and normalizes it to the range [0.0, 1.0]
    transform=torchvision.transforms.ToTensor(),
    download=DOWNLOAD_MNIST,
)
Next, we feed batches of train_data into the neural network, get the output from forward(), and finally calculate the error. The code below omits the accuracy calculation. If you want to take a closer look at the accuracy code, please see the complete code on GitHub.
# cnn, LR, EPOCH and train_loader (a DataLoader over train_data) are defined elsewhere
optimizer = torch.optim.Adam(cnn.parameters(), lr=LR)  # optimize all cnn parameters
loss_func = nn.CrossEntropyLoss()  # the target label is not one-hot encoded

# training and testing
for epoch in range(EPOCH):
    # fetch one batch of data; x is normalized when iterating train_loader
    for step, (b_x, b_y) in enumerate(train_loader):
        output = cnn(b_x)              # cnn output
        loss = loss_func(output, b_y)  # cross entropy loss
        optimizer.zero_grad()          # clear gradients for this training step
        loss.backward()                # backpropagation, compute gradients
        optimizer.step()               # apply gradients
To make the output of the neural network close to the ground truth you expect, training keeps reducing the difference between the two. You define how this difference is measured; that measure is the objective function, or loss function. As the loss approaches 0, the output approaches the ground truth.
The master process of ElasticDL uses asynchronous or synchronous SGD to coordinate workers for training. With asynchronous SGD, the master starts a high-performance parameter server that each worker can use. With synchronous SGD, ElasticDL uses a Kubernetes-native, fault-tolerant AllReduce implementation.
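The difference between the two update rules can be shown with a toy scalar model. This is only a sketch of the arithmetic, not ElasticDL code: the function names are hypothetical, and the per-worker gradients are fixed numbers, whereas in real asynchronous training each worker would compute its gradient against whatever parameters it last pulled.

```python
def sync_step(weight, worker_grads, lr):
    """Synchronous SGD: average all workers' gradients, then update once."""
    avg_grad = sum(worker_grads) / len(worker_grads)
    return weight - lr * avg_grad


def async_steps(weight, worker_grads, lr):
    """Asynchronous SGD: apply each worker's gradient as soon as it arrives."""
    for grad in worker_grads:
        weight = weight - lr * grad
    return weight


grads = [1.0, 2.0, 3.0]                       # one gradient per worker
w_sync = sync_step(10.0, grads, lr=0.1)       # one update with the mean gradient
w_async = async_steps(10.0, grads, lr=0.1)    # three updates, one per worker
```

In the synchronous case the averaging is what AllReduce provides; in the asynchronous case the parameter server plays the role of the loop that applies gradients one at a time.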
ElasticDL uses a master-worker architecture. The master node plays the master role in two aspects.
- It's the master of the cluster.
- It's the master of the model training/evaluation/prediction process.
ElasticDL requires users to provide several functions, including forward, loss, optimizer and feed. Here is an MNIST model written with the TensorFlow Keras API. The feed function customizes the conversion of training data into PyTorch model input.
In PyTorch, we follow the same interface design, and the following is a detailed example.
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Sequential(  # input shape (1, 28, 28)
            nn.Conv2d(
                in_channels=1,    # number of input channels (grayscale image)
                out_channels=16,  # n_filters
                kernel_size=5,    # filter size
                stride=1,         # filter movement/step
                padding=2,        # to keep the same width and height after Conv2d, use padding=(kernel_size-1)/2 when stride=1
            ),                    # output shape (16, 28, 28)
            nn.ReLU(),            # activation
            nn.MaxPool2d(kernel_size=2),  # choose max value in 2x2 area, output shape (16, 14, 14)
        )
        self.conv2 = nn.Sequential(  # input shape (16, 14, 14)
            nn.Conv2d(16, 32, 5, 1, 2),  # output shape (32, 14, 14)
            nn.ReLU(),                   # activation
            nn.MaxPool2d(2),             # output shape (32, 7, 7)
        )
        self.out = nn.Linear(32 * 7 * 7, 10)  # fully connected layer, output 10 classes

    def forward(self, x):
        x = self.conv1(x)
        x = self.conv2(x)
        x = x.view(x.size(0), -1)  # flatten the output of conv2 to (batch_size, 32 * 7 * 7)
        output = self.out(x)
        return output


net = Net()
In addition to defining models, users also need to specify the feed, loss, and optimizer functions.
def loss(labels, predictions):
    labels = labels.reshape(-1)  # flatten labels to shape (batch_size,)
    func = nn.CrossEntropyLoss()
    return func(predictions, labels)


def optimizer(lr=0.1):
    return torch.optim.Adam(net.parameters(), lr)
# TODO: how to make the PyTorch DataLoader work with the ElasticDL master; needs improvement
def feed(dataset, mode, _):
    def _parse_data(record):
        if mode == Mode.PREDICTION:
            feature_description = {
                "image": tf.io.FixedLenFeature([28, 28], tf.float32)
            }
        else:
            feature_description = {
                "image": tf.io.FixedLenFeature([28, 28], tf.float32),
                "label": tf.io.FixedLenFeature([1], tf.int64),
            }
        r = tf.io.parse_single_example(record, feature_description)
        features = {
            "image": tf.math.divide(tf.cast(r["image"], tf.float32), 255.0)
        }
        if mode == Mode.PREDICTION:
            return features
        else:
            return features, tf.cast(r["label"], tf.int32)

    dataset = dataset.map(_parse_data)

    if mode == Mode.TRAINING:
        dataset = dataset.shuffle(buffer_size=1024)
    return dataset
ElasticDL introduces a master process for each job. By calling the Kubernetes API, the master process understands the cluster situation. The master distributes the data through dynamic data sharding (see dynamic_data_sharding.md).
- A worker gets a task from the master.
- A worker reads the real data according to the offset in the task.
feed customizes the conversion process of training data to PyTorch model input.
ElasticDL's dynamic data partitioning is based on indexes. The training data of ElasticDL is one or more files in RecordIO format. Tables in the MaxCompute database system can also be used as training data. The contents of each task are as follows:
- File name or table name,
- The offset of the first record from the beginning of the file (or table),
- The total number of records in this task.
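A task with these three fields could be represented as below. This is a minimal sketch under stated assumptions: the Task class, the create_tasks helper, the file names, and the records-per-task value are all hypothetical and only illustrate index-based partitioning; they are not the ElasticDL master's actual code.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass(frozen=True)
class Task:
    shard_name: str   # file name or table name
    start: int        # offset of the first record in the file (or table)
    num_records: int  # total number of records in this task


def create_tasks(shards: Dict[str, int], records_per_task: int) -> List[Task]:
    """Split each shard (name -> record count) into index-based tasks."""
    if records_per_task <= 0:
        raise ValueError("records_per_task must be positive")
    tasks = []
    for name, total in shards.items():
        for start in range(0, total, records_per_task):
            # The last task of a shard may hold fewer records.
            tasks.append(Task(name, start, min(records_per_task, total - start)))
    return tasks


tasks = create_tasks({"train-0.recordio": 250, "train-1.recordio": 100}, 100)
```

Because a task carries only indexes and no data, the master can hand it to any worker, and a task from a failed worker can simply be handed out again.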
TODO: Make DataLoader work with tasks; more details will be added.
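One possible direction, offered only as a sketch and not as the project's design, is a map-style Dataset that exposes the record range of a single task so that a standard DataLoader can batch it. The TaskDataset class and the in-memory FAKE_FILE list are hypothetical; a real implementation would read records from the RecordIO file or table named in the task.

```python
import torch
from torch.utils.data import DataLoader, Dataset

# Stand-in for a RecordIO file: record i holds the feature value i.
# A real reader would seek to `start` in the file named by the task.
FAKE_FILE = [float(i) for i in range(1000)]


class TaskDataset(Dataset):
    """Hypothetical map-style dataset over one task's record range."""

    def __init__(self, records, start, num_records):
        self._records = records
        self._start = start
        self._num_records = num_records

    def __len__(self):
        return self._num_records

    def __getitem__(self, index):
        if not 0 <= index < self._num_records:
            raise IndexError(index)
        # Translate the task-local index into an offset in the shard.
        return torch.tensor([self._records[self._start + index]])


task_dataset = TaskDataset(FAKE_FILE, start=200, num_records=50)
loader = DataLoader(task_dataset, batch_size=16, shuffle=False)
batches = list(loader)
```

With this shape, the worker would build one small dataset per task, so batching and shuffling stay inside the task boundaries that the master assigned.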
There is a tutorial about feed in TensorFlow.
A task received by an ElasticDL worker usually includes multiple mini-batches. For each task, the worker opens the corresponding file or table, and then:
- Gets a mini-batch of training data.
- Calls the user-defined forward function with the local model as a parameter to calculate the cost. If the model is large, some parameters may come from the parameter server.
- Performs the backward calculation to obtain the gradients.
- In synchronous SGD, calls the AllReduce implementation in FTlib to synchronize gradients and update the model. In asynchronous SGD, uploads gradients to the parameter server from time to time, and also obtains global model parameters from the parameter server from time to time.
while True:
    task = get_task()
    dataset = create_dataset(task)
    for minibatch in dataset:
        pull_parameters()
        forward()
        backward()
        push_gradients()
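The loop above can be exercised end to end with in-process stand-ins. Everything here is an assumption for illustration: FakeMaster and FakePS are hypothetical classes that replace the RPC calls, the model is a single scalar weight, and the data follows y = 3x.

```python
from collections import deque


class FakeMaster:
    """Hands out (start, end) index ranges until none are left."""

    def __init__(self, tasks):
        self._tasks = deque(tasks)

    def get_task(self):
        return self._tasks.popleft() if self._tasks else None


class FakePS:
    """Holds one scalar weight and applies plain SGD to pushed gradients."""

    def __init__(self, weight, lr):
        self.weight, self.lr = weight, lr

    def pull_parameters(self):
        return self.weight

    def push_gradients(self, grad):
        self.weight -= self.lr * grad


def run_worker(master, ps, data, batch_size):
    steps = 0
    while True:
        task = master.get_task()
        if task is None:  # no more tasks: training is done
            break
        start, end = task
        for i in range(start, end, batch_size):
            batch = data[i:min(i + batch_size, end)]
            w = ps.pull_parameters()
            # forward + backward for loss = mean((w * x - y) ** 2)
            grad = sum(2 * (w * x - y) * x for x, y in batch) / len(batch)
            ps.push_gradients(grad)
            steps += 1
    return steps


data = [(x / 10.0, 3.0 * x / 10.0) for x in range(1, 41)]  # y = 3x
master = FakeMaster([(0, 20), (20, 40)])
ps = FakePS(weight=0.0, lr=0.05)
steps = run_worker(master, ps, data, batch_size=5)
```

The worker never sees the whole dataset; it only follows the index ranges it receives, which is what lets workers join or leave between tasks.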
Without high-level PyTorch APIs such as torch.optim, we have to update the value of each parameter by name and manually zero the gradient of each parameter. The torch.no_grad() context is necessary because we don't want these operations to be recorded for the next gradient calculation. To go further, we can use model.parameters() and model.zero_grad() (defined by PyTorch for nn.Module) to make these steps more concise and to avoid forgetting some parameters, especially when we build a complex model:
with torch.no_grad():
    for param in model.parameters():
        param -= param.grad * lr
    model.zero_grad()
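As a sanity check, the manual update can be compared with one step of torch.optim.SGD on an identical copy of the model. The small linear model, the random data, and the learning rate below are assumptions made for this sketch only.

```python
import copy

import torch
import torch.nn as nn

torch.manual_seed(0)
lr = 0.1
model = nn.Linear(3, 1)
reference = copy.deepcopy(model)  # same initial weights as `model`
optimizer = torch.optim.SGD(reference.parameters(), lr=lr)

x = torch.randn(8, 3)
y = torch.randn(8, 1)

# Manual update on `model`.
loss = nn.functional.mse_loss(model(x), y)
loss.backward()
with torch.no_grad():
    for param in model.parameters():
        param -= param.grad * lr
    model.zero_grad()

# The same step through torch.optim on `reference`.
optimizer.zero_grad()
nn.functional.mse_loss(reference(x), y).backward()
optimizer.step()

# Largest absolute difference between the two sets of parameters.
max_diff = max(
    (p - q).abs().max().item()
    for p, q in zip(model.parameters(), reference.parameters())
)
```

The two models should end up with matching parameters up to floating-point rounding, which shows that the manual loop is plain SGD without momentum or weight decay.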
A separate design document describes the distributed parameter server for ElasticDL.
PSClient provides several utility functions, such as push_gradients and pull_dense_parameters, which we can use directly.
with torch.no_grad():
    grads = [param.grad.numpy() for param in model.parameters()]
    self.ps_client.push_gradients(grads)
In the parameter server strategy, the workers pull the latest parameters from the PS before the forward pass and push gradients to the PS after the backward pass.
Each PS pod has an RPC server to provide RPC services. Workers use these RPC services to pull model parameters. The pull_variable service pulls all non-embedding parameters. The pull_embedding_vector service pulls the embedding vectors specified by an embedding layer name and a list of discrete IDs.
service PServer {
    rpc pull_variable(PullModelRequest) returns (PullModelResponse);
    rpc pull_embedding_vector(PullEmbeddingVectorRequest) returns (Tensor);
}
A worker computes gradients in each training iteration. These include gradients for non-embedding parameters and, if applicable, for some embedding vectors. The worker partitions the gradients by parameter name for non-embedding parameters and by discrete ID for embedding vectors. Then the worker sends each gradient partition to its corresponding PS pod through the push_gradient RPC call.
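The partitioning step could look like the sketch below. The text above does not specify the mapping function, so the choices here are assumptions: parameter names are mapped with a CRC32 hash modulo the number of PS pods, embedding rows are mapped with id modulo the number of PS pods, and all function names are hypothetical.

```python
import zlib
from collections import defaultdict


def ps_index_for_name(name, num_ps):
    """Stable hash of the parameter name, modulo the number of PS pods.

    zlib.crc32 is used because Python's built-in hash() for strings is
    randomized per process, so workers would disagree on the mapping.
    """
    return zlib.crc32(name.encode("utf-8")) % num_ps


def partition_dense(grads, num_ps):
    """Group {param_name: grad} by the PS pod that owns the parameter."""
    parts = defaultdict(dict)
    for name, grad in grads.items():
        parts[ps_index_for_name(name, num_ps)][name] = grad
    return dict(parts)


def partition_embedding(ids, rows, num_ps):
    """Group embedding-gradient rows by id % num_ps."""
    parts = defaultdict(lambda: ([], []))
    for i, row in zip(ids, rows):
        part_ids, part_rows = parts[i % num_ps]
        part_ids.append(i)
        part_rows.append(row)
    return dict(parts)


dense_parts = partition_dense(
    {"conv1.weight": [0.1], "conv1.bias": [0.2], "out.weight": [0.3]}, num_ps=2
)
emb_parts = partition_embedding([0, 1, 2, 5], ["g0", "g1", "g2", "g5"], num_ps=2)
```

Whatever function is chosen, the worker and the PS pods must agree on it, since the same mapping decides where parameters are pulled from and where gradients are pushed to.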
service PServer {
    rpc push_gradient(PushGradientRequest) returns (PushGradientResponse);
}
When a PS pod receives gradients in push_gradient, it uses a PyTorch optimizer to apply gradients to non-embedding parameters.
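On the PS side, applying received gradients with a PyTorch optimizer could be sketched as follows. The params dictionary, the apply_gradients helper, the parameter names, and the use of plain lists as the wire format are all assumptions for illustration; the actual request decoding is not shown.

```python
import torch

# Non-embedding parameters held by this (hypothetical) PS pod, keyed by name.
params = {
    "dense.weight": torch.nn.Parameter(torch.ones(2)),
    "dense.bias": torch.nn.Parameter(torch.zeros(1)),
}
optimizer = torch.optim.SGD(list(params.values()), lr=0.5)


def apply_gradients(named_grads):
    """Copy received gradients into .grad and let the optimizer update."""
    optimizer.zero_grad()
    for name, grad in named_grads.items():
        param = params[name]
        param.grad = torch.as_tensor(grad, dtype=param.dtype).reshape(param.shape)
    optimizer.step()


apply_gradients({"dense.weight": [1.0, 2.0], "dense.bias": [4.0]})
```

Assigning to .grad directly lets the PS reuse any torch.optim optimizer without running a backward pass of its own.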
Key code in the worker's _train_and_evaluate():
while True:
    dataset = self._task_data_service.get_dataset()
    # Here, convert dataset (a tf.data.Dataset) into a list, then define a
    # custom PyTorch-format dataset whose elements come from that list;
    # the elements of the list become numpy arrays.
    # _dataset_fn works like a transform. At this point the data in
    # torch.utils.data.Dataset is still str (because recordIO.read_records()
    # returns strings); _dataset_fn converts the data into Tensors.
    self.dataset = tf_dataset.map(lambda x: tf.strings.to_number(x, tf.float32))
    # The _dataset_fn() above has to be written by the user; worker.py then
    # goes on to use a DataLoader.
    # How to improve this later? Follow the MXNet approach and build numpy
    # arrays directly after reading RecordIO (as image.ImageDetIter does).
    if not dataset:
        self._process_train_end_callback_task_if_needed()
        break
    dataset = self._dataset_fn(
        dataset,
        Mode.TRAINING,
        self._task_data_service.data_reader.metadata,
    )
    dataset = dataset.batch(self._minibatch_size).prefetch(1)
In get_dataset():
ds = tf.data.Dataset.from_generator(
    self._gen, self.data_reader.records_output_types
)
return ds

def records_output_types(self):
    return tf.string
The dataset here has been processed by tf.data.Dataset.from_generator and is then passed through the user-defined _dataset_fn(),