Python os.path 模块,append() 实例源码
我们从Python开源项目中,提取了以下46个代码示例,用于说明如何使用os.path.append()。
def render_GET(self, request):
    """Return the contents of ``API.html`` as the response body.

    The file is looked up next to this module.  The ``Content-Type``
    response header is set to ``text/html`` on *request*.

    :param request: object exposing ``responseHeaders.setRawHeaders``.
    :returns: the raw bytes of ``API.html``.
    """
    request.responseHeaders.setRawHeaders(b"Content-Type", [b"text/html"])
    path = [os.path.dirname(__file__)]
    # If the module directory is relative, anchor it on the current
    # working directory.
    if not os.path.isabs(path[0]):
        path.insert(0, os.getcwd())
    # If we're being run as part of some unittests, get rid of the test dir:
    if path[0].endswith("_trial_temp"):
        path[0] = path[0].rsplit("_trial_temp")[0]
    path.append('API.html')
    spec = os.path.sep.join(path)
    # Read in binary mode: the body must be bytes, and wrapping a text-mode
    # read in bytes() raises TypeError on Python 3.
    with open(spec, 'rb') as fh:
        return fh.read()
def check_if_match():
    """Compare the two module-level ``buffers`` item by item.

    Returns ``(diff, bad_chars)``: ``diff`` is the number of positions (over
    the common prefix length) where the buffers differ, and ``bad_chars``
    maps each differing item of ``buffers[0]`` to the list of items found in
    its place in ``buffers[1]``.  When the lengths differ, key ``-1``
    receives the last item of the other buffer.
    """
    first, second = buffers[0], buffers[1]
    common = min(len(first), len(second))
    bad_chars = defaultdict(list)
    diff = 0
    # zip() stops at the shorter buffer, i.e. after `common` positions.
    for expected, actual in zip(first, second):
        if expected != actual:
            diff += 1
            bad_chars[expected].append(actual)
    if len(first) > common:
        bad_chars[-1].append(second[-1])
    elif len(second) > common:
        bad_chars[-1].append(first[-1])
    return (diff, bad_chars)
def deserialize_lms_sig(buffer):
    """Split a serialized LMS signature into its components.

    Layout read from *buffer*: a 4-byte ``q``, the LM-OTS signature (whose
    length is given by ``LmotsSignature.bytes``), a 4-byte LMS typecode and
    ``h`` path elements of ``m`` bytes each.

    :returns: ``(lms_type, q, lmots_sig, path)``.
    :raises ValueError: on an unknown typecode or when ``q >= 2**h``.
    """
    q = deserialize_u32(buffer[0:4])
    lmots_type = typecode_peek(buffer[4:8])
    if lmots_type not in lmots_params:
        raise ValueError(err_unknown_typecode, str(lmots_type))
    lmots_end = 4 + LmotsSignature.bytes(lmots_type)
    lmots_sig = buffer[4:lmots_end]

    lms_type = typecode_peek(buffer[lmots_end:lmots_end + 4])
    if lms_type not in lms_params:
        raise ValueError(err_unknown_typecode, str(lms_type))
    m, h, LenI = lms_params[lms_type]
    if q >= 2 ** h:
        raise ValueError(err_bad_value)

    # The path elements start right after the LMS typecode.
    start = lmots_end + 4
    path = [buffer[start + k * m:start + (k + 1) * m] for k in range(h)]
    return lms_type, q, lmots_sig, path
def sign(self, message):
    """Sign *message* with the lowest-level key, refreshing levels as needed.

    Exhausted keys are popped from the bottom of the hierarchy (together
    with their public key and signature); missing levels are then
    regenerated and signed by their parent before the message is signed.

    :raises ValueError: when the top-level key itself is exhausted.
    :returns: the result of ``serialize_hss_sig``.
    """
    # Drop every exhausted level, starting from the bottom.
    while self.prv[-1].is_exhausted():
        print("level " + str(len(self.prv)) + " is exhausted")
        if len(self.prv) == 1:
            raise ValueError(err_private_key_exhausted)
        for stack in (self.prv, self.pub, self.sig):
            stack.pop()

    # Rebuild the dropped levels, each one signed by the level above it.
    while len(self.prv) < self.levels:
        print("refreshing level " + str(len(self.prv)))
        template = self.prv[0]
        fresh_prv = LmsPrivateKey(lms_type=template.lms_type,
                                  lmots_type=template.lmots_type)
        self.prv.append(fresh_prv)
        self.pub.append(self.prv[-1].get_public_key())
        self.sig.append(self.prv[-2].sign(self.pub[-1].serialize()))

    # sign message
    lms_sig = self.prv[-1].sign(message)
    return serialize_hss_sig(self.levels - 1, self.pub, self.sig, lms_sig)
def Dependency_Parser(Sentence):
    """Run ``Dep_Parser`` on *Sentence* and return ``[dep, head, orth]`` rows.

    Tokens whose dependency label is ``ROOT`` are reported with the label
    ``root`` and the head ``ROOT``.  On any exception the error is printed
    and ``False`` is returned.
    """
    try:
        Rows = []
        for Token in Dep_Parser(Sentence):
            Label, Head_Text, Text = Token.dep_, Token.head.orth_, Token.orth_
            if Label == u'ROOT':
                Label, Head_Text = u'root', u'ROOT'
            Rows.append([Label, Head_Text, Text])
        return Rows
    except Exception as e:
        print(e)
        return False
