Python web 模块,HTTPError() 实例源码

我们从Python开源项目中,提取了以下34个代码示例,用于说明如何使用web.HTTPError()

项目:CommunityCellularManager    作者:facebookincubator    | 项目源码 | 文件源码
def handle_with_logging(self):
    def process(processors):
        try:
            if processors:
                p, processors = processors[0], processors[1:]
                return p(lambda: process(processors))
            else:
                return self.handle()
        except web.HTTPError as e:
            logger.error("Web error: %s" % e)
            raise
        except (KeyboardInterrupt, SystemExit):
            raise
        except Exception as e:
            logger.critical("Unhandled exception raised",
                    traceback=traceback.format_exc())
            raise self.internalerror()

    # processors must be applied in the resvere order. (??)
    return process(self.processors)

# monkeypatch to allow error capturing
项目:annotated-py-tornado    作者:hhstore    | 项目源码 | 文件源码
def put(self, bucket, object_name):
        object_name = urllib.unquote(object_name)
        bucket_dir = os.path.abspath(os.path.join(
            self.application.directory, bucket))
        if not bucket_dir.startswith(self.application.directory) or \
           not os.path.isdir(bucket_dir):
            raise web.HTTPError(404)
        path = self._object_path(bucket, object_name)
        if not path.startswith(bucket_dir) or os.path.isdir(path):
            raise web.HTTPError(403)
        directory = os.path.dirname(path)
        if not os.path.exists(directory):
            os.makedirs(directory)
        object_file = open(path, "w")
        object_file.write(self.request.body)
        object_file.close()
        self.finish()
项目:rucio    作者:rucio01    | 项目源码 | 文件源码
def generate_http_error(status_code, exc_cls, exc_msg):
    """
    utitily function to generate a complete HTTP error response.
    :param status_code: The HTTP status code to generate a response for.
    :param exc_cls: The name of the exception class to send with the response.
    :param exc_msg: The error message.
    :returns: a web.py HTTP response object.
    """

    status = codes[status_code]
    data = {'ExceptionClass': exc_cls,
            'ExceptionMessage': exc_msg}
    # Truncate too long exc_msg
    if len(str(exc_msg)) > 15000:
        exc_msg = str(exc_msg)[:15000]
    headers = {'Content-Type': 'application/octet-stream',
               'ExceptionClass': exc_cls,
               'ExceptionMessage': clean_headers(exc_msg)}
    try:
        return HTTPError(status, headers=headers, data=render_json(**data))
    except:
        print {'Content-Type': 'application/octet-stream', 'ExceptionClass': exc_cls, 'ExceptionMessage': str(exc_msg).strip()}
        raise
项目:lc_cloud    作者:refractionPOINT    | 项目源码 | 文件源码
def doGET( self ):
        params = web.input( sensor_id = None )

        if params.sensor_id is None:
            raise web.HTTPError( '400 Bad Request: sensor id required' )

        info = model.request( 'get_sensor_info', { 'id_or_host' : params.sensor_id } )

        if not isOrgAllowed( AgentId( info.data[ 'id' ] ).org_id ):
            raise web.HTTPError( '401 Unauthorized' )

        info = model.request( 'get_lastips', { 'id' : params.sensor_id } )

        if not info.isSuccess:
            raise web.HTTPError( '503 Service Unavailable: %s' % str( info ) )

        return info.data
项目:lc_cloud    作者:refractionPOINT    | 项目源码 | 文件源码
def doGET( self ):
        params = web.input( sensor_id = None )

        if params.sensor_id is None:
            raise web.HTTPError( '400 Bad Request: sensor id required' )

        info = model.request( 'get_sensor_info', { 'id_or_host' : params.sensor_id } )

        if not isOrgAllowed( AgentId( info.data[ 'id' ] ).org_id ):
            raise web.HTTPError( '401 Unauthorized' )

        info = model.request( 'get_lastevents', { 'id' : params.sensor_id } )

        if not info.isSuccess:
            raise web.HTTPError( '503 Service Unavailable: %s' % str( info ) )

        return info.data
