# Coverage for pySDC/helpers/transfer_helper.py: 77%

## 142 statements

, created at 2024-09-19 09:13 +0000

1# coding=utf-8

2import numpy as np

3import scipy.sparse as sprs

4from scipy.interpolate import BarycentricInterpolator

7def next_neighbors_periodic(p, ps, k):

8 """

9 Function to find the next neighbors for a periodic setup

11 This function gives for a value p the k points next to it which are found in

12 in the vector ps and the points which are found periodically.

14 Args:

15 p: the current point

16 ps (np.ndarray): the grid with the potential neighbors

17 k (int): number of neighbors to find

19 Returns:

20 list: the k next neighbors

21 """

22 p_bar = p - np.floor(p / 1.0) * 1.0

23 ps = ps - ps[0]

24 distance_to_p = np.asarray(

25 list(map(lambda tk: min([np.abs(tk + 1 - p_bar), np.abs(tk - p_bar), np.abs(tk - 1 - p_bar)]), ps))

26 )

28 # zip it

29 value_index = []

30 for d, i in zip(distance_to_p, range(distance_to_p.size)):

31 value_index.append((d, i))

32 # sort by distance

33 value_index_sorted = sorted(value_index, key=lambda s: s[0])

34 # take first k indices with least distance and sort them

35 return sorted(map(lambda s: s[1], value_index_sorted[0:k]))

38def next_neighbors(p, ps, k):

39 """

40 Function to find the next neighbors for a non-periodic setup

42 This function gives for a value p the k points next to it which are found in

43 in the vector ps

45 Args:

46 p: the current point

47 ps (np.ndarray): the grid with the potential neighbors

48 k (int): number of neighbors to find

50 Returns:

51 list: the k next neighbors

52 """

53 distance_to_p = np.abs(ps - p)

54 # zip it

55 value_index = []

56 for d, i in zip(distance_to_p, range(distance_to_p.size)):

57 value_index.append((d, i))

58 # sort by distance

59 value_index_sorted = sorted(value_index, key=lambda s: s[0])

60 # take first k indices with least distance and sort them

61 return sorted(map(lambda s: s[1], value_index_sorted[0:k]))

64def continue_periodic_array(arr, nn):

65 """

66 Function to append an array for nn neighbors for periodicity

68 Args:

69 arr (np.ndarray): the input array

70 nn (list): the neighbors

72 Returns:

73 np.ndarray: the continued array

74 """

76 nn = np.asarray(nn)

77 d_nn = nn[1:] - nn[:-1]

78 if np.all(d_nn == np.ones(nn.shape[0] - 1)):

79 return arr[nn]

80 else:

81 cont_arr = [arr[nn[0]]]

82 shift = 0.0

83 for n, d in zip(nn[1:], d_nn):

84 if d != 1:

85 shift = -1

86 cont_arr.append(arr[n] + shift)

88 return np.asarray(cont_arr)

91def restriction_matrix_1d(fine_grid, coarse_grid, k=2, periodic=False, pad=1):

92 """

93 Function to contruct the restriction matrix in 1d using barycentric interpolation

95 Args:

96 fine_grid (np.ndarray): a one dimensional 1d array containing the nodes of the fine grid

97 coarse_grid (np.ndarray): a one dimensional 1d array containing the nodes of the coarse grid

98 k (int): order of the restriction

99 periodic (bool): flag to indicate periodicity

102 Returns:

103 sprs.csc_matrix: restriction matrix

104 """

106 n_g = coarse_grid.size

108 if periodic:

109 M = np.zeros((coarse_grid.size, fine_grid.size))

110 for i, p in zip(range(n_g), coarse_grid):

111 nn = next_neighbors_periodic(p, fine_grid, k)

112 circulating_one = np.asarray([1.0] + [0.0] * (k - 1))

113 cont_arr = continue_periodic_array(fine_grid, nn)

114 if p > np.mean(coarse_grid) and not (cont_arr[0] <= p <= cont_arr[-1]):

115 cont_arr += 1

116 bary_pol = []

117 for l in range(k):

118 bary_pol.append(BarycentricInterpolator(cont_arr, np.roll(circulating_one, l)))

119 with np.errstate(divide='ignore'):

120 M[i, nn] = np.asarray(list(map(lambda x: x(p), bary_pol)))

121 else:

122 M = np.zeros((coarse_grid.size, fine_grid.size + 2 * pad))

123 for i, p in zip(range(n_g), coarse_grid):

125 nn = next_neighbors(p, padded_f_grid, k)

126 # construct the lagrange polynomials for the k neighbors

127 circulating_one = np.asarray([1.0] + [0.0] * (k - 1))

128 bary_pol = []

129 for l in range(k):