def Save_Wiki_IDs(self):
    """Write the wiki ids of every subclass of ``self.Class`` to a file.

    The output file is ``<Path_Directory>/Data/Wiki_IDs/<Class>`` with one
    id per line.  Nothing is done when ``Create_Folder`` returns a falsy
    value.  The file is opened with a context manager so it is closed even
    if a query or a write raises.
    """
    if not Create_Folder(Path_Directory + '/Data/Wiki_IDs/', Flag_Remove = False):
        return
    Sub_Classes = []
    with io.open(Path_Directory + '/Data/Wiki_IDs/{}'.format(self.Class), 'w', encoding = 'utf8') as Wiki_IDs_File:
        Results_Sub_Classes = SPARQL_Commands(Query_Subclasses % self.Class)
        if Results_Sub_Classes:
            for Result in Results_Sub_Classes["results"]["bindings"]:
                Sub_Classes.append(Result["x"]["value"])
        print('Accessed Class: {}'.format(self.Class))
        print('No. of Sub Classes: {}'.format(len(Sub_Classes)))
        for Subclass in Sub_Classes:
            print(Subclass)
            Results_Wiki_ID = SPARQL_Commands(Query_Wiki_ID % '<{}>'.format(Subclass))
            if Results_Wiki_ID:
                for Result in Results_Wiki_ID["results"]["bindings"]:
                    Wiki_IDs_File.write(unicode(Result["wikiid"]["value"]))
                    Wiki_IDs_File.write(u'\n')
def search(self, from_vertex):
    """Depth-first visit of *from_vertex*, recording a topological order.

    Entry and exit counters are stored in ``self.entered`` / ``self.exited``
    and the vertex is appended to ``self.topological_order`` once all of its
    neighbours in ``self.graph`` have been handled.

    :raises CircularDependencyDetected: when an edge leads to a vertex that
        was entered earlier and has not been exited yet.
    """
    self.discovered[from_vertex] = True
    self.entered[from_vertex] = self.count
    self.count += 1

    for neighbour in self.graph[from_vertex]:
        if neighbour not in self.discovered:
            self.parents[neighbour] = from_vertex
            self.search(neighbour)
            continue
        still_open = (self.entered[neighbour] < self.entered[from_vertex]
                      and neighbour not in self.exited)
        if still_open:
            raise CircularDependencyDetected(self.path(neighbour, from_vertex)+[neighbour])
        if neighbour not in self.parents:
            self.parents[neighbour] = from_vertex

    self.exited[from_vertex] = self.count
    self.count += 1
    self.topological_order.append(from_vertex)
def load(self, keys, interpolate=True, raw=False):
    """Load one masked, scaled array per key.

    For each key the dataset named ``key.name.capitalize()`` is selected
    from ``self.sd``; values equal to its ``_FillValue`` attribute are
    masked and the result is multiplied by ``scale_factor`` (1 if absent).

    :param keys: iterable of objects with ``name`` and ``resolution``.
    :param interpolate: when true, data whose key resolution is finer than
        ``self.resolution`` is passed through ``self._interpolate``.
    :param raw: return the masked arrays themselves instead of ``Dataset``
        wrappers.
    :returns: list with one entry per key, in order.
    """
    results = []
    for key in keys:
        sds = self.sd.select(key.name.capitalize())
        fill = sds.attributes()["_FillValue"]
        try:
            scale = sds.attributes()["scale_factor"]
        except KeyError:
            scale = 1
        values = np.ma.masked_equal(sds.get(), fill) * scale
        # TODO: interpolate if needed
        wants_finer = (key.resolution is not None
                       and key.resolution < self.resolution
                       and interpolate)
        if wants_finer:
            values = self._interpolate(values, self.resolution, key.resolution)
        results.append(values if raw else Dataset(values, id=key))
    return results
def build_deepwalk_corpus(G, num_paths, path_length, alpha=0, rand=random.Random(0)):
    '''
    Extract the walks from the graph used for context embeddings.

    The node list is reshuffled with ``rand`` before every pass, and one
    walk is started from every node in each pass.

    :param G: graph
    :param num_paths: number of passes, i.e. walks started from each node
    :param path_length: maximum length of each walk
    :param alpha: restart probability
    :param rand: random number generator (shared default instance, seed 0)
    :return: ``np.array`` built from the list of walks
    '''
    nodes = list(G.nodes())
    walks = []
    for _ in range(num_paths):
        rand.shuffle(nodes)
        walks.extend(
            __random_walk__(G, path_length, rand=rand, alpha=alpha, start=node)
            for node in nodes
        )
    return np.array(walks)
def v1_subtopic_list(request, response, kvlclient, fid, sfid):
    '''Retrieves a list of items in a subfolder.

    The route for this endpoint is:
    ``GET /dossier/v1/folder/<fid>/subfolder/<sfid>``.

    (Temporarily, the "current user" can be set via the
    ``annotator_id`` query parameter.)

    The payload returned is a list of two element arrays. The first
    element in the array is the item's content id and the second
    element is the item's subtopic id (``None`` when the item name
    contains no ``@``).  A ``KeyError`` while listing sets the response
    status to 404 and yields an empty list.
    '''
    path = urllib.unquote(fid) + '/' + urllib.unquote(sfid)
    try:
        return [
            it.name.split('@') if '@' in it.name else (it.name, None)
            for it in new_folders(kvlclient, request).list(path)
        ]
    except KeyError:
        response.status = 404
        return []
def normalize_input(self):
    """Strip every line of ``self.input`` and escape it in place.

    Each line is stripped of surrounding whitespace and encoded with the
    ``string-escape`` codec; the lines are then re-joined with newlines and
    stored back on ``self.input``.
    """
    self.input = '\n'.join(
        [raw.strip().encode('string-escape') for raw in self.input.split('\n')]
    )
def post_process_bytes_line(line):
    """Convert a textual byte listing into a list of integers.

    Common decorations (``0x``, ``\\x``, ``%u``, commas, spaces, quotes,
    ``+`` and ``.``) are removed, then the remaining characters are read
    two at a time as hexadecimal numbers.
    """
    decorations = ['0x', ',', ' ', '\\', 'x', '%u', '+', '.', "'", '"']
    digits = line.strip()
    # Order matters: '0x' has to go before the bare 'x'.
    for token in decorations:
        digits = digits.replace(token, '')
    return [int(digits[pos:pos + 2], 16) for pos in range(0, len(digits), 2)]