项目:lc_cloud    作者:refractionPOINT    | 项目源码 | 文件源码
def doGET( self ):
        params = web.input( sensor_id = None )

        if params.sensor_id is None:
            raise web.HTTPError( '400 Bad Request: sensor id required' )

        info = model.request( 'get_sensor_info', { 'id_or_host' : params.sensor_id } )

        if not isOrgAllowed( AgentId( info.data[ 'id' ] ).org_id ):
            raise web.HTTPError( '401 Unauthorized' )

        usage = model.request( 'get_sensor_bandwidth', { 'sid' : params.sensor_id } )
        if not usage.isSuccess:
            raise web.HTTPError( '503 Service Unavailable: %s' % str( usage ) )

        return usage.data
项目:lc_cloud    作者:refractionPOINT    | 项目源码 | 文件源码
def doPOST( self ):
        params = web.input( sid = None, tag = None )

        if params.sid is None:
            raise web.HTTPError( '400 Bad Request: sid required' )

        if params.tag is None:
            raise web.HTTPError( '400 Bad Request: tag required' )

        if not isSensorAllowed( params.sid ):
            raise web.HTTPError( '401 Unauthorized' )

        resp = tagging.request( 'add_tags', { 'sid' : AgentId( params.sid ).sensor_id, 
                                              'tag' : params.tag,
                                              'by' : session.email,
                                              'ttl' : ( 60 * 60 * 24 * 365 * 20 ) } )
        if resp.isSuccess:
            return { 'success' : True }
        else:
            raise web.HTTPError( '503 Service Unavailable: %s' % str( resp ) )
项目:lc_cloud    作者:refractionPOINT    | 项目源码 | 文件源码
def doPOST( self ):
        params = web.input( sid = None, tag = None )

        if params.sid is None:
            raise web.HTTPError( '400 Bad Request: sid required' )

        if params.tag is None:
            raise web.HTTPError( '400 Bad Request: tag required' )

        if not isSensorAllowed( params.sid ):
            raise web.HTTPError( '401 Unauthorized' )

        resp = tagging.request( 'del_tags', { 'sid' : AgentId( params.sid ).sensor_id, 
                                              'tag' : params.tag,
                                              'by' : session.email } )
        if resp.isSuccess:
            return { 'success' : True }
        else:
            raise web.HTTPError( '503 Service Unavailable: %s' % str( resp ) )
项目:optf2    作者:Lagg    | 项目源码 | 文件源码
def GET(self, user):
        baseurl = user.strip('/').split('/')
        if len(baseurl) > 0:
            user = baseurl[-1]

        if not user:
            raise steam.items.InventoryError("Need an ID")

        try:
            prof = models.user(user).load()
            ctx = models.sim_context(prof).load()
            for ct in (ctx or []):
                ct.setdefault("inventory_logo", '')

            return template.sim_selector(prof, ctx)
        except steam.items.InventoryError as E:
            raise web.NotFound(template.errors.generic("Failed to load backpack ({0})".format(E)))
        except steam.user.ProfileError as E:
            raise web.NotFound(template.errors.generic("Failed to load profile ({0})".format(E)))
        except steam.api.HTTPError as E:
            raise web.NotFound(template.errors.generic("Couldn't connect to Steam (HTTP {0})".format(E)))
        except models.CacheEmptyError as E:
            raise web.NotFound(template.errors.generic(E))
项目:annotated-py-tornado    作者:hhstore    | 项目源码 | 文件源码
def put(self, bucket_name):
        path = os.path.abspath(os.path.join(
            self.application.directory, bucket_name))
        if not path.startswith(self.application.directory) or \
           os.path.exists(path):
            raise web.HTTPError(403)
        os.makedirs(path)
        self.finish()
项目:annotated-py-tornado    作者:hhstore    | 项目源码 | 文件源码
def delete(self, bucket_name):
        path = os.path.abspath(os.path.join(
            self.application.directory, bucket_name))
        if not path.startswith(self.application.directory) or \
           not os.path.isdir(path):
            raise web.HTTPError(404)
        if len(os.listdir(path)) > 0:
            raise web.HTTPError(403)
        os.rmdir(path)
        self.set_status(204)
        self.finish()
