我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用tempfile._get_candidate_names()。
def __init__(self, module, rootPupyPath): ''' ''' self.module = module self.rootPupyPath = rootPupyPath #Constants self.x86PowershellPath = "syswow64\WindowsPowerShell\\v1.0\\powershell.exe" self.x64PowershellPath = "system32\\WindowsPowerShell\\v1.0\\powershell.exe" #Remote paths self.remoteTempFolder=self.module.client.conn.modules['os.path'].expandvars("%TEMP%") self.systemRoot = self.module.client.conn.modules['os.path'].expandvars("%SYSTEMROOT%") self.invokeReflectivePEInjectionRemotePath = "{0}.{1}".format(self.module.client.conn.modules['os.path'].join(self.remoteTempFolder, next(_get_candidate_names())), '.txt') self.mainPowershellScriptRemotePath = "{0}.{1}".format(self.module.client.conn.modules['os.path'].join(self.remoteTempFolder, next(_get_candidate_names())), '.ps1') self.pupyDLLRemotePath = "{0}.{1}".format(self.module.client.conn.modules['os.path'].join(self.remoteTempFolder, next(_get_candidate_names())), '.txt') self.invokeBypassUACRemotePath = "{0}.{1}".format(self.module.client.conn.modules['os.path'].join(self.remoteTempFolder, next(_get_candidate_names())), '.ps1') #Define Local paths self.pupyDLLLocalPath = os.path.join(gettempdir(),'dllFile.txt') self.mainPowerShellScriptPrivilegedLocalPath = os.path.join(gettempdir(),'mainPowerShellScriptPrivileged.txt') self.invokeReflectivePEInjectionLocalPath = os.path.join(self.rootPupyPath,"pupy", "external", "PowerSploit", "CodeExecution", "Invoke-ReflectivePEInjection.ps1") self.invokeBypassUACLocalPath = os.path.join(rootPupyPath, "pupy", "external", "Empire", "privesc", "Invoke-BypassUAC.ps1") #Others self.HKCU = self.module.client.conn.modules['_winreg'].HKEY_CURRENT_USER if "64" in self.module.client.desc['proc_arch']: self.powershellPath = self.module.client.conn.modules['os.path'].join(self.systemRoot, self.x64PowershellPath) else: powershellPath = self.module.client.conn.modules['os.path'].join(self.systemRoot, self.x86PowershellPath)
def upx_unpack(self, seed=None):
    """Try to UPX-decompress self.bytez in place; return the (possibly updated) bytes.

    *seed* is unused here; presumably kept for a uniform modifier signature —
    TODO confirm against the sibling modifiers.
    """
    # dump bytez to a temporary file
    tmpfilename = os.path.join(
        tempfile._get_default_tempdir(), next(tempfile._get_candidate_names()))
    with open(tmpfilename, 'wb') as outfile:
        outfile.write(self.bytez)
    # Silence upx's console output by redirecting both streams to devnull.
    with open(os.devnull, 'w') as DEVNULL:
        retcode = subprocess.call(
            ['upx', tmpfilename, '-d', '-o', tmpfilename + '_unpacked'],
            stdout=DEVNULL, stderr=DEVNULL)
    os.unlink(tmpfilename)
    if retcode == 0:  # successfully unpacked
        with open(tmpfilename + '_unpacked', 'rb') as result:
            self.bytez = result.read()
        os.unlink(tmpfilename + '_unpacked')
    # On failure self.bytez is returned unchanged.
    return self.bytez
def test_validate_file_or_dict(self):
    """validate_file_or_dict must expand '~' in file paths but leave JSON text intact."""
    # verify user folder is expanded before load the file
    temp_name = next(tempfile._get_candidate_names())
    file_path = '~/' + temp_name
    local_file_path = os.path.expanduser(file_path)
    with open(local_file_path, 'w') as f:
        f.write('{"prop":"val"}')
    # verify we load the json content correctly
    try:
        res = validate_file_or_dict(file_path)
        self.assertEqual(res['prop'], "val")
    finally:
        # Always clean up the file created in the user's home directory.
        os.remove(local_file_path)
    # verify expanduser call won't mess up the json data
    data = '{"~d": "~/haha"}'
    res = validate_file_or_dict(data)
    self.assertEqual(res['~d'], '~/haha')
    # verify expanduser call again, but use single quot
    data = "{'~d': '~/haha'}"
    res = validate_file_or_dict(data)
    self.assertEqual(res['~d'], '~/haha')