def draw_chunk_table(comp):
    ''' Outputs a table that compares the found memory chunks side-by-side
    in input file vs. memory.

    Every chunk from ``comp.get_chunks()`` becomes one row with a note
    derived from its ``dx``/``dy``/``unmodified`` attributes.  A dashed rule
    is printed after the header and after row ``last_unmodified + 1``. '''
    delims = (' ', ' ', ' ', ' | ', ' | ', ' | ', '')
    table = [('', '', '', '', 'File', 'Memory', 'Note')]
    last_unmodified = comp.get_last_unmodified_chunk()

    for chunk in comp.get_chunks():
        if chunk.dy == 0:
            note = 'missing'
        elif chunk.dx > chunk.dy:
            note = 'compacted'
        elif chunk.dx < chunk.dy:
            note = 'expanded'
        elif chunk.unmodified:
            note = 'unmodified!'
        else:
            note = 'corrupted'
        table.append((chunk.i, chunk.j, chunk.dx, chunk.dy,
                      shorten_bytes(chunk.xchunk), shorten_bytes(chunk.ychunk), note))

    # draw the table: each column is as wide as its widest cell
    widths = tuple(max(len(str(cell)) for cell in column) for column in zip(*table))
    rule = '\t' + '-' * (sum(widths) + sum(len(d) for d in delims))
    for row_no, row in enumerate(table):
        cells = [str(cell).ljust(width) + delim
                 for cell, width, delim in zip(row, widths, delims)]
        out('\t' + ''.join(cells))
        if row_no == 0 or (row_no == last_unmodified + 1 and row_no < len(table)):
            out(rule)
#
# Memory comparison algorithm originally taken from Mona.py by Peter Van Eeckhoutte - Corelan GCV
# https://github.com/corelan/mona
#
# It uses a modified Longest Common Subsequence algorithm to count the modifications needed
# to transform the supplied input into the data it is being compared against.
#
def get_grid(self):
    ''' Builds a 2-d suffix grid for our DP algorithm.

    Returns a ``(values, moves)`` tuple.  Both are lists of lists with
    ``len(y) + 1`` rows and ``len(x) + 1`` columns, where ``x`` is
    ``self.x`` and ``y`` is ``self.y`` cut to at most twice the length
    of ``x``.  ``values`` holds the score of each cell and ``moves`` the
    move code (1, 2 or 3, 0 when none) recorded for it.
    '''
    x = self.x
    # Only the first 2 * len(x) items of y are taken into account.
    y = self.y[:len(x)*2]
    width, height = len(x), len(y)
    values = [[0] * (width + 1) for j in range(height + 1)]
    moves = [[0] * (width + 1) for j in range(height + 1)]
    # equal[j][i] tells whether x[i] == y[j].
    equal = [[x[i] == y[j] for i in range(width)] for j in range(height)]
    # Extra all-False row so that equal[height] can be indexed below.
    equal.append([False] * width)
    # NOTE(review): rrange is defined elsewhere; presumably a reversed range,
    # so the grid would be filled from the bottom-right corner -- confirm.
    for j, i in itertools.product(rrange(height + 1), rrange(width + 1)):
        value = values[j][i]
        # Cell (j-1, i-1): move code 2.
        if i >= 1 and j >= 1:
            if equal[j-1][i-1]:
                values[j-1][i-1] = value + 1
                moves[j-1][i-1] = 2
            elif value > values[j][i-1]:
                values[j-1][i-1] = value
                moves[j-1][i-1] = 2
        # Cell (j, i-1): move code 1, scored with a penalty of 2.
        if i >= 1 and not equal[j][i-1] and value - 2 > values[j][i-1]:
            values[j][i-1] = value - 2
            moves[j][i-1] = 1
        # Cell (j-2, i-1): move code 3, scored with a penalty of 1.
        if i >= 1 and j >= 2 and not equal[j-2][i-1] and value - 1 > values[j-2][i-1]:
            values[j-2][i-1] = value - 1
            moves[j-2][i-1] = 3
    return (values, moves)
def construct_comparator_dump(self, mapping):
    """Build two parallel hex-dump listings from *mapping*.

    For every chunk produced by ``HexDumpPrinter.extract_chunks(mapping)``
    one line is appended to ``self.dump1`` (the source bytes) and one to
    ``self.dump2`` (a marker per byte: empty for a match, ``-1`` for a
    dropped byte, ``+1`` for an expanded byte, otherwise the hex of the
    replacement).
    """
    def toprint(x, src):
        # Pick the character shown in the right-hand column for marker x;
        # anything outside 0x20..0x7e is shown as '.'.
        c = x
        if len(c) == 0: c = ' '
        elif len(c) == 2: c = x[1]
        if ord(c) >= 0x20 and ord(c) < 0x7f:
            return c
        else:
            return '.'
    for i, chunk in enumerate(HexDumpPrinter.extract_chunks(mapping)):
        chunk = list(chunk) # save generator result in a list
        src, mapped = zip(*chunk)
        values = []
        for left, right in zip(src, mapped):
            if left == right: values.append('') # byte matches original
            elif len(right) == 0: values.append('-1') # byte dropped
            elif len(right) == 2: values.append('+1') # byte expanded
            else: values.append(bin2hex(right)) # byte modified
        # Both lines start with the chunk offset (16 bytes per chunk).
        line1 = '%04x' % (i * 16) + ' | ' + bin2hex(src).ljust(49, ' ')
        line2 = '%04x' % (i * 16) + ' | ' + ' '.join(sym.ljust(2, self.fill_matching) for sym in values)
        line1 += '| ' + ''.join(map(lambda x: x if ord(x) >= 0x20 and ord(x) < 0x7f else '.', src)).ljust(16, ' ')
        ascii2 = '| '
        # NOTE: the loops below rebind i; the offset above was already computed.
        for i in range(len(values)): ascii2 += toprint(values[i], src[i])
        for i in range(len(values), 16): ascii2 += ' '
        line2 = line2.ljust(56, ' ')
        line2 += ascii2
        #out(dbg("Line1: ('%s')" % line1))
        #out(dbg("Line2: ('%s')" % line2))
        self.dump1.append(line1)
        self.dump2.append(line2)
