Distributed TensorFlow


TensorFlow is a popular open-source library designed for numerical computation, most commonly the training and serving of neural networks. In this framework, computation is described via data flow graphs, which offer a large degree of flexibility in the structure and placement of operations. This allows computation to be parallelized across multiple workers, which is often beneficial when training neural networks, given the large amount of training data that must be processed. Furthermore, such parallelization may sometimes be required if the model becomes sufficiently large. In this post, we will explore the mechanisms through which computation in TensorFlow can be distributed.

A sample TensorFlow graph

Data vs. Model Parallelism

When splitting the training of a neural network across multiple compute nodes, two strategies are commonly employed: data parallelism and model parallelism. In the former, individual instances of the model are created on each node and fed different training samples, allowing for higher training throughput. Conversely, in model parallelism, a single instance of the model is split across multiple nodes, allowing larger models, ones that may not fit in the memory of a single node, to be trained. If desired, these two strategies can also be composed, resulting in multiple instances of a given model, with each instance spanning multiple nodes. In this post, we will focus on data parallelism.

 

  
Different forms of data and model parallelism. Left: Data parallelism. Center: Model parallelism. Right: Data and model parallelism.
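
Since the remainder of this post focuses on data parallelism, a brief sketch may help make the contrast with model parallelism concrete. The snippet below (an illustration with made-up layer sizes, not code from the original example) places each layer of a tiny two-layer network on a different GPU, so that no single device has to hold the entire model:

# model parallelism sketch: one layer per gpu (hypothetical layer sizes)
import tensorflow as tf

x = tf.placeholder(tf.float32, shape=[None, 4])

# first layer lives on the 0th gpu
with tf.device('/gpu:0'):
    w1 = tf.Variable(tf.random_normal([4, 8]))
    h = tf.nn.relu(tf.matmul(x, w1))

# second layer lives on the 1st gpu
with tf.device('/gpu:1'):
    w2 = tf.Variable(tf.random_normal([8, 1]))
    y = tf.matmul(h, w2)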

Data Parallelism in TensorFlow

When using TensorFlow, data parallelism largely manifests itself in two forms: in-graph replication and between-graph replication. The most significant difference between the two strategies lies in the structure of the flow graph and the consequences that result.

In-Graph Replication

In-graph replication is commonly thought to be the simpler and more straightforward (but less scalable) of the two approaches. With this strategy, a single graph is created on the distributed master that contains all of the replicas residing on its worker devices. As one can imagine, such a graph can grow quite large as the number of workers increases, which can adversely impact performance. However, for small systems (e.g., a dual-GPU desktop computer), in-graph replication may be preferred due to its simplicity.

Below are snippets demonstrating the fairly painless manner in which in-graph replication can be applied relative to baseline TensorFlow code using a single GPU. Given this approach's issues with scaling, we will only consider the case of a single-machine, multi-GPU configuration. The differences between the two blocks of code are fairly minor: the input data is chunked so that it is distributed evenly across the workers, the graph elements are placed on each device by iterating over the workers, and the results from the various workers are concatenated. With this small code change, one is able to leverage multiple devices, making this method popular for simpler configurations where scalability is not a significant concern.
 


# single GPU (baseline)
import tensorflow as tf

# place the initial data on the cpu
with tf.device('/cpu:0'):
    input_data = tf.Variable([[1., 2., 3.],
                              [4., 5., 6.],
                              [7., 8., 9.],
                              [10., 11., 12.]])
    b = tf.Variable([[1.], [1.], [2.]])

# compute the result on the 0th gpu
with tf.device('/gpu:0'):
    output = tf.matmul(input_data, b)

# create a session and run
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    print(sess.run(output))

# in-graph replication
import tensorflow as tf

num_gpus = 2

# place the initial data on the cpu
with tf.device('/cpu:0'):
    input_data = tf.Variable([[1., 2., 3.],
                              [4., 5., 6.],
                              [7., 8., 9.],
                              [10., 11., 12.]])
    b = tf.Variable([[1.], [1.], [2.]])

# split the data into chunks for each gpu
inputs = tf.split(input_data, num_gpus)
outputs = []

# loop over available gpus and pass input data
for i in range(num_gpus):
    with tf.device('/gpu:'+str(i)):
        outputs.append(tf.matmul(inputs[i], b))

# merge the results of the devices
with tf.device('/cpu:0'):
    output = tf.concat(outputs, axis=0)

# create a session and run
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    print(sess.run(output))

 

These changes can also be visualized by examining the TensorFlow graphs, which are included below. The repeated GPU blocks illustrate the manner in which the original method is scaled.
 

Visualizing in-graph replication. Left: The original graph. Right: The resulting graph after in-graph replication.

Between-Graph Replication

In contrast to in-graph replication, which struggles at scale, between-graph replication aims to remain performant with large numbers of nodes. This is achieved by creating a copy of the computation graph on each worker, eliminating the need for the master to host a copy of every worker's graph. Coordination between these separate graphs is performed through a bit of TensorFlow magic: if two separate nodes allocate a variable on the same TensorFlow device with the same name, those allocations are merged and the variable will share the same backend storage, thus coupling the workers together.
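
As a minimal sketch of this coupling (using a hypothetical variable name), imagine that each worker process independently executes the lines below while building its own graph. Because the variable is pinned to the same parameter-server device and carries the same name in both graphs, the two workers end up reading and writing a single set of weights rather than two independent copies:

# executed separately by each worker, yet both refer to one shared allocation
import tensorflow as tf

with tf.device('/job:ps/task:0'):
    shared_weights = tf.Variable(tf.zeros([3, 1]), name='shared_weights')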

However, care must be taken to ensure correct device placement. If two workers allocate the variable on different devices, this coupling will not occur. To aid in this process, TensorFlow offers replica_device_setter. Provided the individual workers create their graphs in the same sequence, replica_device_setter offers a deterministic method for variable allocation, ensuring the variables reside on the same device. This will be demonstrated in the code below.

Since between-graph replication is largely creating duplicates of the original graph, most of the relevant changes actually reside in the configuration of the nodes in the cluster. Therefore, the following code snippet will focus on just that. It is important to note that this script would typically be executed on each machine in the cluster but with different command-line arguments. We will walk through the code line-by-line now.
 


import sys
import tensorflow as tf

# specify the cluster's architecture
cluster = tf.train.ClusterSpec({'ps': ['192.168.1.1:1111'], 
                                'worker': ['192.168.1.2:1111',
                                           '192.168.1.3:1111']
                               })

# parse command-line to specify machine
job_type = sys.argv[1]  # job type: "worker" or "ps"
task_idx = int(sys.argv[2])  # index of this task within the worker or ps list
                             # as defined in the ClusterSpec

# create TensorFlow Server. This is how the machines communicate.
server = tf.train.Server(cluster, job_name=job_type, task_index=task_idx)

# parameter server is updated by remote clients. 
# will not proceed beyond this if statement.
if job_type == 'ps':
    server.join()
else:
    # workers only
    with tf.device(tf.train.replica_device_setter(
                        worker_device='/job:worker/task:' + str(task_idx),
                        cluster=cluster)):
        # build your model here as if you were only using a single machine
        pass

    with tf.Session(server.target) as sess:
        # train your model here
        pass


The first step in running distributed TensorFlow is to specify the architecture of the cluster using tf.train.ClusterSpec. Nodes are typically divided into two roles (or "jobs"): parameter servers ("ps"), which host variables, and "workers", which perform the heavy computation. The IP address and port of each node are provided. Next, the script must determine its job type and index within the cluster; this is typically achieved by passing command-line arguments to the script, which we parse. job_type specifies whether the node is running a ps or worker task, whereas task_idx specifies the node's index into the ps or worker list. Using this information, a TensorFlow Server is created to allow for connectivity between the devices.
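
For example, assuming the script above were saved as trainer.py (a hypothetical file name), it might be launched as python trainer.py ps 0 on 192.168.1.1, python trainer.py worker 0 on 192.168.1.2, and python trainer.py worker 1 on 192.168.1.3, so that each machine takes on the role assigned to its address in the ClusterSpec.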

Next, if the node is a parameter server, it simply joins its threads and waits for them to terminate. While it may seem counterintuitive that there is no ps-specific code, the graph elements are actually pushed to it from the workers.

Conversely, if the device is a worker, we build our model using replica_device_setter so that parameters are consistently allocated across our parameter servers, as discussed earlier. These model copies will be largely identical to their single-machine counterparts. Finally, we create a tf.Session and train our model.
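
To make the placeholders above a bit more concrete, here is a minimal sketch of what might go in them, continuing from the cluster setup in the snippet above. The toy linear model and randomly generated training data are purely illustrative and are not part of the original example:

# a toy model built inside the replica_device_setter scope
import numpy as np

with tf.device(tf.train.replica_device_setter(
                    worker_device='/job:worker/task:' + str(task_idx),
                    cluster=cluster)):
    x = tf.placeholder(tf.float32, shape=[None, 1])
    y_true = tf.placeholder(tf.float32, shape=[None, 1])
    w = tf.Variable(tf.zeros([1, 1]))   # hosted on the parameter server
    y_pred = tf.matmul(x, w)            # computed on this worker
    loss = tf.reduce_mean(tf.square(y_pred - y_true))
    train_op = tf.train.GradientDescentOptimizer(0.01).minimize(loss)

with tf.Session(server.target) as sess:
    # in practice, only one worker (the "chief") would typically run initialization
    sess.run(tf.global_variables_initializer())
    for _ in range(100):
        batch_x = np.random.rand(32, 1)
        sess.run(train_op, feed_dict={x: batch_x, y_true: 2.0 * batch_x})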

Wrapping Up

Hopefully, this post has clarified some of the terminology and techniques associated with distributed TensorFlow. In future posts, we will explore this and other topics in more detail, so feel free to check back periodically.

Also, if improving the healthcare system through the intelligent application of machine learning excites you, feel free to reach out. We're in the process of scaling up, both in terms of headcount and compute. Sponsored by NVIDIA and GE, we have the resources to expand and are actively looking for talent. Come join us and put our DGX-1s to good use!