Coverage for pySDC/projects/Performance/controller_MPI_scorep.py: 0%

283 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2024-04-29 09:02 +0000

1import numpy as np 

2from mpi4py import MPI 

3 

4from pySDC.core.Controller import controller 

5from pySDC.core.Errors import ControllerError 

6from pySDC.core.Step import step 

7from pySDC.implementations.convergence_controller_classes.check_convergence import CheckConvergence 

8 

9import scorep.user as spu 

10 

11 

12class controller_MPI(controller): 

13 """ 

14 

15 PFASST controller, running parallel version of PFASST in blocks (MG-style) 

16 

17 """ 

18 

19 def __init__(self, controller_params, description, comm): 

20 """ 

21 Initialization routine for PFASST controller 

22 

23 Args: 

24 controller_params: parameter set for the controller and the step class 

25 description: all the parameters to set up the rest (levels, problems, transfer, ...) 

26 comm: MPI communicator 

27 """ 

28 

29 # call parent's initialization routine 

30 super(controller_MPI, self).__init__(controller_params) 

31 

32 # create single step per processor 

33 self.S = step(description) 

34 

35 # pass communicator for future use 

36 self.comm = comm 

37 # add request handler for status send 

38 self.req_status = None 

39 

40 num_procs = self.comm.Get_size() 

41 rank = self.comm.Get_rank() 

42 

43 # insert data on time communicator to the steps (helpful here and there) 

44 self.S.status.time_size = num_procs 

45 

46 if self.params.dump_setup and rank == 0: 

47 self.dump_setup(step=self.S, controller_params=controller_params, description=description) 

48 

49 num_levels = len(self.S.levels) 

50 

51 # add request handle container for isend 

52 self.req_send = [None] * num_levels 

53 

54 if num_procs > 1 and num_levels > 1: 

55 for L in self.S.levels: 

56 if not L.sweep.coll.right_is_node or L.sweep.params.do_coll_update: 

57 raise ControllerError("For PFASST to work, we assume uend^k = u_M^k") 

58 

59 if num_levels == 1 and self.params.predict_type is not None: 

60 self.logger.warning( 

61 'you have specified a predictor type but only a single level.. ' 'predictor will be ignored' 

62 ) 

63 

64 def run(self, u0, t0, Tend): 

65 """ 

66 Main driver for running the parallel version of SDC, MSSDC, MLSDC and PFASST 

67 

68 Args: 

69 u0: initial values 

70 t0: starting time 

71 Tend: ending time 

72 

73 Returns: 

74 end values on the finest level 

75 stats object containing statistics for each step, each level and each iteration 

76 """ 

77 

78 # reset stats to prevent double entries from old runs 

79 self.hooks.reset_stats() 

80 

81 # find active processes and put into new communicator 

82 rank = self.comm.Get_rank() 

83 num_procs = self.comm.Get_size() 

84 all_dt = self.comm.allgather(self.S.dt) 

85 all_time = [t0 + sum(all_dt[0:i]) for i in range(num_procs)] 

86 time = all_time[rank] 

87 all_active = all_time < Tend - 10 * np.finfo(float).eps 

88 

89 if not any(all_active): 

90 raise ControllerError('Nothing to do, check t0, dt and Tend') 

91 

92 active = all_active[rank] 

93 if not all(all_active): 

94 comm_active = self.comm.Split(active) 

95 rank = comm_active.Get_rank() 

96 num_procs = comm_active.Get_size() 

97 else: 

98 comm_active = self.comm 

99 

100 self.S.status.slot = rank 

101 

102 # initialize block of steps with u0 

103 self.restart_block(num_procs, time, u0) 

104 uend = u0 

105 

106 # call post-setup hook 

107 self.hooks.post_setup(step=None, level_number=None) 

108 

109 # call pre-run hook 

110 self.hooks.pre_run(step=self.S, level_number=0) 

111 

