Toplevel simulation: Difference between revisions
(→Issues) |
|||
(3 intermediate revisions by the same user not shown) | |||
Line 25: | Line 25: | ||
(it's written in Python 2.7.9) | (it's written in Python 2.7.9) | ||
#note to self: better garbage collection still needed | |||
#idea: re-implement the paths so that new starts can benefit from the updating | |||
#before the whole path has been followed | |||
#todo add parameters for ethical choices | |||
#todo implement cone vision in accordance to the actual robots | |||
import random | import random | ||
import math | |||
class AO(object): | class AO(object): | ||
"""Represents area of operation""" | """Represents area of operation""" | ||
Line 39: | Line 46: | ||
speed=1 | speed=1 | ||
def __init__(self, size, cz, nRobots, piles, obstacles, evaporatePile = lambda x:x, evaporatePath = lambda x:x, dropPile = lambda x,y,z:y+ | def __init__(self, size, cz, nRobots, piles, obstacles, evaporatePile = lambda x:x, evaporatePath = lambda x:x, dropPile = lambda x,y,z:y+1, dropPath = lambda x,y:x+1, l=2, speed=1): | ||
""" | """ | ||
needs as input: | needs as input: | ||
Line 100: | Line 107: | ||
return len(self.__piles)#this will be the check for the main loop | return len(self.__piles)#this will be the check for the main loop | ||
def removePile(self, pile): | def removePile(self, pile): | ||
pile.dropRobots() | |||
self.__piles.remove(pile) | self.__piles.remove(pile) | ||
#todo: remove pile from blackboard | #todo: remove pile from blackboard | ||
Line 113: | Line 121: | ||
#isn't practical | #isn't practical | ||
def getMovements(self,coordinates): | def getMovements(self,coordinates): | ||
freeSpace=[(coordinates[0]+n,coordinates[1]+m) for n in range(-speed,speed+1) for m in range(-speed, speed+1) if n*n+m*m<=speed*speed] #circle around the point | freeSpace=[(coordinates[0]+n,coordinates[1]+m) for n in range(-self.speed,self.speed+1) for m in range(-self.speed, self.speed+1) if n*n+m*m<=self.speed*self.speed] #circle around the point | ||
pilepoints=[] | pilepoints=[] | ||
for coord in freeSpace: | for coord in freeSpace: | ||
Line 137: | Line 145: | ||
class Pile(object): | class Pile(object): | ||
__objects=None | __objects=None | ||
Line 160: | Line 168: | ||
rob.carrying = True #tell that robot it is carrying an object | rob.carrying = True #tell that robot it is carrying an object | ||
rob.sendToCZ()#send that robot to the clear zone | rob.sendToCZ()#send that robot to the clear zone | ||
ao.robotsInAO.append(rob)#add that robot to the list of the AO | |||
if len(self.__objects)==0: | if len(self.__objects)==0: | ||
ao.removePile(self)#if the pile is empty, remove it from the list of piles | ao.removePile(self)#if the pile is empty, remove it from the list of piles | ||
Line 168: | Line 176: | ||
return len(self.__que) | return len(self.__que) | ||
def pileSize(self): | def pileSize(self): | ||
return sum(self.__objects)#should change this so we also have victims | return sum(self.__objects)#should change this so we also have victims | ||
def dropRobots(self): | |||
for robot in self.__que: | |||
robot.carrying = True | |||
robot.sendToCZ() | |||
ao.robotsInAO.append(rob) | |||
self.__que=[] | |||
class Robot(object): | class Robot(object): | ||
Line 205: | Line 219: | ||
#todo complete this | #todo complete this | ||
def pickCoordinates(self, target, options): | |||
""" | |||
input: the floating point coordinates the robot wants to go to | |||
output: integer coordinates in option that according to a combination of ceiling and floor correspond to target, or none if none exist | |||
""" | |||
rounders=[math.floor, math.ceil] | |||
targets=[(fun1(target[0]),fun2(target[1])) for fun1 in rounders for fun2 in rounders] | |||
targets=[e for e in targets if e in options] | |||
if len(targets)=0: return None | |||
targets.sort(lambda a, b: cmp((a[0]-target[0])**2+(a[1]-target[1])**2, (b[0]-target[0])**2+(b[1]-target[1])**2)) | |||
return targets[0] | |||
def update(self): | def update(self): | ||
if self.coordinates == self.ao.CZ: | if self.coordinates == self.ao.CZ: | ||
Line 260: | Line 285: | ||
self.__path=[] | self.__path=[] | ||
break #we've found our pile and decided how to get there so break from the loop | break #we've found our pile and decided how to get there so break from the loop | ||
#basically, self.__task, self.__goal, self.__path, self.__pathlength and self.__newpath have been updated | |||
elif self.__task==None: | |||
#if robots start at a point different from the cz | #if robots start at a point different from the cz | ||
#probably need something better than this | #probably need something better than this | ||
Line 275: | Line 300: | ||
#n.b. at this moment we assume no new obstacles are introduced during the simulation | #n.b. at this moment we assume no new obstacles are introduced during the simulation | ||
#need to extend this part for if new obstacles are introduced | #need to extend this part for if new obstacles are introduced | ||
#basically need to do the same path finding algorithm as for the Forage when no path is selected except we need some way to keep the robots together (since they're pushing | #basically need to do the same path finding algorithm as for the Forage when no path is selected except we need some way to keep the robots together (since they're pushing the same piece) | ||
if self.__path[self.pathindex-1] in movements[0]: | if self.__path[self.pathindex-1] in movements[0]: | ||
coordinates = self.__path[self.pathindex-1] | coordinates = self.__path[self.pathindex-1] | ||
Line 310: | Line 335: | ||
proposedScale = ((self.coordinates[0]-self.__path[self.pathindex+2][0])**2 +(self.coordinates[1]-self.__path[self.pathindex+2][1])**2)**.5 | proposedScale = ((self.coordinates[0]-self.__path[self.pathindex+2][0])**2 +(self.coordinates[1]-self.__path[self.pathindex+2][1])**2)**.5 | ||
proposedDirection = ((self.__path[self.pathindex+2][0]-self.coordinates[0])/proposedScale,(self.__path[self.pathindex+2][1]-self.coordinates[1])/proposedScale) | proposedDirection = ((self.__path[self.pathindex+2][0]-self.coordinates[0])/proposedScale,(self.__path[self.pathindex+2][1]-self.coordinates[1])/proposedScale) | ||
proposedCoordinate = ( | proposedCoordinate = pickCoordinates(((self.coordinates[0] + self.ao.speed*proposedDirection[0]),(self.coordinates[1] + self.ao.speed*proposedDirection[1])),movements[0]) | ||
except ZeroDivisionError: | except ZeroDivisionError: | ||
proposedCoordinate = self.__path[self.pathindex+3] | proposedCoordinate = self.__path[self.pathindex+3] | ||
self.pathindex+=1 | self.pathindex+=1 | ||
self.pathindex+=1 | self.pathindex+=1 | ||
if proposedCoordinate | if proposedCoordinate:#if it isn't None but some coordinates | ||
#at this moment, robots might get into the wrong pile accidentally. Not sure if this is bad | |||
self.__newpath.append(proposedCoordinate) | self.__newpath.append(proposedCoordinate) | ||
self.coordinates=proposedCoordinate | self.coordinates=proposedCoordinate | ||
Line 335: | Line 361: | ||
#todo: come up with an appropriate solution | #todo: come up with an appropriate solution | ||
self.pathlength+= ((self.__newpath[-1][0]-self.__newpath[-2][0])**2 + (self.__newpath[-1][1]-self.__newpath[-2][1])**2)**.5 | self.pathlength+= ((self.__newpath[-1][0]-self.__newpath[-2][0])**2 + (self.__newpath[-1][1]-self.__newpath[-2][1])**2)**.5 | ||
#Problem: the proposed coordinates might be further away than the operation radius due to rounding off | |||
#todo: fix this | |||
#idea: maybe project to the boundary of the disc instead of using round? | |||
else: | else: | ||
#the algorithm comes down to this: | #the algorithm comes down to this: | ||
Line 341: | Line 370: | ||
# there is one minimal deflection to the left and one minimal deflection to the right, flip a coin between them | # there is one minimal deflection to the left and one minimal deflection to the right, flip a coin between them | ||
# except one of them means going back the same path, that's forbidden unless it's the only option | # except one of them means going back the same path, that's forbidden unless it's the only option | ||
goalScale = ((self.__goal[0]-self.coordinates[0])**2+(self.__goal[1]-self.coordinates[1])**2)**.5 | |||
goalDirection = ((self.__goal[0]-self.coordinates[0])/goalScale,(self.__goal[1]-self.coordinates[1])/goalScale) | |||
goalCoordinate = (round(self.coordinates[0]+self.ao.speed*goalDirection[0]),round(self.coordinates[1]+self.ao.speed*goalDirection[1])) | |||
if goalCoordinate in movements[0]: | |||
self.__newpath.append(goalCoordinate) | |||
self.coordinates = goalCoordinate | |||
else: | |||
#get the boundary of the circle with radius self.ao.speed around self.coordinates | |||
circle = [(self.coordinates[0]+n,self.coordinates[1]+m) for n in range(-self.ao.speed, self.ao.speed+1) for m in range(-self.ao.speed, self.ao.speed+1)\ | |||
if n*n+m*m<=self.ao.speed**2 and (\ | |||
(abs(n)+1)*(abs(n)+1)+m*m >self.ao.speed**2 or | |||
n*n+(abs(m)+1)*(abs(m)+1) >self.ao.speed**2)] | |||
#take the intersection with movements[0] | |||
choiceCircle = [point for point in circle if point in movements[0]] | |||
#now select from circle | |||
#todo: implement this | #todo: implement this | ||
self.pathlength+=((self.__newpath[-1][0]-self.__newpath[-2][0])**2 + (self.__newpath[-1][1]-self.__newpath[-2][1])**2)**.5 | |||
#todo: implement this | #todo: implement this | ||
#todo implement loop cutting at the moment a path is stored | #todo implement loop cutting at the moment a path is stored | ||
Line 348: | Line 393: | ||
"""probability of a robot going for foraging""" | """probability of a robot going for foraging""" | ||
#depending on the number of robots, size of the ao and number of known piles | #depending on the number of robots, size of the ao and number of known piles | ||
#this is just a place holder. The final choice will be based on USE decisions | #this is just a place holder. The final choice will be based on USE decisions | ||
#for now we assume that there should be approximately 10 robots for every pile | #for now we assume that there should be approximately 10 robots for every pile | ||
Line 380: | Line 425: | ||
[[File:Pathshortningidea.gif]] | [[File:Pathshortningidea.gif]] | ||
We've worked out the path updating idea a bit further and illustrated it in the following python code (it uses pygame), however we've come to the conclusion that a more graph based approach would yield better results and will work towards that. | |||
"""more investigation on the path simplification algorithm""" | |||
import pygame | |||
from pygame.locals import * | |||
import math | |||
import os | |||
def distance(a,b): | |||
return ((a[0]-b[0])**2+(a[1]-b[1])**2)**.5 | |||
class demo(object): | |||
speed=6 | |||
def __init__(self, speed, size, iterations, path, obstacles, waitingtime=100): | |||
self.speed=speed | |||
self.size=size | |||
self.pathlist=[path] | |||
self.iterations=iterations | |||
self.obstacles=obstacles#leave this out for now | |||
self.waitingtime=waitingtime | |||
self.simplify() | |||
self.run() | |||
def pickCoordinates(self, target, options): | |||
rounders=[math.floor, math.ceil] | |||
targets=[(fun1(target[0]),fun2(target[1])) for fun1 in rounders for fun2 in rounders] | |||
targets=[e for e in targets if e in options] | |||
if len(targets)==0: return None | |||
targets.sort(lambda a, b: cmp((a[0]-target[0])**2+(a[1]-target[1])**2, (b[0]-target[0])**2+(b[1]-target[1])**2)) | |||
return targets[0] | |||
def pickDirection(self, coordinates, target):#bad name but who cares | |||
denom= distance(coordinates, target) | |||
if denom==0: return None | |||
direction= ( coordinates[0] + self.speed *(target[0]-coordinates[0])/denom, | |||
coordinates[1] + self.speed *(target[1]-coordinates[1])/denom) | |||
return direction | |||
def getMovements(self, coordinates): | |||
freeSpace=[(coordinates[0]+n,coordinates[1]+m) for n in range(-self.speed,self.speed+1) for m in range(-self.speed, self.speed+1) if n*n+m*m<=self.speed*self.speed] | |||
#circle around the point | |||
for obstacle in self.obstacles: | |||
if min(obstacle[0][0],obstacle[1][0])<=coord[0]<=max(obstacle[0][0],obstacle[1][0]) and \ | |||
min(obstacle[0][1],obstacle[1][1])<=coord[1]<=max(obstacle[0][1],obstacle[1][1]): | |||
freeSpace.remove(coord) | |||
return freeSpace | |||
def simplify(self): | |||
begin=self.pathlist[0][0] | |||
end=self.pathlist[0][-1] | |||
for i in range(self.iterations): | |||
newpath=[begin] | |||
index=0 | |||
while newpath[-1]!=end: | |||
try: | |||
if distance(newpath[-1], self.pathlist[-1][index+2])<=self.speed: | |||
index+=1 | |||
except IndexError: | |||
if distance(newpath[-1], end)>self.speed: | |||
print "got the index error, "+str(distance(newpath[-1],self.pathlist[-1][index+1])) | |||
pass | |||
if distance(newpath[-1],end)<=self.speed: | |||
#need something better than this for when obstacles are introduced | |||
newpath.append(end) | |||
elif distance(newpath[-1],end)<=2.1*self.speed: | |||
#really really need something better than this for when obstacles are introduced | |||
movements = self.getMovements(newpath[-1]) | |||
direction = self.pickDirection(newpath[-1], end) | |||
newpoint = self.pickCoordinates(direction, movements) | |||
if newpoint: | |||
newpath.append(newpoint) | |||
else: | |||
dindex=2 | |||
while not newpoint and dindex>-3: | |||
try: | |||
direction = self.pickDirection(newpath[-1],self.pathlist[-1][index+dindex]) | |||
newpoint = self.pickCoordinates(direction, movements) | |||
except IndexError: | |||
pass | |||
dindex -=1 | |||
if not newpoint: | |||
print "I don't get it anymore, "+ str(index) | |||
self.pathlist.append(newpath) | |||
raise Exception | |||
newpath.append(newpoint) | |||
else: | |||
movements = self.getMovements(newpath[-1]) | |||
direction = self.pickDirection(newpath[-1], self.pathlist[-1][index+2]) | |||
if direction: | |||
newpoint = self.pickCoordinates(direction, movements) | |||
#need to change this when obstacles are introduced | |||
newpath.append(newpoint) | |||
index+=1 | |||
else: | |||
index+=2 | |||
self.pathlist.append(newpath) | |||
def run(self): | |||
running = True | |||
#before initialising object, furst run pygame.init() | |||
self.displaysurface = pygame.display.set_mode(self.size, pygame.HWSURFACE | pygame.DOUBLEBUF) | |||
path= os.path.dirname(__file__) | |||
pointsurface = pygame.image.load(os.path.join(path, "blp.bmp")).convert() | |||
# | |||
while running: | |||
if pygame.QUIT in pygame.event.get()>0: | |||
running = False | |||
for path in self.pathlist: | |||
self.displaysurface.fill((0,0,0)) | |||
for point in path: | |||
self.displaysurface.blit(pointsurface, point) | |||
pygame.display.flip() | |||
pygame.time.wait(self.waitingtime) | |||
pygame.quit() | |||
#all the below is old | |||
def simplifyPath(path, iterations): | |||
pathlist=[path] | |||
begin=path[0] | |||
end=path[-1] | |||
for i in range(iterations): | |||
newpath=[begin] | |||
index=0 | |||
while newpath[-1]!=end: | |||
#if distance(newpath[-1],pathlist[-1][index+1])>1: print distance(newpath[-1],pathlist[-1][index+1]) | |||
#the problem is in the rounding off | |||
try: | |||
if distance(newpath[-1], pathlist[-1][index+2])<=1: | |||
index+=1 | |||
#print "did it at index "+str(index) +" with pathlist length "+str(len(pathlist)) | |||
except IndexError: | |||
if distance(newpath[-1], end)>1: | |||
print "got the index error, "+str(distance(newpath[-1],pathlist[-1][index+1])) | |||
pass | |||
if distance(newpath[-1], end)<=1: | |||
newpath.append(end) | |||
elif distance(newpath[-1], end)<=2: | |||
denom=distance(newpath[-1],end) | |||
newpoint = (round(newpath[-1][0] + (end[0]-newpath[-1][0])/denom),round(newpath[-1][1] + (end[1]-newpath[-1][1])/denom)) | |||
newpath.append(newpoint) | |||
index+=1 #not necessary | |||
else: | |||
denom=((newpath[-1][0]-pathlist[-1][index+2][0])**2 +\ | |||
(newpath[-1][1]-pathlist[-1][index+2][1])**2 )**.5 | |||
if denom!=0: | |||
newpoint = (round(newpath[-1][0] + \ | |||
(pathlist[-1][index+2][0] - newpath[-1][0])/denom,1), | |||
round(newpath[-1][1] + \ | |||
(pathlist[-1][index+2][1] - newpath[-1][1])/denom,1)) | |||
newpath.append(newpoint) | |||
index+=1 | |||
else: | |||
index+=2 | |||
pathlist.append(newpath) | |||
return pathlist | |||
def mathematify(pathlist): | |||
return str(pathlist).replace("[","{").replace("(","{").replace("]","}").replace(")","}") | |||
def givepath(start, n, direction, speed): | |||
scale = distance(direction, (0,0)) | |||
scaleddirection = (speed*direction[0]/scale, speed*direction[1]/scale) | |||
path=[start] | |||
while len(path)<=n: | |||
path.append((round(path[-1][0]+scaleddirection[0],1), round(path[-1][1]+scaleddirection[1],1))) | |||
return path | |||
if __name__=="__main__": | |||
pygame.init() | |||
startingpath = givepath((0,0),20,(0,1),20) | |||
startingpath += givepath(startingpath[-1],30,(1,0),20) | |||
#startingpath += givepath(startingpath[-1],20,(0,-1),10) | |||
demonstration = demo(20,(600,400), 150, startingpath, [], 50) | |||
==Issues== | ==Issues== | ||
A downside to all of this is that it relies heavily on a coordinate system, so without that, it might be significantly harder (although much of the pheromone stuff would work without a coordinate system in the case of physical pheromones, and there seem to be some papers around that use some sort of local blackboard for pheromones that does not depend on coordinates) | A downside to all of this is that it relies heavily on a coordinate system, so without that, it might be significantly harder (although much of the pheromone stuff would work without a coordinate system in the case of physical pheromones, and there seem to be some papers around that use some sort of local blackboard for pheromones that does not depend on coordinates). | ||
Furthermore, the ant colony algorithms are essentially graph algorithms, so we will opt for a heuristic algorithm instead. |
Latest revision as of 20:33, 17 January 2016
Back to: PRE2015_2_Groep1
Top level behavioural algorithm
Outline of the idea for an algorithm, based on the ideas presented in [Liu B Chen P Wang G], [Qi G Song P Li K] and [Nicolis S Detrain C Demolin D Deneubourg J]:
Initially (i.e. when the blackboard is empty): all robots departing from the CZ are assigned a search task (need to implement a good search algorithm)
- when pile is found by a robot: add the pile to the blackboard (coordinates and pheromones)
When a robot departs from CZ when the blackboard is non-empty:
- decide (stochastically) whether to search or to forage (based on size of blackboard)
- if decided to go forage, pick a pile from the blackboard (stochastically based on pheromones)
- then if pile is picked, if paths are known, pick a path based on pheromones, else (if no paths are known yet): move in the direction of the pile in such a way that if obstacles are encountered a way around them it is found stochastically. Keep track of the path.
When a robot arrives at a pile: update pheromones (if path is kept track of, add path to blackboard) and go to que of pile
At every time step, some pheromone "evaporates" from the blackboard.
n.b. in the dropping the pheromones and in choosing whether to search or to forage, USE aspects come in to play:
- how much priority should rescuing known victims get over searching for new victims?
- how much more priority should a pile get if we know it has n victims? (Later to be added to simulation)
- should nearby piles get priority over harder to reach piles?
- should only path length matter, or should e.g. safety play a role in dropping path pheromones? (We may add this later, but for now we'll ignore this)
Code
This isn't functioning yet (more work needs to be done) but this is what we have so far (06-12-2015 21:55) (it's written in Python 2.7.9)
#note to self: better garbage collection still needed #idea: re-implement the paths so that new starts can benefit from the updating #before the whole path has been followed #todo add parameters for ethical choices #todo implement cone vision in accordance to the actual robots import random import math class AO(object): """Represents area of operation""" #todo: implement something for keeping track of interesting statistics __size = None __obstacles = None __piles = None robotsInAO = None blackboard =None CZ=None l=2#the "sensitivity of the choice process" speed=1 def __init__(self, size, cz, nRobots, piles, obstacles, evaporatePile = lambda x:x, evaporatePath = lambda x:x, dropPile = lambda x,y,z:y+1, dropPath = lambda x,y:x+1, l=2, speed=1): """ needs as input: -the size of the AO -the coordinates for the CZ (just one point for now) -the number of robots -a list of piles two points per pile giving two corners of the pile which is assumed to be rectangular (possibly degenerated) a list of objects as ints indicating the number of robots needed to move the object e.g. [ ((1,2), (3,4), [1,1,6,3,1]), ((6,7), (8,12), [3,1,1,3,4])] -a list of obstacles two points per obstacle giving two corners of the obstacle which is assumed to be rectangular (possibly degenerated) e.g. [((1,1),(2,2)), ((3,1),(4,3))] -a function that determines the speed of evaporation of the pheromones for the piles (mapping floats to floats in a suitable manner) -a function that determines the speed of evaporation of the pheromones for the paths (mapping floats to floats in a suitable manner) -a function called by a robot to increase the pheromone concentration at a pile (three arguments: pilesize, pheromone concentration, treshold) -a function called by a robot to increase the pheromone concentration at a path (to floats) """ self.l=l#the "sensitivity of the choice process" self.speed=speed self.__size = (int(size[0]),int(size[1])) #automatically checks for right type this way self.CZ=(int(cz[0]),int(cz[1])) self.__evaporatePile=evaporatePile self.__dropPile=dropPile self.__evaporatePath=evaporatePath self.__dropPath=dropPath #for i in range(nRobots): #self.robotsInAO.append(Robot(self.CZ,self.__drop))#fill the list with robots #want to fill the list of robots one at a time so they don't all live at the same time self.numrobots=nRobots self.__piles=[] for pile in piles:#create all the piles according to input self.__piles.append(Pile( (int(pile[0][0]),int(pile[0][1])), (int(pile[1][0]),int(pile[1][1])), [int(obj) for obj in pile[2]])) self.__obstacles = [] for obstacle in obstacles: #all obstacles are assumed to be rectangles self.__obstacles.append(( (int(obstacle[0][0]),int(obstacle[0][1])), (int(obstacle[1][0]),int(obstacle[1][1])))) self.robotsInAO = [] self.blackboard = [] def update(self): if self.numrobots>0:#release robots one at a time self.robotsInAO.append(Robot(self, self.CZ, self.__dropPile, self.__dropPath)) self.numrobots-- for robot in self.robotsInAO: robot.update() for pile in self.__piles: if min(pile.coordinates[0][0],pile.coordinates[1][0])<=robot.coordinates[0]<=max(pile.coordinates[0][0],pile.coordinates[1][0]) and \ min(pile.coordinates[0][1],pile.coordinates[1][1])<=robot.coordinates[1]<=max(pile.coordinates[0][1],pile.coordinates[1][1]): #the robot has arrived at the pile pile.update(robot, self) #self.pheromones= map(self.__evaporate,self.pheromones)#negative feedback on the pheromones #todo: implement evaporation return len(self.__piles)#this will be the check for the main loop def removePile(self, pile): pile.dropRobots() self.__piles.remove(pile) #todo: remove pile from blackboard def totalRobots(self): return len(self.robotsInAO)+sum([pile.totalRobots() for pile in self.__piles)]) #def allowedMove(self, coordinates1, coordinates2): # """Decides wether moving from coordinates1 to coordinates2 is possible""" # #a move is possible if there is no obstacle intersecting with [coordinates1, coordinates2] and also no other pile than that at coordinates2 is intersecting with [coordinates1, coordinates2] # length = ((coordinates2[0]-coordinates1[0])**2+(coordinates2[1]-coordinates1[1])**2)**.5 # direction = (float(coordinates2[0]-coordinates1[0])/length, # float(coordinates2[1]-coordinates1[1])/length) # line=[(round(coordinates1[0] + .1*n*direction[0]),round(coordinates1[1]+.1*n*direction[1])) for n in range(round(10*length)+1)] #isn't practical def getMovements(self,coordinates): freeSpace=[(coordinates[0]+n,coordinates[1]+m) for n in range(-self.speed,self.speed+1) for m in range(-self.speed, self.speed+1) if n*n+m*m<=self.speed*self.speed] #circle around the point pilepoints=[] for coord in freeSpace: #remove all points where there are obstacles for obstacle in self.__obstacles: if min(obstacle[0][0],obstacle[1][0])<=coord[0]<=max(obstacle[0][0],obstacle[1][0]) and \ min(obstacle[0][1],obstacle[1][1])<=coord[1]<=max(obstacle[0][1],obstacle[1][1]): freeSpace.remove(coord) for pile in self.__piles: if min(pile.coordinates[0][0],pile.coordinates[1][0])<=coord[0]<=max(pile.coordinates[0][0],pile.coordinates[1][0]) and \ min(pile.coordinates[0][1],pile.coordinates[1][1])<=coord[1]<=max(pile.coordinates[0][1],pile.coordinates[1][1]): pilepoints.append(coord) #now remove all coordinates that are blocked indirectly #todo implement this return (freeSpace, pilepoints) #n.b. this way a robot might go to a pile unintendedly def getPileSizeAt(self, coordinates): """returns the pile size of the pile at the coordinates""" for pile in self.__piles: if min(pile.coordinates[0][0],pile.coordinates[1][0])<=coordinates[0]<=max(pile.coordinates[0][0],pile.coordinates[1][0]) and \ min(pile.coordinates[0][1],pile.coordinates[1][1])<=coordinates[1]<=max(pile.coordinates[0][1],pile.coordinates[1][1]): return pile.pileSize() return None class Pile(object): __objects=None coordinates = None __que = None def __init__(self,coordinate1, coordinate2, *objects): #all piles are assumed to be rectangles self.__objects=[obj for obj in objects] self.__que=[] self.coordinates = (coordinate1, coordinate2) def update(self, robot, ao): self.__que.append(ao.robotsInAo.pop(ao.robotsInAo.index(robot))) try: if len(self.__que)>self.__objects[0]: obj=self.__objects.pop(0)#remove the object and get the number of needed robots for i in range(obj): rob = self.__que.pop()#remove a robot from the que rob.carrying = True #tell that robot it is carrying an object rob.sendToCZ()#send that robot to the clear zone ao.robotsInAO.append(rob)#add that robot to the list of the AO if len(self.__objects)==0: ao.removePile(self)#if the pile is empty, remove it from the list of piles except IndexError: ao.removePile(self) def totalRobots(self): return len(self.__que) def pileSize(self): return sum(self.__objects)#should change this so we also have victims def dropRobots(self): for robot in self.__que: robot.carrying = True robot.sendToCZ() ao.robotsInAO.append(rob) self.__que=[] class Robot(object): coordinates=None #Robots are assumed to be small enough to be well represented by a single coordinate __goal=None __task=None carrying=False __path=None __newpath=None ao=None __pathlength=0 def __init__(self,ao, coordinates, dropPile, dropPath): self.coordinates=(int(coordinates[0]),int(coordinates[1])) self.__dropPile = dropPile self.__dropPath = dropPath self.ao= ao def sendToCZ(self): self.__goal = self.ao.CZ self.__task="Return" #choose the fastest known path to the CZ #maybe wan't to optimize on the path back too, but don't really feel like reversed path changing yet so maybe later #last coordinate was at the pile, so for simplicity assume it is still there pile=None for elem in self.ao.blackboard: if self.coordinates == elem[0]: pile=elem break minlen = min([path[0] for path in pile[3]]) minlenpaths = [path for path in pile[3] if path[0]<= minlen+0.1] #choose a path of minimal length self.__path=minlenpaths[0]#all robots need to take the same one because they need to stay together self.pathindex=-1#since we're following backwards #todo complete this def pickCoordinates(self, target, options): """ input: the floating point coordinates the robot wants to go to output: integer coordinates in option that according to a combination of ceiling and floor correspond to target, or none if none exist """ rounders=[math.floor, math.ceil] targets=[(fun1(target[0]),fun2(target[1])) for fun1 in rounders for fun2 in rounders] targets=[e for e in targets if e in options] if len(targets)=0: return None targets.sort(lambda a, b: cmp((a[0]-target[0])**2+(a[1]-target[1])**2, (b[0]-target[0])**2+(b[1]-target[1])**2)) return targets[0] def update(self): if self.coordinates == self.ao.CZ: self.carrying = False self.__pathlength=0 #assign a task and if needed a pile and a path rnum = random.random() if rnum>self.searchChoice(): self.__task = "Search" else: self.__task = "Forage" #now, if it is assigned a forage task, pick a pile from the blackboard if self.__task=="Forage": #example of what a blackboard element may look like: #(coordinates, pheromone, treshold (k in the articles), [list of paths]) #function from articles rnum = random.random() denominator = sum([(pile[1]+pile[2])**ao.l for pile in ao.blackboard]) treshold = 0 for pile in ao.blackboard: treshold += (pile[1]+pile[2])**ao.l if rnum <treshold: self.__goal=pile[0] #this is the pile it gets assigned too #so now choose a path if available if len(pile[3])>0: rnum =random.random()#we don't need the previous rnum anymore since we'll now break from the loop #now choosing from the known paths #in the article it is done by pheromones but I'm now wondering why we shouldn't just do this by shortest known path #todo: reconsider all of this #OK, so instead of doing the pheromones for pathfinding, we'll let every robot pick either the shortest rout from the blackboard #or find a new rout #furthermore, to make short routs, instead of following the rout, we'll have the robot attempt to shorten it #actually I now understand why you need the pheromone thing: #the first found path gets optimized first so most newfound paths, when they're found, will not be shorter even though they leave more room for improvement #todo: change all to pheromone based approach if rnum >(0.25+(0.5*len(pile[3]))/(len(pile[3])+1)): # want to keep looking for new paths, but intensity should depend on number of known paths #todo: think of a better distribution #a path looks like (length (or pheromones), [coordinates]) minlen = min([path[0] for path in pile[3]]) minlenpaths = [path for path in pile[3] if path[0]<= minlen+0.1]#just in case more paths of minimal length exist (taking into account propperties of floats self.__path=random.choice(minlenpaths)[-1]# want to keep using both since one might have more potential for updating than the other #n.b. stochastics only yield good results for large number of robots self.__newpath=[self.ao.CZ] self.pathindex =0 else: #seek for new path self.__path=[] self.__newpath=[self.ao.CZ] #note to self: since now we're coming up with new paths for every robot, garbage collection should be performed on the blackboard else: self.__newpath=[self.ao.CZ] self.__path=[] break #we've found our pile and decided how to get there so break from the loop #basically, self.__task, self.__goal, self.__path, self.__pathlength and self.__newpath have been updated elif self.__task==None: #if robots start at a point different from the cz #probably need something better than this #todo implement something better self.__task = "Search" movements= self.ao.getMovements(self.coordinates)#get the list of possible movements looks like [possiblemovements, reachablepiles] if self.__task == "Search": pass #todo implement a good swarm-search algorithm elif self.__task == "Return": #simply follow the choosen path (choosen in sendToCZ()) backwards. No backwards optimization of the path at this point #n.b. at this moment we assume no new obstacles are introduced during the simulation #need to extend this part for if new obstacles are introduced #basically need to do the same path finding algorithm as for the Forage when no path is selected except we need some way to keep the robots together (since they're pushing the same piece) if self.__path[self.pathindex-1] in movements[0]: coordinates = self.__path[self.pathindex-1] self.pathindex-=1 else: pass elif self.__task == "Forage": if len(self.__path)>0: #find the next point to go to currentLength = len(self.__newpath) if self.__goal in movements[0]: #we're there self.__newpath.append(self.__goal) self.__pathlength+=((self.coordinates[0]-self.__goal[0])**2 + (self.coordinates[1]-self.__goal[1])**2 )**.5 self.coordinates = self.__goal #since self.__goal was a pile, after the update, the update() function of the ao will add this robot to the pile it's at bbElement = None for element in self.ao.blackboard:#find the corresponding pile in the blackboard if element[0]==self.coordinates: bbElement = element break #todo implement something to get rid of loops in the path (if there are any) bbElement[-1].append((self.__pathlength, self.__newpath)) #do garbage collection over the blackboard element to prevent the blackboard from getting too large minlen = min([path[0] for path in bbElement[-1]]) bbElement[-1] = [path for path in bbElement[-1] if path[0]<=1.2*minlen] #need to change this to pheromone based approach #drop pheromones to pile bbElement[1] = self.dropPile(self.ao.getPileSizeAt(self.__goal),*bbElement[1:3]) #todo change this to pheromone based approach #todo consider implementing an algorithm that optimizes over the old path points else: try: proposedScale = ((self.coordinates[0]-self.__path[self.pathindex+2][0])**2 +(self.coordinates[1]-self.__path[self.pathindex+2][1])**2)**.5 proposedDirection = ((self.__path[self.pathindex+2][0]-self.coordinates[0])/proposedScale,(self.__path[self.pathindex+2][1]-self.coordinates[1])/proposedScale) proposedCoordinate = pickCoordinates(((self.coordinates[0] + self.ao.speed*proposedDirection[0]),(self.coordinates[1] + self.ao.speed*proposedDirection[1])),movements[0]) except ZeroDivisionError: proposedCoordinate = self.__path[self.pathindex+3] self.pathindex+=1 self.pathindex+=1 if proposedCoordinate:#if it isn't None but some coordinates #at this moment, robots might get into the wrong pile accidentally. Not sure if this is bad self.__newpath.append(proposedCoordinate) self.coordinates=proposedCoordinate if self.__path[self.pathindex+2] in self.ao.getMovements(self.coordinates)[0]:#might want to reduce computation time by moving the getMovements #todo: consider moving the getMovements step to other iteration since now we're computing twice self.pathindex+=1 else:#the proposed move can't be executed if self.__path[self.pathindex] in movements[0]: self.__newpath.append(self.__path[self.pathindex]) self.coordinates = self.__path[self.pathindex] elif self.path[self.pathindex-1] in movements[0]:#might need to think of something better than this (so that all the optimizations at parts of the path where #everything went right don't get dismissed because of one point where it went wrong) self.__newpath.append(self.__path[self.pathindex-1]) self.coordinates = self.__path[self.pathindex-1] self.pathindex-=1 else: raise Exception # might want to assume a new obstacle is simply introduced and hence the old path has become invalid #in this case we want to continue to searching for a new path #todo: come up with an appropriate solution self.pathlength+= ((self.__newpath[-1][0]-self.__newpath[-2][0])**2 + (self.__newpath[-1][1]-self.__newpath[-2][1])**2)**.5 #Problem: the proposed coordinates might be further away than the operation radius due to rounding off #todo: fix this #idea: maybe project to the boundary of the disc instead of using round? else: #the algorithm comes down to this: #try to move towards the goal #if that something is blocking that move goal: # there is one minimal deflection to the left and one minimal deflection to the right, flip a coin between them # except one of them means going back the same path, that's forbidden unless it's the only option goalScale = ((self.__goal[0]-self.coordinates[0])**2+(self.__goal[1]-self.coordinates[1])**2)**.5 goalDirection = ((self.__goal[0]-self.coordinates[0])/goalScale,(self.__goal[1]-self.coordinates[1])/goalScale) goalCoordinate = (round(self.coordinates[0]+self.ao.speed*goalDirection[0]),round(self.coordinates[1]+self.ao.speed*goalDirection[1])) if goalCoordinate in movements[0]: self.__newpath.append(goalCoordinate) self.coordinates = goalCoordinate else: #get the boundary of the circle with radius self.ao.speed around self.coordinates circle = [(self.coordinates[0]+n,self.coordinates[1]+m) for n in range(-self.ao.speed, self.ao.speed+1) for m in range(-self.ao.speed, self.ao.speed+1)\ if n*n+m*m<=self.ao.speed**2 and (\ (abs(n)+1)*(abs(n)+1)+m*m >self.ao.speed**2 or n*n+(abs(m)+1)*(abs(m)+1) >self.ao.speed**2)] #take the intersection with movements[0] choiceCircle = [point for point in circle if point in movements[0]] #now select from circle #todo: implement this self.pathlength+=((self.__newpath[-1][0]-self.__newpath[-2][0])**2 + (self.__newpath[-1][1]-self.__newpath[-2][1])**2)**.5 #todo: implement this #todo implement loop cutting at the moment a path is stored def searchChoice(): """probability of a robot going for foraging""" #depending on the number of robots, size of the ao and number of known piles #this is just a place holder. The final choice will be based on USE decisions #for now we assume that there should be approximately 10 robots for every pile return min((10.*len(ao.blackboard))/ao.totalRobots,1) #although the min(...,1) doesn't really have any effect due to implementation
Path updating idea
For reducing path length the following idea is used (a bit more thoroughly implemented above, but it comes down to the following concept shown in mathematica code) n.b. this is just a rough sketch to see the idea at work. It isn't robust (e.g. it can't handle loops very well in this form)
points = {{{0,0}, {0, -1}, {0, -2}, {1, -2}, {2, -2}, {3, -2}, {4, -2}, {4,-1}, {4, 0}, {4, 1}, {4, 2}, {3, 2}, {2, 2}, {1, 2}, {0, 2}, {0,3}, {0, 4}, {1, 4}, {2, 4}, {3, 4}, {4, 4}}} While[Length[points] < 25, previousPath = points[[Length[points]]]; newPath = {{0,0}}; While[newPath[[Length[newPath]]] != {4, 4}, If[Norm[newPath[[Length[newPath]]] - {4, 4}] <= 1, AppendTo[newPath, {4, 4}];, AppendTo[newPath, N[newPath[[ Length[newPath]]] + (previousPath[[Length[newPath] + 2]] - newPath[[Length[newPath]]])/ Norm[(previousPath[[Length[newPath] + 2]] - newPath[[Length[newPath]]])] ] ] ] ]; AppendTo[points, newPath]] Manipulate[ ListPlot[points[[i]], Joined -> True, Mesh -> All], {i, 1, Length[points], 1}]
We've worked out the path updating idea a bit further and illustrated it in the following python code (it uses pygame), however we've come to the conclusion that a more graph based approach would yield better results and will work towards that.
"""more investigation on the path simplification algorithm""" import pygame from pygame.locals import * import math import os def distance(a,b): return ((a[0]-b[0])**2+(a[1]-b[1])**2)**.5 class demo(object): speed=6 def __init__(self, speed, size, iterations, path, obstacles, waitingtime=100): self.speed=speed self.size=size self.pathlist=[path] self.iterations=iterations self.obstacles=obstacles#leave this out for now self.waitingtime=waitingtime self.simplify() self.run() def pickCoordinates(self, target, options): rounders=[math.floor, math.ceil] targets=[(fun1(target[0]),fun2(target[1])) for fun1 in rounders for fun2 in rounders] targets=[e for e in targets if e in options] if len(targets)==0: return None targets.sort(lambda a, b: cmp((a[0]-target[0])**2+(a[1]-target[1])**2, (b[0]-target[0])**2+(b[1]-target[1])**2)) return targets[0] def pickDirection(self, coordinates, target):#bad name but who cares denom= distance(coordinates, target) if denom==0: return None direction= ( coordinates[0] + self.speed *(target[0]-coordinates[0])/denom, coordinates[1] + self.speed *(target[1]-coordinates[1])/denom) return direction def getMovements(self, coordinates): freeSpace=[(coordinates[0]+n,coordinates[1]+m) for n in range(-self.speed,self.speed+1) for m in range(-self.speed, self.speed+1) if n*n+m*m<=self.speed*self.speed] #circle around the point for obstacle in self.obstacles: if min(obstacle[0][0],obstacle[1][0])<=coord[0]<=max(obstacle[0][0],obstacle[1][0]) and \ min(obstacle[0][1],obstacle[1][1])<=coord[1]<=max(obstacle[0][1],obstacle[1][1]): freeSpace.remove(coord) return freeSpace def simplify(self): begin=self.pathlist[0][0] end=self.pathlist[0][-1] for i in range(self.iterations): newpath=[begin] index=0 while newpath[-1]!=end: try: if distance(newpath[-1], self.pathlist[-1][index+2])<=self.speed: index+=1 except IndexError: if distance(newpath[-1], end)>self.speed: print "got the index error, "+str(distance(newpath[-1],self.pathlist[-1][index+1])) pass if distance(newpath[-1],end)<=self.speed: #need something better than this for when obstacles are introduced newpath.append(end) elif distance(newpath[-1],end)<=2.1*self.speed: #really really need something better than this for when obstacles are introduced movements = self.getMovements(newpath[-1]) direction = self.pickDirection(newpath[-1], end) newpoint = self.pickCoordinates(direction, movements) if newpoint: newpath.append(newpoint) else: dindex=2 while not newpoint and dindex>-3: try: direction = self.pickDirection(newpath[-1],self.pathlist[-1][index+dindex]) newpoint = self.pickCoordinates(direction, movements) except IndexError: pass dindex -=1 if not newpoint: print "I don't get it anymore, "+ str(index) self.pathlist.append(newpath) raise Exception newpath.append(newpoint) else: movements = self.getMovements(newpath[-1]) direction = self.pickDirection(newpath[-1], self.pathlist[-1][index+2]) if direction: newpoint = self.pickCoordinates(direction, movements) #need to change this when obstacles are introduced newpath.append(newpoint) index+=1 else: index+=2 self.pathlist.append(newpath) def run(self): running = True #before initialising object, furst run pygame.init() self.displaysurface = pygame.display.set_mode(self.size, pygame.HWSURFACE | pygame.DOUBLEBUF) path= os.path.dirname(__file__) pointsurface = pygame.image.load(os.path.join(path, "blp.bmp")).convert() # while running: if pygame.QUIT in pygame.event.get()>0: running = False for path in self.pathlist: self.displaysurface.fill((0,0,0)) for point in path: self.displaysurface.blit(pointsurface, point) pygame.display.flip() pygame.time.wait(self.waitingtime) pygame.quit() #all the below is old def simplifyPath(path, iterations): pathlist=[path] begin=path[0] end=path[-1] for i in range(iterations): newpath=[begin] index=0 while newpath[-1]!=end: #if distance(newpath[-1],pathlist[-1][index+1])>1: print distance(newpath[-1],pathlist[-1][index+1]) #the problem is in the rounding off try: if distance(newpath[-1], pathlist[-1][index+2])<=1: index+=1 #print "did it at index "+str(index) +" with pathlist length "+str(len(pathlist)) except IndexError: if distance(newpath[-1], end)>1: print "got the index error, "+str(distance(newpath[-1],pathlist[-1][index+1])) pass if distance(newpath[-1], end)<=1: newpath.append(end) elif distance(newpath[-1], end)<=2: denom=distance(newpath[-1],end) newpoint = (round(newpath[-1][0] + (end[0]-newpath[-1][0])/denom),round(newpath[-1][1] + (end[1]-newpath[-1][1])/denom)) newpath.append(newpoint) index+=1 #not necessary else: denom=((newpath[-1][0]-pathlist[-1][index+2][0])**2 +\ (newpath[-1][1]-pathlist[-1][index+2][1])**2 )**.5 if denom!=0: newpoint = (round(newpath[-1][0] + \ (pathlist[-1][index+2][0] - newpath[-1][0])/denom,1), round(newpath[-1][1] + \ (pathlist[-1][index+2][1] - newpath[-1][1])/denom,1)) newpath.append(newpoint) index+=1 else: index+=2 pathlist.append(newpath) return pathlist def mathematify(pathlist): return str(pathlist).replace("[","{").replace("(","{").replace("]","}").replace(")","}") def givepath(start, n, direction, speed): scale = distance(direction, (0,0)) scaleddirection = (speed*direction[0]/scale, speed*direction[1]/scale) path=[start] while len(path)<=n: path.append((round(path[-1][0]+scaleddirection[0],1), round(path[-1][1]+scaleddirection[1],1))) return path if __name__=="__main__": pygame.init() startingpath = givepath((0,0),20,(0,1),20) startingpath += givepath(startingpath[-1],30,(1,0),20) #startingpath += givepath(startingpath[-1],20,(0,-1),10) demonstration = demo(20,(600,400), 150, startingpath, [], 50)
Issues
A downside to all of this is that it relies heavily on a coordinate system, so without that, it might be significantly harder (although much of the pheromone stuff would work without a coordinate system in the case of physical pheromones, and there seem to be some papers around that use some sort of local blackboard for pheromones that does not depend on coordinates).
Furthermore, the ant colony algorithms are essentially graph algorithms, so we will opt for a heuristic algorithm instead.