
Split gpu ci workflow into single-gpu and multi-gpu #151


Draft
wants to merge 13 commits into
base: main
20 changes: 18 additions & 2 deletions .github/workflows/gpu-ci.yml
@@ -12,8 +12,24 @@ on:

jobs:
  build:
    runs-on: 2GPU
    runs-on: 1GPU
    steps:
      - uses: actions/checkout@v3
        with:
          fetch-depth: 0
      - name: Run tests
        run: |
          ref_type=${{ github.ref_type }}
          branch=main
          if [[ $ref_type == "tag"* ]]
          then
            raw=$(git branch -r --contains ${{ github.ref_name }})
            branch=${raw/origin\/}
          fi
          cd ${{ github.workspace }}; tox -r -e test-gpu -- $branch

  multigpu:
    runs-on: 2GPU
    steps:
      - uses: actions/checkout@v3
        with:
@@ -27,4 +43,4 @@ jobs:
            raw=$(git branch -r --contains ${{ github.ref_name }})
            branch=${raw/origin\/}
          fi
          cd ${{ github.workspace }}; tox -e test-gpu -- $branch
          cd ${{ github.workspace }}; tox -r -e test-gpu-multigpu -- $branch
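
Note: both jobs resolve a tag build back to the branch that contains the tag before passing it to tox. For reference only, here is a minimal Python sketch of that same resolution logic; it is not part of this PR, and the function name and example tag are illustrative. It assumes a full-history checkout (fetch-depth: 0) and an "origin" remote, just like the shell step above.

import subprocess

def branch_for_ref(ref_name: str, ref_type: str) -> str:
    # Branch builds always test against main; tag builds look up the remote
    # branch that contains the tag and strip the leading "origin/" prefix,
    # mirroring `branch=${raw/origin\/}` in the workflow.
    if not ref_type.startswith("tag"):
        return "main"
    raw = subprocess.check_output(
        ["git", "branch", "-r", "--contains", ref_name], text=True
    )
    return raw.strip().replace("origin/", "", 1)

# e.g. branch_for_ref("v23.02.00", "tag") might return "release-23.02"
# (hypothetical tag and branch names).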
213 changes: 85 additions & 128 deletions examples/02-Multi-GPU-Tensorflow-with-Horovod.ipynb
@@ -2,7 +2,7 @@
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"execution_count": null,
"id": "bb28e271",
"metadata": {},
"outputs": [],
@@ -57,32 +57,25 @@
},
{
"cell_type": "code",
"execution_count": 2,
"execution_count": null,
"id": "edd46306",
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"\n",
"from merlin.core.utils import download_file\n",
"from merlin.core.dispatch import get_lib"
]
},
{
"cell_type": "code",
"execution_count": 3,
"execution_count": null,
"id": "591f8c61",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"downloading ml-25m.zip: 262MB [00:07, 36.0MB/s] \n",
"unzipping files: 100%|██████████| 8/8 [00:08<00:00, 1.08s/files]\n"
]
}
],
"outputs": [],
"source": [
"DATA_PATH = '/workspace'\n",
"DATA_PATH = os.environ.get(\"DATA_PATH\", os.path.expanduser(\"~/workspace\"))\n",
"download_file(\"http://files.grouplens.org/datasets/movielens/ml-25m.zip\", DATA_PATH + \"/ml-25m.zip\")"
]
},
@@ -108,59 +101,44 @@
},
{
"cell_type": "code",
"execution_count": 4,
"execution_count": null,
"id": "c65e5ef6",
"metadata": {},
"outputs": [],
"source": [
"GPU_COUNT = 2 # specify how many GPUs you would like to train on\n",
"\n",
"ratings = get_lib().read_csv(DATA_PATH + '/ml-25m/ratings.csv')\n",
"ratings = get_lib().read_csv(DATA_PATH + \"/ml-25m/ratings.csv\")\n",
"\n",
"for i in range(GPU_COUNT):\n",
" ratings[\n",
" int(i * ratings.shape[0] / GPU_COUNT):int((i + 1) * ratings.shape[0] / GPU_COUNT)\n",
" ].to_parquet(DATA_PATH + f'/train_{i}.parquet')"
"ratings.to_parquet(os.path.join(DATA_PATH, \"train.parquet\"))"
]
},
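
The single train.parquet written here is split across workers later by the dataloader (via global_size/global_rank in the trainer script) rather than by writing one file per GPU. Roughly, each of the GPU_COUNT workers sees about len(ratings) / GPU_COUNT rows. A small illustrative calculation, not part of this PR; the batch size matches the --batch_size value passed to tf_trainer.py further below.

import math

rows = len(ratings)              # ~25M ratings in ml-25m
batch_size = 65536               # value passed to tf_trainer.py via --batch_size
rows_per_worker = math.ceil(rows / GPU_COUNT)
batches_per_worker = math.ceil(rows_per_worker / batch_size)
print(f"~{rows_per_worker} rows -> ~{batches_per_worker} batches per worker")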
{
"cell_type": "markdown",
"id": "8b9a65b4",
"metadata": {},
"source": [
"**Important: Individual parquet files require to have the same number of batches. If one worker has more batches than another, the training process will freeze. At one point during the training process, the worker with more batches waits for the gradients from the worker with fewer batches. But the worker with fewer batches finished the training run.**`\n",
"\n",
"Let us now take a closer look at what else we will need to train with Horovod.\n",
"\n",
"### Write the training script to a file\n",
"\n",
"We need to have a `.py` file we will be able to load into each process using `horovodrun`. \n",
"We need to have a `.py` file we will be able to load into each process using `horovodrun`.\n",
"\n",
"### Set `CUDA visible devices` correctly inside each process\n",
"\n",
"We need to set the visible device in each process to its `rank`. This way process with `rank 0` will use the zeroth GPU, process with `rank 1` will use the first GPU, and so on. This ensures that each worker can access only a single GPU.\n",
"\n",
"Additionally, we will use the `rank` information to select the correct parquet file to load per worker (`DATA_PATH + f'/train_{hvd.local_rank()}.parquet'`)\n",
"\n"
"We need to set the visible device in each process to its `rank`. This way process with `rank 0` will use the zeroth GPU, process with `rank 1` will use the first GPU, and so on. This ensures that each worker can access only a single GPU."
]
},
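
As a side note, one common way to implement the device pinning described in this cell is to read the local rank from the MPI environment before TensorFlow is imported, so each worker only ever sees its own GPU. The sketch below is illustrative only and is not the exact code this PR writes to tf_trainer.py; the environment variable name is what OpenMPI exports and is an assumption here.

import os

# Read this process's local rank *before* importing TensorFlow, so that TF
# never initializes the GPUs assigned to the other workers.
local_rank = int(os.environ.get("OMPI_COMM_WORLD_LOCAL_RANK", "0"))
os.environ["CUDA_VISIBLE_DEVICES"] = str(local_rank)

import tensorflow as tf  # imported only after CUDA_VISIBLE_DEVICES is set

print(f"worker {local_rank} sees {tf.config.list_physical_devices('GPU')}")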
{
"cell_type": "code",
"execution_count": 5,
"execution_count": null,
"id": "9fbe17a7",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Overwriting ./tf_trainer.py\n"
]
}
],
"outputs": [],
"source": [
"%%writefile './tf_trainer.py'\n",
"%%writefile \"./tf_trainer.py\"\n",
"\n",
"import argparse\n",
"import os\n",
"\n",
"# the order of statements and imports is imoportant\n",
@@ -176,20 +154,34 @@
"from merlin.io import Dataset\n",
"\n",
"import tensorflow as tf\n",
"import horovod.tensorflow.keras as hvd\n",
"import horovod.tensorflow as hvd\n",
"\n",
"from glob import glob\n",
"from merlin.core.dispatch import get_lib\n",
"\n",
"os.environ[\"TF_GPU_ALLOCATOR\"] = \"cuda_malloc_async\"\n",
"\n",
"hvd.init()\n",
"\n",
"from merlin.loader.tensorflow import Loader\n",
"\n",
"DATA_PATH = '/workspace'\n",
"parser = argparse.ArgumentParser()\n",
"parser.add_argument(\"--data_path\", default=None, help=\"Input directory.\")\n",
"parser.add_argument(\"--batch_size\", type=int, default=None, help=\"Batch size.\")\n",
"args = parser.parse_args()\n",
"\n",
"DATA_PATH = args.data_path or os.path.expanduser(\"~/workspace\")\n",
"BATCH_SIZE = args.batch_size or 1024\n",
"\n",
"dataset = Dataset(os.path.join(DATA_PATH, \"train.parquet\"))\n",
"dataset = dataset.repartition(MPI_SIZE)\n",
"\n",
"dataset = Dataset(glob(DATA_PATH + f'/train_{hvd.local_rank()}.parquet'), part_size=\"100MB\")\n",
"loader = Loader(dataset, batch_size=64 * 1024)\n",
"loader = Loader(\n",
" dataset,\n",
" batch_size=BATCH_SIZE,\n",
" global_size=MPI_SIZE,\n",
" global_rank=MPI_RANK,\n",
" device=MPI_RANK,\n",
")\n",
"\n",
"label_column = 'rating'\n",
"\n",
@@ -206,8 +198,8 @@
"class MatrixFactorization(tf.keras.Model):\n",
" def __init__(self, n_factors):\n",
" super().__init__()\n",
" self.user_embeddings = tf.keras.layers.Embedding(162541, n_factors)\n",
" self.movie_embeddings = tf.keras.layers.Embedding(209171, n_factors)\n",
" self.user_embeddings = tf.keras.layers.Embedding(162542, n_factors)\n",
" self.movie_embeddings = tf.keras.layers.Embedding(209172, n_factors)\n",
"\n",
" def call(self, batch, training=False):\n",
" user_embs = self.user_embeddings(batch['userId'])\n",
@@ -218,16 +210,51 @@
"\n",
"\n",
"model = MatrixFactorization(64)\n",
"opt = tf.keras.optimizers.Adam(1e-2 * hvd.size())\n",
"opt = hvd.DistributedOptimizer(opt)\n",
"model.compile(optimizer=opt, loss=tf.keras.losses.MeanSquaredError(), experimental_run_tf_function=False)\n",
"\n",
"model.fit(\n",
" loader,\n",
" epochs=1,\n",
" callbacks=[hvd.callbacks.BroadcastGlobalVariablesCallback(0)],\n",
" verbose=1 if hvd.rank() == 0 else 0\n",
")"
"loss = tf.keras.losses.MeanSquaredError()\n",
"opt = tf.optimizers.Adam(1e-2 * hvd.size())\n",
"\n",
"checkpoint_prefix = \"./checkpoints\"\n",
"checkpoint = tf.train.Checkpoint(model=model, optimizer=opt)\n",
"\n",
"\n",
"@tf.function\n",
"def training_step(features, labels, first_batch):\n",
" with tf.GradientTape() as tape:\n",
" probs = model(features, training=True)\n",
" loss_value = loss(labels, probs)\n",
"\n",
" # Horovod: add Horovod Distributed GradientTape.\n",
" tape = hvd.DistributedGradientTape(tape)\n",
"\n",
" grads = tape.gradient(loss_value, model.trainable_variables)\n",
" opt.apply_gradients(zip(grads, model.trainable_variables))\n",
"\n",
" # Horovod: broadcast initial variable states from rank 0 to all other processes.\n",
" # This is necessary to ensure consistent initialization of all workers when\n",
" # training is started with random weights or restored from a checkpoint.\n",
" #\n",
" # Note: broadcast should be done after the first gradient step to ensure optimizer\n",
" # initialization.\n",
" if first_batch:\n",
" hvd.broadcast_variables(model.variables, root_rank=0)\n",
" hvd.broadcast_variables(opt.variables(), root_rank=0)\n",
"\n",
" return loss_value\n",
"\n",
"\n",
"# Horovod: adjust number of steps based on number of GPUs.\n",
"for batch, (features, labels) in enumerate(loader):\n",
" loss_value = training_step(features, labels, batch == 0)\n",
"\n",
" if batch % 10 == 0 and hvd.rank() == 0:\n",
" print('Step #%d\\tLoss: %.6f' % (batch, loss_value))\n",
"\n",
"hvd.join()\n",
"\n",
"# Horovod: save checkpoints only on worker 0 to prevent other workers from\n",
"# corrupting it.\n",
"if hvd.rank() == 0:\n",
" checkpoint.save(checkpoint_prefix)"
]
},
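
Since only rank 0 saves the tf.train.Checkpoint above, restoring it later is a single-process operation. Below is a minimal sketch of how the saved checkpoint could be reloaded, illustrative only and not part of this PR. It re-declares a model with the same variable structure as tf_trainer.py; the call body uses the standard dot-product form and is only an assumption here, since checkpoint restoration keys off the variable structure rather than the forward pass.

import tensorflow as tf

# Same variable structure as the model in tf_trainer.py; tf.train.Checkpoint
# matches variables by object structure, so the class is re-declared here.
class MatrixFactorization(tf.keras.Model):
    def __init__(self, n_factors):
        super().__init__()
        self.user_embeddings = tf.keras.layers.Embedding(162542, n_factors)
        self.movie_embeddings = tf.keras.layers.Embedding(209172, n_factors)

    def call(self, batch, training=False):
        user_embs = self.user_embeddings(batch["userId"])
        movie_embs = self.movie_embeddings(batch["movieId"])
        return tf.reduce_sum(user_embs * movie_embs, axis=1)

model = MatrixFactorization(64)
checkpoint = tf.train.Checkpoint(model=model)

# checkpoint.save("./checkpoints") in the trainer writes ./checkpoints-1.* plus
# a "checkpoint" metadata file that latest_checkpoint() reads.
latest = tf.train.latest_checkpoint(".")
if latest is not None:
    # Restoration of the embedding tables completes lazily once they are built,
    # e.g. after the first call to the model.
    checkpoint.restore(latest).expect_partial()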
{
@@ -242,84 +269,14 @@
},
{
"cell_type": "code",
"execution_count": 6,
"execution_count": null,
"id": "ec5e9b7f",
"metadata": {
"scrolled": true
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[1,0]<stderr>:2022-12-08 06:58:30.501381: I tensorflow/core/platform/cpu_feature_guard.cc:194] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations: SSE3 SSE4.1 SSE4.2 AVX\n",
"[1,0]<stderr>:To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.\n",
"[1,0]<stderr>:2022-12-08 06:58:30.555187: I tensorflow/core/common_runtime/gpu/gpu_process_state.cc:222] Using CUDA malloc Async allocator for GPU: 0\n",
"[1,0]<stderr>:2022-12-08 06:58:30.555454: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1532] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 16255 MB memory: -> device: 0, name: Tesla V100-SXM2-32GB-LS, pci bus id: 0000:85:00.0, compute capability: 7.0\n",
"[1,1]<stderr>:2022-12-08 06:58:30.575717: I tensorflow/core/platform/cpu_feature_guard.cc:194] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations: SSE3 SSE4.1 SSE4.2 AVX\n",
"[1,1]<stderr>:To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.\n",
"[1,1]<stderr>:2022-12-08 06:58:30.632564: I tensorflow/core/common_runtime/gpu/gpu_process_state.cc:222] Using CUDA malloc Async allocator for GPU: 0\n",
"[1,1]<stderr>:2022-12-08 06:58:30.632832: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1532] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 16255 MB memory: -> device: 0, name: Tesla V100-SXM2-32GB-LS, pci bus id: 0000:86:00.0, compute capability: 7.0\n",
"[1,0]<stderr>:2022-12-08 06:58:35.010671: W tensorflow/core/common_runtime/forward_type_inference.cc:231] Type inference failed. This indicates an invalid graph that escaped type checking. Error message: INVALID_ARGUMENT: expected compatible input types, but input 1:\n",
"[1,0]<stderr>:type_id: TFT_OPTIONAL\n",
"[1,0]<stderr>:args {\n",
"[1,0]<stderr>: type_id: TFT_PRODUCT\n",
"[1,0]<stderr>: args {\n",
"[1,0]<stderr>: type_id: TFT_TENSOR\n",
"[1,0]<stderr>: args {\n",
"[1,0]<stderr>: type_id: TFT_BOOL\n",
"[1,0]<stderr>: }\n",
"[1,0]<stderr>: }\n",
"[1,0]<stderr>:}\n",
"[1,0]<stderr>: is neither a subtype nor a supertype of the combined inputs preceding it:\n",
"[1,0]<stderr>:type_id: TFT_OPTIONAL\n",
"[1,0]<stderr>:args {\n",
"[1,0]<stderr>: type_id: TFT_PRODUCT\n",
"[1,0]<stderr>: args {\n",
"[1,0]<stderr>: type_id: TFT_TENSOR\n",
"[1,0]<stderr>: args {\n",
"[1,0]<stderr>: type_id: TFT_LEGACY_VARIANT\n",
"[1,0]<stderr>: }\n",
"[1,0]<stderr>: }\n",
"[1,0]<stderr>:}\n",
"[1,0]<stderr>:\n",
"[1,0]<stderr>:\twhile inferring type of node 'mean_squared_error/cond/output/_11'\n",
"[1,1]<stderr>:2022-12-08 06:58:35.218048: W tensorflow/core/common_runtime/forward_type_inference.cc:231] Type inference failed. This indicates an invalid graph that escaped type checking. Error message: INVALID_ARGUMENT: expected compatible input types, but input 1:\n",
"[1,1]<stderr>:type_id: TFT_OPTIONAL\n",
"[1,1]<stderr>:args {\n",
"[1,1]<stderr>: type_id: TFT_PRODUCT\n",
"[1,1]<stderr>: args {\n",
"[1,1]<stderr>: type_id: TFT_TENSOR\n",
"[1,1]<stderr>: args {\n",
"[1,1]<stderr>: type_id: TFT_BOOL\n",
"[1,1]<stderr>: }\n",
"[1,1]<stderr>: }\n",
"[1,1]<stderr>:}\n",
"[1,1]<stderr>: is neither a subtype nor a supertype of the combined inputs preceding it:\n",
"[1,1]<stderr>:type_id: TFT_OPTIONAL\n",
"[1,1]<stderr>:args {\n",
"[1,1]<stderr>: type_id: TFT_PRODUCT\n",
"[1,1]<stderr>: args {\n",
"[1,1]<stderr>: type_id: TFT_TENSOR\n",
"[1,1]<stderr>: args {\n",
"[1,1]<stderr>: type_id: TFT_LEGACY_VARIANT\n",
"[1,1]<stderr>: }\n",
"[1,1]<stderr>: }\n",
"[1,1]<stderr>:}\n",
"[1,1]<stderr>:\n",
"[1,1]<stderr>:\twhile inferring type of node 'mean_squared_error/cond/output/_11'\n",
" 6/191 [..............................] - ETA: 2s - loss: 13.6433 [1,0]<stderr>:/usr/lib/python3/dist-packages/requests/__init__.py:89: RequestsDependencyWarning: urllib3 (1.26.12) or chardet (3.0.4) doesn't match a supported version!\n",
"[1,0]<stderr>: warnings.warn(\"urllib3 ({}) or chardet ({}) doesn't match a supported \"\n",
"[1,0]<stderr>:WARNING:tensorflow:Callback method `on_train_batch_end` is slow compared to the batch time (batch time: 0.0094s vs `on_train_batch_end` time: 0.1490s). Check your callbacks.\n",
"[1,1]<stderr>:/usr/lib/python3/dist-packages/requests/__init__.py:89: RequestsDependencyWarning: urllib3 (1.26.12) or chardet (3.0.4) doesn't match a supported version!\n",
"[1,1]<stderr>: warnings.warn(\"urllib3 ({}) or chardet ({}) doesn't match a supported \"\n",
"[1,1]<stderr>:WARNING:tensorflow:Callback method `on_train_batch_end` is slow compared to the batch time (batch time: 0.0093s vs `on_train_batch_end` time: 0.1489s). Check your callbacks.\n",
"191/191 [==============================] - 8s 12ms/step - loss: 3.3301<stdout>[1,0]<stdout[1,0]<stdout[1,0]<stdout>[1,0]<stdout>[1,0]<stdout>[1,0]<stdout>[1,0]<stdout>[1,0]<stdout>[1,0]<stdout>[1,0]<stdout>[1,0]<stdout>[1,0]<stdout>[1,0]<stdout>[1,0]<stdout>[1,0]<stdout>[1,0]<stdout>[1,0]<stdout>[1,0]<stdout>[1,0]<stdout>[1,0]<stdout>[1,0]<stdout>[1,0]<stdout>[1,0]<stdout>[1,0]<stdout>[1,0]<stdout>[1,0]<stdout>[1,0]<stdout>[1,0]<stdout>[1,0]<stdout>[1,0]<stdout>[1,0]<stdout>[1,0]<stdout>[1,0]<stdout>[1,0]<stdout>[1,0]<stdout>[1,0]<stdout>[1,0]<stdout>\n"
]
}
],
"outputs": [],
"source": [
"!horovodrun -np {GPU_COUNT} python tf_trainer.py"
"! horovodrun -np {GPU_COUNT} python tf_trainer.py --data_path={DATA_PATH} --batch_size=65536"
]
},
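
Optionally, before launching horovodrun it can help to confirm that the machine actually exposes at least GPU_COUNT devices. A small sanity check, illustrative only and not part of this PR; it relies on nvidia-smi being on the PATH.

import subprocess

# nvidia-smi -L prints one line per visible GPU.
gpus = subprocess.check_output(["nvidia-smi", "-L"], text=True).strip().splitlines()
assert len(gpus) >= GPU_COUNT, (
    f"Asked horovodrun for {GPU_COUNT} workers but only {len(gpus)} GPU(s) are visible."
)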
{