112 comm_active.Barrier() 

113 

114 # while any process still active... 

115 while active: 

116 while not self.S.status.done: 

117 name = f'REGION -- {self.S.status.stage} -- {self.S.status.slot}' 

118 spu.region_begin(name) 

119 self.pfasst(comm_active, num_procs) 

120 spu.region_end(name) 

121 

122 time += self.S.dt 

123 

124 # broadcast uend, set new times and fine active processes 

125 tend = comm_active.bcast(time, root=num_procs - 1) 

126 uend = self.S.levels[0].uend.bcast(root=num_procs - 1, comm=comm_active) 

127 all_dt = comm_active.allgather(self.S.dt) 

128 all_time = [tend + sum(all_dt[0:i]) for i in range(num_procs)] 

129 time = all_time[rank] 

130 all_active = all_time < Tend - 10 * np.finfo(float).eps 

131 active = all_active[rank] 

132 if not all(all_active): 

133 comm_active_new = comm_active.Split(active) 

134 comm_active.Free() 

135 comm_active = comm_active_new 

136 rank = comm_active.Get_rank() 

137 num_procs = comm_active.Get_size() 

138 self.S.status.slot = rank 

139 

140 # initialize block of steps with u0 

141 self.restart_block(num_procs, time, uend) 

142 

143 # call post-run hook 

144 self.hooks.post_run(step=self.S, level_number=0) 

145 

146 comm_active.Free() 

147 

148 return uend, self.hooks.return_stats() 

149 

150 def restart_block(self, size, time, u0): 

151 """ 

152 Helper routine to reset/restart block of (active) steps 

153 

154 Args: 

155 size: number of active time steps 

156 time: current time 

157 u0: initial value to distribute across the steps 

158 

159 Returns: 

160 block of (all) steps 

161 """ 

162 

163 # store link to previous step 

164 self.S.prev = self.S.status.slot - 1 

165 self.S.next = self.S.status.slot + 1 

166 

167 # resets step 

168 self.S.reset_step() 

169 # determine whether I am the first and/or last in line 

170 self.S.status.first = self.S.prev == -1 

171 self.S.status.last = self.S.next == size 

172 # intialize step with u0 

173 self.S.init_step(u0) 

174 # reset some values 

175 self.S.status.done = False 

176 self.S.status.iter = 0 

177 self.S.status.stage = 'SPREAD' 

178 for l in self.S.levels: 

179 l.tag = None 

180 self.req_status = None 

181 self.req_send = [None] * len(self.S.levels) 

182 self.S.status.prev_done = False 

183 

184 self.S.status.time_size = size 

185 

186 for lvl in self.S.levels: 

187 lvl.status.time = time 

188 lvl.status.sweep = 1 

189 

190 @staticmethod 

191 def recv(target, source, tag=None, comm=None): 

192 """ 

193 Receive function 

194 

195 Args: 

196 target: level which will receive the values 

197 source: level which initiated the send 

198 tag: identifier to check if this message is really for me 

199 comm: communicator 

200 """ 

201 target.u[0].recv(source=source, tag=tag, comm=comm) 

202 # re-evaluate f on left interval boundary 

203 target.f[0] = target.prob.eval_f(target.u[0], target.time) 

204 

205 def predictor(self, comm): 

206 """ 

207 Predictor function, extracted from the stepwise implementation (will be also used by matrix sweppers) 

208 

209 Args: 

210 comm: communicator 

211 """ 

212 

213 if self.params.predict_type is None: 

214 pass 

215 

216 elif self.params.predict_type == 'fine_only': 

217 # do a fine sweep only 

218 self.S.levels[0].sweep.update_nodes() 

219 

220 elif self.params.predict_type == 'libpfasst_style': 

221 # restrict to coarsest level 

222 for l in range(1, len(self.S.levels)): 

223 self.S.transfer(source=self.S.levels[l - 1], target=self.S.levels[l]) 