def extract_bytes(line):
    """Return the two-character byte tokens of a dump line.

    *line* is expected to look like ``<address> | <hex bytes> | <ascii>``;
    the middle column is cut into two-character pieces taken every three
    characters (two hex digits plus one separator).

    :raises IndexError: when *line* contains no ``' | '`` separator.
    """
    hex_column = line.split(' | ')[1]
    return [hex_column[pos:pos + 2] for pos in range(0, len(hex_column), 3)]
def get_packages_path(self):
    """ return the list of path to search packages for depending on client OS and architecture

    Every location is listed twice: first below ``ROOT``, then as a path
    relative to the current directory.  Order: architecture-specific,
    OS-wide, android (if applicable), then the generic ``all`` directory.
    """
    path = []

    def add(*parts):
        # Register the ROOT-based location followed by the relative one.
        path.append(os.path.join(ROOT, "packages", *parts))
        path.append(os.path.join("packages", *parts))

    if self.is_windows():
        platform = "windows"
    elif self.is_unix():
        platform = "linux"
    else:
        platform = None

    if platform is not None:
        add(platform, "amd64" if self.is_proc_arch_64_bits() else "x86")
        add(platform, "all")
    if self.is_android():
        add("android")
    add("all")
    return path
def deserialize(cls, buffer):
    """Rebuild a signature object from its serialized form.

    Layout read from *buffer*: a 4-byte typecode, ``n`` bytes for ``C`` and
    ``p`` elements of ``n`` bytes each.

    :raises ValueError: on an unknown typecode or when the buffer length
        differs from ``cls.bytes(lmots_type)``.
    :returns: ``cls(C, y, lmots_type)``.
    """
    lmots_type = typecode_peek(buffer[0:4])
    if lmots_type not in lmots_params:
        raise ValueError(err_unknown_typecode, str(lmots_type))
    n, p, w, ls = lmots_params[lmots_type]
    if len(buffer) != cls.bytes(lmots_type):
        raise ValueError(err_bad_length)
    body_start = n + 4
    C = buffer[4:body_start]
    y = [buffer[body_start + k * n:body_start + (k + 1) * n] for k in range(p)]
    return cls(C, y, lmots_type)
def __init__(self, S=None, SEED=None, lmots_type=lmots_sha256_n32_w8):
    """Create a one-time private key with ``p`` secret elements.

    :param S: value stored as ``self.S``; read from ``entropySource``
        (``n`` bytes) when omitted.
    :param SEED: when given, the elements are derived as
        ``H(S + SEED + u16str(i + 1) + D_PRG)``; otherwise each element is
        read from ``entropySource``.
    :param lmots_type: key into ``lmots_params``.
    """
    n, p, w, ls = lmots_params[lmots_type]
    self.S = entropySource.read(n) if S is None else S
    self.x = list()
    if SEED is None:
        self.x.extend(entropySource.read(n) for _ in range(p))
    else:
        self.x.extend(H(self.S + SEED + u16str(k + 1) + D_PRG) for k in range(p))
    self.type = lmots_type
    # A one-time key: exactly one signature may be produced.
    self._signatures_remaining = 1
def sign(self, message):
    """Produce the single signature this key allows and mark it used.

    :raises ValueError: when the key has already been used.
    :returns: the serialized ``LmotsSignature``.
    """
    if self._signatures_remaining != 1:
        raise ValueError(err_private_key_exhausted)
    n, p, w, ls = lmots_params[self.type]
    C = entropySource.read(n)
    hashQ = H(self.S + C + message + D_MESG)
    V = hashQ + checksum(hashQ, w, ls)

    y = []
    for index, secret in enumerate(self.x):
        link = secret
        # Hash the element coef(V, index, w) times.
        for step in range(coef(V, index, w)):
            link = H(self.S + link + u16str(index) + u8str(step) + D_ITER)
        y.append(link)

    self._signatures_remaining = 0
    return LmotsSignature(C, y, self.type).serialize()
def get_param_list(cls):
    """Return one ``{'lmots_type': t}`` dict per key of ``lmots_params``."""
    return [{'lmots_type': typecode} for typecode in lmots_params]
def get_path(self, node_num):
    """Collect the sibling nodes on the way from *node_num* up to node 1.

    For an odd node number the sibling is ``self.nodes[node_num - 1]``, for
    an even one ``self.nodes[node_num + 1]``; the node number is then
    halved until it reaches 1.

    :param node_num: integer index of the starting node.
    :returns: list of siblings, starting with the sibling of *node_num*.
    """
    path = list()
    while node_num > 1:
        if node_num % 2:
            path.append(self.nodes[node_num - 1])
        else:
            path.append(self.nodes[node_num + 1])
        # Floor division keeps node_num an int under true division too.
        node_num = node_num // 2
    return path
def get_param_list(cls):
    """Return every ``lmots_type`` / ``lms_type`` combination as a dict.

    The LM-OTS type varies slowest: all LMS types are listed for the first
    LM-OTS type before moving on to the next one.
    """
    return [
        {'lmots_type': ots_type, 'lms_type': tree_type}
        for ots_type in lmots_params
        for tree_type in lms_params
    ]
