Python util 模块,PriorityQueue() 实例源码


def computeDistances(layout):
    """Run uniform-cost search from every position to every other position.

    Each step to an adjacent (up/down/right/left) non-wall position costs 1.

    Args:
        layout: object exposing ``walls.asList(False)`` (the positions used
            as graph nodes) and ``isWall((x, y))``.

    Returns:
        dict mapping ``(target, source)`` to the step count found between the
        two positions. Pairs for which no route was found keep the large
        sentinel value the search starts from.
    """
    import sys
    import util  # project-local module that provides PriorityQueue

    # sys.maxint only exists on Python 2; fall back to sys.maxsize elsewhere.
    unreachable = getattr(sys, "maxint", sys.maxsize)

    distances = {}
    allNodes = layout.walls.asList(False)
    for source in allNodes:
        dist = dict.fromkeys(allNodes, unreachable)
        dist[source] = 0
        closed = set()
        queue = util.PriorityQueue()
        queue.push(source, 0)
        while not queue.isEmpty():
            node = queue.pop()
            if node in closed:
                # Stale queue entry: this node was already expanded.
                continue
            closed.add(node)
            newDist = dist[node] + 1
            x, y = node
            for other in ((x, y + 1), (x, y - 1), (x + 1, y), (x - 1, y)):
                # Skip walls, and skip anything that is not one of the
                # known nodes (no distance slot exists for it).
                if layout.isWall(other) or other not in dist:
                    continue
                if newDist < dist[other]:
                    dist[other] = newDist
                    queue.push(other, newDist)
        for target in allNodes:
            distances[(target, source)] = dist[target]
    return distances
def uniformCostSearch(problem):
    """Search the node of least total cost first.

    Args:
        problem: search problem exposing ``getStartState()``,
            ``isGoalState(state)`` and ``getSuccessors(state)``.

    Returns:
        The path reported by ``Node.getPath()`` for the first goal node
        popped from the frontier, or ``[]`` when the start state is already
        a goal or the frontier runs dry.

    NOTE(review): ``Node`` and the ``PriorityQueue`` with a ``stateList``
    attribute are defined elsewhere; this assumes ``Node.pathCost`` is the
    accumulated cost from the start node -- confirm against their
    definitions.
    """
    startState = problem.getStartState()
    startNode = Node((startState, None, None))

    # Check if start node is goal
    if problem.isGoalState(startNode.state):
        return []

    frontier = PriorityQueue()
    # The start state is expanded right here, so it counts as explored.
    explored = [startNode.state]

    for successor in problem.getSuccessors(startState):
        newNode = Node(successor, startNode)
        # Order by path cost; a constant priority would let an expensive
        # first step be popped before a cheaper one.
        frontier.push(newNode, newNode.pathCost)

    while not frontier.isEmpty():
        leafNode = frontier.pop()
        if problem.isGoalState(leafNode.state):
            return leafNode.getPath()
        if leafNode.state in explored:
            # Duplicate frontier entry for a state that was already expanded.
            continue
        explored.append(leafNode.state)
        for successor in problem.getSuccessors(leafNode.state):
            newNode = Node(successor, leafNode)
            # NOTE(review): a state already on the frontier is never
            # re-pushed, even when the new path is cheaper -- verify this is
            # acceptable for the problems this is used on.
            if newNode.state not in frontier.stateList and newNode.state not in explored:
                frontier.push(newNode, newNode.pathCost)
    return []
def aStarSearch(problem, heuristic=nullHeuristic):
    """Search the node that has the lowest combined cost and heuristic first.

    Thin wrapper: hands the problem, a fresh ``util.PriorityQueue`` to be
    used as the fringe, and the heuristic to ``genericSearch`` and returns
    whatever that call returns. When the caller supplies no heuristic, the
    default ``nullHeuristic`` is passed along.
    """
    fringe = util.PriorityQueue()
    return genericSearch(problem, fringe, heuristic)


def uniformCostSearch(problem):
    """Search the node of least total cost first.

    Args:
        problem: search problem exposing ``getStartState()``,
            ``isGoalState(state)``, ``getSuccessors(state)`` (iterable of
            ``(state, action, stepCost)`` triples) and
            ``getCostOfActions(actions)``.

    Returns:
        The list of actions leading to the first goal state popped from the
        fringe, or ``None`` when the fringe is exhausted without reaching
        a goal.
    """
    visited = set()
    fringe = util.PriorityQueue()
    # Each fringe entry is (state, actions taken to reach that state).
    fringe.push((problem.getStartState(), []), 0)

    while not fringe.isEmpty():
        currState, actions = fringe.pop()
        if problem.isGoalState(currState):
            return actions
        if currState in visited:
            # A cheaper entry for this state was already expanded.
            continue
        visited.add(currState)
        for nextState, action, _stepCost in problem.getSuccessors(currState):
            if nextState in visited:
                continue
            # Extend a copy of the plan so sibling entries stay independent.
            newActions = actions + [action]
            fringe.push((nextState, newActions),
                        problem.getCostOfActions(newActions))
    return None