项目:annotated-py-tornado    作者:hhstore    | 项目源码 | 文件源码
def get(self, bucket, object_name):
        object_name = urllib.unquote(object_name)
        path = self._object_path(bucket, object_name)
        if not path.startswith(self.application.directory) or \
           not os.path.isfile(path):
            raise web.HTTPError(404)
        info = os.stat(path)
        self.set_header("Content-Type", "application/unknown")
        self.set_header("Last-Modified", datetime.datetime.utcfromtimestamp(
            info.st_mtime))
        object_file = open(path, "r")
        try:
            self.finish(object_file.read())
        finally:
            object_file.close()
项目:py-script    作者:xiaoxiamin    | 项目源码 | 文件源码
def testCustomNotFound(self):
        urls_a = ("/", "a")
        urls_b = ("/", "b")

        app_a = web.application(urls_a, locals())
        app_b = web.application(urls_b, locals())

        app_a.notfound = lambda: web.HTTPError("404 Not Found", {}, "not found 1")

        urls = (
            "/a", app_a,
            "/b", app_b
        )
        app = web.application(urls, locals())

        def assert_notfound(path, message):
            response = app.request(path)
            self.assertEquals(response.status.split()[0], "404")
            self.assertEquals(response.data, message)

        assert_notfound("/a/foo", "not found 1")
        assert_notfound("/b/foo", "not found")

        app.notfound = lambda: web.HTTPError("404 Not Found", {}, "not found 2")
        assert_notfound("/a/foo", "not found 1")
        assert_notfound("/b/foo", "not found 2")
项目:lc_cloud    作者:refractionPOINT    | 项目源码 | 文件源码
def GET( self ):
        params = web.input( ip = None )

        if params.ip is None:
            raise web.HTTPError( '400 Bad Request: ip required' )

        usage = querySites( 'models', 'get_ip_usage', 
                            queryData = { 'ip' : params.ip, 'oid' : getOrgs().keys() },
                            siteProc = lambda res, ctx, site: res[ 'usage' ], 
                            qProc = lambda res, ctx: [ x for x in itertools.chain( res ) ] )
        return usage

# This is a waterhose feed from a single sensor, it streams traffic as json.
项目:lc_cloud    作者:refractionPOINT    | 项目源码 | 文件源码
def GET( self ):
        web.header( 'Content-Type', 'application/json' )
        params = web.input( sid = None )

        sensorInfo = getSiteFor( params.sid )
        if sensorInfo is None:
            raise web.HTTPError( '404 Not Found: sensor not found' )

        sensorInfo, site = sensorInfo

        after = int( time.time() - 5 )
        eventCache = RingCache( maxEntries = 100, isAutoAdd = True )

        while True:
            now = int( time.time() )
            newest = 0
            res = querySite( 'models', 'get_timeline', 
                             { 'id' : sensorInfo[ 'id' ],
                               'is_include_content' : True,
                               'after' : after }, defaultSiteProc, site, {} )

            for r in res[ 'events' ]:
                if r[ 2 ] not in eventCache:
                    yield dumpJson( sanitizeJson( r[ 3 ] ) )
                eventTime = int( r[ 0 ] / 1000 )
                if eventTime < now + 30 and eventTime > newest:
                    newest = eventTime

            if 0 != newest:
                after = newest - 1
            gevent.sleep( 2 )
项目:lc_cloud    作者:refractionPOINT    | 项目源码 | 文件源码
def GET( self ):
        params = web.input( name = None )

        if params.name is None:
            raise web.HTTPError( '400 Bad Request: name required' )

        objects = querySites( 'models', 'get_obj_list', 
                              queryData = { 'name' : params.name },
                              siteProc = lambda res, ctx, site: dict( [ ( x[ 0 ], ( x[ 1 ], x[ 2 ] ) ) for x in res[ 'objects' ] ] ), 
                              qProc = lambda res, ctx: reduce( lambda x, y: x.update( y ) or x, res, {} ) )
        return objects
