我们从 Python 开源项目中，提取了以下 10 个代码示例，用于说明如何使用 networkx.subgraph()。
def subgraph(self, nbunch):
    """Return the parent class's subgraph for ``nbunch``.

    The result of ``super().subgraph(nbunch)`` additionally receives this
    graph's ``sequence_src`` attribute before it is returned.
    """
    induced = super().subgraph(nbunch)
    # Copy the sequence source from this graph onto the result.
    induced.sequence_src = self.sequence_src
    return induced
def get_subgraph_for_node(node_name):
    """
    Build the sub-graph of the module-level graph G that is restricted to
    node_name and all of its ancestors (a full dependency graph can be
    difficult to read).

    :param node_name: Node for which to build the sub-graph
    :return: The result of nx.subgraph() over node_name and its ancestors
    """
    # nx.ancestors() does not include the node itself, so add it explicitly
    wanted_nodes = nx.ancestors(G, node_name) | {node_name}
    return nx.subgraph(G, wanted_nodes)
def print_dependents(graph, preamble_list, imports):
    """
    Recursively print a dependency tree: every entry of imports is printed
    on its own line and is followed by the tree of its own dependencies as
    found in graph.

    :param graph: Dictionary containing the subgraph of dependencies that
                  is being printed; it is indexed with each import to find
                  that import's own dependencies
    :param preamble_list: List of strings that are concatenated to form the
                          prefix printed before each dependency. One entry
                          is appended for the duration of each recursive
                          call and popped again afterwards
    :param imports: List of immediate imports/includes
    :return: None
    """
    # Prefix shared by every line printed at this level
    prefix = ''.join(preamble_list)
    last_index = len(imports) - 1

    # Connector line that opens the current level
    print(prefix + ' |')

    for index, module in enumerate(imports):
        line_format = augment_format_string(module, prefix + ' +--> %s')
        print(line_format % module)

        has_more_siblings = index < last_index
        # A KeyError raised anywhere in this section (typically because the
        # module has no entry in graph) ends the handling of this module
        # silently.
        try:
            children = graph[module]
            # Keep drawing the vertical bar only while siblings follow
            preamble_list.append(' | ' if has_more_siblings else ' ')
            print_dependents(graph, preamble_list, children)
            preamble_list.pop(-1)
            # Separator line, skipped after the last processed module
            if has_more_siblings:
                print(prefix + ' |')
        except KeyError:
            pass
def plot_module_dependency_graph(graph):
    """
    Draw a graph of yang modules with a spring layout. Used for both the
    full dependency graph of all yang modules in the DB and for the
    dependency subgraph of a single module.

    Nodes are drawn in three passes, one per tag: RFC_TAG as red squares,
    DRAFT_TAG as green circles and UNKNOWN_TAG as orange triangles. Edges
    and node labels are drawn afterwards.

    :param graph: Graph to be plotted
    :return: None
    """
    layout = nx.spring_layout(graph, iterations=2000)

    # (tag, marker shape, colour, opacity), drawn in this order.
    # prune_graph_nodes() presumably selects the nodes carrying the given
    # tag -- confirm against its definition.
    node_styles = (
        (RFC_TAG, 's', 'red', 0.5),
        (DRAFT_TAG, 'o', 'green', 0.5),
        (UNKNOWN_TAG, '^', 'orange', 1.0),
    )
    for tag, shape, colour, opacity in node_styles:
        nx.draw_networkx_nodes(graph,
                               pos=layout,
                               nodelist=prune_graph_nodes(graph, tag),
                               node_size=200,
                               node_shape=shape,
                               node_color=colour,
                               alpha=opacity,
                               linewidths=0.5)

    # Edges: fairly transparent, no arrow heads.
    # NOTE(review): recent NetworkX documents `width` (not `linewidths`)
    # for draw_networkx_edges -- confirm against the installed version.
    nx.draw_networkx_edges(graph,
                           pos=layout,
                           alpha=0.25,
                           linewidths=0.1,
                           arrows=False)

    # Module names as bold labels on the nodes
    nx.draw_networkx_labels(graph,
                            pos=layout,
                            font_size=10,
                            font_weight='bold',
                            alpha=1.0)
def subgraph_from(self, targets):
    '''Trim DAG to keep only the nodes that produce targets, plus their ancestors.

    A node is selected when its _output_targets is not an Undetermined
    instance and contains at least one element of targets. The selected
    nodes and all of their ancestors are wrapped in a new SoS_DAG.
    '''
    # Nodes that directly produce one of the requested targets
    producers = [
        node for node in self.nodes()
        if not isinstance(node._output_targets, Undetermined)
        and any(x in node._output_targets for x in targets)
    ]
    # Everything upstream of those producers
    upstream = set()
    for producer in producers:
        upstream.update(nx.ancestors(self, producer))
    return SoS_DAG(nx.subgraph(self, producers + list(upstream)))
def sample(G):
    """Return the subgraph of G induced by four randomly chosen nodes.

    The four draws are independent, so the same node may be picked more
    than once and the result can contain fewer than four distinct nodes.

    :param G: graph to sample from; must contain at least one node
    :return: the result of nx.subgraph() over the picked nodes
    """
    # Materialise the nodes once. On NetworkX >= 2.0 G.nodes() is a
    # NodeView, and a positional index into it (as random.choice performs;
    # assuming `choice` is random.choice) is treated as a node lookup
    # rather than "the i-th node".
    nodes = list(G.nodes())
    picked = [choice(nodes) for _ in range(4)]
    return nx.subgraph(G, picked)
