Coverage for pySDC/helpers/transfer_helper.py: 77%

142 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-09-19 09:13 +0000

1# coding=utf-8 

2import numpy as np 

3import scipy.sparse as sprs 

4from scipy.interpolate import BarycentricInterpolator 

5 

6 

7def next_neighbors_periodic(p, ps, k): 

8 """ 

9 Function to find the next neighbors for a periodic setup 

10 

11 This function gives for a value p the k points next to it which are found in 

12 in the vector ps and the points which are found periodically. 

13 

14 Args: 

15 p: the current point 

16 ps (np.ndarray): the grid with the potential neighbors 

17 k (int): number of neighbors to find 

18 

19 Returns: 

20 list: the k next neighbors 

21 """ 

22 p_bar = p - np.floor(p / 1.0) * 1.0 

23 ps = ps - ps[0] 

24 distance_to_p = np.asarray( 

25 list(map(lambda tk: min([np.abs(tk + 1 - p_bar), np.abs(tk - p_bar), np.abs(tk - 1 - p_bar)]), ps)) 

26 ) 

27 

28 # zip it 

29 value_index = [] 

30 for d, i in zip(distance_to_p, range(distance_to_p.size)): 

31 value_index.append((d, i)) 

32 # sort by distance 

33 value_index_sorted = sorted(value_index, key=lambda s: s[0]) 

34 # take first k indices with least distance and sort them 

35 return sorted(map(lambda s: s[1], value_index_sorted[0:k])) 

36 

37 

38def next_neighbors(p, ps, k): 

39 """ 

40 Function to find the next neighbors for a non-periodic setup 

41 

42 This function gives for a value p the k points next to it which are found in 

43 in the vector ps 

44 

45 Args: 

46 p: the current point 

47 ps (np.ndarray): the grid with the potential neighbors 

48 k (int): number of neighbors to find 

49 

50 Returns: 

51 list: the k next neighbors 

52 """ 

53 distance_to_p = np.abs(ps - p) 

54 # zip it 

55 value_index = [] 

56 for d, i in zip(distance_to_p, range(distance_to_p.size)): 

57 value_index.append((d, i)) 

58 # sort by distance 

59 value_index_sorted = sorted(value_index, key=lambda s: s[0]) 

60 # take first k indices with least distance and sort them 

61 return sorted(map(lambda s: s[1], value_index_sorted[0:k])) 

62 

63 

64def continue_periodic_array(arr, nn): 

65 """ 

66 Function to append an array for nn neighbors for periodicity 

67 

68 Args: 

69 arr (np.ndarray): the input array 

70 nn (list): the neighbors 

71 

72 Returns: 

73 np.ndarray: the continued array 

74 """ 

75 

76 nn = np.asarray(nn) 

77 d_nn = nn[1:] - nn[:-1] 

78 if np.all(d_nn == np.ones(nn.shape[0] - 1)): 

79 return arr[nn] 

80 else: 

81 cont_arr = [arr[nn[0]]] 

82 shift = 0.0 

83 for n, d in zip(nn[1:], d_nn): 

84 if d != 1: 

85 shift = -1 

86 cont_arr.append(arr[n] + shift) 

87 

88 return np.asarray(cont_arr) 

89 

90 

91def restriction_matrix_1d(fine_grid, coarse_grid, k=2, periodic=False, pad=1): 

92 """ 

93 Function to contruct the restriction matrix in 1d using barycentric interpolation 

94 

95 Args: 

96 fine_grid (np.ndarray): a one dimensional 1d array containing the nodes of the fine grid 

97 coarse_grid (np.ndarray): a one dimensional 1d array containing the nodes of the coarse grid 

98 k (int): order of the restriction 

99 periodic (bool): flag to indicate periodicity 

100 pad (int): padding parameter for boundaries 

101 

102 Returns: 

103 sprs.csc_matrix: restriction matrix 

104 """ 

105 

106 n_g = coarse_grid.size 

107 

108 if periodic: 

109 M = np.zeros((coarse_grid.size, fine_grid.size)) 

110 for i, p in zip(range(n_g), coarse_grid): 

111 nn = next_neighbors_periodic(p, fine_grid, k) 

112 circulating_one = np.asarray([1.0] + [0.0] * (k - 1)) 

113 cont_arr = continue_periodic_array(fine_grid, nn) 

114 if p > np.mean(coarse_grid) and not (cont_arr[0] <= p <= cont_arr[-1]): 

115 cont_arr += 1 

116 bary_pol = [] 

117 for l in range(k): 

118 bary_pol.append(BarycentricInterpolator(cont_arr, np.roll(circulating_one, l))) 

119 with np.errstate(divide='ignore'): 

120 M[i, nn] = np.asarray(list(map(lambda x: x(p), bary_pol))) 

121 else: 

122 M = np.zeros((coarse_grid.size, fine_grid.size + 2 * pad)) 

123 for i, p in zip(range(n_g), coarse_grid): 

124 padded_f_grid = border_padding(fine_grid, pad, pad) 

125 nn = next_neighbors(p, padded_f_grid, k) 

126 # construct the lagrange polynomials for the k neighbors 

127 circulating_one = np.asarray([1.0] + [0.0] * (k - 1)) 

128 bary_pol = [] 

129 for l in range(k): 

130 bary_pol.append(BarycentricInterpolator(padded_f_grid[nn], np.roll(circulating_one, l))) 

131 with np.errstate(divide='ignore'): 

132 M[i, nn] = np.asarray(list(map(lambda x: x(p), bary_pol))) 

133 if pad > 0: 

134 M = M[:, pad:-pad] 

135 

136 return sprs.csc_matrix(M) 

137 

138 

139def interpolation_matrix_1d(fine_grid, coarse_grid, k=2, periodic=False, pad=1, equidist_nested=True): 