项目:lc_cloud    作者:refractionPOINT    | 项目源码 | 文件源码
def GET( self ):
        params = web.input( id = None )

        if params.id is None:
            raise web.HTTPError( '400 Bad Request: id required' )

        objects = querySites( 'models', 'get_obj_view', 
                              queryData = { 'id' : params.id },
                              qProc = lambda res, ctx: mergeObj( res ) )
        return objects
项目:lc_cloud    作者:refractionPOINT    | 项目源码 | 文件源码
def GET( self ):
        params = web.input( atom = None, max_depth = 5, max_atoms = 1000, with_routing = False )

        if params.atom is None:
            raise web.HTTPError( '400 Bad Request: atom required' )

        atoms = querySites( 'models', 'get_atoms_from_root', 
                              queryData = { 'id' : params.atom, 
                                            'depth' : params.max_depth,
                                            'max_atoms' : params.max_atoms,
                                            'with_routing' : params.with_routing },
                              qProc = lambda res, ctx: firstMatching( res, lambda x: 0 != len( x ) ) )

        return atoms
项目:lc_cloud    作者:refractionPOINT    | 项目源码 | 文件源码
def doGET( self ):
        params = web.input( sensor_id = None )

        if params.sensor_id is None:
            raise web.HTTPError( '400 Bad Request: sensor id required' )

        info = model.request( 'get_sensor_info', { 'id_or_host' : params.sensor_id } )

        if not isOrgAllowed( AgentId( info.data[ 'id' ] ).org_id ):
            raise web.HTTPError( '401 Unauthorized' )
        live_status = sensordir.request( 'get_endpoint', { 'aid' : params.sensor_id } )
        if not live_status.isSuccess:
            live_status = False
            transfered = 0
        else:
            transfered = live_status.data.get( 'transfered', 0 )
            live_status = True if live_status.data.get( 'endpoint', None ) is not None else False

        if not info.isSuccess:
            raise web.HTTPError( '503 Service Unavailable: %s' % str( info ) )

        if 0 == len( info.data ):
            raise web.HTTPError( '204 No Content: sensor not found' )

        info.data[ 'live_status' ] = live_status
        info.data[ 'transfered' ] = transfered

        return info.data
项目:lc_cloud    作者:refractionPOINT    | 项目源码 | 文件源码
def doGET( self ):
        params = web.input( atid = None )

        if params.atid is None:
            raise web.HTTPError( '400 Bad Request: atid required' )

        effectiveId = normalAtom( params.atid )

        info = model.request( 'get_atoms_from_root', { 'id' : effectiveId, 'with_routing' : True } )

        if not info.isSuccess:
            raise web.HTTPError( '503 Service Unavailable : %s' % str( info ) )

        info.data = list( info.data )

        for routing, _ in info.data:
            if not isOrgAllowed( AgentId( routing[ 'aid' ] ).org_id ):
                raise web.HTTPError( '401 Unauthorized' )

        # Make sure the root is present
        isFound = False
        for _, atom in info.data:
            if effectiveId == normalAtom( atom.values()[0]['hbs.THIS_ATOM'] ):
                isFound = True
                break
        info.data = map( lambda x: { 'data' : x[ 1 ], 'key' : EventInterpreter( x[ 1 ] ).shortKey() }, info.data )
        if not isFound:
            info.data.append( { 'data' : { 'UNKNOWN' : { 'hbs.THIS_ATOM' : effectiveId } },
                                'key' : 'UNKNOWN' } )

        # Summarize the events

        return info.data
