@@ -179,7 +179,9 @@ def _is_minmax_reduction(func: T_Agg) -> bool:
179
179
180
180
181
181
def _is_first_last_reduction(func: T_Agg) -> bool:
    """Return True when ``func`` is a first/last-style reduction.

    Accepts either a reduction name (str) or an ``Aggregation`` instance,
    in which case its ``.name`` attribute is compared. Matches the four
    variants: ``"nanfirst"``, ``"nanlast"``, ``"first"``, ``"last"``.
    """
    # Normalize Aggregation objects down to their string name first.
    name = func.name if isinstance(func, Aggregation) else func
    return name in ("nanfirst", "nanlast", "first", "last")
183
185
184
186
185
187
def _get_expected_groups (by : T_By , sort : bool ) -> T_ExpectIndex :
@@ -680,6 +682,7 @@ def rechunk_for_blockwise(
680
682
abs (max (newchunks ) - max (chunks )) / max (chunks ) < BLOCKWISE_RECHUNK_CHUNK_SIZE_THRESHOLD
681
683
)
682
684
):
685
+ logger .debug ("Rechunking to enable blockwise." )
683
686
# Less than 25% change in number of chunks, let's do it
684
687
return array .rechunk ({axis : newchunks })
685
688
else :
@@ -1668,7 +1671,12 @@ def dask_groupby_agg(
1668
1671
# This allows us to discover groups at compute time, support argreductions, lower intermediate
1669
1672
# memory usage (but method="cohorts" would also work to reduce memory in some cases)
1670
1673
labels_are_unknown = is_duck_dask_array (by_input ) and expected_groups is None
1671
- do_simple_combine = not _is_arg_reduction (agg ) and not labels_are_unknown
1674
+ do_grouped_combine = (
1675
+ _is_arg_reduction (agg )
1676
+ or labels_are_unknown
1677
+ or (_is_first_last_reduction (agg ) and array .dtype .kind != "f" )
1678
+ )
1679
+ do_simple_combine = not do_grouped_combine
1672
1680
1673
1681
if method == "blockwise" :
1674
1682
# use the "non dask" code path, but applied blockwise
@@ -2012,8 +2020,13 @@ def _validate_reindex(
2012
2020
expected_groups ,
2013
2021
any_by_dask : bool ,
2014
2022
is_dask_array : bool ,
2023
+ array_dtype : Any ,
2015
2024
) -> bool | None :
2016
2025
# logger.debug("Entering _validate_reindex: reindex is {}".format(reindex)) # noqa
2026
+ def first_or_last ():
2027
+ return func in ["first" , "last" ] or (
2028
+ _is_first_last_reduction (func ) and array_dtype .kind != "f"
2029
+ )
2017
2030
2018
2031
all_numpy = not is_dask_array and not any_by_dask
2019
2032
if reindex is True and not all_numpy :
@@ -2023,7 +2036,7 @@ def _validate_reindex(
2023
2036
raise ValueError (
2024
2037
"reindex=True is not a valid choice for method='blockwise' or method='cohorts'."
2025
2038
)
2026
- if func in [ "first" , "last" ] :
2039
+ if first_or_last () :
2027
2040
raise ValueError ("reindex must be None or False when func is 'first' or 'last." )
2028
2041
2029
2042
if reindex is None :
@@ -2034,9 +2047,10 @@ def _validate_reindex(
2034
2047
if all_numpy :
2035
2048
return True
2036
2049
2037
- if func in [ "first" , "last" ] :
2050
+ if first_or_last () :
2038
2051
# have to do the grouped_combine since there's no good fill_value
2039
- reindex = False
2052
+ # Also needed for nanfirst, nanlast with no-NaN dtypes
2053
+ return False
2040
2054
2041
2055
if method == "blockwise" :
2042
2056
# for grouping by dask arrays, we set reindex=True
@@ -2439,7 +2453,13 @@ def groupby_reduce(
2439
2453
raise ValueError (f"method={ method !r} can only be used when grouping by numpy arrays." )
2440
2454
2441
2455
reindex = _validate_reindex (
2442
- reindex , func , method , expected_groups , any_by_dask , is_duck_dask_array (array )
2456
+ reindex ,
2457
+ func ,
2458
+ method ,
2459
+ expected_groups ,
2460
+ any_by_dask ,
2461
+ is_duck_dask_array (array ),
2462
+ array .dtype ,
2443
2463
)
2444
2464
2445
2465
if not is_duck_array (array ):
@@ -2638,7 +2658,7 @@ def groupby_reduce(
2638
2658
2639
2659
# TODO: clean this up
2640
2660
reindex = _validate_reindex (
2641
- reindex , func , method , expected_ , any_by_dask , is_duck_dask_array (array )
2661
+ reindex , func , method , expected_ , any_by_dask , is_duck_dask_array (array ), array . dtype
2642
2662
)
2643
2663
2644
2664
if TYPE_CHECKING :
0 commit comments