Skip to content
This repository has been archived by the owner on Apr 4, 2024. It is now read-only.

Commit

Permalink
Merge branch 'main-python' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
nedtwigg committed Mar 20, 2024
2 parents 43b7d1a + 8f7528a commit 08ba36e
Show file tree
Hide file tree
Showing 22 changed files with 1,473 additions and 25 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/python-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ on:
branches: [main]
pull_request:
paths:
- 'python/**'
- "python/**"
defaults:
run:
working-directory: python/selfie-lib
Expand All @@ -24,9 +24,9 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version-file: 'python/selfie-lib/pyproject.toml'
cache: 'poetry'
python-version-file: "python/selfie-lib/pyproject.toml"
cache: "poetry"
- run: poetry install
- run: poetry run pytest -vv
- run: poetry run pyright
- run: poetry run ruff check
- run: poetry run ruff format --check
3 changes: 3 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"java.compile.nullAnalysis.mode": "automatic"
}
123 changes: 123 additions & 0 deletions python/selfie-lib/selfie_lib/ArrayMap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
from collections.abc import Set, Iterator, Mapping
from typing import List, TypeVar, Union
from abc import abstractmethod, ABC

T = TypeVar("T")
V = TypeVar("V")
K = TypeVar("K")


class ListBackedSet(Set[T], ABC):
@abstractmethod
def __len__(self) -> int:
...

@abstractmethod
def __getitem__(self, index: Union[int, slice]) -> Union[T, List[T]]:
...

def __contains__(self, item: object) -> bool:
for i in range(len(self)):
if self[i] == item:
return True
return False


class ArraySet(ListBackedSet[K]):
__data: List[K]

def __init__(self, data: List[K]):
raise NotImplementedError("Use ArraySet.empty() instead")

@classmethod
def __create(cls, data: List[K]) -> "ArraySet[K]":
# Create a new instance without calling __init__
instance = super().__new__(cls)
instance.__data = data
return instance

def __iter__(self) -> Iterator[K]:
return iter(self.__data)

@classmethod
def empty(cls) -> "ArraySet[K]":
if not hasattr(cls, "__EMPTY"):
cls.__EMPTY = cls([])
return cls.__EMPTY

def __len__(self) -> int:
return len(self.__data)

def __getitem__(self, index: Union[int, slice]) -> Union[K, List[K]]:
if isinstance(index, int):
return self.__data[index]
elif isinstance(index, slice):
return self.__data[index]
else:
raise TypeError("Invalid argument type.")

def plusOrThis(self, element: K) -> "ArraySet[K]":
# TODO: use binary search, and also special sort order for strings
if element in self.__data:
return self
else:
new_data = self.__data[:]
new_data.append(element)
new_data.sort() # type: ignore[reportOperatorIssue]
return ArraySet.__create(new_data)


class ArrayMap(Mapping[K, V]):
def __init__(self, data: list):
# TODO: hide this constructor as done in ArraySet
self.__data = data

@classmethod
def empty(cls) -> "ArrayMap[K, V]":
if not hasattr(cls, "__EMPTY"):
cls.__EMPTY = cls([])
return cls.__EMPTY

def __getitem__(self, key: K) -> V:
index = self.__binary_search_key(key)
if index >= 0:
return self.__data[2 * index + 1]
raise KeyError(key)

def __iter__(self) -> Iterator[K]:
return (self.__data[i] for i in range(0, len(self.__data), 2))

def __len__(self) -> int:
return len(self.__data) // 2