def deserialize_hss_sig(buffer):
    """Split a serialized HSS signature into its parts.

    The first four bytes encode ``levels - 1``.  They are followed by
    ``levels - 1`` pairs of (LMS signature, LMS public key); whatever
    remains is the signature of the message itself.

    :raises ValueError: when more than 8 levels are announced.
    :returns: ``(levels, publist, siglist, msg_sig)``.
    """
    hss_max_levels = 8
    levels = deserialize_u32(buffer[0:4]) + 1
    if levels > hss_max_levels:
        raise ValueError(err_bad_value)

    siglist, publist = [], []
    remaining = buffer[4:]
    for _ in range(levels - 1):
        lms_sig, remaining = parse_lms_sig(remaining)
        siglist.append(lms_sig)
        lms_pub, remaining = LmsPublicKey.parse(remaining)
        publist.append(lms_pub)
    return levels, publist, siglist, remaining
def __init__(self, levels=2, lms_type=lms_sha256_m32_h5, lmots_type=lmots_sha256_n32_w8, prv0=None):
    """Build a hierarchy of *levels* LMS key pairs.

    :param levels: number of levels in the hierarchy.
    :param lms_type: LMS parameter set used for every generated key.
    :param lmots_type: LM-OTS parameter set used for every generated key.
    :param prv0: optional top-level private key; generated when omitted.

    Each level below the top has its public key signed by the private key
    of the level above; those signatures are kept in ``self.sig``.
    """
    self.levels = levels
    self.prv, self.pub, self.sig = list(), list(), list()

    top = prv0
    if top is None:
        top = LmsPrivateKey(lms_type=lms_type, lmots_type=lmots_type)
    self.prv.append(top)
    self.pub.append(self.prv[0].get_public_key())

    for _ in range(1, self.levels):
        self.prv.append(LmsPrivateKey(lms_type=lms_type, lmots_type=lmots_type))
        self.pub.append(self.prv[-1].get_public_key())
        self.sig.append(self.prv[-2].sign(self.pub[-1].serialize()))
def get_packages_path(self):
    """ return the list of path to search packages for depending on client OS and architecture

    Each location appears twice in the result: once below ``ROOT`` and
    once relative to the current directory.
    """
    # First decide which sub-directories apply, most specific first.
    suffixes = []
    os_name = None
    if self.is_windows():
        os_name = "windows"
    elif self.is_unix():
        os_name = "linux"
    if os_name is not None:
        arch = "amd64" if self.is_proc_arch_64_bits() else "x86"
        suffixes.append((os_name, arch))
        suffixes.append((os_name, "all"))
    if self.is_android():
        suffixes.append(("android",))
    suffixes.append(("all",))

    # Then expand each of them into its ROOT-based and relative form.
    path = []
    for suffix in suffixes:
        path.append(os.path.join(ROOT, "packages", *suffix))
        path.append(os.path.join("packages", *suffix))
    return path
def path(self, from_vertex, to_vertex):
    """Return the chain of vertices leading from *from_vertex* to *to_vertex*.

    The chain is rebuilt by following ``self.parents`` upwards from
    *to_vertex* until *from_vertex* is reached.

    Vertices are compared with ``==`` rather than identity: ``self.parents``
    is a mapping keyed by equality, so an equal but distinct vertex object
    must be recognised as the start of the path.

    :raises NoDependencyPathFound: when *from_vertex* is not an ancestor of
        *to_vertex*.
    :returns: list starting with *from_vertex* and ending with *to_vertex*.
    """
    current = to_vertex
    path = [current]
    while current in self.parents and current != from_vertex:
        current = self.parents[current]
        path.append(current)
    if current != from_vertex:
        raise NoDependencyPathFound(from_vertex, to_vertex)
    path.reverse()
    return path
def compute_all_relevant_interfaces(cls, main_egg):
    """Instantiate the ``reahl.eggs`` interface of each dependent distribution.

    Distributions come from ``cls.compute_ordered_dependent_distributions``;
    those without a ``reahl.eggs`` entry map are skipped.  The single entry
    point of each remaining distribution is loaded and called with the
    distribution.

    :returns: list of the created interface objects, in distribution order.
    """
    interfaces = []
    for distribution in cls.compute_ordered_dependent_distributions(main_egg):
        entry_map = distribution.get_entry_map('reahl.eggs')
        if not entry_map:
            continue
        entry_points = list(entry_map.values())
        assert len(entry_points) == 1, 'Only one eggdeb class per egg allowed'
        interface_class = entry_points[0].load()
        interfaces.append(interface_class(distribution))
    return interfaces
def read_mda(self, attribute):
    """Parse a ``KEY=VALUE`` metadata text into nested dictionaries.

    ``GROUP``/``OBJECT`` lines open a nested dict named after their value,
    ``END_GROUP``/``END_OBJECT`` close it again, ``CLASS`` and ``NUM_VAL``
    lines are ignored and every other line becomes an entry of the
    innermost open dict.  Parsing stops at a line that is exactly ``END``.

    :param attribute: the metadata text, one entry per line.
    :returns: the top-level dictionary.
    :raises SyntaxError: when an ``END_*`` line does not match the group or
        object that is currently open.
    """
    mda = {}
    current_dict = mda
    path = []
    for line in attribute.split('\n'):
        if not line:
            continue
        if line == 'END':
            break
        # Split on the first '=' only, so values may themselves contain '='.
        key, val = line.split('=', 1)
        key = key.strip()
        val = val.strip()
        try:
            # SECURITY: eval() executes arbitrary expressions found in the
            # metadata text; only use this on trusted input.  Kept because
            # values that are not valid names intentionally stay strings.
            val = eval(val)
        except NameError:
            pass
        if key in ['GROUP', 'OBJECT']:
            new_dict = {}
            path.append(val)
            current_dict[val] = new_dict
            current_dict = new_dict
        elif key in ['END_GROUP', 'END_OBJECT']:
            # An END_* without a matching opener is malformed input too.
            if not path or val != path[-1]:
                raise SyntaxError
            path = path[:-1]
            # Walk down from the root again to find the enclosing dict.
            current_dict = mda
            for item in path:
                current_dict = current_dict[item]
        elif key in ['CLASS', 'NUM_VAL']:
            pass
        else:
            current_dict[key] = val
    return mda
