diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index eef90db..fb8eba3 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -21,10 +21,13 @@ jobs: uses: actions/setup-python@v5 with: python-version: "${{ matrix.python-version }}" + - uses: astral-sh/ruff-action@v3 + with: + args: "check --exclude ./test/generator/" - uses: haskell-actions/setup@v2 with: ghc-version: '9.6' cabal-version: latest - run: sudo apt install x11-apps libxcb1-dev libxcb-render0-dev - run: git clone https://gitlab.freedesktop.org/xorg/proto/xcbproto.git proto && cd proto && git checkout ${{ matrix.xcbver }} - - run: make -j XCBDIR=./proto/src check + - run: make XCBDIR=./proto/src check diff --git a/Makefile b/Makefile index ad84048..78aab29 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,3 @@ -AUTOPEP8=autopep8 --in-place --aggressive --aggressive - XCBDIR?=$(shell pkg-config --variable=xcbincludedir xcb-proto) ifneq ($(XCBDIR),$(shell pkg-config --variable=xcbincludedir xcb-proto)) XCBVER=$(shell sed -e '1,/AC_INIT/d' $(XCBDIR)/../configure.ac | head -n 1 | tr -d ,[:blank:]) @@ -7,31 +5,24 @@ else XCBVER=$(shell pkg-config --modversion xcb-proto) endif NCPUS=$(shell nproc) -PARALLEL=$(shell which parallel) CABAL=flock xcffib.cabal cabal GEN=$(CABAL) new-run --minimize-conflict-set -j$(NCPUS) exe:xcffibgen -- VENV=xcffib_venv PYTHON=$(VENV)/bin/python3 -FLAKE=$(VENV)/bin/flake8 GENERATOR_FILES=$(shell find . -path ./test -prune -false -o -name \*.hs) HANDWRITTEN_MODULE_FILES=$(shell find ./module -name \*.py) +GENERATED_TESTS_DIR=./test/generator/ + # you should have xcb-proto installed to run this xcffib: xcffib.cabal $(GENERATOR_FILES) $(HANDWRITTEN_MODULE_FILES) $(GEN) --input $(XCBDIR) --output ./xcffib + ruff format ./xcffib cp ./module/*py ./xcffib/ touch ./xcffib/py.typed sed -i "s/__xcb_proto_version__ = .*/__xcb_proto_version__ = \"${XCBVER}\"/" xcffib/__init__.py -.PHONY: xcffib-fmt -xcffib-fmt: module/*.py -ifeq (${PARALLEL},) - $(AUTOPEP8) ./xcffib/*.py -else - find ./xcffib/*.py | parallel -j $(NCPUS) $(AUTOPEP8) '{}' -endif - dist-newstyle: $(CABAL) new-configure --enable-tests @@ -52,13 +43,15 @@ valgrind: xcffib valgrind --leak-check=full --show-leak-kinds=definite $(PYTHON) -m pytest -v newtests: - $(GEN) --input ./test/generator/ --output ./test/generator/ + $(GEN) --input $(GENERATED_TESTS_DIR) --output $(GENERATED_TESTS_DIR) git diff test # These are all split out so make -j3 check goes as fast as possible. .PHONY: lint lint: $(VENV) - $(FLAKE) --config=./test/flake8.cfg ./module + ruff check --exclude $(GENERATED_TESTS_DIR) --exclude ./proto + $(CABAL) check + $(PYTHON) -m compileall xcffib .PHONY: htests htests: @@ -74,8 +67,6 @@ check-mode-%: xcffib requirements.txt ./test/test_mode.sh $* check: xcffib htests lint check-mode-api check-mode-abi - $(CABAL) check - $(PYTHON) -m compileall xcffib # make release ver=0.99.99 release: xcffib diff --git a/README.md b/README.md index f286720..e7f6b25 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# xcffib [![Build Status](https://github.com/tych0/xcffib/workflows/ci/badge.svg)](https://github.com/tych0/xcffib/actions) +# xcffib [![Build Status](https://github.com/tych0/xcffib/workflows/ci/badge.svg?branch=master)](https://github.com/tych0/xcffib/actions) `xcffib` is the XCB binding for Python. diff --git a/module/__init__.py b/module/__init__.py index 30d2b15..2fd0f7c 100644 --- a/module/__init__.py +++ b/module/__init__.py @@ -20,10 +20,10 @@ import struct import weakref -from .ffi import ffi, lib, cffi_mode # noqa: F401 +from .ffi import ffi, lib, cffi_mode # noqa: F401 -__xcb_proto_version__ = 'placeholder' -__version__ = 'placeholder' +__xcb_proto_version__ = "placeholder" +__version__ = "placeholder" X_PROTOCOL = lib.X_PROTOCOL X_PROTOCOL_REVISION = lib.X_PROTOCOL_REVISION @@ -71,7 +71,6 @@ def visualtype_to_c_struct(vt): class Unpacker(object): - def __init__(self, known_max=None): self.size = 0 self.offset = 0 @@ -119,7 +118,6 @@ def synthetic(cls, data, format): class CffiUnpacker(Unpacker): - def __init__(self, cdata, known_max=None): self.cdata = cdata Unpacker.__init__(self, known_max) @@ -139,7 +137,6 @@ def copy(self): class MemoryUnpacker(Unpacker): - def __init__(self, buf): self.buf = buf Unpacker.__init__(self, len(self.buf)) @@ -157,12 +154,12 @@ def copy(self): def popcount(n): - return bin(n).count('1') + return bin(n).count("1") class XcffibException(Exception): + """Generic XcbException; replaces xcb.Exception.""" - """ Generic XcbException; replaces xcb.Exception. """ pass @@ -173,28 +170,33 @@ class XcffibNotImplemented(XcffibException, NotImplementedError): class ConnectionException(XcffibException): REASONS = { lib.XCB_CONN_ERROR: ( - 'xcb connection errors because of socket, ' - 'pipe and other stream errors.'), + "xcb connection errors because of socket, pipe and other stream errors." + ), lib.XCB_CONN_CLOSED_EXT_NOTSUPPORTED: ( - 'xcb connection shutdown because extension not supported'), + "xcb connection shutdown because extension not supported" + ), lib.XCB_CONN_CLOSED_MEM_INSUFFICIENT: ( - 'malloc(), calloc() and realloc() error upon failure, ' - 'for eg ENOMEM'), + "malloc(), calloc() and realloc() error upon failure, for eg ENOMEM" + ), lib.XCB_CONN_CLOSED_REQ_LEN_EXCEED: ( - 'Connection closed, exceeding request length that server ' - 'accepts.'), + "Connection closed, exceeding request length that server accepts." + ), lib.XCB_CONN_CLOSED_PARSE_ERR: ( - 'Connection closed, error during parsing display string.'), + "Connection closed, error during parsing display string." + ), lib.XCB_CONN_CLOSED_INVALID_SCREEN: ( - 'Connection closed because the server does not have a screen ' - 'matching the display.'), + "Connection closed because the server does not have a screen " + "matching the display." + ), lib.XCB_CONN_CLOSED_FDPASSING_FAILED: ( - 'Connection closed because some FD passing operation failed'), + "Connection closed because some FD passing operation failed" + ), } def __init__(self, err): XcffibException.__init__( - self, self.REASONS.get(err, "Unknown connection error.")) + self, self.REASONS.get(err, "Unknown connection error.") + ) class ProtocolException(XcffibException): @@ -217,8 +219,7 @@ class ProtocolException(XcffibException): def _add_core(value, __setup, events, errors): if not issubclass(value, Extension): - raise XcffibException( - "Extension type not derived from xcffib.Extension") + raise XcffibException("Extension type not derived from xcffib.Extension") if not issubclass(__setup, Struct): raise XcffibException("Setup type not derived from xcffib.Struct") @@ -235,19 +236,17 @@ def _add_core(value, __setup, events, errors): def _add_ext(key, value, events, errors): if not issubclass(value, Extension): - raise XcffibException( - "Extension type not derived from xcffib.Extension") + raise XcffibException("Extension type not derived from xcffib.Extension") extensions[key] = (value, events, errors) class ExtensionKey(object): - - """ This definitely isn't needed, but we keep it around for compatibility - with xpyb. - """ - def __init__(self, name): self.name = name + self.c_key = ffi.new("struct xcb_extension_t *") + self.c_name = ffi.new("char[]", self.name.encode()) + self.c_key.name = self.c_name + self.c_key.global_id = 0 def __hash__(self): return hash(self.name) @@ -258,19 +257,9 @@ def __eq__(self, o): def __ne__(self, o): return self.name != o.name - def to_cffi(self): - c_key = ffi.new("struct xcb_extension_t *") - c_key.name = name = ffi.new('char[]', self.name.encode()) - cffi_explicit_lifetimes[c_key] = name - # xpyb doesn't ever set global_id, which seems wrong, but whatever. - c_key.global_id = 0 - - return c_key - class Protobj(object): - - """ Note: Unlike xcb.Protobj, this does NOT implement the sequence + """Note: Unlike xcb.Protobj, this does NOT implement the sequence protocol. I found this behavior confusing: Protobj would implement the sequence protocol on self.buf, and then List would go and implement it on List. @@ -325,7 +314,8 @@ def reply(self): def check(self): # Request is not void and checked. assert self.is_checked and self.reply_type is None, ( - "Request is not void and checked") + "Request is not void and checked" + ) self.conn.request_check(self.sequence) def discard_reply(self): @@ -333,25 +323,22 @@ def discard_reply(self): class VoidCookie(Cookie): - def reply(self): raise XcffibException("No reply for this message type") class Extension(object): - def __init__(self, conn, key=None): self.conn = conn if key is None: self.c_key = ffi.NULL else: - c_key = key.to_cffi() - cffi_explicit_lifetimes[self] = c_key - self.c_key = c_key + self.c_key = key.c_key - def send_request(self, opcode, data, cookie=VoidCookie, reply=None, - is_checked=False): + def send_request( + self, opcode, data, cookie=VoidCookie, reply=None, is_checked=False + ): data = data.getvalue() assert len(data) > 3, "xcb_send_request data must be ast least 4 bytes" @@ -371,7 +358,7 @@ def send_request(self, opcode, data, cookie=VoidCookie, reply=None, # Here we need this iov_base to keep this memory alive until the end of # the function. - xcb_parts[2].iov_base = iov_base = ffi.new('char[]', data) # noqa + xcb_parts[2].iov_base = iov_base = ffi.new("char[]", data) # noqa xcb_parts[2].iov_len = len(data) xcb_parts[3].iov_base = ffi.NULL xcb_parts[3].iov_len = -len(data) & 3 # is this really necessary? @@ -384,10 +371,10 @@ def send_request(self, opcode, data, cookie=VoidCookie, reply=None, def __getattr__(self, name): if name.endswith("Checked"): - real = name[:-len("Checked")] + real = name[: -len("Checked")] is_checked = True elif name.endswith("Unchecked"): - real = name[:-len("Unchecked")] + real = name[: -len("Unchecked")] is_checked = False else: raise AttributeError(name) @@ -398,7 +385,6 @@ def __getattr__(self, name): class List(Protobj): - def __init__(self, unpacker, typ, count=None): Protobj.__init__(self, unpacker) @@ -419,7 +405,7 @@ def __init__(self, unpacker, typ, count=None): self.bufsize = unpacker.offset - old - self.raw = bytes(unpacker.buf[old:old + self.bufsize]) + self.raw = bytes(unpacker.buf[old : old + self.bufsize]) assert count is None or count == len(self.list) @@ -442,25 +428,25 @@ def __delitem__(self, key): del self.list[key] def to_string(self): - """ A helper for converting a List of chars to a native string. Dies if + """A helper for converting a List of chars to a native string. Dies if the list contents are not something that could be reasonably converted - to a string. """ + to a string.""" try: - return ''.join(chr(i[0]) for i in self) + return "".join(chr(i[0]) for i in self) except TypeError: - return ''.join(chr(i) for i in self) + return "".join(chr(i) for i in self) def to_nullsep_string(self) -> list[str]: - """ A helper for converting a List of chars to a list of native - strings, starting a new string each time a null (i.e. \\x00) is seen. """ + """A helper for converting a List of chars to a list of native + strings, starting a new string each time a null (i.e. \\x00) is seen.""" return self.to_string().split("\x00") def to_utf8(self): - return b''.join(self).decode('utf-8') + return b"".join(self).decode("utf-8") def to_atoms(self): - """ A helper for converting a List of chars to an array of atoms """ - return struct.unpack("<" + "%dI" % (len(self) // 4), b''.join(self)) + """A helper for converting a List of chars to an array of atoms""" + return struct.unpack("<" + "%dI" % (len(self) // 4), b"".join(self)) def buf(self): return self.raw @@ -476,7 +462,6 @@ def synthetic(cls, list=None): class OffsetMap(object): - def __init__(self, core): self.offsets = [(0, 0, core)] @@ -486,31 +471,35 @@ def add(self, offset, opcode, things): def get_extension_item(self, extension, item): try: - _, _, things = next((k, opcode, v) for k, opcode, v in self.offsets if opcode == extension) + _, _, things = next( + (k, opcode, v) for k, opcode, v in self.offsets if opcode == extension + ) return things[item] except StopIteration: raise IndexError(item) def __getitem__(self, item): try: - offset, _, things = next((k, opcode, v) for k, opcode, v in self.offsets if item >= k) + offset, _, things = next( + (k, opcode, v) for k, opcode, v in self.offsets if item >= k + ) return things[item - offset] except StopIteration: raise IndexError(item) class Connection(object): + """`auth` here should be ':', a format bequeathed to us from + xpyb.""" - """ `auth` here should be ':', a format bequeathed to us from - xpyb. """ def __init__(self, display=None, fd=-1, auth=None): if auth is not None: - [name, data] = auth.split(b':') + [name, data] = auth.split(b":") c_auth = ffi.new("xcb_auth_info_t *") - c_auth.name = ffi.new('char[]', name) + c_auth.name = ffi.new("char[]", name) c_auth.namelen = len(name) - c_auth.data = ffi.new('char[]', data) + c_auth.data = ffi.new("char[]", data) c_auth.datalen = len(data) else: c_auth = ffi.NULL @@ -518,7 +507,7 @@ def __init__(self, display=None, fd=-1, auth=None): if display is None: display = ffi.NULL else: - display = display.encode('latin1') + display = display.encode("latin1") i = ffi.new("int *") @@ -534,8 +523,9 @@ def __init__(self, display=None, fd=-1, auth=None): def _init_x(self): if core is None: - raise XcffibException("No core protocol object has been set. " - "Did you import xcffib.xproto?") + raise XcffibException( + "No core protocol object has been set. Did you import xcffib.xproto?" + ) self.core = core(self) self.setup = self.get_setup() @@ -546,10 +536,7 @@ def _init_x(self): def _setup_extensions(self): for key, (_, events, errors) in extensions.items(): - # We're explicitly not putting this as an argument to the next call - # as a hack for lifetime management. - c_ext = key.to_cffi() - reply = lib.xcb_get_extension_data(self._conn, c_ext) + reply = lib.xcb_get_extension_data(self._conn, key.c_key) self._event_offsets.add(reply.first_event, reply.major_opcode, events) self._error_offsets.add(reply.first_error, reply.major_opcode, errors) @@ -568,6 +555,7 @@ def ensure_connected(f): Check that the connection is valid both before and after the function is invoked. """ + @functools.wraps(f) def wrapper(*args): self = args[0] @@ -576,6 +564,7 @@ def wrapper(*args): return f(*args) finally: self.invalid() + return wrapper @ensure_connected @@ -686,7 +675,7 @@ def request_check(self, sequence): self._process_error(err) def hoist_event(self, e): - """ Hoist an xcb_generic_event_t to the right xcffib structure. """ + """Hoist an xcb_generic_event_t to the right xcffib structure.""" if e.response_type == 0: return self._process_error(ffi.cast("xcb_generic_error_t *", e)) @@ -694,7 +683,7 @@ def hoist_event(self, e): # this bit set. We don't actually care where the event came from, so we # just throw this away. Maybe we could expose this, if anyone actually # cares about it. - response_type = e.response_type & 0x7f + response_type = e.response_type & 0x7F buf = CffiUnpacker(e) event = None @@ -730,7 +719,6 @@ def discard_reply(self, sequence): class Response(Protobj): - def __init__(self, unpacker): Protobj.__init__(self, unpacker) @@ -751,7 +739,6 @@ def __init__(self, unpacker): class Reply(Response): - def __init__(self, unpacker): Response.__init__(self, unpacker) @@ -761,7 +748,6 @@ def __init__(self, unpacker): class Event(Response): - def __init__(self, unpacker): # This is here for debugging purposes! self.unpacker = unpacker @@ -770,11 +756,12 @@ def __init__(self, unpacker): # If this is a xcb_ge_generic_event_t (response type 35) then we need a few more fields if self.xge and isinstance(unpacker, CffiUnpacker): - self.extension, self.length, self.event_type, self.full_sequence = unpacker.unpack("xB2xIH22xI") + self.extension, self.length, self.event_type, self.full_sequence = ( + unpacker.unpack("xB2xIH22xI") + ) # There's some extra work to do if the event has data past the 32 byte boundary if self.length: - # Calculate the size of the original buffer. This is 4 bytes short as it seems to omit the `full_sequence` field buffer_size = 32 + (self.length * 4) + 4 @@ -782,7 +769,7 @@ def __init__(self, unpacker): buffer = ffi.buffer(unpacker.cdata, buffer_size) # Copy the event to the new buffer and skip the `full_sequence` field - buffer[32:buffer_size - 5] = buffer[36: buffer_size - 1] + buffer[32 : buffer_size - 5] = buffer[36 : buffer_size - 1] # Provide the resized buffer to the unpacker unpacker.buf = buffer @@ -793,15 +780,14 @@ def __init__(self, unpacker): class Error(Response, XcffibException): - def __init__(self, unpacker): Response.__init__(self, unpacker) XcffibException.__init__(self) - self.code = unpacker.unpack('B', increment=False) + self.code = unpacker.unpack("B", increment=False) def pack_list(from_, pack_type): - """ Return the wire packed version of `from_`. `pack_type` should be some + """Return the wire packed version of `from_`. `pack_type` should be some subclass of `xcffib.Struct`, or a string that can be passed to `struct.pack`. You must pass `size` if `pack_type` is a struct.pack string. """ @@ -809,7 +795,7 @@ def pack_list(from_, pack_type): if len(from_) == 0: return bytes() - if pack_type == 'c': + if pack_type == "c": if isinstance(from_, bytes): # Catch Python 3 bytes and Python 2 strings # PY3 is "helpful" in that when you do tuple(b'foo') you get @@ -820,7 +806,7 @@ def pack_list(from_, pack_type): # Catch Python 3 strings and Python 2 unicode strings, both of # which we encode to bytes as utf-8 # Here we create the tuple of bytes from the encoded string - from_ = [bytes((b,)) for b in bytearray(from_, 'utf-8')] + from_ = [bytes((b,)) for b in bytearray(from_, "utf-8")] elif isinstance(from_[0], int): # Pack from_ as char array, where from_ may be an array of ints # possibly greater than 256 @@ -828,6 +814,7 @@ def to_bytes(v): for _ in range(4): v, r = divmod(v, 256) yield r + from_ = [bytes((b,)) for i in from_ for b in to_bytes(i)] if isinstance(pack_type, str): @@ -846,7 +833,7 @@ def to_bytes(v): def wrap(ptr): - c_conn = ffi.cast('xcb_connection_t *', ptr) + c_conn = ffi.cast("xcb_connection_t *", ptr) conn = Connection.__new__(Connection) conn._conn = c_conn conn._init_x() diff --git a/module/ffi.py b/module/ffi.py index 5f5932c..8b4c640 100644 --- a/module/ffi.py +++ b/module/ffi.py @@ -6,11 +6,13 @@ try: # Note in ABI mode lib is already available, no dlopen() needed from ._xcffib import ffi, lib + cffi_mode = "api" except ImportError: try: # Note in ABI mode lib will be missing from ._xcffib import ffi + cffi_mode = "abi" except ImportError: # This means that someone is trying to import the module *without* diff --git a/module/ffi_build.py b/module/ffi_build.py index 914bcd5..e0ddb42 100644 --- a/module/ffi_build.py +++ b/module/ffi_build.py @@ -26,12 +26,10 @@ ("X_PROTOCOL", 11), ("X_PROTOCOL_REVISION", 0), ("X_TCP_PORT", 6000), - ("XCB_NONE", 0), ("XCB_COPY_FROM_PARENT", 0), ("XCB_CURRENT_TIME", 0), ("XCB_NO_SYMBOL", 0), - ("XCB_CONN_ERROR", 1), ("XCB_CONN_CLOSED_EXT_NOTSUPPORTED", 2), ("XCB_CONN_CLOSED_MEM_INSUFFICIENT", 3), @@ -39,13 +37,12 @@ ("XCB_CONN_CLOSED_PARSE_ERR", 5), ("XCB_CONN_CLOSED_INVALID_SCREEN", 6), ("XCB_CONN_CLOSED_FDPASSING_FAILED", 7), - - ("XCB_REQUEST_CHECKED", 1 << 0) + ("XCB_REQUEST_CHECKED", 1 << 0), ] # constants -CDEF = '\n'.join("#define %s %d" % (c, v) for c, v in CONSTANTS) +CDEF = "\n".join("#define %s %d" % (c, v) for c, v in CONSTANTS) # types CDEF += """ @@ -254,18 +251,17 @@ def ffi_for_mode(mode): ffi.cdef(CDEF) if mode == "api": ffi.set_source_pkgconfig( - 'xcffib._xcffib', - ['xcb'], + "xcffib._xcffib", + ["xcb"], r""" #include "xcb/xcb.h" #include "xcb/xproto.h" #include "xcb/xcbext.h" #include "xcb/render.h" - """) + """, + ) else: - ffi.set_source( - 'xcffib._xcffib', - None) + ffi.set_source("xcffib._xcffib", None) return ffi @@ -280,8 +276,13 @@ def build_ffi(): file = ffi_api.compile(verbose=True, tmpdir=tempfile.gettempdir()) shutil.copy(file, "xcffib") return ffi_api - except (CCompilerError, ExecError, PlatformError, - PkgConfigError, VerificationError) as e: + except ( + CCompilerError, + ExecError, + PlatformError, + PkgConfigError, + VerificationError, + ) as e: warn("Falling back to precompiled python mode: {}".format(str(e))) ffi_abi = ffi_for_mode("abi") diff --git a/module/testing.py b/module/testing.py index 0b77380..73e3c12 100644 --- a/module/testing.py +++ b/module/testing.py @@ -25,7 +25,7 @@ def lock_path(display): - return '/tmp/.X%d-lock' % display + return "/tmp/.X%d-lock" % display def find_display(): @@ -45,10 +45,9 @@ def find_display(): class XvfbTest: - - """ A helper class for testing things with nosetests. This class will run + """A helper class for testing things with nosetests. This class will run each test in its own fresh xvfb, leaving you with an xcffib connection to - that X session as `self.conn` for use in testing. """ + that X session as `self.conn` for use in testing.""" # Set this to true if you'd like to get xtrace output to stdout of each # test. @@ -60,26 +59,26 @@ def __init__(self, width=800, height=600, depth=16): self.depth = depth def spawn(self, cmd): - """ Spawn a command but swallow its output. """ + """Spawn a command but swallow its output.""" return subprocess.Popen(cmd) def _restore_display(self): if self._old_display is None: - del os.environ['DISPLAY'] + del os.environ["DISPLAY"] else: - os.environ['DISPLAY'] = self._old_display + os.environ["DISPLAY"] = self._old_display def setUp(self): - self._old_display = os.environ.get('DISPLAY') + self._old_display = os.environ.get("DISPLAY") self._display, self._display_lock = find_display() - os.environ['DISPLAY'] = ':%d' % self._display + os.environ["DISPLAY"] = ":%d" % self._display self._xvfb = self.spawn(self._xvfb_command()) if self.xtrace: - subprocess.Popen(['xtrace', '-n']) + subprocess.Popen(["xtrace", "-n"]) # xtrace's default display is :9; obviously this won't work # concurrently, but it's not the default so... - os.environ['DISPLAY'] = ':9' + os.environ["DISPLAY"] = ":9" try: self.conn = self._connect_to_xvfb() except AssertionError: @@ -121,17 +120,17 @@ def __exit__(self, type, value, traceback): self.tearDown() def _xvfb_command(self): - """ You can override this if you have some extra args for Xvfb or + """You can override this if you have some extra args for Xvfb or whatever. At this point, os.environ['DISPLAY'] is set to something Xvfb - can use. """ - screen = '%sx%sx%s' % (self.width, self.height, self.depth) - return ['Xvfb', os.environ['DISPLAY'], '-screen', '0', screen] + can use.""" + screen = "%sx%sx%s" % (self.width, self.height, self.depth) + return ["Xvfb", os.environ["DISPLAY"], "-screen", "0", screen] def _connect_to_xvfb(self): # sometimes it takes a while for Xvfb to start for _ in range(100): try: - conn = Connection(os.environ['DISPLAY']) + conn = Connection(os.environ["DISPLAY"]) conn.invalid() # xvfb creates a screen with a default width, and then resizes it. diff --git a/module/wrappers.py b/module/wrappers.py index 94b32f6..2f30bf0 100644 --- a/module/wrappers.py +++ b/module/wrappers.py @@ -16,6 +16,7 @@ def IDWrapper(freer): Classes create with IDWrapper return an ID and then free it upon exit of the context. """ + class Wrapper: def __init__(self, conn): self.conn = conn diff --git a/requirements.txt b/requirements.txt index b6b7907..f753ffa 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,3 @@ -flake8 -autopep8 -cffi>=0.8.2 pytest pytest-xdist cffi >= 1.6 diff --git a/setup.py b/setup.py index dfdfddc..643fed8 100644 --- a/setup.py +++ b/setup.py @@ -27,8 +27,9 @@ class binding_build(build): print a helpful message if they have not been generated yet. We only need to check this when we are actually building or installing. """ + def finalize_options(self): - if not os.path.exists('./xcffib'): + if not os.path.exists("./xcffib"): print("It looks like you need to generate the binding.") print("please run 'make xcffib' or 'make check'.") sys.exit(1) @@ -37,7 +38,7 @@ def finalize_options(self): class binding_install(install): def finalize_options(self): - if not os.path.exists('./xcffib'): + if not os.path.exists("./xcffib"): print("It looks like you need to generate the binding.") print("please run 'make xcffib' or 'make check'.") sys.exit(1) @@ -57,21 +58,18 @@ def finalize_options(self): author_email="tycho@tycho.pizza", install_requires=dependencies, setup_requires=dependencies, - python_requires = ">=3.10", - packages=['xcffib'], - package_data={'xcffib': ['py.typed']}, + python_requires=">=3.10", + packages=["xcffib"], + package_data={"xcffib": ["py.typed"]}, zip_safe=False, - cmdclass={ - 'build': binding_build, - 'install': binding_install - }, + cmdclass={"build": binding_build, "install": binding_install}, classifiers=[ - 'Development Status :: 5 - Production/Stable', - 'License :: OSI Approved :: Apache Software License', - 'Programming Language :: Python :: 3', - 'Programming Language :: Python :: Implementation :: CPython', - 'Programming Language :: Python :: Implementation :: PyPy', - 'Topic :: Software Development :: Libraries' + "Development Status :: 5 - Production/Stable", + "License :: OSI Approved :: Apache Software License", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: Implementation :: CPython", + "Programming Language :: Python :: Implementation :: PyPy", + "Topic :: Software Development :: Libraries", ], - cffi_modules=["xcffib/ffi_build.py:build_ffi"] + cffi_modules=["xcffib/ffi_build.py:build_ffi"], ) diff --git a/test/conftest.py b/test/conftest.py index d018660..5bd9be6 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -33,7 +33,7 @@ def xproto_test(xcffib_test): class XcffibTest(XvfbTest): - """ A home for common functions needed for xcffib testing. """ + """A home for common functions needed for xcffib testing.""" def setUp(self): XvfbTest.setUp(self) @@ -54,16 +54,16 @@ def create_window(self, wid=None, x=0, y=0, w=1, h=1, is_checked=False): self.default_screen.root_depth, wid, self.default_screen.root, - x, y, w, h, + x, + y, + w, + h, 0, xcffib.xproto.WindowClass.InputOutput, self.default_screen.root_visual, xcffib.xproto.CW.BackPixel | xcffib.xproto.CW.EventMask, - [ - self.default_screen.black_pixel, - xcffib.xproto.EventMask.StructureNotify - ], - is_checked=is_checked + [self.default_screen.black_pixel, xcffib.xproto.EventMask.StructureNotify], + is_checked=is_checked, ) def xeyes(self): @@ -72,13 +72,13 @@ def xeyes(self): self.default_screen.root, xcffib.xproto.CW.EventMask, [ - EventMask.SubstructureNotify | - EventMask.StructureNotify | - EventMask.SubstructureRedirect - ] + EventMask.SubstructureNotify + | EventMask.StructureNotify + | EventMask.SubstructureRedirect + ], ).check() - self.spawn(['xeyes']) + self.spawn(["xeyes"]) def intern(self, name): return self.xproto.InternAtom(False, len(name), name).reply().atom diff --git a/test/test_connection.py b/test/test_connection.py index ea879c7..5042efe 100644 --- a/test/test_connection.py +++ b/test/test_connection.py @@ -184,7 +184,7 @@ def test_ChangeProperty_NET_WM_NAME(self, xproto_test): utf8_string = xproto_test.intern("UTF8_STRING") title_bytes = b"test\xc2\xb7" - title_string = "test\u00B7" + title_string = "test\u00b7" # First check with an object already encoded as bytes xproto_test.xproto.ChangeProperty( @@ -198,7 +198,7 @@ def test_ChangeProperty_NET_WM_NAME(self, xproto_test): ) reply = xproto_test.xproto.GetProperty( - False, wid, net_wm_name, xcffib.xproto.GetPropertyType.Any, 0, (2 ** 32) - 1 + False, wid, net_wm_name, xcffib.xproto.GetPropertyType.Any, 0, (2**32) - 1 ).reply() print(reply.value.buf()) @@ -218,7 +218,7 @@ def test_ChangeProperty_NET_WM_NAME(self, xproto_test): ) reply = xproto_test.xproto.GetProperty( - False, wid, net_wm_name, xcffib.xproto.GetPropertyType.Any, 0, (2 ** 32) - 1 + False, wid, net_wm_name, xcffib.xproto.GetPropertyType.Any, 0, (2**32) - 1 ).reply() assert reply.value.buf() == title_bytes diff --git a/test/test_crazy_window_script.py b/test/test_crazy_window_script.py index a90181c..3c63ac2 100644 --- a/test/test_crazy_window_script.py +++ b/test/test_crazy_window_script.py @@ -21,8 +21,9 @@ # SOFTWARE. """ - This is mostly stolen from qtile's tests/scripts/window.py +This is mostly stolen from qtile's tests/scripts/window.py """ + import os import sys import struct diff --git a/test/test_fakeinput.py b/test/test_fakeinput.py index 14d6944..56657c7 100644 --- a/test/test_fakeinput.py +++ b/test/test_fakeinput.py @@ -11,35 +11,15 @@ def test_fakeinput(xcffib_test): def test(x, y): # motion - xtest.FakeInput( - 6, - 0, - xcffib.xproto.Time.CurrentTime, - screen.root, - x, - y, - 0) + xtest.FakeInput(6, 0, xcffib.xproto.Time.CurrentTime, screen.root, x, y, 0) # press - xtest.FakeInput( - 4, - 1, - xcffib.xproto.Time.CurrentTime, - screen.root, - 0, - 0, - 0) + xtest.FakeInput(4, 1, xcffib.xproto.Time.CurrentTime, screen.root, 0, 0, 0) # release - xtest.FakeInput( - 5, - 1, - xcffib.xproto.Time.CurrentTime, - screen.root, - 2, - 2, - 0) + xtest.FakeInput(5, 1, xcffib.xproto.Time.CurrentTime, screen.root, 2, 2, 0) xcffib_test.conn.flush() + test(50, 10) # we shouldn't get any errors diff --git a/test/test_mode.sh b/test/test_mode.sh index cade872..33a5f89 100755 --- a/test/test_mode.sh +++ b/test/test_mode.sh @@ -24,4 +24,4 @@ flock xcffib.cabal pip install --no-cache -v . # we won't pick up the local source xcffib module. the venv dir is as good a # place as any python -I -c "import xcffib; print(\"mode is\", xcffib.cffi_mode, \"path is\", xcffib.__file__); assert xcffib.cffi_mode == \"${MODE}\"" -python -I -m pytest --durations=3 -n auto +python -I -m pytest --durations=3 diff --git a/test/test_python_code.py b/test/test_python_code.py index 25587a7..7610d18 100644 --- a/test/test_python_code.py +++ b/test/test_python_code.py @@ -27,23 +27,22 @@ class TestPythonCode: - def test_struct_pack_uses_List(self): # suppose we have a list of ints... ints = struct.pack("=IIII", *range(4)) # Unpacker wants a cffi.cdata - cffi_ints = xcffib.ffi.new('char[]', ints) + cffi_ints = xcffib.ffi.new("char[]", ints) - l = xcffib.List(xcffib.CffiUnpacker(cffi_ints), "I", count=4) - ints2 = struct.pack("=IIII", *l) + data = xcffib.List(xcffib.CffiUnpacker(cffi_ints), "I", count=4) + ints2 = struct.pack("=IIII", *data) # after packing and unpacking, we should still have those ints assert ints == ints2 def test_union_pack(self): data = struct.pack("=" + ("b" * 20), *range(20)) - cffi_data = xcffib.ffi.new('char[]', data) + cffi_data = xcffib.ffi.new("char[]", data) cm = xcffib.xproto.ClientMessageData(xcffib.CffiUnpacker(cffi_data)) @@ -53,11 +52,11 @@ def test_union_pack(self): if sys.byteorder == "little": assert cm.data32[0] == 0x03020100 assert cm.data32[1] == 0x07060504 - assert cm.data32[2] == 0x0b0a0908 + assert cm.data32[2] == 0x0B0A0908 elif sys.byteorder == "big": assert cm.data32[0] == 0x00010203 assert cm.data32[1] == 0x04050607 - assert cm.data32[2] == 0x08090a0b + assert cm.data32[2] == 0x08090A0B else: raise Exception("unknown byte order?") @@ -96,10 +95,7 @@ def test_create_ClientMessageEvent(self, xcffib_test): wm_delete_window = xcffib_test.intern("WM_DELETE_WINDOW") e = xcffib.xproto.ClientMessageEvent.synthetic( - format=32, - window=wid, - type=wm_protocols, - data=union + format=32, window=wid, type=wm_protocols, data=union ) xcffib_test.xproto.SendEvent(False, wid, EventMask.NoEvent, e.pack()) @@ -126,13 +122,10 @@ def test_pack_from_event(self, xcffib_test): union = xcffib.xproto.ClientMessageData.synthetic(data, "I" * 5) e = xcffib.xproto.ClientMessageEvent.synthetic( - format=32, - window=wid, - type=wm_protocols, - data=union + format=32, window=wid, type=wm_protocols, data=union ) - e2 = xcffib.xproto.ClientMessageEvent(e) + _ = xcffib.xproto.ClientMessageEvent(e) def test_get_image(self, xcffib_test): # adapted from: https://gist.github.com/liftoff/4741790 @@ -144,9 +137,10 @@ def test_get_image(self, xcffib_test): # GetImage requires an output format as the first arg. We want ZPixmap: output_format = xcffib.xproto.ImageFormat.ZPixmap - plane_mask = 2**32 - 1 # No idea what this is but it works! + plane_mask = 2**32 - 1 # No idea what this is but it works! reply = xcffib_test.conn.core.GetImage( - output_format, root, 0, 0, width, height, plane_mask).reply() + output_format, root, 0, 0, width, height, plane_mask + ).reply() reply.data.buf() def test_ge_generic_event_hoist(self, xcffib_test): @@ -173,7 +167,7 @@ def test_ge_generic_event_hoist(self, xcffib_test): 100 << 16, # root_x 200 << 16, # root_y 0, # dx - 0 # dy + 0, # dy ) # Create cdata from the bytearray and cast it to a generic reply @@ -182,7 +176,7 @@ def test_ge_generic_event_hoist(self, xcffib_test): # Pass the reply to our hoist_event method event = xcffib_test.conn.hoist_event(generic_reply) - + assert isinstance(event, xcffib.xinput.BarrierHitEvent) assert event.root_x >> 16 == 100 assert event.root_y >> 16 == 200 @@ -196,22 +190,20 @@ def test_List_to_string(self, xcffib_test): for output in scrn_rsrcs.outputs: info = xrandr.GetOutputInfo(output, xcffib.XCB_CURRENT_TIME).reply() print(info.name.to_string()) - assert info.name.to_string() == 'screen' + assert info.name.to_string() == "screen" finally: xcffib_test.conn.disconnect() - class TestXcffibTestGenerator: - def test_XcffibTest_generator(self): try: - old_display = os.environ['DISPLAY'] + old_display = os.environ["DISPLAY"] except KeyError: old_display = "" # use some non-default width/height with XcffibTest(width=1001, height=502) as test: - assert os.environ['DISPLAY'] != old_display + assert os.environ["DISPLAY"] != old_display setup = test.conn.get_setup() screen = setup.roots[0] width = screen.width_in_pixels diff --git a/test/test_render.py b/test/test_render.py index ff4b844..0ce8b44 100644 --- a/test/test_render.py +++ b/test/test_render.py @@ -28,7 +28,9 @@ def test_CreateLinearGradient(self, xproto_test): visual = setup.roots[0].root_visual black = setup.roots[0].black_pixel conn.render = conn(xcffib.render.key) - conn.render.QueryVersion(xcffib.render.MAJOR_VERSION, xcffib.render.MINOR_VERSION) + conn.render.QueryVersion( + xcffib.render.MAJOR_VERSION, xcffib.render.MINOR_VERSION + ) window = conn.generate_id() conn.core.CreateWindow( @@ -65,12 +67,16 @@ def test_CreateLinearGradient(self, xproto_test): conn.render.CreateLinearGradientChecked( pic_gradient, xcffib.render.POINTFIX.synthetic(0, 0), - xcffib.render.POINTFIX.synthetic(double_to_fixed(WIDTH), double_to_fixed(HEIGHT)), + xcffib.render.POINTFIX.synthetic( + double_to_fixed(WIDTH), double_to_fixed(HEIGHT) + ), 2, [0, double_to_fixed(1)], [ - xcffib.render.COLOR.synthetic(0, 0, 0, 0xffff), # Solid black - xcffib.render.COLOR.synthetic(0xffff, 0xffff, 0xffff, 0xffff), # Solid white + xcffib.render.COLOR.synthetic(0, 0, 0, 0xFFFF), # Solid black + xcffib.render.COLOR.synthetic( + 0xFFFF, 0xFFFF, 0xFFFF, 0xFFFF + ), # Solid white ], ).check() @@ -80,18 +86,24 @@ def test_CreateLinearGradient(self, xproto_test): pic_gradient, 0, pic_window, - 0, 0, - 0, 0, - 0, 0, - WIDTH, HEIGHT + 0, + 0, + 0, + 0, + 0, + 0, + WIDTH, + HEIGHT, ) img = conn.core.GetImage( xcffib.xproto.ImageFormat.ZPixmap, window, - 0, 0, - WIDTH, HEIGHT, - 0xffffffff + 0, + 0, + WIDTH, + HEIGHT, + 0xFFFFFFFF, ).reply() conn.flush() diff --git a/test/test_xkb.py b/test/test_xkb.py index 77df63c..11535a6 100644 --- a/test/test_xkb.py +++ b/test/test_xkb.py @@ -1,16 +1,13 @@ -import xcffib -import xcffib.xproto - def test_query_rules_names(xproto_test): - setup = xproto_test.conn.get_setup() - root = xproto_test.default_screen.root string = xproto_test.intern("STRING") xkb_rules_names = xproto_test.intern("_XKB_RULES_NAMES") # should be enough for anybody... - prop = xproto_test.xproto.GetProperty(False, root, xkb_rules_names, string, 0, 64 * 1024).reply() + prop = xproto_test.xproto.GetProperty( + False, root, xkb_rules_names, string, 0, 64 * 1024 + ).reply() strings = prop.value.to_nullsep_string() # xephyr's defaults