224 

225 self.hooks.pre_comm(step=self.S, level_number=len(self.S.levels) - 1) 

226 if not self.S.status.first: 

227 self.logger.debug( 

228 'recv data predict: process %s, stage %s, time, %s, source %s, tag %s' 

229 % (self.S.status.slot, self.S.status.stage, self.S.time, self.S.prev, self.S.status.iter) 

230 ) 

231 self.recv(target=self.S.levels[-1], source=self.S.prev, tag=self.S.status.iter, comm=comm) 

232 self.hooks.post_comm(step=self.S, level_number=len(self.S.levels) - 1) 

233 

234 # do the sweep with new values 

235 self.S.levels[-1].sweep.update_nodes() 

236 self.S.levels[-1].sweep.compute_end_point() 

237 

238 self.hooks.pre_comm(step=self.S, level_number=len(self.S.levels) - 1) 

239 if not self.S.status.last: 

240 self.logger.debug( 

241 'send data predict: process %s, stage %s, time, %s, target %s, tag %s' 

242 % (self.S.status.slot, self.S.status.stage, self.S.time, self.S.next, self.S.status.iter) 

243 ) 

244 self.S.levels[-1].uend.send(dest=self.S.next, tag=self.S.status.iter, comm=comm) 

245 self.hooks.post_comm(step=self.S, level_number=len(self.S.levels) - 1, add_to_stats=True) 

246 

247 # go back to fine level, sweeping 

248 for l in range(len(self.S.levels) - 1, 0, -1): 

249 # prolong values 

250 self.S.transfer(source=self.S.levels[l], target=self.S.levels[l - 1]) 

251 # on middle levels: do sweep as usual 

252 if l - 1 > 0: 

253 self.S.levels[l - 1].sweep.update_nodes() 

254 

255 # end with a fine sweep 

256 self.S.levels[0].sweep.update_nodes() 

257 

258 elif self.params.predict_type == 'pfasst_burnin': 

259 # restrict to coarsest level 

260 for l in range(1, len(self.S.levels)): 

261 self.S.transfer(source=self.S.levels[l - 1], target=self.S.levels[l]) 

262 

263 for p in range(self.S.status.slot + 1): 

264 self.hooks.pre_comm(step=self.S, level_number=len(self.S.levels) - 1) 

265 if not p == 0 and not self.S.status.first: 

266 self.logger.debug( 

267 'recv data predict: process %s, stage %s, time, %s, source %s, tag %s, phase %s' 

268 % (self.S.status.slot, self.S.status.stage, self.S.time, self.S.prev, self.S.status.iter, p) 

269 ) 

270 self.recv(target=self.S.levels[-1], source=self.S.prev, tag=self.S.status.iter, comm=comm) 

271 self.hooks.post_comm(step=self.S, level_number=len(self.S.levels) - 1) 

272 

273 # do the sweep with new values 

274 self.S.levels[-1].sweep.update_nodes() 

275 self.S.levels[-1].sweep.compute_end_point() 

276 

277 self.hooks.pre_comm(step=self.S, level_number=len(self.S.levels) - 1) 

278 if not self.S.status.last: 

279 self.logger.debug( 

280 'send data predict: process %s, stage %s, time, %s, target %s, tag %s, phase %s' 

281 % (self.S.status.slot, self.S.status.stage, self.S.time, self.S.next, self.S.status.iter, p) 

282 ) 

283 self.S.levels[-1].uend.send(dest=self.S.next, tag=self.S.status.iter, comm=comm) 

284 self.hooks.post_comm( 

285 step=self.S, level_number=len(self.S.levels) - 1, add_to_stats=(p == self.S.status.slot) 

286 ) 

287 

288 # interpolate back to finest level 

289 for l in range(len(self.S.levels) - 1, 0, -1): 

290 self.S.transfer(source=self.S.levels[l], target=self.S.levels[l - 1]) 