def file_1(b):
    """Return the full paths of the entries directly inside directory *b*.

    The paths are built on the resolved absolute location of *b*.  Unlike
    the previous implementation, the process working directory is left
    untouched (it used to ``os.chdir`` into *b* as a side effect).

    :param b: path of the directory to list.
    :returns: list of absolute paths, in ``os.listdir`` order.
    """
    names = os.listdir(b)
    base = os.path.realpath(b)
    return [os.path.join(base, name) for name in names]
def input_path(a):
    """Walk directory *a*, printing files and collecting its sub-directories.

    Every non-directory entry returned by ``file_1`` is printed; every
    directory entry is walked recursively and then added to the result.

    :returns: list of the directories found directly inside *a*.
    """
    path = []
    for entry in file_1(a):
        if not os.path.isdir(entry):
            print(entry)
            continue
        # NOTE(review): the list returned by the recursive call is discarded,
        # so nested directories never reach the result -- confirm intent.
        input_path(entry)
        path.append(entry)
    return path
def __random_walk__(G, path_length, alpha=0, rand=random.Random(), start=None):
    '''
    Returns a truncated random walk.

    :param G: networkx graph
    :param path_length: Length of the random walk.
    :param alpha: probability of restarts.
    :param rand: random number generator
    :param start: the start node of the random walk; a random node is
        chosen only when this is ``None`` (so a node equal to ``0`` or any
        other falsy value is a valid start).
    :return: list of visited nodes; shorter than ``path_length`` when a
        node without neighbours is reached.
    '''
    if start is not None:
        path = [start]
    else:
        # Sampling is uniform w.r.t V, and not w.r.t E
        # G.nodes may be a method / callable view or a plain sequence.
        nodes = G.nodes() if callable(G.nodes) else G.nodes
        path = [rand.choice(list(nodes))]
    while len(path) < path_length:
        # Materialise the neighbours: some graph versions return an iterator.
        neighbors = list(G.neighbors(path[-1]))
        if not neighbors:
            break
        if rand.random() >= alpha:
            path.append(rand.choice(neighbors))
        else:
            path.append(path[0])
    return path
def write_walks_to_disk(G, filebase, num_paths, path_length, alpha=0, rand=random.Random(0), num_workers=cpu_count()):
    '''
    Save the random walks to files so they do not have to be recomputed at
    each execution.

    :param G: graph to walk on
    :param filebase: prefix of the output files (``<filebase>.<index>``)
    :param num_paths: number of walks to do for each node
    :param path_length: length of each walk
    :param alpha: restart probability for the random walks
    :param rand: generator of random numbers (seeds the per-file generators)
    :param num_workers: number of worker processes used to execute the job
    :return: list of the values returned by ``_write_walks_to_disk``
    '''
    global __current_graph
    global __vertex2str
    __current_graph = G
    __vertex2str = {v:str(v) for v in G.nodes()}
    files_list = ["{}.{}".format(filebase, str(index)) for index in range(num_paths)]
    expected_size = len(G)

    if num_paths <= num_workers:
        paths_per_worker = [1] * num_paths
    else:
        chunk = int(num_paths / num_workers)+1
        # grouper pads the last group with None; count only the real entries.
        paths_per_worker = [
            len([item for item in group if item != None])
            for group in grouper(chunk, range(1, num_paths+1))
        ]

    args_list = []
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        line_counts = executor.map(count_lines, files_list)
        for _size, target, ppw in zip(line_counts, files_list, paths_per_worker):
            worker_rand = random.Random(rand.randint(0, 2**31))
            args_list.append((ppw, path_length, alpha, worker_rand, target))

    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        files = list(executor.map(_write_walks_to_disk, args_list))
    return files
def make_undirected(self):
    """Add the reverse of every edge, then normalise the adjacency lists.

    For each edge ``v -> other`` (self-loops excluded) ``v`` is appended to
    the adjacency list of ``other``; ``make_consistent`` is called
    afterwards.

    :returns: ``self``, to allow chaining.
    """
    t0 = time()
    # Iterate over a snapshot of the keys: looking up self[other] may insert
    # new keys, which must not happen while iterating the live key view.
    for v in list(self.keys()):
        for other in self[v]:
            if v != other:
                self[other].append(v)
    t1 = time()
    logger.info('make_undirected: added missing edges {}s'.format(t1-t0))
    self.make_consistent()
    return self
def random_walk(self, path_length, alpha=0, rand=random.Random(), start=None):
    """ Returns a truncated random walk.

    path_length: Length of the random walk.
    alpha: probability of restarts.
    rand: random number generator.
    start: the start node of the random walk; a random node is drawn
        only when this is ``None``, so falsy node ids such as ``0`` are
        honoured.

    The walk ends early when it reaches a node without neighbours.
    """
    G = self
    if start is not None:
        path = [start]
    else:
        # Sampling is uniform w.r.t V, and not w.r.t E
        # list() is needed where keys() returns a non-indexable view.
        path = [rand.choice(list(G.keys()))]
    while len(path) < path_length:
        cur = path[-1]
        if len(G[cur]) > 0:
            if rand.random() >= alpha:
                path.append(rand.choice(G[cur]))
            else:
                path.append(path[0])
        else:
            break
    return path
