Coverage for include / graph.py: 86%

86 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 16:26 +0000

1import h5py as h5 

2import networkx as nx 

3import numpy as np 

4from typing import Literal 

5 

6""" Network generation function """ 

7 

8 

def generate_graph(
    *,
    N: int,
    mean_degree: int = None,
    type: Literal[
        "random", "BarabasiAlbert", "BollobasRiordan", "WattsStrogatz", "Star", "Regular"
    ],
    seed: int = None,
    graph_props: dict = None,
) -> nx.Graph:
    """Generates graphs of a given topology.

    :param N: number of nodes passed to the networkx generator
    :param mean_degree: degree parameter of the generator; required for the
        'random', 'BarabasiAlbert', 'WattsStrogatz' and 'Regular' types
    :param type: graph topology kind (case-insensitive); can be any of 'random',
        'BarabasiAlbert' (scale-free undirected), 'BollobasRiordan' (scale-free
        directed), 'WattsStrogatz' (small-world), 'Star', or 'Regular'
    :param seed: the random seed to use for the graph generation. An integer seed
        is also used for reconnecting isolated vertices and for drawing the edge
        weights, so that the same seed always produces the same graph. Without
        an integer seed, the global ``np.random`` state is used for these steps.
    :param graph_props: dictionary containing the type-specific parameters:
        ``is_directed`` (for 'random'; defaults to False), ``is_weighted``
        (defaults to False), ``BollobasRiordan`` (keyword arguments for
        ``nx.scale_free_graph``) and ``WattsStrogatz`` (must contain ``p_rewire``).
        ``None`` is treated as an empty dictionary.
    :return: the networkx graph object. Isolated vertices of the 'random',
        'BarabasiAlbert', 'BollobasRiordan' and 'WattsStrogatz' graphs are
        attached to another vertex, but the graph may still consist of several
        disconnected components.
    :raises ValueError: if the graph type is not recognised, or if ``mean_degree``
        is missing for a type that requires it
    """

    # Validate the arguments before doing any work
    kind = type.lower()
    needs_mean_degree = {"random", "barabasialbert", "wattsstrogatz", "regular"}
    if kind not in needs_mean_degree | {"bollobasriordan", "star"}:
        raise ValueError(f"Unrecognised graph type '{type}'!")
    if kind in needs_mean_degree and mean_degree is None:
        raise ValueError(f"Graph type '{type}' requires 'mean_degree' to be set!")

    props = {} if graph_props is None else graph_props

    # Use a dedicated, seeded random stream for the post-processing steps if an
    # integer seed was given; otherwise fall back to the global numpy state.
    # RandomState only accepts seeds in [0, 2**32), hence the modulo.
    if isinstance(seed, (int, np.integer)):
        rng = np.random.RandomState(int(seed) % 2**32)
    else:
        rng = np.random

    def _connect_isolates(G: nx.Graph) -> nx.Graph:
        """Connects isolated vertices to main network body.

        Vertices are assumed to be labelled 0, ..., n-1, as is the case for the
        networkx generators used here.
        """
        isolates = list(nx.isolates(G))
        n_nodes = G.number_of_nodes()
        if not isolates or n_nodes < 2:
            return G

        isolated = set(isolates)

        # If *every* vertex is isolated there is no main body to attach to;
        # rejection sampling against the isolates would then never terminate.
        # In that case only self-loops are excluded.
        no_body = len(isolated) == n_nodes

        for v in isolates:
            if no_body and G.degree(v) > 0:
                # Already picked as a partner by an earlier vertex
                continue
            excluded = {v} if no_body else isolated
            new_nb = int(rng.randint(0, n_nodes))
            while new_nb in excluded:
                new_nb = int(rng.randint(0, n_nodes))
            G.add_edge(v, new_nb)

        return G

    # Random graph
    if kind == "random":
        is_directed: bool = props.get("is_directed", False)

        # A single vertex has no possible edges; avoid the division by zero
        p_edge = mean_degree / (N - 1) if N > 1 else 0.0
        G = _connect_isolates(
            nx.fast_gnp_random_graph(N, p_edge, directed=is_directed, seed=seed)
        )

    # Undirected scale-free graph
    elif kind == "barabasialbert":
        G = _connect_isolates(nx.barabasi_albert_graph(N, mean_degree, seed))

    # Directed scale-free graph
    elif kind == "bollobasriordan":
        G = nx.DiGraph(
            _connect_isolates(
                nx.scale_free_graph(
                    N, **(props.get("BollobasRiordan") or {}), seed=seed
                )
            )
        )

    # Small-world (Watts-Strogatz) graph
    elif kind == "wattsstrogatz":
        p: float = props["WattsStrogatz"]["p_rewire"]

        G = _connect_isolates(nx.watts_strogatz_graph(N, mean_degree, p, seed))

    # Star graph
    # NOTE(review): nx.star_graph(N) returns N + 1 vertices (one centre plus N
    # leaves), which does not match the meaning of N for the other types.
    # Left unchanged since callers may rely on it -- confirm.
    elif kind == "star":
        G = nx.star_graph(N)

    # Regular graph
    else:
        G = nx.random_regular_graph(mean_degree, N, seed)

    # Add random uniform weights to the edges
    if props.get("is_weighted", False):
        for u, v in G.edges():
            G[u][v]["weight"] = rng.rand()

    return G

