Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
echoyang7 committed Jun 28, 2024
2 parents 9904069 + 4cdff44 commit 647c468
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 42 deletions.
3 changes: 3 additions & 0 deletions lyrebird/compatibility.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ def monkey_patch_application(async_obj, async_funcs=None, async_values=None):
from lyrebird.event import EventServer
from lyrebird import event

# TODO Short-term fix, Optimized to general logic
EventServer.publish_trace_deep -= 1

msg_queue = async_obj['msg_queue']
process_namespace = async_obj['process_namespace']

Expand Down
4 changes: 2 additions & 2 deletions lyrebird/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,12 @@ def callback_func_run_statistic(callback_fn, args, kwargs, report_info):
return
if not application.config.get('event.lyrebird_metrics_report', True):
return
application.server['event'].publish('lyrebird_metrics', {
application.server['event'].publish('lyrebird_metrics', {'lyrebird_metrics': {
'sender': 'EventServer',
'action': 'broadcast_handler',
'duration': event_duration,
'trace_info': str(report_info['trace_info'])
})
}})


class CustomExecuteServer(ProcessServer):
Expand Down
47 changes: 29 additions & 18 deletions lyrebird/mock/extra_mock_server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,22 @@ def make_raw_headers_line(request: web.Request):
return json.dumps(raw_headers, ensure_ascii=False)


async def make_response_header(proxy_resp_headers: dict, context: LyrebirdProxyContext, data=None):
response_headers = {}
for k, v in proxy_resp_headers.items():
if k.lower() == 'content-length':
if data is not None:
response_headers[k] = str(len(data))
elif k.lower() == 'host':
response_headers['Host'] = context.netloc
elif k.lower() == 'location':
response_headers['Host'] = context.netloc
response_headers[k] = v
else:
response_headers[k] = v
return response_headers


async def send_request(context: LyrebirdProxyContext, target_url):
async with client.ClientSession(auto_decompress=False) as session:
request: web.Request = context.request
Expand All @@ -64,24 +80,19 @@ async def send_request(context: LyrebirdProxyContext, target_url):
) as _resp:
proxy_resp_status = _resp.status
proxy_resp_headers = _resp.headers
# TODO support stream response
proxy_resp_data = await _resp.read()

response_headers = {}
for k, v in proxy_resp_headers.items():
if k.lower() in ['transfer-encoding']:
continue
elif k.lower() == 'content-length':
response_headers[k] = str(len(proxy_resp_data))
elif k.lower() == 'host':
response_headers['Host'] = context.netloc
elif k.lower() == 'location':
response_headers['Host'] = context.netloc
response_headers[k] = v
else:
response_headers[k] = v

resp = web.Response(status=proxy_resp_status, body=proxy_resp_data, headers=response_headers)
if 'Transfer-Encoding' in proxy_resp_headers and proxy_resp_headers.get('Transfer-Encoding') == 'chunked':
response_headers = await make_response_header(proxy_resp_headers, context)
resp = web.StreamResponse(status=proxy_resp_status, headers=response_headers)
await resp.prepare(request)
async for data in _resp.content.iter_any():
await resp.write(data)
await resp.write_eof()
logger.info(f'Stream Request finished: {proxy_resp_status} {context.origin_url}')
else:
proxy_resp_data = await _resp.read()
response_headers = await make_response_header(proxy_resp_headers, context, proxy_resp_data)
resp = web.Response(status=proxy_resp_status, body=proxy_resp_data, headers=response_headers)
logger.info(f'Bytes Response finished: {proxy_resp_status} {context.origin_url}')
return resp


Expand Down
65 changes: 44 additions & 21 deletions lyrebird/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,25 @@ def destory(self):
self.redis.delete(self.uuid)
self.redis.close()

def copy(self):
# Use the copy method with caution, especially during the create and release phases.
return type(self)(host=self.host, port=self.port, db=self.db, param_uuid=self.uuid)

def __eq__(self, other):
if not isinstance(other, type(self)):
return NotImplemented
if self.uuid != other.uuid:
return False
if self.host != other.host:
return False
if self.port != other.port:
return False
if self.db != other.db:
return False
return True

def __ne__(self, other):
return not self.__eq__(other)

def __getstate__(self):
return pickle.dumps({
Expand Down Expand Up @@ -599,6 +618,9 @@ def __delitem__(self, key):
def __contains__(self, key):
return self.redis.hexists(self.uuid, key)

def __iter__(self):
return self.raw()

def keys(self):
return [key.decode() for key in self.redis.hkeys(self.uuid)]

Expand All @@ -618,13 +640,31 @@ def update(self, data):
for key, value in data.items():
self[key] = value
self.redis.expire(self.uuid, REDIS_EXPIRE_TIME)


def pop(self, key, default=None):
if key not in self:
return default
else:
value = self.get(key, default)
del self[key]
return value

def setdefault(self, key, default=None):
try:
return self[key]
except KeyError:
self[key] = default
return default

def clear(self):
self.redis.delete(self.uuid)

def raw(self):
return {key.decode(): json.loads(value.decode()) for key, value in self.redis.hgetall(self.uuid).items()}

def __hash__(self):
return hash((self.uuid, self.host, self.port, self.db))

def __len__(self):
return len(self.redis.hkeys(self.uuid))

Expand All @@ -640,8 +680,6 @@ def _hook_value(parent, key, value):
return RedisHookedDict(parent, key, value)
elif isinstance(value, list):
return RedisHookedList(parent, key, value)
elif isinstance(value, set):
return RedisHookedSet(parent, key, value)
else:
return value

Expand All @@ -665,7 +703,7 @@ def __getitem__(self, key):
return _hook_value(self, key, dict.__getitem__(self, key))

def __setitem__(self, key, value):
dict.__setitem__(self, key, value)
dict.__setitem__(self, key, _hook_value(self, key, value))
self.parent[self.key] = self

def __delitem__(self, key):
Expand All @@ -679,6 +717,7 @@ def update(self, *args, **kwargs):
def __deepcopy__(self, memo):
return deepcopy(dict(self), memo)


class RedisHookedList(RedisHook, list):
def __init__(self, parent, key, value):
list.__init__(self, value)
Expand All @@ -688,7 +727,7 @@ def __getitem__(self, index):
return _hook_value(self, index, list.__getitem__(self, index))

def __setitem__(self, index, value):
list.__setitem__(self, index, _hook_value(value))
list.__setitem__(self, index, _hook_value(self, index, value))
self.parent[self.key] = self

def __delitem__(self, index):
Expand All @@ -701,19 +740,3 @@ def append(self, value):

def __deepcopy__(self, memo):
return deepcopy(list(self), memo)

class RedisHookedSet(RedisHook, set):
def __init__(self, parent, key, value):
set.__init__(self, value)
RedisHook.__init__(self, parent, key)

def add(self, value):
set.add(self, _hook_value(value))
self.parent[self.key] = self

def remove(self, value):
set.remove(self, value)
self.parent[self.key] = self

def __deepcopy__(self, memo):
return deepcopy(set(self), memo)
2 changes: 1 addition & 1 deletion lyrebird/version.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
IVERSION = (3, 0, 3)
IVERSION = (3, 0, 4)
VERSION = ".".join(str(i) for i in IVERSION)
LYREBIRD = "Lyrebird " + VERSION

0 comments on commit 647c468

Please sign in to comment.