291 

292 # end this with a fine sweep 

293 self.S.levels[0].sweep.update_nodes() 

294 

295 elif self.params.predict_type == 'fmg': 

296 # TODO: implement FMG predictor 

297 raise NotImplementedError('FMG predictor is not yet implemented') 

298 

299 else: 

300 raise ControllerError('Wrong predictor type, got %s' % self.params.predict_type) 

301 

302 def pfasst(self, comm, num_procs): 

303 """ 

304 Main function including the stages of SDC, MLSDC and PFASST (the "controller") 

305 

306 For the workflow of this controller, check out one of our PFASST talks 

307 

308 Args: 

309 comm: communicator 

310 num_procs (int): number of parallel processes 

311 """ 

312 

313 stage = self.S.status.stage 

314 

315 self.logger.debug(stage + ' - process ' + str(self.S.status.slot)) 

316 

317 if stage == 'SPREAD': 

318 # (potentially) serial spreading phase 

319 

320 # first stage: spread values 

321 self.hooks.pre_step(step=self.S, level_number=0) 

322 

323 # call predictor from sweeper 

324 self.S.levels[0].sweep.predict() 

325 

326 # update stage 

327 if len(self.S.levels) > 1: # MLSDC or PFASST with predict 

328 self.S.status.stage = 'PREDICT' 

329 else: 

330 self.S.status.stage = 'IT_CHECK' 

331 

332 elif stage == 'PREDICT': 

333 # call predictor (serial) 

334 

335 self.hooks.pre_predict(step=self.S, level_number=0) 

336 

337 self.predictor(comm) 

338 

339 self.hooks.post_predict(step=self.S, level_number=0) 

340 

341 # update stage 

342 # self.hooks.pre_iteration(step=self.S, level_number=0) 

343 self.S.status.stage = 'IT_CHECK' 

344 

345 elif stage == 'IT_CHECK': 

346 # check whether to stop iterating (parallel) 

347 

348 self.hooks.pre_comm(step=self.S, level_number=0) 

349 

350 if self.req_send[0] is not None: 

351 self.req_send[0].wait() 

352 self.S.levels[0].sweep.compute_end_point() 

353 

354 if not self.S.status.last and self.params.fine_comm: 

355 self.logger.debug( 

356 'isend data: process %s, stage %s, time %s, target %s, tag %s, iter %s' 

357 % (self.S.status.slot, self.S.status.stage, self.S.time, self.S.next, 0, self.S.status.iter) 

358 ) 

359 self.req_send[0] = self.S.levels[0].uend.isend(dest=self.S.next, tag=self.S.status.iter, comm=comm) 

360 

361 if not self.S.status.first and not self.S.status.prev_done and self.params.fine_comm: 

362 self.logger.debug( 

363 'recv data: process %s, stage %s, time %s, source %s, tag %s, iter %s' 

364 % (self.S.status.slot, self.S.status.stage, self.S.time, self.S.prev, 0, self.S.status.iter) 

365 ) 

366 self.recv(target=self.S.levels[0], source=self.S.prev, tag=self.S.status.iter, comm=comm) 

367 

368 self.hooks.post_comm(step=self.S, level_number=0) 

369 

370 self.S.levels[0].sweep.compute_residual() 

371 self.S.status.done = CheckConvergence.check_convergence(self.S) 

372 

373 if self.params.all_to_done: 

374 self.hooks.pre_comm(step=self.S, level_number=0) 

375 self.S.status.done = comm.allreduce(sendobj=self.S.status.done, op=MPI.LAND) 

376 self.hooks.post_comm(step=self.S, level_number=0, add_to_stats=True) 

377 

378 else: 

379 self.hooks.pre_comm(step=self.S, level_number=0) 

380 

381 # check if an open request of the status send is pending 

382 if self.req_status is not None: 

383 self.req_status.wait() 

384 

385 # recv status 