140 """ 

141 Function to construct the restriction matrix in 1d using barycentric interpolation 

142 

143 Args: 

144 fine_grid (np.ndarray): a one dimensional 1d array containing the nodes of the fine grid 

145 coarse_grid (np.ndarray): a one dimensional 1d array containing the nodes of the coarse grid 

146 k (int): order of the restriction 

147 periodic (bool): flag to indicate periodicity 

148 pad (int): padding parameter for boundaries 

149 equidist_nested (bool): shortcut possible, if nodes are equidistant and nested 

150 

151 Returns: 

152 sprs.csc_matrix: interpolation matrix 

153 """ 

154 

155 n_f = fine_grid.size 

156 

157 if periodic: 

158 M = np.zeros((fine_grid.size, coarse_grid.size)) 

159 

160 if equidist_nested: 

161 for i, p in zip(range(n_f), fine_grid): 

162 if i % 2 == 0: 

163 M[i, int(i / 2)] = 1.0 

164 else: 

165 nn = [] 

166 cpos = int(i / 2) 

167 offset = int(k / 2) 

168 for j in range(k): 

169 nn.append(cpos - offset + 1 + j) 

170 if nn[-1] < 0: 

171 nn[-1] += coarse_grid.size 

172 elif nn[-1] > coarse_grid.size - 1: 

173 nn[-1] -= coarse_grid.size 

174 nn = sorted(nn) 

175 

176 circulating_one = np.asarray([1.0] + [0.0] * (k - 1)) 

177 if len(nn) > 0: 

178 cont_arr = continue_periodic_array(coarse_grid, nn) 

179 else: 

180 cont_arr = coarse_grid 

181 

182 if p > np.mean(fine_grid) and not (cont_arr[0] <= p <= cont_arr[-1]): 

183 cont_arr += 1 

184 

185 bary_pol = [] 

186 for l in range(k): 

187 bary_pol.append(BarycentricInterpolator(cont_arr, np.roll(circulating_one, l))) 

188 with np.errstate(divide='ignore'): 

189 M[i, nn] = np.asarray(list(map(lambda x: x(p), bary_pol))) 

190 

191 else: 

192 for i, p in zip(range(n_f), fine_grid): 

193 nn = next_neighbors_periodic(p, coarse_grid, k) 

194 circulating_one = np.asarray([1.0] + [0.0] * (k - 1)) 

195 cont_arr = continue_periodic_array(coarse_grid, nn) 

196 

197 if p > np.mean(fine_grid) and not (cont_arr[0] <= p <= cont_arr[-1]): 

198 cont_arr += 1 

199 

200 bary_pol = [] 

201 for l in range(k): 

202 bary_pol.append(BarycentricInterpolator(cont_arr, np.roll(circulating_one, l))) 

203 with np.errstate(divide='ignore'): 

204 M[i, nn] = np.asarray(list(map(lambda x: x(p), bary_pol))) 

205 

206 else: 

207 M = np.zeros((fine_grid.size, coarse_grid.size + 2 * pad)) 

208 padded_c_grid = border_padding(coarse_grid, pad, pad) 

209 

210 if equidist_nested: 

211 for i, p in zip(range(n_f), fine_grid): 

212 if i % 2 != 0: 

213 M[i, int((i - 1) / 2) + 1] = 1.0 

214 else: 

215 nn = [] 

216 cpos = int(i / 2) 

217 offset = int(k / 2) 

218 for j in range(k): 

219 nn.append(cpos - offset + 1 + j) 

220 if nn[-1] < 0: 

221 nn[-1] += k 

222 elif nn[-1] > coarse_grid.size + 1: 

223 nn[-1] -= k 

224 nn = sorted(nn) 

225 # construct the lagrange polynomials for the k neighbors 

226 circulating_one = np.asarray([1.0] + [0.0] * (k - 1)) 

227 bary_pol = [] 

228 for l in range(k): 

229 bary_pol.append(BarycentricInterpolator(padded_c_grid[nn], np.roll(circulating_one, l))) 

230 with np.errstate(divide='ignore'): 

231 M[i, nn] = np.asarray(list(map(lambda x: x(p), bary_pol))) 

232 

233 else: 

234 for i, p in zip(range(n_f), fine_grid): 

235 nn = next_neighbors(p, padded_c_grid, k) 

236 # construct the lagrange polynomials for the k neighbors 

237 circulating_one = np.asarray([1.0] + [0.0] * (k - 1)) 

238 bary_pol = [] 

239 for l in range(k): 

240 bary_pol.append(BarycentricInterpolator(padded_c_grid[nn], np.roll(circulating_one, l))) 

241 with np.errstate(divide='ignore'): 

242 M[i, nn] = np.asarray(list(map(lambda x: x(p), bary_pol))) 

243 

244 if pad > 0: 

245 M = M[:, pad:-pad] 

246 

247 return sprs.csc_matrix(M) 

248 

249 

250def border_padding(grid, l, r, pad_type='mirror'): 

251 """ 

252 Function to pad/embed an array at the boundaries 

253 

254 Args: 

255 grid (np.npdarray): the input array 

256 l: left boundary 

257 r: right boundary 

258 pad_type: type of padding 

259 

260 Returns: 

261 np.npdarray: the padded array 

262 

263 """ 

264 

265 assert l < grid.size and r < grid.size 

266 padded_arr = np.zeros(grid.size + l + r) 

267 if pad_type == 'mirror': 

268 for i in range(l): 

269 padded_arr[i] = 2 * grid[0] - grid[l - i] 

270 for j in range(r): 

271 padded_arr[-j - 1] = 2 * grid[-1] - grid[-r + j - 1] 

272 padded_arr[l : l + grid.size] = grid 

273 return padded_arr