87 

88 

def save_nw(
    network: nx.Graph,
    nw_group: h5.Group,
    *,
    write_adjacency_matrix: bool = False,
    static: bool = True,
):
    """Saves a network to a h5.Group graph group. The network can be either dynamic or static,
    static in this case meaning that none of the datasets have a time dimension.

    The datasets ``_vertices``, ``_edges``, ``_edge_weights``, ``_degree``,
    ``_degree_weighted``, ``_triangles`` and ``_triangles_weighted`` are created
    in the group and filled from ``network``. Dynamic datasets get a leading
    ``time`` dimension of length 1 that can be extended; the network is written
    to the first time frame. Edges without a ``weight`` attribute are stored
    with weight 1.

    :param network: the network to save
    :param nw_group: the h5.Group
    :param write_adjacency_matrix: whether to write out the entire adjacency matrix
    :param static: whether the network is dynamic or static
    """

    n_nodes = network.number_of_nodes()
    # The network size is assumed to remain constant
    n_edges = network.size()

    def _create(name, item_shape, dim_names, **kwargs):
        """Creates a chunked, compressed dataset and sets its dimension attributes,
        prepending an unlimited time axis in the dynamic case."""
        item_shape = tuple(item_shape)
        if static:
            shape, maxshape = item_shape, item_shape
        else:
            shape, maxshape = (1, *item_shape), (None, *item_shape)
            dim_names = ["time", *dim_names]

        dset = nw_group.create_dataset(
            name, shape, maxshape=maxshape, chunks=True, compression=3, **kwargs
        )
        dset.attrs["dim_names"] = dim_names
        # Coordinate modes are attributes of the dataset, not dataset entries
        for dim in dim_names:
            dset.attrs[f"coords_mode__{dim}"] = "trivial"
        return dset

    def _write(dset, values):
        """Writes the values to the whole dataset (static) or to the first time frame."""
        if static:
            dset[...] = values
        else:
            dset[0, ...] = values

    # Vertices
    vertices = _create("_vertices", (n_nodes,), ["vertex_idx"], dtype=int)

    # Edges
    edges = _create("_edges", (n_edges, 2), ["edge_idx", "vertex_idx"])

    # Edge properties
    edge_weights = _create("_edge_weights", (n_edges,), ["edge_idx"])

    # Topological properties
    degree = _create("_degree", (n_nodes,), ["vertex_idx"], dtype=int)
    degree_w = _create("_degree_weighted", (n_nodes,), ["vertex_idx"], dtype=float)
    triangles = _create("_triangles", (n_nodes,), ["vertex_idx"], dtype=float)
    triangles_w = _create(
        "_triangles_weighted", (n_nodes,), ["vertex_idx"], dtype=float
    )

    # The (weighted) adjacency matrix is needed several times; compute it once
    adj_matrix = nx.to_numpy_array(network)

    # Write network properties
    _write(vertices, list(network.nodes()))
    _write(edges, list(network.edges()))

    # One weight per edge, in edge order; unweighted edges default to 1, which is
    # also what networkx assumes for the weighted degree and the adjacency matrix
    _write(
        edge_weights, [w for *_, w in network.edges(data="weight", default=1.0)]
    )
    _write(degree, [network.degree(i) for i in network.nodes()])
    _write(degree_w, [deg for _, deg in network.degree(weight="weight")])

    # Diagonal of the third power of the (binarised) adjacency matrix
    _write(triangles, np.diagonal(np.linalg.matrix_power(np.ceil(adj_matrix), 3)))
    _write(triangles_w, np.diagonal(np.linalg.matrix_power(adj_matrix, 3)))

    # Adjacency matrix: only written out if explicitly specified
    if write_adjacency_matrix:
        adjacency_matrix = _create(
            "_adjacency_matrix", np.shape(adj_matrix), ["i", "j"]
        )
        _write(adjacency_matrix, adj_matrix)

269 else: 

270 adjacency_matrix[0, :, :] = adj_matrix