diff --git a/sdk/python/feast/infra/contrib/spark_kafka_processor.py b/sdk/python/feast/infra/contrib/spark_kafka_processor.py
index cafd8b3852..c8b24f2a82 100644
--- a/sdk/python/feast/infra/contrib/spark_kafka_processor.py
+++ b/sdk/python/feast/infra/contrib/spark_kafka_processor.py
@@ -36,10 +36,18 @@ def __init__(
         self,
         *,
         fs: FeatureStore,
-        sfv: StreamFeatureView,
+        sfv: FeatureView,
         config: ProcessorConfig,
         preprocess_fn: Optional[MethodType] = None,
     ):
+
+        # In general, a FeatureView may or may not have a stream_source, but it
+        # must have one in order to use the Spark Kafka processor.
+        if not sfv.stream_source:
+            raise ValueError(
+                "Feature View must have a stream source to use spark streaming."
+            )
+
         if not isinstance(sfv.stream_source, KafkaSource):
             raise ValueError("data source is not kafka source")
         if not isinstance(
diff --git a/sdk/python/tests/unit/infra/test_streaming_ingestion.py b/sdk/python/tests/unit/infra/test_streaming_ingestion.py
index 6adc8630db..e166977eb0 100644
--- a/sdk/python/tests/unit/infra/test_streaming_ingestion.py
+++ b/sdk/python/tests/unit/infra/test_streaming_ingestion.py
@@ -16,8 +16,59 @@
+from datetime import timedelta
+
 from feast.entity import Entity
 from feast.value_type import ValueType
+from feast import FileSource
+from feast.data_format import AvroFormat
+from feast.data_source import KafkaSource
+from feast.infra.contrib.spark_kafka_processor import SparkKafkaProcessor
+from feast.stream_feature_view import StreamFeatureView
+
+# NOTE: the helper imports below assume the layout of Feast's test suite.
+from tests.data.data_creator import create_basic_driver_dataset
+from tests.integration.feature_repos.integration_test_repo_config import (
+    IntegrationTestRepoConfig,
+)
+from tests.integration.feature_repos.repo_configuration import (
+    construct_test_environment,
+)
+from tests.integration.feature_repos.universal.data_sources.spark_data_source_creator import (
+    SparkDataSourceCreator,
+)
+from tests.integration.feature_repos.universal.online_store.redis import (
+    RedisOnlineStoreCreator,
+)
 
 
-def test_the_test():
-    entity = Entity(name="my-entity", description="My entity")
-    assert entity.join_key == "my-entity"
-    assert(5==5)
+def test_streaming_ingestion():
+    # Stand up a local test environment: Spark offline store, Redis online store.
+    spark_config = IntegrationTestRepoConfig(
+        provider="local",
+        online_store_creator=RedisOnlineStoreCreator,
+        offline_store_creator=SparkDataSourceCreator,
+        batch_engine={"type": "spark.engine", "partitions": 10},
+    )
+    spark_environment = construct_test_environment(
+        spark_config, None, entity_key_serialization_version=1
+    )
+    df = create_basic_driver_dataset()
+
+    # Make a stream source.
+    stream_source = KafkaSource(
+        name="kafka",
+        timestamp_field="event_timestamp",
+        kafka_bootstrap_servers="",
+        message_format=AvroFormat(""),
+        topic="topic",
+        batch_source=FileSource(path="some path"),
+    )
+    sfv = StreamFeatureView(
+        name="test kafka stream feature view",
+        entities=[],
+        ttl=timedelta(days=30),
+        source=stream_source,
+        aggregations=[],
+    )
+
+    # TODO: construct a ProcessorConfig and exercise SparkKafkaProcessor
+    # against spark_environment, sfv, and df.
+    # processor = SparkKafkaProcessor(...)
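
Note: the test above stops short of constructing the processor. Below is a minimal sketch of how that last step might look, following Feast's documented streaming-ingestion flow. SparkProcessorConfig (with mode, source, spark_session, processing_time, query_timeout) and ingest_stream_feature_view() come from the Feast docs; the spark_environment.feature_store attribute and the local SparkSession settings are assumptions for this test context, not part of the diff.

    from pyspark.sql import SparkSession

    from feast.infra.contrib.spark_kafka_processor import (
        SparkKafkaProcessor,
        SparkProcessorConfig,
    )

    # Assumption: a local Spark session is sufficient for the unit test.
    spark = (
        SparkSession.builder.master("local[*]")
        .appName("feast-streaming-test")
        .getOrCreate()
    )

    processor_config = SparkProcessorConfig(
        mode="spark",  # which stream engine implementation to use
        source="kafka",  # which stream source the processor reads from
        spark_session=spark,
        processing_time="30 seconds",  # micro-batch trigger interval
        query_timeout=15,  # seconds to block waiting on the streaming query
    )

    processor = SparkKafkaProcessor(
        fs=spark_environment.feature_store,  # assumed attribute of the test environment
        sfv=sfv,
        config=processor_config,
    )

    # Start pushing rows from the Kafka topic into the online store.
    query = processor.ingest_stream_feature_view()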