asyncio.exceptions.CancelledError: Cancelled by cancel scope 7f27c9d9bbe0 #834
Things to check first

I have checked that my issue does not already have a solution in the FAQ
I have searched the existing issues and didn't find my bug already reported there
I have checked that my bug is still present in the latest release

Version

4.0.0a4

What happened?

At a random moment the scheduler crashes and throws this error:

asyncio.exceptions.CancelledError: Cancelled by cancel scope 7f27c9d9bbe0

The scheduler tried to remove a running job, but threw this error:

KeyError: UUID('482d3fd1-1095-4d51-8c24-2e33e1ad7c3c')

How can we reproduce the bug?

I don't know how to reproduce this bug, it's working always with SQLAlchemyDataStore

Comments
If it works with SQLAlchemyDataStore, then which job data store did you have problems with? Note that this report isn't very useful if I can't reproduce the problem here.

I can reproduce this bug only with SQLAlchemyDataStore; no other code affects whether it happens.

You said in the report that it's working always with SQLAlchemyDataStore. Which is it?

I'm sorry, I didn't express myself correctly in the report. This bug always occurs with SQLAlchemyDataStore.
Which database driver was this with? Can you reproduce it with another driver?

PostgreSQL

That's not a database driver. Are you using psycopg, asyncpg or what?

asyncpg

And have you tried with another driver yet?

No, haven't tried yet.

Could you also try with the latest

Okay, I'll try it now.
I've received this error after starting the application.

Ok, have you tried with the default pickle serializer?

Where should I specify the default pickle serializer?

Where do you have code that makes it use the JSON serializer?
I use a custom JSON serializer in the SQLAlchemy engine:

```python
class CustomSerializer(json.JSONEncoder):
    def default(self, obj: typing.Any) -> typing.Any:
        if isinstance(obj, (datetime.date, datetime.datetime)):
            return obj.isoformat()
        elif isinstance(obj, BaseModel):
            return obj.dict()


def database_json_serializer(v):
    return json.dumps(v, indent=2, ensure_ascii=False, cls=CustomSerializer)


engine = create_async_engine(
    config.postgres_dsn,
    pool_size=20,
    max_overflow=60,
    json_serializer=database_json_serializer
)
```
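Incidentally, `CustomSerializer.default()` above has no fallback: for any type it doesn't handle, it falls off the end and returns `None`, which `json.dumps` then encodes as `null` instead of raising `TypeError`. A minimal sketch of the conventional pattern, delegating unhandled types to the base class:

```python
import datetime
import json
import typing

from pydantic import BaseModel


class CustomSerializer(json.JSONEncoder):
    def default(self, obj: typing.Any) -> typing.Any:
        if isinstance(obj, (datetime.date, datetime.datetime)):
            return obj.isoformat()
        elif isinstance(obj, BaseModel):
            return obj.dict()
        # Delegate to the base implementation, which raises TypeError for
        # unserializable types instead of silently returning None.
        return super().default(obj)
```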
The exception originated from the event broker though. Are you passing a JSON serializer to that?
No:

```python
class CustomSerializer(json.JSONEncoder):
    def default(self, obj: typing.Any) -> typing.Any:
        if isinstance(obj, (datetime.date, datetime.datetime)):
            return obj.isoformat()
        elif isinstance(obj, BaseModel):
            return obj.dict()


def database_json_serializer(v):
    return json.dumps(v, indent=2, ensure_ascii=False, cls=CustomSerializer)


engine = create_async_engine(
    config.postgres_dsn,
    pool_size=20,
    max_overflow=60,
    json_serializer=database_json_serializer
)

data_store = SQLAlchemyDataStore(engine)
event_broker = AsyncpgEventBroker.from_async_sqla_engine(engine)
scheduler = AsyncScheduler(data_store, event_broker)
```
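For reference, the `json_serializer` above only controls how SQLAlchemy encodes JSON column values; it is separate from APScheduler's own serializer, which lives on the data store and defaults to pickle. A minimal sketch of configuring it explicitly, assuming the 4.0.0a4 API in which external data stores accept a keyword-only `serializer` argument:

```python
from apscheduler import AsyncScheduler
from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker
from apscheduler.serializers.pickle import PickleSerializer

# Assumption: `serializer` is an accepted keyword argument in 4.0.0a4;
# PickleSerializer is already the default, so this only makes it explicit.
data_store = SQLAlchemyDataStore(engine, serializer=PickleSerializer())
event_broker = AsyncpgEventBroker.from_async_sqla_engine(engine)
scheduler = AsyncScheduler(data_store, event_broker)
```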
Does the async_postgres example work for you?
Yeah, it works, but I use APScheduler with FastAPI. A minimal example is the one I described in #833:

```python
import asyncio
import datetime
import json
import logging
import typing
from contextlib import asynccontextmanager

import uvicorn
from apscheduler import AsyncScheduler
from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker
from apscheduler.triggers.interval import IntervalTrigger
from fastapi import FastAPI, Response
from fastapi.middleware import Middleware
from pydantic import BaseModel
from sqlalchemy import Column, BigInteger, String
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from sqlalchemy.orm import declarative_base
from starlette.types import ASGIApp, Scope, Receive, Send

logger = logging.getLogger(__name__)


class CustomSerializer(json.JSONEncoder):
    def default(self, obj: typing.Any) -> typing.Any:
        if isinstance(obj, (datetime.date, datetime.datetime)):
            return obj.isoformat()
        elif isinstance(obj, BaseModel):
            return obj.dict()


def database_json_serializer(v):
    return json.dumps(v, indent=2, ensure_ascii=False, cls=CustomSerializer)


engine = create_async_engine(
    "postgresql+asyncpg://",
    pool_size=20,
    max_overflow=60,
    json_serializer=database_json_serializer
)
Session = async_sessionmaker(autocommit=False, autoflush=False, bind=engine, class_=AsyncSession)
Base = declarative_base()


class Item(Base):
    __tablename__ = "items"

    id = Column(BigInteger, autoincrement=True, primary_key=True)
    name = Column(String, nullable=False)
    code = Column(String, unique=True, nullable=False)


async def job_1():
    logger.info("JOB_1 EXECUTE START")
    await asyncio.sleep(5)
    logger.info("JOB_1 EXECUTE END")


async def job_2():
    logger.info("JOB_2 EXECUTE START")
    async with Session() as db:
        new_obj = Item(name="Item", code="1a1a1a")
        db.add(new_obj)
        await db.commit()
        await db.refresh(new_obj)
    await asyncio.sleep(5)
    logger.info("JOB_2 EXECUTE END")


async def job_3():
    logger.info("JOB_3 EXECUTE START")
    await asyncio.sleep(10)
    logger.info("JOB_3 EXECUTE END")


class SchedulerMiddleware:
    def __init__(
        self,
        app: ASGIApp,
        scheduler: AsyncScheduler,
    ) -> None:
        self.app = app
        self.scheduler = scheduler

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        if scope["type"] == "lifespan":
            async with self.scheduler:
                await self.scheduler.add_schedule(
                    job_1,
                    IntervalTrigger(minutes=1),
                    id="job_1"
                )
                await self.scheduler.add_schedule(
                    job_2,
                    IntervalTrigger(minutes=1),
                    id="job_2"
                )
                await self.scheduler.add_schedule(
                    job_3,
                    IntervalTrigger(minutes=1),
                    id="job_3"
                )
                await self.scheduler.start_in_background()
                await self.app(scope, receive, send)
        else:
            await self.app(scope, receive, send)


@asynccontextmanager
async def lifespan(_: FastAPI):
    logger.info("LIFESPAN EVENT")
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    yield


data_store = SQLAlchemyDataStore(engine)
event_broker = AsyncpgEventBroker.from_async_sqla_engine(engine)
scheduler = AsyncScheduler(data_store, event_broker)

app = FastAPI(lifespan=lifespan, middleware=[
    Middleware(SchedulerMiddleware, scheduler=scheduler)
])


@app.get("/")
async def route_main():
    return Response(status_code=202)


if __name__ == "__main__":
    uvicorn.run(
        "apscheduler_test:app",
        port=5000,
        host="localhost",
        reload=False,
        proxy_headers=False,
        log_config="logging.json"
    )
```
Ok, I can reproduce the error on my end now using the asgi_fastapi example.

I've pushed an update to master. Could you update and try again?

Yeah, with the new update I no longer get this error (TypeError: Object of type 'JobOutcome' is not JSON serializable).

Are you seeing any other errors? Can I close this issue?
I did a few more tests and found that the error remained. This time I added a long delay to one of the jobs to simulate a real long-running job. Code:

```python
async def job_1():
    print("JOB_1 EXECUTE START")
    await asyncio.sleep(5)
    print("JOB_1 EXECUTE END")


async def job_2():
    print("JOB_2 EXECUTE START")
    await asyncio.sleep(180)
    print("JOB_2 EXECUTE END")


class SchedulerMiddleware:
    def __init__(self, app: ASGIApp, scheduler: AsyncScheduler) -> None:
        self.app = app
        self.scheduler = scheduler

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        if scope["type"] == "lifespan":
            async with self.scheduler:
                await self.scheduler.add_schedule(job_1, IntervalTrigger(minutes=1), id="job_1")
                await self.scheduler.add_schedule(job_2, IntervalTrigger(minutes=1), id="job_2")
                await self.scheduler.start_in_background()
                await self.app(scope, receive, send)
        else:
            await self.app(scope, receive, send)


@asynccontextmanager
async def lifespan(_: FastAPI):
    print("LIFESPAN EVENT")
    yield


engine = create_async_engine("postgresql+asyncpg://...")
data_store = SQLAlchemyDataStore(engine)
event_broker = AsyncpgEventBroker.from_async_sqla_engine(engine)
scheduler = AsyncScheduler(data_store, event_broker)

app = FastAPI(lifespan=lifespan, middleware=[Middleware(SchedulerMiddleware, scheduler=scheduler)])
```

Logs:
@agronholm can you check, please?

I'll check it this weekend.

Also, I've noticed that this error is always thrown after a long job (e.g. 10 minutes).

@agronholm can you check, please?

I said I would check it this weekend, which isn't over yet. Patience! I have plenty of other projects to work on too.

My changes that were supposed to fix the traceback length weren't correct. I've pushed the corrected code to this branch. I'll merge it to master when I have tests for it ready.

I was able to reproduce the

That bug seems to be the same as #803.