项目:lc_cloud    作者:refractionPOINT    | 项目源码 | 文件源码
def doGET( self ):
        params = web.input( sid = None, after = None, before = None, is_json = True, is_flat = False )

        if params.sid is None:
            return renderAlone.error( 'Must provide a sid.' )

        info = model.request( 'get_sensor_info', { 'id_or_host' : params.sid } )
        aid = AgentId( info.data[ 'id' ] )
        if not isOrgAllowed( aid.org_id ):
            raise web.HTTPError( '401 Unauthorized' )

        req = { 'sid' : params.sid,
                'is_json' : params.is_json,
                'is_flat' : params.is_flat,
                'oid' : aid.org_id, 'by' : session.email }
        if params.after is not None and '0' != params.after:
            req[ 'after' ] = int( params.after )
        if params.before is not None and '0' != params.before:
            req[ 'before' ] = int( params.before )

        res = dataexporter.request( 'export_sensor', req )

        if not res.isSuccess:
            raise web.HTTPError( '503 Service Unavailable: %s' % str( res ) )

        setDownloadFileName( res.data[ 'export_name' ] )

        return res.data[ 'export' ]
项目:lc_cloud    作者:refractionPOINT    | 项目源码 | 文件源码
def doGET( self ):
        params = web.input( sid = None, before = None, after = None )

        if params.sid is None:
            raise web.HTTPError( '400 Bad Request: sid required' )

        if params.after is None:
            raise web.HTTPError( '400 Bad Request: after required' )

        if params.before is None:
            raise web.HTTPError( '400 Bad Request: before required' )

        params.after = int( params.after )
        params.before = int( params.before )

        info = model.request( 'get_sensor_info', { 'id_or_host' : params.sid } )

        aid = AgentId( info.data[ 'id' ] )
        if not isOrgAllowed( aid.org_id ):
            raise web.HTTPError( '401 Unauthorized' )

        info = blink.request( 'get_host_blink', { 'aid' : aid, 'after' : params.after, 'before' : params.before } )

        if not info.isSuccess:
            raise web.HTTPError( '503 Service Unavailable: %s' % str( info ) )

        return info.data
项目:lc_cloud    作者:refractionPOINT    | 项目源码 | 文件源码
def doGET( self ):
        params = web.input( ip = None, before = None, after = None )

        usage = model.request( 'get_ip_usage', { 'ip' : params.ip, 
                                                 'after' : params.after, 
                                                 'before' : params.before, 
                                                 'oid' : session.orgs } )
        if not usage.isSuccess:
            raise web.HTTPError( '503 Service Unavailable: %s' % str( usage ) )

        usage.data[ 'usage' ] = sorted( usage.data[ 'usage' ], key = lambda x: x[ 0 ], reverse = True )

        return usage.data
项目:lc_cloud    作者:refractionPOINT    | 项目源码 | 文件源码
def doPOST( self ):
        params = web.input( oid = None, iid = None, tags = '', desc = '' )

        try:
            oid = uuid.UUID( params.oid )
        except:
            oid = None

        try:
            if params.iid is not None:
                iid = uuid.UUID( params.iid )
            else:
                # If no IID is present it indicates to create a new one.
                iid = None
        except:
            iid = None

        if oid is None:
            raise web.HTTPError( '400 Bad Request: oid required' )

        if not isOrgAllowed( oid ):
            raise web.HTTPError( '401 Unauthorized' )

        tags = [ x.strip() for x in params.tags.split( ',' ) ]

        resp = deployment.request( 'set_installer_info', { 'oid' : oid, 
                                                           'iid' : iid,
                                                           'tags' : tags,
                                                           'desc' : params.desc } )
        if resp.isSuccess:
            redirectTo( 'manage' )
        else:
            raise web.HTTPError( '503 Service Unavailable: %s' % str( resp ) )
项目:lc_cloud    作者:refractionPOINT    | 项目源码 | 文件源码
def doPOST( self ):
        params = web.input( oid = None, iid = None )

        try:
            oid = uuid.UUID( params.oid )
        except:
            oid = None

        try:
            iid = uuid.UUID( params.iid )
        except:
            iid = None

        if oid is None:
            raise web.HTTPError( '400 Bad Request: oid required' )

        if iid is None:
            raise web.HTTPError( '400 Bad Request: iid required' )

        if not isOrgAllowed( oid ):
            raise web.HTTPError( '401 Unauthorized' )

        resp = deployment.request( 'del_installer', { 'oid' : oid, 'iid' : iid } )
        if resp.isSuccess:
            redirectTo( 'manage' )
        else:
            raise web.HTTPError( '503 Service Unavailable: %s' % str( resp ) )