def __binary_search_key(self, key: K) -> int:
# TODO: special sort order for strings
low, high = 0, (len(self.__data) // 2) - 1
while low <= high:
mid = (low + high) // 2
mid_key = self.__data[2 * mid]
if mid_key < key:
low = mid + 1
elif mid_key > key:
high = mid - 1
else:
return mid
return -(low + 1)

def plus(self, key: K, value: V) -> "ArrayMap[K, V]":
index = self.__binary_search_key(key)
if index >= 0:
raise ValueError("Key already exists")
insert_at = -(index + 1)
new_data = self.__data[:]
new_data[insert_at * 2 : insert_at * 2] = [key, value]
return ArrayMap(new_data)

def minus_sorted_indices(self, indicesToRemove: List[int]) -> "ArrayMap[K, V]":
if not indicesToRemove:
return self
newData = []
for i in range(0, len(self.__data), 2):
if i // 2 not in indicesToRemove:
newData.extend(self.__data[i : i + 2])
return ArrayMap(newData)
87 changes: 87 additions & 0 deletions python/selfie-lib/selfie_lib/CommentTracker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
from typing import Dict, Iterable, Tuple
from enum import Enum, auto
import threading
from selfie_lib.TypedPath import TypedPath
from selfie_lib.Slice import Slice


# Placeholder implementations for CallStack, SnapshotFileLayout, and FS
class CallStack:
pass


class SnapshotFileLayout:
def sourcePathForCall(self, location) -> "TypedPath":
# Placeholder return or raise NotImplementedError
raise NotImplementedError("sourcePathForCall is not implemented")


class WritableComment(Enum):
NO_COMMENT = auto()
ONCE = auto()
FOREVER = auto()

@property
def writable(self) -> bool:
return self != WritableComment.NO_COMMENT


class CommentTracker:
def __init__(self):
self.cache: Dict[TypedPath, WritableComment] = {}
self.lock = threading.Lock()

def pathsWithOnce(self) -> Iterable[TypedPath]:
with self.lock:
return [
path
for path, comment in self.cache.items()
if comment == WritableComment.ONCE
]

def hasWritableComment(self, call: CallStack, layout: SnapshotFileLayout) -> bool:
path = layout.sourcePathForCall(call)
with self.lock:
if path in self.cache:
comment = self.cache[path]
if comment.writable:
return True
else:
return False
else:
new_comment, _ = self.__commentAndLine(path)
self.cache[path] = new_comment
return new_comment.writable

@staticmethod
def commentString(typedPath: TypedPath) -> Tuple[str, int]:
comment, line = CommentTracker.__commentAndLine(typedPath)
if comment == WritableComment.NO_COMMENT:
raise ValueError("No writable comment found")
elif comment == WritableComment.ONCE:
return ("//selfieonce", line)
elif comment == WritableComment.FOREVER:
return ("//SELFIEWRITE", line)
else:
raise ValueError("Invalid comment type")

@staticmethod
def __commentAndLine(typedPath: TypedPath) -> Tuple[WritableComment, int]:
with open(typedPath.absolute_path, "r") as file:
content = Slice(file.read())
for comment_str in [
"//selfieonce",
"// selfieonce",
"//SELFIEWRITE",
"// SELFIEWRITE",
]:
index = content.indexOf(comment_str)
if index != -1:
lineNumber = content.baseLineAtOffset(index)
comment = (
WritableComment.ONCE
if "once" in comment_str
else WritableComment.FOREVER
)
return (comment, lineNumber)
return (WritableComment.NO_COMMENT, -1)
12 changes: 12 additions & 0 deletions python/selfie-lib/selfie_lib/EscapeLeadingWhitespace.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from enum import Enum, auto


class EscapeLeadingWhitespace(Enum):
NEVER = auto()

def escape_line(self, line: str, space: str, tab: str) -> str:
return line

@staticmethod
def appropriate_for(file_content: str) -> "EscapeLeadingWhitespace":
return EscapeLeadingWhitespace.NEVER
35 changes: 35 additions & 0 deletions python/selfie-lib/selfie_lib/LineReader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import io


class LineReader:
def __init__(self, content: bytes):
self.__buffer = io.BytesIO(content)
self.__uses_unix_newlines = self.__detect_newline_type()
self.__line_count = 0 # Initialize line count

@classmethod
def for_binary(cls, content: bytes):
return cls(content)

@classmethod
def for_string(cls, content: str):
return cls(content.encode("utf-8"))

def __detect_newline_type(self) -> bool:
first_line = self.__buffer.readline()
self.__buffer.seek(0) # Reset buffer for actual reading
return b"\r\n" not in first_line

def unix_newlines(self) -> bool:
return self.__uses_unix_newlines

def read_line(self) -> str:
line_bytes = self.__buffer.readline()
if line_bytes:
self.__line_count += 1 # Increment line count for each line read
line = line_bytes.decode("utf-8")
return line.rstrip("\r\n" if not self.__uses_unix_newlines else "\n")

# Method to get the current line number
def get_line_number(self) -> int:
return self.__line_count
89 changes: 89 additions & 0 deletions python/selfie-lib/selfie_lib/Literals.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
from enum import Enum, auto
from typing import Protocol, TypeVar
from abc import abstractmethod
from .EscapeLeadingWhitespace import EscapeLeadingWhitespace
import io

T = TypeVar("T")


class Language(Enum):
PYTHON = auto()

@classmethod
def from_filename(cls, filename: str) -> "Language":
extension = filename.rsplit(".", 1)[-1]
if extension == "py":
return cls.PYTHON
else:
raise ValueError(f"Unknown language for file {filename}")


class LiteralValue:
def __init__(self, expected: T | None, actual: T, format: "LiteralFormat") -> None:
self.expected = expected
self.actual = actual
self.format = format


class LiteralFormat(Protocol[T]):
@abstractmethod
def encode(
self, value: T, language: Language, encoding_policy: "EscapeLeadingWhitespace"
) -> str:
raise NotImplementedError("Subclasses must implement the encode method")

@abstractmethod
def parse(self, string: str, language: Language) -> T:
raise NotImplementedError("Subclasses must implement the parse method")


MAX_RAW_NUMBER = 1000
PADDING_SIZE = len(str(MAX_RAW_NUMBER)) - 1


class LiteralInt(LiteralFormat[int]):
def _encode_underscores(
self, buffer: io.StringIO, value: int, language: Language
) -> io.StringIO:
if value >= MAX_RAW_NUMBER:
mod = value % MAX_RAW_NUMBER
left_padding = PADDING_SIZE - len(str(mod))
self._encode_underscores(buffer, value // MAX_RAW_NUMBER, language)
buffer.write("_")
buffer.write("0" * left_padding)
buffer.write(str(mod))
return buffer
elif value < 0:
buffer.write("-")
self._encode_underscores(buffer, abs(value), language)
return buffer
else:
buffer.write(str(value))
return buffer

def encode(
self, value: int, language: Language, encoding_policy: EscapeLeadingWhitespace
) -> str:
return self._encode_underscores(io.StringIO(), value, language).getvalue()

def parse(self, string: str, language: Language) -> int:
return int(string.replace("_", ""))


class LiteralBoolean(LiteralFormat[bool]):
def encode(
self, value: bool, language: Language, encoding_policy: EscapeLeadingWhitespace
) -> str:
return str(value)

def __to_boolean_strict(self, string: str) -> bool:
if string.lower() == "true":
return True
elif string.lower() == "false":
return False
else:
raise ValueError("String is not a valid boolean representation: " + string)

def parse(self, string: str, language: Language) -> bool:
return self.__to_boolean_strict(string)
Loading

0 comments on commit 08ba36e

Please sign in to comment.