diff --git a/.github/workflows/deploy_lambdas.yml b/.github/workflows/deploy_lambdas.yml new file mode 100644 index 0000000..8d102df --- /dev/null +++ b/.github/workflows/deploy_lambdas.yml @@ -0,0 +1,88 @@ +name: CI + +# Controls when the action will run. Triggers the workflow on push or pull request +# events but only for the main branch +on: + #when there is a push to the main + push: + branches: [ serverless ] + +jobs: + build: + runs-on: ubuntu-latest + steps: + + # Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it + - uses: actions/checkout@v2 + + #installs a version of python, but I need this if deploying to a serverless Python Lambda? + - name: Set up Python + uses: actions/setup-python@v2 + with: + python-version: '3.7' + + #credentials to connect to AWS + - name: Configure AWS credentials from Production account + uses: aws-actions/configure-aws-credentials@v1 + with: + aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }} + aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + aws-region: us-east-1 + profile_name: default + project_name: FuncXActionProvider + runtime: python3.7 + + - name: Create dependency layer + run: | + python -m pip install --upgrade pip + mkdir python + #install all dependencies as defined by requirements.txt in the aws directory + pip3 install --use-deprecated=legacy-resolver -r aws/requirements.txt -t ./python + + # There is a problem with the parsl library in lambda. 
We have a slimmed + # down version to replace it + rm -Rf python/parsl* + pip3 install --use-deprecated=legacy-resolver -t python ./slim-parsl + + #zip files into current directory + zip -r funcxLayer.zip ./python + aws lambda publish-layer-version --layer-name FuncxLayer --zip-file fileb://./funcxLayer.zip --compatible-runtimes python3.6 python3.7 python3.8 > layer_info.json + cat layer_info.json + LAYER_VERSION=$(jq ".Version" layer_info.json) + LAYER_VERSION_ARN=$(jq ".LayerVersionArn" layer_info.json) + echo "LAYER_VERSION_ARN=$LAYER_VERSION_ARN" >> $GITHUB_ENV + + - name: Upload Globus Auth Function + run: | + cp aws/funcx-globus-auth.py ./lambda_function.py + zip funcx-globus-auth.zip ./lambda_function.py + rm ./lambda_function.py + aws lambda update-function-code --function-name funcx-globus-auth --zip-file fileb://./funcx-globus-auth.zip + + - name: Upload Action Introspect Function + run: | + cp aws/action_introspect.py ./lambda_function.py + zip action_introspect.zip ./lambda_function.py + rm ./lambda_function.py + aws lambda update-function-code --function-name action_introspect --zip-file fileb://./action_introspect.zip + aws lambda wait function-updated --function-name action_introspect + aws lambda update-function-configuration --function-name action_introspect --layers ${{ env.LAYER_VERSION_ARN }} + + - name: Upload Action Run Function + run: | + cp aws/funcx-run.py ./lambda_function.py + zip funcx-run.zip ./lambda_function.py + rm ./lambda_function.py + aws lambda update-function-code --function-name funcx-run --zip-file fileb://./funcx-run.zip + aws lambda wait function-updated --function-name funcx-run + aws lambda update-function-configuration --function-name funcx-run --layers ${{ env.LAYER_VERSION_ARN }} + + - name: Upload Action Status Function + run: | + cp aws/action-status.py ./lambda_function.py + zip action-status.zip ./lambda_function.py + rm ./lambda_function.py + aws lambda update-function-code --function-name action_status --zip-file 
fileb://./action-status.zip + aws lambda wait function-updated --function-name action_status + aws lambda update-function-configuration --function-name action_status --layers ${{ env.LAYER_VERSION_ARN }} + diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..23d005d --- /dev/null +++ b/.gitignore @@ -0,0 +1,143 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# PEP 582; used by e.g. 
github.com/David-OConnor/pyflow +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +.automatesecrets +example_flow_info.json +.python-version +.idea diff --git a/README.md b/README.md index 244af41..baa3e26 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,177 @@ -# action-provider -Globus Automate Action Provider +# Serverless Globus Automate Action Provider +This repo contains configuration and code for deploying a serverless Globus +Automate Action Provider for submitting funcX tasks. + +## Initial Setup +Create a GlobusAuth app as per the instructions from [Action Provider Tools](https://action-provider-tools.readthedocs.io/en/latest/setting_up_auth.html) + +Save the resulting client ID and secret in [AWS Secrets Manager](https://console.aws.amazon.com/secretsmanager/home?region=us-east-1#!/listSecrets) +You will want to add two values: +- API_CLIENT_ID +- API_CLIENT_SECRET + +The lambda functions here assume that the secret is named `funcX-GlobusAPI` + +## Dynamo DB +We use a single DynamoDB table to relate action_id's to taskIDs. +You will need to create a dynamo DB table called `funcx-actions` - set the +partition key to `action-id` (string). We will need a policy to allow the +lambda functions to interact with this table. 
This one should do: +```json +{ + "Version": "2012-10-17", + "Statement": [ + { + "Sid": "VisualEditor0", + "Effect": "Allow", + "Action": [ + "dynamodb:BatchGetItem", + "dynamodb:BatchWriteItem", + "dynamodb:ConditionCheckItem", + "dynamodb:PutItem", + "dynamodb:DeleteItem", + "dynamodb:PartiQLUpdate", + "dynamodb:Scan", + "dynamodb:Query", + "dynamodb:UpdateItem", + "dynamodb:DescribeTimeToLive", + "dynamodb:PartiQLSelect", + "dynamodb:DescribeTable", + "dynamodb:PartiQLInsert", + "dynamodb:GetItem", + "dynamodb:UpdateTable", + "dynamodb:PartiQLDelete" + ], + "Resource": "arn:aws:dynamodb:us-east-1:512084481048:table/funcx-actions" + } + ] +} +``` + +The `Resource` ARN should match the table you created. + +An example of the items in this table looks like this: +```json +{ + "action-id": "e9e664d2-d923-4c48-948d-d9b68145077c", + "tasks": { + "5d6ae875-ef80-44eb-8664-4c159ced0c46": { + "result": "Hello World!" + }, + "86017ef8-faaa-4f50-af62-44d1cf02398e": { + "result": 10 + }, + "f2ccbd80-e94b-4af0-afad-f35aff58d4c4": { + "result": 100 + } + } +} +``` + +Action-id is the partition key and matches the GlobusAutomate action that was received +by the service. Tasks is a dictionary of taskID to the results returned from the +function invocation. The `result` is null until something comes back from funcX. + +When all of the results are non-null the action is complete. + + +## Lambda Functions +The lambda functions implement the Action Provider API. They are automatically +deployed to the AWS account by github actions. + +All of the functions get their dependencies from an AWS Lambda Layer called +`FuncxLayer` which is built from [aws/requirements.txt](aws/requirements.txt). + +The action provider functions are fronted by an authorizer function called +`funcx-globus-auth` - This introspect's the bearer token and extracts the +needed dependent tokens. It adds some useful extracted data to the event dict +passed into each lambda function. 
+ +```python + authResponse['context'] = { + 'name': name, + 'user_id': user_id, + 'identities': str(identities), + 'funcx_token': funcx_token, + 'search_token': search_token, + 'openid_token': openid_token + } +``` + +The GitHub Action CI job that deploys these Lambda functions assumes that the +AWS credentials are stored in repository secrets: +* AWS_ACCESS_KEY_ID +* AWS_SECRET_ACCESS_KEY + +I think the GitHub Action assumes that the layer, and the lambda functions already +exist in the account, so you may need to create initial blank values for these. + +## API Gateway +The key to the serverless architecture is the AWS API Gateway. Ideally this +would be configured by a CloudFormation template, however for now, these +notes will have to do. + +1. Go to [API Gateway dashboard](https://console.aws.amazon.com/apigateway/main/apis?region=us-east-1) +in the AWS Console. +2. Create new REST API. Use _New API_ since I haven't had luck importing the OpenAPI Spec +3. Click on _Authorizers_ and create a new lambda authorizer based on [aws/funcx-globus-auth.py](aws/funcx-globus-auth.py) +4. For _Lambda Event Payload_ select _Request_ and use `Header: Authorization` - this +takes the bearer token and puts it into the `event` dict. +5. Now go into _Resources_ and add a GET method under `/` +6. Set up as _Lambda Function_ Integration Type +7. Check _Use Lambda Proxy integration_ +8. Select [aws/action_introspect](aws/action_introspect.py) +9. Add a resource named `/run` and add a POST method +10. Select [aws/funcx-run](aws/funcx-run.py) as the lambda function +11. Make a new resource called `action_id` and set the path to be `{action_id}` - +this will create a path variable. +12. Under action_id, create a new resource called `status` and add a GET method +which calls `action-status` Lambda +13. 
Finally select _Deploy API_ action and create a new stage called `dev` - make +a note of the generated URL + +## Interacting with The Action Provider +You will need to install the Globus Automate cli +```shell script +pip install globus_automate_client +``` + +### View the action +You can retrieve the action document with: +```shell script + globus-automate action introspect --action-url <> --action-scope <> +``` + +## Run the Action in Isolation +You need to create a body json document that represents an invocation of the action. +```json +{ + "tasks": [{ + "endpoint": "4b116d3c-1703-4f8f-9f6f-39921e5864df", + "function": "4b116d3c-1703-4f8f-9f6f-39921e5864df", + "payload": { + "x": 2 + } + }] +} +``` + +Then invoke the action with +```shell script +globus-automate action run --body sample.json --action-url <> --action-scope <> +``` + +## Deploy the Example Flow +Examine the code in [example/deploy_example_flow.py](example/deploy_example_flow.py) +- this will create a flow definition that invokes the action provider. You'll +need to update the `ActionUrl` and `ActionScope` to match your new ActionProvider. + +Note the generated Flow ID. 
You can launch an instance of this flow with +```shell script + globus-automate flow run -v <> +``` + +Note the flow_id that comes back so you can monitor the progress with +```shell script +globus-automate flow action-status -v --flow-id <> <> +``` diff --git a/aws/action-status.py b/aws/action-status.py new file mode 100644 index 0000000..edcdb1a --- /dev/null +++ b/aws/action-status.py @@ -0,0 +1,155 @@ +import json +import boto3 +import decimal +import datetime +import traceback +import globus_sdk + +from boto3.dynamodb.conditions import Key +from globus_sdk import AccessTokenAuthorizer, RefreshTokenAuthorizer +from funcx.sdk.client import FuncXClient + +from funcx.errors import ( + FuncxTaskExecutionFailed, + SerializationError, + TaskPending, + handle_response_errors, +) + +from funcx.version import __version__ +import pathlib +from funcx.sdk.login_manager import tokenstore, LoginManager + +class DecimalEncoder(json.JSONEncoder): + def default(self, obj): + if isinstance(obj, decimal.Decimal): + return int(obj) + return super(DecimalEncoder, self).default(obj) + + +class fxLoginManager(LoginManager): + def __init__(self, authorizers, environment=None): + self.authorizers = authorizers + home_dir = '/tmp/funcx' + tokenstore._home = lambda: pathlib.Path(home_dir) + self._token_storage = tokenstore.get_token_storage_adapter(environment=environment) + + def _get_authorizer( + self, resource_server: str + ) -> globus_sdk.RefreshTokenAuthorizer: + return self.authorizers[resource_server] + + +def lambda_handler(event, context): + print("---->", __version__) + + dynamodb = boto3.resource('dynamodb') + table = dynamodb.Table('funcx-actions') + print(event) + + auth = AccessTokenAuthorizer(event['requestContext']['authorizer']['funcx_token']) + search_auth = AccessTokenAuthorizer( + event['requestContext']['authorizer']['search_token']) + openid_auth = AccessTokenAuthorizer( + event['requestContext']['authorizer']['openid_token']) + + user_id = 
event['requestContext']['authorizer']['user_id'] + + home_dir = '/tmp/funcx' + tokenstore._home = lambda: pathlib.Path(home_dir) + + fxmanager = fxLoginManager(authorizers={'funcx_service': auth, + 'search.api.globus.org/all': search_auth, + 'openid': openid_auth}) + + fxc = FuncXClient(login_manager=fxmanager, funcx_home=home_dir) + + action_id = event['pathParameters']['action-id'] + + response = table.query( + KeyConditionExpression=Key('action-id').eq(action_id) + ) + assert "Items" in response + assert len(response['Items']) == 1 + + action_record = response['Items'][0] + print(action_record) + + task_results = json.loads(action_record['tasks']) + # Find the taskIDs where the results are not yet in + running_tasks = list(filter(lambda task_id: bool(task_id), + [key if not task_results[key][ + 'completed'] else False + for key in task_results.keys()])) + + status = "SUCCEEDED" + display_status = "Function Results Received" + details = None + + failure = None + if running_tasks: + for task in running_tasks: + result = None + completed = False + try: + result = fxc.get_result(task) + completed = True + except TaskPending as eek: + print("Pending ", eek) + except Exception as eek2: + failure = traceback.format_exc() + result = failure + print("Detected an exception: ", eek2) + completed = True + + task_results[task]['result'] = result + task_results[task]['completed'] = completed + + update_response = table.update_item( + Key={ + 'action-id': action_id + }, + UpdateExpression="set tasks=:t, fx_ttl=:l", + ExpressionAttributeValues={ + ':t': json.dumps(task_results, cls=DecimalEncoder), + ':l': int(datetime.datetime.now().timestamp()) + 1209600 + }, + ReturnValues="UPDATED_NEW" + ) + + print("updated_response", update_response) + if failure: + print("FAILED ", failure) + status = "FAILED" + display_status = "Function Failed" + + if not completed: + status = "ACTIVE" + display_status = "Function Active" + + + # Now check again to see if everything is done + 
running_tasks = list(filter(lambda task_id: bool(task_id), + [key if not task_results[key][ + 'completed'] else False + for key in task_results.keys()])) + if not running_tasks: + all_res = [task_results[tt]['result'] for tt in + task_results.keys()] + + details = {'result': all_res} + + print("List results ->", details) + + result = { + "action_id": action_id, + 'status': status, + 'display_status': display_status, + 'details': details + } + + print("Status result", result) + return { + 'statusCode': 200, + 'body': json.dumps(result) + } diff --git a/aws/action_introspect.py b/aws/action_introspect.py new file mode 100644 index 0000000..f05ec28 --- /dev/null +++ b/aws/action_introspect.py @@ -0,0 +1,57 @@ +import json + + +def lambda_handler(event, context): + print(event) + + return { + 'statusCode': 202, + 'body': json.dumps( + { + "globus_auth_scope": "https://auth.globus.org/scopes/b3db7e59-a6f1-4947-95c2-59d6b7a70f8c/action_all", + "title": "FuncX Action Provider", + "subtitle": "Run FuncX", + "admin_contact": "bengal1@illinois.edu", + "synchronous": False, + "input_schema": { + "additionalProperties": False, + "properties": { + "tasks": { + "description": "List of tasks to invoke", + "items": { + "additionalProperties": False, + "properties": { + "endpoint": { + "description": "UUID of Endpoint where the function is to be run", + "type": "string" + }, + "function": { + "description": "UUID of the function to be run", + "type": "string" + }, + "payload": { + "description": "Arguments to function", + "type": "object" + } + } + } + } + + }, + "type": "object" + }, + "keywords": None, + "log_supported": False, + "maximum_deadline": "P30D", + "runnable_by": [ + "all_authenticated_users" + ], + "types": [ + "Action" + ], + "visible_to": [ + "public" + ] + } + ) + } \ No newline at end of file diff --git a/aws/funcx-globus-auth.py b/aws/funcx-globus-auth.py new file mode 100644 index 0000000..45d2e7c --- /dev/null +++ b/aws/funcx-globus-auth.py @@ -0,0 +1,76 @@ 
+import globus_sdk +import boto3 + +def get_secret(): + secret_name = "funcX-GlobusAPI" + region_name = "us-east-1" + + # Create a Secrets Manager client + session = boto3.session.Session() + + client = session.client( + service_name='secretsmanager', + region_name=region_name + ) + + get_secret_value_response = client.get_secret_value( + SecretId=secret_name + ) + return eval(get_secret_value_response['SecretString']) + + +def generate_policy(principalId, effect, resource, message="", name=None, identities=[], user_id=None, dependent_token=None): + funcx_token = dependent_token.by_resource_server['funcx_service']['access_token'] + search_token = dependent_token.by_resource_server['search.api.globus.org']['access_token'] + openid_token = dependent_token.by_resource_server['auth.globus.org']['access_token'] + + authResponse = {} + authResponse['principalId'] = principalId + if effect and resource: + policyDocument = {} + policyDocument['Version'] = '2012-10-17' + policyDocument['Statement'] = [ + {'Action': 'execute-api:Invoke', + 'Effect': effect, + 'Resource': resource + } + ] + authResponse['policyDocument'] = policyDocument + authResponse['context'] = { + 'name': name, + 'user_id': user_id, + 'identities': str(identities), + 'funcx_token': funcx_token, + 'search_token': search_token, + 'openid_token': openid_token + } + print("AuthResponse", authResponse) + return authResponse + + +def lambda_handler(event, context): + globus_secrets = get_secret() + + auth_client = globus_sdk.ConfidentialAppAuthClient( + globus_secrets['API_CLIENT_ID'], globus_secrets['API_CLIENT_SECRET']) + + token = event['headers']['Authorization'].replace("Bearer ", "") + + auth_res = auth_client.oauth2_token_introspect(token, include="identities_set") + depends = auth_client.oauth2_get_dependent_tokens(token) + print(depends) + + + if not auth_res: + return generate_policy(None, 'Deny', event['methodArn'], message='User not found') + + if not auth_res['active']: + return 
generate_policy(None, 'Deny', event['methodArn'], + message='User account not active') + + print("auth_res", auth_res) + return generate_policy(auth_res['username'], 'Allow', event['methodArn'], + name=auth_res["name"], + identities=auth_res["identities_set"], + user_id=auth_res['sub'], + dependent_token=depends) diff --git a/aws/funcx-run.py b/aws/funcx-run.py new file mode 100644 index 0000000..0280cfe --- /dev/null +++ b/aws/funcx-run.py @@ -0,0 +1,113 @@ +import json +from funcx.sdk.client import FuncXClient +from globus_sdk import AccessTokenAuthorizer, RefreshTokenAuthorizer +import boto3 +import uuid +import datetime +import pathlib +import globus_sdk +from funcx.sdk.login_manager import tokenstore, LoginManager + + +class fxLoginManager(LoginManager): + def __init__(self, authorizers, environment=None): + self.authorizers = authorizers + home_dir = '/tmp/funcx' + tokenstore._home = lambda: pathlib.Path(home_dir) + self._token_storage = tokenstore.get_token_storage_adapter(environment=environment) + + def _get_authorizer( + self, resource_server: str + ) -> globus_sdk.RefreshTokenAuthorizer: + return self.authorizers[resource_server] + + +def now_isoformat(): + return datetime.datetime.now().isoformat() + + +def lambda_handler(event, context): + print(event) + + auth = AccessTokenAuthorizer(event['requestContext']['authorizer']['funcx_token']) + search_auth = AccessTokenAuthorizer( + event['requestContext']['authorizer']['search_token']) + openid_auth = AccessTokenAuthorizer( + event['requestContext']['authorizer']['openid_token']) + + user_id = event['requestContext']['authorizer']['user_id'] + + home_dir = '/tmp/funcx' + + tokenstore._home = lambda: pathlib.Path(home_dir) + + fxmanager = fxLoginManager(authorizers={'funcx_service': auth, + 'search.api.globus.org/all': search_auth, + 'openid': openid_auth}) + + fxc = FuncXClient(login_manager=fxmanager, funcx_home=home_dir) + + dynamodb = boto3.resource('dynamodb') + table = dynamodb.Table('funcx-actions') 
+ + body = json.loads(event['body']) + + action_id = str(uuid.uuid4()) + monitor_by = body['monitor_by'] if 'monitor_by' in body else None + manage_by = body['manage_by'] if 'manage_by' in body else None + + result = { + "action_id": action_id, + 'status': 'ACTIVE', + 'display_status': 'Function Submitted', + 'details': None, + 'monitor_by': monitor_by, + 'manage_by': manage_by, + 'start_time': now_isoformat(), + } + + status_code = 202 + + batch_res = None + try: + batch = fxc.create_batch() + + for task in body['body']['tasks']: + print(task) + payload = task.get('payload', None) + if payload: + batch.add(endpoint_id=task['endpoint'], function_id=task['function'], + kwargs=task['payload']) + else: + batch.add(endpoint_id=task['endpoint'], function_id=task['function']) + + batch_res = fxc.batch_run(batch) + print({'action_id': action_id, 'tasks': batch_res}) + result['details'] = batch_res + except Exception as eek: + print('FAILED ', eek) + result['status'] = 'FAILED' + result['display_status'] = 'Failed to submit tasks' + result['details'] = str(eek) + status_code = 400 + + # Create a dynamo record where the primary key is this action's ID + # Tasks is a dict by task_id and contains the eventual results from their + # execution. Where there are no more None results then the action is complete. + # Set a TTL for two weeks from now. 
+ + if batch_res: + response = table.put_item( + Item={ + 'action-id': action_id, + 'tasks': json.dumps({task_id: {"result": None, "completed": False} for task_id in batch_res}), + 'fx_ttl': int(datetime.datetime.now().timestamp()) + 1209600 + } + ) + print("Dynamo", response) + + print("Status result", result) + return { + 'statusCode': status_code, + 'body': json.dumps(result) + } diff --git a/aws/requirements.txt b/aws/requirements.txt new file mode 100644 index 0000000..0492304 --- /dev/null +++ b/aws/requirements.txt @@ -0,0 +1,5 @@ +globus-sdk>=3.0.0 +funcx>=1.0.12 +funcx-endpoint>=1.0.12 +globus_automate_client +https://thirdparty.aboutcode.org/pypi/cryptography-3.4.8-cp36-abi3-manylinux_2_12_x86_64.manylinux2010_x86_64.whl diff --git a/cloud-formation/api_gateway.py b/cloud-formation/api_gateway.py new file mode 100644 index 0000000..e69de29 diff --git a/example/deploy_example_flow.py b/example/deploy_example_flow.py new file mode 100644 index 0000000..ca84963 --- /dev/null +++ b/example/deploy_example_flow.py @@ -0,0 +1,88 @@ +from globus_automate_flow import GlobusAutomateFlowDef, GlobusAutomateFlow +import json + +import globus_automate_client + + +def flow_def(flow_permissions, endpoint, sum_function, hello_world_function): + return GlobusAutomateFlowDef( + title="FuncX Example", + description="Show how to invoke FuncX", + visible_to=flow_permissions['flow_permissions'], + runnable_by=flow_permissions['flow_permissions'], + administered_by=flow_permissions['flow_permissions'], + flow_definition={ + "StartAt": "StartSubmission", + "States": { + "StartSubmission": { + "Type": "Action", + "ActionUrl": " https://b6vr4fptui.execute-api.us-east-1.amazonaws.com/test", + "ActionScope": "https://auth.globus.org/scopes/b3db7e59-a6f1-4947-95c2-59d6b7a70f8c/action_all", + "Parameters": { + "tasks": [{ + "endpoint": endpoint, + "function": sum_function, + "payload": { + "items": [1, 2, 3, 4] + } + }, + { + "endpoint": endpoint, + "function": sum_function, + 
"payload": { + "items": [10, 20, 30, 40] + } + }, + { + "endpoint": endpoint, + "function": hello_world_function, + "payload": {} + } + ] + }, + "Next": "EndSubmission" + }, + "EndSubmission": { + "Type": "Pass", + "End": True + } + } + } + ) + +from funcx.sdk.client import FuncXClient +fxc = FuncXClient() + +def hello_world(): + return "Hello World!" + + +def funcx_sum(items): + import time + time.sleep(15) + return sum(items) + +with open(".automatesecrets", 'r') as f: + globus_secrets = json.load(f) + +native_app_id = "417301b1-5101-456a-8a27-423e71a2ae26" # Premade native app ID +flows_client = globus_automate_client.create_flows_client(native_app_id) + +flow = flow_def(flow_permissions={ + "flow_permissions": [ + "urn:globus:groups:id:5fc63928-3752-11e8-9c6f-0e00fd09bf20" + ], + "admin_permissions": [ + "urn:globus:groups:id:5fc63928-3752-11e8-9c6f-0e00fd09bf20" + ], +}, + endpoint="4b116d3c-1703-4f8f-9f6f-39921e5864df", + sum_function=fxc.register_function(funcx_sum), + hello_world_function=fxc.register_function(hello_world)) + +print(flow.flow_definition) + +example_flow = GlobusAutomateFlow.from_flow_def(flows_client, + flow_def=flow) + +example_flow.save_flow("example_flow_info.json") diff --git a/example/flow_action.py b/example/flow_action.py new file mode 100644 index 0000000..7b0f9f3 --- /dev/null +++ b/example/flow_action.py @@ -0,0 +1,22 @@ +import ast + + +class FlowAction: + def __init__(self, flow, action_id: str): + self.action_id = action_id + self.flow = flow + + def get_status(self): + return self.flow.get_status(self.action_id) + + def get_error_msgs(self): + logs = self.flow.get_flow_logs(self.action_id) + error_msgs = [] + for failure in filter(lambda x: x['code'] == 'ActionFailed', logs['entries']): + # Failures from Search Ingest Action Provider are bundled up as string + # representation of Python dict + cause = ast.literal_eval(failure['details']['cause']) + if 'errors' in cause: + error_msgs.append(cause['errors']) + + return 
error_msgs diff --git a/example/globus_automate_flow.py b/example/globus_automate_flow.py new file mode 100644 index 0000000..0198986 --- /dev/null +++ b/example/globus_automate_flow.py @@ -0,0 +1,115 @@ +import json +from typing import Mapping, Any, Optional, List + +from globus_automate_client import FlowsClient + +from flow_action import FlowAction + +class GlobusAutomateFlowDef: + def __init__(self, + flow_definition: Mapping[str, Any], + title: str, + subtitle: Optional[str] = None, + description: Optional[str] = None, + keywords: List[str] = [], + visible_to: List[str] = [], + runnable_by: List[str] = [], + administered_by: List[str] = [], + input_schema: Optional[Mapping[str, Any]] = None): + self.flow_definition = flow_definition + self.title = title + self.subtitle = subtitle + self.description = description + self.keywords = keywords + self.visible_to = visible_to + self.runnable_by = runnable_by + self.administered_by = administered_by + self.input_schema = input_schema + + +class GlobusAutomateFlow: + def __init__(self, client: FlowsClient): + self.flows_client = client + self.flow_id = None + self.flow_scope = None + self.saved_flow = None + self.runAsScopes = None + + @classmethod + def from_flow_def(cls, client: FlowsClient, + flow_def: GlobusAutomateFlowDef): + result = GlobusAutomateFlow(client) + result._deploy_mdf_flow(flow_def) + return result + + @classmethod + def from_existing_flow(cls, path: str): + result = GlobusAutomateFlow(None) + result.read_flow(path) + return result + + def set_client(self, client): + self.flows_client = client + + @property + def url(self): + return "https://flows.globus.org/flows/" + self.flow_id + + def __str__(self): + return f'Globus Automate Flow: id={self.flow_id}, scope={self.flow_scope}' + + + def get_status(self, action_id: str): + return self.flows_client.flow_action_status( + self.flow_id, + self.flow_scope, + action_id).data + + def get_flow_logs(self, action_id: str): + return 
self.flows_client.flow_action_log( + self.flow_id, self.flow_scope, + action_id, + limit=100).data + + def _deploy_mdf_flow(self, mdf_flow_def: GlobusAutomateFlowDef): + flow_deploy_res = self.flows_client.deploy_flow( + flow_definition=mdf_flow_def.flow_definition, + title=mdf_flow_def.title, + subtitle=mdf_flow_def.subtitle, + description=mdf_flow_def.description, + visible_to=mdf_flow_def.visible_to, + runnable_by=mdf_flow_def.runnable_by, + administered_by=mdf_flow_def.administered_by, + # TODO: Make rough schema outline into JSONSchema + input_schema=mdf_flow_def.input_schema, + validate_definition=True, + validate_input_schema=True + ) + self.flow_id = flow_deploy_res["id"] + self.flow_scope = flow_deploy_res["globus_auth_scope"] + self.saved_flow = self.flows_client.get_flow(self.flow_id).data + print(self.runAsScopes) + + def run_flow(self, flow_input: dict): + flow_res = self.flows_client.run_flow(self.flow_id, self.flow_scope, flow_input) + return FlowAction(self, flow_res.data['action_id']) + + def save_flow(self, path): + # Save Flow ID/scope for future use + with open(path, 'w') as f: + flow_info = { + "flow_id": self.flow_id, + "flow_scope": self.flow_scope + } + json.dump(flow_info, f) + + def read_flow(self, path): + # Save Flow ID/scope for future use + with open(path, 'r') as f: + flow_info = json.load(f) + self.flow_id = flow_info['flow_id'] + self.flow_scope = flow_info['flow_scope'] + + def get_scope_for_runAs_role(self, rolename): + print("--->RunAsScopes ", self.runAsScopes[rolename]) + return self.globus_auth.scope_id_from_uri(self.runAsScopes[rolename]) diff --git a/slim-parsl/parsl/__init__.py b/slim-parsl/parsl/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/slim-parsl/parsl/app/__init__.py b/slim-parsl/parsl/app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/slim-parsl/parsl/app/errors.py b/slim-parsl/parsl/app/errors.py new file mode 100644 index 0000000..9a56532 --- /dev/null +++ 
b/slim-parsl/parsl/app/errors.py @@ -0,0 +1,46 @@ +import dill +from six import reraise +from tblib import Traceback +from functools import wraps +from typing import Callable, Union, Any, TypeVar +from types import TracebackType +import logging +logger = logging.getLogger(__name__) + + +class RemoteExceptionWrapper: + def __init__(self, e_type: type, e_value: Exception, traceback: TracebackType) -> None: + + self.e_type = dill.dumps(e_type) + self.e_value = dill.dumps(e_value) + self.e_traceback = Traceback(traceback) + + def reraise(self) -> None: + + t = dill.loads(self.e_type) + + # the type is logged here before deserialising v and tb + # because occasionally there are problems deserialising the + # value (see #785, #548) and the fix is related to the + # specific exception type. + logger.debug("Reraising exception of type {}".format(t)) + + v = dill.loads(self.e_value) + tb = self.e_traceback.as_traceback() + + reraise(t, v, tb) + + +R = TypeVar('R') + + +def wrap_error(func: Callable[..., R]) -> Callable[..., Union[R, RemoteExceptionWrapper]]: + @wraps(func) # type: ignore + def wrapper(*args: object, **kwargs: object) -> Any: + import sys + from funcx.serialize.errors import RemoteExceptionWrapper + try: + return func(*args, **kwargs) # type: ignore + except Exception: + return RemoteExceptionWrapper(*sys.exc_info()) + return wrapper # type: ignore diff --git a/slim-parsl/requirements.txt b/slim-parsl/requirements.txt new file mode 100644 index 0000000..340b840 --- /dev/null +++ b/slim-parsl/requirements.txt @@ -0,0 +1,3 @@ +six +tblib +globus-sdk>=2.0 \ No newline at end of file diff --git a/slim-parsl/setup.py b/slim-parsl/setup.py new file mode 100644 index 0000000..7b625ae --- /dev/null +++ b/slim-parsl/setup.py @@ -0,0 +1,31 @@ +import os + +from setuptools import find_namespace_packages, setup + +version = "1.0" + +with open("requirements.txt") as f: + install_requires = f.readlines() + +setup( + name="slim-parsl", + version=version, + 
packages=find_namespace_packages(include=["parsl", "parsl.*"]), + description="Slim Parsl that will work with AWS Lambda", + install_requires=install_requires, + python_requires=">=3.6.0", + classifiers=[ + "Development Status :: 3 - Alpha", + "Intended Audience :: Science/Research", + "License :: OSI Approved :: Apache Software License", + "Natural Language :: English", + "Operating System :: OS Independent", + "Programming Language :: Python :: 3", + "Topic :: Scientific/Engineering", + ], + keywords=["funcX", "FaaS", "Function Serving"], + author="funcX team", + author_email="labs@globus.org", + license="Apache License, Version 2.0", + url="https://github.com/funcx-faas/funcx", +)