项目:optf2    作者:Lagg    | 项目源码 | 文件源码
def __init__(self, status, message = None):
        headers = {"Content-Type": jsonMimeType}
        web.HTTPError.__init__(self, status, headers, message or "{}")
项目:optf2    作者:Lagg    | 项目源码 | 文件源码
def __init__(self, message = None):
        status = "404 Not Found"
        headers = {"Content-Type": "application/rss+xml"}
        web.HTTPError.__init__(self, status, headers, message)
项目:optf2    作者:Lagg    | 项目源码 | 文件源码
def GET(self, app, user, cid = None):
        app = models.app_aliases.get(app, app)
        self._cid = cid
        markup.init_theme(app)
        try:
            userp = models.user(user).load()
            pack = models.inventory(userp, scope = app).load()
            items = pack["items"].values()
            equippeditems = {}
            classmap = set()
            slotlist = []
            self._app = app

            markup.set_navlink(markup.generate_root_url("loadout/{0}".format(userp["id64"]), app))

            # initial normal items
            try:
                sitems = models.schema(scope = app).processed_items.values()
                normalitems = views.filtering(sitems).byQuality("normal")
                equippeditems, slotlist, classmap = self.build_loadout(normalitems, equippeditems, slotlist, classmap)
            except models.CacheEmptyError:
                pass

            # Real equipped items
            equippeditems, slotlist, classmap = self.build_loadout(items, equippeditems, slotlist, classmap)

            return template.loadout(app, userp, equippeditems, sorted(classmap), self._slots_sorted + sorted(slotlist), cid)
        except steam.items.InventoryError as E:
            raise web.NotFound(template.errors.generic("Backpack error: {0}".format(E)))
        except steam.user.ProfileError as E:
            raise web.NotFound(template.errors.generic("Profile error: {0}".format(E)))
        except steam.api.HTTPError as E:
            raise web.NotFound(template.errors.generic("Couldn't connect to Steam (HTTP {0})".format(E)))
        except models.ItemBackendUnimplemented:
            raise web.NotFound(template.errors.generic("No backend found to handle loadouts for these items"))
项目:optf2    作者:Lagg    | 项目源码 | 文件源码
def GET(self, app, iid):
        user = None

        markup.init_theme(app)

        try:
            sitems = models.schema(scope = app).processed_items
            item = sitems[iid]

            if web.input().get("contents"):
                contents = item.get("contents")
                if contents:
                    item = contents
        except steam.api.HTTPError as E:
            raise web.NotFound(template.errors.generic("Couldn't connect to Steam (HTTP {0})".format(E)))
        except steam.items.SchemaError as E:
            raise web.NotFound(template.errors.generic("Couldn't open schema: {0}".format(E)))
        except KeyError:
            raise web.NotFound(template.item_error_notfound(iid))
        except models.CacheEmptyError as E:
            raise web.NotFound(template.errors.generic(E))
        except models.ItemBackendUnimplemented:
            raise web.NotFound(template.errors.generic("No backend found to handle the given item, this could mean that the item has no available associated schema (yet)"))

        caps = markup.get_capability_strings(item.get("caps", []))

        try:
            assets = models.assets(scope = app).price_map
            price = markup.generate_item_price_string(item, assets)
        except models.CacheEmptyError:
            price = None

        # Strip off quality prefix for possessive name
        itemname = item["mainname"]
        if itemname.startswith("The "):
            item["ownedname"] = itemname[4:]
        else:
            item["ownedname"] = itemname

        return template.item(app, user, item, price = price, caps = caps)
