Skip to content

Commit

Permalink
Add SchemaRegistry wrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
expediamatt committed Feb 11, 2024
1 parent e3c7f42 commit 35a022c
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 2 deletions.
60 changes: 60 additions & 0 deletions sdk/python/feast/expediagroup/schema_registry/schema_registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
"""
Wrapper for SchemaRegistryClient, to separate Feast from
the extensive auth and configuration process of
connecting to a SchemaRegistry.
Copyright 2024 Expedia Group
Author: [email protected]
"""

import requests

from confluent_kafka.schema_registry import SchemaRegistryClient


class SchemaRegistry():
# spark: SparkSession
# format: str
# preprocess_fn: Optional[MethodType]
# join_keys: List[str]

def __init__(self):
pass

def get_properties(
user: String,
password: String,
urn: String,
environment: String,
cert_path: String, #https://stackoverflow.com/questions/55203791/python-requests-using-certificate-value-instead-of-path
) -> dict:
"""Discover a Schema Registry with the provided urn and credentials,
and obtain a set of properties for use in Schema Registry calls."""
discovery_url = "https://stream-discovery-service-{environment}.rcp.us-east-1.data.{environment}.exp-aws.net/v2/discovery/urn/{urn}".format(
environment=environment, urn=urn
)

response = requests.get(
discovery_url,
auth=(user, password),
headers={"Accept": "application/json"},
verify=cert_path,
)

if response.status_code != 200:
raise RuntimeError(
"Discovery API returned unexpected HTTP status: {status}".format(
status=str(response.status_code)
)
)

try:
props = json.loads(response.text)
except (TypeError, UnicodeDecodeError):
raise TypeError(
"Discovery API response did not contain valid json: {response}".format(
response=response.text
)
)

return props
8 changes: 6 additions & 2 deletions sdk/python/feast/infra/contrib/spark_kafka_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import List, Optional

import pandas as pd
from confluent_kafka.schema_registry import SchemaRegistryClient
from feast.expediagroup.schema_registry.schema_registry import SchemaRegistry
from confluent_kafka.schema_registry.avro import AvroDeserializer
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.avro.functions import from_avro
Expand Down Expand Up @@ -72,7 +72,11 @@ def __init__(
def init_confluent_avro_processor(self) -> None:
"""Extra initialization for Confluent Avro processor, which uses
SchemaRegistry and the Avro Deserializer, both of which need initialization."""
pass

user = "VAULT_SECRETS"
password = "VAULT_SECRETS"
urn = "NOT SURE"
environment = "NOT SURE"

def ingest_stream_feature_view(self, to: PushMode = PushMode.ONLINE) -> None:
ingested_stream_df = self._ingest_stream_data()
Expand Down

0 comments on commit 35a022c

Please sign in to comment.