def aStarSearch(problem, heuristic=nullHeuristic):
    """Search the node that has the lowest combined cost and heuristic first.

    Args:
        problem: search problem exposing ``getStartState()``,
            ``isGoalState(state)``, ``getSuccessors(state)`` (iterable of
            ``(state, action, stepCost)`` triples) and
            ``getCostOfActions(actions)``.
        heuristic: callable ``heuristic(state, problem)`` whose result is
            added to the plan cost to form the fringe priority.

    Returns:
        The list of actions leading to the first goal state popped from the
        fringe, or ``None`` when the fringe is exhausted without reaching
        a goal.
    """
    visited = set()
    fringe = util.PriorityQueue()
    startState = problem.getStartState()
    # Each fringe entry is (state, plan); plans are kept as tuples.
    fringe.push((startState, ()), heuristic(startState, problem))

    while not fringe.isEmpty():
        currState, currPlan = fringe.pop()
        if problem.isGoalState(currState):
            return list(currPlan)
        if currState in visited:
            # This state was already expanded from an earlier entry.
            continue
        visited.add(currState)
        for nextState, action, _stepCost in problem.getSuccessors(currState):
            if nextState in visited:
                continue
            newPlan = list(currPlan)
            newPlan.append(action)
            priority = (heuristic(nextState, problem)
                        + problem.getCostOfActions(newPlan))
            fringe.push((nextState, tuple(newPlan)), priority)
    return None


def uniformCostSearch(problem):
    """Search the node of least total cost first.

    A priority queue keyed on the cost of the action sequence is used, so
    the cheapest known route is always the next one popped.

    Args:
        problem: search problem exposing ``getStartState()``,
            ``isGoalState(state)``, ``getSuccessors(state)`` (iterable of
            ``(successor, direction, cost)`` triples) and
            ``getCostOfActions(actions)``.

    Returns:
        The list of directions leading to the first goal state popped from
        the queue, or ``None`` when the queue is exhausted without reaching
        a goal.
    """
    nodePriorityQueue = util.PriorityQueue()
    # Kept as a list (as before) so states only need to support ``==``.
    visited = []

    # Seed the queue; without this the search loop never runs.
    nodePriorityQueue.push((problem.getStartState(), []), 0)

    while not nodePriorityQueue.isEmpty():
        node, path = nodePriorityQueue.pop()

        if problem.isGoalState(node):
            return path

        if node in visited:
            # Already expanded from an earlier queue entry.
            continue
        visited.append(node)

        for successor, direction, _cost in problem.getSuccessors(node):
            # States are marked visited only when expanded, so a goal state
            # can be queued again via another route without a special case.
            if successor not in visited:
                newPath = path + [direction]
                nodePriorityQueue.push((successor, newPath),
                                       problem.getCostOfActions(newPath))

    return None

def aStarSearch(problem, heuristic=nullHeuristic):
    """Search the node that has the lowest combined cost and heuristic first.

    Same shape as a uniform-cost search, except that the queue priority is
    the cost of the action sequence plus ``heuristic(state, problem)``.

    Args:
        problem: search problem exposing ``getStartState()``,
            ``isGoalState(state)``, ``getSuccessors(state)`` (iterable of
            ``(successor, direction, cost)`` triples) and
            ``getCostOfActions(actions)``.
        heuristic: callable ``heuristic(state, problem)`` returning the
            estimate added to the path cost.

    Returns:
        The list of directions leading to the first goal state popped from
        the queue, or ``None`` when the queue is exhausted without reaching
        a goal.
    """
    nodePriorityQueue = util.PriorityQueue()
    # Kept as a list (as before) so states only need to support ``==``.
    visited = []

    startState = problem.getStartState()
    # Seed the queue; without this the search loop never runs.
    nodePriorityQueue.push((startState, []), heuristic(startState, problem))

    while not nodePriorityQueue.isEmpty():
        node, path = nodePriorityQueue.pop()

        if problem.isGoalState(node):
            return path

        if node in visited:
            # Already expanded from an earlier queue entry.
            continue
        visited.append(node)

        for successor, direction, _cost in problem.getSuccessors(node):
            # States are marked visited only when expanded, so a goal state
            # can be queued again via another route without a special case.
            if successor not in visited:
                newPath = path + [direction]
                totalCost = (problem.getCostOfActions(newPath)
                             + heuristic(successor, problem))
                nodePriorityQueue.push((successor, newPath), totalCost)

    return None