# TODO add build_walks in here
def build_deepwalk_corpus(G, num_paths, path_length, alpha=0,
                          rand=random.Random(0)):
    """Collect ``num_paths`` random walks starting from every node of *G*.

    The node order is reshuffled with ``rand`` before each pass.

    :returns: list of walks, each as returned by ``G.random_walk``.
    """
    nodes = list(G.nodes())
    walks = []
    for _ in range(num_paths):
        rand.shuffle(nodes)
        walks.extend(
            G.random_walk(path_length, rand=rand, alpha=alpha, start=node)
            for node in nodes
        )
    return walks
def load_edgelist(file_, undirected=True):
    """Build a ``Graph`` from a whitespace-separated edge list file.

    The first two fields of every line are read as integer node ids.

    :param file_: path of the edge list file.
    :param undirected: also add the reverse of every edge.
    :returns: the graph, after ``make_consistent`` has been called on it.
    """
    G = Graph()
    with open(file_) as handle:
        for row in handle:
            source, target = row.strip().split()[:2]
            source, target = int(source), int(target)
            G[source].append(target)
            if undirected:
                G[target].append(source)
    G.make_consistent()
    return G
def from_numpy(x, undirected=True):
    """Build a ``Graph`` from a sparse adjacency matrix.

    Every stored entry ``(i, j)`` of the matrix becomes an edge ``i -> j``.

    :param x: sparse matrix (anything accepted by ``issparse``).
    :param undirected: call ``make_undirected`` on the result.
    :raises Exception: when *x* is not sparse.
    """
    G = Graph()
    if not issparse(x):
        raise Exception("Dense matrices not yet supported.")
    coo = x.tocoo()
    for row, col, _value in zip(coo.row, coo.col, coo.data):
        G[row].append(col)
    if undirected:
        G.make_undirected()
    G.make_consistent()
    return G
def make_path(fid, sfid=None, cid=None, subid=None):
    """Join folder, subfolder and item identifiers into a ``/`` path.

    *fid* and *sfid* are URL-unquoted; the item component is ``cid`` or
    ``cid@subid`` when a subtopic id is given.
    """
    parts = [urllib.unquote(fid)]
    if sfid is not None:
        parts.append(urllib.unquote(sfid))
    if cid is not None:
        parts.append(cid if subid is None else cid + '@' + subid)
    return '/'.join(parts)
def add_link_headers(page, per):
    """Set the ``Link`` response header with pagination links.

    ``first`` and ``prev`` links are only present after the first page; a
    ``next`` link is always present.
    """
    links = []
    # Add the "first" and "prev" links.
    if page > 1:
        links += [
            ('first', setqp('page', '1')),
            ('prev', setqp('page', str(page - 1))),
        ]
    # We never really know when the stream ends, so there is always a
    # "next" link.
    links.append(('next', setqp('page', str(page + 1))))
    response.headers['Link'] = ', '.join([tuple_to_link(link) for link in links])
def cmd_args(self):
    """Return the ``docker run`` command line for this tunnel.

    The container is named after ``self.host``; ``--rm`` is added when
    ``self.rm`` is set, and the arguments from the parent class follow the
    image name.
    """
    base = super(Docker, self).cmd_args()
    flags = ['--rm'] if self.rm else []
    command = ['docker', 'run', '-i', '--name', self.host]
    command.extend(flags)
    command.append(self.image)
    return command + base
def cmd_args(self):
    """Return the ``ssh`` command line for this tunnel.

    Password authentication is disabled; the login name and ``sudo`` are
    added when configured, followed by the parent class arguments.  Words
    containing a space are wrapped in double quotes.
    """
    words = ['ssh', '-o', 'PasswordAuthentication=no']
    if self.user:
        words += ['-l', self.user]
    words.append(self.host)
    if self.sudo:
        words.append('sudo')
    words += super(SSHTunnel, self).cmd_args()

    quoted = []
    for word in words:
        quoted.append('"%s"' % word if ' ' in word else word)
    return quoted
# An alias, because this is the default tunnel type
def get_blocks(self):
    '''
    Compares two binary strings under the assumption that y is the result of
    applying the following transformations onto x:
    * change single bytes in x (likely)
    * expand single bytes in x to two bytes (less likely)
    * drop single bytes in x (even less likely)
    Returns a generator that yields elements of the form (unmodified, xdiff, ydiff),
    where each item represents a binary chunk with "unmodified" denoting whether the
    chunk is the same in both strings, "xdiff" denoting the size of the chunk in x
    and "ydiff" denoting the size of the chunk in y.
    Example:
    >>> x = "abcdefghijklm"
    >>> y = "mmmcdefgHIJZklm"
    >>> list(MemoryComparator(x, y).get_blocks())
    [(False, 2, 3), (True, 5, 5),
    (False, 3, 4), (True, 3, 3)]
    '''
    x, y = self.x, self.y
    _, moves = self.get_grid()
    # walk the grid
    path = []
    i, j = 0, 0
    while True:
        dy, dx = self.move_to_gradient[moves[j][i]]
        if dy == dx == 0:
            break
        path.append((dy == 1 and x[i] == y[j], dy, dx))
        j, i = j + dy, i + dx
    # Whatever is left of x is compared one-to-one with the rest of y;
    # once y is exhausted the remaining items of x have no counterpart.
    for i, j in zip(range(i, len(x)), itertools.count(j)):
        if j < len(y):
            path.append((x[i] == y[j], 1, 1))
        else:
            path.append((False, 0, 1))
    # Merge consecutive steps that share the same "unmodified" flag.
    for unmodified, subpath in itertools.groupby(path, itemgetter(0)):
        # A real list is required: len() is taken below, which a lazy
        # map object does not support.
        ydiffs = [step[1] for step in subpath]
        dx, dy = len(ydiffs), sum(ydiffs)
        yield unmodified, dx, dy