386 if not self.S.status.first and not self.S.status.prev_done: 

387 self.S.status.prev_done = comm.recv(source=self.S.prev, tag=99) 

388 self.logger.debug( 

389 'recv status: status %s, process %s, time %s, target %s, tag %s, iter %s' 

390 % ( 

391 self.S.status.prev_done, 

392 self.S.status.slot, 

393 self.S.time, 

394 self.S.next, 

395 99, 

396 self.S.status.iter, 

397 ) 

398 ) 

399 self.S.status.done = self.S.status.done and self.S.status.prev_done 

400 

401 # send status forward 

402 if not self.S.status.last: 

403 self.logger.debug( 

404 'isend status: status %s, process %s, time %s, target %s, tag %s, iter %s' 

405 % (self.S.status.done, self.S.status.slot, self.S.time, self.S.next, 99, self.S.status.iter) 

406 ) 

407 self.req_status = comm.isend(self.S.status.done, dest=self.S.next, tag=99) 

408 

409 self.hooks.post_comm(step=self.S, level_number=0, add_to_stats=True) 

410 

411 if self.S.status.iter > 0: 

412 self.hooks.post_iteration(step=self.S, level_number=0) 

413 

414 # if not readys, keep doing stuff 

415 if not self.S.status.done: 

416 # increment iteration count here (and only here) 

417 self.S.status.iter += 1 

418 

419 self.hooks.pre_iteration(step=self.S, level_number=0) 

420 if len(self.S.levels) > 1: # MLSDC or PFASST 

421 self.S.status.stage = 'IT_UP' 

422 else: 

423 if num_procs == 1 or self.params.mssdc_jac: # SDC or parallel MSSDC (Jacobi-like) 

424 self.S.status.stage = 'IT_FINE' 

425 else: 

426 self.S.status.stage = 'IT_COARSE' # serial MSSDC (Gauss-like) 

427 

428 else: 

429 # Need to finish alll pending isend requests. These will occur for the first active process, since 

430 # in the last iteration the wait statement will not be called ("send and forget") 

431 for req in self.req_send: 

432 if req is not None: 

433 req.wait() 

434 if self.req_status is not None: 

435 self.req_status.wait() 

436 

437 self.hooks.post_step(step=self.S, level_number=0) 

438 self.S.status.stage = 'DONE' 

439 

440 elif stage == 'IT_FINE': 

441 nsweeps = self.S.levels[0].params.nsweeps 

442 

443 self.S.levels[0].status.sweep = 0 

444 

445 # do fine sweep 

446 for k in range(nsweeps): 

447 self.S.levels[0].status.sweep += 1 

448 

449 self.hooks.pre_comm(step=self.S, level_number=0) 

450 

451 if self.req_send[0] is not None: 

452 self.req_send[0].wait() 

453 self.S.levels[0].sweep.compute_end_point() 

454 

455 if not self.S.status.last and self.params.fine_comm: 

456 self.logger.debug( 

457 'isend data: process %s, stage %s, time %s, target %s, tag %s, iter %s' 

458 % (self.S.status.slot, self.S.status.stage, self.S.time, self.S.next, 0, self.S.status.iter) 

459 ) 

460 self.req_send[0] = self.S.levels[0].uend.isend(dest=self.S.next, tag=self.S.status.iter, comm=comm) 

461 

462 if not self.S.status.first and not self.S.status.prev_done and self.params.fine_comm: 

463 self.logger.debug( 

464 'recv data: process %s, stage %s, time %s, source %s, tag %s, iter %s' 

465 % (self.S.status.slot, self.S.status.stage, self.S.time, self.S.prev, 0, self.S.status.iter) 

466 ) 

467 self.recv(target=self.S.levels[0], source=self.S.prev, tag=self.S.status.iter, comm=comm) 

468 

469 self.hooks.post_comm(step=self.S, level_number=0, add_to_stats=(k == nsweeps - 1)) 