def uniformCostSearch(problem):
    """Search the node of least total cost first.

    Args:
        problem: search problem exposing ``getStartState()``,
            ``isGoalState(state)`` and ``getSuccessors(state)`` (iterable of
            ``(state, action, stepCost)`` triples).

    Returns:
        The list of actions leading to the first goal state popped from the
        frontier, or ``[]`` when the frontier is exhausted without reaching
        a goal.
    """
    frontier = util.PriorityQueue()
    # Kept as a list (as before) so states only need to support ``==``.
    visited = []
    # Frontier entries are (state, actions so far, accumulated cost).
    frontier.push((problem.getStartState(), [], 0), 0)
    while not frontier.isEmpty():
        currLoc, currPath, currCost = frontier.pop()
        # Test for the goal when a node is popped, not when it is generated:
        # only then is its accumulated cost known to be the smallest queued.
        if problem.isGoalState(currLoc):
            return currPath
        if currLoc in visited:
            continue
        visited.append(currLoc)
        for nextLoc, action, stepCost in problem.getSuccessors(currLoc):
            if nextLoc not in visited:
                newCost = currCost + stepCost
                frontier.push((nextLoc, currPath + [action], newCost), newCost)
    return []
def aStarSearch(problem, heuristic=nullHeuristic):
    """Search the node that has the lowest combined cost and heuristic first.

    Args:
        problem: search problem exposing ``getStartState()``,
            ``isGoalState(state)`` and ``getSuccessors(state)`` (iterable of
            ``(state, action, stepCost)`` triples).
        heuristic: callable ``heuristic(state, problem)``; its result ``h``
            is added to the accumulated cost ``g`` to form the priority
            ``f = g + h``.

    Returns:
        The list of actions leading to the first goal state popped from the
        frontier, or ``[]`` when the frontier is exhausted without reaching
        a goal.
    """
    frontier = util.PriorityQueue()
    # Kept as a list (as before) so states only need to support ``==``.
    visited = []
    startState = problem.getStartState()
    # Frontier entries are (state, accumulated cost g, actions so far).
    frontier.push((startState, 0, []), heuristic(startState, problem))
    while not frontier.isEmpty():
        currLoc, currCost, currPath = frontier.pop()
        # Test for the goal when a node is popped, not when it is generated.
        if problem.isGoalState(currLoc):
            return currPath
        if currLoc in visited:
            continue
        visited.append(currLoc)
        for nextLoc, action, stepCost in problem.getSuccessors(currLoc):
            if nextLoc not in visited:
                g = currCost + stepCost
                f = g + heuristic(nextLoc, problem)
                frontier.push((nextLoc, g, currPath + [action]), f)
    return []

def uniformCostSearch(problem):
    """Search the node of least total cost first, tracking parents in a map.

    Instead of storing a full action list with every queue entry, the
    cheapest known way into each state is recorded as
    ``[parent_state, action, cost_from_start]`` and the action list is
    rebuilt by walking that map back from the goal.

    Args:
        problem: search problem exposing ``getStartState()``,
            ``isGoalState(state)`` and ``getSuccessors(state)`` (sequence of
            ``(state, action, stepCost)`` triples). States are used as
            dictionary keys.

    Returns:
        The list of actions from the start state to the first goal state
        popped from the queue (``[]`` if the start is a goal), or ``None``
        when the queue is exhausted without reaching a goal.
    """
    loc_pqueue = PriorityQueue()
    visited_node = set()
    start_node = problem.getStartState()
    # The start state has no parent and costs nothing to reach.
    parent_child_map = {start_node: [None, None, 0]}
    # Seed the queue; without this the search loop never runs.
    loc_pqueue.push(start_node, 0)

    def traverse_path(node):
        """Rebuild the start-to-node action list from parent_child_map."""
        direction_list = []
        while node != start_node:
            node, direction, _ = parent_child_map[node]
            direction_list.append(direction)
        # Actions were collected goal-to-start; flip them into travel order.
        direction_list.reverse()
        return direction_list

    while not loc_pqueue.isEmpty():
        parent_node = loc_pqueue.pop()

        if problem.isGoalState(parent_node):
            return traverse_path(parent_node)

        if parent_node in visited_node:
            # Stale queue entry: this state was already expanded.
            continue
        visited_node.add(parent_node)

        path_cost = parent_child_map[parent_node][2]
        for child_state, child_action, child_cost in problem.getSuccessors(parent_node):
            if child_state in visited_node:
                continue
            gvalue = path_cost + child_cost
            known = parent_child_map.get(child_state)
            # Record and queue the child only when this route is the first
            # one seen or strictly cheaper than the recorded one.
            if known is None or gvalue < known[2]:
                parent_child_map[child_state] = [parent_node, child_action, gvalue]
                loc_pqueue.push(child_state, gvalue)

    return None
