Skip to content

Commit

Permalink
revert
Browse files Browse the repository at this point in the history
  • Loading branch information
vbhagwat committed Jan 10, 2025
1 parent 00758c2 commit 3cbc8e6
Showing 1 changed file with 8 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"""
Cassandra/Astra DB online store for Feast.
"""
import asyncio

import logging
from datetime import datetime
from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple
Expand Down Expand Up @@ -318,7 +318,7 @@ def __del__(self):
"""
pass

async def online_write_batch(
def online_write_batch(
self,
config: RepoConfig,
table: FeatureView,
Expand Down Expand Up @@ -364,13 +364,14 @@ async def online_write_batch(
batch.add(insert_cql, params)
# this happens N-1 times, will be corrected outside:
if progress:
await progress(1)
progress(1)

futures.append(session.execute_async(batch))
if len(futures) >= config.online_store.write_concurrency:
# Raises exception if at least one of the batch fails
try:
await asyncio.gather(*[future.result() for future in futures])
for future in futures:
future.result()
futures = []
except Exception as exc:
logger.error(f"Error writing a batch: {exc}")
Expand All @@ -379,15 +380,16 @@ async def online_write_batch(

if len(futures) > 0:
try:
await asyncio.gather(*[future.result() for future in futures])
for future in futures:
future.result()
futures = []
except Exception as exc:
logger.error(f"Error writing a batch: {exc}")
print(f"Error writing a batch: {exc}")
raise Exception("Error writing a batch") from exc
# correction for the last missing call to `progress`:
if progress:
await progress(1)
progress(1)

def online_read(
self,
Expand Down

0 comments on commit 3cbc8e6

Please sign in to comment.