我们从Python开源项目中,提取了以下19个代码示例,用于说明如何使用MySQLdb.NotSupportedError()。
def select_group_ids(self, resource_name, timestamp): """Select the group ids from a snapshot table. Args: resource_name (str): String of the resource name. timestamp (str): String of timestamp, formatted as YYYYMMDDTHHMMSSZ. Returns: list: A list of group ids. Raises: MySQLError: When an error has occured while executing the query. """ try: group_ids_sql = select_data.GROUP_IDS.format(timestamp) cursor = self.conn.cursor(cursorclass=cursors.DictCursor) cursor.execute(group_ids_sql) rows = cursor.fetchall() return [row['group_id'] for row in rows] except (DataError, IntegrityError, InternalError, NotSupportedError, OperationalError, ProgrammingError) as e: raise MySQLError(resource_name, e)
def execute_sql_with_fetch(self, resource_name, sql, values): """Executes a provided sql statement with fetch. Args: resource_name (str): String of the resource name. sql (str): String of the sql statement. values (tuple): Tuple of string for sql placeholder values. Returns: list: A list of dict representing rows of sql query result. Raises: MySQLError: When an error has occured while executing the query. """ try: cursor = self.conn.cursor(cursorclass=cursors.DictCursor) cursor.execute(sql, values) return cursor.fetchall() except (DataError, IntegrityError, InternalError, NotSupportedError, OperationalError, ProgrammingError) as e: raise MySQLError(resource_name, e)
def execute_sql_with_commit(self, resource_name, sql, values): """Executes a provided sql statement with commit. Args: resource_name (str): String of the resource name. sql (str): String of the sql statement. values (tuple): Tuple of string for sql placeholder values. Raises: MySQLError: When an error has occured while executing the query. """ try: cursor = self.conn.cursor() cursor.execute(sql, values) self.conn.commit() except (DataError, IntegrityError, InternalError, NotSupportedError, OperationalError, ProgrammingError) as e: raise MySQLError(resource_name, e)
def _rollback(self): try: BaseDatabaseWrapper._rollback(self) except Database.NotSupportedError: pass
def get_buckets_acls(self, resource_name, timestamp): """Select the bucket acls from a bucket acls snapshot table. Args: resource_name (str): String of the resource name. timestamp (str): String of timestamp, formatted as YYYYMMDDTHHMMSSZ. Returns: list: List of bucket acls. Raises: MySQLError: An error with MySQL has occurred. """ bucket_acls = {} cnt = 0 try: bucket_acls_sql = select_data.BUCKET_ACLS.format(timestamp) rows = self.execute_sql_with_fetch(resource_name, bucket_acls_sql, None) for row in rows: bucket_acl = bkt_acls.\ BucketAccessControls(bucket=row['bucket'], entity=row['entity'], email=row['email'], domain=row['domain'], role=row['role'], project_number=row['project_number']) bucket_acls[cnt] = bucket_acl cnt += 1 except (DataError, IntegrityError, InternalError, NotSupportedError, OperationalError, ProgrammingError) as e: LOGGER.error(errors.MySQLError(resource_name, e)) return bucket_acls
def get_bigquery_acls(self, resource_name, timestamp): """Select the Big Query acls from a Big Query acls snapshot table. Args: resource_name (str): String of the resource name. timestamp (str): String of timestamp, formatted as YYYYMMDDTHHMMSSZ. Returns: dict: Dictionary keyed by the count of ACLs and then the ACLs. Raises: MySQLError: An error with MySQL has occurred. """ bigquery_acls = {} cnt = 0 try: bigquery_acls_sql = select_data.BIGQUERY_ACLS.format(timestamp) rows = self.execute_sql_with_fetch(resource_name, bigquery_acls_sql, None) except (DataError, IntegrityError, InternalError, NotSupportedError, OperationalError, ProgrammingError) as e: LOGGER.error(errors.MySQLError(resource_name, e)) for row in rows: bigquery_acl = bq_acls.BigqueryAccessControls( dataset_id=row['dataset_id'], special_group=row['access_special_group'], user_email=row['access_user_by_email'], domain=row['access_domain'], role=row['role'], group_email=row['access_group_by_email'], project_id=row['project_id']) bigquery_acls[cnt] = bigquery_acl cnt += 1 return bigquery_acls
def _get_cloudsql_instance_acl_map(self, resource_name, timestamp): """Create CloudSQL instance acl map. Args: resource_name (str): String of the resource name. timestamp (str): String of timestamp, formatted as YYYYMMDDTHHMMSSZ. Returns: dict: Map of instance acls. Raises: MySQLError: An error with MySQL has occurred. """ cloudsql_acls_map = {} try: cloudsql_acls_sql = select_data.CLOUDSQL_ACLS.format(timestamp) rows = self.execute_sql_with_fetch(resource_name, cloudsql_acls_sql, None) for row in rows: acl_list = [] project_number = row['project_number'] instance_name = row['instance_name'] network = row['value'] hash_key = hash(str(project_number) + ',' + instance_name) if hash_key in cloudsql_acls_map: if network not in cloudsql_acls_map[hash_key]: cloudsql_acls_map[hash_key].append(network) else: acl_list.append(network) cloudsql_acls_map[hash_key] = acl_list except (DataError, IntegrityError, InternalError, NotSupportedError, OperationalError, ProgrammingError) as e: LOGGER.error(errors.MySQLError(resource_name, e)) return cloudsql_acls_map
def load_data(self, resource_name, timestamp, data): """Load data into a snapshot table. Args: resource_name (str): String of the resource name. timestamp (str): String of timestamp, formatted as YYYYMMDDTHHMMSSZ. data (iterable): An iterable or a list of data to be uploaded. Raises: MySQLError: When an error has occured while executing the query. """ with csv_writer.write_csv(resource_name, data) as csv_file: try: snapshot_table_name = self._create_snapshot_table_name( resource_name, timestamp) load_data_sql = load_data_sql_provider.provide_load_data_sql( resource_name, csv_file.name, snapshot_table_name) LOGGER.debug('SQL: %s', load_data_sql) cursor = self.conn.cursor() cursor.execute(load_data_sql) self.conn.commit() # TODO: Return the snapshot table name so that it can be tracked # in the main snapshot table. except (DataError, IntegrityError, InternalError, NotSupportedError, OperationalError, ProgrammingError) as e: raise MySQLError(resource_name, e)
def get_latest_snapshot_timestamp(self, statuses): """Select the latest timestamp of the completed snapshot. Args: statuses (tuple): The tuple of snapshot statuses to filter on. Returns: str: The string timestamp of the latest complete snapshot. Raises: MySQLError: When no rows are found. """ # Build a dynamic parameterized query string for filtering the # snapshot statuses if not isinstance(statuses, tuple): statuses = ('SUCCESS',) status_params = ','.join(['%s']*len(statuses)) filter_clause = SNAPSHOT_STATUS_FILTER_CLAUSE.format(status_params) try: cursor = self.conn.cursor() cursor.execute( select_data.LATEST_SNAPSHOT_TIMESTAMP + filter_clause, statuses) row = cursor.fetchone() if row: return row[0] raise NoResultsError('No snapshot cycle found.') except (DataError, IntegrityError, InternalError, NotSupportedError, OperationalError, ProgrammingError, NoResultsError) as e: raise MySQLError('snapshot_cycles', e)
def get_cloudsql_acls(self, resource_name, timestamp): """Select the cloudsql acls for project from a snapshot table. Args: resource_name (str): String of the resource name. timestamp (str): String of timestamp, formatted as YYYYMMDDTHHMMSSZ. Returns: list: List of cloudsql acls. Raises: MySQLError: An error with MySQL has occurred. """ cloudsql_acls = {} cnt = 0 try: cloudsql_instances_sql = ( select_data.CLOUDSQL_INSTANCES.format(timestamp)) rows = self.execute_sql_with_fetch(resource_name, cloudsql_instances_sql, None) acl_map = self._get_cloudsql_instance_acl_map(resource_name, timestamp) for row in rows: project_number = row['project_number'] instance_name = row['name'] ssl_enabled = row['settings_ip_configuration_require_ssl'] authorized_networks = self.\ _get_networks_for_instance(acl_map, project_number, instance_name) cloudsql_acl = csql_acls.\ CloudSqlAccessControl(instance_name=instance_name, authorized_networks=authorized_networks, ssl_enabled=ssl_enabled, project_number=project_number) cloudsql_acls[cnt] = cloudsql_acl cnt += 1 except (DataError, IntegrityError, InternalError, NotSupportedError, OperationalError, ProgrammingError) as e: LOGGER.error(errors.MySQLError(resource_name, e)) return cloudsql_acls