def test_file_loading1(self):
    """A chain dumped to a plain-text file should load and summarize correctly."""
    subset = self.data[:1000]
    tmp_dir = tempfile._get_default_tempdir()
    base_name = next(tempfile._get_candidate_names())
    path = tmp_dir + os.sep + base_name + ".txt"
    np.savetxt(path, subset)
    c = ChainConsumer()
    c.add_chain(path)
    stats = c.analysis.get_summary()
    actual = np.array(list(stats.values())[0])
    # The central value of the first parameter should sit near 5.
    assert np.abs(actual[1] - 5.0) < 0.5
def test_file_loading2(self):
    """A chain dumped to a .npy file should load and summarize correctly."""
    subset = self.data[:1000]
    tmp_dir = tempfile._get_default_tempdir()
    base_name = next(tempfile._get_candidate_names())
    path = tmp_dir + os.sep + base_name + ".npy"
    np.save(path, subset)
    c = ChainConsumer()
    c.add_chain(path)
    stats = c.analysis.get_summary()
    actual = np.array(list(stats.values())[0])
    # The central value of the first parameter should sit near 5.
    assert np.abs(actual[1] - 5.0) < 0.5
def get_temp_file_name(tmp_dir=None, extension=''):
    """Return an available path (not created) for a temporary file.

    Args:
        tmp_dir: directory for the file; falls back to the system temp dir.
        extension: file extension WITHOUT the leading dot; empty means none.

    Returns:
        str: full path to a randomly named, not-yet-existing file.
    """
    tmp_name = next(tempfile._get_candidate_names())
    if not tmp_dir:
        tmp_dir = tempfile._get_default_tempdir()
    # BUG FIX: the original tested `extension is not None`, so the default
    # '' still appended a dot, yielding names ending in a bare '.'.
    if extension:
        tmp_name = tmp_name + '.' + extension
    return os.path.join(tmp_dir, tmp_name)
def get_temp_file_name(tmp_dir=None, extension=''):
    """Return an available path (not created) for a temporary file.

    Args:
        tmp_dir: directory for the file; None selects iCount.TMP_ROOT,
            which itself falls back to the system temp dir when falsy.
        extension: file extension WITHOUT the leading dot; empty means none.

    Returns:
        str: full path to a randomly named, not-yet-existing file.
    """
    if tmp_dir is None:
        tmp_dir = iCount.TMP_ROOT
    # pylint: disable=protected-access
    tmp_name = next(tempfile._get_candidate_names())
    if not tmp_dir:
        # pylint: disable=protected-access
        tmp_dir = tempfile._get_default_tempdir()
    # BUG FIX: the original tested `extension is not None`, so the default
    # '' still appended a dot, yielding names ending in a bare '.'.
    if extension:
        tmp_name = tmp_name + '.' + extension
    return os.path.join(tmp_dir, tmp_name)
def process(self, fuzzresult):
    """Capture a screenshot of the fuzzed URL with cutycapt and record its path.

    Args:
        fuzzresult: fuzz result object; only its `.url` attribute is read.
    """
    temp_name = next(tempfile._get_candidate_names())
    default_tmp_dir = tempfile._get_default_tempdir()
    filename = os.path.join(default_tmp_dir, temp_name + ".png")
    # Argument-list form avoids shell interpolation; pipes.quote guards
    # the URL value itself.
    subprocess.call(['cutycapt', '--url=%s' % pipes.quote(fuzzresult.url),
                     '--out=%s' % filename])
    # BUG FIX: corrected the user-facing typo "Screnshot".
    self.add_result("Screenshot taken, output at %s" % filename)
