The following 49 code examples, extracted from open-source Python projects, illustrate how to use pathlib.Path().
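Before the project-specific examples, here is a minimal sketch of the core pathlib.Path operations these snippets rely on most often: construction, the `/` join operator, `exists()`, `mkdir()`, `touch()`, and name parts such as `stem` and `suffix`. The directory and file names below are hypothetical, chosen only for illustration.

from pathlib import Path

base = Path("/tmp") / "pathlib_demo"       # '/' joins path segments
base.mkdir(parents=True, exist_ok=True)    # create directory tree; no error if it exists

f = base / "example_en.txt"
f.touch()                                  # create an empty file
print(f.exists())                          # True
print(f.stem, f.suffix)                    # 'example_en' '.txt'
print(str(f.resolve()))                    # absolute path as a string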
def find_lib_dir(filename=None):
    import sys
    if filename is None:
        filename = Cython.python_library_file()
    candidates = [Path(sys.exec_prefix, 'libs/'), Path('/lib'), Path('/usr/lib')]
    for path in candidates:
        if Path(path, filename).exists():
            return str(path)
    return None

# TODO: Cython project
# TODO: Embed support for Cython project
# TODO: Somehow package a whole set of modules with a runner inside?
def download(self, local_dir_=None, url_=None):
    '''
    Args:
        local_dir_: where to save downloaded file
        url_: where to download dataset, if None, use default
            'http://yann.lecun.com/exdb/mnist/'
    '''
    # TODO check whether file exists
    if url_ is None:
        url_ = 'http://yann.lecun.com/exdb/mnist/'
    if local_dir_ is None:
        local_dir = self.DEFAULT_DIR
    else:
        local_dir = Path(local_dir_)
    local_dir.mkdir(parents=True, exist_ok=True)
    in_filename = '%(subset)s-%(type_s)s-idx%(ndim)s-ubyte.gz'
    for subset, (type_s, ndim) in product(
            ('train', 't10k'), zip(('images', 'labels'), (3, 1))):
        filename = in_filename % locals()
        urllib.request.urlretrieve(url_ + filename, str(local_dir / filename))
def get_template_language(self, file_):
    """
    Return the template language

    Every template file must end with the language code, and the code must
    match an ISO 639-1 lang code
    https://en.wikipedia.org/wiki/List_of_ISO_639-1_codes

    valid examples:
        account_created_pt.html
        payment_created_en.txt
    """
    stem = Path(file_).stem
    language_code = stem.split('_')[-1]
    if len(language_code) != 2:
        # TODO naive and temp implementation
        # check if the two chars correspond to one of the
        # available languages
        raise Exception(
            'Template file `%s` must end in ISO 639-1 language code.' % file_)
    return language_code.lower()
async def add_local_charm_dir(self, charm_dir, series):
    """Upload a local charm to the model.

    This will automatically generate an archive from the charm dir.

    :param charm_dir: Path to the charm directory
    :param series: Charm series
    """
    fh = tempfile.NamedTemporaryFile()
    CharmArchiveGenerator(charm_dir).make_archive(fh.name)
    with fh:
        func = partial(
            self.add_local_charm, fh, series, os.stat(fh.name).st_size)
        charm_url = await self._connector.loop.run_in_executor(None, func)

    log.debug('Uploaded local charm: %s -> %s', charm_dir, charm_url)
    return charm_url
def sync_tools(
        self, all_=False, destination=None, dry_run=False, public=False,
        source=None, stream=None, version=None):
    """Copy Juju tools into this model.

    :param bool all_: Copy all versions, not just the latest
    :param str destination: Path to local destination directory
    :param bool dry_run: Don't do the actual copy
    :param bool public: Tools are for a public cloud, so generate mirrors
        information
    :param str source: Path to local source directory
    :param str stream: Simplestreams stream for which to sync metadata
    :param str version: Copy a specific major.minor version
    """
    raise NotImplementedError()
def build_package(builder_image, package_type, version, out_dir, dependencies):
    """
    Build a deb or RPM package using a fpm-within-docker Docker image.

    :param str package_type: "rpm" or "deb".
    :param str version: The package version.
    :param Path out_dir: Directory where package will be output.
    :param list dependencies: package names the resulting package should
        depend on.
    """
    run([
        "docker", "run", "--rm",
        "-e", "PACKAGE_VERSION=" + version,
        "-e", "PACKAGE_TYPE=" + package_type,
        "-v", "{}:/build-inside:rw".format(THIS_DIRECTORY),
        "-v", "{}:/source:rw".format(THIS_DIRECTORY.parent),
        "-v", str(out_dir) + ":/out",
        "-w", "/build-inside", builder_image,
        "/build-inside/build-package.sh", *dependencies
    ], check=True)
def prompt_extractor(self, item):
    extractor = extractors[item.data(Qt.UserRole)]
    inputs = []
    if not assert_installed(self.view, **extractor.get('depends', {})):
        return

    if not extractor.get('pick_url', False):
        files, mime = QFileDialog.getOpenFileNames()
        for path in files:
            inputs.append((path, Path(path).stem))
    else:
        text, good = QInputDialog.getText(self.view, ' ', 'Input an URL:')
        if text:
            url = urlparse(text)
            inputs.append((url.geturl(), url.netloc))

    if inputs:
        wait = QProgressDialog('Extracting .proto structures...', None, 0, 0)
        wait.setWindowTitle(' ')
        self.set_view(wait)

        self.worker = Worker(inputs, extractor)
        self.worker.progress.connect(self.extraction_progress)
        self.worker.finished.connect(self.extraction_done)
        self.worker.start()
def CreateFile(
        self, directory_path: str, file_name: str, filename_suffix: str):
    """Creates an empty file.

    Args:
        directory_path (str): the path to the directory where the file should
            be created.
        file_name (str): the name of the new file.
        filename_suffix (str): the suffix of the new file.

    Returns:
        str: the path of the created file
    """
    file_path = self.CreateFilePath(
        directory_path, file_name, filename_suffix)
    if not os.path.exists(directory_path):
        self._CreateFolder(directory_path)
    pathlib.Path(file_path).touch()
    return file_path
def testPluginNameIfExisting(self):
    """test method after getting the plugin Name from the user if the
    plugin Name already exists"""
    with tempfile.TemporaryDirectory() as tmpdir:
        path = os.path.join(tmpdir, 'testfile')
        pathlib.Path(path).touch()
        output_handler = output_handler_file.OutputHandlerFile(
            path, file_handler.FileHandler(), prompt_info='the_plugin',
            prompt_error='the_plugin')
        plugin_helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
            plugin_exists=True,
            change_bool_after_every_call_plugin_exists=True,
            valid_name=True)
        controller = sqlite_controller.SQLiteController(
            output_handler, plugin_helper)
        actualName = 'the_plugin'
        controller._path = 'somepath'
        actual = controller.PluginName(None, None, actualName)
        expected = 'Plugin exists. Choose new Name'
        actual_prompt = self._ReadFromFile(path)
        self.assertEqual(expected, actual_prompt)
        self.assertEqual(actualName, actual)
def testCreateSQLQueryModelWithUserInputWithError(self):
    """test method CreateSQLQueryModelWithUserInput with an error"""
    error_message = "Some Error..."
    fake_execution = fake_sqlite_query_execution.SQLQueryExecution(
        sql_query_data.SQLQueryData(
            has_error=True, error_message=error_message))
    sql_query = 'SELECT createdDate FROM Users ORDER BY createdDate'
    name = 'Contact'
    with tempfile.TemporaryDirectory() as tmpdir:
        path = os.path.join(tmpdir, 'testfile')
        pathlib.Path(path).touch()
        output_handler = output_handler_file.OutputHandlerFile(
            path, file_handler.FileHandler(), prompt_info=name)
        plugin_helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
            folder_exists=True)
        controller = sqlite_controller.SQLiteController(
            output_handler, plugin_helper)
        actual = controller._CreateSQLQueryModelWithUserInput(
            sql_query, False, fake_execution)
        self.assertIsNone(actual)
def testSourcePathIfNotExisting(self):
    """test method after getting the source path from the user"""
    with tempfile.TemporaryDirectory() as tmpdir:
        path = os.path.join(tmpdir, 'testfile')
        pathlib.Path(path).touch()
        output_handler = output_handler_file.OutputHandlerFile(
            path, file_handler.FileHandler(), prompt_error='the source path')
        plugin_helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
            folder_exists=False,
            change_bool_after_every_call_folder_exists=True)
        controller = sqlite_controller.SQLiteController(
            output_handler, plugin_helper)
        actualPath = 'testpath'
        source_path = controller.SourcePath(None, None, actualPath)
        expected = 'Folder does not exists. Enter correct one'
        actual = self._ReadFromFile(path)
        self.assertEqual(expected, actual)
        self.assertEqual(source_path, 'the source path')
def testTestPathIfExisting(self):
    """test method after getting the test file path from the user"""
    with tempfile.TemporaryDirectory() as tmpdir:
        path = os.path.join(tmpdir, 'testfile')
        pathlib.Path(path).touch()
        output_handler = output_handler_file.OutputHandlerFile(
            path, file_handler.FileHandler())
        plugin_helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
            file_exists=True)
        controller = sqlite_controller.SQLiteController(
            output_handler, plugin_helper)
        actualPath = os.path.join(
            path_helper.TestDatabasePath(), 'twitter_ios.db')
        valid_path = controller.TestPath(None, None, actualPath)
        actual_output = self._ReadFromFile(path)
        self.assertEqual(actualPath, controller._testfile)
        self.assertEqual('', actual_output)
        self.assertEqual(valid_path, actualPath)
def testTestPathIfNotExisting(self):
    """test method after getting the test file path from the user"""
    with tempfile.TemporaryDirectory() as tmpdir:
        path = os.path.join(tmpdir, 'testfile')
        pathlib.Path(path).touch()
        wrongPath = os.path.join(tmpdir, 'testpath')
        validPath = os.path.join(tmpdir, 'testpathvalid')
        output_handler = output_handler_file.OutputHandlerFile(
            path, file_handler.FileHandler(), prompt_error=validPath)
        plugin_helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
            file_exists=False, change_bool_after_every_call_file_exists=True)
        controller = sqlite_controller.SQLiteController(
            output_handler, plugin_helper)
        actual_path = controller.TestPath(None, None, wrongPath)
        expected = 'File does not exists. Choose another.'
        actual = self._ReadFromFile(path)
        self.assertEqual(expected, actual)
        self.assertEqual(validPath, actual_path)

        # close connection so the temp file can be deleted before the program
        # circle is finished
        controller._query_execution._connection.close()
def testValidateRowNameIfNotOk(self):
    """test the validate row name method if not ok"""
    with tempfile.TemporaryDirectory() as tmpdir:
        path = os.path.join(tmpdir, 'testfile')
        pathlib.Path(path).touch()
        output_handler = output_handler_file.OutputHandlerFile(
            path, file_handler.FileHandler(), prompt_error='TheValidRowName')
        plugin_helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
            valid_row_name=False,
            change_bool_after_every_call_valid_row_name=True)
        controller = sqlite_controller.SQLiteController(
            output_handler, plugin_helper)
        valid = controller._ValidateRowName("theWrongName")
        expected = ('Row name is not in a valid format. Choose new Name ['
                    'RowName...]')
        actual = self._ReadFromFile(path)
        self.assertEqual(expected, actual)
        self.assertEqual(valid, 'TheValidRowName')
def testValidateTimestampStringIfNotOk(self):
    """test the validate timestamp string method if not ok"""
    with tempfile.TemporaryDirectory() as tmpdir:
        path = os.path.join(tmpdir, 'testfile')
        pathlib.Path(path).touch()
        output_handler = output_handler_file.OutputHandlerFile(
            path, file_handler.FileHandler(), prompt_error='this,that,bla')
        plugin_helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
            valid_comma_separated_string=False,
            change_bool_after_every_call_valid_comma_separated_string=True)
        controller = sqlite_controller.SQLiteController(
            output_handler, plugin_helper)
        valid = controller._ValidateTimestampString("this, that,bla")
        expected = (
            'Timestamps are not in valid format. Reenter them correctly [name,'
            'name...]')
        actual = self._ReadFromFile(path)
        self.assertEqual(expected, actual)
        self.assertEqual(valid, 'this,that,bla')
def testValidateColumnStringIfNotOk(self):
    """test the validate column string method if not ok"""
    with tempfile.TemporaryDirectory() as tmpdir:
        path = os.path.join(tmpdir, 'testfile')
        pathlib.Path(path).touch()
        output_handler = output_handler_file.OutputHandlerFile(
            path, file_handler.FileHandler(), prompt_error='this,that,bla')
        plugin_helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
            valid_comma_separated_string=False,
            change_bool_after_every_call_valid_comma_separated_string=True)
        controller = sqlite_controller.SQLiteController(
            output_handler, plugin_helper)
        valid = controller._ValidateColumnString("this, that,bla")
        expected = (
            'Column names are not in valid format. Reenter them correctly '
            '[name,name...]')
        actual = self._ReadFromFile(path)
        self.assertEqual(expected, actual)
        self.assertEqual(valid, 'this,that,bla')
def testGenerateIfNotConfirmed(self):
    """test the generate method if not confirmed"""
    template_path = path_helper.TemplatePath()

    with self.assertRaises(SystemExit):
        with tempfile.TemporaryDirectory() as tmpdir:
            file = os.path.join(tmpdir, 'testfile')
            pathlib.Path(file).touch()
            output_handler = output_handler_file.OutputHandlerFile(
                file, file_handler.FileHandler(), confirm=False)
            plugin_helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
                valid_name=False, change_bool_after_every_call_valid_name=True)
            controller = sqlite_controller.SQLiteController(
                output_handler, plugin_helper)
            controller.Generate('not used', 'not used')
            self.assertFalse(template_path)
def writerow(self, row):
    """
    :param row:
    :return:
    """
    self._bytes_written += self._out_writer.writerow(row)
    row_txt = self._buffer.getvalue()
    self._out_csv.write(row_txt)
    self._reset_buffer()
    self._out_csv.flush()

    if self._bytes_written > self.max_bytes:
        self._out_csv.close()
        self._make_csv_writer()
        out_name = str(Path(self._out_csv.name).absolute())
        subprocess.Popen(['7z', 'a', '-t7z', '-m0=lzma', '-mx=9', '-mfb=64',
                          '-md=16m', out_name + '.7z', out_name])
    return row_txt
def _make_writer(self):
    """
    :return:
    """
    self._buffer = StringIO()
    self._bytes_written = 0
    now = datetime.now()
    self.fname = self.log_folder + '/' + now.strftime(
        '%Y%m%d_%H%M%S_{}.json'.format(self.make_random(6)))
    self.fname = str(pathlib.Path(self.fname))
    self._out_fh = open(self.fname, 'w')
    self.write_pid()
    logging.warning("Writing to {} ({} bytes)".format(
        self._out_fh.name, self.max_bytes))

    # compress any old files still lying around
    for fname in glob(self.log_folder + "/*.json"):
        if fname != self.fname:
            self._compress(fname)
def test_RTagsDaemonStartClean(self):
    try:
        os.chdir("clean")
    except OSError:
        print("Test Error: Couldn't cd into 'clean' test directory.")
        raise
    self.assertFalse(self.cmake_build_info["build_dir"].is_dir())
    self.plugin.setup_rtags_daemon()
    try:
        rtags_daemon_status = subprocess.check_output(
            self.cmake_cmd_info["rtags_status"])
    except subprocess.CalledProcessError as e:
        print(e.output)
    self.assertTrue(
        len("*********************************\nfileids\n*********************************\n*********************************\nheadererrors\n*********************************\n*********************************\ninfo\n*********************************\nRunning a release build\nsocketFile: /Users/phillipbonhomme/.rdm\ndataDir: /Users/phillipbonhomme/.cache/rtags/\noptions: 0x14jobCount: 4\nrpVisitFileTimeout: 60000\nrpIndexDataMessageTimeout: 60000\nrpConnectTimeout: 0\nrpConnectTimeout: 0\ndefaultArguments: List<String>(-ferror-limit=50, -Wall, -fspell-checking, -Wno-unknown-warning-option\")\nincludePaths: List<Source::Include>(\")\ndefines: List<Source::Define>(-DRTAGS=\")\nignoredCompilers: Set<Path>(\")\n*********************************\njobs\n*********************************\n"
            ) <= len(str(rtags_daemon_status)))
def test_RTagsDaemonStartDirty(self):
    try:
        os.chdir("dirty")
    except OSError:
        print("Test Error: Couldn't cd into 'dirty' test directory.")
        raise
    self.assertTrue(self.cmake_build_info["build_dir"].is_dir())
    self.plugin.setup_rtags_daemon()
    try:
        rtags_daemon_status = subprocess.check_output(
            self.cmake_cmd_info["rtags_status"])
    except subprocess.CalledProcessError as e:
        print(e.output)
    self.assertTrue(
        len("*********************************\nfileids\n*********************************\n*********************************\nheadererrors\n*********************************\n*********************************\ninfo\n*********************************\nRunning a release build\nsocketFile: /Users/phillipbonhomme/.rdm\ndataDir: /Users/phillipbonhomme/.cache/rtags/\noptions: 0x14jobCount: 4\nrpVisitFileTimeout: 60000\nrpIndexDataMessageTimeout: 60000\nrpConnectTimeout: 0\nrpConnectTimeout: 0\ndefaultArguments: List<String>(-ferror-limit=50, -Wall, -fspell-checking, -Wno-unknown-warning-option\")\nincludePaths: List<Source::Include>(\")\ndefines: List<Source::Define>(-DRTAGS=\")\nignoredCompilers: Set<Path>(\")\n*********************************\njobs\n*********************************\n"
            ) <= len(str(rtags_daemon_status)))
def __init__(self, cfg_yaml=None, secret_cfg_yaml=None, create_vcs_client=True,
             load_cfg=True, load_secret_cfg=True, default_data_directory=None,
             create_default_data_directory=True):
    self.data = OrderedDict()
    self.cfg_yaml = cfg_yaml
    self.secret_cfg_yaml = secret_cfg_yaml

    if load_cfg and cfg_yaml and Path(cfg_yaml).exists():
        self.cfg = self.load_cfg(cfg_yaml)
    else:
        self.cfg = {}

    if (load_secret_cfg and secret_cfg_yaml and
            Path(secret_cfg_yaml).exists()):
        self.secret_cfg = self.load_cfg(secret_cfg_yaml)
    else:
        self.secret_cfg = {}

    self._ensure_cfg_structure()

    if create_vcs_client:
        self._create_vcs_client()

    if default_data_directory:
        self.set_default_data_directory(
            default_data_directory, create=create_default_data_directory)
def get_node_id(self, ent=None, ent_attrs: dict = None):
    if ent and ent.kindname() == 'file':
        node_id = str(Path(ent.longname()).relative_to(self.root_path))
    elif ent:
        node_id = ent.uniquename()
    elif ent_attrs and ent_attrs['kindname'] == 'file':
        try:
            node_id = str(Path(ent_attrs['longname']).relative_to(
                self.root_path))
        except ValueError:
            node_id = ent_attrs['longname']
    elif ent_attrs:
        node_id = ent_attrs['uniquename']
    else:
        node_id = None
    return node_id
def load_vulnerability_database():
    # Currently manually downloaded from
    # https://security-tracker.debian.org/tracker/data/json
    # Should instead download if not found in an optional location,
    # or redownload if found but out of date
    # progress bar for download
    url = "https://security-tracker.debian.org/tracker/data/json"
    db = Path('debian.json')
    r = requests.get(url, stream=True)
    if not db.exists():
        with open(db.name, 'wb') as data_file:
            total_length = 1024 * 20722
            for chunk in progress.bar(r.iter_content(chunk_size=1024),
                                      label="Downloading Debian data",
                                      expected_size=(total_length / 1024) + 1):
                if chunk:
                    data_file.write(chunk)
                    data_file.flush()
    with open(db.name, 'r') as data_file:
        return json.load(data_file)
def timeseriesdata_constructor_new_file(temp_dir):
    """Tests the TimeSeriesData class constructor when the file does not exist.

    Tests that a new file is created, that all expected data sets are present,
    and that the flag that indicates that the file is empty is set to False.
    """
    tsd = TimeSeriesData(temp_dir + "/new_ananke.h5")
    tsd_file = Path(temp_dir + "/new_ananke.h5")
    # Check that the file was really created
    assert tsd_file.is_file()
    # Check that the data sets have been created
    assert set(tsd.h5_table.keys()) == {"genes", "timeseries", "samples"}
    assert set(tsd.h5_table["timeseries"].keys()) == {"data", "indices",
                                                      "indptr"}
    assert set(tsd.h5_table["genes"].keys()) == {"sequences", "sequenceids",
                                                 "clusters", "taxonomy",
                                                 "sequenceclusters"}
    assert set(tsd.h5_table["samples"].keys()) == {"names", "time",
                                                   "metadata", "mask"}
    # Check that the empty file flag is set
    assert tsd.filled_data == False
def get_config():
    """
    Load configuration from the ``doctr`` key of ``.travis.yml``,
    if both the file and the key are present.
    """
    p = Path('.travis.yml')
    if not p.exists():
        return {}
    with p.open() as f:
        travis_config = yaml.safe_load(f.read())

    config = travis_config.get('doctr', {})

    if not isinstance(config, dict):
        raise ValueError('config is not a dict: {}'.format(config))
    return config
def clarin_corpora_sorted_by_size(base_directory: Path) -> List[GermanClarinCorpus]:
    return [
        sc1(base_directory),
        pd2(base_directory),
        ziptel(base_directory),
        sc10(base_directory),
        GermanClarinCorpus("all.HEMPEL.4.cmdi.11610.1490680796", base_directory),
        GermanClarinCorpus("all.PD1.3.cmdi.16312.1490681066", base_directory),
        GermanClarinCorpus("all.VM1.3.cmdi.1508.1490625070", base_directory,
                           id_filter_regex=vm1_id_german_filter_regex,
                           training_test_split=TrainingTestSplit.training_only),
        GermanClarinCorpus("all.RVG-J.1.cmdi.18181.1490681704", base_directory),
        GermanClarinCorpus("all.ALC.4.cmdi.16602.1490632862", base_directory,
                           training_test_split=TrainingTestSplit.randomly_grouped_by(
                               lambda e: e.id[:3])),
        GermanClarinCorpus("all.VM2.3.cmdi.4260.1490625316", base_directory,
                           id_filter_regex=vm2_id_german_filter_regex,
                           training_test_split=TrainingTestSplit.training_only)
    ]
def __init__(self, base_directory: Path):
    super().__init__(
        corpus_name="german-speechdata-package-v2",
        base_directory=base_directory,
        base_source_url_or_directory="http://www.repository.voxforge1.org/downloads/de/",
        tar_gz_extension=".tar.gz",
        subdirectory_depth=1,
        umlaut_decoder=UmlautDecoder.none,
        training_test_split=TrainingTestSplit.by_directory(),
        tags_to_ignore=[],
        # exclude those 7 audio files because the first 2 are corrupt,
        # the last 5 are empty:
        id_filter_regex=re.compile("(?!^2014-03-24-13-39-24_Kinect-RAW)"
                                   "(?!^2014-03-27-11-50-33_Kinect-RAW)"
                                   "(?!^2014-03-18-15-34-19_Realtek)"
                                   "(?!^2014-06-17-13-46-27_Kinect-RAW)"
                                   "(?!^2014-06-17-13-46-27_Realtek)"
                                   "(?!^2014-06-17-13-46-27_Samson)"
                                   "(?!^2014-06-17-13-46-27_Yamaha)"
                                   "(^.*$)"))
def _extract_positional_label_by_id(
        self, files: Iterable[Path]) -> Dict[str, Union[PositionalLabel, str]]:
    xml_ending = ".xml"

    microphone_endings = [
        "_Yamaha",
        "_Kinect-Beam",
        "_Kinect-RAW",
        "_Realtek",
        "_Samson",
        "_Microsoft-Kinect-Raw"
    ]

    xml_files = [file for file in files if file.name.endswith(xml_ending)
                 if self.id_filter_regex.match(name_without_extension(file))]

    return OrderedDict(
        (name_without_extension(file) + microphone_ending,
         self._extract_label_from_xml(file))
        for file in xml_files
        for microphone_ending in microphone_endings
        if (Path(file.parent) / (name_without_extension(file) +
                                 microphone_ending + ".wav")).exists())
def train(self,
          labeled_spectrogram_batches: Iterable[List[LabeledSpectrogram]],
          preview_labeled_spectrogram_batch: List[LabeledSpectrogram],
          tensor_board_log_directory: Path,
          net_directory: Path,
          batches_per_epoch: int):
    print_preview_batch = lambda: log(
        self.test_and_predict_batch(preview_labeled_spectrogram_batch))

    print_preview_batch()
    self.loss_net.fit_generator(
        self._loss_inputs_generator(labeled_spectrogram_batches),
        epochs=100000000,
        steps_per_epoch=batches_per_epoch,
        callbacks=self.create_callbacks(
            callback=print_preview_batch,
            tensor_board_log_directory=tensor_board_log_directory,
            net_directory=net_directory),
        initial_epoch=self.load_epoch if (self.load_epoch is not None) else 0)
def __init__(self, audio_file: Path, id: Optional[str] = None,
             sample_rate_to_convert_to: int = 16000,
             label: Optional[str] = "nolabel",
             fourier_window_length: int = 512, hop_length: int = 128,
             mel_frequency_count: int = 128,
             label_with_tags: str = None,
             positional_label: Optional[PositionalLabel] = None):
    # The default values for hop_length and fourier_window_length are
    # powers of 2 near the values specified in the wav2letter paper.
    if id is None:
        id = name_without_extension(audio_file)

    self.audio_file = audio_file

    super().__init__(
        id=id,
        get_raw_audio=lambda: librosa.load(
            str(self.audio_file), sr=self.sample_rate)[0],
        label=label,
        sample_rate=sample_rate_to_convert_to,
        fourier_window_length=fourier_window_length,
        hop_length=hop_length,
        mel_frequency_count=mel_frequency_count,
        label_with_tags=label_with_tags,
        positional_label=positional_label)
def load(corpus_csv_file: Path,
         sampled_training_example_count: Optional[int] = None) -> 'Corpus':
    import csv
    with corpus_csv_file.open(encoding='utf8') as opened_csv:
        reader = csv.reader(opened_csv, delimiter=',', quotechar='"',
                            quoting=csv.QUOTE_MINIMAL)

        def to_absolute(audio_file_path: Path) -> Path:
            return audio_file_path if audio_file_path.is_absolute() else \
                Path(corpus_csv_file.parent) / audio_file_path

        examples = [
            (LabeledExampleFromFile(
                audio_file=to_absolute(Path(audio_file_path)), id=id,
                label=label,
                positional_label=None if positional_label == "" else
                PositionalLabel.deserialize(positional_label)), Phase[phase])
            for id, audio_file_path, label, phase, positional_label in reader]

        return Corpus(
            training_examples=[e for e, phase in examples
                               if phase == Phase.training],
            test_examples=[e for e, phase in examples if phase == Phase.test],
            sampled_training_example_count=sampled_training_example_count)
def init_hotword_switch(self):
    try:
        parent_dir = os.path.dirname(TOP_DIR)
        snowboyDetectFile = Path(os.path.join(
            parent_dir, "hotword_engine/snowboy/_snowboydetect.so"))
        print(snowboyDetectFile)
        if not snowboyDetectFile.exists():
            self.snowboy_switch.set_sensitive(False)
            config['hotword_engine'] = 'PocketSphinx'
    except Exception as e:
        logging.error(e)
        config['hotword_engine'] = 'PocketSphinx'

    if config['hotword_engine'] == 'Snowboy':
        self.snowboy_switch.set_active(True)
    else:
        self.snowboy_switch.set_active(False)
def request_hotword_choice():
    """ Method to request user for default Hotword Engine and configure it in
    settings.
    """
    try:
        print("Checking for Snowboy Availability...")
        snowboyDetectFile = Path(
            "main/hotword_engine/snowboy/_snowboydetect.so")
        if snowboyDetectFile.exists():
            print("Snowboy is available on this platform")
            choice = input(
                "Do you wish to use Snowboy as default Hotword Detection "
                "Engine (Recommended). (y/n) ")
            if choice == 'y':
                config['hotword_engine'] = 'Snowboy'
                print('\nSnowboy set as default Hotword Detection Engine\n')
            else:
                config['hotword_engine'] = 'PocketSphinx'
                print('\nPocketSphinx set as default Hotword Detection Engine\n')
    except Exception:
        print("Some Error Occurred. Using PocketSphinx as default engine for "
              "Hotword. Run this script again to change")
        config['hotword_engine'] = 'PocketSphinx'
def create_potree_page(work_dir, server_url, tablename, column):
    '''Create an html demo page with potree viewer
    '''
    # get potree build
    potree = os.path.join(work_dir, 'potree')
    potreezip = os.path.join(work_dir, 'potree.zip')
    if not os.path.exists(potree):
        download('Getting potree code', 'http://3d.oslandia.com/potree.zip',
                 potreezip)
        # unzipping content
        with ZipFile(potreezip) as myzip:
            myzip.extractall(path=work_dir)

    tablewschema = tablename.split('.')[-1]
    sample_page = os.path.join(work_dir, 'potree-{}.html'.format(tablewschema))
    abs_sample_page = str(Path(sample_page).absolute())
    pending('Creating a potree demo page : file://{}'.format(abs_sample_page))

    resource = '{}.{}'.format(tablename, column)
    server_url = server_url.replace('http://', '')

    with io.open(sample_page, 'wb') as html:
        html.write(potree_page.format(
            resource=resource, server_url=server_url).encode())
    ok()
def create_cesium_page(work_dir, tablename, column):
    '''Create an html demo page with cesium viewer
    '''
    cesium = os.path.join(work_dir, 'cesium')
    cesiumzip = os.path.join(work_dir, 'cesium.zip')
    if not os.path.exists(cesium):
        download('Getting cesium code', 'http://3d.oslandia.com/cesium.zip',
                 cesiumzip)
        # unzipping content
        with ZipFile(cesiumzip) as myzip:
            myzip.extractall(path=work_dir)

    tablewschema = tablename.split('.')[-1]
    sample_page = os.path.join(work_dir, 'cesium-{}.html'.format(tablewschema))
    abs_sample_page = str(Path(sample_page).absolute())
    pending('Creating a cesium demo page : file://{}'.format(abs_sample_page))

    resource = '{}.{}'.format(tablename, column)

    with io.open(sample_page, 'wb') as html:
        html.write(cesium_page.format(resource=resource).encode())
    ok()
def __init__(self, config):
    self.config = config
    self.ui = None
    self.data = {}
    self.order = []
    self.player = None
    self.start_segment = None
    self.start_line = 0
    self.end_segment = None
    self.song_data = {}
    self.last_note = None
    self.playing_row = None
    self.project_dir = Path(config["instance"]["project_dir"])
    self.changed = False
    self.last_bpm_tap = ""
    self.last_nudge = ""
def scan(self):
    self.import_file = self.config["instance"]["import-file"]
    self.bits = self.import_lister.get(self.import_file)
    self.data_file = Path(self.bits["metadata"]).with_suffix(".data")
    self.metadata = ConfigParser(inline_comment_prefixes=None)
    self.metadata.read(str(self.bits["metadata"]))
    if self.data_file.exists():
        with self.data_file.open(newline="") as csvfile:
            data_reader = csv.DictReader(csvfile, dialect=ImportCsvDialect)
            for row in data_reader:
                location = float(row["location"])
                row["location"] = location
                self.data[location] = row
    if len(self.data) == 0:
        self.add_row(0.0, mark="START")
        self.add_row(self.bits["length_secs"], mark="END")
    self.update_order()
    self.clean()
def backup(self):
    dsecs = self.data_file.stat().st_mtime
    meta_file = Path(self.bits["metadata"])
    msecs = meta_file.stat().st_mtime
    secs = max(dsecs, msecs)
    suffix = filename_iso_time(secs)
    backup_data = self.data_file.with_name(
        "{}-{}{}".format(self.data_file.stem, suffix, self.data_file.suffix))
    backup_meta = meta_file.with_name(
        "{}-{}{}".format(meta_file.stem, suffix, meta_file.suffix))
    with backup_data.open("w", newline='') as csvfile:
        fieldnames = ['location', 'lyric', 'mark', 'track-change',
                      "chord-change", "chord-selection", "note"]
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames,
                                extrasaction='ignore',
                                dialect=ImportCsvDialect)
        writer.writeheader()
        for location in self.order:
            writer.writerow(self.data[location])
    with backup_meta.open('w') as meta:
        self.metadata.write(meta)
def load_home_config(args=None):
    """ Load the user-specific configuration.
    It can not contain the `project` section.
    It is based upon the application's internal defaults.
    (Note: It will be modified with project specific settings before it is
    used.)
    """
    global config
    global home_dir
    if home_dir is None:
        if args is not None and args.user_config is not None:
            home_dir = Path(args.user_config)
        else:
            home_dir = Path("~/.bittyband").expanduser()
    home_dir.mkdir(parents=True, exist_ok=True)
    home_config = home_dir / "settings.conf"
    if home_config.exists():
        config.read(str(home_config))
    if "project" in config:
        config["project"].clear()
    if "instance" in config:
        config["instance"].clear()
    return config
def get_project_root():
    """ Return the common base-directory for projects.

    This allows you to specify a `--project` on the command-line and to
    reference a project directory off of this root without adding a mapping
    in the `projects` section of the user configuration file.

    The value for this comes from a `root` symlink in the `--user-config`
    directory. If this is a file, instead of a directory, it needs to be the
    name of the directory. (It does '~' expansion.)

    The expectation is that this will allow a common `~/BittyProjects` or
    `~/Documents/BittyTunes` directory to root your projects.
    """
    global home_dir
    root = home_dir / "root"
    if root.is_file():
        t = root.read_text().strip()
        root = Path(t).expanduser()
    if root.is_dir():
        return root.resolve()
    return None
def find_project_dir(project):
    if project is None:
        project = "."
    root = get_project_root()
    if root is None:
        root = Path()
    project_dir = Path(project)
    if "." == project:
        project_dir = Path(".")
    elif "/" not in project and "\\" not in project:
        if "projects" in config:
            if project in config["projects"]:
                project_dir = Path(config["projects"].get(project))
                project_dir = root.joinpath(project_dir.expanduser())
    else:
        project_dir = project_dir.expanduser()
        if not project_dir.exists():
            project_dir = root.joinpath(project_dir)
    if project_dir.is_dir():
        project_dir = project_dir.resolve()
    return project_dir
def __init__(self, cwd=os.curdir):
    self._data = []
    self.working_directory = Path(cwd)
def _process_str_paths(self, paths):
    # TODO: Dies if a file doesn't exist
    return [Path(file).resolve() for file in paths]
def add(self, *args):
    added = 0
    for pattern in args:
        try:
            files = self._glob(pattern)
            files = self._process_str_paths(files)
        except NotImplementedError:
            # self._glob says "Non-relative patterns are unsupported"
            # Check if it exists and add it
            files = Path(pattern)
            if files.exists():
                files = [files]
            else:
                files = None

        if not files:
            # TODO: Proper handling of this?
            print('Include pattern did not match:', pattern)
        else:
            for file in files:
                if file not in self._data:
                    self._data.append(file)
                    added += 1
    return added
def _get_gitinfo():
    import pygrunt.platform as platform
    import subprocess
    from pathlib import Path

    git = platform.current.find_executable('git')
    if git is None:
        # No git installed; assume we're on master
        return ('master', '')

    cwd = str(Path(__file__).parent)

    args = [git, 'rev-parse', '--abbrev-ref', 'HEAD']
    result = subprocess.run(args, stdout=subprocess.PIPE,
                            stderr=subprocess.DEVNULL, cwd=cwd,
                            universal_newlines=True)
    if result.returncode != 0:
        # Quietly return defaults on fail
        return ('master', '')
    branch = result.stdout

    args = [git, 'rev-parse', 'HEAD']
    result = subprocess.run(args, stdout=subprocess.PIPE,
                            stderr=subprocess.DEVNULL, cwd=cwd,
                            universal_newlines=True)
    if result.returncode != 0:
        # Quietly return defaults on fail
        return ('master', '')
    commit = result.stdout

    return (branch.strip(), commit.strip())
def sanitize(self):
    import os.path

    if not self.working_dir:
        self.working_dir = os.path.curdir

    if not self.output_dir:
        self.output_dir = os.path.join(self.working_dir, 'build', '')

    allowed_types = ['executable', 'library', 'shared']
    if self.type not in allowed_types:
        # TODO: exceptions?
        Style.error('Invalid output type:', self.type)
        Style.error('Allowed types:', allowed_types)
        self.type = allowed_types[0]
        Style.error('Reverting to', self.type)

    if not self.executable:
        self.executable = os.path.join(self.output_dir, self.name)
        if self.type == 'executable':
            self.executable = platform.current.as_executable(self.executable)
        elif self.type == 'library':
            self.executable = platform.current.as_static_library(self.executable)
        elif self.type == 'shared':
            self.executable = platform.current.as_shared_library(self.executable)

    self.working_dir = os.path.realpath(self.working_dir)
    self.output_dir = os.path.realpath(self.output_dir)
    self.sources.working_directory = Path(self.working_dir)

    self._init_hooks()

    # Stage hooks
    # Create an empty list for each stage
def compile_object(self, in_file, out_file):
    import subprocess

    in_file = Path(in_file)
    out_file = Path(out_file)

    # Skip compile if RecompileStrategy says so
    # Since preprocess_source (possibly used by recompile) also modifies
    # self._args, we gotta back it up
    # TODO: Maybe use something more elegant than self._args?
    old_args = self._args
    if out_file.is_file():
        if not self.recompile.should_recompile(str(in_file)):
            # Style.info('Nothing to do with', in_file)
            return True
    self._args = old_args

    Path(out_file).parent.mkdir(parents=True, exist_ok=True)

    self._args.extend(self._build_compiler_flags())
    result = subprocess.run(self._args, stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE, universal_newlines=True)

    # TODO: do something useful with output
    if result.stdout:
        print(result.stdout)

    if result.stderr:
        print(result.stderr)

    return result.returncode == 0
def link_executable(self, in_files, out_file):
    import subprocess

    Path(out_file).parent.mkdir(parents=True, exist_ok=True)

    self._args.extend(self._build_linker_flags())
    result = subprocess.run(self._args)
    return result.returncode == 0