forked from microsoft/BotBuilder-Samples
-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathapp.py
89 lines (73 loc) · 2.93 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
from http import HTTPStatus
from aiohttp import web
from aiohttp.web import Request, Response
from aiohttp.web_response import json_response
from botbuilder.core import (
ConversationState,
MemoryStorage,
)
from botbuilder.core.integration import (
aiohttp_channel_service_routes,
aiohttp_error_middleware,
)
from botbuilder.core.skills import SkillHandler
from botbuilder.integration.aiohttp import ConfigurationBotFrameworkAuthentication
from botbuilder.integration.aiohttp.skills import SkillHttpClient
from botbuilder.schema import Activity
from botframework.connector.auth import (
AuthenticationConfiguration,
SimpleCredentialProvider,
)
from skill_conversation_id_factory import SkillConversationIdFactory
from authentication import AllowedSkillsClaimsValidator
from bots import RootBot
from config import DefaultConfig, SkillConfiguration
from adapter_with_error_handler import AdapterWithErrorHandler
CONFIG = DefaultConfig()
SKILL_CONFIG = SkillConfiguration()
# Whitelist skills from SKILL_CONFIG
AUTH_CONFIG = AuthenticationConfiguration(
claims_validator=AllowedSkillsClaimsValidator(CONFIG).claims_validator
)
# Create adapter.
# See https://aka.ms/about-bot-adapter to learn more about how bots work.
SETTINGS = ConfigurationBotFrameworkAuthentication(
CONFIG,
auth_configuration=AUTH_CONFIG,
)
STORAGE = MemoryStorage()
CONVERSATION_STATE = ConversationState(STORAGE)
ID_FACTORY = SkillConversationIdFactory(STORAGE)
CREDENTIAL_PROVIDER = SimpleCredentialProvider(CONFIG.APP_ID, CONFIG.APP_PASSWORD)
CLIENT = SkillHttpClient(CREDENTIAL_PROVIDER, ID_FACTORY)
ADAPTER = AdapterWithErrorHandler(
SETTINGS, CONFIG, CONVERSATION_STATE, CLIENT, SKILL_CONFIG
)
# Create the Bot
BOT = RootBot(CONVERSATION_STATE, SKILL_CONFIG, CLIENT, CONFIG)
SKILL_HANDLER = SkillHandler(
ADAPTER, BOT, ID_FACTORY, CREDENTIAL_PROVIDER, AUTH_CONFIG
)
# Listen for incoming requests on /api/messages
async def messages(req: Request) -> Response:
# Main bot message handler.
if "application/json" in req.headers["Content-Type"]:
body = await req.json()
else:
return Response(status=HTTPStatus.UNSUPPORTED_MEDIA_TYPE)
activity = Activity().deserialize(body)
auth_header = req.headers["Authorization"] if "Authorization" in req.headers else ""
invoke_response = await ADAPTER.process_activity(auth_header, activity, BOT.on_turn)
if invoke_response:
return json_response(data=invoke_response.body, status=invoke_response.status)
return Response(status=HTTPStatus.OK)
APP = web.Application(middlewares=[aiohttp_error_middleware])
APP.router.add_post("/api/messages", messages)
APP.router.add_routes(aiohttp_channel_service_routes(SKILL_HANDLER, "/api/skills"))
if __name__ == "__main__":
try:
web.run_app(APP, host="localhost", port=CONFIG.PORT)
except Exception as error:
raise error