- Bump
tartiflette
version to <0.12.0,>=0.11.0. - Update to the new
create_engine
API, breaking the way theengine
parameter works.
Before it was possible to do:
from tartiflette import Engine
engine = Engine(sdl)
ctx = {
'user_service': user_service
}
web.run_app(
register_graphql_handlers(
app=web.Application(),
engine=engine,
executor_context=ctx,
executor_http_endpoint='/graphql',
executor_http_methods=['POST', 'GET']
)
)
Typically you were able to provide your own already initialized Engine
Instance.
Now the engine is issued by the coroutine returning method create_engine
.
You have 3 choice in order to provide your own Engine :
from tartiflette import create_engine
engine = create_engine(sdl)
ctx = {
'user_service': user_service
}
web.run_app(
register_graphql_handlers(
app=web.Application(),
engine=engine,
executor_context=ctx,
executor_http_endpoint='/graphql',
executor_http_methods=['POST', 'GET']
)
)
register_graphql_handlers
will await that coroutine. (It can be any coroutine that respect the create_engine prototype)
from tartiflette import Engine
class myEngine(Engine):
def __init__(self, my, parameters):
super().__init__()
self._my = my
self._parameters = parameters
async def cook(
self,
sdl: Union[str, List[str]],
error_coercer: Callable[[Exception], dict] = None,
custom_default_resolver: Optional[Callable] = None,
modules: Optional[Union[str, List[str]]] = None,
schema_name: str = "default",
):
await super().cook(
sdl,
error_coercer,
custom_default_resolver,
modules,
schema_name
)
print(self._my, self._parameters)
ctx = {
'user_service': user_service
}
web.run_app(
register_graphql_handlers(
app=web.Application(),
engine=myEngine(my, parameters),
executor_context=ctx,
executor_http_endpoint='/graphql',
executor_http_methods=['POST', 'GET']
)
)
from tartiflette import Engine
class myEngine:
def __init__(self, my, parameters):
self._engine = Engine()
self._my = my
self._parameters = parameters
async def cook(
self,
sdl: Union[str, List[str]],
error_coercer: Callable[[Exception], dict] = None,
custom_default_resolver: Optional[Callable] = None,
modules: Optional[Union[str, List[str]]] = None,
schema_name: str = "default",
):
print(self._my)
await self._engine.cook(
sdl,
error_coercer,
custom_default_resolver,
modules,
schema_name
)
print(self._parameters)
async def execute(
self,
query: str,
operation_name: Optional[str] = None,
context: Optional[Dict[str, Any]] = None,
variables: Optional[Dict[str, Any]] = None,
initial_value: Optional[Any] = None,
):
return await self._engine.execute(
query,
operation_name,
context,
variables,
initial_value
)
ctx = {
'user_service': user_service
}
web.run_app(
register_graphql_handlers(
app=web.Application(),
engine=myEngine(my, parameters),
executor_context=ctx,
executor_http_endpoint='/graphql',
executor_http_methods=['POST', 'GET']
)
)