From 618571afbe024bf487fc0a0eec7e7180ccc37475 Mon Sep 17 00:00:00 2001
From: Julio
Date: Thu, 11 May 2023 14:05:35 -0400
Subject: [PATCH] made padding op faster and raise error

---
 merlin/dataloader/ops/padding.py      | 15 +++++++++--
 tests/unit/dataloader/test_padding.py | 36 ++++++++++++++++++++++++++-
 2 files changed, 48 insertions(+), 3 deletions(-)

diff --git a/merlin/dataloader/ops/padding.py b/merlin/dataloader/ops/padding.py
index f3e4fa07..f19af7c1 100644
--- a/merlin/dataloader/ops/padding.py
+++ b/merlin/dataloader/ops/padding.py
@@ -1,3 +1,4 @@
+import functools
 from typing import Union
 
 import numpy as np
@@ -69,17 +70,27 @@ def compute_output_schema(
         return Schema(col_schemas)
 
 
+def get_arange(array_lib, start, end):
+    return array_lib.arange(int(start), int(end))
+
+
 def pad_put_zeros(column, padding_size, padding_val):
     # account for zero prepend
     array_lib = cupy if column.device == Device.GPU else np
     num_rows = len(column.offsets) - 1
     zeros = array_lib.zeros((num_rows, padding_size)).flatten() + padding_val
     row_lengths = column.offsets[1:] - column.offsets[:-1]
+    if max(row_lengths) > padding_size:
+        raise ValueError(
+            f"There are records in data that have more values ({max(row_lengths)})"
+            f" than the padding size selected: {padding_size}"
+        )
     row_ranges = []
     starts = array_lib.arange(num_rows) * padding_size
     ends = starts + row_lengths
-    for idx, offset in enumerate(column.offsets[:-1]):
-        row_ranges.extend(array_lib.arange(int(starts[idx]), int(ends[idx])))
+    row_ranges = array_lib.concatenate(
+        list(map(functools.partial(get_arange, array_lib), starts, ends))
+    )
     array_lib.put(zeros, row_ranges, column.values)
     zeros = array_lib.reshape(zeros, (num_rows, padding_size))
     zeros = zeros.astype(column.dtype.element_type.value)
diff --git a/tests/unit/dataloader/test_padding.py b/tests/unit/dataloader/test_padding.py
index d5875251..a3d3d119 100644
--- a/tests/unit/dataloader/test_padding.py
+++ b/tests/unit/dataloader/test_padding.py
@@ -13,6 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+import numpy as np
 import pytest
 
 from merlin.core.dispatch import HAS_GPU, make_df
@@ -26,7 +27,10 @@ def test_padding(cpu):
     padding_size = 5
     padding_value = 0
     batch_size = 3
-    df = make_df({"a": [[1], [1, 2], [1, 2, 3], [1, 2, 3, 4]]})
+    vals = []
+    for _ in range(1000):
+        vals.append(np.random.choice(np.random.rand(10), np.random.randint(0, 5)))
+    df = make_df({"a": vals})
     dataset = Dataset(df, cpu=bool(cpu))
     dl_graph = ["a"] >> Padding(padding_size, padding_value)
     data_loader = Loader(
@@ -44,3 +48,33 @@ def test_padding(cpu):
         column = batch[0]["a"]
         assert column.values.shape[-1] == padding_size
         assert column.offsets is None
+
+
+@pytest.mark.parametrize("cpu", [None, "cpu"] if HAS_GPU else ["cpu"])
+def test_padding_size_too_small(cpu):
+    padding_size = 5
+    padding_value = 0
+    batch_size = 3
+    vals = []
+    for _ in range(1000):
+        vals.append(np.random.choice(np.random.rand(10), np.random.randint(0, 10)))
+    df = make_df({"a": vals})
+    dataset = Dataset(df, cpu=bool(cpu))
+    dl_graph = ["a"] >> Padding(padding_size, padding_value)
+    data_loader = Loader(
+        dataset,
+        batch_size=batch_size,
+        transforms=dl_graph,
+        shuffle=False,
+        device=cpu,
+    )
+    col_schema = data_loader._output_schema["a"]
+    assert col_schema.shape.as_tuple[-1] == padding_size
+    assert not col_schema.shape.is_fixed  # because we don't know the size of the final batch
+    assert not col_schema.is_ragged
+    with pytest.raises(ValueError) as exception_info:
+        for batch in data_loader:
+            column = batch[0]["a"]
+            assert column.values.shape[-1] == padding_size
+            assert column.offsets is None
+    assert "There are records in data that have more values" in str(exception_info.value)