diff --git a/.github/workflows/gpu-ci.yml b/.github/workflows/gpu-ci.yml index 9add26bd..cb7f9f09 100644 --- a/.github/workflows/gpu-ci.yml +++ b/.github/workflows/gpu-ci.yml @@ -12,8 +12,24 @@ on: jobs: build: - runs-on: 2GPU + runs-on: 1GPU + steps: + - uses: actions/checkout@v3 + with: + fetch-depth: 0 + - name: Run tests + run: | + ref_type=${{ github.ref_type }} + branch=main + if [[ $ref_type == "tag"* ]] + then + raw=$(git branch -r --contains ${{ github.ref_name }}) + branch=${raw/origin\/} + fi + cd ${{ github.workspace }}; tox -r -e test-gpu -- $branch + multigpu: + runs-on: 2GPU steps: - uses: actions/checkout@v3 with: @@ -27,4 +43,4 @@ jobs: raw=$(git branch -r --contains ${{ github.ref_name }}) branch=${raw/origin\/} fi - cd ${{ github.workspace }}; tox -e test-gpu -- $branch + cd ${{ github.workspace }}; tox -r -e test-gpu-multigpu -- $branch diff --git a/examples/02-Multi-GPU-Tensorflow-with-Horovod.ipynb b/examples/02-Multi-GPU-Tensorflow-with-Horovod.ipynb index 814df910..a80e7d3b 100644 --- a/examples/02-Multi-GPU-Tensorflow-with-Horovod.ipynb +++ b/examples/02-Multi-GPU-Tensorflow-with-Horovod.ipynb @@ -2,7 +2,7 @@ "cells": [ { "cell_type": "code", - "execution_count": 1, + "execution_count": null, "id": "bb28e271", "metadata": {}, "outputs": [], @@ -57,32 +57,25 @@ }, { "cell_type": "code", - "execution_count": 2, + "execution_count": null, "id": "edd46306", "metadata": {}, "outputs": [], "source": [ + "import os\n", + "\n", "from merlin.core.utils import download_file\n", "from merlin.core.dispatch import get_lib" ] }, { "cell_type": "code", - "execution_count": 3, + "execution_count": null, "id": "591f8c61", "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "downloading ml-25m.zip: 262MB [00:07, 36.0MB/s] \n", - "unzipping files: 100%|██████████| 8/8 [00:08<00:00, 1.08s/files]\n" - ] - } - ], + "outputs": [], "source": [ - "DATA_PATH = '/workspace'\n", + "DATA_PATH = os.environ.get(\"DATA_PATH\", os.path.expanduser(\"~/workspace\"))\n", "download_file(\"http://files.grouplens.org/datasets/movielens/ml-25m.zip\", DATA_PATH + \"/ml-25m.zip\")" ] }, @@ -108,19 +101,16 @@ }, { "cell_type": "code", - "execution_count": 4, + "execution_count": null, "id": "c65e5ef6", "metadata": {}, "outputs": [], "source": [ "GPU_COUNT = 2 # specify how many GPUs you would like to train on\n", "\n", - "ratings = get_lib().read_csv(DATA_PATH + '/ml-25m/ratings.csv')\n", + "ratings = get_lib().read_csv(DATA_PATH + \"/ml-25m/ratings.csv\")\n", "\n", - "for i in range(GPU_COUNT):\n", - " ratings[\n", - " int(i * ratings.shape[0] / GPU_COUNT):int((i + 1) * ratings.shape[0] / GPU_COUNT)\n", - " ].to_parquet(DATA_PATH + f'/train_{i}.parquet')" + "ratings.to_parquet(os.path.join(DATA_PATH, \"train.parquet\"))" ] }, { @@ -128,39 +118,27 @@ "id": "8b9a65b4", "metadata": {}, "source": [ - "**Important: Individual parquet files require to have the same number of batches. If one worker has more batches than another, the training process will freeze. At one point during the training process, the worker with more batches waits for the gradients from the worker with fewer batches. But the worker with fewer batches finished the training run.**`\n", - "\n", "Let us now take a closer look at what else we will need to train with Horovod.\n", "\n", "### Write the training script to a file\n", "\n", - "We need to have a `.py` file we will be able to load into each process using `horovodrun`. \n", + "We need to have a `.py` file we will be able to load into each process using `horovodrun`.\n", "\n", "### Set `CUDA visible devices` correctly inside each process\n", "\n", - "We need to set the visible device in each process to its `rank`. This way process with `rank 0` will use the zeroth GPU, process with `rank 1` will use the first GPU, and so on. This ensures that each worker can access only a single GPU.\n", - "\n", - "Additionally, we will use the `rank` information to select the correct parquet file to load per worker (`DATA_PATH + f'/train_{hvd.local_rank()}.parquet'`)\n", - "\n" + "We need to set the visible device in each process to its `rank`. This way process with `rank 0` will use the zeroth GPU, process with `rank 1` will use the first GPU, and so on. This ensures that each worker can access only a single GPU." ] }, { "cell_type": "code", - "execution_count": 5, + "execution_count": null, "id": "9fbe17a7", "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Overwriting ./tf_trainer.py\n" - ] - } - ], + "outputs": [], "source": [ - "%%writefile './tf_trainer.py'\n", + "%%writefile \"./tf_trainer.py\"\n", "\n", + "import argparse\n", "import os\n", "\n", "# the order of statements and imports is imoportant\n", @@ -176,20 +154,34 @@ "from merlin.io import Dataset\n", "\n", "import tensorflow as tf\n", - "import horovod.tensorflow.keras as hvd\n", + "import horovod.tensorflow as hvd\n", "\n", - "from glob import glob\n", "from merlin.core.dispatch import get_lib\n", + "\n", "os.environ[\"TF_GPU_ALLOCATOR\"] = \"cuda_malloc_async\"\n", "\n", "hvd.init()\n", "\n", "from merlin.loader.tensorflow import Loader\n", "\n", - "DATA_PATH = '/workspace'\n", + "parser = argparse.ArgumentParser()\n", + "parser.add_argument(\"--data_path\", default=None, help=\"Input directory.\")\n", + "parser.add_argument(\"--batch_size\", type=int, default=None, help=\"Batch size.\")\n", + "args = parser.parse_args()\n", + "\n", + "DATA_PATH = args.data_path or os.path.expanduser(\"~/workspace\")\n", + "BATCH_SIZE = args.batch_size or 1024\n", + "\n", + "dataset = Dataset(os.path.join(DATA_PATH, \"train.parquet\"))\n", + "dataset = dataset.repartition(MPI_SIZE)\n", "\n", - "dataset = Dataset(glob(DATA_PATH + f'/train_{hvd.local_rank()}.parquet'), part_size=\"100MB\")\n", - "loader = Loader(dataset, batch_size=64 * 1024)\n", + "loader = Loader(\n", + " dataset,\n", + " batch_size=BATCH_SIZE,\n", + " global_size=MPI_SIZE,\n", + " global_rank=MPI_RANK,\n", + " device=MPI_RANK,\n", + ")\n", "\n", "label_column = 'rating'\n", "\n", @@ -206,8 +198,8 @@ "class MatrixFactorization(tf.keras.Model):\n", " def __init__(self, n_factors):\n", " super().__init__()\n", - " self.user_embeddings = tf.keras.layers.Embedding(162541, n_factors)\n", - " self.movie_embeddings = tf.keras.layers.Embedding(209171, n_factors)\n", + " self.user_embeddings = tf.keras.layers.Embedding(162542, n_factors)\n", + " self.movie_embeddings = tf.keras.layers.Embedding(209172, n_factors)\n", "\n", " def call(self, batch, training=False):\n", " user_embs = self.user_embeddings(batch['userId'])\n", @@ -218,16 +210,51 @@ "\n", "\n", "model = MatrixFactorization(64)\n", - "opt = tf.keras.optimizers.Adam(1e-2 * hvd.size())\n", - "opt = hvd.DistributedOptimizer(opt)\n", - "model.compile(optimizer=opt, loss=tf.keras.losses.MeanSquaredError(), experimental_run_tf_function=False)\n", - "\n", - "model.fit(\n", - " loader,\n", - " epochs=1,\n", - " callbacks=[hvd.callbacks.BroadcastGlobalVariablesCallback(0)],\n", - " verbose=1 if hvd.rank() == 0 else 0\n", - ")" + "loss = tf.keras.losses.MeanSquaredError()\n", + "opt = tf.optimizers.Adam(1e-2 * hvd.size())\n", + "\n", + "checkpoint_prefix = \"./checkpoints\"\n", + "checkpoint = tf.train.Checkpoint(model=model, optimizer=opt)\n", + "\n", + "\n", + "@tf.function\n", + "def training_step(features, labels, first_batch):\n", + " with tf.GradientTape() as tape:\n", + " probs = model(features, training=True)\n", + " loss_value = loss(labels, probs)\n", + "\n", + " # Horovod: add Horovod Distributed GradientTape.\n", + " tape = hvd.DistributedGradientTape(tape)\n", + "\n", + " grads = tape.gradient(loss_value, model.trainable_variables)\n", + " opt.apply_gradients(zip(grads, model.trainable_variables))\n", + "\n", + " # Horovod: broadcast initial variable states from rank 0 to all other processes.\n", + " # This is necessary to ensure consistent initialization of all workers when\n", + " # training is started with random weights or restored from a checkpoint.\n", + " #\n", + " # Note: broadcast should be done after the first gradient step to ensure optimizer\n", + " # initialization.\n", + " if first_batch:\n", + " hvd.broadcast_variables(model.variables, root_rank=0)\n", + " hvd.broadcast_variables(opt.variables(), root_rank=0)\n", + "\n", + " return loss_value\n", + "\n", + "\n", + "# Horovod: adjust number of steps based on number of GPUs.\n", + "for batch, (features, labels) in enumerate(loader):\n", + " loss_value = training_step(features, labels, batch == 0)\n", + "\n", + " if batch % 10 == 0 and hvd.rank() == 0:\n", + " print('Step #%d\\tLoss: %.6f' % (batch, loss_value))\n", + "\n", + "hvd.join()\n", + "\n", + "# Horovod: save checkpoints only on worker 0 to prevent other workers from\n", + "# corrupting it.\n", + "if hvd.rank() == 0:\n", + " checkpoint.save(checkpoint_prefix)" ] }, { @@ -242,84 +269,14 @@ }, { "cell_type": "code", - "execution_count": 6, + "execution_count": null, "id": "ec5e9b7f", "metadata": { "scrolled": true }, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "[1,0]:2022-12-08 06:58:30.501381: I tensorflow/core/platform/cpu_feature_guard.cc:194] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations: SSE3 SSE4.1 SSE4.2 AVX\n", - "[1,0]:To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.\n", - "[1,0]:2022-12-08 06:58:30.555187: I tensorflow/core/common_runtime/gpu/gpu_process_state.cc:222] Using CUDA malloc Async allocator for GPU: 0\n", - "[1,0]:2022-12-08 06:58:30.555454: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1532] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 16255 MB memory: -> device: 0, name: Tesla V100-SXM2-32GB-LS, pci bus id: 0000:85:00.0, compute capability: 7.0\n", - "[1,1]:2022-12-08 06:58:30.575717: I tensorflow/core/platform/cpu_feature_guard.cc:194] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations: SSE3 SSE4.1 SSE4.2 AVX\n", - "[1,1]:To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.\n", - "[1,1]:2022-12-08 06:58:30.632564: I tensorflow/core/common_runtime/gpu/gpu_process_state.cc:222] Using CUDA malloc Async allocator for GPU: 0\n", - "[1,1]:2022-12-08 06:58:30.632832: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1532] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 16255 MB memory: -> device: 0, name: Tesla V100-SXM2-32GB-LS, pci bus id: 0000:86:00.0, compute capability: 7.0\n", - "[1,0]:2022-12-08 06:58:35.010671: W tensorflow/core/common_runtime/forward_type_inference.cc:231] Type inference failed. This indicates an invalid graph that escaped type checking. Error message: INVALID_ARGUMENT: expected compatible input types, but input 1:\n", - "[1,0]:type_id: TFT_OPTIONAL\n", - "[1,0]:args {\n", - "[1,0]: type_id: TFT_PRODUCT\n", - "[1,0]: args {\n", - "[1,0]: type_id: TFT_TENSOR\n", - "[1,0]: args {\n", - "[1,0]: type_id: TFT_BOOL\n", - "[1,0]: }\n", - "[1,0]: }\n", - "[1,0]:}\n", - "[1,0]: is neither a subtype nor a supertype of the combined inputs preceding it:\n", - "[1,0]:type_id: TFT_OPTIONAL\n", - "[1,0]:args {\n", - "[1,0]: type_id: TFT_PRODUCT\n", - "[1,0]: args {\n", - "[1,0]: type_id: TFT_TENSOR\n", - "[1,0]: args {\n", - "[1,0]: type_id: TFT_LEGACY_VARIANT\n", - "[1,0]: }\n", - "[1,0]: }\n", - "[1,0]:}\n", - "[1,0]:\n", - "[1,0]:\twhile inferring type of node 'mean_squared_error/cond/output/_11'\n", - "[1,1]:2022-12-08 06:58:35.218048: W tensorflow/core/common_runtime/forward_type_inference.cc:231] Type inference failed. This indicates an invalid graph that escaped type checking. Error message: INVALID_ARGUMENT: expected compatible input types, but input 1:\n", - "[1,1]:type_id: TFT_OPTIONAL\n", - "[1,1]:args {\n", - "[1,1]: type_id: TFT_PRODUCT\n", - "[1,1]: args {\n", - "[1,1]: type_id: TFT_TENSOR\n", - "[1,1]: args {\n", - "[1,1]: type_id: TFT_BOOL\n", - "[1,1]: }\n", - "[1,1]: }\n", - "[1,1]:}\n", - "[1,1]: is neither a subtype nor a supertype of the combined inputs preceding it:\n", - "[1,1]:type_id: TFT_OPTIONAL\n", - "[1,1]:args {\n", - "[1,1]: type_id: TFT_PRODUCT\n", - "[1,1]: args {\n", - "[1,1]: type_id: TFT_TENSOR\n", - "[1,1]: args {\n", - "[1,1]: type_id: TFT_LEGACY_VARIANT\n", - "[1,1]: }\n", - "[1,1]: }\n", - "[1,1]:}\n", - "[1,1]:\n", - "[1,1]:\twhile inferring type of node 'mean_squared_error/cond/output/_11'\n", - " 6/191 [..............................] - ETA: 2s - loss: 13.6433 [1,0]:/usr/lib/python3/dist-packages/requests/__init__.py:89: RequestsDependencyWarning: urllib3 (1.26.12) or chardet (3.0.4) doesn't match a supported version!\n", - "[1,0]: warnings.warn(\"urllib3 ({}) or chardet ({}) doesn't match a supported \"\n", - "[1,0]:WARNING:tensorflow:Callback method `on_train_batch_end` is slow compared to the batch time (batch time: 0.0094s vs `on_train_batch_end` time: 0.1490s). Check your callbacks.\n", - "[1,1]:/usr/lib/python3/dist-packages/requests/__init__.py:89: RequestsDependencyWarning: urllib3 (1.26.12) or chardet (3.0.4) doesn't match a supported version!\n", - "[1,1]: warnings.warn(\"urllib3 ({}) or chardet ({}) doesn't match a supported \"\n", - "[1,1]:WARNING:tensorflow:Callback method `on_train_batch_end` is slow compared to the batch time (batch time: 0.0093s vs `on_train_batch_end` time: 0.1489s). Check your callbacks.\n", - "191/191 [==============================] - 8s 12ms/step - loss: 3.3301[1,0][1,0][1,0][1,0][1,0][1,0][1,0][1,0][1,0][1,0][1,0][1,0][1,0][1,0][1,0][1,0][1,0][1,0][1,0][1,0][1,0][1,0][1,0][1,0][1,0][1,0][1,0][1,0][1,0][1,0][1,0][1,0][1,0][1,0][1,0]\n" - ] - } - ], + "outputs": [], "source": [ - "!horovodrun -np {GPU_COUNT} python tf_trainer.py" + "! horovodrun -np {GPU_COUNT} python tf_trainer.py --data_path={DATA_PATH} --batch_size=65536" ] }, { diff --git a/merlin/dataloader/utils/tf/tf_trainer.py b/merlin/dataloader/utils/tf/tf_trainer.py index de16f91c..8d4e8325 100644 --- a/merlin/dataloader/utils/tf/tf_trainer.py +++ b/merlin/dataloader/utils/tf/tf_trainer.py @@ -1,11 +1,10 @@ # External dependencies import argparse -import glob import logging import os from merlin.core.compat import cupy, numpy -from merlin.io import Dataset +from merlin.schema import Tags # we can control how much memory to give tensorflow with this environment variable # IMPORTANT: make sure you do this before you initialize TF's runtime, otherwise @@ -38,11 +37,7 @@ BASE_DIR = args.dir_in or "./data/" BATCH_SIZE = int(args.batch_size or 16384) # Batch Size CATEGORICAL_COLUMNS = args.cats or ["movieId", "userId"] # Single-hot -CATEGORICAL_MH_COLUMNS = args.cats_mh or ["genres"] # Multi-hot -NUMERIC_COLUMNS = args.conts or [] -TRAIN_PATHS = sorted( - glob.glob(os.path.join(BASE_DIR, "train/*.parquet")) -) # Output from ETL-with-NVTabular + hvd.init() # Seed with system randomness (or a static seed) @@ -76,26 +71,25 @@ def seed_fn(): EMBEDDING_TABLE_SHAPES, MH_EMBEDDING_TABLE_SHAPES = nvt.ops.get_embedding_sizes(proc) EMBEDDING_TABLE_SHAPES.update(MH_EMBEDDING_TABLE_SHAPES) -ds = Dataset(TRAIN_PATHS, engine="parquet", part_mem_frac=0.06) -train_dataset_tf = Loader( - ds, # you could also use a glob pattern +train_ds = nvt.Dataset(f"{BASE_DIR}/train", engine="parquet", dtypes={"rating": xp.int8}) +train_ds.schema = train_ds.schema.remove_col("genres") + +target_column = train_ds.schema.select_by_tag(Tags.TARGET).column_names[0] + +train_loader = Loader( + train_ds, batch_size=BATCH_SIZE, shuffle=True, seed_fn=seed_fn, global_size=hvd.size(), global_rank=hvd.rank(), ) + inputs = {} # tf.keras.Input placeholders for each feature to be used emb_layers = [] # output of all embedding layers, which will be concatenated for col in CATEGORICAL_COLUMNS: inputs[col] = tf.keras.Input(name=col, dtype=tf.int32, shape=(1,)) -# Note that we need two input tensors for multi-hot categorical features -for col in CATEGORICAL_MH_COLUMNS: - inputs[col] = ( - tf.keras.Input(name=f"{col}__values", dtype=tf.int64, shape=(1,)), - tf.keras.Input(name=f"{col}__lengths", dtype=tf.int64, shape=(1,)), - ) -for col in CATEGORICAL_COLUMNS + CATEGORICAL_MH_COLUMNS: +for col in CATEGORICAL_COLUMNS: emb_layers.append( tf.feature_column.embedding_column( tf.feature_column.categorical_column_with_identity( @@ -112,9 +106,9 @@ def seed_fn(): x = tf.keras.layers.Dense(1, activation="sigmoid")(x) model = tf.keras.Model(inputs=inputs, outputs=x) loss = tf.losses.BinaryCrossentropy() -opt = tf.keras.optimizers.SGD(0.01 * hvd.size()) +opt = tf.keras.optimizers.legacy.SGD(0.01 * hvd.size()) opt = hvd.DistributedOptimizer(opt) -checkpoint_dir = "./checkpoints" +checkpoint_dir = os.path.join(BASE_DIR, "checkpoints") checkpoint = tf.train.Checkpoint(model=model, optimizer=opt) @@ -140,7 +134,7 @@ def training_step(examples, labels, first_batch): # Horovod: adjust number of steps based on number of GPUs. -for batch, (examples, labels) in enumerate(train_dataset_tf): +for batch, (examples, labels) in enumerate(train_loader): loss_value = training_step(examples, labels, batch == 0) if batch % 100 == 0 and hvd.local_rank() == 0: print(f"Step #{batch}\tLoss: {loss_value:.6f}") diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 00000000..3b9940df --- /dev/null +++ b/pytest.ini @@ -0,0 +1,4 @@ +[pytest] +markers = + multigpu: Tests only run in multiple-GPU environments + singlegpu: Optional marker to run tests in single-GPU environments. Usually used when running in both single- and multi-GPU. diff --git a/tests/examples/test_multi_GPU_with_horovod_and_tensorflow.py b/tests/examples/test_multi_GPU_with_horovod_and_tensorflow.py index 50c0e33f..9d48bdad 100644 --- a/tests/examples/test_multi_GPU_with_horovod_and_tensorflow.py +++ b/tests/examples/test_multi_GPU_with_horovod_and_tensorflow.py @@ -1,28 +1,63 @@ import os +import subprocess + +import numpy as np +import pandas as pd import pytest from testbook import testbook pytest.importorskip("tensorflow") +pytest.importorskip("horovod") pytestmark = pytest.mark.tensorflow -@testbook("examples/02-Multi-GPU-Tensorflow-with-Horovod.ipynb", execute=False) -def test_getting_started_tensorflow(tb): +@pytest.mark.multigpu +@testbook("examples/02-Multi-GPU-Tensorflow-with-Horovod.ipynb", execute=False, timeout=120) +def test_getting_started_tensorflow(tb, tmpdir): + ml_25m_dir = tmpdir / "ml-25" + ml_25m_dir.mkdir() + ratings_path = ml_25m_dir / "ratings.csv" + pd.DataFrame( + { + "userId": np.random.randint(0, 10, 100_000), + "movieId": np.random.randint(0, 10, 100_000), + "rating": np.random.randint(0, 5, 100_000).astype(np.float32), + } + ).to_csv(ratings_path, index=False) + tb.inject( - """ - import pandas as pd - import numpy as np - - !mkdir -p /tmp/ml-25m - pd.DataFrame({ - 'userId': np.random.randint(0, 10, 100_000), - 'movieId': np.random.randint(0, 10, 100_000), - 'rating': np.random.randint(0, 5, 100_000).astype(np.float32) - }).to_csv('/tmp/ml-25m/ratings.csv', index=False) + f""" + import os + os.environ["DATA_PATH"] = "{str(tmpdir)}" """ ) - tb.cells[4].source = "DATA_PATH = '/tmp'" - tb.cells[7].source.replace("GPU_COUNT = 2", "GPU_COUNT = 1") + tb.execute() - os.system("horovodrun -np 1 python tf_trainer.py") + + curr_path = os.path.abspath(__file__) + repo_root = os.path.relpath(os.path.normpath(os.path.join(curr_path, "../../.."))) + hvd_wrap_path = os.path.join(repo_root, "merlin/dataloader/utils/tf/hvd_wrapper.sh") + with subprocess.Popen( + [ + "horovodrun", + "-np", + "2", + "-H", + "localhost:2", + "sh", + hvd_wrap_path, + "python", + "tf_trainer.py", + f"--data_path={str(tmpdir)}", + "--batch_size=65536", + ], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) as process: + process.wait() + stdout, stderr = process.communicate() + print(stdout, stderr) + assert "Loss:" in str(stdout) + + assert any(f.startswith("checkpoints-") for f in os.listdir(tmpdir)) diff --git a/tests/unit/dataloader/test_tf_dataloader.py b/tests/unit/dataloader/test_tf_dataloader.py index 10443897..1b7e3c1f 100644 --- a/tests/unit/dataloader/test_tf_dataloader.py +++ b/tests/unit/dataloader/test_tf_dataloader.py @@ -535,10 +535,7 @@ def test_multigpu_partitioning(dataset, batch_size, global_rank): assert indices == [global_rank] -@pytest.mark.skipif( - os.environ.get("NR_USER") is not None, - reason="not working correctly in ci environment", -) +@pytest.mark.multigpu @pytest.mark.skipif(importlib.util.find_spec("horovod") is None, reason="needs horovod") @pytest.mark.skipif( HAS_GPU and cupy and cupy.cuda.runtime.getDeviceCount() <= 1, diff --git a/tox.ini b/tox.ini index ffe48e74..b30386d4 100644 --- a/tox.ini +++ b/tox.ini @@ -66,12 +66,8 @@ passenv = NR_USER CUDA_VISIBLE_DEVICES sitepackages=true -; Runs in: Internal Jenkins +; Runs in: 1GPU Github Actions runners. ; Runs GPU-based tests. -; The jenkins jobs run on an image based on merlin-hugectr. This will include all cudf configuration -; and other gpu-specific libraries that we can enxpect will always exist. Thus, we don't need -; to install requirements.txt yet. As we get better at python environment isolation, we will -; need to add some back. deps = -rrequirements/dev.txt pytest @@ -80,7 +76,24 @@ commands = python -m pip install --upgrade git+https://github.com/NVIDIA-Merlin/core.git python -m pip install --upgrade git+https://github.com/NVIDIA-Merlin/nvtabular.git - python -m pytest --cov-report term --cov merlin -rxs tests/unit tests/examples + python -m pytest -m "singlegpu or not multigpu" --cov-report term --cov merlin -rxs tests/unit/ tests/examples/ + +[testenv:test-gpu-multigpu] +passenv = + CUDA_VISIBLE_DEVICES + OPAL_PREFIX +sitepackages=true +; Runs in: 2GPU Github Actions runners. +; Runs GPU-based tests. +deps = + -rrequirements/dev.txt + pytest + pytest-cov +commands = + python -m pip install --upgrade git+https://github.com/NVIDIA-Merlin/core.git + python -m pip install --upgrade git+https://github.com/NVIDIA-Merlin/nvtabular.git + + python -m pytest -m "multigpu" --cov-report term --cov merlin -rxs tests/unit/ tests/examples/ [testenv:test-models-cpu] passenv=GIT_COMMIT