我们从Python开源项目中提取了以下6个代码示例，用于说明如何使用oauth2client.service_account模块。
def getSvcAcctCredentials(scopes, act_as):
    """Return service-account credentials delegated to ``act_as``.

    The parsed service-account key is kept in
    ``GM.Globals[GM.OAUTH2SERVICE_JSON_DATA]``; it is read from the file named
    by ``GC.Values[GC.OAUTH2SERVICE_JSON]`` only when that slot is empty.

    Side effects: stores the key's ``client_email`` in ``GM.Globals[GM.ADMIN]``
    and its ``client_id`` in ``GM.Globals[GM.OAUTH2_CLIENT_ID]``.

    Any ValueError, IndexError or KeyError raised along the way is routed to
    ``invalidOauth2serviceJsonExit()`` (presumably terminates the program --
    verify against its definition).
    """
    try:
        if not GM.Globals[GM.OAUTH2SERVICE_JSON_DATA]:
            raw_key = readFile(GC.Values[GC.OAUTH2SERVICE_JSON], continueOnError=True, displayError=True)
            if not raw_key:
                invalidOauth2serviceJsonExit()
            GM.Globals[GM.OAUTH2SERVICE_JSON_DATA] = json.loads(raw_key)
        key_data = GM.Globals[GM.OAUTH2SERVICE_JSON_DATA]
        base_credentials = oauth2client.service_account.ServiceAccountCredentials.from_json_keyfile_dict(key_data, scopes)
        delegated = base_credentials.create_delegated(act_as)
        delegated.user_agent = GAM_INFO
        account_info = delegated.serialization_data
        # Record which service account is in use for the rest of the program.
        GM.Globals[GM.ADMIN] = account_info[u'client_email']
        GM.Globals[GM.OAUTH2_CLIENT_ID] = account_info[u'client_id']
        return delegated
    except (ValueError, IndexError, KeyError):
        invalidOauth2serviceJsonExit()
def _ProcessJsonArg(args):
    """Get client_info and service_account_json_keyfile from args.

    Reads the file named by ``args.json`` and decides, based on its contents,
    whether it is a client_secrets file or a service_account key.

    Args:
        args: Object exposing a ``json`` attribute that holds the path to the
            JSON file. An empty or ``None`` value means "no file given".

    Returns:
        A ``(client_info, service_account_json_keyfile)`` tuple of strings.
        Exactly one element is the expanded filename and the other is ``''``;
        both are ``''`` when no file was given.

    Raises:
        ValueError: If the file does not contain valid JSON.
    """
    # Test the raw argument first: os.path.expanduser() raises on None, so
    # checking its result would be too late for an unset option.
    if not args.json:
        return '', ''
    filename = os.path.expanduser(args.json)
    # Plain 'r' mode: the legacy 'U' flag was removed in Python 3.11 and
    # newline style is irrelevant to JSON parsing anyway.
    with open(filename, 'r') as f:
        try:
            contents = json.load(f)
        except ValueError:
            raise ValueError('Invalid JSON file: {}'.format(args.json))
    if contents.get('type', '') == 'service_account':
        return '', filename
    return filename, ''
def checkServiceAccount(users):
    """Report, scope by scope, whether the service account can act for users.

    For every user, credentials are requested and refreshed once per scope
    returned by ``API.getSortedSvcAcctScopesList()``. Each scope is reported
    as PASS or FAIL, and a summary message is printed at the end.

    NOTE(review): the indentation of this function was reconstructed from a
    flattened listing. The summary is placed after the user loop because
    ``all_scopes_pass`` is initialized only once, before that loop -- confirm
    against the upstream source.
    """
    checkForExtraneousArguments()
    # Set once and never reset: a single failed scope for any user makes the
    # final summary report failure.
    all_scopes_pass = True
    all_scopes, jcount = API.getSortedSvcAcctScopesList()
    i, count, users = getEntityArgument(users)
    for user in users:
        i += 1
        user = convertUIDtoEmailAddress(user)
        entityPerformActionNumItems([Ent.USER, user], jcount, Ent.SCOPE, i, count)
        Ind.Increment()
        j = 0
        for scope in all_scopes:
            j += 1
            try:
                credentials = getSvcAcctCredentials(scope, user)
                # A successful token refresh is what counts as PASS for this scope.
                credentials.refresh(httplib2.Http(disable_ssl_certificate_validation=GC.Values[GC.NO_VERIFY_SSL]))
                result = u'PASS'
            except httplib2.ServerNotFoundError as e:
                # Presumably terminates the program; if it returned, `result`
                # could be unbound below -- verify.
                systemErrorExit(NETWORK_ERROR_RC, str(e))
            except oauth2client.client.HttpAccessTokenRefreshError:
                result = u'FAIL'
                all_scopes_pass = False
            entityActionPerformedMessage([Ent.SCOPE, u'{0:60}'.format(scope)], result, j, jcount)
        Ind.Decrement()
    # NOTE(review): `credentials` and `user` are leftovers from the loops
    # above; they are unbound if `users` or `all_scopes` is empty.
    service_account = credentials.serialization_data[u'client_id']
    _, _, user_domain = splitEmailAddressOrUID(user)
    printBlankLine()
    if all_scopes_pass:
        printLine(Msg.SCOPE_AUTHORIZATION_PASSED.format(service_account))
    else:
        printErrorMessage(SCOPES_NOT_AUTHORIZED, Msg.SCOPE_AUTHORIZATION_FAILED.format(user_domain, service_account, u',\n'.join(all_scopes)))
def _GetCredentialForServiceAccount(json_keyfile, scopes, credentials_filename=None):
    """Return service-account credentials for ``scopes``, reusing stored ones.

    The key file is parsed as JSON and a credential store is obtained from
    ``_GetCredentialStore`` using ``credentials_filename``, the key's
    ``private_key_id`` and the sorted, space-joined scopes. Stored credentials
    are returned as-is when present and not marked invalid; otherwise new
    credentials are built from the key, written to the store, and linked to
    it before being returned.

    Args:
        json_keyfile: Path to the service-account JSON key file.
        scopes: Iterable of scope strings.
        credentials_filename: Passed through to ``_GetCredentialStore``.

    Returns:
        The stored or newly created credentials object.
    """
    with open(json_keyfile, 'r') as key_fp:
        key_data = json.load(key_fp)

    scope_key = ' '.join(sorted(scopes))
    store = _GetCredentialStore(credentials_filename, key_data['private_key_id'], scope_key)

    cached = store.get()
    if cached is not None and not cached.invalid:
        return cached

    # Nothing usable in the store: mint new credentials and persist them.
    fresh = service_account.ServiceAccountCredentials.from_json_keyfile_dict(
        key_data, scopes=scopes)
    store.put(fresh)
    fresh.set_store(store)
    return fresh