Skip to content

Commit

Permalink
PYTHON-3459 Add log messages to Server selection spec
Browse files Browse the repository at this point in the history
  • Loading branch information
NoahStapp committed Feb 6, 2024
1 parent 522bfca commit fb36404
Show file tree
Hide file tree
Showing 17 changed files with 1,971 additions and 47 deletions.
5 changes: 5 additions & 0 deletions .evergreen/resync-specs.sh
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,11 @@ do
;;
server-selection|server_selection)
cpjson server-selection/tests/ server_selection
rm -rf $PYMONGO/test/server_selection/logging
cpjson server-selection/tests/logging server_selection_logging
;;
server-selection-logging|server_selection_logging)
cpjson server-selection/tests/logging server_selection_logging
;;
sessions)
cpjson sessions/tests/ sessions
Expand Down
19 changes: 16 additions & 3 deletions pymongo/bulk.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,7 @@ def execute_command(
generator: Iterator[Any],
write_concern: WriteConcern,
session: Optional[ClientSession],
operation: Optional[str] = "TEST_OPERATION",
) -> dict[str, Any]:
"""Execute using write commands."""
# nModified is only reported for write commands, not legacy ops.
Expand Down Expand Up @@ -437,7 +438,14 @@ def retryable_bulk(
)

client = self.collection.database.client
client._retryable_write(self.is_retryable, retryable_bulk, session, bulk=self)
client._retryable_write(
self.is_retryable,
retryable_bulk,
session,
bulk=self,
operation=operation,
operation_id=op_id,
)

if full_result["writeErrors"] or full_result["writeConcernErrors"]:
_raise_bulk_write_error(full_result)
Expand Down Expand Up @@ -547,7 +555,12 @@ def execute_no_results(
return self.execute_command_no_results(conn, generator, write_concern)
return self.execute_op_msg_no_results(conn, generator)

def execute(self, write_concern: WriteConcern, session: Optional[ClientSession]) -> Any:
def execute(
self,
write_concern: WriteConcern,
session: Optional[ClientSession],
operation: Optional[str] = "TEST_OPERATION",
) -> Any:
"""Execute operations."""
if not self.ops:
raise InvalidOperation("No operations to execute")
Expand All @@ -568,4 +581,4 @@ def execute(self, write_concern: WriteConcern, session: Optional[ClientSession])
self.execute_no_results(connection, generator, write_concern)
return None
else:
return self.execute_command(generator, write_concern, session)
return self.execute_command(generator, write_concern, session, operation)
6 changes: 5 additions & 1 deletion pymongo/change_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
OperationFailure,
PyMongoError,
)
from pymongo.operations import _Operations
from pymongo.typings import _CollationIn, _DocumentType, _Pipeline

# The change streams spec considers the following server errors from the
Expand Down Expand Up @@ -244,7 +245,10 @@ def _run_aggregation_cmd(
comment=self._comment,
)
return self._client._retryable_read(
cmd.get_cursor, self._target._read_preference_for(session), session
cmd.get_cursor,
self._target._read_preference_for(session),
session,
operation=_Operations.AGGREGATE_OP,
)

def _create_cursor(self) -> CommandCursor:
Expand Down
5 changes: 4 additions & 1 deletion pymongo/client_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@
WTimeoutError,
)
from pymongo.helpers import _RETRYABLE_ERROR_CODES
from pymongo.operations import _Operations
from pymongo.read_concern import ReadConcern
from pymongo.read_preferences import ReadPreference, _ServerMode
from pymongo.server_type import SERVER_TYPE
Expand Down Expand Up @@ -843,7 +844,9 @@ def func(
) -> dict[str, Any]:
return self._finish_transaction(conn, command_name)

return self._client._retry_internal(func, self, None, retryable=True)
return self._client._retry_internal(
func, self, None, retryable=True, operation=_Operations.ABORT_TRANSACTION_OP
)

