The following code examples, extracted from open-source Python projects, illustrate how to use os.getenv().
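Before the project-specific examples, here is a minimal sketch of the two common calling patterns: os.getenv(name) returns None when the variable is unset, while os.getenv(name, default) returns the given fallback instead (the variable names below are only illustrative).

import os

# Returns None if HOME is unset.
home = os.getenv('HOME')

# Returns '/tmp' if SCRATCH_DIR is unset.
scratch = os.getenv('SCRATCH_DIR', '/tmp')

# Values are always strings; convert explicitly when a number is expected.
port = int(os.getenv('PORT', '8080'))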
def session_id(self):
    """A unique session ID every time the user uses the workflow.

    .. versionadded:: 1.25

    The session ID persists while the user is using this workflow.
    It expires when the user runs a different workflow or closes Alfred.

    """
    if not self._session_id:
        sid = os.getenv('_WF_SESSION_ID')
        if not sid:
            from uuid import uuid4
            sid = uuid4().hex
            self.setvar('_WF_SESSION_ID', sid)
        self._session_id = sid

    return self._session_id
def site_config_dirs(appname):
    """Return a list of potential user-shared config dirs for this application.

        "appname" is the name of application.

    Typical user config directories are:
        macOS:      /Library/Application Support/<AppName>/
        Unix:       /etc or $XDG_CONFIG_DIRS[i]/<AppName>/ for each value in
                    $XDG_CONFIG_DIRS
        Win XP:     C:\Documents and Settings\All Users\Application ...
                    ...Data\<AppName>\
        Vista:      (Fail! "C:\ProgramData" is a hidden *system* directory
                    on Vista.)
        Win 7:      Hidden, but writeable on Win 7:
                    C:\ProgramData\<AppName>\
    """
    if WINDOWS:
        path = os.path.normpath(_get_win_folder("CSIDL_COMMON_APPDATA"))
        pathlist = [os.path.join(path, appname)]
    elif sys.platform == 'darwin':
        pathlist = [os.path.join('/Library/Application Support', appname)]
    else:
        # try looking in $XDG_CONFIG_DIRS
        xdg_config_dirs = os.getenv('XDG_CONFIG_DIRS', '/etc/xdg')
        if xdg_config_dirs:
            pathlist = [
                os.path.join(expanduser(x), appname)
                for x in xdg_config_dirs.split(os.pathsep)
            ]
        else:
            pathlist = []

        # always look in /etc directly as well
        pathlist.append('/etc')

    return pathlist


# -- Windows support functions --
def _candidate_tempdir_list():
    """Generate a list of candidate temporary directories which
    _get_default_tempdir will try."""

    dirlist = []

    # First, try the environment.
    for envname in 'TMPDIR', 'TEMP', 'TMP':
        dirname = _os.getenv(envname)
        if dirname:
            dirlist.append(dirname)

    # Failing that, try OS-specific locations.
    if _os.name == 'nt':
        dirlist.extend([r'c:\temp', r'c:\tmp', r'\temp', r'\tmp'])
    else:
        dirlist.extend(['/tmp', '/var/tmp', '/usr/tmp'])

    # As a last resort, the current directory.
    try:
        dirlist.append(_os.getcwd())
    except (AttributeError, OSError):
        dirlist.append(_os.curdir)

    return dirlist
def __load_layout(self, config):
    var = config.get_value('engine/replace-with-kanji-python', 'layout')
    if var is None or var.get_type_string() != 's':
        path = os.path.join(os.getenv('IBUS_REPLACE_WITH_KANJI_LOCATION'), 'layouts')
        path = os.path.join(path, 'roomazi.json')
        if var:
            config.unset('engine/replace-with-kanji-python', 'layout')
    else:
        path = var.get_string()
    logger.info("layout: %s", path)
    layout = roomazi.layout  # Use 'roomazi' as default
    try:
        with open(path) as f:
            layout = json.load(f)
    except ValueError as error:
        logger.error("JSON error: %s", error)
    except OSError as error:
        logger.error("Error: %s", error)
    except:
        logger.error("Unexpected error: %s %s", sys.exc_info()[0], sys.exc_info()[1])
    self.__to_kana = self.__handle_roomazi_layout
    if 'Type' in layout:
        if layout['Type'] == 'Kana':
            self.__to_kana = self.__handle_kana_layout
    return layout
def __init__(self):
    # Snapshot the current values of these environment variables,
    # falling back to the empty string when a variable is unset.
    self.ld_lib_path = os.getenv("LD_LIBRARY_PATH") or ''
    self.pythonpath = os.getenv("PYTHONPATH") or ''
    self.classpath = os.getenv("CLASSPATH") or ''
    self.octave_path = os.getenv("OCTAVE_PATH") or ''
def __enter__(self):
    # Same snapshot as __init__: record the current values, using the
    # empty string when a variable is unset.
    self.ld_lib_path = os.getenv("LD_LIBRARY_PATH") or ''
    self.pythonpath = os.getenv("PYTHONPATH") or ''
    self.classpath = os.getenv("CLASSPATH") or ''
    self.octave_path = os.getenv("OCTAVE_PATH") or ''
def _prependToEnvVar(self, newVal, envVar):
    path = self._getEnvVarAsList(envVar)
    foundValue = False
    for entry in path:
        # Search to determine if the new value is already in the path
        try:
            if os.path.samefile(entry, newVal):
                # The value is already in the path
                foundValue = True
                break
        except OSError:
            # If we can't find concrete files to compare, fall back to string compare
            if entry == newVal:
                # The value is already in the path
                foundValue = True
                break

    if not foundValue:
        # The value does not already exist
        if envVar in os.environ:
            newpath = newVal + os.path.pathsep + os.getenv(envVar) + os.path.pathsep
        else:
            newpath = newVal + os.path.pathsep
        os.putenv(envVar, newpath)
        os.environ[envVar] = newpath
def create_kubeconfig_var_message(path):
    msg = """Set your KUBECONFIG environment variable to use kubectl"""

    shell = os.getenv("SHELL", "").lower()
    if "/bash" in shell or "/zsh" in shell:
        msg += """
export KUBECONFIG={0}
"""
    elif "/fish" in shell:
        msg += """
set -g -x KUBECONFIG {0}
"""
    else:
        msg += ". Unable to detect shell therefore assuming a Bash-compatible shell"
        msg += """
export KUBECONFIG={0}
"""
    return msg.format(path).lstrip()
def resolve_nested_variables(values):
    def _replacement(name):
        """
        Get the appropriate value for a variable name.
        Search in environ first; if not found, then look
        into the dotenv variables.
        """
        ret = os.getenv(name, values.get(name, ""))
        return ret

    def _re_sub_callback(match_object):
        """
        From a match object, get the variable name and return
        the correct replacement.
        """
        return _replacement(match_object.group()[2:-1])

    for k, v in values.items():
        values[k] = __posix_variable.sub(_re_sub_callback, v)

    return values
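The snippet above relies on a module-level regex, __posix_variable, that is not shown. A definition consistent with the [2:-1] slicing in the callback (which strips a leading "${" and a trailing "}") would be something like:

import re

# Matches POSIX-style ${NAME} references; the matched text is sliced
# with [2:-1] above to recover the bare variable name.
__posix_variable = re.compile(r'\$\{[^\}]*\}')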
def goglib_get_games_list():
    proc = subprocess.Popen(['lgogdownloader', '--exclude',
                             '1,2,4,8,16,32', '--list-details'],
                            stdout=subprocess.PIPE)
    games_detailed_list = proc.stdout.readlines()
    stdoutdata, stderrdata = proc.communicate()
    if proc.returncode == 0:
        file_path = os.getenv('HOME') + '/.games_nebula/config/games_list'
        games_list_file = open(file_path, 'w')
        for line in games_detailed_list:
            if 'Getting game info' not in line:
                games_list_file.write(line)
        return 0
    else:
        return 1
def __init__(self):
    """Setup the Runner for all Foremast modules."""
    debug_flag()

    self.email = os.getenv("EMAIL")
    self.env = os.getenv("ENV")
    self.group = os.getenv("PROJECT")
    self.region = os.getenv("REGION")
    self.repo = os.getenv("GIT_REPO")
    self.runway_dir = os.getenv("RUNWAY_DIR")
    self.artifact_path = os.getenv("ARTIFACT_PATH")
    self.artifact_version = os.getenv("ARTIFACT_VERSION")
    self.promote_stage = os.getenv("PROMOTE_STAGE", "latest")

    self.git_project = "{}/{}".format(self.group, self.repo)
    parsed = gogoutils.Parser(self.git_project)
    generated = gogoutils.Generator(*parsed.parse_url(), formats=consts.APP_FORMATS)

    self.app = generated.app_name()
    self.trigger_job = generated.jenkins()['name']
    self.git_short = generated.gitlab()['main']

    self.raw_path = "./raw.properties"
    self.json_path = self.raw_path + ".json"
    self.configs = None
def __init__(self, mode):
    self.mode = mode
    self.mechanicRootDir = getenv("MECHANIC_ROOT_DIR", "")
    if mode != "USER":
        self.configFile = "${MECHANIC_ROOT_DIR}/etc/mechanic.conf"
        self.logFile = ""
        self.migrationDirs = ["${MECHANIC_ROOT_DIR}/etc/mechanic/migration.d",
                              "${MECHANIC_ROOT_DIR}/var/lib/mechanic/migration.d"]
        self.preMigrationDirs = ["${MECHANIC_ROOT_DIR}/etc/mechanic/pre-migration.d",
                                 "${MECHANIC_ROOT_DIR}/var/lib/mechanic/pre-migration.d"]
        self.postMigrationDirs = ["${MECHANIC_ROOT_DIR}/etc/mechanic/post-migration.d",
                                  "${MECHANIC_ROOT_DIR}/var/lib/mechanic/post-migration.d"]
        self.stateDir = "${MECHANIC_ROOT_DIR}/var/lib/mechanic/state"
        self.runDir = "${MECHANIC_ROOT_DIR}/var/lib/mechanic/tmp"
    else:
        self.configFile = "${HOME}/.mechanic/mechanic.conf"
        self.logFile = "stderr"
        self.migrationDirs = ["${HOME}/.mechanic/migration.d"]
        self.preMigrationDirs = ["${HOME}/.mechanic/pre-migration.d"]
        self.postMigrationDirs = ["${HOME}/.mechanic/post-migration.d"]
        self.stateDir = "${HOME}/.mechanic/state"
        self.runDir = "${HOME}/.mechanic/tmp"
def _get_well_known_file():
    """Get the well known file produced by command 'gcloud auth login'."""
    # TODO(orestica): Revisit this method once gcloud provides a better way
    # of pinpointing the exact location of the file.
    default_config_dir = os.getenv(_CLOUDSDK_CONFIG_ENV_VAR)
    if default_config_dir is None:
        if os.name == 'nt':
            try:
                default_config_dir = os.path.join(os.environ['APPDATA'],
                                                  _CLOUDSDK_CONFIG_DIRECTORY)
            except KeyError:
                # This should never happen unless someone is really
                # messing with things.
                drive = os.environ.get('SystemDrive', 'C:')
                default_config_dir = os.path.join(drive, '\\',
                                                  _CLOUDSDK_CONFIG_DIRECTORY)
        else:
            default_config_dir = os.path.join(os.path.expanduser('~'),
                                              '.config',
                                              _CLOUDSDK_CONFIG_DIRECTORY)

    return os.path.join(default_config_dir, _WELL_KNOWN_CREDENTIALS_FILE)
def _SendRecv():
    """Communicate with the Developer Shell server socket."""
    port = int(os.getenv(DEVSHELL_ENV, 0))
    if port == 0:
        raise NoDevshellServer()

    sock = socket.socket()
    sock.connect(('localhost', port))

    data = CREDENTIAL_INFO_REQUEST_JSON
    msg = '%s\n%s' % (len(data), data)
    sock.sendall(_to_bytes(msg, encoding='utf-8'))

    header = sock.recv(6).decode()
    if '\n' not in header:
        raise CommunicationError('saw no newline in the first 6 bytes')
    len_str, json_str = header.split('\n', 1)
    to_read = int(len_str) - len(json_str)
    if to_read > 0:
        json_str += sock.recv(to_read, socket.MSG_WAITALL).decode()

    return CredentialInfoResponse(json_str)
def find_EXECUTABLES(Makefile, flags):
    '''
    See the doc-string for find_prefix as well.

    Set Makefile['EXECUTABLES'] if needed to.

    Depends (directly) on $(gamesdir) and $(bindir).
    Depends (indirectly) on $(prefix).
    '''
    if 'EXECUTABLES' not in Makefile:
        acceptable = os.getenv('PATH').split(':')
        for exec_dir in ('gamesdir', 'bindir'):
            if expand(exec_dir, Makefile) in acceptable:
                Makefile['EXECUTABLES'] = '$(' + exec_dir + ')'
                return False
        else:
            # for-else: no acceptable directory was found
            return True
    else:
        return False
def bool_env(var_name, default=False):
    """
    Get an environment variable coerced to a boolean value.

    Example:
        Bash:
            $ export SOME_VAL=True
        settings.py:
            SOME_VAL = bool_env('SOME_VAL', False)

    Arguments:
        var_name: The name of the environment variable.
        default: The default to use if `var_name` is not specified in the
                 environment.

    Returns: `var_name` or `default` coerced to a boolean using the following
        rules:
            "False", "false" or "" => False
            Any other non-empty string => True
    """
    test_val = getenv(var_name, default)
    # Explicitly check for 'False', 'false', and '0' since all non-empty
    # strings are normally coerced to True.
    if test_val in ('False', 'false', '0'):
        return False
    return bool(test_val)
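A quick usage sketch of the helper above (the environment values are hypothetical), showing that only the explicitly listed strings are rejected while any other non-empty string coerces to True:

import os

os.environ['DEBUG'] = 'false'
os.environ['VERBOSE'] = 'yes'

assert bool_env('DEBUG') is False          # 'false' is explicitly rejected
assert bool_env('VERBOSE') is True         # any other non-empty string is truthy
assert bool_env('MISSING', True) is True   # falls back to the default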
def test_in_flight_is_one(self):
    """
    Verify that the in_flight value stays equal to one while doing multiple inserts.
    The number of inserts can be set through the INSERT_ITERATIONS
    environment variable. The default value is 1000000.
    """
    prepared = self.session.prepare("INSERT INTO race (x) VALUES (?)")
    iterations = int(os.getenv("INSERT_ITERATIONS", 1000000))
    i = 0
    leaking_connections = False
    while i < iterations and not leaking_connections:
        bound = prepared.bind((i,))
        self.session.execute(bound)

        for pool in self.session._pools.values():
            if leaking_connections:
                break
            for conn in pool.get_connections():
                if conn.in_flight > 1:
                    print(self.session.get_pool_state())
                    leaking_connections = True
                    break
        i = i + 1

    self.assertFalse(leaking_connections,
                     'Detected leaking connection after %s iterations' % i)
def teardown_module(module):
    global c
    global OPENNTI_C
    global OPENNTI_IN_JTI_C
    global OPENNTI_IN_LOG_C

    # Delete all files in /tests/output/
    if not os.getenv('TRAVIS'):
        c.stop(container=OPENNTI_C)
        c.remove_container(container=OPENNTI_C)
        c.stop(container=OPENNTI_IN_JTI_C)
        c.remove_container(container=OPENNTI_IN_JTI_C)
        c.stop(container=OPENNTI_IN_LOG_C)
        c.remove_container(container=OPENNTI_IN_LOG_C)
        c.stop(container=TCP_REPLAY_C)
        c.remove_container(container=TCP_REPLAY_C)
def setup_logging(self, default_path=PATH_LOGGING, default_level=logging.INFO, env_key='LOG_CFG'):
    path = default_path
    self.logconf = None
    value = os.getenv(env_key, None)
    if value:
        path = value
    if os.path.exists(path):
        with open(path, 'rt') as f:
            config = json.load(f)
        self.logconf = logging.config.dictConfig(config)
    elif os.path.exists(path.replace("../", "")):
        with open(path.replace("../", ""), 'rt') as f:
            config = json.load(f)
        self._changePath(config["handlers"])
        self.logconf = logging.config.dictConfig(config)
    else:
        print("Log configuration not found (\"%s\"): applying default settings" % path)
        self.logconf = logging.basicConfig(level=default_level)
def add_nvidia_docker_to_config(container_config):
    if not container_config.get('HostConfig', None):
        container_config['HostConfig'] = {}
    nvidia_config = get_nvidia_configuration()

    # Setup the Volumes
    container_config['HostConfig'].setdefault('VolumeDriver', nvidia_config['VolumeDriver'])
    container_config['HostConfig'].setdefault('Binds', [])
    container_config['HostConfig']['Binds'].extend(nvidia_config['Volumes'])

    # Get nvidia control devices
    devices = container_config['HostConfig'].get('Devices', [])
    # support both '0 1' and '0, 1' formats, just like nvidia-docker
    gpu_isolation = os.getenv('NV_GPU', '').replace(',', ' ').split()
    pattern = re.compile(r'/nvidia([0-9]+)$')
    for device in nvidia_config['Devices']:
        if gpu_isolation:
            card_number = pattern.search(device)
            if card_number and card_number.group(1) not in gpu_isolation:
                continue
        devices.extend(parse_devices([device]))

    container_config['HostConfig']['Devices'] = devices
def keep_reading(self):
    """Output thread method for the process

    Sends the process output to the ViewController (through OutputTranscoder)
    """
    while True:
        if self.stop:
            break
        ret = self.process.poll()
        if ret is not None:
            self.stop = True
        readable, writable, executable = select.select([self.master], [], [], 5)
        if readable:
            # We read the new content
            data = os.read(self.master, 1024)
            text = data.decode('UTF-8', errors='replace')
            log_debug("RAW", repr(text))
            log_debug("PID", os.getenv('BASHPID'))
            self.output_transcoder.decode(text)
            # log_debug("{} >> {}".format(int(time.time()), repr(text)))
def copy_nrpe_checks():
    """
    Copy the nrpe checks into place
    """
    NAGIOS_PLUGINS = '/usr/local/lib/nagios/plugins'
    nrpe_files_dir = os.path.join(os.getenv('CHARM_DIR'), 'hooks',
                                  'charmhelpers', 'contrib', 'openstack',
                                  'files')
    if not os.path.exists(NAGIOS_PLUGINS):
        os.makedirs(NAGIOS_PLUGINS)
    for fname in glob.glob(os.path.join(nrpe_files_dir, "check_*")):
        if os.path.isfile(fname):
            shutil.copy2(fname,
                         os.path.join(NAGIOS_PLUGINS, os.path.basename(fname)))
def indices(list_to_split, number_of_parallel_jobs, task_id=None):
    """This function returns the first and last index for the files for the current job ID.
    If no job id is set (e.g., because a sub-job is executed locally), it simply returns all indices."""

    if number_of_parallel_jobs is None or number_of_parallel_jobs == 1:
        return None

    # test if the 'SGE_TASK_ID' environment variable is set
    sge_task_id = os.getenv('SGE_TASK_ID') if task_id is None else task_id
    if sge_task_id is None:
        # task id is not set, so this function is not called from a grid job
        # hence, we process the whole list
        return (0, len(list_to_split))
    else:
        job_id = int(sge_task_id) - 1
        # compute number of files to be executed
        number_of_objects_per_job = int(math.ceil(float(len(list_to_split) / float(number_of_parallel_jobs))))
        start = job_id * number_of_objects_per_job
        end = min((job_id + 1) * number_of_objects_per_job, len(list_to_split))
        return (start, end)
def setup_environment():
    root = os.getenv('LAMBDA_TASK_ROOT')
    bin_dir = os.path.join(root, 'bin')
    os.environ['PATH'] += ':' + bin_dir
    os.environ['GIT_EXEC_PATH'] = bin_dir

    ssh_dir = tempfile.mkdtemp()

    ssh_identity = os.path.join(ssh_dir, 'identity')
    # b64decode returns bytes, so the file is opened in binary mode
    with os.fdopen(os.open(ssh_identity, os.O_WRONLY | os.O_CREAT, 0o600), 'wb') as f:
        f.write(base64.b64decode(os.getenv('SSH_IDENTITY')))

    ssh_config = os.path.join(ssh_dir, 'config')
    with open(ssh_config, 'w') as f:
        f.write('CheckHostIP no\n'
                'StrictHostKeyChecking yes\n'
                'IdentityFile %s\n'
                'UserKnownHostsFile %s\n' %
                (ssh_identity, os.path.join(root, 'known_hosts')))

    os.environ['GIT_SSH_COMMAND'] = 'ssh -F %s' % ssh_config
def run():
    get_log_states()
    db_name = os.getenv('DB_NAME') or \
        raiser(ValueError('DB_NAME is required'))
    bucket = os.getenv('S3_BUCKET') or \
        raiser(ValueError('S3_BUCKET is required'))
    region = os.getenv('REGION', 'us-west-2')
    key = os.getenv('S3_KEY', 'pgbadger/')
    try:
        files = download_log_files(db_name)
        sync_s3(bucket, key)
        run_pgbadger(files)
        sync_s3(bucket, key, upload=True)
        # upload_to_s3(bucket, key, region)
    except Exception as e:
        traceback.print_exc()
    finally:
        save_log_states()
def list(self):
    self.parser.add_argument('--unit-type', help='Type of unit, valid value '
                             'is docker', required=True)
    self.parser.add_argument('--search-type', help='search type', required=False)
    self.parser.add_argument('--search-string', help='search string', required=False)
    args = self.parser.parse_args()
    unit_type = vars(args)['unit_type']
    search_type = vars(args)['search_type']
    search_string = vars(args)['search_string']
    data = {'unit_type': unit_type,
            'search_type': search_type,
            'search_string': search_string}
    galaxia_api_endpoint = os.getenv("galaxia_api_endpoint")
    target_url = client.concatenate_url(galaxia_api_endpoint, self.catalogue_uri)
    resp = client.http_request('GET', target_url, self.headers, data)
    if unit_type == 'container':
        format_print.format_dict(resp.json(), "keys")
    if unit_type == 'dashboard':
        format_print.format_dict(resp.json(), "keys")
    if unit_type == 'exporter':
        header = ["EXPORTER_NAME", "EXPORTER_ID"]
        format_print.format_dict(resp.json(), header)
    if unit_type == 'node':
        header = ["Instance_Name", "Host_Name"]
        format_print.format_dict(resp.json(), header)
def create(self):
    self.parser.add_argument('--source-system', help='Source system', required=True)
    self.parser.add_argument('--target-system', help='Target system', required=True)
    self.parser.add_argument('--metrics-list', help='List of metrics to export',
                             required=True)
    self.parser.add_argument('--time-interval', help='Time interval in which to '
                             'push metrics to target system', required=True)
    self.parser.add_argument('--unit-type', help='Type of unit, valid value '
                             'is docker', required=True)
    self.parser.add_argument('--exporter-name', help='Unique name for exporter',
                             required=True)
    args = self.parser.parse_args()
    json_data = client.create_request_data(**vars(args))
    galaxia_api_endpoint = os.getenv("galaxia_api_endpoint")
    target_url = client.concatenate_url(galaxia_api_endpoint, self.exporter_uri)
    resp = client.http_request('POST', target_url, self.headers, json_data)
    print(resp.text)
def list(self):
    resp = None
    self.parser.add_argument('--type', help='Type of unit, valid values are '
                             'containers, nodes', required=True)
    args = self.parser.parse_args()
    unit_type = vars(args)['type']
    data = {"sub_type": unit_type}
    galaxia_api_endpoint = os.getenv("galaxia_api_endpoint")
    target_url = client.concatenate_url(galaxia_api_endpoint, self.metrics_uri)
    try:
        resp = client.http_request('GET', target_url, self.headers, data)
        headers = ["NAME", "DESCRIPTION"]
        print("List of supported metrics for " + unit_type)
        format_print.format_dict(resp.json(), headers)
    except Exception as ex:
        pass
def sample(self):
    resp = None
    self.parser.add_argument('--type', help='Type of unit, valid values are '
                             'containers, nodes', required=True)
    self.parser.add_argument('--search-string', help='Search string', required=False)
    self.parser.add_argument('--search-type', help='Search type', required=False)
    self.parser.add_argument('--meter-name', help='Name of the meter', required=True)
    args = self.parser.parse_args()
    data = {"type": vars(args)['type'],
            "search_string": vars(args)['search_string'],
            "search_type": vars(args)['search_type'],
            "meter_name": vars(args)['meter_name']}
    galaxia_api_endpoint = os.getenv("galaxia_api_endpoint")
    target_url = client.concatenate_url(galaxia_api_endpoint, self.sample_uri)
    try:
        resp = client.http_request('GET', target_url, self.headers, data)
        headers = ["NAME", "VALUE"]
        print("Current " + vars(args)['meter_name'])
        format_print.format_dict(resp.json(), headers)
    except Exception as ex:
        pass
def create(self):
    self.parser.add_argument('--name', help='Name of the dashboard', required=True)
    self.parser.add_argument('--metrics-list', nargs='+',
                             help='List of metrics to be displayed on the dashboard')
    self.parser.add_argument('--names-list', help='Names list of units to plot '
                             'in dashboard')
    self.parser.add_argument('--search-string', help='Search string')
    self.parser.add_argument('--search-type', help='Search type')
    self.parser.add_argument('--unit-type', help='Type of unit, valid value is docker')
    self.parser.add_argument('--exclude', help='Search excluding search string',
                             required=False)
    args = self.parser.parse_args()
    if not (args.names_list or (args.search_string and args.search_type)):
        self.parser.error('add --names-list or (--search-string and --search-type)')
    json_data = client.create_request_data(**vars(args))
    print(json_data)
    galaxia_api_endpoint = os.getenv("galaxia_api_endpoint")
    target_url = client.concatenate_url(galaxia_api_endpoint, self.url)
    try:
        resp = client.http_request('PUT', target_url, self.headers, json_data)
        print(resp.text)
    except Exception as ex:
        pass
def oldest_peer(peers):
    """Determines who the oldest peer is by comparing unit numbers."""
    local_unit_no = int(os.getenv('JUJU_UNIT_NAME').split('/')[1])
    for peer in peers:
        remote_unit_no = int(peer.split('/')[1])
        if remote_unit_no < local_unit_no:
            return False
    return True
def get_chrome_path():
    if win_client():
        PathName = os.getenv('localappdata') + '\\Google\\Chrome\\User Data\\Default\\'
        if not os.path.isdir(PathName):
            return "[!] Chrome doesn't exist", False
    if osx_client():
        PathName = os.getenv('HOME') + "/Library/Application Support/Google/Chrome/Default/"
        if not os.path.isdir(PathName):
            return "[!] Chrome doesn't exist", False
    if lnx_client():
        PathName = os.getenv('HOME') + '/.config/google-chrome/Default/'
        if not os.path.isdir(PathName):
            return "[!] Chrome doesn't exist", False
    return PathName, True
def __init__(self):
    self.__path_home = os.getenv("HOME")
    self.__path_config = self.__path_home + "/.config/mpis"
    self.__path_file = "/usr/share/mpis"
    self.__path_tr = os.path.join(self.__path_config, "locale")
    self.__path_db = os.path.join(self.__path_config, "db")
def init_from_url(self, snapshot=-1, thingpedia_url=None):
    if thingpedia_url is None:
        thingpedia_url = os.getenv('THINGPEDIA_URL', 'https://thingpedia.stanford.edu/thingpedia')
    ssl_context = ssl.create_default_context()

    with urllib.request.urlopen(thingpedia_url + '/api/snapshot/' + str(snapshot) + '?meta=1',
                                context=ssl_context) as res:
        self._process_devices(json.load(res)['data'])
    with urllib.request.urlopen(thingpedia_url + '/api/entities?snapshot=' + str(snapshot),
                                context=ssl_context) as res:
        self._process_entities(json.load(res)['data'])
def get_thingpedia(input_words, workdir, snapshot):
    thingpedia_url = os.getenv('THINGPEDIA_URL', 'https://thingpedia.stanford.edu/thingpedia')

    output = dict()
    with urllib.request.urlopen(thingpedia_url + '/api/snapshot/' + str(snapshot) + '?meta=1',
                                context=ssl_context) as res:
        output['devices'] = json.load(res)['data']
        for device in output['devices']:
            if device['kind_type'] == 'global':
                continue
            if device['kind_canonical']:
                add_words(input_words, device['kind_canonical'])
            else:
                print('WARNING: missing canonical for tt-device:%s' % (device['kind'],))
            for function_type in ('triggers', 'queries', 'actions'):
                for function_name, function in device[function_type].items():
                    if not function['canonical']:
                        print('WARNING: missing canonical for tt:%s.%s' % (device['kind'], function_name))
                    else:
                        add_words(input_words, function['canonical'])
                    for argname, argcanonical in zip(function['args'], function['argcanonicals']):
                        if argcanonical:
                            add_words(input_words, argcanonical)
                        else:
                            add_words(input_words, clean(argname))
                    for argtype in function['schema']:
                        if not argtype.startswith('Enum('):
                            continue
                        enum_entries = argtype[len('Enum('):-1].split(',')
                        for enum_value in enum_entries:
                            add_words(input_words, clean(enum_value))

    with urllib.request.urlopen(thingpedia_url + '/api/entities?snapshot=' + str(snapshot),
                                context=ssl_context) as res:
        output['entities'] = json.load(res)['data']
        for entity in output['entities']:
            if entity['is_well_known'] == 1:
                continue
            add_words(input_words, tokenize(entity['name']))

    with open(os.path.join(workdir, 'thingpedia.json'), 'w') as fp:
        json.dump(output, fp, indent=2)
def main():
    np.random.seed(1234)

    workdir = sys.argv[1]
    if len(sys.argv) > 2:
        snapshot = int(sys.argv[2])
    else:
        snapshot = -1
    if len(sys.argv) > 3:
        embed_size = int(sys.argv[3])
    else:
        embed_size = 300
    dataset = os.getenv('DATASET', workdir)
    glove = os.getenv('GLOVE', os.path.join(workdir, 'glove.42B.300d.txt'))
    download_glove(glove)

    input_words = set()
    # add the canonical words for the builtin functions
    add_words(input_words, 'now nothing notify return the event')

    create_dictionary(input_words, dataset)
    get_thingpedia(input_words, workdir, snapshot)
    save_dictionary(input_words, workdir)
    trim_embeddings(input_words, workdir, embed_size, glove)
def read_config():
    config = ConfigParser()
    config.read([path.join(BASE_DIR, 'settings.ini'), os.getenv('CONF_FILE', '')])
    return config
def read_environment(self):
    ''' Reads the settings from environment variables '''
    # Setup credentials
    if os.getenv("DO_API_TOKEN"):
        self.api_token = os.getenv("DO_API_TOKEN")
    if os.getenv("DO_API_KEY"):
        self.api_token = os.getenv("DO_API_KEY")
def user_config_dir(appname=None, appauthor=None, version=None, roaming=False):
    r"""Return full path to the user-specific config dir for this application.

        "appname" is the name of application.
            If None, just the system directory is returned.
        "appauthor" (only used on Windows) is the name of the
            appauthor or distributing body for this application. Typically
            it is the owning company name. This falls back to appname. You may
            pass False to disable it.
        "version" is an optional version path element to append to the
            path. You might want to use this if you want multiple versions
            of your app to be able to run independently. If used, this
            would typically be "<major>.<minor>".
            Only applied when appname is present.
        "roaming" (boolean, default False) can be set True to use the Windows
            roaming appdata directory. That means that for users on a Windows
            network setup for roaming profiles, this user data will be
            sync'd on login. See
            <http://technet.microsoft.com/en-us/library/cc766489(WS.10).aspx>
            for a discussion of issues.

    Typical user data directories are:
        macOS:                  same as user_data_dir
        Unix:                   ~/.config/<AppName>     # or in $XDG_CONFIG_HOME, if defined
        Win *:                  same as user_data_dir

    For Unix, we follow the XDG spec and support $XDG_CONFIG_HOME.
    That means, by default "~/.config/<AppName>".
    """
    if system in ["win32", "darwin"]:
        path = user_data_dir(appname, appauthor, None, roaming)
    else:
        path = os.getenv('XDG_CONFIG_HOME', os.path.expanduser("~/.config"))
        if appname:
            path = os.path.join(path, appname)
    if appname and version:
        path = os.path.join(path, version)
    return path
def user_config_dir(appname, roaming=True):
    """Return full path to the user-specific config dir for this application.

        "appname" is the name of application.
            If None, just the system directory is returned.
        "roaming" (boolean, default True) can be set False to not use the
            Windows roaming appdata directory. That means that for users on a
            Windows network setup for roaming profiles, this user data will be
            sync'd on login. See
            <http://technet.microsoft.com/en-us/library/cc766489(WS.10).aspx>
            for a discussion of issues.

    Typical user data directories are:
        macOS:                  same as user_data_dir
        Unix:                   ~/.config/<AppName>
        Win *:                  same as user_data_dir

    For Unix, we follow the XDG spec and support $XDG_CONFIG_HOME.
    That means, by default "~/.config/<AppName>".
    """
    if WINDOWS:
        path = user_data_dir(appname, roaming=roaming)
    elif sys.platform == "darwin":
        path = user_data_dir(appname)
    else:
        path = os.getenv('XDG_CONFIG_HOME', expanduser("~/.config"))
        path = os.path.join(path, appname)

    return path


# for the discussion regarding site_config_dirs locations
# see <https://github.com/pypa/pip/issues/1733>
def user_config_dir(appname=None, appauthor=None, version=None, roaming=False):
    r"""Return full path to the user-specific config dir for this application.

        "appname" is the name of application.
            If None, just the system directory is returned.
        "appauthor" (only used on Windows) is the name of the
            appauthor or distributing body for this application. Typically
            it is the owning company name. This falls back to appname. You may
            pass False to disable it.
        "version" is an optional version path element to append to the
            path. You might want to use this if you want multiple versions
            of your app to be able to run independently. If used, this
            would typically be "<major>.<minor>".
            Only applied when appname is present.
        "roaming" (boolean, default False) can be set True to use the Windows
            roaming appdata directory. That means that for users on a Windows
            network setup for roaming profiles, this user data will be
            sync'd on login. See
            <http://technet.microsoft.com/en-us/library/cc766489(WS.10).aspx>
            for a discussion of issues.

    Typical user data directories are:
        Mac OS X:               same as user_data_dir
        Unix:                   ~/.config/<AppName>     # or in $XDG_CONFIG_HOME, if defined
        Win *:                  same as user_data_dir

    For Unix, we follow the XDG spec and support $XDG_CONFIG_HOME.
    That means, by default "~/.config/<AppName>".
    """
    if system in ["win32", "darwin"]:
        path = user_data_dir(appname, appauthor, None, roaming)
    else:
        path = os.getenv('XDG_CONFIG_HOME', os.path.expanduser("~/.config"))
        if appname:
            path = os.path.join(path, appname)
    if appname and version:
        path = os.path.join(path, version)
    return path
def check_for_tokens():
    '''
    Checks for tokens present in the system environment.
    To set them, export them in your ~/.bashrc or ~/.zshrc
    '''
    log.debug('Checking for tokens')
    kite_api_key = getenv('KITE_API_KEY')
    kite_request_token = getenv('KITE_REQUEST_TOKEN')
    kite_secret = getenv('KITE_SECRET')
    # Get your request token from the first time
    # kite.trade/connect/login?api_key=<>
    log.debug("Tokens fetched: {} {} ".format(kite_api_key, kite_secret,))
    if kite_api_key is None or kite_secret is None:
        print('''
        You need to add your Kite API token, along with Secret Key. \n
        export KITE_API_KEY='your-kite-api-key'
        export KITE_SECRET='your-kite-secret-key' \n
        You can fetch it from here : https://developers.kite.trade/apps
        ''')
        return False
    log.debug("Kite Request Token: {}".format(kite_request_token))
    if kite_request_token is None:
        print('''
        Set your request token. You can do this by setting environment variables: \n
        export KITE_REQUEST_TOKEN='your-kite-request-token' \n
        Generate request token from https://kite.trade/connect/login?api_key=<>
        ''')
        return False
    return True
def __load_dictionary(self, config):
    var = config.get_value('engine/replace-with-kanji-python', 'dictionary')
    if var is None or var.get_type_string() != 's':
        path = os.path.join(os.getenv('IBUS_REPLACE_WITH_KANJI_LOCATION'), 'restrained.dic')
        if var:
            config.unset('engine/replace-with-kanji-python', 'dictionary')
    else:
        path = var.get_string()
    return Dictionary(path)
def __init__(self, path):
    logger.info("Dictionary(%s)", path)

    self.__dict_base = {}
    self.__dict = {}

    self.__yomi = ''
    self.__no = 0
    self.__cand = []
    self.__numeric = ''
    self.__dirty = False

    self.__orders_path = ''

    # Load the Katakana dictionary first so that Katakana words come after Kanji words.
    katakana_path = os.path.join(os.getenv('IBUS_REPLACE_WITH_KANJI_LOCATION'), 'katakana.dic')
    self.__load_dict(self.__dict_base, katakana_path)

    # Load the system dictionary
    self.__load_dict(self.__dict_base, path)

    # Load the private dictionary
    self.__dict = self.__dict_base.copy()
    my_path = os.path.expanduser('~/.local/share/ibus-replace-with-kanji/my.dic')
    self.__load_dict(self.__dict, my_path, 'a+')

    base = os.path.basename(path)
    if base:
        self.__orders_path = os.path.expanduser('~/.local/share/ibus-replace-with-kanji')
        self.__orders_path = os.path.join(self.__orders_path, base)
        self.__load_dict(self.__dict, self.__orders_path, 'a+', version_checked=False)
def __init__(self):
    '''Constructor.'''
    logging.basicConfig()
    random.seed()
    self._slack_bot_token = os.getenv('SLACK_BOT_TOKEN')
    self._slack = slackclient.SlackClient(self._slack_bot_token)
    self._slack_bot_id = self._get_bot_id()
    self._at_bot = '<@' + str(self._slack_bot_id) + '>'
    self._scraper = scraper.Scraper()
    self._polls = {}
    self.is_running = True
    self._reaction_interval = 1
    self._keywords = [
        'belly', 'bite', 'eat', 'food', 'lunch',
        'meal', 'menu', 'offer', 'stomach'
    ]
    self._thread = threading.Thread(None, self._loop_messages)
    self._thread.start()
def gather_mpi_arguments(hostfile, params):
    from mpi4py import MPI
    vendor = MPI.get_vendor()
    print_and_log(['MPI detected: %s' % str(vendor)], 'debug', logger)
    if vendor[0] == 'Open MPI':
        mpi_args = ['mpirun']
        if os.getenv('LD_LIBRARY_PATH'):
            mpi_args += ['-x', 'LD_LIBRARY_PATH']
        if os.getenv('PATH'):
            mpi_args += ['-x', 'PATH']
        if os.getenv('PYTHONPATH'):
            mpi_args += ['-x', 'PYTHONPATH']
        if os.path.exists(hostfile):
            mpi_args += ['-hostfile', hostfile]
    elif vendor[0] == 'Microsoft MPI':
        mpi_args = ['mpiexec']
        if os.path.exists(hostfile):
            mpi_args += ['-machinefile', hostfile]
    elif vendor[0] == 'MPICH2':
        mpi_args = ['mpiexec']
        if os.path.exists(hostfile):
            mpi_args += ['-f', hostfile]
    elif vendor[0] == 'MPICH':
        mpi_args = ['mpiexec']
        if os.path.exists(hostfile):
            mpi_args += ['-f', hostfile]
    else:
        print_and_log(['%s may not yet be properly implemented: contact the developers' % vendor[0]],
                      'error', logger)
        mpi_args = ['mpirun']
        if os.path.exists(hostfile):
            mpi_args += ['-hostfile', hostfile]
    return mpi_args