def upx_pack(self, seed=None):
    """UPX-pack self.bytez with randomized options; return the (possibly updated) bytes.

    *seed* seeds the RNG so the chosen options are reproducible.
    """
    # tested with UPX 3.91
    random.seed(seed)
    tmpfilename = os.path.join(
        tempfile._get_default_tempdir(), next(tempfile._get_candidate_names()))
    # dump bytez to a temporary file
    with open(tmpfilename, 'wb') as outfile:
        outfile.write(self.bytez)
    options = ['--force', '--overlay=copy']
    compression_level = random.randint(1, 9)
    options += ['-{}'.format(compression_level)]
    # --exact
    # compression levels -1 to -9
    # --overlay=copy [default]
    # optional things:
    # --compress-exports=0/1
    # --compress-icons=0/1/2/3
    # --compress-resources=0/1
    # --strip-relocs=0/1
    options += ['--compress-exports={}'.format(random.randint(0, 1))]
    options += ['--compress-icons={}'.format(random.randint(0, 3))]
    options += ['--compress-resources={}'.format(random.randint(0, 1))]
    options += ['--strip-relocs={}'.format(random.randint(0, 1))]
    # Silence upx's console output by redirecting both streams to devnull.
    with open(os.devnull, 'w') as DEVNULL:
        retcode = subprocess.call(
            ['upx'] + options + [tmpfilename, '-o', tmpfilename + '_packed'],
            stdout=DEVNULL, stderr=DEVNULL)
    os.unlink(tmpfilename)
    if retcode == 0:  # successfully packed
        with open(tmpfilename + '_packed', 'rb') as infile:
            self.bytez = infile.read()
        os.unlink(tmpfilename + '_packed')
    # On failure self.bytez is returned unchanged.
    return self.bytez
def temporary_file_name():
    """Return a random, not-yet-created file path inside ``temp_dir``."""
    candidate = next(tempfile._get_candidate_names())
    return os.path.join(temp_dir, candidate)
def test_retval(self):
    """_get_candidate_names() must hand back a _RandomNameSequence instance."""
    names = tempfile._get_candidate_names()
    self.assertIsInstance(names, tempfile._RandomNameSequence)
def test_same_thing(self):
    """_get_candidate_names always returns the same (singleton) object."""
    a = tempfile._get_candidate_names()
    b = tempfile._get_candidate_names()
    # assertIs expresses the identity check directly and yields a clearer
    # failure message than assertTrue(a is b).
    self.assertIs(a, b)
def _mock_candidate_names(*names): return support.swap_attr(tempfile, '_get_candidate_names', lambda: iter(names))
def merge(self, verify=True):
    """Upload a config file to the device and merge it into the running config.

    Args:
        verify: passed through to the on-device merge step.

    Returns:
        True immediately when running in check mode; otherwise the
        response from merge_on_device.
    """
    temp_name = next(tempfile._get_candidate_names())
    remote_path = "/var/config/rest/downloads/{0}".format(temp_name)
    temp_path = '/tmp/' + temp_name
    # Check mode: report success without touching the device.
    if self.client.check_mode:
        return True
    self.upload_to_device(temp_name)
    # Move from the REST download area to /tmp before merging.
    self.move_on_device(remote_path)
    response = self.merge_on_device(
        remote_path=temp_path, verify=verify
    )
    self.remove_temporary_file(remote_path=temp_path)
    return response
def src(self):
    """Return the configured source name, or a fresh random one if unset."""
    configured = self._values['src']
    if configured is not None:
        return configured
    # No explicit source: fall back to a random temp-style name.
    return next(tempfile._get_candidate_names())
def py_func(func, inp, Tout, name=None, grad=None):
    """Redefine tf.py_func to include gradients.

    Registers *grad* under a unique gradient name and overrides the PyFunc
    op's gradient so autodiff flows through the custom python function.
    """
    # Random suffix keeps each registration name unique; TF forbids
    # registering the same gradient name twice.
    temp_name = next(tempfile._get_candidate_names())
    _name = 'PyFuncGrad%s' %temp_name;
    tf.RegisterGradient(_name)(grad)
    g = tf.get_default_graph()
    # Within this scope, PyFunc ops use the freshly registered gradient.
    with g.gradient_override_map({"PyFunc": _name}):
        return tf.py_func(func, inp, Tout, name=name)