470 

471 self.hooks.pre_sweep(step=self.S, level_number=0) 

472 self.S.levels[0].sweep.update_nodes() 

473 self.S.levels[0].sweep.compute_residual() 

474 self.hooks.post_sweep(step=self.S, level_number=0) 

475 

476 # update stage 

477 self.S.status.stage = 'IT_CHECK' 

478 

479 elif stage == 'IT_UP': 

480 # go up the hierarchy from finest to coarsest level (parallel) 

481 

482 self.S.transfer(source=self.S.levels[0], target=self.S.levels[1]) 

483 

484 # sweep and send on middle levels (not on finest, not on coarsest, though) 

485 for l in range(1, len(self.S.levels) - 1): 

486 nsweeps = self.S.levels[l].params.nsweeps 

487 

488 for _ in range(nsweeps): 

489 self.hooks.pre_comm(step=self.S, level_number=l) 

490 

491 if self.req_send[l] is not None: 

492 self.req_send[l].wait() 

493 self.S.levels[l].sweep.compute_end_point() 

494 

495 if not self.S.status.last and self.params.fine_comm: 

496 self.logger.debug( 

497 'isend data: process %s, stage %s, time %s, target %s, tag %s, iter %s' 

498 % (self.S.status.slot, self.S.status.stage, self.S.time, self.S.next, l, self.S.status.iter) 

499 ) 

500 self.req_send[l] = self.S.levels[l].uend.isend( 

501 dest=self.S.next, tag=self.S.status.iter, comm=comm 

502 ) 

503 

504 if not self.S.status.first and not self.S.status.prev_done and self.params.fine_comm: 

505 self.logger.debug( 

506 'recv data: process %s, stage %s, time %s, source %s, tag %s, iter %s' 

507 % (self.S.status.slot, self.S.status.stage, self.S.time, self.S.prev, l, self.S.status.iter) 

508 ) 

509 self.recv(target=self.S.levels[l], source=self.S.prev, tag=self.S.status.iter, comm=comm) 

510 

511 self.hooks.post_comm(step=self.S, level_number=l) 

512 

513 self.hooks.pre_sweep(step=self.S, level_number=l) 

514 self.S.levels[l].sweep.update_nodes() 

515 self.S.levels[l].sweep.compute_residual() 

516 self.hooks.post_sweep(step=self.S, level_number=l) 

517 

518 # transfer further up the hierarchy 

519 self.S.transfer(source=self.S.levels[l], target=self.S.levels[l + 1]) 

520 

521 # update stage 

522 self.S.status.stage = 'IT_COARSE' 

523 

524 elif stage == 'IT_COARSE': 

525 # sweeps on coarsest level (serial/blocking) 

526 

527 # receive from previous step (if not first) 

528 self.hooks.pre_comm(step=self.S, level_number=len(self.S.levels) - 1) 

529 if not self.S.status.first and not self.S.status.prev_done: 

530 self.logger.debug( 

531 'recv data: process %s, stage %s, time %s, source %s, tag %s, iter %s' 

532 % ( 

533 self.S.status.slot, 

534 self.S.status.stage, 

535 self.S.time, 

536 self.S.prev, 

537 len(self.S.levels) - 1, 

538 self.S.status.iter, 

539 ) 

540 ) 

541 self.recv(target=self.S.levels[-1], source=self.S.prev, tag=self.S.status.iter, comm=comm) 

542 self.hooks.post_comm(step=self.S, level_number=len(self.S.levels) - 1) 

543 

544 # do the sweep 

545 self.hooks.pre_sweep(step=self.S, level_number=len(self.S.levels) - 1) 

546 assert self.S.levels[-1].params.nsweeps == 1, ( 

547 'ERROR: this controller can only work with one sweep on the coarse level, got %s' 

548 % self.S.levels[-1].params.nsweeps 

549 ) 