131 with np.errstate(divide='ignore'):

132 M[i, nn] = np.asarray(list(map(lambda x: x(p), bary_pol)))

136 return sprs.csc_matrix(M)

139def interpolation_matrix_1d(fine_grid, coarse_grid, k=2, periodic=False, pad=1, equidist_nested=True):

140 """

141 Function to construct the restriction matrix in 1d using barycentric interpolation

143 Args:

144 fine_grid (np.ndarray): a one dimensional 1d array containing the nodes of the fine grid

145 coarse_grid (np.ndarray): a one dimensional 1d array containing the nodes of the coarse grid

146 k (int): order of the restriction

147 periodic (bool): flag to indicate periodicity

149 equidist_nested (bool): shortcut possible, if nodes are equidistant and nested

151 Returns:

152 sprs.csc_matrix: interpolation matrix

153 """

155 n_f = fine_grid.size

157 if periodic:

158 M = np.zeros((fine_grid.size, coarse_grid.size))

160 if equidist_nested:

161 for i, p in zip(range(n_f), fine_grid):

162 if i % 2 == 0:

163 M[i, int(i / 2)] = 1.0

164 else:

165 nn = []

166 cpos = int(i / 2)

167 offset = int(k / 2)

168 for j in range(k):

169 nn.append(cpos - offset + 1 + j)

170 if nn[-1] < 0:

171 nn[-1] += coarse_grid.size

172 elif nn[-1] > coarse_grid.size - 1:

173 nn[-1] -= coarse_grid.size

174 nn = sorted(nn)

176 circulating_one = np.asarray([1.0] + [0.0] * (k - 1))

177 if len(nn) > 0:

178 cont_arr = continue_periodic_array(coarse_grid, nn)

179 else:

180 cont_arr = coarse_grid

182 if p > np.mean(fine_grid) and not (cont_arr[0] <= p <= cont_arr[-1]):

183 cont_arr += 1

185 bary_pol = []

186 for l in range(k):

187 bary_pol.append(BarycentricInterpolator(cont_arr, np.roll(circulating_one, l)))

188 with np.errstate(divide='ignore'):

189 M[i, nn] = np.asarray(list(map(lambda x: x(p), bary_pol)))

191 else:

192 for i, p in zip(range(n_f), fine_grid):

193 nn = next_neighbors_periodic(p, coarse_grid, k)

194 circulating_one = np.asarray([1.0] + [0.0] * (k - 1))

195 cont_arr = continue_periodic_array(coarse_grid, nn)

197 if p > np.mean(fine_grid) and not (cont_arr[0] <= p <= cont_arr[-1]):

198 cont_arr += 1

200 bary_pol = []

201 for l in range(k):

202 bary_pol.append(BarycentricInterpolator(cont_arr, np.roll(circulating_one, l)))

203 with np.errstate(divide='ignore'):

204 M[i, nn] = np.asarray(list(map(lambda x: x(p), bary_pol)))

206 else:

207 M = np.zeros((fine_grid.size, coarse_grid.size + 2 * pad))

210 if equidist_nested:

211 for i, p in zip(range(n_f), fine_grid):

212 if i % 2 != 0:

213 M[i, int((i - 1) / 2) + 1] = 1.0

214 else:

215 nn = []

216 cpos = int(i / 2)

217 offset = int(k / 2)

218 for j in range(k):

219 nn.append(cpos - offset + 1 + j)

220 if nn[-1] < 0:

221 nn[-1] += k

222 elif nn[-1] > coarse_grid.size + 1:

223 nn[-1] -= k

224 nn = sorted(nn)

225 # construct the lagrange polynomials for the k neighbors

226 circulating_one = np.asarray([1.0] + [0.0] * (k - 1))

227 bary_pol = []

228 for l in range(k):

230 with np.errstate(divide='ignore'):

231 M[i, nn] = np.asarray(list(map(lambda x: x(p), bary_pol)))

233 else:

234 for i, p in zip(range(n_f), fine_grid):

235 nn = next_neighbors(p, padded_c_grid, k)

236 # construct the lagrange polynomials for the k neighbors

237 circulating_one = np.asarray([1.0] + [0.0] * (k - 1))

238 bary_pol = []

239 for l in range(k):

241 with np.errstate(divide='ignore'):

242 M[i, nn] = np.asarray(list(map(lambda x: x(p), bary_pol)))

247 return sprs.csc_matrix(M)

251 """

252 Function to pad/embed an array at the boundaries

254 Args:

255 grid (np.npdarray): the input array

256 l: left boundary

257 r: right boundary

260 Returns:

263 """

265 assert l < grid.size and r < grid.size

266 padded_arr = np.zeros(grid.size + l + r)