项目:optf2    作者:Lagg    | 项目源码 | 文件源码
def GET(self, app, user, iid):
        markup.init_theme(app)

        try:
            user, items = models.load_inventory(user, scope = app)
        except steam.api.HTTPError as E:
            raise web.NotFound(template.errors.generic("Couldn't connect to Steam (HTTP {0})".format(E)))
        except steam.user.ProfileError as E:
            raise web.NotFound(template.errors.generic("Can't retrieve user profile data: {0}".format(E)))
        except steam.items.InventoryError as E:
            raise web.NotFound(template.errors.generic("Couldn't open backpack: {0}".format(E)))

        item = None
        try:
            item = items["items"][iid]
        except KeyError:
            for cid, bpitem in items["items"].iteritems():
                oid = bpitem.get("oid")
                if oid == long(iid):
                    item = bpitem
                    break
            if not item:
                raise web.NotFound(template.item_error_notfound(iid))

        if web.input().get("contents"):
            contents = item.get("contents")
            if contents:
                item = contents

        # Strip off quality prefix for possessive name
        itemname = item["mainname"]
        if itemname.startswith("The "):
            item["ownedname"] = itemname[4:]
        else:
            item["ownedname"] = itemname

        return template.item(app, user, item)
项目:opennumber    作者:opennumber    | 项目源码 | 文件源码
def __init__(self, url, absolute=False):

        status = '303 See Other'
        newloc = url

        home = web.ctx.environ['HTTP_ORIGIN']
        newloc = urlparse.urljoin(home, url)
        logger.info('seeother: %s', newloc)
        headers = {
            'Content-Type': 'text/html',
            'Location': newloc
        }
        web.webapi.HTTPError.__init__(self, status, headers, "")
        pass
项目:opennumber    作者:opennumber    | 项目源码 | 文件源码
def GET(self, *args, **kwargs):
        """
        """
        response = Response.internal_error()
        try:
            self.log_request()
            with models.Session() as orm:
                web.ctx.orm = orm
                response =  self.get(*args, **kwargs)
                return response
        except:
            logger.exception('BaseHandler failure:')
            status = '500 InternalError'
            headers = {'Content-Type': 'text/html'}
            raise web.HTTPError(status, headers, 'internal error')
项目:social-examples    作者:python-social-auth    | 项目源码 | 文件源码
def load_sqla(handler):
    web.ctx.orm = scoped_session(sessionmaker(bind=engine))
    try:
        return handler()
    except web.HTTPError:
        web.ctx.orm.commit()
        raise
    except:
        web.ctx.orm.rollback()
        raise
    finally:
        web.ctx.orm.commit()
项目:annotated-py-tornado    作者:hhstore    | 项目源码 | 文件源码
def get(self, bucket_name):
        prefix = self.get_argument("prefix", u"")
        marker = self.get_argument("marker", u"")
        max_keys = int(self.get_argument("max-keys", 50000))
        path = os.path.abspath(os.path.join(self.application.directory,
                                            bucket_name))
        terse = int(self.get_argument("terse", 0))
        if not path.startswith(self.application.directory) or \
           not os.path.isdir(path):
            raise web.HTTPError(404)
        object_names = []
        for root, dirs, files in os.walk(path):
            for file_name in files:
                object_names.append(os.path.join(root, file_name))
        skip = len(path) + 1
        for i in range(self.application.bucket_depth):
            skip += 2 * (i + 1) + 1
        object_names = [n[skip:] for n in object_names]
        object_names.sort()
        contents = []

        start_pos = 0
        if marker:
            start_pos = bisect.bisect_right(object_names, marker, start_pos)
        if prefix:
            start_pos = bisect.bisect_left(object_names, prefix, start_pos)

        truncated = False
        for object_name in object_names[start_pos:]:
            if not object_name.startswith(prefix):
                break
            if len(contents) >= max_keys:
                truncated = True
                break
            object_path = self._object_path(bucket_name, object_name)
            c = {"Key": object_name}
            if not terse:
                info = os.stat(object_path)
                c.update({
                    "LastModified": datetime.datetime.utcfromtimestamp(
                        info.st_mtime),
                    "Size": info.st_size,
                })
            contents.append(c)
            marker = object_name
        self.render_xml({"ListBucketResult": {
            "Name": bucket_name,
            "Prefix": prefix,
            "Marker": marker,
            "MaxKeys": max_keys,
            "IsTruncated": truncated,
            "Contents": contents,
        }})