Skip to content

Commit 7a7dbca

Browse files
committed
trying to pass LB/UB from RC Spoke
1 parent cbe1113 commit 7a7dbca

File tree

3 files changed

+178
-86
lines changed

3 files changed

+178
-86
lines changed

mpisppy/cylinders/hub.py

+3-70
Original file line numberDiff line numberDiff line change
@@ -12,22 +12,21 @@
1212
import mpisppy.log
1313

1414
from mpisppy.cylinders.spcommunicator import RecvArray, SPCommunicator
15-
from math import inf
1615

1716
from mpisppy import global_toc
1817

1918
from mpisppy.cylinders.spwindow import Field
2019

2120
# Could also pass, e.g., sys.stdout instead of a filename
22-
mpisppy.log.setup_logger("mpisppy.cylinders.Hub",
21+
mpisppy.log.setup_logger(__name__,
2322
"hub.log",
2423
level=logging.CRITICAL)
25-
logger = logging.getLogger("mpisppy.cylinders.Hub")
24+
logger = logging.getLogger(__name__)
2625

2726
class Hub(SPCommunicator):
2827

2928
send_fields = (*SPCommunicator.send_fields, Field.SHUTDOWN, Field.BEST_OBJECTIVE_BOUNDS,)
30-
receive_fields = (*SPCommunicator.receive_fields, Field.OBJECTIVE_INNER_BOUND, Field.OBJECTIVE_OUTER_BOUND, )
29+
receive_fields = (*SPCommunicator.receive_fields,)
3130

3231
_hub_algo_best_bound_provider = False
3332

@@ -37,16 +36,10 @@ def __init__(self, spbase_object, fullcomm, strata_comm, cylinder_comm, communic
3736
logger.debug(f"Built the hub object on global rank {fullcomm.Get_rank()}")
3837
# for logging
3938
self.print_init = True
40-
self.latest_ib_char = None
41-
self.latest_ob_char = None
42-
self.last_ib_idx = None
43-
self.last_ob_idx = None
4439
# for termination based on stalling out
4540
self.stalled_iter_cnt = 0
4641
self.last_gap = float('inf') # abs_gap tracker
4742

48-
self.initialize_bound_values()
49-
5043
return
5144

5245
@abc.abstractmethod
@@ -169,66 +162,6 @@ def hub_finalize(self):
169162
global_toc("Statistics at termination", True)
170163
self.screen_trace()
171164

172-
def update_innerbounds(self):
173-
""" Update the inner bounds after receiving them from the spokes
174-
"""
175-
logging.debug("Hub is trying to update from InnerBounds")
176-
for idx, cls, recv_buf in self.receive_field_spcomms[Field.OBJECTIVE_INNER_BOUND]:
177-
if recv_buf.is_new():
178-
bound = recv_buf[0]
179-
logging.debug("!! new InnerBound to opt {}".format(bound))
180-
self.BestInnerBound = self.InnerBoundUpdate(bound, cls, idx)
181-
logging.debug("ph back from InnerBounds")
182-
183-
def update_outerbounds(self):
184-
""" Update the outer bounds after receiving them from the spokes
185-
"""
186-
logging.debug("Hub is trying to update from OuterBounds")
187-
for idx, cls, recv_buf in self.receive_field_spcomms[Field.OBJECTIVE_OUTER_BOUND]:
188-
if recv_buf.is_new():
189-
bound = recv_buf[0]
190-
logging.debug("!! new OuterBound to opt {}".format(bound))
191-
self.BestOuterBound = self.OuterBoundUpdate(bound, cls, idx)
192-
logging.debug("ph back from OuterBounds")
193-
194-
def OuterBoundUpdate(self, new_bound, cls=None, idx=None, char='*'):
195-
current_bound = self.BestOuterBound
196-
if self._outer_bound_update(new_bound, current_bound):
197-
if cls is None:
198-
self.latest_ob_char = char
199-
self.last_ob_idx = 0
200-
else:
201-
self.latest_ob_char = cls.converger_spoke_char
202-
self.last_ob_idx = idx
203-
return new_bound
204-
else:
205-
return current_bound
206-
207-
def InnerBoundUpdate(self, new_bound, cls=None, idx=None, char='*'):
208-
current_bound = self.BestInnerBound
209-
if self._inner_bound_update(new_bound, current_bound):
210-
if cls is None:
211-
self.latest_ib_char = char
212-
self.last_ib_idx = 0
213-
else:
214-
self.latest_ib_char = cls.converger_spoke_char
215-
self.last_ib_idx = idx
216-
return new_bound
217-
else:
218-
return current_bound
219-
220-
def initialize_bound_values(self):
221-
if self.opt.is_minimizing:
222-
self.BestInnerBound = inf
223-
self.BestOuterBound = -inf
224-
self._inner_bound_update = lambda new, old : (new < old)
225-
self._outer_bound_update = lambda new, old : (new > old)
226-
else:
227-
self.BestInnerBound = -inf
228-
self.BestOuterBound = inf
229-
self._inner_bound_update = lambda new, old : (new > old)
230-
self._outer_bound_update = lambda new, old : (new < old)
231-
232165
def _populate_boundsout_cache(self, buf):
233166
""" Populate a given buffer with the current bounds
234167
"""

mpisppy/cylinders/reduced_costs_spoke.py

+87-8
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
# All rights reserved. Please see the files COPYRIGHT.md and LICENSE.md for
77
# full copyright and license information.
88
###############################################################################
9+
import math
910
import pyomo.environ as pyo
1011
import numpy as np
1112
from mpisppy.cylinders.lagrangian_bounder import LagrangianOuterBound
@@ -15,7 +16,8 @@
1516

1617
class ReducedCostsSpoke(LagrangianOuterBound):
1718

18-
send_fields = (*LagrangianOuterBound.send_fields, Field.EXPECTED_REDUCED_COST, Field.SCENARIO_REDUCED_COST,)
19+
send_fields = (*LagrangianOuterBound.send_fields, Field.EXPECTED_REDUCED_COST, Field.SCENARIO_REDUCED_COST,
20+
Field.NONANT_LOWER_BOUNDS, Field.NONANT_UPPER_BOUNDS,)
1921
receive_fields = (*LagrangianOuterBound.receive_fields,)
2022

2123
converger_spoke_char = 'R'
@@ -57,8 +59,40 @@ def register_send_fields(self) -> None:
5759
scenario_buffer_len += len(s._mpisppy_data.nonant_indices)
5860
self._scenario_rc_buffer = np.zeros(scenario_buffer_len)
5961

62+
self.initialize_bound_fields()
63+
self.create_integer_variable_where()
64+
6065
return
6166

67+
def initialize_bound_fields(self):
68+
self._nonant_lower_bounds = self.send_buffers[Field.NONANT_LOWER_BOUNDS].value_array()
69+
self._nonant_upper_bounds = self.send_buffers[Field.NONANT_UPPER_BOUNDS].value_array()
70+
71+
self._nonant_lower_bounds[:] = -np.inf
72+
self._nonant_upper_bounds[:] = np.inf
73+
74+
for s in self.opt.local_scenarios.values():
75+
scenario_lower_bounds = np.fromiter(
76+
_lb_generator(s._mpisppy_data.nonant_indices.values()),
77+
dtype=float,
78+
count=len(s._mpisppy_data.nonant_indices),
79+
)
80+
self._nonant_lower_bounds = np.maximum(self._nonant_lower_bounds, scenario_lower_bounds)
81+
82+
scenario_upper_bounds = np.fromiter(
83+
_ub_generator(s._mpisppy_data.nonant_indices.values()),
84+
dtype=float,
85+
count=len(s._mpisppy_data.nonant_indices),
86+
)
87+
self._nonant_upper_bounds = np.minimum(self._nonant_upper_bounds, scenario_upper_bounds)
88+
89+
def create_integer_variable_where(self):
90+
self._integer_variable_where = np.full(len(self._nonant_lower_bounds), False)
91+
for s in self.opt.local_scenarios.values():
92+
for idx, xvar in enumerate(s._mpisppy_data.nonant_indices.values()):
93+
if xvar.is_integer():
94+
self._integer_variable_where[idx] = True
95+
6296
@property
6397
def rc_global(self):
6498
return self.send_buffers[Field.EXPECTED_REDUCED_COST].value_array()
@@ -84,24 +118,19 @@ def lagrangian_prep(self):
84118
same as base class, but relax the integer variables and
85119
attach the reduced cost suffix
86120
"""
87-
# Split up PH_Prep? Prox option is important for APH.
88-
# Seems like we shouldn't need the Lagrangian stuff, so attach_prox=False
89-
# Scenarios are created here
90-
self.opt.PH_Prep(attach_prox=False)
91-
self.opt._reenable_W()
92-
93121
relax_integer_vars = pyo.TransformationFactory("core.relax_integer_vars")
94122
for s in self.opt.local_subproblems.values():
95123
relax_integer_vars.apply_to(s)
96124
s.rc = pyo.Suffix(direction=pyo.Suffix.IMPORT)
97-
self.opt._create_solvers(presolve=False)
125+
super().lagrangian_prep()
98126

99127
def lagrangian(self, need_solution=True):
100128
if not need_solution:
101129
raise RuntimeError("ReducedCostsSpoke always needs a solution to work")
102130
bound = super().lagrangian(need_solution=need_solution)
103131
if bound is not None:
104132
self.extract_and_store_reduced_costs()
133+
self.extract_and_store_updated_nonant_bounds(bound)
105134
return bound
106135

107136
def extract_and_store_reduced_costs(self):
@@ -176,6 +205,56 @@ def extract_and_store_reduced_costs(self):
176205
Field.SCENARIO_REDUCED_COST,
177206
)
178207

208+
def extract_and_store_updated_nonant_bounds(self, lr_outer_bound):
209+
# update the best bound from the hub
210+
# as a side effect, calls update_receive_buffers
211+
# TODO: fix this side effect
212+
if self.got_kill_signal():
213+
return
214+
215+
self.update_innerbounds()
216+
217+
if math.isinf(self.BestInnerBound):
218+
# can do anything with no bound
219+
return
220+
221+
nonzero_rc = np.where(self.rc_global==0, np.nan, self.rc_global)
222+
bound_tightening = np.divide(self.BestInnerBound - lr_outer_bound, nonzero_rc)
223+
224+
tighten_upper = np.where(bound_tightening>0, np.nan, bound_tightening)
225+
tighten_lower = np.where(bound_tightening<0, np.nan, bound_tightening)
226+
227+
tighten_upper += self._nonant_lower_bounds
228+
tighten_lower += self._nonant_upper_bounds
229+
230+
# max of existing lower and new lower
231+
np.fmax(tighten_lower, self._nonant_lower_bounds, out=self._nonant_lower_bounds)
232+
233+
# min of existing upper and new upper
234+
np.fmin(tighten_upper, self._nonant_upper_bounds, out=self._nonant_upper_bounds)
235+
236+
# ceiling of lower bounds for integer variables
237+
np.ceil(self._nonant_lower_bounds, out=self._nonant_lower_bounds, where=self._integer_variable_where)
238+
# floor of upper bounds for integer variables
239+
np.floor(self._nonant_upper_bounds, out=self._nonant_upper_bounds, where=self._integer_variable_where)
240+
241+
179242
def main(self):
180243
# need the solution for ReducedCostsSpoke
181244
super().main(need_solution=True)
245+
246+
247+
def _lb_generator(var_iterable):
248+
for v in var_iterable:
249+
lb = v.lb
250+
if lb is None:
251+
yield -np.inf
252+
yield lb
253+
254+
255+
def _ub_generator(var_iterable):
256+
for v in var_iterable:
257+
ub = v.ub
258+
if ub is None:
259+
yield np.inf
260+
yield ub

mpisppy/cylinders/spcommunicator.py

+88-8
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,17 @@
1919
Separate hub and spoke classes for memory/window management?
2020
"""
2121

22-
import numpy as np
2322
import abc
2423
import time
24+
import logging
25+
import numpy as np
26+
from math import inf
2527

26-
from mpisppy import MPI
28+
from mpisppy import MPI, global_toc
2729
from mpisppy.cylinders.spwindow import Field, FieldLengths, SPWindow
2830

31+
logger = logging.getLogger(__name__)
32+
2933
def communicator_array(size):
3034
arr = np.empty(size+1, dtype='d')
3135
arr[:] = np.nan
@@ -116,7 +120,8 @@ class SPCommunicator:
116120
or expects to receive from another SPCommunicator object.
117121
"""
118122
send_fields = ()
119-
receive_fields = (Field.NONANT_LOWER_BOUNDS, Field.NONANT_UPPER_BOUNDS,)
123+
receive_fields = (Field.OBJECTIVE_INNER_BOUND, Field.OBJECTIVE_OUTER_BOUND,
124+
Field.NONANT_LOWER_BOUNDS, Field.NONANT_UPPER_BOUNDS,)
120125

121126
def __init__(self, spbase_object, fullcomm, strata_comm, cylinder_comm, communicators, options=None):
122127
self.fullcomm = fullcomm
@@ -152,6 +157,13 @@ def __init__(self, spbase_object, fullcomm, strata_comm, cylinder_comm, communic
152157
# the SPBase object
153158
self.opt.spcomm = self
154159

160+
# for communicating with bounders
161+
self.latest_ib_char = None
162+
self.latest_ob_char = None
163+
self.last_ib_idx = None
164+
self.last_ob_idx = None
165+
self.initialize_bound_values()
166+
155167
return
156168

157169
def _make_key(self, field: Field, origin: int):
@@ -385,20 +397,88 @@ def update_nonant_bounds(self):
385397
upper bound buffers should be up-to-date, which can be done by calling
386398
`SPCommunicator.update_receive_buffers`.
387399
"""
388-
_INF = float("inf")
400+
bounds_modified = 0
389401
for _, _, recv_buf in self.receive_field_spcomms[Field.NONANT_LOWER_BOUNDS]:
390-
for s in self.opt.local_scenarios.items():
402+
for s in self.opt.local_scenarios.values():
391403
for ci, (ndn_i, xvar) in enumerate(s._mpisppy_data.nonant_indices.items()):
392404
xvarlb = xvar.lb
393405
if xvarlb is None:
394-
xvarlb = -_INF
406+
xvarlb = -inf
395407
if recv_buf[ci] > xvarlb:
396408
xvar.lb = recv_buf[ci]
409+
bounds_modified += 1
397410
for _, _, recv_buf in self.receive_field_spcomms[Field.NONANT_UPPER_BOUNDS]:
398-
for s in self.opt.local_scenarios.items():
411+
for s in self.opt.local_scenarios.values():
399412
for ci, (ndn_i, xvar) in enumerate(s._mpisppy_data.nonant_indices.items()):
400413
xvarub = xvar.ub
401414
if xvarub is None:
402-
xvarub = _INF
415+
xvarub = inf
403416
if recv_buf[ci] < xvarub:
404417
xvar.ub = recv_buf[ci]
418+
bounds_modified += 1
419+
420+
bounds_modified /= len(self.opt.local_scenarios)
421+
422+
if bounds_modified > 0:
423+
global_toc(f"{self.__class__.__name__}: tightened {int(bounds_modified)} variable bounds", self.cylinder_rank == 0)
424+
425+
def update_innerbounds(self):
426+
""" Update the inner bounds after receiving them from the spokes
427+
"""
428+
logger.debug(f"{self.__class__.__name__} is trying to update from InnerBounds")
429+
for idx, cls, recv_buf in self.receive_field_spcomms[Field.OBJECTIVE_INNER_BOUND]:
430+
if recv_buf.is_new():
431+
bound = recv_buf[0]
432+
logger.debug("!! new InnerBound to opt {}".format(bound))
433+
self.BestInnerBound = self.InnerBoundUpdate(bound, cls, idx)
434+
logger.debug(f"{self.__class__.__name__} back from InnerBounds")
435+
436+
def update_outerbounds(self):
437+
""" Update the outer bounds after receiving them from the spokes
438+
"""
439+
logger.debug(f"{self.__class__.__name__} is trying to update from OuterBounds")
440+
for idx, cls, recv_buf in self.receive_field_spcomms[Field.OBJECTIVE_OUTER_BOUND]:
441+
if recv_buf.is_new():
442+
bound = recv_buf[0]
443+
logger.debug("!! new OuterBound to opt {}".format(bound))
444+
self.BestOuterBound = self.OuterBoundUpdate(bound, cls, idx)
445+
logger.debug(f"{self.__class__.__name__} back from OuterBounds")
446+
447+
def OuterBoundUpdate(self, new_bound, cls=None, idx=None, char='*'):
448+
current_bound = self.BestOuterBound
449+
if self._outer_bound_update(new_bound, current_bound):
450+
if cls is None:
451+
self.latest_ob_char = char
452+
self.last_ob_idx = 0
453+
else:
454+
self.latest_ob_char = cls.converger_spoke_char
455+
self.last_ob_idx = idx
456+
return new_bound
457+
else:
458+
return current_bound
459+
460+
def InnerBoundUpdate(self, new_bound, cls=None, idx=None, char='*'):
461+
current_bound = self.BestInnerBound
462+
if self._inner_bound_update(new_bound, current_bound):
463+
if cls is None:
464+
self.latest_ib_char = char
465+
self.last_ib_idx = 0
466+
else:
467+
self.latest_ib_char = cls.converger_spoke_char
468+
self.last_ib_idx = idx
469+
return new_bound
470+
else:
471+
return current_bound
472+
473+
def initialize_bound_values(self):
474+
if self.opt.is_minimizing:
475+
self.BestInnerBound = inf
476+
self.BestOuterBound = -inf
477+
self._inner_bound_update = lambda new, old : (new < old)
478+
self._outer_bound_update = lambda new, old : (new > old)
479+
else:
480+
self.BestInnerBound = -inf
481+
self.BestOuterBound = inf
482+
self._inner_bound_update = lambda new, old : (new > old)
483+
self._outer_bound_update = lambda new, old : (new < old)
484+

0 commit comments

Comments
 (0)