From b67ade0bc98fbbae6af002ff1834e0f605e0b393 Mon Sep 17 00:00:00 2001 From: "bdodla@expedia.com" <13788369+EXPEbdodla@users.noreply.github.com> Date: Tue, 23 Jul 2024 11:13:13 -0700 Subject: [PATCH] fix: Decouple go feature server from gopy bindings (#118) * fix: Decouple go feature server from gopy bindings --------- Co-authored-by: Bhargav Dodla --- .github/workflows/build_wheels.yml | 2 +- .github/workflows/go_pr.yml | 85 ++++++ .github/workflows/lint_pr.yml | 2 +- .github/workflows/linter.yml | 35 +-- .github/workflows/pr_integration_tests.yml | 19 -- .../workflows/pr_local_integration_tests.yml | 23 +- .github/workflows/publish.yml | 2 +- .github/workflows/unit_tests.yml | 38 +-- Makefile | 66 ++--- go.mod | 4 +- go.sum | 6 +- go/README.md | 112 +------- go/infra/docker/feature-server/Dockerfile | 31 +++ go/internal/feast/registry/http.go | 15 +- go/internal/feast/registry/registry.go | 5 - go/internal/feast/registry/repoconfig.go | 38 ++- go/internal/feast/registry/repoconfig_test.go | 89 +++++- .../feast/server/logging/memorybuffer.go | 3 +- .../feast/server/logging/memorybuffer_test.go | 2 +- .../feast/transformation/transformation.go | 136 --------- go/main.go | 180 ++++++++++++ go/main_test.go | 71 +++++ sdk/python/feast/cli.py | 11 - sdk/python/feast/feature_store.py | 74 +---- sdk/python/feast/repo_config.py | 6 - sdk/python/tests/README.md | 3 - sdk/python/tests/conftest.py | 34 +-- .../test_elasticsearch_online_store.py | 4 +- .../integration/e2e/test_go_feature_server.py | 263 ------------------ .../integration_test_repo_config.py | 3 - .../universal/data_sources/file.py | 5 +- setup.py | 96 +------ 32 files changed, 565 insertions(+), 898 deletions(-) create mode 100644 .github/workflows/go_pr.yml create mode 100644 go/infra/docker/feature-server/Dockerfile create mode 100644 go/main.go create mode 100644 go/main_test.go delete mode 100644 sdk/python/tests/integration/e2e/test_go_feature_server.py diff --git a/.github/workflows/build_wheels.yml b/.github/workflows/build_wheels.yml index f04015a989..72cb485b80 100644 --- a/.github/workflows/build_wheels.yml +++ b/.github/workflows/build_wheels.yml @@ -116,7 +116,7 @@ jobs: needs: get-version strategy: matrix: - component: [feature-server, feature-server-java, feature-transformation-server] + component: [feature-server, feature-server-java, feature-transformation-server, feature-server-go] env: REGISTRY: feastdev steps: diff --git a/.github/workflows/go_pr.yml b/.github/workflows/go_pr.yml new file mode 100644 index 0000000000..b8c7b24f14 --- /dev/null +++ b/.github/workflows/go_pr.yml @@ -0,0 +1,85 @@ +name: go_pr + +on: + pull_request_target: + types: + - opened + - synchronize + - labeled + +jobs: + lint-go: + # when using pull_request_target, all jobs MUST have this if check for 'ok-to-test' or 'approved' for security purposes. + if: + ((github.event.action == 'labeled' && (github.event.label.name == 'approved' || github.event.label.name == 'lgtm' || github.event.label.name == 'ok-to-test')) || + (github.event.action != 'labeled' && (contains(github.event.pull_request.labels.*.name, 'ok-to-test') || contains(github.event.pull_request.labels.*.name, 'approved') || contains(github.event.pull_request.labels.*.name, 'lgtm')))) && + github.repository == 'ExpediaGroup/feast' + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + with: + # pull_request_target runs the workflow in the context of the base repo + # as such actions/checkout needs to be explicit configured to retrieve + # code from the PR. + ref: refs/pull/${{ github.event.pull_request.number }}/merge + submodules: recursive + - name: Setup Go + id: setup-go + uses: actions/setup-go@v2 + with: + go-version: 1.22.5 + - name: Lint go + run: make lint-go + + unit-test-go: + # when using pull_request_target, all jobs MUST have this if check for 'ok-to-test' or 'approved' for security purposes. + if: + ((github.event.action == 'labeled' && (github.event.label.name == 'approved' || github.event.label.name == 'lgtm' || github.event.label.name == 'ok-to-test')) || + (github.event.action != 'labeled' && (contains(github.event.pull_request.labels.*.name, 'ok-to-test') || contains(github.event.pull_request.labels.*.name, 'approved') || contains(github.event.pull_request.labels.*.name, 'lgtm')))) && + github.repository == 'ExpediaGroup/feast' + runs-on: ubuntu-latest + needs: lint-go + steps: + - uses: actions/checkout@v4 + with: + # pull_request_target runs the workflow in the context of the base repo + # as such actions/checkout needs to be explicit configured to retrieve + # code from the PR. + ref: refs/pull/${{ github.event.pull_request.number }}/merge + submodules: recursive + - name: Setup Go + id: setup-go + uses: actions/setup-go@v2 + with: + go-version: 1.22.5 + - name: Setup Python + uses: actions/setup-python@v5 + id: setup-python + with: + python-version: "3.11" + architecture: x64 + - name: Test Go + run: make test-go + - uses: actions/upload-artifact@v3 + with: + name: go-coverage-report + path: ${{ github.workspace }}/coverage.html + + build-docker-image-go: + # when using pull_request_target, all jobs MUST have this if check for 'ok-to-test' or 'approved' for security purposes. + if: + ((github.event.action == 'labeled' && (github.event.label.name == 'approved' || github.event.label.name == 'lgtm' || github.event.label.name == 'ok-to-test')) || + (github.event.action != 'labeled' && (contains(github.event.pull_request.labels.*.name, 'ok-to-test') || contains(github.event.pull_request.labels.*.name, 'approved') || contains(github.event.pull_request.labels.*.name, 'lgtm')))) && + github.repository == 'ExpediaGroup/feast' + runs-on: ubuntu-latest + strategy: + matrix: + component: [ feature-server-go ] + env: + REGISTRY: gcr.io/kf-feast + steps: + - uses: actions/checkout@v4 + with: + submodules: 'true' + - name: Build image + run: make build-${{ matrix.component }}-docker REGISTRY=${REGISTRY} VERSION=${GITHUB_SHA} \ No newline at end of file diff --git a/.github/workflows/lint_pr.yml b/.github/workflows/lint_pr.yml index d1aa7d16a3..c76e5f49d3 100644 --- a/.github/workflows/lint_pr.yml +++ b/.github/workflows/lint_pr.yml @@ -14,7 +14,7 @@ permissions: jobs: validate-title: if: - github.repository == 'feast-dev/feast' + github.repository != 'feast-dev/feast' name: Validate PR title runs-on: ubuntu-latest steps: diff --git a/.github/workflows/linter.yml b/.github/workflows/linter.yml index d04aef0c48..db3a979192 100644 --- a/.github/workflows/linter.yml +++ b/.github/workflows/linter.yml @@ -15,21 +15,8 @@ jobs: with: python-version: "3.11" architecture: x64 - - name: Setup Go - id: setup-go - uses: actions/setup-go@v2 - with: - go-version: 1.19.7 - name: Install uv run: curl -LsSf https://astral.sh/uv/install.sh | sh - - name: Install apache-arrow on ubuntu - run: | - sudo apt update - sudo apt install -y -V ca-certificates lsb-release wget - wget https://apache.jfrog.io/artifactory/arrow/$(lsb_release --id --short | tr 'A-Z' 'a-z')/apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb - sudo apt install -y -V ./apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb - sudo apt update - sudo apt install -y -V "libarrow-dev=11.0.0-1" - name: Install dependencies run: | make install-python-ci-dependencies-uv @@ -39,27 +26,11 @@ jobs: lint-go: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - name: Setup Go id: setup-go uses: actions/setup-go@v2 with: - go-version: 1.19.7 - - name: Setup Python - id: setup-python - uses: actions/setup-python@v2 - with: - python-version: "3.8" - - name: Upgrade pip version - run: | - pip install --upgrade pip - - name: Install apache-arrow on ubuntu - run: | - sudo apt update - sudo apt install -y -V ca-certificates lsb-release wget - wget https://apache.jfrog.io/artifactory/arrow/$(lsb_release --id --short | tr 'A-Z' 'a-z')/apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb - sudo apt install -y -V ./apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb - sudo apt update - sudo apt install -y -V "libarrow-dev=11.0.0-1" + go-version: 1.22.5 - name: Lint go - run: make lint-go + run: make lint-go \ No newline at end of file diff --git a/.github/workflows/pr_integration_tests.yml b/.github/workflows/pr_integration_tests.yml index 75f946faac..f4a9132d29 100644 --- a/.github/workflows/pr_integration_tests.yml +++ b/.github/workflows/pr_integration_tests.yml @@ -52,11 +52,6 @@ jobs: with: python-version: ${{ matrix.python-version }} architecture: x64 - - name: Setup Go - id: setup-go - uses: actions/setup-go@v2 - with: - go-version: 1.19.7 - name: Authenticate to Google Cloud uses: 'google-github-actions/auth@v1' with: @@ -75,20 +70,6 @@ jobs: aws-region: us-west-2 - name: Use AWS CLI run: aws sts get-caller-identity - - name: Install apache-arrow on ubuntu - if: matrix.os == 'ubuntu-latest' - run: | - sudo apt update - sudo apt install -y -V ca-certificates lsb-release wget - wget https://apache.jfrog.io/artifactory/arrow/$(lsb_release --id --short | tr 'A-Z' 'a-z')/apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb - sudo apt install -y -V ./apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb - sudo apt update - sudo apt install -y -V "libarrow-dev=11.0.0-1" - - name: Install apache-arrow on macos - if: matrix.os == 'macOS-latest' - run: | - brew install apache-arrow - brew install pkg-config - name: Install uv run: curl -LsSf https://astral.sh/uv/install.sh | sh - name: Get uv cache dir diff --git a/.github/workflows/pr_local_integration_tests.yml b/.github/workflows/pr_local_integration_tests.yml index 9eda08fb44..25273b8a55 100644 --- a/.github/workflows/pr_local_integration_tests.yml +++ b/.github/workflows/pr_local_integration_tests.yml @@ -38,12 +38,6 @@ jobs: with: python-version: ${{ matrix.python-version }} architecture: x64 - - - name: Setup Go - id: setup-go - uses: actions/setup-go@v2 - with: - go-version: 1.19.7 - name: Install uv run: curl -LsSf https://astral.sh/uv/install.sh | sh - name: Get uv cache dir @@ -55,24 +49,9 @@ jobs: with: path: ${{ steps.uv-cache.outputs.dir }} key: ${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-uv-${{ hashFiles(format('**/py{0}-ci-requirements.txt', env.PYTHON)) }} - - name: Install apache-arrow on ubuntu - if: matrix.os == 'ubuntu-latest' - run: | - sudo apt update - sudo apt install -y -V ca-certificates lsb-release wget - wget https://apache.jfrog.io/artifactory/arrow/$(lsb_release --id --short | tr 'A-Z' 'a-z')/apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb - sudo apt install -y -V ./apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb - sudo apt update - sudo apt install -y -V "libarrow-dev=11.0.0-1" - name: Install dependencies run: make install-python-ci-dependencies-uv - - name: Compile Go along with Extensions - run: | - make install-go-proto-dependencies - make install-go-ci-dependencies - COMPILE_GO=true python setup.py develop - CGO_LDFLAGS_ALLOW=".*" COMPILE_GO=True python setup.py build_ext --inplace - name: Test local integration tests if: ${{ always() }} # this will guarantee that step won't be canceled and resources won't leak - run: make test-python-integration-local + run: make test-python-integration-local \ No newline at end of file diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index e56296ec4b..09b33e6d77 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -49,7 +49,7 @@ jobs: needs: [get-version, publish-python-sdk] strategy: matrix: - component: [feature-server, feature-server-java, feature-transformation-server, feast-operator] + component: [feature-server, feature-server-java, feature-transformation-server, feast-operator, feature-server-go] env: MAVEN_CACHE: gs://feast-templocation-kf-feast/.m2.2020-08-19.tar REGISTRY: feastdev diff --git a/.github/workflows/unit_tests.yml b/.github/workflows/unit_tests.yml index 42b5df1a35..ab76b2d147 100644 --- a/.github/workflows/unit_tests.yml +++ b/.github/workflows/unit_tests.yml @@ -23,21 +23,6 @@ jobs: with: python-version: ${{ matrix.python-version }} architecture: x64 - - name: Setup Go - id: setup-go - uses: actions/setup-go@v2 - with: - go-version: 1.19.7 - - name: Install apache-arrow on ubuntu - if: matrix.os == 'ubuntu-latest' - run: | - sudo apt update - sudo apt install -y -V ca-certificates lsb-release wget - wget https://apache.jfrog.io/artifactory/arrow/$(lsb_release --id --short | tr 'A-Z' 'a-z')/apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb - sudo apt install -y -V ./apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb - sudo apt update - sudo apt install -y -V "libarrow-dev=11.0.0-1" - sudo apt install -y -V pkg-config - name: Install uv run: | curl -LsSf https://astral.sh/uv/install.sh | sh @@ -67,7 +52,7 @@ jobs: run: make test-python-unit - name: Test Python for Mac if: matrix.os != 'ubuntu-latest' - run: python -m pytest -n 8 --color=yes sdk/python/tests --ignore=sdk/python/tests/expediagroup --ignore=sdk/python/tests/integration/e2e/test_go_feature_server.py + run: python -m pytest -n 8 --color=yes sdk/python/tests --ignore=sdk/python/tests/expediagroup unit-test-go: runs-on: ${{ matrix.os }} @@ -81,6 +66,11 @@ jobs: PYTHON: ${{ matrix.python-version }} steps: - uses: actions/checkout@v4 + - name: Setup Go + id: setup-go + uses: actions/setup-go@v2 + with: + go-version: 1.22.5 - name: Setup Python id: setup-python uses: actions/setup-python@v5 @@ -90,20 +80,6 @@ jobs: - name: Upgrade pip version run: | pip install --upgrade "pip>=22.1,<23" - - name: Setup Go - id: setup-go - uses: actions/setup-go@v2 - with: - go-version: 1.19.7 - - name: Install apache-arrow on ubuntu - run: | - sudo apt update - sudo apt install -y -V ca-certificates lsb-release wget - wget https://apache.jfrog.io/artifactory/arrow/$(lsb_release --id --short | tr 'A-Z' 'a-z')/apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb - sudo apt install -y -V ./apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb - sudo apt update - sudo apt install -y -V "libarrow-dev=11.0.0-1" - sudo apt install -y -V pkg-config - name: Test run: make test-go @@ -125,4 +101,4 @@ jobs: run: yarn build:lib - name: Run yarn tests working-directory: ./ui - run: yarn test --watchAll=false + run: yarn test --watchAll=false \ No newline at end of file diff --git a/Makefile b/Makefile index 1b9ca953e6..2ee3b60771 100644 --- a/Makefile +++ b/Makefile @@ -36,11 +36,8 @@ build: protos build-java build-docker # Python SDK -install-python-ci-dependencies: install-go-proto-dependencies install-go-ci-dependencies +install-python-ci-dependencies: python -m piptools sync sdk/python/requirements/py$(PYTHON)-ci-requirements.txt - pip install --no-deps -e . - python setup.py build_python_protos --inplace - COMPILE_GO=true python setup.py develop install-python-ci-dependencies-uv: uv pip sync --system sdk/python/requirements/py$(PYTHON)-ci-requirements.txt @@ -124,7 +121,6 @@ test-python-universal-spark: not test_historical_features_persisting and \ not test_historical_retrieval_fails_on_validation and \ not test_universal_cli and \ - not test_go_feature_server and \ not test_feature_logging and \ not test_reorder_columns and \ not test_logged_features_validation and \ @@ -147,7 +143,6 @@ test-python-universal-trino: not test_historical_features_persisting and \ not test_historical_retrieval_fails_on_validation and \ not test_universal_cli and \ - not test_go_feature_server and \ not test_feature_logging and \ not test_reorder_columns and \ not test_logged_features_validation and \ @@ -176,7 +171,7 @@ test-python-universal-mssql: sdk/python/tests -# To use Athena as an offline store, you need to create an Athena database and an S3 bucket on AWS. +# To use Athena as an offline store, you need to create an Athena database and an S3 bucket on AWS. # https://docs.aws.amazon.com/athena/latest/ug/getting-started.html # Modify environment variables ATHENA_REGION, ATHENA_DATA_SOURCE, ATHENA_DATABASE, ATHENA_WORKGROUP or # ATHENA_S3_BUCKET_NAME according to your needs. If tests fail with the pytest -n 8 option, change the number to 1. @@ -190,8 +185,7 @@ test-python-universal-athena: ATHENA_WORKGROUP=primary \ ATHENA_S3_BUCKET_NAME=feast-int-bucket \ python -m pytest -n 8 --integration \ - -k "not test_go_feature_server and \ - not test_logged_features_validation and \ + -k "not test_logged_features_validation and \ not test_lambda and \ not test_feature_logging and \ not test_offline_write and \ @@ -203,7 +197,7 @@ test-python-universal-athena: not s3_registry and \ not test_snowflake" \ sdk/python/tests - + test-python-universal-postgres-offline: PYTHONPATH='.' \ FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.offline_stores.contrib.postgres_repo_configuration \ @@ -212,7 +206,6 @@ test-python-universal-postgres-offline: -k "not test_historical_retrieval_with_validation and \ not test_historical_features_persisting and \ not test_universal_cli and \ - not test_go_feature_server and \ not test_feature_logging and \ not test_reorder_columns and \ not test_logged_features_validation and \ @@ -221,7 +214,6 @@ test-python-universal-postgres-offline: not test_push_features_to_offline_store and \ not gcs_registry and \ not s3_registry and \ - not test_snowflake and \ not test_universal_types" \ sdk/python/tests @@ -231,7 +223,6 @@ test-python-universal-postgres-online: PYTEST_PLUGINS=sdk.python.tests.integration.feature_repos.universal.online_store.postgres \ python -m pytest -n 8 --integration \ -k "not test_universal_cli and \ - not test_go_feature_server and \ not test_feature_logging and \ not test_reorder_columns and \ not test_logged_features_validation and \ @@ -250,7 +241,6 @@ test-python-universal-postgres-online: PYTEST_PLUGINS=sdk.python.tests.integration.feature_repos.universal.online_store.postgres \ python -m pytest -n 8 --integration \ -k "not test_universal_cli and \ - not test_go_feature_server and \ not test_feature_logging and \ not test_reorder_columns and \ not test_logged_features_validation and \ @@ -269,7 +259,6 @@ test-python-universal-postgres-online: PYTEST_PLUGINS=sdk.python.tests.integration.feature_repos.universal.online_store.mysql \ python -m pytest -n 8 --integration \ -k "not test_universal_cli and \ - not test_go_feature_server and \ not test_feature_logging and \ not test_reorder_columns and \ not test_logged_features_validation and \ @@ -295,7 +284,6 @@ test-python-universal-hazelcast: PYTEST_PLUGINS=sdk.python.tests.integration.feature_repos.universal.online_store.hazelcast \ python -m pytest -n 8 --integration \ -k "not test_universal_cli and \ - not test_go_feature_server and \ not test_feature_logging and \ not test_reorder_columns and \ not test_logged_features_validation and \ @@ -331,7 +319,6 @@ test-python-universal-cassandra-no-cloud-providers: PYTEST_PLUGINS=sdk.python.tests.integration.feature_repos.universal.online_store.elasticsearch \ python -m pytest -n 8 --integration \ -k "not test_universal_cli and \ - not test_go_feature_server and \ not test_feature_logging and \ not test_reorder_columns and \ not test_logged_features_validation and \ @@ -344,23 +331,9 @@ test-python-universal-cassandra-no-cloud-providers: not test_snowflake" \ sdk/python/tests -test-python-universal-singlestore-online: - PYTHONPATH='.' \ - FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.online_stores.contrib.singlestore_repo_configuration \ - PYTEST_PLUGINS=sdk.python.tests.integration.feature_repos.universal.online_store.singlestore \ - python -m pytest -n 8 --integration \ - -k "not test_universal_cli and \ - not gcs_registry and \ - not s3_registry and \ - not test_snowflake" \ - sdk/python/tests - test-python-universal: python -m pytest -n 8 --integration sdk/python/tests -test-python-go-server: compile-go-lib - FEAST_USAGE=False IS_TEST=True pytest --integration --goserver sdk/python/tests - format-python: cd ${ROOT_DIR}/sdk/python; python -m ruff check --fix feast/ tests/ cd ${ROOT_DIR}/sdk/python; python -m ruff format feast/ tests/ @@ -428,22 +401,20 @@ install-protoc-dependencies: compile-protos-go: install-go-proto-dependencies install-protoc-dependencies python setup.py build_go_protos -compile-go-lib: install-go-proto-dependencies install-go-ci-dependencies - CGO_LDFLAGS_ALLOW=".*" COMPILE_GO=True python setup.py build_ext --inplace - install-feast-ci-locally: pip install -e ".[ci]" -# Needs feast package to setup the feature store -# CGO flag is due to this issue: https://github.com/golang/go/wiki/InvalidFlag -test-go: compile-protos-go compile-protos-python compile-go-lib install-feast-ci-locally - CGO_LDFLAGS_ALLOW=".*" go test -tags cgo,ccalloc ./... +build-go: compile-protos-go + go build -o feast ./go/main.go + +test-go: compile-protos-go compile-protos-python install-feast-ci-locally + CGO_ENABLED=1 go test -coverprofile=coverage.out ./... && go tool cover -html=coverage.out -o coverage.html format-go: gofmt -s -w go/ -lint-go: compile-protos-go compile-go-lib - go vet -tags cgo,ccalloc ./go/internal/feast ./go/embedded +lint-go: compile-protos-go + go vet ./go/internal/feast # Docker @@ -476,6 +447,14 @@ build-feature-server-java-docker: -t $(REGISTRY)/feature-server-java:$(VERSION) \ -f java/infra/docker/feature-server/Dockerfile --load . +push-feature-server-go-docker: + docker push $(REGISTRY)/feature-server-go:$(VERSION) + +build-feature-server-go-docker: + docker buildx build --build-arg VERSION=$(VERSION) \ + -t $(REGISTRY)/feature-server-go:$(VERSION) \ + -f go/infra/docker/feature-server/Dockerfile --load . + push-feast-operator-docker: cd infra/feast-operator && \ IMAGE_TAG_BASE=$(REGISTRY)/feast-operator \ @@ -504,6 +483,11 @@ build-java-docker-dev: -t feastdev/feature-server-java:dev \ -f java/infra/docker/feature-server/Dockerfile.dev --load . +build-go-docker-dev: + docker buildx build --build-arg VERSION=dev \ + -t feastdev/feature-server-go:dev \ + -f go/infra/docker/feature-server/Dockerfile --load . + # Documentation install-dependencies-proto-docs: @@ -542,4 +526,4 @@ build-helm-docs: # Note: requires node and yarn to be installed build-ui: - cd $(ROOT_DIR)/sdk/python/feast/ui && yarn upgrade @feast-dev/feast-ui --latest && yarn install && npm run build --omit=dev + cd $(ROOT_DIR)/sdk/python/feast/ui && yarn upgrade @feast-dev/feast-ui --latest && yarn install && npm run build --omit=dev \ No newline at end of file diff --git a/go.mod b/go.mod index 50052a64c2..58eda4aaa8 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,6 @@ replace github.com/go-python/gopy v0.4.4 => github.com/feast-dev/gopy v0.4.1-0.2 require ( github.com/apache/arrow/go/v8 v8.0.0 github.com/ghodss/yaml v1.0.0 - github.com/go-python/gopy v0.4.4 // github.com/go-redis/redis/v8 v8.11.5 github.com/golang/protobuf v1.5.3 github.com/google/uuid v1.3.0 @@ -44,8 +43,6 @@ require ( require github.com/rs/zerolog v1.21.0 -require github.com/ianlancetaylor/cgosymbolizer v0.0.0-20230801000641-8736a9d41aaa - require ( github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect github.com/andybalholm/brotli v1.0.5 // indirect @@ -63,6 +60,7 @@ require ( github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect github.com/pierrec/lz4/v4 v4.1.15 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/stretchr/objx v0.5.0 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect go.uber.org/atomic v1.11.0 // indirect golang.org/x/exp v0.0.0-20220827204233-334a2380cb91 // indirect diff --git a/go.sum b/go.sum index a910eb4406..25f5bcc79f 100644 --- a/go.sum +++ b/go.sum @@ -115,8 +115,6 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.m github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= -github.com/feast-dev/gopy v0.4.1-0.20220714211711-252048177d85 h1:BKmfqWiDbxvviB6vemPbbNjF+ywRsBMCdk1QvrcGgkc= -github.com/feast-dev/gopy v0.4.1-0.20220714211711-252048177d85/go.mod h1:tlA/KcD7rM8B+NQJR4SASwiinfKY0aiMFanHszR8BZA= github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4= @@ -226,8 +224,6 @@ github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2p github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/hudl/fargo v1.3.0/go.mod h1:y3CKSmjA+wD2gak7sUSXTAoopbhU08POFhmITJgmKTg= -github.com/ianlancetaylor/cgosymbolizer v0.0.0-20230801000641-8736a9d41aaa h1:FEZID0R3+pkWLvjmZJ2iL+SZTcb2+/PgVvoyQss/q/I= -github.com/ianlancetaylor/cgosymbolizer v0.0.0-20230801000641-8736a9d41aaa/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= @@ -701,4 +697,4 @@ inet.af/netaddr v0.0.0-20220811202034-502d2d690317 h1:U2fwK6P2EqmopP/hFLTOAjWTki inet.af/netaddr v0.0.0-20220811202034-502d2d690317/go.mod h1:OIezDfdzOgFhuw4HuWapWq2e9l0H9tK4F1j+ETRtF3k= rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o= -sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0/go.mod h1:hI742Nqp5OhwiqlzhgfbWU4mW4yO10fP+LoT9WOswdU= \ No newline at end of file +sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0/go.mod h1:hI742Nqp5OhwiqlzhgfbWU4mW4yO10fP+LoT9WOswdU= diff --git a/go/README.md b/go/README.md index 2091137501..43125bfad3 100644 --- a/go/README.md +++ b/go/README.md @@ -1,109 +1,9 @@ This directory contains the Go logic that's executed by the `EmbeddedOnlineFeatureServer` from Python. -## Building and Linking -[gopy](https://github.com/go-python/gopy) generates (and compiles) a CPython extension module from a Go package. That's what we're using here, as visible in [setup.py](../setup.py). +## Build and Run +To build and run the Go Feature Server locally, create a feature_store.yaml file with necessary configurations and run below commands: -Under the hood, gopy invokes `go build`, and then templates `cgo` stubs for the Go module that exposes the public functions from the Go module as C functions. -For our project, this stuff can be found at `sdk/python/feast/embedded_go/lib/embedded.go` & `sdk/python/feast/embedded_go/lib/embedded_go.h` after running `make compile-go-lib`. - -## Arrow memory management -Understanding this is the trickiest part of this integration. - -At a high level, when using the Python<>Go integration, the Python layer exports request data into an [Arrow Record batch](https://arrow.apache.org/docs/python/data.html) which is transferred to Go using Arrow's zero copy mechanism. -Similarly, the Go layer converts feature values read from the online store into a Record Batch that's exported to Python using the same mechanics. - -The first thing to note is that from the Python perspective, all the export logic assumes that we're exporting to & importing from C, not Go. This is because pyarrow only interops with C, and the fact we're using Go is an implementation detail not relevant to the Python layer. - -### Export Entities & Request data from Python to Go -The code exporting to C is this, in [online_feature_service.py](../sdk/python/feast/embedded_go/online_features_service.py) -``` -( - entities_c_schema, - entities_ptr_schema, - entities_c_array, - entities_ptr_array, -) = allocate_schema_and_array() -( - req_data_c_schema, - req_data_ptr_schema, - req_data_c_array, - req_data_ptr_array, -) = allocate_schema_and_array() - -batch, schema = map_to_record_batch(entities, join_keys_types) -schema._export_to_c(entities_ptr_schema) -batch._export_to_c(entities_ptr_array) - -batch, schema = map_to_record_batch(request_data) -schema._export_to_c(req_data_ptr_schema) -batch._export_to_c(req_data_ptr_array) -``` - -Under the hood, `allocate_schema_and_array` allocates a pointer (`struct ArrowSchema*` and `struct ArrowArray*`) in native memory (i.e. the C layer) using `cffi`. -Next, the RecordBatch exports to this pointer using [`_export_to_c`](https://github.com/apache/arrow/blob/master/python/pyarrow/table.pxi#L2509), which uses [`ExportRecordBatch`](https://arrow.apache.org/docs/cpp/api/c_abi.html#_CPPv417ExportRecordBatchRK11RecordBatchP10ArrowArrayP11ArrowSchema) under the hood. - -As per the documentation for ExportRecordBatch: -> Status ExportRecordBatch(const RecordBatch &batch, struct ArrowArray *out, struct ArrowSchema *out_schema = NULLPTR) -> Export C++ RecordBatch using the C data interface format. -> -> The record batch is exported as if it were a struct array. The resulting ArrowArray struct keeps the record batch data and buffers alive until its release callback is called by the consumer. - -This is why `GetOnlineFeatures()` in `online_features.go` calls `record.Release()` as below: -``` -entitiesRecord, err := readArrowRecord(entities) -if err != nil { - return err -} -defer entitiesRecord.Release() -... -requestDataRecords, err := readArrowRecord(requestData) -if err != nil { - return err -} -defer requestDataRecords.Release() -``` - -Additionally, we need to pass in a pair of pointers to `GetOnlineFeatures()` that are populated by the Go layer, and the resultant feature values can be passed back to Python (via the C layer) using zero-copy semantics. -That happens as follows: -``` -( - features_c_schema, - features_ptr_schema, - features_c_array, - features_ptr_array, -) = allocate_schema_and_array() - -... - -record_batch = pa.RecordBatch._import_from_c( - features_ptr_array, features_ptr_schema -) -``` - -The corresponding Go code that exports this data is: -``` -result := array.NewRecord(arrow.NewSchema(outputFields, nil), outputColumns, int64(numRows)) - -cdata.ExportArrowRecordBatch(result, - cdata.ArrayFromPtr(output.DataPtr), - cdata.SchemaFromPtr(output.SchemaPtr)) -``` - -The documentation for `ExportArrowRecordBatch` is great. It has this super useful caveat: - -> // The release function on the populated CArrowArray will properly decrease the reference counts, -> // and release the memory if the record has already been released. But since this must be explicitly -> // done, make sure it is released so that you do not create a memory leak. - -This implies that the receiver is on the hook for explicitly releasing this memory. - -However, we're using `_import_from_c`, which uses [`ImportRecordBatch`](https://arrow.apache.org/docs/cpp/api/c_abi.html#_CPPv417ImportRecordBatchP10ArrowArrayP11ArrowSchema), which implies that the receiver of the RecordBatch is the new owner of the data. -This is wrapped by pyarrow - and when the corresponding python object goes out of scope, it should clean up the underlying record batch. - -Another thing to note (which I'm not sure may be the source of issues) is that Arrow has the concept of [Memory Pools](https://arrow.apache.org/docs/python/api/memory.html#memory-pools). -Memory pools can be set in python as well as in Go. I *believe* that if we use the CGoArrowAllocator, that uses whatever pool C++ uses, which should be the same as the one used by PyArrow. But this should be vetted. - - -### References -- https://arrow.apache.org/docs/format/CDataInterface.html#memory-management -- https://arrow.apache.org/docs/python/memory.html \ No newline at end of file +```bash + go build -o feast ./go/main.go + ./feast --type=http --port +``` \ No newline at end of file diff --git a/go/infra/docker/feature-server/Dockerfile b/go/infra/docker/feature-server/Dockerfile new file mode 100644 index 0000000000..cf63bb4559 --- /dev/null +++ b/go/infra/docker/feature-server/Dockerfile @@ -0,0 +1,31 @@ +FROM golang:1.22.5 + +# Update the package list and install the ca-certificates package +RUN apt-get update && apt-get install -y ca-certificates +RUN apt install -y protobuf-compiler + +RUN go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.31.0 +RUN go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.3.0 + +# Set the current working directory inside the container +WORKDIR /app + +# Copy the source code into the container +COPY go/ ./go/ +COPY go.mod go.sum ./ + +# Compile Protobuf files +COPY protos/ ./protos/ +RUN mkdir -p go/protos +RUN find ./protos -name "*.proto" \ + -exec protoc --proto_path=protos --go_out=go/protos --go_opt=module=github.com/feast-dev/feast/go/protos --go-grpc_out=go/protos --go-grpc_opt=module=github.com/feast-dev/feast/go/protos {} \; + +# Build the Go application +RUN go build -o feast ./go/main.go + +# Expose ports +EXPOSE 8080 + +# Command to run the executable +# Pass arguments to the executable (Ex: ./feast --type=grpc) +CMD ["./feast"] \ No newline at end of file diff --git a/go/internal/feast/registry/http.go b/go/internal/feast/registry/http.go index d1d69c1b3a..65ca2ccf32 100644 --- a/go/internal/feast/registry/http.go +++ b/go/internal/feast/registry/http.go @@ -3,12 +3,13 @@ package registry import ( "crypto/tls" "fmt" - "github.com/feast-dev/feast/go/protos/feast/core" - "github.com/rs/zerolog/log" - "google.golang.org/protobuf/proto" "io" "net/http" "time" + + "github.com/feast-dev/feast/go/protos/feast/core" + "github.com/rs/zerolog/log" + "google.golang.org/protobuf/proto" ) type HttpRegistryStore struct { @@ -59,7 +60,7 @@ func (hrs *HttpRegistryStore) TestConnectivity() error { defer resp.Body.Close() if resp.StatusCode != http.StatusOK { - return fmt.Errorf("HTTP Registry connecitiy check failed with status code: %d", resp.StatusCode) + return fmt.Errorf("HTTP Registry connectivity check failed with status code: %d", resp.StatusCode) } return nil @@ -157,9 +158,6 @@ func (r *HttpRegistryStore) loadOnDemandFeatureViews(registry *core.Registry) er if err := proto.Unmarshal(data, od_feature_view_list); err != nil { return err } - if len(od_feature_view_list.GetOndemandfeatureviews()) == 0 { - log.Warn().Msg(fmt.Sprintf("Feature Registry has no associated Ondemandfeatureviews for project %s.", r.project)) - } registry.OnDemandFeatureViews = append(registry.OnDemandFeatureViews, od_feature_view_list.GetOndemandfeatureviews()...) return nil }) @@ -172,9 +170,6 @@ func (r *HttpRegistryStore) loadFeatureServices(registry *core.Registry) error { if err := proto.Unmarshal(data, feature_service_list); err != nil { return err } - if len(feature_service_list.GetFeatureservices()) == 0 { - log.Warn().Msg(fmt.Sprintf("Feature Registry has no associated FeatureServices for project %s.", r.project)) - } registry.FeatureServices = append(registry.FeatureServices, feature_service_list.GetFeatureservices()...) return nil }) diff --git a/go/internal/feast/registry/registry.go b/go/internal/feast/registry/registry.go index 830e528f9d..6ff6f20641 100644 --- a/go/internal/feast/registry/registry.go +++ b/go/internal/feast/registry/registry.go @@ -129,7 +129,6 @@ func (r *Registry) load(registry *core.Registry) { func (r *Registry) loadEntities(registry *core.Registry) { entities := registry.Entities for _, entity := range entities { - // fmt.Println("Entity load: ", entity.Spec.Name) if _, ok := r.cachedEntities[r.project]; !ok { r.cachedEntities[r.project] = make(map[string]*core.Entity) } @@ -140,7 +139,6 @@ func (r *Registry) loadEntities(registry *core.Registry) { func (r *Registry) loadFeatureServices(registry *core.Registry) { featureServices := registry.FeatureServices for _, featureService := range featureServices { - // fmt.Println("featureServices load: ", featureService.Spec.Name) if _, ok := r.cachedFeatureServices[r.project]; !ok { r.cachedFeatureServices[r.project] = make(map[string]*core.FeatureService) } @@ -151,7 +149,6 @@ func (r *Registry) loadFeatureServices(registry *core.Registry) { func (r *Registry) loadFeatureViews(registry *core.Registry) { featureViews := registry.FeatureViews for _, featureView := range featureViews { - // fmt.Println("featureView load: ", featureView.Spec.Name) if _, ok := r.cachedFeatureViews[r.project]; !ok { r.cachedFeatureViews[r.project] = make(map[string]*core.FeatureView) } @@ -162,7 +159,6 @@ func (r *Registry) loadFeatureViews(registry *core.Registry) { func (r *Registry) loadStreamFeatureViews(registry *core.Registry) { streamFeatureViews := registry.StreamFeatureViews for _, streamFeatureView := range streamFeatureViews { - // fmt.Println("streamFeatureView load: ", streamFeatureView.Spec.Name) if _, ok := r.cachedStreamFeatureViews[r.project]; !ok { r.cachedStreamFeatureViews[r.project] = make(map[string]*core.StreamFeatureView) } @@ -173,7 +169,6 @@ func (r *Registry) loadStreamFeatureViews(registry *core.Registry) { func (r *Registry) loadOnDemandFeatureViews(registry *core.Registry) { onDemandFeatureViews := registry.OnDemandFeatureViews for _, onDemandFeatureView := range onDemandFeatureViews { - // fmt.Println("onDemandFeatureView load: ", onDemandFeatureView.Spec.Name) if _, ok := r.cachedOnDemandFeatureViews[r.project]; !ok { r.cachedOnDemandFeatureViews[r.project] = make(map[string]*core.OnDemandFeatureView) } diff --git a/go/internal/feast/registry/repoconfig.go b/go/internal/feast/registry/repoconfig.go index 2b140ad5da..f70310f261 100644 --- a/go/internal/feast/registry/repoconfig.go +++ b/go/internal/feast/registry/repoconfig.go @@ -3,9 +3,11 @@ package registry import ( "encoding/json" "fmt" - "io/ioutil" + "os" "path/filepath" + "time" + "github.com/feast-dev/feast/go/internal/feast/server/logging" "github.com/ghodss/yaml" ) @@ -60,7 +62,7 @@ func NewRepoConfigFromJSON(repoPath, configJSON string) (*RepoConfig, error) { // NewRepoConfigFromFile reads the `feature_store.yaml` file in the repo path and converts it // into a RepoConfig struct. func NewRepoConfigFromFile(repoPath string) (*RepoConfig, error) { - data, err := ioutil.ReadFile(filepath.Join(repoPath, "feature_store.yaml")) + data, err := os.ReadFile(filepath.Join(repoPath, "feature_store.yaml")) if err != nil { return nil, err } @@ -69,14 +71,44 @@ func NewRepoConfigFromFile(repoPath string) (*RepoConfig, error) { return nil, err } + repoConfigWithEnv := os.ExpandEnv(string(data)) + config := RepoConfig{} - if err = yaml.Unmarshal(data, &config); err != nil { + if err = yaml.Unmarshal([]byte(repoConfigWithEnv), &config); err != nil { return nil, err } config.RepoPath = repoPath return &config, nil } +func (r *RepoConfig) GetLoggingOptions() (*logging.LoggingOptions, error) { + loggingOptions := logging.LoggingOptions{} + if loggingOptionsMap, ok := r.FeatureServer["feature_logging"].(map[string]interface{}); ok { + loggingOptions = logging.DefaultOptions + for k, v := range loggingOptionsMap { + switch k { + case "queue_capacity": + if value, ok := v.(int); ok { + loggingOptions.ChannelCapacity = value + } + case "emit_timeout_micro_secs": + if value, ok := v.(int); ok { + loggingOptions.EmitTimeout = time.Duration(value) * time.Microsecond + } + case "write_to_disk_interval_secs": + if value, ok := v.(int); ok { + loggingOptions.WriteInterval = time.Duration(value) * time.Second + } + case "flush_interval_secs": + if value, ok := v.(int); ok { + loggingOptions.FlushInterval = time.Duration(value) * time.Second + } + } + } + } + return &loggingOptions, nil +} + func (r *RepoConfig) GetRegistryConfig() (*RegistryConfig, error) { if registryConfigMap, ok := r.Registry.(map[string]interface{}); ok { registryConfig := RegistryConfig{CacheTtlSeconds: defaultCacheTtlSeconds, ClientId: defaultClientID} diff --git a/go/internal/feast/registry/repoconfig_test.go b/go/internal/feast/registry/repoconfig_test.go index 540ffd0b6c..90a20b204a 100644 --- a/go/internal/feast/registry/repoconfig_test.go +++ b/go/internal/feast/registry/repoconfig_test.go @@ -1,12 +1,13 @@ package registry import ( - "fmt" "os" "path/filepath" "strings" "testing" + "time" + "github.com/feast-dev/feast/go/internal/feast/server/logging" "github.com/stretchr/testify/assert" ) @@ -166,8 +167,6 @@ func TestGetRegistryConfig_Map(t *testing.T) { // Call the method under test registryConfig, _ := config.GetRegistryConfig() - fmt.Println(registryConfig) - // Assert that the method correctly processed the map assert.Equal(t, "data/registry.db", registryConfig.Path) assert.Equal(t, "local", registryConfig.RegistryStoreType) @@ -220,3 +219,87 @@ func TestGetRegistryConfig_CacheTtlSecondsTypes(t *testing.T) { assert.Equal(t, int64(60), registryConfig.CacheTtlSeconds) } } + +func TestGetLoggingOptions_Defaults(t *testing.T) { + config := RepoConfig{ + FeatureServer: map[string]interface{}{ + "feature_logging": map[string]interface{}{}, + }, + } + options, err := config.GetLoggingOptions() + assert.Nil(t, err) + assert.Equal(t, logging.DefaultOptions, *options) +} + +func TestGetLoggingOptions_QueueCapacity(t *testing.T) { + config := RepoConfig{ + FeatureServer: map[string]interface{}{ + "feature_logging": map[string]interface{}{ + "queue_capacity": 100, + }, + }, + } + expected := logging.DefaultOptions + expected.ChannelCapacity = 100 + options, err := config.GetLoggingOptions() + assert.Nil(t, err) + assert.Equal(t, expected, *options) +} + +func TestGetLoggingOptions_EmitTimeoutMicroSecs(t *testing.T) { + config := RepoConfig{ + FeatureServer: map[string]interface{}{ + "feature_logging": map[string]interface{}{ + "emit_timeout_micro_secs": 500, + }, + }, + } + expected := logging.DefaultOptions + expected.EmitTimeout = 500 * time.Microsecond + options, err := config.GetLoggingOptions() + assert.Nil(t, err) + assert.Equal(t, expected, *options) +} + +func TestGetLoggingOptions_WriteToDiskIntervalSecs(t *testing.T) { + config := RepoConfig{ + FeatureServer: map[string]interface{}{ + "feature_logging": map[string]interface{}{ + "write_to_disk_interval_secs": 10, + }, + }, + } + expected := logging.DefaultOptions + expected.WriteInterval = 10 * time.Second + options, err := config.GetLoggingOptions() + assert.Nil(t, err) + assert.Equal(t, expected, *options) +} + +func TestGetLoggingOptions_FlushIntervalSecs(t *testing.T) { + config := RepoConfig{ + FeatureServer: map[string]interface{}{ + "feature_logging": map[string]interface{}{ + "flush_interval_secs": 15, + }, + }, + } + expected := logging.DefaultOptions + expected.FlushInterval = 15 * time.Second + options, err := config.GetLoggingOptions() + assert.Nil(t, err) + assert.Equal(t, expected, *options) +} + +func TestGetLoggingOptions_InvalidType(t *testing.T) { + config := RepoConfig{ + FeatureServer: map[string]interface{}{ + "feature_logging": map[string]interface{}{ + "queue_capacity": "invalid", + }, + }, + } + options, err := config.GetLoggingOptions() + assert.Nil(t, err) + assert.Equal(t, logging.DefaultOptions, *options) +} diff --git a/go/internal/feast/server/logging/memorybuffer.go b/go/internal/feast/server/logging/memorybuffer.go index c9f00218df..1febed6ffd 100644 --- a/go/internal/feast/server/logging/memorybuffer.go +++ b/go/internal/feast/server/logging/memorybuffer.go @@ -2,6 +2,7 @@ package logging import ( "fmt" + "github.com/apache/arrow/go/v8/arrow" "github.com/apache/arrow/go/v8/arrow/array" "github.com/apache/arrow/go/v8/arrow/memory" @@ -128,7 +129,7 @@ func getArrowSchema(schema *FeatureServiceSchema) (*arrow.Schema, error) { // and writes them to arrow table. // Returns arrow table that contains all of the logs in columnar format. func (b *MemoryBuffer) convertToArrowRecord() (arrow.Record, error) { - arrowMemory := memory.NewCgoArrowAllocator() + arrowMemory := memory.NewGoAllocator() numRows := len(b.logs) columns := make(map[string][]*types.Value) diff --git a/go/internal/feast/server/logging/memorybuffer_test.go b/go/internal/feast/server/logging/memorybuffer_test.go index ec83680f4f..6158ead4d3 100644 --- a/go/internal/feast/server/logging/memorybuffer_test.go +++ b/go/internal/feast/server/logging/memorybuffer_test.go @@ -118,7 +118,7 @@ func TestSerializeToArrowTable(t *testing.T) { LogTimestamp: time.Now(), }) - pool := memory.NewCgoArrowAllocator() + pool := memory.NewGoAllocator() builder := array.NewRecordBuilder(pool, b.arrowSchema) defer builder.Release() diff --git a/go/internal/feast/transformation/transformation.go b/go/internal/feast/transformation/transformation.go index 65a19fb856..85eac9e891 100644 --- a/go/internal/feast/transformation/transformation.go +++ b/go/internal/feast/transformation/transformation.go @@ -2,27 +2,19 @@ package transformation import ( "context" - "errors" "fmt" "runtime" "strings" - "unsafe" "github.com/apache/arrow/go/v8/arrow" - "github.com/apache/arrow/go/v8/arrow/array" - "github.com/apache/arrow/go/v8/arrow/cdata" "github.com/apache/arrow/go/v8/arrow/memory" "github.com/rs/zerolog/log" - "google.golang.org/protobuf/types/known/timestamppb" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" "github.com/feast-dev/feast/go/internal/feast/model" "github.com/feast-dev/feast/go/internal/feast/onlineserving" - "github.com/feast-dev/feast/go/protos/feast/serving" prototypes "github.com/feast-dev/feast/go/protos/feast/types" "github.com/feast-dev/feast/go/types" - - _ "github.com/ianlancetaylor/cgosymbolizer" ) /* @@ -89,19 +81,6 @@ func AugmentResponseWithOnDemandTransforms( ReleaseArrowContext(requestContextArrow) return nil, err } - } else { - onDemandFeatures, err = CallTransformations( - odfv, - retrievedFeatures, - requestContextArrow, - transformationCallback, - numRows, - fullFeatureNames, - ) - if err != nil { - ReleaseArrowContext(requestContextArrow) - return nil, err - } } result = append(result, onDemandFeatures...) @@ -118,121 +97,6 @@ func ReleaseArrowContext(requestContextArrow map[string]arrow.Array) { } } -func CallTransformations( - featureView *model.OnDemandFeatureView, - retrievedFeatures map[string]arrow.Array, - requestContext map[string]arrow.Array, - callback TransformationCallback, - numRows int, - fullFeatureNames bool, -) ([]*onlineserving.FeatureVector, error) { - - inputArr := cdata.CArrowArray{} - inputSchema := cdata.CArrowSchema{} - - outArr := cdata.CArrowArray{} - outSchema := cdata.CArrowSchema{} - - defer cdata.ReleaseCArrowArray(&inputArr) - defer cdata.ReleaseCArrowArray(&outArr) - defer cdata.ReleaseCArrowSchema(&inputSchema) - defer cdata.ReleaseCArrowSchema(&outSchema) - - inputArrPtr := uintptr(unsafe.Pointer(&inputArr)) - inputSchemaPtr := uintptr(unsafe.Pointer(&inputSchema)) - - outArrPtr := uintptr(unsafe.Pointer(&outArr)) - outSchemaPtr := uintptr(unsafe.Pointer(&outSchema)) - - inputFields := make([]arrow.Field, 0) - inputColumns := make([]arrow.Array, 0) - for name, arr := range retrievedFeatures { - inputFields = append(inputFields, arrow.Field{Name: name, Type: arr.DataType()}) - inputColumns = append(inputColumns, arr) - } - for name, arr := range requestContext { - inputFields = append(inputFields, arrow.Field{Name: name, Type: arr.DataType()}) - inputColumns = append(inputColumns, arr) - } - - inputRecord := array.NewRecord(arrow.NewSchema(inputFields, nil), inputColumns, int64(numRows)) - defer inputRecord.Release() - - cdata.ExportArrowRecordBatch(inputRecord, &inputArr, &inputSchema) - - // Recover from a panic from FFI so the server doesn't crash - var err error - defer func() { - if e := recover(); e != nil { - logStackTrace() - switch value := e.(type) { - case error: - log.Error().Err(value).Msg("") - err = fmt.Errorf("python transformation callback error: %w\n", value) - case string: - log.Error().Msg(value) - err = fmt.Errorf("python transformation callback error: %s\n", value) - default: - log.Error().Msg("Unknown panic") - err = fmt.Errorf("python transformation callback error: %v\n", value) - } - } - }() - ret := callback(featureView.Base.Name, inputArrPtr, inputSchemaPtr, outArrPtr, outSchemaPtr, fullFeatureNames) - - if ret != numRows { - return nil, errors.New("python transformation callback failed") - } - - outRecord, err := cdata.ImportCRecordBatch(&outArr, &outSchema) - if err != nil { - return nil, err - } - - result := make([]*onlineserving.FeatureVector, 0) - for idx, field := range outRecord.Schema().Fields() { - dropFeature := true - - if featureView.Base.Projection != nil { - var featureName string - if fullFeatureNames { - featureName = strings.Split(field.Name, "__")[1] - } else { - featureName = field.Name - } - - for _, feature := range featureView.Base.Projection.Features { - if featureName == feature.Name { - dropFeature = false - } - } - } else { - dropFeature = false - } - - if dropFeature { - continue - } - - statuses := make([]serving.FieldStatus, numRows) - timestamps := make([]*timestamppb.Timestamp, numRows) - - for idx := 0; idx < numRows; idx++ { - statuses[idx] = serving.FieldStatus_PRESENT - timestamps[idx] = timestamppb.Now() - } - - result = append(result, &onlineserving.FeatureVector{ - Name: field.Name, - Values: outRecord.Column(idx), - Statuses: statuses, - Timestamps: timestamps, - }) - } - - return result, nil -} - func EnsureRequestedDataExist(requestedOnDemandFeatureViews []*model.OnDemandFeatureView, requestDataFeatures map[string]*prototypes.RepeatedValue) error { diff --git a/go/main.go b/go/main.go new file mode 100644 index 0000000000..d2d9814db8 --- /dev/null +++ b/go/main.go @@ -0,0 +1,180 @@ +package main + +import ( + "flag" + "fmt" + "net" + "os" + "os/signal" + "strings" + "syscall" + + "github.com/feast-dev/feast/go/internal/feast" + "github.com/feast-dev/feast/go/internal/feast/registry" + "github.com/feast-dev/feast/go/internal/feast/server" + "github.com/feast-dev/feast/go/internal/feast/server/logging" + "github.com/feast-dev/feast/go/protos/feast/serving" + "github.com/rs/zerolog/log" + "google.golang.org/grpc" + "google.golang.org/grpc/health" + "google.golang.org/grpc/health/grpc_health_v1" + + grpctrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/google.golang.org/grpc" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" +) + +type ServerStarter interface { + StartHttpServer(fs *feast.FeatureStore, host string, port int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts *logging.LoggingOptions) error + StartGrpcServer(fs *feast.FeatureStore, host string, port int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts *logging.LoggingOptions) error +} + +type RealServerStarter struct{} + +func (s *RealServerStarter) StartHttpServer(fs *feast.FeatureStore, host string, port int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts *logging.LoggingOptions) error { + return StartHttpServer(fs, host, port, writeLoggedFeaturesCallback, loggingOpts) +} + +func (s *RealServerStarter) StartGrpcServer(fs *feast.FeatureStore, host string, port int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts *logging.LoggingOptions) error { + return StartGrpcServer(fs, host, port, writeLoggedFeaturesCallback, loggingOpts) +} + +func main() { + // Default values + serverType := "http" + host := "" + port := 8080 + server := RealServerStarter{} + // Current Directory + repoPath, err := os.Getwd() + if err != nil { + log.Error().Stack().Err(err).Msg("Failed to get current directory") + } + + flag.StringVar(&serverType, "type", serverType, "Specify the server type (http or grpc)") + flag.StringVar(&repoPath, "chdir", repoPath, "Repository path where feature store yaml file is stored") + + flag.StringVar(&host, "host", host, "Specify a host for the server") + flag.IntVar(&port, "port", port, "Specify a port for the server") + flag.Parse() + + repoConfig, err := registry.NewRepoConfigFromFile(repoPath) + if err != nil { + log.Fatal().Stack().Err(err).Msg("Failed to convert to RepoConfig") + } + + fs, err := feast.NewFeatureStore(repoConfig, nil) + if err != nil { + log.Fatal().Stack().Err(err).Msg("Failed to create NewFeatureStore") + } + + loggingOptions, err := repoConfig.GetLoggingOptions() + if err != nil { + log.Fatal().Stack().Err(err).Msg("Failed to get LoggingOptions") + } + + // TODO: writeLoggedFeaturesCallback is defaulted to nil. write_logged_features functionality needs to be + // implemented in Golang specific to OfflineStoreSink. Python Feature Server doesn't support this. + if serverType == "http" { + err = server.StartHttpServer(fs, host, port, nil, loggingOptions) + } else if serverType == "grpc" { + err = server.StartGrpcServer(fs, host, port, nil, loggingOptions) + } else { + fmt.Println("Unknown server type. Please specify 'http' or 'grpc'.") + } + + if err != nil { + log.Fatal().Stack().Err(err).Msg("Failed to start server") + } + +} + +func constructLoggingService(fs *feast.FeatureStore, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts *logging.LoggingOptions) (*logging.LoggingService, error) { + var loggingService *logging.LoggingService = nil + if writeLoggedFeaturesCallback != nil { + sink, err := logging.NewOfflineStoreSink(writeLoggedFeaturesCallback) + if err != nil { + return nil, err + } + + loggingService, err = logging.NewLoggingService(fs, sink, logging.LoggingOptions{ + ChannelCapacity: loggingOpts.ChannelCapacity, + EmitTimeout: loggingOpts.EmitTimeout, + WriteInterval: loggingOpts.WriteInterval, + FlushInterval: loggingOpts.FlushInterval, + }) + if err != nil { + return nil, err + } + } + return loggingService, nil +} + +// StartGprcServerWithLogging starts gRPC server with enabled feature logging +func StartGrpcServer(fs *feast.FeatureStore, host string, port int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts *logging.LoggingOptions) error { + if strings.ToLower(os.Getenv("ENABLE_DATADOG_TRACING")) == "true" { + tracer.Start(tracer.WithRuntimeMetrics()) + defer tracer.Stop() + } + loggingService, err := constructLoggingService(fs, writeLoggedFeaturesCallback, loggingOpts) + if err != nil { + return err + } + ser := server.NewGrpcServingServiceServer(fs, loggingService) + log.Info().Msgf("Starting a gRPC server on host %s port %d", host, port) + lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", host, port)) + if err != nil { + return err + } + + grpcServer := grpc.NewServer(grpc.UnaryInterceptor(grpctrace.UnaryServerInterceptor())) + serving.RegisterServingServiceServer(grpcServer, ser) + healthService := health.NewServer() + grpc_health_v1.RegisterHealthServer(grpcServer, healthService) + + stop := make(chan os.Signal, 1) + signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM) + + go func() { + // As soon as these signals are received from OS, try to gracefully stop the gRPC server + <-stop + log.Info().Msg("Stopping the gRPC server...") + grpcServer.GracefulStop() + if loggingService != nil { + loggingService.Stop() + } + log.Info().Msg("gRPC server terminated") + }() + + return grpcServer.Serve(lis) +} + +// StartHttpServerWithLogging starts HTTP server with enabled feature logging +// Go does not allow direct assignment to package-level functions as a way to +// mock them for tests +func StartHttpServer(fs *feast.FeatureStore, host string, port int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts *logging.LoggingOptions) error { + loggingService, err := constructLoggingService(fs, writeLoggedFeaturesCallback, loggingOpts) + if err != nil { + return err + } + ser := server.NewHttpServer(fs, loggingService) + log.Info().Msgf("Starting a HTTP server on host %s, port %d", host, port) + + stop := make(chan os.Signal, 1) + signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM) + + go func() { + // As soon as these signals are received from OS, try to gracefully stop the gRPC server + <-stop + log.Info().Msg("Stopping the HTTP server...") + err := ser.Stop() + if err != nil { + log.Error().Err(err).Msg("Error when stopping the HTTP server") + } + if loggingService != nil { + loggingService.Stop() + } + log.Info().Msg("HTTP server terminated") + }() + + return ser.Serve(host, port) +} diff --git a/go/main_test.go b/go/main_test.go new file mode 100644 index 0000000000..567a6cf5af --- /dev/null +++ b/go/main_test.go @@ -0,0 +1,71 @@ +package main + +import ( + "testing" + + "github.com/feast-dev/feast/go/internal/feast" + "github.com/feast-dev/feast/go/internal/feast/server/logging" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +// MockServerStarter is a mock of ServerStarter interface for testing +type MockServerStarter struct { + mock.Mock +} + +func (m *MockServerStarter) StartHttpServer(fs *feast.FeatureStore, host string, port int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts *logging.LoggingOptions) error { + args := m.Called(fs, host, port, writeLoggedFeaturesCallback, loggingOpts) + return args.Error(0) +} + +func (m *MockServerStarter) StartGrpcServer(fs *feast.FeatureStore, host string, port int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts *logging.LoggingOptions) error { + args := m.Called(fs, host, port, writeLoggedFeaturesCallback, loggingOpts) + return args.Error(0) +} + +// TestStartHttpServer tests the StartHttpServer function +func TestStartHttpServer(t *testing.T) { + mockServerStarter := new(MockServerStarter) + fs := &feast.FeatureStore{} + host := "localhost" + port := 8080 + var writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback + + loggingOpts := &logging.LoggingOptions{} + + mockServerStarter.On("StartHttpServer", fs, host, port, mock.AnythingOfType("logging.OfflineStoreWriteCallback"), loggingOpts).Return(nil) + + err := mockServerStarter.StartHttpServer(fs, host, port, writeLoggedFeaturesCallback, loggingOpts) + assert.NoError(t, err) + mockServerStarter.AssertExpectations(t) +} + +// TestStartGrpcServer tests the StartGrpcServer function +func TestStartGrpcServer(t *testing.T) { + mockServerStarter := new(MockServerStarter) + fs := &feast.FeatureStore{} + host := "localhost" + port := 9090 + var writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback + loggingOpts := &logging.LoggingOptions{} + + mockServerStarter.On("StartGrpcServer", fs, host, port, mock.AnythingOfType("logging.OfflineStoreWriteCallback"), loggingOpts).Return(nil) + + err := mockServerStarter.StartGrpcServer(fs, host, port, writeLoggedFeaturesCallback, loggingOpts) + assert.NoError(t, err) + mockServerStarter.AssertExpectations(t) +} + +// TestConstructLoggingService tests the constructLoggingService function +func TestConstructLoggingService(t *testing.T) { + fs := &feast.FeatureStore{} + var writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback + loggingOpts := &logging.LoggingOptions{} + + _, err := constructLoggingService(fs, writeLoggedFeaturesCallback, loggingOpts) + assert.NoError(t, err) + // Further assertions can be added here based on the expected behavior of constructLoggingService +} + +// Note: Additional tests can be written for other functions and error scenarios. diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index e654eaa2cf..8c08bee677 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -676,12 +676,6 @@ def init_command(project_directory, minimal: bool, template: str): show_default=True, help="Timeout for keep alive", ) -@click.option( - "--go", - is_flag=True, - show_default=True, - help="Use Go to serve", -) @click.option( "--registry_ttl_sec", "-r", @@ -699,17 +693,12 @@ def serve_command( no_access_log: bool, workers: int, keep_alive_timeout: int, - go: bool, registry_ttl_sec: int = 5, ): try: """Start a feature server locally on a given port.""" store = create_feature_store(ctx) - if go: - # Turn on Go feature retrieval. - store.config.go_feature_serving = True - store.serve( host=host, port=port, diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index bf461636f4..cbeb4c9b88 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -18,7 +18,6 @@ from datetime import datetime, timedelta from pathlib import Path from typing import ( - TYPE_CHECKING, Any, Callable, Dict, @@ -91,9 +90,6 @@ logger = logging.getLogger(__name__) -if TYPE_CHECKING: - from feast.embedded_go.online_features_service import EmbeddedOnlineFeatureServer - class FeatureStore: """ @@ -104,14 +100,12 @@ class FeatureStore: repo_path: The path to the feature repo. _registry: The registry for the feature store. _provider: The provider for the feature store. - _go_server: The (optional) Go feature server for the feature store. """ config: RepoConfig repo_path: Path _registry: BaseRegistry _provider: Provider - _go_server: Optional["EmbeddedOnlineFeatureServer"] def __init__( self, @@ -171,7 +165,6 @@ def __init__( self._registry = r self._provider = get_provider(self.config) - self._go_server = None def version(self) -> str: """Returns the version of the current Feast SDK/CLI.""" @@ -1009,10 +1002,6 @@ def apply( ) self._registry.commit() - # go server needs to be reloaded to apply new configuration. - # we're stopping it here - # new server will be instantiated on the next online request - self._teardown_go_server() def teardown(self): """Tears down all local and cloud resources for the feature store.""" @@ -1025,7 +1014,6 @@ def teardown(self): self._get_provider().teardown_infra(self.project, tables, entities) self._registry.teardown() - self._teardown_go_server() def get_historical_features( self, @@ -1771,56 +1759,20 @@ def serve( """Start the feature consumption server locally on a given port.""" type_ = type_.lower() - if self.config.go_feature_serving: - # Start go server instead of python if the flag is enabled - self._lazy_init_go_server() - enable_logging: bool = getattr( - getattr( - getattr(self.config, "feature_server", None), - "feature_logging", - None, - ), - "enabled", - False, - ) - logging_options = ( - self.config.feature_server.feature_logging - if enable_logging and self.config.feature_server - else None - ) - if type_ == "http" and self._go_server is not None: - self._go_server.start_http_server( - host, - port, - enable_logging=enable_logging, - logging_options=logging_options, - ) - elif type_ == "grpc" and self._go_server is not None: - self._go_server.start_grpc_server( - host, - port, - enable_logging=enable_logging, - logging_options=logging_options, - ) - else: - raise ValueError( - f"Unsupported server type '{type_}'. Must be one of 'http' or 'grpc'." - ) - else: - if type_ != "http": - raise ValueError( - f"Python server only supports 'http'. Got '{type_}' instead." - ) - # Start the python server - feature_server.start_server( - self, - host=host, - port=port, - no_access_log=no_access_log, - workers=workers, - keep_alive_timeout=keep_alive_timeout, - registry_ttl_sec=registry_ttl_sec, + if type_ != "http": + raise ValueError( + f"Python server only supports 'http'. Got '{type_}' instead." ) + # Start the python server + feature_server.start_server( + self, + host=host, + port=port, + no_access_log=no_access_log, + workers=workers, + keep_alive_timeout=keep_alive_timeout, + registry_ttl_sec=registry_ttl_sec, + ) def _teardown_go_server(self): self._go_server = None diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index 001cb74156..dc6ef10d78 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -187,12 +187,6 @@ class RepoConfig(FeastBaseModel): repo_path: Optional[Path] = None - go_feature_serving: Optional[bool] = False - """ If True, use the Go feature server instead of the Python feature server. """ - - go_feature_retrieval: Optional[bool] = False - """ If True, use the embedded Go code to retrieve features instead of the Python SDK. """ - entity_key_serialization_version: StrictInt = 1 """ Entity key serialization version: This version is used to control what serialization scheme is used when writing data to the online store. diff --git a/sdk/python/tests/README.md b/sdk/python/tests/README.md index 5b93012902..d56d1de465 100644 --- a/sdk/python/tests/README.md +++ b/sdk/python/tests/README.md @@ -16,7 +16,6 @@ Let's inspect the test setup in `sdk/python/tests/integration`: $ tree . ├── e2e -│ ├── test_go_feature_server.py │ ├── test_python_feature_server.py │ ├── test_universal_e2e.py │ └── test_validation.py @@ -92,8 +91,6 @@ Tests in Feast are split into integration and unit tests. * The main codepaths include: * basic e2e tests for offline stores * `test_universal_e2e.py` - * go feature server - * `test_go_feature_server.py` * python http server * `test_python_feature_server.py` * data quality monitoring feature validation diff --git a/sdk/python/tests/conftest.py b/sdk/python/tests/conftest.py index c9598de6fa..27b3deae80 100644 --- a/sdk/python/tests/conftest.py +++ b/sdk/python/tests/conftest.py @@ -33,8 +33,8 @@ create_basic_driver_dataset, create_document_dataset, ) -from tests.integration.feature_repos.integration_test_repo_config import ( - IntegrationTestRepoConfig, # noqa: E402 +from tests.integration.feature_repos.integration_test_repo_config import ( # noqa: E402 + IntegrationTestRepoConfig, ) from tests.integration.feature_repos.repo_configuration import ( # noqa: E402 AVAILABLE_OFFLINE_STORES, @@ -46,8 +46,8 @@ construct_universal_feature_views, construct_universal_test_data, ) -from tests.integration.feature_repos.universal.data_sources.file import ( - FileDataSourceCreator, # noqa: E402 +from tests.integration.feature_repos.universal.data_sources.file import ( # noqa: E402 + FileDataSourceCreator, ) from tests.integration.feature_repos.universal.entities import ( # noqa: E402 customer, @@ -110,18 +110,11 @@ def pytest_addoption(parser): default=False, help="Run benchmark tests", ) - parser.addoption( - "--goserver", - action="store_true", - default=False, - help="Run tests that use the go feature server", - ) def pytest_collection_modifyitems(config, items: List[Item]): should_run_integration = config.getoption("--integration") is True should_run_benchmark = config.getoption("--benchmark") is True - should_run_goserver = config.getoption("--goserver") is True integration_tests = [t for t in items if "integration" in t.keywords] if not should_run_integration: @@ -141,15 +134,6 @@ def pytest_collection_modifyitems(config, items: List[Item]): for t in benchmark_tests: items.append(t) - goserver_tests = [t for t in items if "goserver" in t.keywords] - if not should_run_goserver: - for t in goserver_tests: - items.remove(t) - else: - items.clear() - for t in goserver_tests: - items.append(t) - @pytest.fixture def simple_dataset_1() -> pd.DataFrame: @@ -279,9 +263,6 @@ def pytest_generate_tests(metafunc: pytest.Metafunc): if "python_server" in metafunc.fixturenames: extra_dimensions.extend([{"python_feature_server": True}]) - if "goserver" in markers: - extra_dimensions.append({"go_feature_serving": True}) - configs = [] if offline_stores: for provider, offline_store_creator in offline_stores: @@ -295,13 +276,6 @@ def pytest_generate_tests(metafunc: pytest.Metafunc): **dim, } - # temporary Go works only with redis - if config.get("go_feature_serving") and ( - not isinstance(online_store, dict) - or online_store["type"] != "redis" - ): - continue - c = IntegrationTestRepoConfig(**config) if c not in _config_cache: diff --git a/sdk/python/tests/expediagroup/test_elasticsearch_online_store.py b/sdk/python/tests/expediagroup/test_elasticsearch_online_store.py index 804c32e3e6..9fe7b54780 100644 --- a/sdk/python/tests/expediagroup/test_elasticsearch_online_store.py +++ b/sdk/python/tests/expediagroup/test_elasticsearch_online_store.py @@ -13,7 +13,7 @@ ElasticsearchOnlineStoreConfig, ) from feast.field import Field -from feast.infra.offline_stores.file import FileOfflineStoreConfig +from feast.infra.offline_stores.dask import DaskOfflineStoreConfig from feast.infra.offline_stores.file_source import FileSource from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import BytesList, FloatList @@ -59,7 +59,7 @@ def repo_config(embedded_elasticsearch): username=embedded_elasticsearch["username"], password=embedded_elasticsearch["password"], ), - offline_store=FileOfflineStoreConfig(), + offline_store=DaskOfflineStoreConfig(), entity_key_serialization_version=2, ) diff --git a/sdk/python/tests/integration/e2e/test_go_feature_server.py b/sdk/python/tests/integration/e2e/test_go_feature_server.py deleted file mode 100644 index 0f972e45df..0000000000 --- a/sdk/python/tests/integration/e2e/test_go_feature_server.py +++ /dev/null @@ -1,263 +0,0 @@ -import threading -import time -from datetime import datetime -from typing import List - -import grpc -import pandas as pd -import pytest -import pytz -import requests - -from feast.embedded_go.online_features_service import EmbeddedOnlineFeatureServer -from feast.feast_object import FeastObject -from feast.feature_logging import LoggingConfig -from feast.feature_service import FeatureService -from feast.infra.feature_servers.base_config import FeatureLoggingConfig -from feast.protos.feast.serving.ServingService_pb2 import ( - FieldStatus, - GetOnlineFeaturesRequest, - GetOnlineFeaturesResponse, -) -from feast.protos.feast.serving.ServingService_pb2_grpc import ServingServiceStub -from feast.protos.feast.types.Value_pb2 import RepeatedValue -from feast.type_map import python_values_to_proto_values -from feast.value_type import ValueType -from feast.wait import wait_retry_backoff -from tests.integration.feature_repos.repo_configuration import ( - construct_universal_feature_views, -) -from tests.integration.feature_repos.universal.entities import ( - customer, - driver, - location, -) -from tests.utils.http_server import check_port_open, free_port -from tests.utils.test_log_creator import generate_expected_logs, get_latest_rows - - -@pytest.mark.integration -@pytest.mark.goserver -def test_go_grpc_server(grpc_client): - resp: GetOnlineFeaturesResponse = grpc_client.GetOnlineFeatures( - GetOnlineFeaturesRequest( - feature_service="driver_features", - entities={ - "driver_id": RepeatedValue( - val=python_values_to_proto_values( - [5001, 5002], feature_type=ValueType.INT64 - ) - ) - }, - full_feature_names=True, - ) - ) - assert list(resp.metadata.feature_names.val) == [ - "driver_id", - "driver_stats__conv_rate", - "driver_stats__acc_rate", - "driver_stats__avg_daily_trips", - ] - for vector in resp.results: - assert all([s == FieldStatus.PRESENT for s in vector.statuses]) - - -@pytest.mark.integration -@pytest.mark.goserver -def test_go_http_server(http_server_port): - response = requests.post( - f"http://localhost:{http_server_port}/get-online-features", - json={ - "feature_service": "driver_features", - "entities": {"driver_id": [5001, 5002]}, - "full_feature_names": True, - }, - ) - assert response.status_code == 200, response.text - response = response.json() - assert set(response.keys()) == {"metadata", "results"} - metadata = response["metadata"] - results = response["results"] - assert response["metadata"] == { - "feature_names": [ - "driver_id", - "driver_stats__conv_rate", - "driver_stats__acc_rate", - "driver_stats__avg_daily_trips", - ] - }, metadata - assert len(results) == 4, results - assert all( - set(result.keys()) == {"event_timestamps", "statuses", "values"} - for result in results - ), results - assert all( - result["statuses"] == ["PRESENT", "PRESENT"] for result in results - ), results - assert results[0]["values"] == [5001, 5002], results - for result in results[1:]: - assert len(result["values"]) == 2, result - assert all(value is not None for value in result["values"]), result - - -@pytest.mark.integration -@pytest.mark.goserver -@pytest.mark.universal_offline_stores -@pytest.mark.parametrize("full_feature_names", [True, False], ids=lambda v: str(v)) -def test_feature_logging( - grpc_client, environment, universal_data_sources, full_feature_names -): - fs = environment.feature_store - feature_service = fs.get_feature_service("driver_features") - log_start_date = datetime.now().astimezone(pytz.UTC) - driver_ids = list(range(5001, 5011)) - - for driver_id in driver_ids: - # send each driver id in separate request - grpc_client.GetOnlineFeatures( - GetOnlineFeaturesRequest( - feature_service="driver_features", - entities={ - "driver_id": RepeatedValue( - val=python_values_to_proto_values( - [driver_id], feature_type=ValueType.INT64 - ) - ) - }, - full_feature_names=full_feature_names, - ) - ) - # with some pause - time.sleep(0.1) - - _, datasets, _ = universal_data_sources - latest_rows = get_latest_rows(datasets.driver_df, "driver_id", driver_ids) - feature_view = fs.get_feature_view("driver_stats") - features = [ - feature.name - for proj in feature_service.feature_view_projections - for feature in proj.features - ] - expected_logs = generate_expected_logs( - latest_rows, feature_view, features, ["driver_id"], "event_timestamp" - ) - - def retrieve(): - retrieval_job = fs._get_provider().retrieve_feature_service_logs( - feature_service=feature_service, - start_date=log_start_date, - end_date=datetime.now().astimezone(pytz.UTC), - config=fs.config, - registry=fs._registry, - ) - try: - df = retrieval_job.to_df() - except Exception: - # Table or directory was not created yet - return None, False - - return df, df.shape[0] == len(driver_ids) - - persisted_logs = wait_retry_backoff( - retrieve, timeout_secs=60, timeout_msg="Logs retrieval failed" - ) - - persisted_logs = persisted_logs.sort_values(by="driver_id").reset_index(drop=True) - persisted_logs = persisted_logs[expected_logs.columns] - pd.testing.assert_frame_equal(expected_logs, persisted_logs, check_dtype=False) - - -""" -Start go feature server either on http or grpc based on the repo configuration for testing. -""" - - -def _server_port(environment, server_type: str): - if not environment.test_repo_config.go_feature_serving: - pytest.skip("Only for Go path") - - fs = environment.feature_store - - embedded = EmbeddedOnlineFeatureServer( - repo_path=str(fs.repo_path.absolute()), - repo_config=fs.config, - feature_store=fs, - ) - port = free_port() - if server_type == "grpc": - target = embedded.start_grpc_server - elif server_type == "http": - target = embedded.start_http_server - else: - raise ValueError("Server Type must be either 'http' or 'grpc'") - - t = threading.Thread( - target=target, - args=("127.0.0.1", port), - kwargs=dict( - enable_logging=True, - logging_options=FeatureLoggingConfig( - enabled=True, - queue_capacity=100, - write_to_disk_interval_secs=1, - flush_interval_secs=1, - emit_timeout_micro_secs=10000, - ), - ), - ) - t.start() - - wait_retry_backoff( - lambda: (None, check_port_open("127.0.0.1", port)), timeout_secs=15 - ) - - yield port - if server_type == "grpc": - embedded.stop_grpc_server() - else: - embedded.stop_http_server() - - # wait for graceful stop - time.sleep(5) - - -# Go test fixtures - - -@pytest.fixture -def initialized_registry(environment, universal_data_sources): - fs = environment.feature_store - - _, _, data_sources = universal_data_sources - feature_views = construct_universal_feature_views(data_sources) - - feature_service = FeatureService( - name="driver_features", - features=[feature_views.driver], - logging_config=LoggingConfig( - destination=environment.data_source_creator.create_logged_features_destination(), - sample_rate=1.0, - ), - ) - feast_objects: List[FeastObject] = [feature_service] - feast_objects.extend(feature_views.values()) - feast_objects.extend([driver(), customer(), location()]) - - fs.apply(feast_objects) - fs.materialize(environment.start_date, environment.end_date) - - -@pytest.fixture -def grpc_server_port(environment, initialized_registry): - yield from _server_port(environment, "grpc") - - -@pytest.fixture -def http_server_port(environment, initialized_registry): - yield from _server_port(environment, "http") - - -@pytest.fixture -def grpc_client(grpc_server_port): - ch = grpc.insecure_channel(f"localhost:{grpc_server_port}") - yield ServingServiceStub(ch) diff --git a/sdk/python/tests/integration/feature_repos/integration_test_repo_config.py b/sdk/python/tests/integration/feature_repos/integration_test_repo_config.py index 4662734383..309f92005a 100644 --- a/sdk/python/tests/integration/feature_repos/integration_test_repo_config.py +++ b/sdk/python/tests/integration/feature_repos/integration_test_repo_config.py @@ -37,7 +37,6 @@ class IntegrationTestRepoConfig: full_feature_names: bool = True infer_features: bool = False python_feature_server: bool = False - go_feature_serving: bool = False def __repr__(self) -> str: if not self.online_store_creator: @@ -61,7 +60,6 @@ def __repr__(self) -> str: f"{self.offline_store_creator.__name__.split('.')[-1].replace('DataSourceCreator', '')}", online_store_type, f"python_fs:{self.python_feature_server}", - f"go_fs:{self.go_feature_serving}", ] ) @@ -77,6 +75,5 @@ def __eq__(self, other): and self.online_store == other.online_store and self.offline_store_creator == other.offline_store_creator and self.online_store_creator == other.online_store_creator - and self.go_feature_serving == other.go_feature_serving and self.python_feature_server == other.python_feature_server ) diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py index 5174e16046..f5f161f7ba 100644 --- a/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py +++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py @@ -382,20 +382,19 @@ def setup(self, registry: RegistryConfig): repo_path = Path(tempfile.mkdtemp()) with open(repo_path / "feature_store.yaml", "w") as outfile: yaml.dump(config.model_dump(by_alias=True), outfile) - repo_path = str(repo_path.resolve()) self.server_port = free_port() host = "0.0.0.0" cmd = [ "feast", - "-c" + repo_path, + "-c" + str(repo_path.resolve()), "serve_offline", "--host", host, "--port", str(self.server_port), ] - self.proc = subprocess.Popen( + self.proc = subprocess.Popen( # type: ignore cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL ) diff --git a/setup.py b/setup.py index f40307cbdc..f5fefc1a4c 100644 --- a/setup.py +++ b/setup.py @@ -24,8 +24,7 @@ from pathlib import Path from subprocess import CalledProcessError -from setuptools import Extension, find_packages, setup, Command -from setuptools.command.build_ext import build_ext as _build_ext +from setuptools import Command, Extension, find_packages, setup from setuptools.command.build_py import build_py from setuptools.command.develop import develop from setuptools.command.install import install @@ -443,92 +442,6 @@ def run(self): develop.run(self) -class build_ext(_build_ext): - def finalize_options(self) -> None: - super().finalize_options() - if os.getenv("COMPILE_GO", "false").lower() == "false": - self.extensions = [e for e in self.extensions if not self._is_go_ext(e)] - - def _is_go_ext(self, ext: Extension): - return any( - source.endswith(".go") or source.startswith("github") - for source in ext.sources - ) - - def build_extension(self, ext: Extension): - print(f"Building extension {ext}") - if not self._is_go_ext(ext): - # the base class may mutate `self.compiler` - compiler = copy.deepcopy(self.compiler) - self.compiler, compiler = compiler, self.compiler - try: - return _build_ext.build_extension(self, ext) - finally: - self.compiler, compiler = compiler, self.compiler - - bin_path = _generate_path_with_gopath() - go_env = json.loads( - subprocess.check_output(["go", "env", "-json"]).decode("utf-8").strip() - ) - - print(f"Go env: {go_env}") - print(f"CWD: {os.getcwd()}") - - destination = os.path.dirname(os.path.abspath(self.get_ext_fullpath(ext.name))) - subprocess.check_call( - ["go", "install", "golang.org/x/tools/cmd/goimports"], - env={"PATH": bin_path, **go_env}, - ) - subprocess.check_call( - ["go", "get", "github.com/go-python/gopy@v0.4.4"], - env={"PATH": bin_path, **go_env}, - ) - subprocess.check_call( - ["go", "install", "github.com/go-python/gopy"], - env={"PATH": bin_path, **go_env}, - ) - subprocess.check_call( - [ - "gopy", - "build", - "-output", - destination, - "-vm", - sys.executable, - "--build-tags", - "cgo,ccalloc", - "--dynamic-link=True", - "-no-make", - *ext.sources, - ], - env={ - "PATH": bin_path, - "CGO_LDFLAGS_ALLOW": ".*", - **go_env, - }, - ) - - def copy_extensions_to_source(self): - build_py = self.get_finalized_command("build_py") - for ext in self.extensions: - fullname = self.get_ext_fullname(ext.name) - modpath = fullname.split(".") - package = ".".join(modpath[:-1]) - package_dir = build_py.get_package_dir(package) - - src_dir = dest_dir = package_dir - - if src_dir.startswith(PYTHON_CODE_PREFIX): - src_dir = package_dir[len(PYTHON_CODE_PREFIX) :] - src_dir = src_dir.lstrip("/") - - src_dir = os.path.join(self.build_lib, src_dir) - - # copy whole directory - print(f"Copying from {src_dir} to {dest_dir}") - copy_tree(src_dir, dest_dir) - - setup( name=NAME, author=AUTHOR, @@ -598,12 +511,5 @@ def copy_extensions_to_source(self): "build_go_protos": BuildGoProtosCommand, "build_py": BuildCommand, "develop": DevelopCommand, - "build_ext": build_ext, }, - ext_modules=[ - Extension( - "feast.embedded_go.lib._embedded", - ["github.com/feast-dev/feast/go/embedded"], - ) - ], )