我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用web.application()。
def app(self, dispatch, parsed): # Make sure we get an argh-like object here that has a dispatch object if dispatch is None: if not hasattr(self._parser, 'dispatch'): raise ValueError("Can't dispatch a non dispatchable parser without a dispatch method") dispatch = self._parser.dispatch parsed = False class WebuiPageWrapper(page.WebuiPage): _parser = self._parser _dispatch = dispatch _parsed = parsed urls = ('/', 'index') classes = {'index': WebuiPageWrapper} return web.application(urls, classes)
def GET(self): """ list all rucio accounts. HTTP Success: 200 OK HTTP Error: 401 Unauthorized 500 InternalError :param Rucio-Account: Account identifier. :param Rucio-Auth-Token: as an 32 character hex string. :returns: A list containing all account names as dict. """ header('Content-Type', 'application/x-json-stream') filter = {} if ctx.query: filter = dict(parse_qsl(ctx.query[1:])) for account in list_accounts(filter=filter): yield render_json(**account) + "\n"
def GET(self, account): """ Return the account usage of the account. HTTP Success: 200 OK HTTP Error: 401 Unauthorized 404 Not Found :param account: The account name. """ header('Content-Type', 'application/x-json-stream') try: for usage in get_account_usage(account=account, rse=None, issuer=ctx.env.get('issuer')): yield dumps(usage, cls=APIEncoder) + '\n' except AccountNotFound, e: raise generate_http_error(404, 'AccountNotFound', e.args[0][0]) except AccessDenied, e: raise generate_http_error(401, 'AccessDenied', e.args[0][0]) except Exception, e: print format_exc() raise InternalError(e)
def GET(self, rule_id): """ get rule information for given rule id. HTTP Success: 200 OK HTTP Error: 401 Unauthorized 404 Not Found 500 InternalError :returns: JSON dict containing informations about the requested user. """ header('Content-Type', 'application/json') try: rule = get_replication_rule(rule_id) except RuleNotFound as error: raise generate_http_error(404, 'RuleNotFound', error.args[0][0]) except RucioException as error: raise generate_http_error(500, error.__class__.__name__, error.args[0]) except Exception as error: raise InternalError(error) return render_json(**rule)
def GET(self, rule_id): """ get locks for a given rule_id. HTTP Success: 200 OK HTTP Error: 404 Not Found 500 InternalError :returns: JSON dict containing informations about the requested user. """ header('Content-Type', 'application/x-json-stream') try: locks = get_replica_locks_for_rule_id(rule_id) except RucioException as error: raise generate_http_error(500, error.__class__.__name__, error.args[0]) except Exception as error: raise InternalError(error) for lock in locks: yield dumps(lock, cls=APIEncoder) + '\n'
def GET(self, rule_id): """ get history for a given rule_id. HTTP Success: 200 OK HTTP Error: 404 Not Found 500 InternalError :returns: JSON dict containing informations about the requested user. """ header('Content-Type', 'application/x-json-stream') try: history = list_replication_rule_history(rule_id) except RucioException as error: raise generate_http_error(500, error.__class__.__name__, error.args[0]) except Exception as error: raise InternalError(error) for hist in history: yield dumps(hist, cls=APIEncoder) + '\n'
def GET(self, scope, name): """ get history for a given DID. HTTP Success: 200 OK HTTP Error: 404 Not Found 500 InternalError :returns: JSON dict containing informations about the requested user. """ header('Content-Type', 'application/x-json-stream') try: history = list_replication_rule_full_history(scope, name) except RucioException as error: raise generate_http_error(500, error.__class__.__name__, error.args[0]) except Exception as error: raise InternalError(error) for hist in history: yield dumps(hist, cls=APIEncoder) + '\n'
def GET(self, rule_id): """ get analysis for given rule. HTTP Success: 200 OK HTTP Error: 404 Not Found 500 InternalError :returns: JSON dict containing informations about the requested user. """ header('Content-Type', 'application/x-json-stream') try: analysis = examine_replication_rule(rule_id) except RucioException as error: raise generate_http_error(500, error.__class__.__name__, error.args[0]) except Exception as error: raise InternalError(error) return render_json(**analysis)
def GET(self, account, name=None): """ Return a summary of the states of all rules of a given subscription id. HTTP Success: 200 OK HTTP Error: 404 Not Found 500 Internal Error """ header('Content-Type', 'application/x-json-stream') try: for row in list_subscription_rule_states(account=account): yield dumps(row, cls=APIEncoder) + '\n' except RucioException as error: raise generate_http_error(500, error.__class__.__name__, error.args[0]) except Exception as error: raise InternalError(error)
def GET(self, subscription_id): """ Retrieve a subscription matching the given subscription id HTTP Success: 200 OK HTTP Error: 404 Not Found 401 Unauthorized """ header('Content-Type', 'application/json') try: subscription = get_subscription_by_id(subscription_id) except SubscriptionNotFound as error: raise generate_http_error(404, 'SubscriptionNotFound', error.args[0][0]) except RucioException as error: raise generate_http_error(500, error.__class__.__name__, error.args[0]) except Exception as error: raise InternalError(error) return render_json(**subscription)
def GET(self, rse): """ List dataset replicas replicas. HTTP Success: 200 OK HTTP Error: 401 Unauthorized 500 InternalError :returns: A dictionary containing all replicas on the RSE. """ header('Content-Type', 'application/x-json-stream') try: for row in list_datasets_per_rse(rse=rse): yield dumps(row, cls=APIEncoder) + '\n' except RucioException, e: raise generate_http_error(500, e.__class__.__name__, e.args[0][0]) except Exception, e: print format_exc() raise InternalError(e)
def GET(self, rse): """ list all RSE attributes for a RSE. HTTP Success: 200 OK HTTP Error: 401 Unauthorized 404 Not Found 500 InternalError :param rse: RSE name. :returns: A list containing all RSE attributes. """ header('Content-Type', 'application/json') return dumps(list_rse_attributes(rse))
def GET(self, rse): """ Get RSE usage information. :param rse: the RSE name. """ header('Content-Type', 'application/x-json-stream') source = None if ctx.query: params = parse_qs(ctx.query[1:]) if 'source' in params: source = params['source'][0] try: for usage in list_rse_usage_history(rse=rse, issuer=ctx.env.get('issuer'), source=source): yield render_json(**usage) + '\n' except RSENotFound, error: raise generate_http_error(404, 'RSENotFound', error[0][0]) except RucioException, error: raise generate_http_error(500, error.__class__.__name__, error.args[0][0]) except Exception, error: print format_exc() raise InternalError(error)
def GET(self, rse): """ Get account usage and limit for one RSE. :param rse: the RSE name. """ header('Content-Type', 'application/json') try: usage = get_rse_account_usage(rse=rse) for row in usage: yield dumps(row, cls=APIEncoder) + '\n' except RSENotFound, error: raise generate_http_error(404, 'RSENotFound', error[0][0]) except RucioException, error: raise generate_http_error(500, error.__class__.__name__, error.args[0][0]) except Exception, error: print format_exc() raise InternalError(error)
def GET(self, scope, name): """ List all parents of a data identifier. HTTP Success: 200 OK HTTP Error: 401 Unauthorized 500 InternalError :returns: A list of dictionary containing all dataset information. """ header('Content-Type', 'application/x-json-stream') try: for dataset in list_parent_dids(scope=scope, name=name): yield render_json(**dataset) + "\n" except DataIdentifierNotFound, error: raise generate_http_error(404, 'DataIdentifierNotFound', error.args[0][0]) except RucioException, error: raise generate_http_error(500, error.__class__.__name__, error.args[0][0]) except Exception, error: print format_exc() raise InternalError(error)
def GET(self, scope, name): """ Return all rules of a given DID. HTTP Success: 200 OK HTTP Error: 401 Unauthorized 404 Not Found :param scope: The scope name. """ header('Content-Type', 'application/x-json-stream') try: for rule in list_replication_rules({'scope': scope, 'name': name}): yield dumps(rule, cls=APIEncoder) + '\n' except RuleNotFound, error: raise generate_http_error(404, 'RuleNotFound', error.args[0][0]) except RucioException, error: raise generate_http_error(500, error.__class__.__name__, error.args[0]) except Exception, error: raise InternalError(error)
def GET(self, guid): """ Return the file associated to a GUID. HTTP Success: 200 OK HTTP Error: 401 Unauthorized 404 Not Found :param scope: The scope name. """ header('Content-Type', 'application/x-json-stream') try: for dataset in get_dataset_by_guid(guid): yield dumps(dataset, cls=APIEncoder) + '\n' except DataIdentifierNotFound, error: raise generate_http_error(404, 'DataIdentifierNotFound', error.args[0][0]) except RucioException, error: raise generate_http_error(500, error.__class__.__name__, error.args[0]) except Exception, error: raise InternalError(error)
def GET(self): """ Returns list of recent identifiers. HTTP Success: 200 OK HTTP Error: 401 Unauthorized :param type: The DID type. """ header('Content-Type', 'application/x-json-stream') params = parse_qs(ctx.query[1:]) type = None if 'type' in params: type = params['type'][0] try: for did in list_new_dids(type): yield dumps(did, cls=APIEncoder) + '\n' except RucioException, error: raise generate_http_error(500, error.__class__.__name__, error.args[0]) except Exception, error: raise InternalError(error)
def GET(self): """ Retrieve all exceptions. HTTP Success: 200 OK HTTP Error: 404 Not Found 500 Internal Error """ header('Content-Type', 'application/x-json-stream') try: for exception in list_exceptions(): yield dumps(exception, cls=APIEncoder) + '\n' except LifetimeExceptionNotFound as error: raise generate_http_error(404, 'LifetimeExceptionNotFound', error.args[0][0]) except RucioException as error: raise generate_http_error(500, error.__class__.__name__, error.args[0]) except Exception as error: raise InternalError(error)
def GET(self, exception_id): """ Retrieve an exception. HTTP Success: 200 OK HTTP Error: 404 Not Found 500 Internal Error """ header('Content-Type', 'application/json') try: for exception in list_exceptions(exception_id): yield dumps(exception, cls=APIEncoder) + '\n' except LifetimeExceptionNotFound as error: raise generate_http_error(404, 'LifetimeExceptionNotFound', error.args[0][0]) except RucioException as error: raise generate_http_error(500, error.__class__.__name__, error.args[0]) except Exception as error: raise InternalError(error)
def GET(self, section): """ List configuration of a section HTTP Success: 200 OK HTTP Error: 401 Unauthorized 404 NotFound """ header('Content-Type', 'application/json') res = {} for item in config.items(section, issuer=ctx.env.get('issuer')): res[item[0]] = item[1] if res == {}: raise generate_http_error(404, 'ConfigNotFound', 'No configuration found for section \'%s\'' % section) return json.dumps(res)
def GET(self, scope, name, rse): """ List request for given DID to a destination RSE. HTTP Success: 200 OK HTTP Error: 401 Unauthorized 404 Request Not Found """ header('Content-Type', 'application/json') try: return json.dumps(request.get_request_by_did(scope=scope, name=name, rse=rse, issuer=ctx.env.get('issuer')), cls=APIEncoder) except: raise generate_http_error(404, 'RequestNotFound', 'No request found for DID %s:%s at RSE %s' % (scope, name, rse))
def run(urls, fvars, *middleware): setup_database() def stdout_processor(handler): handler() return web.ctx.get('output', '') def hook_processor(handler): for h in web.loadhooks.values() + web._loadhooks.values(): h() output = handler() for h in web.unloadhooks.values(): h() return output app = web.application(urls, fvars) app.add_processor(stdout_processor) app.add_processor(hook_processor) app.run(*middleware)
def testRedirect(self): urls = ( "/a", "redirect /hello/", "/b/(.*)", r"redirect /hello/\1", "/hello/(.*)", "hello" ) app = web.application(urls, locals()) class hello: def GET(self, name): name = name or 'world' return "hello " + name response = app.request('/a') self.assertEquals(response.status, '301 Moved Permanently') self.assertEquals(response.headers['Location'], 'http://0.0.0.0:8080/hello/') response = app.request('/a?x=2') self.assertEquals(response.status, '301 Moved Permanently') self.assertEquals(response.headers['Location'], 'http://0.0.0.0:8080/hello/?x=2') response = app.request('/b/foo?x=2') self.assertEquals(response.status, '301 Moved Permanently') self.assertEquals(response.headers['Location'], 'http://0.0.0.0:8080/hello/foo?x=2')
def test_subdirs(self): urls = ( "/(.*)", "blog" ) class blog: def GET(self, path): return "blog " + path app_blog = web.application(urls, locals()) urls = ( "/blog", app_blog, "/(.*)", "index" ) class index: def GET(self, path): return "hello " + path app = web.application(urls, locals()) self.assertEquals(app.request('/blog/foo').data, 'blog foo') self.assertEquals(app.request('/foo').data, 'hello foo') def processor(handler): return web.ctx.path + ":" + handler() app.add_processor(processor) self.assertEquals(app.request('/blog/foo').data, '/blog/foo:blog foo')
def test_subdomains(self): def create_app(name): urls = ("/", "index") class index: def GET(self): return name return web.application(urls, locals()) urls = ( "a.example.com", create_app('a'), "b.example.com", create_app('b'), ".*.example.com", create_app('*') ) app = web.subdomain_application(urls, locals()) def test(host, expected_result): result = app.request('/', host=host) self.assertEquals(result.data, expected_result) test('a.example.com', 'a') test('b.example.com', 'b') test('c.example.com', '*') test('d.example.com', '*')
def testUnload(self): x = web.storage(a=0) urls = ( "/foo", "foo", "/bar", "bar" ) class foo: def GET(self): return "foo" class bar: def GET(self): raise web.notfound() app = web.application(urls, locals()) def unload(): x.a += 1 app.add_processor(web.unloadhook(unload)) app.request('/foo') self.assertEquals(x.a, 1) app.request('/bar') self.assertEquals(x.a, 2)
def test_stopsimpleserver(self): urls = ( '/', 'index', ) class index: def GET(self): pass app = web.application(urls, locals()) thread = threading.Thread(target=app.run) thread.start() time.sleep(1) self.assertTrue(thread.isAlive()) app.stop() thread.join(timeout=1) self.assertFalse(thread.isAlive())
def GET(self): # one ip can start one game at time ip = web.ctx.ip try: service = Game.manager.get_service_by_key(ip) except KeyError: # register game service Game.manager.register_service(ip) service = Game.manager.get_service_by_key(ip) # get players' names data = web.input() players_names = [data[str(x)] for x in range(len(data))] # init game service service.init_game(players_names) messages = [service.map_describe()] response = { 'current_player': json.loads(json.dumps(service.current_player,cls=PlayerEncoder)), 'messages': messages } web.header('Content-Type', 'application/json') return json.dumps(response)
def search_simple_index(query, offset, count, draw): """ This function is responsible for hitting the solr endpoint and returning the results back. """ results = SOLR_SIMPLEINDEX.search(q=query, **{ 'start': int(offset), 'rows': int(count) }) print("Saw {0} result(s) for query {1}.".format(len(results), query)) formatted_hits = [] for hit in results.docs: formatted_hits.append( [hit['_news_title'], hit['_news_publisher'], CATEGORY[hit['_news_category'][0]], hit['_news_url']]) response = {'draw': draw, 'recordsFiltered': results.hits, 'data': formatted_hits} web.header('Content-Type', 'application/json') return json.dumps(response)
def search_entity_aware_index(query, offset, count, draw, qf): """ This function is responsible for hitting the solr endpoint and returning the results back. """ results = SOLR_ENTITYAWAREINDEX.search(q=query, **{ 'start': int(offset), 'rows': int(count), 'qf': qf }) print("Saw {0} result(s) for query {1}.".format(len(results), query)) formatted_hits = [] for hit in results.docs: formatted_hits.append( [hit['_news_title'], hit['_news_publisher'], CATEGORY[hit['_news_category'][0]], hit['_news_url']]) response = {'draw': draw, 'recordsFiltered': results.hits, 'data': formatted_hits} web.header('Content-Type', 'application/json') return json.dumps(response)
def search(query, offset, count, draw, solr_endpoint): """ This function is responsible for hitting the solr endpoint and returning the results back. """ results = solr_endpoint.search(q=query, **{ 'start': int(offset), 'rows': int(count) }) print("Saw {0} result(s) for query {1}.".format(len(results), query)) formatted_hits = [] for hit in results.docs: formatted_hits.append( [hit['_news_title'], hit['_news_publisher'], CATEGORY[hit['_news_category'][0]], hit['_news_url']]) response = {'draw': draw, 'recordsFiltered': results.hits, 'data': formatted_hits} web.header('Content-Type', 'application/json') return json.dumps(response)
def search_simple_index(query, offset, count, draw): """ This function is responsible for hitting the solr endpoint and returning the results back. """ results = SOLR_SIMPLEINDEX.search(q=query, **{ 'start': int(offset), 'rows': int(count), 'cache': 'false' }) print("Saw {0} result(s) for query {1}.".format(len(results), query)) formatted_hits = [] for hit in results.docs: formatted_hits.append( [hit['_news_title'], hit['_news_publisher'], CATEGORY[hit['_news_category'][0]], hit['_news_url']]) response = {'draw': draw, 'recordsFiltered': results.hits, 'data': formatted_hits} web.header('Content-Type', 'application/json') return json.dumps(response)
def __init__(self): web.application.__init__(self) self.__name__ = self self.app = None self.lock = threading.Lock()
def __init__(self, port=8080, host='0.0.0.0'): # pragma: no cover d_client = docker.from_env() d_client.images.pull('cyberreboot/vent-ncapture', tag='master') nf_inst = NControl() urls = nf_inst.urls() app = web.application(urls, globals()) web.httpserver.runsimple(app.wsgifunc(), (host, port))
def start_web_app(): """ starts the web app in a TestApp for testing """ nf_inst = ncontrol.NControl() urls = nf_inst.urls() app = web.application(urls, globals()) test_app = TestApp(app.wsgifunc()) return test_app
def test_create_r(): """ tests the restful endpoint: create """ # get web app test_app = start_web_app() # test create r = test_app.post('/create', params={'id': 'foo', 'interval': '60', 'filter': '', 'nic': 'eth1'}, headers={'Content-Type': 'application/json'}) assert r.status == 200 r = test_app.post('/create', params={'id': 'foo', 'interval': '60', 'iters': '1', 'filter': '', 'nic': 'eth1'}, headers={'Content-Type': 'application/json'}) assert r.status == 200 r = test_app.post('/create', params={}) assert r.status == 200 r = test_app.post('/create', params={'nic': 'eth1'}) assert r.status == 200 r = test_app.post('/create', params={'nic': 'eth1', 'id': 'foo'}) assert r.status == 200 r = test_app.post( '/create', params={'nic': 'eth1', 'id': 'foo', 'interval': '61'}) assert r.status == 200 r = test_app.post('/create', params='{}') assert r.status == 200 r = test_app.post('/create', params={'id': 'foo', 'interval': '60', 'filter': '', 'metadata': '{"foo": "bar"}', 'iters': '1', 'nic': 'eth1'}, headers={'Content-Type': 'application/json'}) assert r.status == 200
def startServer(self): sys.argv.append('0.0.0.0:%s' % config.PORT) app = web.application(self.urls, globals()) app.run()
def set_json_response(): """ ????content-type?json :return: """ web.header('content-type', 'application/json;charset=utf-8', unique=True)
def __init__( self, title, port, main_page ): self.title = title portal_pages['main/structure'] = main_page main_page.addToPortal( self ) self.app = web.application( self.urls, globals() )
def GET( self, x ): print 'in structure: '+x web.header('Content-Type', 'application/json') layout = portal_pages[x] return '{"layout":'+ layout.to_JSON() +'}'
def GET(self): web.header('Content-Type', "application/json") return json.dumps({'status': 'ok'})
def POST(self): data = web.input() if not ("ident" in data and "csr" in data): raise web.BadRequest() crt = sign_csr(data.csr, data.ident) web.header('Content-Type', "application/json") return json.dumps({'certificate': crt})
def GET(self): web.header('Content-Type','application/json') topic = web.input(value=' ') try: output = run_model.model(requirement = [topic.value]) except: output = 'Invalid query' pass return output
def POST(self): global modelCache params = json.loads(web.data()) requestInput = web.input() id = requestInput["id"] # We will always return the active cells because they are cheap. returnSnapshots = [TM_SNAPS.ACT_CELLS] from pprint import pprint; pprint(params) tm = TM(**params) tmFacade = TmFacade(tm, ioClient, modelId=id) modelId = tmFacade.getId() modelCache[modelId]["tm"] = tmFacade modelCache[modelId]["classifier"] = SDRClassifierFactory.create(implementation="py") modelCache[modelId]["recordsSeen"] = 0 print "Created TM {}".format(modelId) payload = { "meta": { "id": modelId, "saving": returnSnapshots } } tmState = tmFacade.getState(*returnSnapshots) for key in tmState: payload[key] = tmState[key] web.header("Content-Type", "application/json") return json.dumps(payload)
def run(self): port = 8080 app = web.application(self.urls, { 'bot':self.bot, 'bot_commands':self.bot_commands, 'bot_status_all':self.bot_status_all}) web.httpserver.runsimple(app.wsgifunc(), ("0.0.0.0", self.port))
def start_api_server(): sys.argv.append('0.0.0.0:%s' % config.API_PORT) app = web.application(urls, globals()) app.run()
def main(): # arguments = parse_arguments() app = web.application(urls, globals()) print 'Starting daemons for updating the cache and confidence rating..' costi_api.start_update_daemon() costi_api.start_rating_daemon() print 'Start server..' app.run()
def GET(self): all_col = ('name','description','owner','family_id','time','change_log') citype_input = web.input() condition = " " for col in range(len(all_col)): col_name = all_col[col] value = citype_input.get(col_name,None) if value <> None: if col_name == 'time' : condition = condition + "ct.starttime <= '" + value + "' and ct.endtime > '" + value + "' and " else : condition = condition + "ct." + col_name + " = '" + value + "' and " if value == None and col_name == 'time': condition = condition + "ct.endtime = '" + ENDTIME + "' and " v_sql = "select ct.name , convert(ct.description,'utf8') description,ct.owner,ct.family_id,convert(ct.displayname,'utf8') " \ "displayname,ct.change_log from t_ci_type ct where " + condition + "1=1" #v_sql = "select ct.name , ct.description,ct.owner,ct.family_id,ct.displayname,ct.change_log from t_ci_type ct where " + condition + "1=1" ci_type = db.query(v_sql) ci_as_dict = [] for ci in ci_type: ci_as_dict.append(ci) ci_type_json = json.dumps(ci_as_dict, indent = 4,ensure_ascii=False, separators = (',',':')).decode("GB2312") #Type is unicode # import sys,httplib, urllib # params = urllib.urlencode({'fid':'FCIT00000004','change_log':'test'}) # headers = {'Content-type': 'application/x-www-form-urlencoded', 'Accept': 'text/plain'} # con2 = httplib.HTTPConnection("localhost:8080") # con2.request("DELETE","/citype",params,headers) # con2.close() return ci_type_json
def GET(self): all_col = ('value','description','type_fid','ci_fid','owner','family_id','time','change_log','citype_name','ci_name','ciat_name','value_type') ci_input = web.input() condition = " " for col in range(len(all_col)): col_name = all_col[col] value = ci_input.get(col_name,None) if value <> None: if col_name == 'time' : condition = condition + "a.starttime <= '" + value + "' and a.endtime > '" + value + "' and b.starttime <= '" + value + "' and b.endtime > '" + value + "' and c.starttime <= '" + value + "' and c.endtime > '" + value + "' and d.starttime <= '" + value + "' and d.endtime > '" + value + "' and " elif col_name == 'citype_name': condition = condition + "d.name = '" + value + "' and " elif col_name == 'ci_name': condition = condition + "b.name = '" + value + "' and " elif col_name == 'ciat_name': condition = condition + "c.name = '" + value + "' and " elif col_name == 'value_type': condition = condition + "c.value_type = '" + value + "' and " else : condition = condition + "a." + col_name + " = '" + value + "' and " if value == None and col_name == 'time': condition = condition + "a.endtime = '" + ENDTIME + "' and b.endtime = '" + ENDTIME + "' and c.endtime = '" + ENDTIME + "' and d.endtime = '" + ENDTIME + "' and " v_sql = "select distinct d.name citype_name, b.name ci_name, c.name ciat_name, c.value_type, a.value, convert(a.description,'utf8') description, a.type_fid, a.ci_fid, a.owner, a.family_id, a.change_log from t_ci_attribute a, t_ci b, t_ci_attribute_type c, t_ci_type d where " + condition + "a.type_fid = c.family_id and a.ci_fid = b.family_id and b.type_fid = d.family_id and c.ci_type_fid = d.family_id " ci_recs = db.query(v_sql) ci_as_dict = [] for ci in ci_recs: ci_as_dict.append(ci) ci_json = json.dumps(ci_as_dict, indent = 4,ensure_ascii=False, separators = (',',':')).decode("GB2312") # import sys,httplib, urllib # params = urllib.urlencode({'fid':'FCAD00000002','change_log':'test'}) # headers = {'Content-type': 'application/x-www-form-urlencoded', 'Accept': 'text/plain'} # con2 = httplib.HTTPConnection("localhost:8080") # con2.request("DELETE","/ciattr",params,headers) # con2.close() return ci_json