def build_bubblechains(g: AssemblyGraph,
                       min_nodes: int = 1) -> Iterable[AssemblyGraph]:
    """Yield subgraphs of `g` built by chaining consecutive superbubbles.

    Starting at a bubble entrance, the bubble's nodes are collected and the
    walk continues from the bubble's exit for as long as that exit is itself
    the entrance of another bubble. Walking stops at a bubble whose entrance
    and exit have both been visited already.

    :param g: graph in which to search for non-nested superbubbles
    :param min_nodes: minimum number of collected nodes a chain needs in
        order to be yielded
    :return: generator of `networkx.subgraph(g, ...)` results, one per chain
    :raises AssemblyError: if a start point is not a bubble entrance
    """
    logger.info("Searching for non-nested superbubbles in the assembly "
                "graph...")

    # Map every bubble source to its sink
    bubbles = {}
    for bubble in find_superbubbles(g, report_nested=False):
        bubbles[bubble[0]] = bubble[1]

    bubble_entrances = set(bubbles)
    bubble_exits = set(bubbles.values())
    logger.debug("Found superbubbles: %s", bubbles)
    logger.info("Graph has %d superbubbles", len(bubbles))

    # Start nodes, in priority order:
    #   1. bubble entrances without incoming edges
    #   2. bubble entrances that are not also the exit of another bubble
    #   3. all bubble entrances; those already covered are filtered out by
    #      the `visited` check further down
    without_incoming = [n for n in bubble_entrances if g.in_degree(n) == 0]
    not_an_exit = [n for n in bubble_entrances if n not in bubble_exits]
    start_points = without_incoming + not_an_exit + list(bubble_entrances)
    logger.info("Number of start points : %d", len(start_points))

    visited = set()
    for start in start_points:
        chain_nodes = set()
        logger.debug("New start point %s", start)

        if start not in bubble_entrances:
            raise AssemblyError("Unexpected start point: {}, this is not a "
                                "bubble entrance.".format(start))

        chain_length = 0
        entrance = start
        while entrance in bubble_entrances:
            bubble_exit = bubbles[entrance]
            if entrance in visited and bubble_exit in visited:
                logger.debug("<%s, %s> already visited, stopping.",
                             entrance, bubble_exit)
                break

            bubble_nodes = superbubble_nodes(g, entrance, bubble_exit)
            chain_nodes.update(bubble_nodes)
            visited.update(bubble_nodes)

            # Continue from this bubble's exit
            entrance = bubble_exit
            chain_length += 1

        if chain_length > 0:
            logger.info("Built bubblechain of %d bubbles", chain_length)

        # NOTE(review): nesting reconstructed from whitespace-collapsed
        # source; whether this check sits inside the `if` above only
        # matters when min_nodes <= 0 -- confirm against upstream.
        if len(chain_nodes) >= min_nodes:
            yield networkx.subgraph(g, chain_nodes)
def core_substitution(graph, orig_cip_graph, new_cip_graph):
    """
    Replace the core of orig_cip_graph inside graph with the core of
    new_cip_graph, joining the two along their interfaces.

    In both cip graphs, nodes whose data dict has a 'core' key form the
    core; all remaining nodes form the interface.

    :param graph: the whole graph; it is passed through _edge_to_vertex
                  before anything else happens
    :param orig_cip_graph: interface and core to be replaced; all of its
                           nodes must be present in the transformed graph
    :param new_cip_graph: interface and new core to transplant
    :return: the merged graph with integer node labels, or None when
             get_good_isomorphism does not map every node of the original
             interface
    """
    graph = _edge_to_vertex(graph)
    assert (set(orig_cip_graph.nodes()) - set(graph.nodes()) == set([])), \
        'orig_cip_graph not in graph'

    # select only the interfaces of the cips
    new_graph_interface_nodes = [
        n for n, d in new_cip_graph.nodes(data=True) if 'core' not in d]
    new_cip_interface_graph = nx.subgraph(new_cip_graph,
                                          new_graph_interface_nodes)

    original_graph_interface_nodes = [
        n for n, d in orig_cip_graph.nodes(data=True) if 'core' not in d]
    original_interface_graph = nx.subgraph(orig_cip_graph,
                                           original_graph_interface_nodes)

    # get isomorphism between interfaces; an incomplete mapping is treated
    # as a grammar hash collision and aborts the substitution
    iso = get_good_isomorphism(graph,
                               orig_cip_graph,
                               new_cip_graph,
                               original_interface_graph,
                               new_cip_interface_graph)
    if len(iso) != len(original_interface_graph):
        logger.log(5, "grammar hash collision, discovered in 'core_substution' ")
        return None

    # ok we got an isomorphism so lets do the merging. The rename prefixes
    # keep the two node sets apart: '' for graph, '-' for new_cip_graph.
    graph = nx.union(graph, new_cip_graph, rename=('', '-'))

    # removing old core (labels are the str() form after the union above)
    original_graph_core_nodes = [
        n for n, d in orig_cip_graph.nodes(data=True) if 'core' in d]
    for n in original_graph_core_nodes:
        graph.remove_node(str(n))

    # merge interfaces; .items() works on Python 2 and 3, unlike iteritems()
    for k, v in iso.items():
        merge(graph, str(k), '-' + str(v))

    # unionizing killed my labels so we need to relabel
    graph = eg._revert_edge_to_vertex_transform(graph)
    relabeled = nx.convert_node_labels_to_integers(graph)
    graph_clean(relabeled)
    return relabeled