diff --git a/datahub-actions/setup.py b/datahub-actions/setup.py index 80fd432c..2b1bf23e 100644 --- a/datahub-actions/setup.py +++ b/datahub-actions/setup.py @@ -168,6 +168,7 @@ def get_long_description(): "executor = datahub_actions.plugin.action.execution.executor_action:ExecutorAction", "slack = datahub_actions.plugin.action.slack.slack:SlackNotificationAction", "teams = datahub_actions.plugin.action.teams.teams:TeamsNotificationAction", + "dingtalk_datahub = datahub_actions.plugin.action.dingtalk.dingtalk.DingtalkNotification", "metadata_change_sync = datahub_actions.plugin.action.metadata_change_sync.metadata_change_sync:MetadataChangeSyncAction", "tag_propagation = datahub_actions.plugin.action.tag.tag_propagation_action:TagPropagationAction", "term_propagation = datahub_actions.plugin.action.term.term_propagation_action:TermPropagationAction", diff --git a/datahub-actions/src/datahub_actions/plugin/action/dingtalk/__init__.py b/datahub-actions/src/datahub_actions/plugin/action/dingtalk/__init__.py new file mode 100644 index 00000000..48955489 --- /dev/null +++ b/datahub-actions/src/datahub_actions/plugin/action/dingtalk/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2021 Acryl Data, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/datahub-actions/src/datahub_actions/plugin/action/dingtalk/dingtalk.py b/datahub-actions/src/datahub_actions/plugin/action/dingtalk/dingtalk.py new file mode 100644 index 00000000..c9ea967e --- /dev/null +++ b/datahub-actions/src/datahub_actions/plugin/action/dingtalk/dingtalk.py @@ -0,0 +1,93 @@ +import json +import logging + +import requests +from datahub.configuration.common import ConfigModel +from datahub.metadata.schema_classes import EntityChangeEventClass as EntityChangeEvent +from pydantic.types import SecretStr +from ratelimit import limits, sleep_and_retry + +from datahub_actions.action.action import Action +from datahub_actions.event.event_envelope import EventEnvelope +from datahub_actions.pipeline.pipeline_context import PipelineContext +from datahub_actions.utils.datahub_util import DATAHUB_SYSTEM_ACTOR_URN +from datahub_actions.utils.social_util import ( + get_message_from_entity_change_event, + get_welcome_message, +) + +logger = logging.getLogger(__name__) + + +@sleep_and_retry +@limits(calls=1, period=1) # 1 call per second +def post_message(webhook_url, keyword, content): + headers = {"Content-Type": "application/json"} + data = { + "msgtype": "text", + "text": {"content": keyword + content}, + } + response = requests.post(webhook_url, headers=headers, data=json.dumps(data)) + if response.status_code == 200: + logger.info("Message send successfully") + else: + logger.info("Message send failed") + + +class DingtalkNotificationConfig(ConfigModel): + webhook_url: SecretStr + keyword: SecretStr + default_channel: str + base_url: str = "http://localhost:9002/" + suppress_system_activity: bool = True + + +class DingtalkNotification(Action): + def name(self): + return "DingtalkNotification" + + def close(self) -> None: + pass + + @classmethod + def create(cls, config_dict: dict, ctx: PipelineContext) -> "Action": + action_config = DingtalkNotificationConfig.parse_obj(config_dict or {}) + logger.info(f"Dingtalk notification action configured with {action_config}") + return cls(action_config, ctx) + + def __init__(self, action_config: DingtalkNotificationConfig, ctx: PipelineContext): + self.action_config = action_config + self.ctx = ctx + post_message( + self.action_config.webhook_url.get_secret_value(), + self.action_config.keyword.get_secret_value(), + get_welcome_message(self.action_config.base_url).text, + ) + + def act(self, event: EventEnvelope) -> None: + try: + message = json.dumps(json.loads(event.as_json()), indent=4) + logger.debug(f"Received event: {message}") + if event.event_type == "EntityChangeEvent_v1": + assert isinstance(event.event, EntityChangeEvent) + if ( + event.event.auditStamp.actor == DATAHUB_SYSTEM_ACTOR_URN + and self.action_config.suppress_system_activity + ): + return None + + semantic_message = get_message_from_entity_change_event( + event.event, + self.action_config.base_url, + self.ctx.graph.graph if self.ctx.graph else None, + channel="dingtalk", + ) + post_message( + self.action_config.webhook_url.get_secret_value(), + self.action_config.keyword.get_secret_value(), + semantic_message, + ) + else: + logger.debug("Skipping message because it didn't match our filter") + except Exception as e: + logger.debug("Failed to process event", e) diff --git a/datahub-actions/src/datahub_actions/utils/name_resolver.py b/datahub-actions/src/datahub_actions/utils/name_resolver.py index f6c76f0b..ba6ff9b5 100644 --- a/datahub-actions/src/datahub_actions/utils/name_resolver.py +++ b/datahub-actions/src/datahub_actions/utils/name_resolver.py @@ -130,7 +130,7 @@ class SchemaFieldNameResolver(DefaultNameResolver): def get_entity_name( self, entity_urn: Urn, datahub_graph: Optional[DataHubGraph] ) -> str: - return DatasetUrn._get_simple_field_path_from_v2_field_path( + return DatasetUrn.get_simple_field_path_from_v2_field_path( entity_urn.get_entity_id()[1] ) diff --git a/datahub-actions/tests/unit/pipeline/test_pipeline.py b/datahub-actions/tests/unit/pipeline/test_pipeline.py index d62f1e49..074e6dbc 100644 --- a/datahub-actions/tests/unit/pipeline/test_pipeline.py +++ b/datahub-actions/tests/unit/pipeline/test_pipeline.py @@ -32,13 +32,13 @@ def test_create(): # Validate Pipeline is initialized assert valid_pipeline.name is not None assert valid_pipeline.source is not None - assert type(valid_pipeline.source) == TestEventSource + assert type(valid_pipeline.source) is TestEventSource assert valid_pipeline.transforms is not None assert len(valid_pipeline.transforms) == 2 # Filter + Custom - assert type(valid_pipeline.transforms[0]) == FilterTransformer - assert type(valid_pipeline.transforms[1]) == TestTransformer + assert type(valid_pipeline.transforms[0]) is FilterTransformer + assert type(valid_pipeline.transforms[1]) is TestTransformer assert valid_pipeline.action is not None - assert type(valid_pipeline.action) == TestAction + assert type(valid_pipeline.action) is TestAction assert valid_pipeline._shutdown is False assert valid_pipeline._stats is not None assert valid_pipeline._retry_count == 3 diff --git a/docker/actions.env b/docker/actions.env index 0237137e..cd68253e 100644 --- a/docker/actions.env +++ b/docker/actions.env @@ -9,4 +9,8 @@ SCHEMA_REGISTRY_URL=http://schema-registry:8081 # System Auth -- Required if Metadata Service Authentication is enabled. # DATAHUB_SYSTEM_CLIENT_ID=__datahub_system -# DATAHUB_SYSTEM_CLIENT_SECRET=JohnSnowKnowsNothing \ No newline at end of file +# DATAHUB_SYSTEM_CLIENT_SECRET=JohnSnowKnowsNothing + +# DATAHUB_ACTIONS_DINGTALK_ENABLED +# DATAHUB_ACTIONS_DINGTALK_DATAHUB_WEBHOOK_URL +# DATAHUB_ACTIONS_DINGTALK_DATAHUB_BASE_URL \ No newline at end of file diff --git a/docker/config/dingtalk_action.yaml b/docker/config/dingtalk_action.yaml new file mode 100644 index 00000000..743b91b7 --- /dev/null +++ b/docker/config/dingtalk_action.yaml @@ -0,0 +1,38 @@ +# Copyright 2021 Acryl Data, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +name: datahub_dingtalk_action +enabled: ${DATAHUB_ACTIONS_DINGTALK_ENABLED:-false} +source: + type: "kafka" + config: + connection: + bootstrap: ${KAFKA_BOOTSTRAP_SERVER:-localhost:9092} + schema_registry_url: ${SCHEMA_REGISTRY_URL:-http://localhost:8081} + topic_routes: + mcl: ${METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME:-MetadataChangeLog_Versioned_v1} + pe: ${PLATFORM_EVENT_TOPIC_NAME:-PlatformEvent_v1} +action: + type: dingtalk_datahub + config: + # Action-specific configs (map) + webhook_url: ${DATAHUB_ACTIONS_DINGTALK_DATAHUB_WEBHOOK_URL} + base_url: ${DATAHUB_ACTIONS_DINGTALK_DATAHUB_BASE_URL:-http://localhost:9002} + keyword: ${DATAHUB_ACTIONS_DINGTALK_KEYWORD:-Datahub} + default_channel: ${DATAHUB_ACTIONS_DINGTALK_CHANNEL:-dingtalk} + suppress_system_activity: ${DATAHUB_ACTIONS_DINGTALK_SUPPRESS_SYSTEM_ACTIVITY:-true} + +datahub: + server: "http://${DATAHUB_GMS_HOST:-localhost}:${DATAHUB_GMS_PORT:-8080}" + extra_headers: + Authorization: "Basic ${DATAHUB_SYSTEM_CLIENT_ID:-__datahub_system}:${DATAHUB_SYSTEM_CLIENT_SECRET:-JohnSnowKnowsNothing}" diff --git a/docker/datahub-actions/Dockerfile b/docker/datahub-actions/Dockerfile index 08f119a6..725132d2 100644 --- a/docker/datahub-actions/Dockerfile +++ b/docker/datahub-actions/Dockerfile @@ -17,10 +17,11 @@ ARG APP_ENV=prod FROM acryldata/datahub-ingestion-base:latest as prod-install COPY datahub-actions /actions-src +RUN pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple RUN mkdir -p /etc/datahub/actions && mkdir -p /tmp/datahub/logs/actions/system RUN cd /actions-src && \ - pip install "." && \ - pip install '.[all]' && \ + pip install "." -i https://pypi.tuna.tsinghua.edu.cn/simple && \ + pip install '.[all]' -i https://pypi.tuna.tsinghua.edu.cn/simple && \ # This is required to fix security vulnerability in htrace-core4 rm -f /usr/local/lib/python3.10/site-packages/pyspark/jars/htrace-core4-4.1.0-incubating.jar @@ -46,4 +47,4 @@ FROM ${APP_ENV}-install as final USER datahub RUN curl -s "https://get.sdkman.io" | bash RUN /bin/bash -c "source /$HOME/.sdkman/bin/sdkman-init.sh; sdk version; sdk install java 8.0.332-zulu" -CMD dockerize -wait ${DATAHUB_GMS_PROTOCOL:-http}://$DATAHUB_GMS_HOST:$DATAHUB_GMS_PORT/health -timeout 240s /start_datahub_actions.sh +CMD dockerize -wait ${DATAHUB_GMS_PROTOCOL:-http}://$DATAHUB_GMS_HOST:$DATAHUB_GMS_PORT/health -timeout 240s /start_datahub_actions.sh \ No newline at end of file diff --git a/docs/actions/dingtalk.md b/docs/actions/dingtalk.md new file mode 100644 index 00000000..3c056cf2 --- /dev/null +++ b/docs/actions/dingtalk.md @@ -0,0 +1,189 @@ +import FeatureAvailability from '@site/src/components/FeatureAvailability'; + +# DingTalk + + + +| | | +| --- | --- | +| **Status** | ![Incubating](https://img.shields.io/badge/support%20status-incubating-blue) | +| **Version Requirements** | ![Minimum Version Requirements](https://img.shields.io/badge/acryl_datahub_actions-v0.0.9+-green.svg) | + +## Overview + +This Action integrates DataHub with Dingtalk to send notifications to a configured Dingtalk group chat in your workspace. + +### Capabilities + +- Sending notifications of important events to a Dingtalk group chat + - Adding or Removing a tag from an entity (dataset, dashboard etc.) + - Updating documentation at the entity or field (column) level. + - Adding or Removing ownership from an entity (dataset, dashboard, etc.) + - Creating a Domain + - and many more. + +### User Experience + +On startup, the action will produce a welcome message that looks like the one below. + +![](https://img-blog.csdnimg.cn/ecfe25b5d2be4ed5a563e5ba69b96a3e.png#pic_center) + + +On each event, the action will produce a notification message that looks like the one below. + +![](https://img-blog.csdnimg.cn/ef624ede87c44d27b3e6b1acb3228e74.png#pic_center) + + +### Supported Events + +- `EntityChangeEvent_v1` +- Currently, the `MetadataChangeLog_v1` event is **not** processed by the Action. + +## Action Quickstart + +### Prerequisites + +Ensure that you have configured an incoming webhook in your Dingtalk group chat. + +Follow the guide [here](https://open.dingtalk.com/document/isvapp/enterprise-internal-robots-use-webhook-to-send-group-chat-messages) to set it up. + +Take note of the incoming webhook url as you will need to use that to configure the Dingtalk action. + +### Installation Instructions (Deployment specific) + +#### Quickstart + +If you are running DataHub using the docker quickstart option, there are no additional software installation steps. The `datahub-actions` container comes pre-installed with the Teams action. + +All you need to do is export a few environment variables to activate and configure the integration. See below for the list of environment variables to export. + +| Env Variable | Required for Integration | Purpose | +|-------------------------------------------| --- |------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| DATAHUB_ACTIONS_DINGTALK_ENABLED | ✅ | Set to "true" to enable the Dingtalk action | +| DATAHUB_ACTIONS_DINGTALK_WEBHOOK_URL | ✅ | Set to the incoming webhook url that you configured in the [pre-requisites step](#prerequisites) above | +| DATAHUB_ACTIONS_DINGTALK_KEYWORD | ✅ | Defaults to "Datahub". The keyword section is below the robot webhook section, you need to change the keyword to "Datahub" or set different keyword through Env variable. | +| DATAHUB_ACTIONS_DINGTALK_DATAHUB_BASE_URL | ❌ | Defaults to "http://localhost:9002". Set to the location where your DataHub UI is running. On a local quickstart this is usually "http://localhost:9002", so you shouldn't need to modify this | + +:::note +where to set keyword: [here](https://img-blog.csdnimg.cn/6ccd2e098ca546fcb616c80661c555e8.png#pic_center) + +:::note + +You will have to restart the `datahub-actions` docker container after you have exported these environment variables if this is the first time. The simplest way to do it is via the Docker Desktop UI, or by just issuing a `datahub docker quickstart --stop && datahub docker quickstart` command to restart the whole instance. + +::: + + +For example: +```shell +export DATAHUB_ACTIONS_DINGTALK_ENABLED=true +export DATAHUB_ACTIONS_DINGTALK_WEBHOOK_URL= + +datahub docker quickstart --stop && datahub docker quickstart +``` + +#### k8s / helm + +Similar to the quickstart scenario, there are no specific software installation steps. The `datahub-actions` container comes pre-installed with the Dingtalk action. You just need to export a few environment variables and make them available to the `datahub-actions` container to activate and configure the integration. See below for the list of environment variables to export. + +| Env Variable | Required for Integration | Purpose | +|-------------------------------------------| --- |-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| DATAHUB_ACTIONS_DINGTALK_ENABLED | ✅ | Set to "true" to enable the Dingtalk action | +| DATAHUB_ACTIONS_DINGTALK_WEBHOOK_URL | ✅ | Set to the incoming webhook url that you configured in the [pre-requisites step](#prerequisites) above | +| DATAHUB_ACTIONS_DINGTALK_DATAHUB_BASE_URL | ✅| Set to the location where your DataHub UI is running. For example, if your DataHub UI is hosted at "https://datahub.my-company.biz", set this to "https://datahub.my-company.biz" | + + +#### Bare Metal - CLI or Python-based + +If you are using the `datahub-actions` library directly from Python, or the `datahub-actions` cli directly, then you need to first install the `dingtalk` action plugin in your Python virtualenv. + +``` +pip install "datahub-actions[dingtalk]" +``` + +Then run the action with a configuration file that you have modified to capture your credentials and configuration. + +##### Sample Dingtalk Action Configuration File + +```yml +name: datahub_dingtalk_action +enabled: true +source: + type: "kafka" + config: + connection: + bootstrap: ${KAFKA_BOOTSTRAP_SERVER:-localhost:9092} + schema_registry_url: ${SCHEMA_REGISTRY_URL:-http://localhost:8081} + topic_routes: + mcl: ${METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME:-MetadataChangeLog_Versioned_v1} + pe: ${PLATFORM_EVENT_TOPIC_NAME:-PlatformEvent_v1} + +## 3a. Optional: Filter to run on events (map) +# filter: +# event_type: +# event: +# # Filter event fields by exact-match +# + +# 3b. Optional: Custom Transformers to run on events (array) +# transform: +# - type: +# config: +# # Transformer-specific configs (map) + +action: + type: dingtalk_datahub + config: + # Action-specific configs (map) + base_url: ${DATAHUB_ACTIONS_DINGTALK_DATAHUB_BASE_URL:-http://localhost:9002} + webhook_url: ${DATAHUB_ACTIONS_DINGTALK_WEBHOOK_URL} + suppress_system_activity: ${DATAHUB_ACTIONS_DINGTALK_SUPPRESS_SYSTEM_ACTIVITY:-true} + +datahub: + server: "http://${DATAHUB_GMS_HOST:-localhost}:${DATAHUB_GMS_PORT:-8080}" +``` + +##### Dingtalk Action Configuration Parameters + +| Field | Required | Default | Description | +|----------------------------| --- |--------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `base_url` | ❌| `False` | Whether to print events in upper case. | +| `keyword` | ✅| `Datahub` | The keyword that Dingtalk required to send content. Defaults to 'Datahub'. (It's at the same page where you will get robot webhook url.) | +| `webhook_url` | ✅ | Set to the incoming webhook url that you configured in the [pre-requisites step](#prerequisites) above | +| `suppress_system_activity` | ❌ | `True` | Set to `False` if you want to get low level system activity events, e.g. when datasets are ingested, etc. Note: this will currently result in a very spammy Teams notifications experience, so this is not recommended to be changed. | + + +## Troubleshooting + +If things are configured correctly, you should see logs on the `datahub-actions` container that indicate success in enabling and running the Dingtalk action. + +```shell +docker logs datahub-datahub-actions-1 + +... +[2022-12-04 16:47:44,536] INFO {datahub_actions.cli.actions:76} - DataHub Actions version: unavailable (installed editable via git) +[2022-12-04 16:47:44,565] WARNING {datahub_actions.cli.actions:103} - Skipping pipeline datahub_slack_action as it is not enabled +[2022-12-04 16:47:44,581] INFO {datahub_actions.plugin.action.dingtalk.dingtalk:60} - Dingtalk notification action configured with webhook_url=SecretStr('**********') base_url='http://localhost:9002' suppress_system_activity=True +[2022-12-04 16:47:46,393] INFO {datahub_actions.cli.actions:119} - Action Pipeline with name 'ingestion_executor' is now running. +[2022-12-04 16:47:46,393] INFO {datahub_actions.cli.actions:119} - Action Pipeline with name 'datahub_dingtalk_action' is now running. +... +``` +[Here is how it looks like](https://img-blog.csdnimg.cn/95a16f27d0ff40d9a1f542f533740c1a.png#pic_center) + + +If the Dingtalk action was not enabled, you would see messages indicating that. +e.g. the following logs below show that node of the Teams or Slack or Dingtalk action were enabled. + +```shell +docker logs datahub-datahub-actions-1 + +.... +No user action configurations found. Not starting user actions. +[2022-12-04 06:45:27,509] INFO {datahub_actions.cli.actions:76} - DataHub Actions version: unavailable (installed editable via git) +[2022-12-04 06:45:27,647] WARNING {datahub_actions.cli.actions:103} - Skipping pipeline datahub_slack_action as it is not enabled +[2022-12-04 06:45:27,649] WARNING {datahub_actions.cli.actions:103} - Skipping pipeline datahub_teams_action as it is not enabled +[2022-12-04 06:45:27,649] INFO {datahub_actions.cli.actions:119} - Action Pipeline with name 'ingestion_executor' is now running. +... + +``` +