我们从Python开源项目中,提取了以下45个代码示例,用于说明如何使用bottle.response.status()。
def get_main(): services = [] for service in config.sections(): service_status = get_service_action(service, 'status') if service_status['status'] == 'not-found': cls = 'active' elif service_status['status'] == 'inactive' or service_status['status'] == 'failed': cls = 'danger' elif service_status['status'] == 'active': cls = 'success' else: cls = 'warning' disabled_start = True if cls == 'active' or cls == 'success' else False disabled_stop = True if cls == 'active' or cls == 'danger' else False disabled_restart = True if cls == 'active' or cls == 'danger' else False services.append({'class': cls, 'disabled_start': disabled_start, 'disabled_stop': disabled_stop, 'disabled_restart': disabled_restart, 'title': config.get(service, 'title'), 'service': service}) return template('index', hostname=gethostname(), services=services)
def http(host, port): from bottle import route, response, run @route('/') def root(): return http_root_str @route('/<s:path>') def do_conversion(s): try: i = parse_input(s) except ValueError as e: response.status = 400 return str(e) return convert(i) run(host=host, port=port)
def dtos_get_films_with_actors(): try: queryObject = QueryObject( filter = "ReleaseYear='{0}'".format(request.query['releaseYear']), expand = ['FilmActors.Actor', 'FilmCategories'] ) resultSerialData = dataService.dataViewDto.getItems("Film", queryObject) return json.dumps(resultSerialData, cls=CustomEncoder, indent=2) except dalUtils.StatusCodeError as err: response.status = err.value except: abort(400, 'Bad Request') # POST: POST: api/datasource/crud/operations/dtos/TestAction # with: Content-Type: application/json and body - {"param1":1}
def post_batch_entityset(entitySetName): try: result = dataProviderDto.apiProvider.handleInsertEntityBatch(entitySetName, request.json, dataService) response.content_type = "application/json; charset=utf-8" return json.dumps(result, cls=CustomEncoder) except dalUtils.StatusCodeError as err: response.status = err.value except: abort(400, 'Bad Request') ## DELETE: api/datasource/crud/batch/:entitySetName #@delete('/api/datasource/crud/batch/<entitySetName>') #def delete_batch_entityset(entitySetName): # try: # result = dataProviderDto.apiProvider.handleDeleteEntityBatch(entitySetName, request.json, dataService) # response.content_type = "application/json; charset=utf-8" # return json.dumps(result, cls=CustomEncoder) # except dalUtils.StatusCodeError as err: # response.status = err.value # except: # abort(400, 'Bad Request') # DELETE: api/datasource/crud/batch/:entitySetName?keys=key1:1,2,3,4;key2:4,5,6,7
def entities_get_films_with_actors(): try: queryObject = QueryObject( filter = "ReleaseYear='{0}'".format(request.query['releaseYear']), expand = ['FilmActors.Actor', 'FilmCategories'] ) resultSerialData = dataService.from_.remote.dtoView.Films.getItems(queryObject) return json.dumps(resultSerialData, cls=CustomEncoder, indent=2) except dalUtils.StatusCodeError as err: response.status = err.value except: abort(400, 'Bad Request') # POST: api/datasource/crud/operations/entities/TestAction # with: Content-Type: application/json and body - {"param1":1}
def _assert_admin(self, cert): if cert is not None: if isinstance(cert, bytes): cert = load_certificate(cert_bytes=cert) if isinstance(cert, x509.Certificate): host = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value else: raise TypeError('cert must be a raw certificate in PEM or DER format or an x509.Certificate instance.') else: logger.warning('Admin command received by unauthentified host.') raise HTTPResponse( status=401, body={'message': 'Authentication required'} ) if host not in self.admin_hosts: logger.warning('Host %s unauthorized for admin commands.', host) raise HTTPResponse( status=403, body={'message': 'Host {} unauthorized for admin commands'.format(host)} )
def _error(error, code): """Return an error as json""" _error = { "status": str(code), "title": errors[code], "detail": error.body } traceback.print_exception(type(error), error, error.traceback) response.headers["Content-Type"] = "application/vnd.api+json" return response_object(errors=[_error])
def get_metadata(): if request.query.file_hash == '': response.status = 400 return jsonize({'message': 'file_hash parameter is missing'}) file_hash = clean_hash(request.query.file_hash) if not valid_hash(file_hash): response.status = 400 return jsonize({'message': 'Invalid hash format (use MD5, SHA1 or SHA2)'}) file_hash = get_file_id(file_hash) if file_hash is None: response.status = 404 return jsonize({'message': 'Metadata not found in the database'}) mdc = MetaController() res = mdc.read(file_hash) if res is None: log_event("metadata", file_hash) return dumps(change_date_to_str(res))
def last_uploaded(): number = request.query.get("n") if number is None: response.status = 400 return jsonize({"error": 1, "error_message": "Parameter n is missing"}) if number.isdigit() is False: response.status = 400 return jsonize({"error": 2, "error_message": "Parameter n must be a number"}) if int(number) == 0: return jsonize({"error": 3, "error_message": "Parameter n must be greater than zero."}) pc = PackageController() lasts = pc.last_updated(int(number)) for i in range(0, len(lasts)): # Convert datetime objects lasts[i] = change_date_to_str(lasts[i]) return jsonize(lasts)
def auth_server(): username = request.forms.get('username') if username == 'user_ok': return {'access_token': 'my-nifty-access-token'} elif username == 'auth_fail': response.status = 400 return {'error': 'errored with HTTP 400 on request'} elif username == 'create_fail': return {'access_token': 'the-token-with-which-create-will-fail'} elif username == 'delete_fail': return {'access_token': 'the-token-with-which-delete-will-fail'} elif username == 'empty_page': return {'access_token': 'the-token-which-causes-an-empty-page'} else: return {}
def create(): auth_token = request.headers.get('Authorization').split()[1] if auth_token == 'my-nifty-access-token': return {'response': 'Successful creation of user.'} elif auth_token == 'the-token-with-which-create-will-fail': response.status = 403 return {'error': 'Permission denied'} elif auth_token == 'the-token-which-causes-an-empty-page': response.status = 403 return "empty"
def delete(): auth_token = request.headers.get('Authorization').split()[1] if auth_token == 'my-nifty-access-token': return {'response': 'Successful deletion of user.'} elif auth_token == 'the-token-with-which-delete-will-fail': response.status = 403 return {'error': 'Permission denied'}
def get_service_action(service, action): if service in config.sections(): sdbus = systemdBus(True) if config.get('DEFAULT', 'scope', fallback='system') == 'user' else systemdBus() unit = config.get(service, 'unit') if action == 'start': return {action: 'OK'} if sdbus.start_unit(unit) else {action: 'Fail'} elif action == 'stop': return {action: 'OK'} if sdbus.stop_unit(unit) else {action: 'Fail'} elif action == 'restart': return {action: 'OK'} if sdbus.restart_unit(unit) else {action: 'Fail'} elif action == 'reload': return {action: 'OK'} if sdbus.reload_unit(unit) else {action: 'Fail'} elif action == 'reloadorrestart': return {action: 'OK'} if sdbus.reload_or_restart_unit(unit) else {action: 'Fail'} elif action == 'status': if sdbus.get_unit_load_state(unit) != 'not-found': return {action: str(sdbus.get_unit_active_state(unit))} else: return {action: 'not-found'} elif action == 'journal': return get_service_journal(service, 100) else: response.status = 400 return {'msg': 'Sorry, but cannot perform \'{}\' action.'.format(action)} else: response.status = 400 return {'msg': 'Sorry, but \'{}\' is not defined in config.'.format(service)}
def get_service_journal(service, lines): if service in config.sections(): if get_service_action(service, 'status')['status'] == 'not-found': return {'journal': 'not-found'} try: lines = int(lines) except Exception as e: response.status = 500 return {'msg': '{}'.format(e)} unit = config.get(service, 'unit') journal = Journal(unit) return {'journal': journal.get_tail(lines)} else: response.status = 400 return {'msg': 'Sorry, but \'{}\' is not defined in config.'.format(service)}
def get_service_journal_page(service): if service in config.sections(): if get_service_action(service, 'status')['status'] == 'not-found': abort(400,'Sorry, but service \'{}\' unit not found in system.'.format(config.get(service, 'title'))) journal_lines = get_service_journal(service, 100) return template('journal', hostname=gethostname(), service=config.get(service, 'title'), journal=journal_lines['journal']) else: abort(400, 'Sorry, but \'{}\' is not defined in config.'.format(service)) # Serve static content
def dtos_test_action(): try: param1 = request.json['param1'] # TODO: Add some actions in here except dalUtils.StatusCodeError as err: response.status = err.value except: abort(400, 'Bad Request')
def put_entityset(entitySetName): try: result = dataProviderDto.apiProvider.handleUpdateEntity(entitySetName, request.query, request.json, dataService) response.content_type = "application/json; charset=utf-8" return json.dumps(result, cls=CustomEncoder) except dalUtils.StatusCodeError as err: response.status = err.value except: abort(400, 'Bad Request') # PATCH: api/datasource/crud/:entitySetName?keys=key1:{key1} #@patch('/api/datasource/crud/<entitySetName>')
def patch_entityset(entitySetName): try: result = dataProviderDto.apiProvider.handleUpdateEntity(entitySetName, request.query, request.json, dataService) response.content_type = "application/json; charset=utf-8" return json.dumps(result, cls=CustomEncoder) except dalUtils.StatusCodeError as err: response.status = err.value except: abort(400, 'Bad Request') # POST: api/datasource/crud/:entitySetName
def post_entityset(entitySetName): # test1 = json.loads(request.body.read()) try: result = dataProviderDto.apiProvider.handleInsertEntity(entitySetName, request.json, dataService) response.content_type = "application/json; charset=utf-8" return json.dumps(result, cls=CustomEncoder) except dalUtils.StatusCodeError as err: response.status = err.value except: abort(400, 'Bad Request') # DELETE: api/datasource/crud/:entitySetName?keys=key1:{key1}
def put_batch_entityset(entitySetName): try: result = dataProviderDto.apiProvider.handleUpdateEntityBatch(entitySetName, request.json, dataService) response.content_type = "application/json; charset=utf-8" return json.dumps(result, cls=CustomEncoder) except dalUtils.StatusCodeError as err: response.status = err.value except: abort(400, 'Bad Request') # PATCH: api/datasource/crud/batch/:entitySetName #@patch('/api/datasource/crud/batch/<entitySetName>')
def patch_batch_entityset(entitySetName): try: result = dataProviderDto.apiProvider.handleUpdateEntityBatch(entitySetName, request.json, dataService) response.content_type = "application/json; charset=utf-8" return json.dumps(result, cls=CustomEncoder) except dalUtils.StatusCodeError as err: response.status = err.value except: abort(400, 'Bad Request') # POST: api/datasource/crud/batch/:entitySetName
def delete_batch_entityset(entitySetName): try: result = dataProviderDto.apiProvider.handleDeleteEntityBatch(entitySetName, request.query, dataService) response.content_type = "application/json; charset=utf-8" return json.dumps(result, cls=CustomEncoder) except dalUtils.StatusCodeError as err: response.status = err.value except: abort(400, 'Bad Request')
def _get_cert_config_if_allowed(self, domain, cert): if cert is not None: if isinstance(cert, bytes): cert = load_certificate(cert_bytes=cert) if isinstance(cert, x509.Certificate): host = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value else: raise TypeError('cert must be a raw certificate in PEM or DER format or an x509.Certificate instance.') else: logger.warning('Request received for domain %s by unauthentified host.', domain) raise HTTPResponse( status=401, body={'message': 'Authentication required'} ) certconfig = self.certificates_config.match(domain) if certconfig: logger.debug('Domain %s matches pattern %s', domain, certconfig.pattern) if host in self.admin_hosts or host in certconfig.allowed_hosts: return certconfig else: logger.warning('Host %s unauthorized for domain %s.', host, domain) raise HTTPResponse( status=403, body={'message': 'Host {} unauthorized for domain {}'.format(host, domain)} ) else: logger.warning('No config matching domain %s found.', domain) raise HTTPResponse( status=404, body={'message': 'No configuration found for domain {}'.format(domain)} )
def _handle_auth(self): request_data = request.json csr = x509.load_pem_x509_csr(data=request_data['csr'].encode(), backend=default_backend()) # pylint: disable=unsubscriptable-object if not csr.is_signature_valid: raise HTTPResponse( status=400, body={'message': 'The certificate signing request signature is invalid.'} ) host = csr.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value csr_file = os.path.join(self.csr_path, "%s.csr" % (host)) crt_file = os.path.join(self.crt_path, "%s.crt" % (host)) if os.path.isfile(crt_file): crt = load_certificate(crt_file) if crt.public_key().public_numbers() == csr.public_key().public_numbers(): return { 'status': 'authorized', 'crt': dump_pem(crt).decode() } else: raise HTTPResponse( status=409, body={'message': 'Mismatch between the certificate signing request and the certificate.'} ) else: # Save CSR with open(csr_file, 'w') as f: f.write(csr.public_bytes(serialization.Encoding.PEM).decode()) response.status = 202 return { 'status': 'pending' }
def _handle_revoke_cert(self, domain): rawcert = request.environ['ssl_certificate'] self._assert_admin(rawcert) self.acmeproxy.revoke_certificate(domain) return { 'status': 'revoked' }
def _handle_delete_cert(self, domain): rawcert = request.environ['ssl_certificate'] self._assert_admin(rawcert) self.acmeproxy.delete_certificate(domain) return { 'status': 'deleted' }
def _handle_health_check(self): response.status = 200 return { 'status': 'alive' }
def delete_resources(): """Delete all resources.""" resources = get_resources() resources.clear() response.status = 204
def jsonp(data, callback): reply = {"status": "OK", "data": data} return callback + "([" + jsonize(reply) + "]);"
def api_process_file(): file_hash = clean_hash(request.query.file_hash) if len(file_hash) != 40: response.status = 400 return jsonize({'message': 'Invalid hash format (use sha1)'}) res = process_file(file_hash, True) if res is None: response.status = 404 return jsonize("File not found in the database") return jsonize("File processed")
def get_file(): tmp_folder = "/tmp/mass_download" subprocess.call(["mkdir", "-p", tmp_folder]) file_hash = clean_hash(request.query.file_hash) key = '' if len(file_hash) == 40: key = 'sha1' else: response.status = 400 return jsonize({'message': 'Invalid hash format (use sha1)'}) pc = PackageController() res = pc.searchFile(file_hash) if res is None: response.status = 404 return jsonize({'message': 'File not found in the database'}) if res == 1: response.status = 400 return jsonize({'message': 'File not available for downloading'}) res = pc.getFile(file_hash) zip_name = os.path.join(tmp_folder, str(file_hash) + '.zip') file_name = os.path.join(tmp_folder, str(file_hash) + '.codex') fd = open(file_name, "wb") fd.write(res) fd.close() subprocess.call(["zip", "-ju", "-P", "codex", zip_name, file_name]) return static_file(str(file_hash) + ".zip", root=tmp_folder, download=True)
def add_file(): # tags = request.forms.get('name') upload = request.files.get('file') form_date = request.forms.get('file_date') try: # validate process_date(form_date) except ValueError: # response.status = 422 #status can't be added because angular will not # show the message. return jsonize({'message': 'Invalid date format'}) logging.debug("add_file(). date=" + str(form_date)) if form_date is None: form_date = datetime.datetime.now() name = upload.filename data_bin = upload.file.read() file_id = hashlib.sha1(data_bin).hexdigest() logging.debug("add_file(): file_id=" + str(file_id)) status = upload_file(data_bin) process_file(file_id) # ToDo: add a redis job update_date(file_id, form_date) if(status == "ok"): return jsonize({'message': 'Added with ' + str(file_id)}) elif(status == "already exists"): return jsonize({'message': 'Already exists ' + str(file_id)}) elif(status == "virustotal"): return jsonize({'message': 'Already exists ' + str(file_id)}) else: return jsonize({'message': 'Error'})
def get_result_from_av(): hash_id = request.query.file_hash if len(hash_id) == 0: response.status = 400 return jsonize({'error': 4, 'error_message': 'file_hash parameter is missing.'}) hash_id = clean_hash(hash_id) if not valid_hash(hash_id): return jsonize({'error': 5, 'error_message': 'Invalid hash format.'}) if(len(hash_id) != 40): data = "1=" + str(hash_id) res = SearchModule.search_by_id(data, 1, [], True) if(len(res) == 0): response.status = 400 return jsonize({'error': 6, 'error_message': 'File not found'}) else: sha1 = res[0]["sha1"] else: sha1 = hash_id key_manager = KeyManager() if(key_manager.check_keys_in_secrets()): av_result = get_av_result(sha1, 'high') else: return jsonize({'error': 7, "error_message": "Error: VirusTotal API key missing from secrets.py file"}) if(av_result.get('status') == "added"): return jsonize({"message": "AV scans downloaded."}) elif(av_result.get('status') == "already_had_it"): return jsonize({"message": "File already have AV scans."}) elif(av_result.get('status') == "not_found"): return jsonize({"error": 10, "error_message": "Not found on VT."}) elif(av_result.get('status') == "no_key_available"): return jsonize({"error": 11, "error_message": "No key available right now. Please try again later."}) else: logging.error("av_result for hash=" + str(sha1)) logging.error("av_result=" + str(av_result)) return jsonize({"error": 9, "error_message": "Cannot get analysis."})
def web_index(token=None): """ provides an index of buttons This function is called from far far away, over the Internet """ logging.info('Serving index page') try: if 'key' not in settings['server']: pass elif decode_token(settings, token) != 'index': raise ValueError('Invalid label in token') except Exception as feedback: if logging.getLogger().getEffectiveLevel() == logging.DEBUG: logging.error("Unable to serve the index page") raise else: logging.error(str(feedback)) response.status = 400 return 'Invalid request' items = [] global buttons for button in buttons: items.append({ 'label': button, 'delete-url': '/delete/'+settings['tokens'].get(button+'-delete'), 'initialise-url': '/initialise/'+settings['tokens'].get(button+'-initialise'), 'push-url': '/'+settings['tokens'].get(button), }) logging.debug('Buttons: {}'.format(items)) return template('views/list_items', prefix=settings['server']['url'], items=items) # # invoked from bt.tn #
def web_press(button=None): """ Processes the press of a bt.tn device This function is called from far far away, over the Internet """ if button is None: button = settings['server']['default'] try: button = decode_token(settings, button) context = load_button(settings, button) return handle_button(context) except socket.error as feedback: if logging.getLogger().getEffectiveLevel() == logging.DEBUG: logging.error("Unable to push '{}'".format(button)) raise else: logging.error(str(feedback)) response.status = 500 return 'Internal error' except Exception as feedback: if logging.getLogger().getEffectiveLevel() == logging.DEBUG: logging.error("Unable to push '{}'".format(button)) raise else: logging.error(str(feedback)) response.status = 400 return 'Invalid request'
def web_initialise(button=None): """ Initialises a room This function is called from far far away, over the Internet """ if button is None: button = settings['server']['default'] logging.info("Initialising button '{}'".format(button)) try: button = decode_token(settings, button, action='initialise') context = load_button(settings, button) delete_room(context) global buttons buttons.pop(button, None) context = load_button(settings, button) context['spark']['id'] = get_room(context) return 'OK' except Exception as feedback: if logging.getLogger().getEffectiveLevel() == logging.DEBUG: logging.error("Unable to initialise '{}'".format(button)) raise else: logging.error(str(feedback)) response.status = 400 return 'Invalid request'
def web_delete(button=None): """ Deletes a room This function is called from far far away, over the Internet """ if button is None: button = settings['server']['default'] logging.info("Deleting button '{}'".format(button)) try: button = decode_token(settings, button, action='delete') context = load_button(settings, button) delete_room(context) global buttons buttons.pop(button, None) return 'OK' except Exception as feedback: if logging.getLogger().getEffectiveLevel() == logging.DEBUG: logging.error("Unable to delete '{}'".format(button)) raise else: logging.error(str(feedback)) response.status = 400 return 'Invalid request'
def raise_error(error_code, msg): response.status = error_code return msg
def destroy(resources, resource_id): """destroy the resource from app.data""" if resources not in app.resources or resource_id not in app.data[resources]: abort(404, "Not Found") del app.data[resources][resource_id] response.status = 204 return ""
def dispatch(url): """ This class is the beginning of all entrypoints in the Ray API. Here, each url will be redirect to the right handler """ url = bottle_req.path log.info('request: %s', bottle_req.url) if url[-1] == '/': url = url[:-1] response_code = 200 try: processed = process(url, bottle_req, bottle_resp) try: from_func, http_status = processed[0], processed[1] bottle_resp.status = http_status return from_func except: return processed except exceptions.RayException as e: log.exception('ray exception: ') response_code = e.http_code except: log.exception('exception:') raise bottle_resp.status = response_code
def status(): return 'OK'
def create_user_with_key(): """ Create a user directory with a keyfile on the shared volume, data arriving in the payload of the request with a JSON payload. """ username = username_from_request(request) if not username: response.status = 422 return {'error': "Parameter 'username' not specified"} elif not username_valid(username): response.status = 400 return {'error': "Invalid parameter 'username': '{0}' not allowed.". format(username) } pubkey = request.json.get('pubkey') if not pubkey: response.status = 422 return {'error': "Parameter 'pubkey' not specified"} abs_home_path = normpath(os.path.join(HOME_PATH_PREFIX, username)) username_was_added = check_and_add(username) # Do the actual creation store_pubkey(username, abs_home_path, pubkey) response.status = 201 return {'response': 'Successful creation of user {0} and/or upload of key.' .format(username)}
def normalize(request, jobdef): if request.headers['X-Github-Event'] == 'ping': response.status = 200 response.body = "pong" return False env = {} raw_body = request.body.read() body = json.load(request.body) secret = bytes(jobdef.custom_params['secret']) if request.headers['X-Github-Event'] != 'push': raise NotImplementedError("Only push and ping events are currently supported") # Verify hash signature hashtype, signature = request.headers['X-Hub-Signature'].split('=', 1) if hashtype != 'sha1': raise ValueError("Invalid hashtype received") res = hmac.new(secret, msg=raw_body, digestmod=hashlib.sha1) # Python <2.7 doesn't have a secure compare for hmac, so we just compare # manually. This leaves this code vulnerable to timing attacks, # unfortunately. if str(res.hexdigest()) != str(signature): raise ValueError("Invalid secret") env['provider'] = 'github' env["event"] = request.headers['X-Github-Event'] env['repo_type'] = 'git' env['repo_url'] = body['repository']['clone_url'] env['repo_url_ssh'] = body['repository']['ssh_url'] env['repo_url_http'] = body['repository']['clone_url'] env['ref'] = body['ref'] env['commit'] = body['after'] mail_to = [] mail_to.append(body['repository']['owner']['email']) mail_to.append(body['pusher']['email']) env['mail_to'] = ', '.join(mail_to) return env
def create_app(engine): app = Bottle() @app.error() @app.error(404) def handle_error(error): if issubclass(type(error.exception), ApiException): response.status = error.exception.code else: response.status = error.status_code response.set_header('Content-type', 'application/json') resp = { 'type': type(error.exception).__name__, 'message': repr(error.exception) if error.exception else '', 'traceback': error.traceback, 'status_code': response.status } log.error('Exception, type=%s, message=%s, status_code=%s, traceback=%s'\ % (resp['type'], resp['message'], resp['status_code'], resp['traceback'])) return '%s %s' % (resp['status_code'], resp['message']) @app.route('/ping', method=['GET']) def ping(): return {'name': 'xFlow', 'version': '0.1' } @app.route('/publish', method=['POST']) def publish(): data = json.loads(request.body.read()) try: publish_schema.validate(data) except jsonschema.ValidationError as err: raise BadRequest(err) stream = data['stream'] event = json.dumps(data['event']) try: engine.publish(stream, event) except core.KinesisStreamDoesNotExist as ex: raise NotFoundException(str(ex)) return {} @app.route('/track/workflows/<workflow_id>/executions/<execution_id>', method=['GET']) def track(workflow_id, execution_id): try: tracking_info = engine.track(workflow_id, execution_id) return tracking_info except (core.CloudWatchStreamDoesNotExist, core.WorkflowDoesNotExist, core.CloudWatchLogDoesNotExist) as ex: raise NotFoundException(str(ex)) raise Exception("Something went wrong!") return app
def web_inbound_call(button=None): """ Handles an inbound phone call This function is called from twilio cloud back-end """ if button is None: button = settings['server']['default'] logging.info("Receiving inbound call for button '{}'".format(button)) try: button = decode_token(settings, button, action='call') context = load_button(settings, button) update, twilio_action = get_push_details(context) response.content_type = 'text/xml' behaviour = twilio.twiml.Response() say = None if 'call' in twilio_action: for line in twilio_action['call']: if line.keys()[0] == 'say': say = line['say'] break if say is None: say = "What's up Doc?" behaviour.say(say) return str(behaviour) except Exception as feedback: if logging.getLogger().getEffectiveLevel() == logging.DEBUG: logging.error("Unable to handle inbound call for '{}'".format(button)) raise else: logging.error(str(feedback)) response.status = 400 return 'Invalid request' # # the collection of buttons that we manage #