550 self.S.levels[-1].sweep.update_nodes() 

551 self.S.levels[-1].sweep.compute_residual() 

552 self.hooks.post_sweep(step=self.S, level_number=len(self.S.levels) - 1) 

553 self.S.levels[-1].sweep.compute_end_point() 

554 

555 # send to next step 

556 self.hooks.pre_comm(step=self.S, level_number=len(self.S.levels) - 1) 

557 if not self.S.status.last: 

558 self.logger.debug( 

559 'send data: process %s, stage %s, time %s, target %s, tag %s, iter %s' 

560 % ( 

561 self.S.status.slot, 

562 self.S.status.stage, 

563 self.S.time, 

564 self.S.next, 

565 len(self.S.levels) - 1, 

566 self.S.status.iter, 

567 ) 

568 ) 

569 self.S.levels[-1].uend.send(dest=self.S.next, tag=self.S.status.iter, comm=comm) 

570 self.hooks.post_comm(step=self.S, level_number=len(self.S.levels) - 1, add_to_stats=True) 

571 

572 # update stage 

573 if len(self.S.levels) > 1: # MLSDC or PFASST 

574 self.S.status.stage = 'IT_DOWN' 

575 else: 

576 self.S.status.stage = 'IT_CHECK' # MSSDC 

577 

578 elif stage == 'IT_DOWN': 

579 # prolong corrections down to finest level (parallel) 

580 

581 # receive and sweep on middle levels (except for coarsest level) 

582 for l in range(len(self.S.levels) - 1, 0, -1): 

583 # prolong values 

584 self.S.transfer(source=self.S.levels[l], target=self.S.levels[l - 1]) 

585 

586 # on middle levels: do sweep as usual 

587 if l - 1 > 0: 

588 nsweeps = self.S.levels[l - 1].params.nsweeps 

589 

590 for k in range(nsweeps): 

591 self.hooks.pre_comm(step=self.S, level_number=l - 1) 

592 

593 if self.req_send[l - 1] is not None: 

594 self.req_send[l - 1].wait() 

595 self.S.levels[l - 1].sweep.compute_end_point() 

596 

597 if not self.S.status.last and self.params.fine_comm: 

598 self.logger.debug( 

599 'isend data: process %s, stage %s, time %s, target %s, tag %s, iter %s' 

600 % ( 

601 self.S.status.slot, 

602 self.S.status.stage, 

603 self.S.time, 

604 self.S.next, 

605 l - 1, 

606 self.S.status.iter, 

607 ) 

608 ) 

609 self.req_send[l - 1] = self.S.levels[l - 1].uend.isend( 

610 dest=self.S.next, tag=self.S.status.iter, comm=comm 

611 ) 

612 

613 if not self.S.status.first and not self.S.status.prev_done and self.params.fine_comm: 

614 self.logger.debug( 

615 'recv data: process %s, stage %s, time %s, source %s, tag %s, iter %s' 

616 % ( 

617 self.S.status.slot, 

618 self.S.status.stage, 

619 self.S.time, 

620 self.S.prev, 

621 l - 1, 

622 self.S.status.iter, 

623 ) 

624 ) 

625 self.recv( 

626 target=self.S.levels[l - 1], source=self.S.prev, tag=self.S.status.iter, comm=comm 

627 ) 

628 

629 self.hooks.post_comm(step=self.S, level_number=l - 1, add_to_stats=(k == nsweeps - 1)) 

630 

631 self.hooks.pre_sweep(step=self.S, level_number=l - 1) 

632 self.S.levels[l - 1].sweep.update_nodes() 

633 self.S.levels[l - 1].sweep.compute_residual() 

634 self.hooks.post_sweep(step=self.S, level_number=l - 1) 

635 

636 # update stage 

637 self.S.status.stage = 'IT_FINE' 

638 

639 else: 

640 raise ControllerError('Weird stage, got %s' % self.S.status.stage)