def _find_attachment_ids_email(self):
    """Attach the rendered DANFE PDF to outgoing invoice e-mails.

    Returns:
        list: attachment ids from the parent implementation, with the
        generated DANFE appended when the document model is '009'.

    Raises:
        UserError: when wkhtmltopdf exits with an unexpected return code.
    """
    atts = super(InvoiceEletronic, self)._find_attachment_ids_email()
    attachment_obj = self.env['ir.attachment']
    # BUG FIX: ('009') is just the string '009', so `not in` performed a
    # substring test; a one-element tuple gives the intended membership check.
    if self.model not in ('009',):
        return atts
    tmp = tempfile._get_default_tempdir()
    temp_name = os.path.join(tmp, next(tempfile._get_candidate_names()))
    command_args = ["--dpi", "84", str(self.url_danfe), temp_name]
    wkhtmltopdf = [_get_wkhtmltopdf_bin()] + command_args
    process = subprocess.Popen(wkhtmltopdf, stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
    out, err = process.communicate()
    # wkhtmltopdf uses 1 for "finished with warnings"; treat it as success.
    if process.returncode not in [0, 1]:
        raise UserError(_('Wkhtmltopdf failed (error code: %s). '
                          'Message: %s') % (str(process.returncode), err))
    tmpDanfe = None
    # BUG FIX: read the PDF in binary mode; text mode can corrupt it.
    with open(temp_name, 'rb') as f:
        tmpDanfe = f.read()
    try:
        os.unlink(temp_name)
    except (OSError, IOError):
        _logger.error('Error when trying to remove file %s' % temp_name)
    if tmpDanfe:
        danfe_id = attachment_obj.create(dict(
            name="Danfe-%08d.pdf" % self.numero,
            datas_fname="Danfe-%08d.pdf" % self.numero,
            datas=base64.b64encode(tmpDanfe),
            mimetype='application/pdf',
            res_model='account.invoice',
            res_id=self.invoice_id.id,
        ))
        atts.append(danfe_id.id)
    return atts
def write_s3(bucket, key, rekognition_faces_response_json, rekognition_faces_response_csv):
    """Upload the Rekognition results to S3 as both a CSV and a JSON object.

    Args:
        bucket: target S3 bucket name.
        key: base object key; results land under 'csv/<key>.csv' and
            'json/<key>.json'.
        rekognition_faces_response_json: serialized JSON payload.
        rekognition_faces_response_csv: CSV payload.
    """
    print('inside write s3 function (bucket: {}, key: {}, json: {}, csv: {})'
          .format(bucket, key, rekognition_faces_response_json,
                  rekognition_faces_response_csv))
    tmp_filename = 'tmp{}'.format(next(tempfile._get_candidate_names()))
    print('tmp filename: {}'.format(tmp_filename))
    # write csv file — context managers replace the original unclosed
    # open/close pairs and guarantee the handles are released on error.
    csv_path = '/tmp/{}.csv'.format(tmp_filename)
    with open(csv_path, 'w') as f:
        f.write(rekognition_faces_response_csv)
    with open(csv_path, 'r') as f:
        s3client.upload_fileobj(f, Bucket=bucket, Key='csv/' + key + '.csv')
    # write json file
    json_path = '/tmp/{}.json'.format(tmp_filename)
    with open(json_path, 'w') as f:
        f.write(rekognition_faces_response_json)
    with open(json_path, 'r') as f:
        s3client.upload_fileobj(f, Bucket=bucket, Key='json/' + key + '.json')


# --------------- Main handler ------------------
def generate_tmp_filename(extension):
    """Return a random path in the system temp dir ending in *extension*.

    Args:
        extension: file extension WITHOUT the leading dot.

    Returns:
        str: e.g. '/tmp/abc12345.log'. The file is not created.
    """
    # os.path.join is portable; the original hard-coded '/' as separator.
    return os.path.join(tempfile._get_default_tempdir(),
                        next(tempfile._get_candidate_names()) + "." + extension)
def temp_filename(directory, suffix):
    """Build a random file path under *directory* ending in *suffix*."""
    random_part = next(tempfile._get_candidate_names())
    return os.path.join(directory, random_part + suffix)


# run a 2-ary function on two things -- loop over elements pairwise if the
# things are lists
def setUp(self):
    """Point the app's healthcheck at a fresh random status-file path."""
    # Get a random temporary file to use for testing; the file itself is
    # never created here, only its name is referenced.
    self.tmp_status_file = next(tempfile._get_candidate_names())
    self.app.config['HEALTHCHECK_STATUS_FILE'] = self.tmp_status_file
    super(HealthViewTestCaseMixin, self).setUp()
def get_new_file_name():
    """Yield an endless sequence of numeric file names, wrapping at max_file_i."""
    global ci
    while True:
        # Advance the shared counter and wrap it around the configured limit.
        ci = (ci + 1) % max_file_i
        yield str(ci)
def get_images_urls(soup):
    """Extract the direct image URLs from the page's gallery carousel anchors."""
    anchors = soup.find_all("a", {"class": "gallery__item carousel__item"})
    if len(anchors) == 0:
        return []
    urls = []
    for anchor in anchors:
        # Each anchor's href carries the real image in its img_url query param.
        parsed = urlparse.urlparse(anchor.get("href"))
        image_url = urlparse.parse_qs(parsed.query)['img_url'][0]
        urls.append(image_url)
    return urls
def getCaptchaImage(self, soup):
    """Download the captcha image from the parsed page and return its local path.

    Also stashes the form's 'key' and 'retpath' hidden fields on self for
    the subsequent captcha submission.
    """
    i = soup.find("img", { "class" : "form__captcha" })
    url= i.get("src")
    # NOTE(review): debug print statement (Python 2) left in place.
    print soup.find_all("input")
    self.last_key = soup.find("input",{"name":"key"})["value"]
    self.last_retpath = soup.find("input",{"name":"retpath"})["value"]
    # Random name avoids clobbering previously downloaded captcha images.
    fname = "./files/"+next(tempfile._get_candidate_names())+".jpg"
    download_image(url, fname)
    return fname
def _copy_outside_keys(self):
    """Copy SSH private keys referenced outside the workspace into it,
    rewriting the inventory's key paths to the workspace copies."""
    # original key path -> rewritten path; seeded below with identity
    # mappings so the first sighting of a path triggers the copy.
    paths_map = {}
    # Resolve the inventory symlink so the rewrite hits the real file.
    real_inv = os.path.join(self.path, os.readlink(self.inventory))
    # inplace=True: everything print()ed below replaces the file content.
    for line in fileinput.input(real_inv, inplace=True):
        key_defs = re.findall(r"ansible_ssh_private_key_file=\/\S+", line)
        for key_def in key_defs:
            path = key_def.split("=")[-1]
            paths_map.setdefault(path, path)
        new_line = line.strip()
        for mapped_orig, mapped_new in paths_map.iteritems():
            if mapped_orig == mapped_new:
                # First encounter: copy the key into the workspace under a
                # randomized name to avoid collisions between hosts.
                keyfilename = os.path.basename(mapped_orig)
                rand_part = next(tempfile._get_candidate_names())
                new_fname = "{}-{}".format(keyfilename, rand_part)
                shutil.copy2(mapped_orig, os.path.join(
                    self.path, new_fname))
                paths_map[mapped_orig] = os.path.join(
                    self.path_placeholder, new_fname)
                new_fname = paths_map[mapped_orig]
            else:
                new_fname = mapped_new
            new_line = re.sub(mapped_orig, new_fname, new_line)
        print(new_line)
def video(self, tensor=None, videofile=None, win=None, env=None, opts=None):
    """
    This function plays a video. It takes as input the filename of the video
    or a `LxHxWxC` tensor containing all the frames of the video. The function
    does not support any plot-specific `options`.
    """
    opts = {} if opts is None else opts
    opts['fps'] = opts.get('fps', 25)
    _assert_opts(opts)
    assert tensor is not None or videofile is not None, \
        'should specify video tensor or file'

    if tensor is not None:
        import cv2
        import tempfile
        assert tensor.ndim == 4, 'video should be in 4D tensor'
        # Encode the frames to a Theora-in-Ogg temp file first.
        videofile = '/tmp/%s.ogv' % next(tempfile._get_candidate_names())
        if cv2.__version__.startswith('2'):  # OpenCV 2
            fourcc = cv2.cv.CV_FOURCC(
                chr(ord('T')), chr(ord('H')),
                chr(ord('E')), chr(ord('O'))
            )
        elif cv2.__version__.startswith('3'):  # OpenCV 3
            fourcc = cv2.VideoWriter_fourcc(
                chr(ord('T')), chr(ord('H')),
                chr(ord('E')), chr(ord('O'))
            )
        writer = cv2.VideoWriter(
            videofile, fourcc, opts.get('fps'),
            (tensor.shape[1], tensor.shape[2])
        )
        assert writer.isOpened(), 'video writer could not be opened'
        for i in range(tensor.shape[0]):
            writer.write(tensor[i, :, :, :])
        writer.release()
        writer = None

    # Map the file extension to a browser MIME subtype.
    extension = videofile.split(".")[-1].lower()
    mimetypes = dict(mp4='mp4', ogv='ogg', avi='avi', webm='webm')
    mimetype = mimetypes.get(extension)
    assert mimetype is not None, 'unknown video type: %s' % extension

    # Inline the whole video as a base64 data URI inside an HTML pane.
    bytestr = loadfile(videofile)
    videodata = """
        <video controls>
            <source type="video/%s" src="data:video/%s;base64,%s">
            Your browser does not support the video tag.
        </video>
    """ % (mimetype, mimetype, base64.b64encode(bytestr).decode('utf-8'))
    return self.text(text=videodata, win=win, env=env, opts=opts)