Python string 模块,atoi() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用string.atoi()。
def do_POST(self):
dc = self.IFACE_CLASS
uri = urlparse.urljoin(self.get_baseuri(dc), self.path)
uri = urllib.unquote(uri)
dbname, dburi = TrytonDAVInterface.get_dburi(uri)
if dburi.startswith('Calendars'):
# read the body
body = None
if 'Content-Length' in self.headers:
l = self.headers['Content-Length']
body = self.rfile.read(atoi(l))
ct = None
if 'Content-Type' in self.headers:
ct = self.headers['Content-Type']
try:
DATA = '%s\n' % dc._get_caldav_post(uri, body, ct)
except DAV_Error, exception:
ec, _ = exception
return self.send_status(ec)
self.send_body(DATA, 200, 'OK', 'OK')
return
return _prev_do_POST(self)
def parse (self, vstring):
match = self.version_re.match(vstring)
if not match:
raise ValueError, "invalid version number '%s'" % vstring
(major, minor, patch, prerelease, prerelease_num) = \
match.group(1, 2, 4, 5, 6)
if patch:
self.version = tuple(map(string.atoi, [major, minor, patch]))
else:
self.version = tuple(map(string.atoi, [major, minor]) + [0])
if prerelease:
self.prerelease = (prerelease[0], string.atoi(prerelease_num))
else:
self.prerelease = None
def __init__(self, mmemory, typeaccess=0) :
if not isinstance(mmemory, Memory):
raise TypeError("ERREUR")
self.mmemory = mmemory
self.mmemory.open("r", typeaccess)
try :
fichier = open("/usr/include/asm/unistd.h", "r")
except IOError :
print "No such file /usr/include/asm/unistd.h"
sys.exit(-1)
liste = fichier.readlines()
fichier.close()
count = 0
for i in liste :
if(re.match("#define __NR_", i)) :
l = string.split(i)
if(l[2][0].isdigit()) :
count = string.atoi(l[2], 10)
self.lists_syscalls.append([count, l[1][5:]])
else :
count = count + 1
self.lists_syscalls.append([count, l[1][5:]])
def parse (self, vstring):
match = self.version_re.match(vstring)
if not match:
raise ValueError, "invalid version number '%s'" % vstring
(major, minor, patch, prerelease, prerelease_num) = \
match.group(1, 2, 4, 5, 6)
if patch:
self.version = tuple(map(string.atoi, [major, minor, patch]))
else:
self.version = tuple(map(string.atoi, [major, minor]) + [0])
if prerelease:
self.prerelease = (prerelease[0], string.atoi(prerelease_num))
else:
self.prerelease = None
def getProperPair(self,outfile=None):
'''Extract proper paried mapped reads.'''
if outfile is None:
outfile = self.fileName + ".PP.sam"
FO=open(outfile,'w')
PPcount=0
print >>sys.stderr, "Writing proper paired reads to\"",outfile,"\"... ",
for line in self.f:
hits=[]
if line[0] == '@':continue #skip head lines
if ParseSAM._reExpr2.match(line):continue #skip blank lines
field=line.rstrip('\n').split()
flagCode=string.atoi(field[1])
if ((flagCode & 0x0001) != 0) and ((flagCode & 0x0002)!=0):
PPcount +=1
FO.write(line)
FO.close()
print >>sys.stderr, str(PPcount) + " reads were saved!\n",
self.f.seek(0)
def getUniqMapRead(self,outfile=None):
'''Extract uniquely mapped reads.'''
if outfile is None:
outfile = self.fileName + ".uniq.sam"
FO=open(outfile,'w')
Uniqcount=0
print >>sys.stderr, "Writing uniquely mapped reads to\"",outfile,"\"... ",
for line in self.f:
hits=[]
if line[0] == '@':continue #skip head lines
if ParseSAM._reExpr2.match(line):continue #skip blank lines
field=line.rstrip('\n').split()
flagCode=string.atoi(field[1])
if (flagCode & 0x0004) == 1:
continue #skip unmapped reads
#else:
#print >>sys.stderr,line,
if (ParseSAM._uniqueHit_pat.search(line)):
print >>sys.stderr,line,
Uniqcount +=1
FO.write(line)
FO.close()
print >>sys.stderr, str(Uniqcount) + " reads were saved!\n",
self.f.seek(0)
def collapseSAM(self, outfile=None,collevel=10):
'''At most collevel[default=10] identical reads will be retained in outputting SAM file
The original SAM file must be sorted before hand. if not, using linux command like "sort -k3,3 -k4,4n myfile.sam >myfile.sorted.sam" '''
if outfile is None:
outfile = self.fileName + ".collapsed.sam"
print >>sys.stderr, "Writing collapsed SAM file to\"",outfile,"\"... "
FO=open(outfile,'w')
flag=""
for line in self.f:
if line[0] == '@':continue #skip head lines
if ParseSAM._reExpr2.match(line):continue #skip blank lines
field=line.rstrip('\n').split()
if (string.atoi(field[1]) & 0x0004)!=0: continue #skip unmapped line
id=field[2] + field[3] + field[5]
if (id != flag):
FO.write(line)
flag=id
skipTrigger=0
else:
skipTrigger+=1
if skipTrigger < collevel:
FO.write(line)
else:continue
FO.close()
self.f.seek(0)
def strtoip(ipstr):
"""convert an IP address in string form to numeric."""
res = 0L
count = 0
n = string.split(ipstr, '.')
for i in n:
res = res << 8L
ot = string.atoi(i)
if ot < 0 or ot > 255:
raise ValueError, "invalid IP octet"
res = res + ot
count = count + 1
# could be incomplete (short); make it complete.
while count < 4:
res = res << 8L
count = count + 1
return res
def cidrstrerr(str):
"""Check an IP address or CIDR netblock for validity.
Returns None if it is and otherwise an error string."""
if not cvalid.match(str):
return 'Not a syntatically valid IP address or netblock'
rng = 32
pos = string.find(str, '/')
ips = str
if not pos == -1:
rng = string.atoi(ips[pos+1:])
ips = str[:pos]
if rng < 0 or rng > 32:
return 'CIDR length out of range'
n = string.split(ips, '.')
for i in n:
ip = string.atoi(i)
if (ip < 0 or ip > 255):
return 'an IP octet is out of range'
# could check to see if it is 'proper', but.
return None
def parse (self, vstring):
match = self.version_re.match(vstring)
if not match:
raise ValueError, "invalid version number '%s'" % vstring
(major, minor, patch, prerelease, prerelease_num) = \
match.group(1, 2, 4, 5, 6)
if patch:
self.version = tuple(map(string.atoi, [major, minor, patch]))
else:
self.version = tuple(map(string.atoi, [major, minor]) + [0])
if prerelease:
self.prerelease = (prerelease[0], string.atoi(prerelease_num))
else:
self.prerelease = None
def GET(self, id=None):
if id == None:
return('Here are all the functions available: %s' % self.functions)
fn_name=id.split('(')[0]
args=str(id.split('(')[1]).split(',')
if len(args):args[-1]=args[-1][:-1]
total_args=[]
for t in args:
print (t,len(t))
if t[0]=="'" or t[0]=='"':total_args.append(t[1:-1])
else:total_args.append(string.atoi(t))
method = self.methods.get(fn_name)[0]
if method == None :
print ('no such command :',fn_name)
return 'no such command : %s'%fn_name
else:
print (method,total_args)
#while self.hw_lock and self.active: pass
#self.hw_lock=True
result=method(*total_args)
#self.hw_lock=False
return json.dumps(result,cls=NumpyEncoder)
def setThingSpeakCommand(self):
try:
message = str(self.cmdEditThingSpeak.text())
fn_name=message.split('(')[0]
args = message[message.find("(")+1:message.find(")")].strip().split(',')
total_args=[]
for t in args:
if not len(t):continue
if t[0]=="'" or t[0]=='"':total_args.append(t[1:-1])
else:total_args.append(string.atoi(t))
method = self.methods.get(fn_name)[0]
if method == None :
print ('no such command :',fn_name)
return 'no such command : %s'%fn_name
else:
#while self.hw_lock and self.active: pass
#self.hw_lock=True
self.thingSpeakCommand=[method,total_args]
except Exception,e:
self.results_2.append('Set Error : %s'%( e.message))
pass
def parseFunc(self,fn):
fn_name=fn.split('(')[0]
args=str(fn.split('(')[1]).split(',')
int_args=[]
try:
args[-1]=args[-1][:-1]
int_args=[string.atoi(t) for t in args]
except:
int_args=[] #in case the function has zero arguments, args[-1] will fail.
method = getattr(self.I,fn_name)
if method == None :
print ('no such command :',fn_name)
return None
else:
print (method,int_args)
return method,int_args
def parse (self, vstring):
match = self.version_re.match(vstring)
if not match:
raise ValueError, "invalid version number '%s'" % vstring
(major, minor, patch, prerelease, prerelease_num) = \
match.group(1, 2, 4, 5, 6)
if patch:
self.version = tuple(map(string.atoi, [major, minor, patch]))
else:
self.version = tuple(map(string.atoi, [major, minor]) + [0])
if prerelease:
self.prerelease = (prerelease[0], string.atoi(prerelease_num))
else:
self.prerelease = None
def process_id(train_file_dir,train_file_name):
# ???????????????????????????????query??????list??????????list
# train_file_dir?????????train_file_name????????
unable = open(os.path.join(train_file_dir, 'cannot_segment_query_id.txt'), 'r')
lines = unable.readlines()
unable_id = []
for line in lines:
a = line.replace("\n", "").split(" ")
unable_id.append(string.atoi(a[0]))
f = open(os.path.join(train_file_dir, train_file_name),'r')
qaid = []
for line in f:
file_dict_temp = json.loads(line)
temp_id = file_dict_temp['query_id']
if temp_id in unable_id:
continue
qaid.append(temp_id)
return qaid
def process_id(train_file_dir,train_file_name):
# ???????????????????????????????query??????list??????????list
# train_file_dir?????????train_file_name????????
unable = open(os.path.join(train_file_dir, 'cannot_segment_query_id.txt'), 'r')
lines = unable.readlines()
unable_id = []
for line in lines:
a = line.replace("\n", "").split(" ")
unable_id.append(string.atoi(a[0]))
f = open(os.path.join(train_file_dir, train_file_name),'r')
qaid = []
for line in f:
file_dict_temp = json.loads(line)
temp_id = file_dict_temp['query_id']
if temp_id in unable_id:
continue
qaid.append(temp_id)
return qaid
def parse_detail_page(self, response):
"""
Get details for each course
"""
item = response.meta['item']
parse_review_num = response.xpath('//span[@itemprop="votes"]/text()').extract_first().strip()
item['review_num'] = string.atoi(parse_review_num)
parse_student_num = re.findall(r'"mycourses-listed-count", 0, (.*), 0', response.text)[0].strip() or '0'
item['student_num'] = string.atoi(parse_student_num)
parse_course_info = response.xpath('//div[@class="course-desc"]').extract()
for i in range(len(parse_course_info) - 1):
parse_course_info[0].extend(parse_course_info[i+1])
item['keywords'] = self.get_keywords(parse_course_info[0]+item['name']) or []
yield item
def name2rgb(root, colorName, asInt = 0):
if colorName[0] == '#':
# Extract rgb information from the color name itself, assuming
# it is either #rgb, #rrggbb, #rrrgggbbb, or #rrrrggggbbbb
# This is useful, since tk may return incorrect rgb values if
# the colormap is full - it will return the rbg values of the
# closest color available.
colorName = colorName[1:]
digits = len(colorName) / 3
factor = 16 ** (4 - digits)
rgb = (
string.atoi(colorName[0:digits], 16) * factor,
string.atoi(colorName[digits:digits * 2], 16) * factor,
string.atoi(colorName[digits * 2:digits * 3], 16) * factor,
)
else:
# We have no choice but to ask Tk what the rgb values are.
rgb = root.winfo_rgb(colorName)
if not asInt:
rgb = (rgb[0] / _MAX_RGB, rgb[1] / _MAX_RGB, rgb[2] / _MAX_RGB)
return rgb
def timestringtoseconds(text, separator = ':'):
inputList = string.split(string.strip(text), separator)
if len(inputList) != 3:
raise ValueError, 'invalid value: ' + text
sign = 1
if len(inputList[0]) > 0 and inputList[0][0] in ('+', '-'):
if inputList[0][0] == '-':
sign = -1
inputList[0] = inputList[0][1:]
if re.search('[^0-9]', string.join(inputList, '')) is not None:
raise ValueError, 'invalid value: ' + text
hour = string.atoi(inputList[0])
minute = string.atoi(inputList[1])
second = string.atoi(inputList[2])
if minute >= 60 or second >= 60:
raise ValueError, 'invalid value: ' + text
return sign * (hour * 60 * 60 + minute * 60 + second)
def aligngrouptags(groups):
# Adjust the y position of the tags in /groups/ so that they all
# have the height of the highest tag.
maxTagHeight = 0
for group in groups:
if group._tag is None:
height = (string.atoi(str(group._ring.cget('borderwidth'))) +
string.atoi(str(group._ring.cget('highlightthickness'))))
else:
height = group._tag.winfo_reqheight()
if maxTagHeight < height:
maxTagHeight = height
for group in groups:
ringBorder = (string.atoi(str(group._ring.cget('borderwidth'))) +
string.atoi(str(group._ring.cget('highlightthickness'))))
topBorder = maxTagHeight / 2 - ringBorder / 2
group._hull.grid_rowconfigure(0, minsize = topBorder)
group._ring.grid_rowconfigure(0,
minsize = maxTagHeight - topBorder - ringBorder)
if group._tag is not None:
group._tag.place(y = maxTagHeight / 2)
def _next(self, event):
size = self.size()
if size <= 1:
return
cursels = self.curselection()
if len(cursels) == 0:
index = 0
else:
index = string.atoi(cursels[0])
if index == size - 1:
index = 0
else:
index = index + 1
self.selectitem(index)
def _previous(self, event):
size = self.size()
if size <= 1:
return
cursels = self.curselection()
if len(cursels) == 0:
index = size - 1
else:
index = string.atoi(cursels[0])
if index == 0:
index = size - 1
else:
index = index - 1
self.selectitem(index)
def parse (self, vstring):
match = self.version_re.match(vstring)
if not match:
raise ValueError, "invalid version number '%s'" % vstring
(major, minor, patch, prerelease, prerelease_num) = \
match.group(1, 2, 4, 5, 6)
if patch:
self.version = tuple(map(string.atoi, [major, minor, patch]))
else:
self.version = tuple(map(string.atoi, [major, minor]) + [0])
if prerelease:
self.prerelease = (prerelease[0], string.atoi(prerelease_num))
else:
self.prerelease = None
def main():
try:
opts, args = getopt.getopt(sys.argv[1:], "")
if len(args) > 1:
raise getopt.error, "Too many arguments."
except getopt.error, msg:
usage(msg)
for o, a in opts:
pass
if args:
try:
port = string.atoi(args[0])
except ValueError, msg:
usage(msg)
else:
port = PORT
main_thread(port)
def main():
nworkers = 4
opts, args = getopt.getopt(sys.argv[1:], '-w:')
for opt, arg in opts:
if opt == '-w':
nworkers = string.atoi(arg)
if not args:
args = [os.curdir]
wq = WorkQ()
for dir in args:
wq.addwork(find, (dir, selector, wq))
t1 = time.time()
wq.run(nworkers)
t2 = time.time()
sys.stderr.write('Total time %r sec.\n' % (t2-t1))
# The predicate -- defines what files we look for.
# Feel free to change this to suit your purpose
def main():
if len(sys.argv) < 3:
print "usage: rpython host command"
sys.exit(2)
host = sys.argv[1]
port = PORT
i = string.find(host, ':')
if i >= 0:
port = string.atoi(port[i+1:])
host = host[:i]
command = string.join(sys.argv[2:])
s = socket(AF_INET, SOCK_STREAM)
s.connect((host, port))
s.send(command)
s.shutdown(1)
reply = ''
while 1:
data = s.recv(BUFSIZE)
if not data: break
reply = reply + data
print reply,
def open_message(e=None):
global viewer
sel = scanbox.curselection()
if len(sel) != 1:
if len(sel) > 1:
msg = "Please open one message at a time"
else:
msg = "Please select a message to open"
dialog(root, "Can't Open Message", msg, "", 0, "OK")
return
cursor = scanbox['cursor']
scanbox['cursor'] = 'watch'
tk.call('update', 'idletasks')
i = sel[0]
line = scanbox.get(i)
if scanparser.match(line) >= 0:
num = string.atoi(scanparser.group(1))
m = mhf.openmessage(num)
if viewer: viewer.destroy()
from MimeViewer import MimeViewer
viewer = MimeViewer(bot, '+%s/%d' % (folder, num), m)
viewer.pack()
viewer.show()
scanbox['cursor'] = cursor
def unctime(date):
if date == "None": return None
if not unctime_monthmap:
months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
i = 0
for m in months:
i = i+1
unctime_monthmap[m] = i
words = string.split(date) # Day Mon DD HH:MM:SS YEAR
year = string.atoi(words[4])
month = unctime_monthmap[words[1]]
day = string.atoi(words[2])
[hh, mm, ss] = map(string.atoi, string.splitfields(words[3], ':'))
ss = ss - time.timezone
return time.mktime((year, month, day, hh, mm, ss, 0, 0, 0))
def parse (self, vstring):
match = self.version_re.match(vstring)
if not match:
raise ValueError, "invalid version number '%s'" % vstring
(major, minor, patch, prerelease, prerelease_num) = \
match.group(1, 2, 4, 5, 6)
if patch:
self.version = tuple(map(string.atoi, [major, minor, patch]))
else:
self.version = tuple(map(string.atoi, [major, minor]) + [0])
if prerelease:
self.prerelease = (prerelease[0], string.atoi(prerelease_num))
else:
self.prerelease = None
def parse (self, vstring):
match = self.version_re.match(vstring)
if not match:
raise ValueError, "invalid version number '%s'" % vstring
(major, minor, patch, prerelease, prerelease_num) = \
match.group(1, 2, 4, 5, 6)
if patch:
self.version = tuple(map(string.atoi, [major, minor, patch]))
else:
self.version = tuple(map(string.atoi, [major, minor]) + [0])
if prerelease:
self.prerelease = (prerelease[0], string.atoi(prerelease_num))
else:
self.prerelease = None
def main():
if len(sys.argv) == 2: # slob city
stringlength = string.atoi(sys.argv[1])
if stringlength < 2 or stringlength % 2 != 0:
raise RuntimeError('String length requested must be even '
'integer > 0.')
else:
stringlength = 6
while 1:
line = sys.stdin.readline()
if not line:
break
latlon = re.findall(r'([-0-9.]+)\s+([-0-9.]+)', line)
if latlon:
for leftval, rightval in latlon:
lat = string.atof(leftval)
lon = string.atof(rightval)
else:
raise RuntimeError('Cannot even get the basic items.')
astring = maidenhead.mh1(lat, lon, stringlength)
print('{0}'.format(astring))
def main():
if len(sys.argv) == 2: # slob city
stringlength = string.atoi(sys.argv[1])
if stringlength < 2 or stringlength % 2 != 0:
raise RuntimeError('String length requested must be even '
'integer > 0.')
else:
stringlength = 6
while 1:
line = sys.stdin.readline()
if not line:
break
latlon = re.findall(r'([-0-9.]+)\s+([-0-9.]+)', line)
if latlon:
for leftval, rightval in latlon:
lat = string.atof(leftval)
lon = string.atof(rightval)
else:
raise RuntimeError('Cannot even get the basic items.')
astring = maidenhead.mh2(lat, lon, stringlength)
print('{0}'.format(astring))
def do_POST(self):
dc = self.IFACE_CLASS
uri = urlparse.urljoin(self.get_baseuri(dc), self.path)
uri = urllib.unquote(uri)
dbname, dburi = TrytonDAVInterface.get_dburi(uri)
if dburi.startswith('Calendars'):
# read the body
body = None
if 'Content-Length' in self.headers:
l = self.headers['Content-Length']
body = self.rfile.read(atoi(l))
ct = None
if 'Content-Type' in self.headers:
ct = self.headers['Content-Type']
try:
DATA = '%s\n' % dc._get_caldav_post(uri, body, ct)
except DAV_Error, exception:
ec, _ = exception
return self.send_status(ec)
self.send_body(DATA, 200, 'OK', 'OK')
return
return _prev_do_POST(self)
def strtoip(ipstr):
"""convert an IP address in string form to numeric."""
res = 0L
count = 0
n = string.split(ipstr, '.')
for i in n:
res = res << 8L
ot = string.atoi(i)
if ot < 0 or ot > 255:
raise ValueError, "invalid IP octet"
res = res + ot
count = count + 1
# could be incomplete (short); make it complete.
while count < 4:
res = res << 8L
count = count + 1
return res
def cidrstrerr(str):
"""Check an IP address or CIDR netblock for validity.
Returns None if it is and otherwise an error string."""
if not cvalid.match(str):
return 'Not a syntatically valid IP address or netblock'
rng = 32
pos = string.find(str, '/')
ips = str
if not pos == -1:
rng = string.atoi(ips[pos+1:])
ips = str[:pos]
if rng < 0 or rng > 32:
return 'CIDR length out of range'
n = string.split(ips, '.')
for i in n:
ip = string.atoi(i)
if (ip < 0 or ip > 255):
return 'an IP octet is out of range'
# could check to see if it is 'proper', but.
return None
def parse (self, vstring):
match = self.version_re.match(vstring)
if not match:
raise ValueError, "invalid version number '%s'" % vstring
(major, minor, patch, prerelease, prerelease_num) = \
match.group(1, 2, 4, 5, 6)
if patch:
self.version = tuple(map(string.atoi, [major, minor, patch]))
else:
self.version = tuple(map(string.atoi, [major, minor]) + [0])
if prerelease:
self.prerelease = (prerelease[0], string.atoi(prerelease_num))
else:
self.prerelease = None
def parse (self, vstring):
match = self.version_re.match(vstring)
if not match:
raise ValueError, "invalid version number '%s'" % vstring
(major, minor, patch, prerelease, prerelease_num) = \
match.group(1, 2, 4, 5, 6)
if patch:
self.version = tuple(map(string.atoi, [major, minor, patch]))
else:
self.version = tuple(map(string.atoi, [major, minor]) + [0])
if prerelease:
self.prerelease = (prerelease[0], string.atoi(prerelease_num))
else:
self.prerelease = None
def getVolume(self):
list = [0x3]
list.append(self.__cmd_reqDict['Volume'])
cmd = self.__makeCommand(list)
length = self.__ser.write(cmd)
time.sleep(0.3)
if length != len(cmd):
self.__logger.error('write %d - %d /dev/ttyS2 failed.', len, len(cmd))
return False
ack = self.__ser.read(self.__ackLen)
if ack.find('OK') == -1:
return False
valueStr = '0x' + ack[2:]
value = string.atoi(valueStr, 16)
return value
#######################################
## get current play device
## 0:USB 2:SPI
#######################################
def getPlayDev(self):
list = [0x3]
list.append(self.__cmd_reqDict['PlayDev'])
cmd = self.__makeCommand(list)
length = self.__ser.write(cmd)
time.sleep(0.3)
if length != len(cmd):
self.__logger.error('write %d - %d /dev/ttyS2 failed.', len, len(cmd))
return False
ack = self.__ser.read(self.__ackLen)
if ack.find('OK') == -1:
return False
valueStr = '0x' + ack[2:]
value = string.atoi(valueStr, 16)
return value
#######################################
## get current music index in Flash
## 1-255
#######################################
def getMusicInFlash(self):
list = [0x3]
list.append(self.__cmd_reqDict['MusicInFlash'])
cmd = self.__makeCommand(list)
length = self.__ser.write(cmd)
time.sleep(0.3)
if length != len(cmd):
self.__logger.error('write %d - %d /dev/ttyS2 failed.', len, len(cmd))
return False
ack = self.__ser.read(self.__ackLen)
if ack.find('OK') == -1:
return False
valueStr = '0x' + ack[2:]
value = string.atoi(valueStr, 16)
return value
def parse (self, vstring):
match = self.version_re.match(vstring)
if not match:
raise ValueError, "invalid version number '%s'" % vstring
(major, minor, patch, prerelease, prerelease_num) = \
match.group(1, 2, 4, 5, 6)
if patch:
self.version = tuple(map(string.atoi, [major, minor, patch]))
else:
self.version = tuple(map(string.atoi, [major, minor]) + [0])
if prerelease:
self.prerelease = (prerelease[0], string.atoi(prerelease_num))
else:
self.prerelease = None
def parse (self, vstring):
match = self.version_re.match(vstring)
if not match:
raise ValueError, "invalid version number '%s'" % vstring
(major, minor, patch, prerelease, prerelease_num) = \
match.group(1, 2, 4, 5, 6)
if patch:
self.version = tuple(map(string.atoi, [major, minor, patch]))
else:
self.version = tuple(map(string.atoi, [major, minor]) + [0])
if prerelease:
self.prerelease = (prerelease[0], string.atoi(prerelease_num))
else:
self.prerelease = None
def encodeHostBlock(self, hostlist):
result = ""
try:
i = 0
while (i < 32):
if (i == 0 or i > len(hostlist)):
address = "127.0.0.1"
else:
address = hostlist[i-1]
a,b,c,d = map(string.atoi, string.split(address, '.'))
result += struct.pack("BBBB", a, b, c, d)
i += 1
except Exception as e:
pass
return result
# Take a 128 byte host block and turns it back into an array
def parse (self, vstring):
match = self.version_re.match(vstring)
if not match:
raise ValueError, "invalid version number '%s'" % vstring
(major, minor, patch, prerelease, prerelease_num) = \
match.group(1, 2, 4, 5, 6)
if patch:
self.version = tuple(map(string.atoi, [major, minor, patch]))
else:
self.version = tuple(map(string.atoi, [major, minor]) + [0])
if prerelease:
self.prerelease = (prerelease[0], string.atoi(prerelease_num))
else:
self.prerelease = None
def strtoip(ipstr):
"""convert an IP address in string form to numeric."""
res = 0L
count = 0
n = string.split(ipstr, '.')
for i in n:
res = res << 8L
ot = string.atoi(i)
if ot < 0 or ot > 255:
raise ValueError, "invalid IP octet"
res = res + ot
count = count + 1
# could be incomplete (short); make it complete.
while count < 4:
res = res << 8L
count = count + 1
return res
def cidrstrerr(str):
"""Check an IP address or CIDR netblock for validity.
Returns None if it is and otherwise an error string."""
if not cvalid.match(str):
return 'Not a syntatically valid IP address or netblock'
rng = 32
pos = string.find(str, '/')
ips = str
if not pos == -1:
rng = string.atoi(ips[pos+1:])
ips = str[:pos]
if rng < 0 or rng > 32:
return 'CIDR length out of range'
n = string.split(ips, '.')
for i in n:
ip = string.atoi(i)
if (ip < 0 or ip > 255):
return 'an IP octet is out of range'
# could check to see if it is 'proper', but.
return None
def parse (self, vstring):
match = self.version_re.match(vstring)
if not match:
raise ValueError, "invalid version number '%s'" % vstring
(major, minor, patch, prerelease, prerelease_num) = \
match.group(1, 2, 4, 5, 6)
if patch:
self.version = tuple(map(string.atoi, [major, minor, patch]))
else:
self.version = tuple(map(string.atoi, [major, minor]) + [0])
if prerelease:
self.prerelease = (prerelease[0], string.atoi(prerelease_num))
else:
self.prerelease = None
def quickload(fname,linestocut=4):
"""
Quickly loads in a long text file, chopping off first n 'linestocut'.
Usage: quickload(fname,linestocut=4)
Returns: array filled with data in fname
"""
f = open(fname,'r')
d = f.readlines()
f.close()
print fname,'read in.'
d = d[linestocut:]
d = map(string.split,d)
print 'Done with string.split on lines.'
for i in range(len(d)):
d[i] = map(string.atoi,d[i])
print 'Conversion to ints done.'
return N.array(d)
def read_info(self, info_file):
"read configuration information from a PCGI info file"
lines = open(info_file).readlines()
directives = {}
try:
for line in lines:
line = string.strip(line)
if not len(line) or line[0] == '#':
continue
k, v = string.split(line, '=', 1)
directives[string.strip(k)] = string.strip(v)
except Exception:
raise ParseError('Error parsing PCGI info file')
self.pid_file = directives.get('PCGI_PID_FILE', None)
self.socket_file = directives.get('PCGI_SOCKET_FILE', None)
if 'PCGI_PORT' in directives:
self.port = string.atoi(directives['PCGI_PORT'])
if 'PCGI_MODULE' in directives:
self.module = directives['PCGI_MODULE']
elif 'PCGI_MODULE_PATH' in directives:
path = directives['PCGI_MODULE_PATH']
path, module = os.path.split(path)
module, ext = os.path.splitext(module)
self.module = module