def reconstruct_line(letter, line, bytes):
    """Rebuild one dump line from its byte tokens, marking differences.

    *line* is split on ``' | '`` into address, hex column and ascii column;
    the hex column is rebuilt from *bytes*.  Tokens whose length is not 2
    are treated as differences: their index is remembered and the address
    is flagged for colouring.  The result is
    ``<letter><address> | <bytes> | <ascii>``.
    """
    bytes_line = ''
    linet = line.split(' | ')
    color_address = False
    diff_indexes = []
    i = 0
    for b in bytes:
        if len(b) != 2:
            # difference
            diff_indexes.append(i)
            color_address = True
            if len(b) == 4:
                # not colored difference
                # The first character of the token overwrites the last
                # character emitted so far; the rest is appended.
                l = list(bytes_line)
                if len(l) > 1:
                    l[-1] = b[0]
                bytes_line = ''.join(l)
                bytes_line += b[1:]
            else:
                # colored difference
                bytes_line += b + ' '
        else:
            bytes_line += b + ' '
        i += 1
    address = linet[0]
    ascii = linet[2]
    # Pad both columns up to 16 entries.
    for b in range(len(bytes), 16):
        bytes_line += ' ' * 3
        ascii += ' '
    if options.colored:
        new_ascii = ''
        for j in range(len(ascii)):
            if j in diff_indexes:
                new_ascii += bcolors.FAIL + ascii[j] + bcolors.ENDC
            else:
                new_ascii += ascii[j]
        # After the swap `ascii` holds the coloured text.
        new_ascii, ascii = ascii, new_ascii
    if color_address or len(letter) > 1:
        if options.colored:
            address = bcolors.OKBLUE + address + bcolors.ENDC
        else:
            address = address
    return '{}{} | {} | {}'.format(letter, address, bytes_line, ascii.ljust(16))
def load_movie_data():
    """Load movies and their directors, actors and genres from ``./data``.

    Each data file starts with a metadata line that is skipped; the first
    two whitespace-separated fields of the other lines are used.  Rows that
    refer to an unknown movie id are ignored.

    :returns: ``(movies, directors, actors, genres)`` where ``movies`` maps
        ``"m" + <movie id>`` to a ``Movie`` and the other three are sets of
        the distinct values seen.
    """
    # Movie data files used for building the graph
    movies_directors_filename = "./data/movie_directors.dat"
    movies_actors_filename = "./data/movie_actors.dat"
    movies_genres_filename = "./data/movie_genres.dat"
    movies_filename = "./data/movies.dat"

    # Load the data about the movies into a dictionary
    # The dictionary maps a movie ID to a movie object
    # Also store the unique directors, actors, and genres
    movies = {}
    with open(movies_filename, "r") as fin:
        # next() works on every Python version (file.next() does not) and
        # the default keeps an empty file from raising StopIteration.
        next(fin, None)  # burn metadata line
        for line in fin:
            m_id, name = line.strip().split()[:2]
            movies["m"+m_id] = Movie(name)

    directors = set()
    with open(movies_directors_filename, "r") as fin:
        next(fin, None)  # burn metadata line
        for line in fin:
            m_id, director = line.strip().split()[:2]
            if "m"+m_id in movies:
                movies["m"+m_id].director = director
            directors.add(director)

    actors = set()
    with open(movies_actors_filename, "r") as fin:
        next(fin, None)  # burn metadata line
        for line in fin:
            m_id, actor = line.strip().split()[:2]
            if "m"+m_id in movies:
                movies["m"+m_id].actors.append(actor)
            actors.add(actor)

    genres = set()
    with open(movies_genres_filename, "r") as fin:
        next(fin, None)  # burn metadata line
        for line in fin:
            # Genre rows must have exactly two fields.
            m_id, genre = line.strip().split()
            if "m"+m_id in movies:
                movies["m"+m_id].genres.append(genre)
            genres.add(genre)

    return movies, directors, actors, genres
def _connect_async(self, callback):
    """Start the tunnel and call *callback* with the result of ``OP_START``.

    When already connected, *callback* is called with ``None`` right away.
    Otherwise the pipes are connected, the bootstrap payload is written and
    an ``OP_START`` message is sent; the reply reaches *callback* through
    ``self.callbacks[0]``.

    :raises DepthLimitExceeded: when the chain of hosts, including this
        one, is longer than ``chopsticks.DEPTH_LIMIT``.
    """
    if self.connected:
        callback(None)
        return
    # Chain of hosts leading here; the attribute is absent when this
    # process was not itself started through a tunnel.
    try:
        path = sys._chopsticks_path[:]
    except AttributeError:
        path = []
    path.append(self.host)
    if len(path) > chopsticks.DEPTH_LIMIT:
        raise DepthLimitExceeded(
            'Depth limit of %s exceeded at %s' % (
                chopsticks.DEPTH_LIMIT,
                ' -> '.join(path)
            )
        )
    self.connect_pipes()
    self.reader = loop.reader(self.rpipe, self)
    self.writer = loop.writer(self.wpipe)
    def wrapped_callback(res):
        # Any result other than an ErrorResult counts as connected.
        self.connected = not isinstance(res, ErrorResult)
        if self.connected:
            # Remote sends a pickle_version in response to OP_START
            self.pickle_version = min(self.HIGHEST_PICKLE_PROTOCOL, res)
        callback(res)
    # Request id 0 is the one used for OP_START below.
    self.callbacks[0] = wrapped_callback
    self.reader.start()
    self.writer.write_raw(bubble)
    self.write_msg(
        OP_START,
        req_id=0,
        host=self.host,
        path=path,
        depthlimit=chopsticks.DEPTH_LIMIT,
    )
    self.errreader = ioloop.StderrReader(errloop, self.epipe, self.host)
    start_errloop()