def _finish_transaction(self, conn: Connection, command_name: str) -> dict[str, Any]:
self._transaction.attempt += 1
Expand Down
63 changes: 44 additions & 19 deletions pymongo/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
UpdateOne,
_IndexKeyHint,
_IndexList,
_Operations,
)
from pymongo.read_preferences import ReadPreference, _ServerMode
from pymongo.results import (
Expand Down Expand Up @@ -257,8 +258,10 @@ def _conn_for_reads(
) -> ContextManager[tuple[Connection, _ServerMode]]:
return self.__database.client._conn_for_reads(self._read_preference_for(session), session)

def _conn_for_writes(self, session: Optional[ClientSession]) -> ContextManager[Connection]:
return self.__database.client._conn_for_writes(session)
def _conn_for_writes(
self, session: Optional[ClientSession], operation: Optional[str] = None
) -> ContextManager[Connection]:
return self.__database.client._conn_for_writes(session, operation=operation)

def _command(
self,
Expand Down Expand Up @@ -336,7 +339,7 @@ def __create(
if "size" in options:
options["size"] = float(options["size"])
cmd.update(options)
with self._conn_for_writes(session) as conn:
with self._conn_for_writes(session, operation=_Operations.CREATE_OP) as conn:
if qev2_required and conn.max_wire_version < 21:
raise ConfigurationError(
"Driver support of Queryable Encryption is incompatible with server. "
Expand Down Expand Up @@ -558,7 +561,7 @@ def bulk_write(
raise TypeError(f"{request!r} is not a valid request") from None

write_concern = self._write_concern_for(session)
bulk_api_result = blk.execute(write_concern, session)
bulk_api_result = blk.execute(write_concern, session, _Operations.INSERT_OP)
if bulk_api_result is not None:
return BulkWriteResult(bulk_api_result, True)
return BulkWriteResult({}, False)
Expand Down Expand Up @@ -598,7 +601,9 @@ def _insert_command(

_check_write_command_response(result)

self.__database.client._retryable_write(acknowledged, _insert_command, session)
self.__database.client._retryable_write(
acknowledged, _insert_command, session, operation=_Operations.INSERT_OP
)

if not isinstance(doc, RawBSONDocument):
return doc.get("_id")
Expand Down Expand Up @@ -844,6 +849,7 @@ def _update_retryable(
session: Optional[ClientSession] = None,
let: Optional[Mapping[str, Any]] = None,
comment: Optional[Any] = None,
operation: Optional[str] = _Operations.UPDATE_OP,
) -> Optional[Mapping[str, Any]]:
"""Internal update / replace helper."""

Expand All @@ -870,7 +876,10 @@ def _update(
)

return self.__database.client._retryable_write(
(write_concern or self.write_concern).acknowledged and not multi, _update, session
(write_concern or self.write_concern).acknowledged and not multi,
_update,
session,
operation=operation,
)

def replace_one(
Expand Down Expand Up @@ -970,6 +979,7 @@ def replace_one(
session=session,
let=let,
comment=comment,
operation="replace",
),
write_concern.acknowledged,
)
Expand Down Expand Up @@ -1082,6 +1092,7 @@ def update_one(
session=session,
let=let,
comment=comment,
operation=_Operations.UPDATE_OP,
),
write_concern.acknowledged,
)
Expand Down Expand Up @@ -1182,6 +1193,7 @@ def update_many(
session=session,
let=let,
comment=comment,
operation=_Operations.UPDATE_OP,
),
write_concern.acknowledged,
)
Expand Down Expand Up @@ -1319,7 +1331,10 @@ def _delete(
)

return self.__database.client._retryable_write(
(write_concern or self.write_concern).acknowledged and not multi, _delete, session
(write_concern or self.write_concern).acknowledged and not multi,
_delete,
session,
operation=_Operations.DELETE_OP,
)

def delete_one(
Expand Down Expand Up @@ -1798,7 +1813,7 @@ def _cmd(
cmd.update(kwargs)
return self._count_cmd(session, conn, read_preference, cmd, collation=None)

return self._retryable_non_cursor_read(_cmd, None)
return self._retryable_non_cursor_read(_cmd, None, operation=_Operations.COUNT_OP)

def count_documents(
self,
Expand Down Expand Up @@ -1887,17 +1902,20 @@ def _cmd(
return 0
return result["n"]

return self._retryable_non_cursor_read(_cmd, session)
return self._retryable_non_cursor_read(_cmd, session, _Operations.COUNT_OP)

def _retryable_non_cursor_read(
self,
func: Callable[[Optional[ClientSession], Server, Connection, Optional[_ServerMode]], T],
session: Optional[ClientSession],
operation: Optional[str] = "TEST_OPERATION",
) -> T:
"""Non-cursor read helper to handle implicit session creation."""
client = self.__database.client
with client._tmp_session(session) as s:
return client._retryable_read(func, self._read_preference_for(s), s)
return client._retryable_read(
func, self._read_preference_for(s), s, operation=operation
)

def create_indexes(
self,
Expand Down Expand Up @@ -1960,7 +1978,7 @@ def __create_indexes(
command (like maxTimeMS) can be passed as keyword arguments.
"""
names = []
with self._conn_for_writes(session) as conn:
with self._conn_for_writes(session, operation=_Operations.CREATE_INDEXES_OP) as conn:
supports_quorum = conn.max_wire_version >= 9

def gen_indexes() -> Iterator[Mapping[str, Any]]:
Expand Down Expand Up @@ -2200,7 +2218,7 @@ def drop_index(
cmd.update(kwargs)
if comment is not None:
cmd["comment"] = comment
with self._conn_for_writes(session) as conn:
with self._conn_for_writes(session, operation=_Operations.DROP_INDEXES_OP) as conn:
self._command(
conn,
cmd,
Expand Down Expand Up @@ -2277,7 +2295,9 @@ def _cmd(
return cmd_cursor

with self.__database.client._tmp_session(session, False) as s:
return self.__database.client._retryable_read(_cmd, read_pref, s)
return self.__database.client._retryable_read(
_cmd, read_pref, s, operation=_Operations.LIST_INDEXES_OP
)

def index_information(
self,
Expand Down Expand Up @@ -2367,6 +2387,7 @@ def list_search_indexes(
cmd.get_read_preference(session), # type: ignore[arg-type]
session,
retryable=not cmd._performs_write,
operation=_Operations.LIST_SEARCH_INDEX_OP,
)

def create_search_index(
Expand Down Expand Up @@ -2435,7 +2456,7 @@ def gen_indexes() -> Iterator[Mapping[str, Any]]:
cmd = {"createSearchIndexes": self.name, "indexes": list(gen_indexes())}
cmd.update(kwargs)

with self._conn_for_writes(session) as conn:
with self._conn_for_writes(session, operation=_Operations.CREATE_SEARCH_INDEXES_OP) as conn:
resp = self._command(
conn,
cmd,
Expand Down Expand Up @@ -2469,7 +2490,7 @@ def drop_search_index(
cmd.update(kwargs)
if comment is not None:
cmd["comment"] = comment
with self._conn_for_writes(session) as conn:
with self._conn_for_writes(session, operation=_Operations.DROP_SEARCH_INDEXES_OP) as conn:
self._command(
conn,
cmd,
Expand Down Expand Up @@ -2505,7 +2526,7 @@ def update_search_index(
cmd.update(kwargs)
if comment is not None:
cmd["comment"] = comment
with self._conn_for_writes(session) as conn:
with self._conn_for_writes(session, operation=_Operations.UPDATE_SEARCH_INDEX_OP) as conn:
self._command(
conn,
cmd,
Expand Down Expand Up @@ -2589,6 +2610,7 @@ def _aggregate(
cmd.get_read_preference(session), # type: ignore[arg-type]
session,
retryable=not cmd._performs_write,
operation=_Operations.AGGREGATE_OP,
)

def aggregate(
Expand Down Expand Up @@ -2925,7 +2947,7 @@ def rename(
cmd["comment"] = comment
write_concern = self._write_concern_for_cmd(cmd, session)

with self._conn_for_writes(session) as conn:
with self._conn_for_writes(session, operation=_Operations.RENAME_OP) as conn:
with self.__database.client._tmp_session(session) as s:
return conn.command(
"admin",
Expand Down Expand Up @@ -3006,7 +3028,7 @@ def _cmd(
user_fields={"values": 1},
)["values"]

return self._retryable_non_cursor_read(_cmd, session)
return self._retryable_non_cursor_read(_cmd, session, operation=_Operations.DISTINCT_OP)

def _write_concern_for_cmd(
self, cmd: Mapping[str, Any], session: Optional[ClientSession]
Expand Down Expand Up @@ -3090,7 +3112,10 @@ def _find_and_modify(
return out.get("value")

return self.__database.client._retryable_write(
write_concern.acknowledged, _find_and_modify, session
write_concern.acknowledged,
_find_and_modify,
session,
operation=_Operations.FIND_AND_MODIFY_OP,
)

def find_one_and_delete(
Expand Down
Loading

0 comments on commit fb36404

Please sign in to comment.