谁能帮我解决以下错误:
结果:失败异常:ImportError:无法从“azure.eventgrid”导入名称“EventGridPublisherClient”
这是 HTTPTrigger 的代码:
import os import logging from azure.core.credentials import AzureKeyCredential from azure.eventgrid import EventGridPublisherClient, EventGridEvent import azure.functions as func def main(req: func.HttpRequest) -> func.HttpResponse: logging.info('Python HTTP trigger function processed a request.') # Parse request body #req_body = req.get_json() # Get Event Grid topic endpoint and key from environment variables topic_endpoint = os.environ.get('EVENT_GRID_TOPIC_ENDPOINT') topic_key = os.environ.get('EVENT_GRID_TOPIC_KEY') # Create Event Grid publisher client credential = AzureKeyCredential(topic_key) client = EventGridPublisherClient(topic_endpoint, credential) # Create an event event = EventGridEvent( event_type="MyCustomEventType", subject="MyCustomSubject", data={ "message": "Hello, Event Grid!" }, data_version="1.0" ) # Publish the event client.send(event) # Return a response return func.HttpResponse("Event published to Event Grid topic.", status_code=200)
requirements.txt 看起来像这样:
urllib3 uplink requests azure-functions azure azure-eventgrid azure-core
在本地运行工作正常,但当我部署到 Azure 时,出现上述错误。我在兜圈子,似乎找不到任何有用的信息来帮助解决这个问题。有人有想法吗?
我尝试使用以下代码部署 Azure Function HTTP Trigger,它成功了。
Code:
import os import logging from azure.core.credentials import AzureKeyCredential from azure.eventgrid import EventGridPublisherClient, EventGridEvent import azure.functions as func def main(req: func.HttpRequest) -> func.HttpResponse: logging.info('Python HTTP trigger function processed a request.') topic_endpoint = os.environ.get('EVENT_GRID_TOPIC_ENDPOINT') topic_key = os.environ.get('EVENT_GRID_TOPIC_KEY') credential = AzureKeyCredential(topic_key) client = EventGridPublisherClient(topic_endpoint, credential) event = EventGridEvent( event_type="MyCustomEventType", subject="MyCustomSubject", data={ "message": "Hello, Event Grid!" }, data_version="1.0" ) client.send(event) return func.HttpResponse("Event published to Event Grid topic.", status_code=200)
local.setting.json:
{ "IsEncrypted": false, "Values": { "AzureWebJobsStorage": "<your-storage-account-connection-string>", "FUNCTIONS_WORKER_RUNTIME": "python", "EVENT_GRID_TOPIC_KEY": "<your-event-grid-topic-key>", "EVENT_GRID_TOPIC_ENDPOINT": "<your-event-grid-topic-endpoint>" } }
requirement.txt:
azure-functions azure-eventgrid==4.0.0 azure-core>=1.18.0
我在 Azure 门户的Configuration functionapp的Application.settings中添加了EVENT_GRID_TOPIC_ENDPOINT和eventgridtopickey ,
我运行上面的代码并得到以下结果:
使用上面的URL,我可以在浏览器中看到如下输出,
然后,我将上面的代码部署到functionapp,如下所示,
选择要部署的功能应用程序,
点击Delpoy选项,
HTTP触发函数部署成功,点击查看输出,
输出显示HTTP 触发器函数已成功部署到functionapp,如下所示,
成功部署到Azure 门户中的functionapp,如下所示,