Issue
I'm new to Python and I'm working on a task that requires querying a MySQL table of 50 million records, joining it with another table to match the data, and finally grouping to count the records in each group. I tried multiprocessing.Pool to speed up the process, but found that multiprocessing is not recommended for an I/O-bound task like querying, so I switched to async.
Here is the code, using the SQLAlchemy ORM and Python's asyncio library:
class attendace(Base):
    __tablename__ = 'attendance'
    id = Column(BigInteger, primary_key=True)
    activity_id = Column(BigInteger)
    user_login = Column(VARCHAR)
    groupid = Column(Integer)

class group(Base):
    __tablename__ = 'group'
    id = Column(BigInteger, primary_key=True)
    term_id = Column(Integer)
    is_virtual = Column(SmallInteger)

async def check_attendance(session, page_size, page_number, result_queue):
    try:
        offset = page_size * (page_number - 1)
        result = await session.execute(
            select(func.count(attendace.activity_id), attendace.user_login, group.term_id)
            .join(group, attendace.groupid == group.id)
            .where(group.term_id >= 24)
            .group_by(group.term_id, attendace.user_login)
            .offset(offset)
            .limit(page_size))
        res = result.all()
        # result_queue.put(result)
        print(f'Page {page_number} result: {res}')
    except Exception as e:
        print(f'Error {e}')
        return []

async def main():
    page_size = 50
    total_pages = 1
    result_queue = asyncio.Queue()
    engine = create_async_engine('mysql+aiomysql://root@localhost/db_test', echo=False, poolclass=QueuePool)
    async with engine.begin() as connection:
        await connection.run_sync(Base.metadata.create_all)
    async with sessionmaker(bind=engine, class_=AsyncSession, expire_on_commit=False)() as session:
        tasks = [check_attendance(session, page_size, page_number, result_queue)
                 for page_number in range(1, total_pages + 1)]
        await asyncio.gather(*tasks)
With a page_size of only 50, it already takes about ~160s to display the result, which is as long as my whole raw SQL query (check_attendance is just one part of that query).
I want to ask for solutions for running a large database query like this in Python. Is my current approach suitable for the task? What can I do to improve it?
I also want to ask whether there is a way to optimize the page_size and total_pages parts used for OFFSET and LIMIT in the query. How can I make it run through the whole query until it is done, rather than being limited by the total_pages value that I provide?
Sorry for my bad English, and thank you for looking at this problem.
I have tried multiprocessing.Pool and it still hits the bottleneck of the huge amount of data to query from the table.
I also tried the code I posted above and got very slow results with a larger page_size and total_pages.
Solution
If you cannot change the database schema, here is a hack that might work. I think it will still be pretty slow, though; it takes about 80 seconds on my laptop to finish.
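The idea below is to replace LIMIT/OFFSET paging with disjoint partitions of the group key: every output row of the GROUP BY belongs to exactly one term_id, so filtering on term_id % N == m for m = 0..N-1 yields N disjoint result sets whose union is the complete result. Here is a tiny self-contained check of that claim (my own illustration, not part of the answer's code):

    from collections import Counter

    # Toy (user_login, term_id) rows standing in for the grouped output.
    rows = [('alice', 24), ('bob', 25), ('alice', 24), ('carol', 71)]
    whole = Counter(rows)

    # Partition the same rows by term_id % 47 and count each bucket separately.
    parts = [Counter(r for r in rows if r[1] % 47 == m) for m in range(47)]
    merged = Counter()
    for p in parts:
        merged.update(p)

    assert merged == whole  # the partitions together reproduce the full counts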
import os
import asyncio
from itertools import islice
from random import choice

from sqlalchemy import (
    Column,
    Integer,
    String,
    BigInteger,
)
from sqlalchemy.sql import (
    func,
    select,
    insert,
)
from sqlalchemy.orm import declarative_base
from sqlalchemy.ext.asyncio import create_async_engine
# itertools.batched() was added in Python 3.12; I don't have it,
# so define the equivalent here.
def batched(iterable, n):
    # batched('ABCDEFG', 3) --> ABC DEF G
    if n < 1:
        raise ValueError('n must be at least one')
    it = iter(iterable)
    while batch := tuple(islice(it, n)):
        yield batch
MAX_OVERFLOW = 20
POOL_SIZE = 10

def get_engine(env):
    return create_async_engine(
        f"postgresql+asyncpg://{env['DB_USER']}:{env['DB_PASSWORD']}"
        f"@{env['DB_HOST']}:{env['DB_PORT']}/{env['DB_NAME']}",
        pool_size=POOL_SIZE,
        max_overflow=MAX_OVERFLOW,
    )
Base = declarative_base()

class attendace(Base):
    __tablename__ = 'attendance'
    id = Column(BigInteger, primary_key=True)
    activity_id = Column(BigInteger)
    user_login = Column(String)
    groupid = Column(Integer)

class group(Base):
    __tablename__ = 'group'
    id = Column(BigInteger, primary_key=True)
    term_id = Column(Integer)
    is_virtual = Column(Integer)
async def check_attendance(engine, partitioner, mod_number, result_queue):
    q = select(
        func.count(attendace.activity_id), attendace.user_login, group.term_id
    ).join(
        group, attendace.groupid == group.id
    ).where(
        group.term_id >= 24,
        # We can split the result sets in this way because we group by
        # term_id, ie. the sum of the parts is equal to the whole.
        group.term_id % partitioner == mod_number
    ).group_by(group.term_id, attendace.user_login)
    async with engine.connect() as conn:
        result = await conn.execute(q)
        res = result.all()
        # result_queue.put(result)
        print(f'Mod number {mod_number} results: {len(res)}')
        return res
async def main():
    # Must be a prime number, ideally one that splits your result set
    # evenly and into appropriately sized chunks.
    PARTITIONER = 47
    result_queue = asyncio.Queue()
    engine = get_engine(os.environ)
    async with engine.begin() as connection:
        await connection.run_sync(Base.metadata.create_all)
    async with engine.connect() as conn:
        await conn.run_sync(populate)
    total_mod_numbers = list(range(PARTITIONER))
    # Split up the jobs so we don't overload the connection pool.
    for index, mod_numbers in enumerate(batched(total_mod_numbers, n=MAX_OVERFLOW)):
        tasks = [check_attendance(engine, PARTITIONER, mod_number, result_queue)
                 for mod_number in mod_numbers]
        res = await asyncio.gather(*tasks)
        print(f'Batch index {index} results: {len(res)}')
    # Clean up the async engine's connection pool.
    await engine.dispose()
def populate(conn):
    """ Create a bunch of test data if the database is empty. """
    res = conn.scalar(select(func.count(group.__table__.c.id)))
    if res > 0:
        return
    rows = []
    term_ids = list(range(1, 200))
    group_ids = list(range(1, 1000000))
    for gid in group_ids:
        term_id = choice(term_ids)
        rows.append(dict(id=gid, term_id=term_id, is_virtual=0))
    conn.execute(insert(group.__table__), rows)
    conn.commit()
    rows = []
    user_logins = [str(login) for login in range(1000000)]
    activity_ids = list(range(1, 10000))
    for attend_id in range(1, 50000000):
        login = choice(user_logins)
        gid = choice(group_ids)
        act_id = choice(activity_ids)
        rows.append(dict(id=attend_id, user_login=login, groupid=gid, activity_id=act_id))
    conn.execute(insert(attendace.__table__), rows)
    conn.commit()

if __name__ == '__main__':
    print(asyncio.run(main()))
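One loose end in the code above: result_queue is created but never used (the put call is commented out). If you want to consume partition results as they arrive rather than waiting for each gather, a minimal sketch is below; the consume_results coroutine and the None sentinel are my own illustrative additions, and fake_producer merely stands in for check_attendance.

    import asyncio

    async def consume_results(result_queue):
        total = 0
        while True:
            res = await result_queue.get()
            if res is None:  # sentinel: all producers are finished
                break
            total += len(res)
            print(f'Consumed a partition with {len(res)} rows')
        return total

    async def fake_producer(result_queue, mod_number):
        # Stand-in for check_attendance: put each partition's rows on the queue.
        await asyncio.sleep(0.01)  # pretend to run the query
        await result_queue.put([mod_number] * 3)

    async def demo():
        result_queue = asyncio.Queue()
        consumer = asyncio.create_task(consume_results(result_queue))
        await asyncio.gather(*(fake_producer(result_queue, m) for m in range(5)))
        await result_queue.put(None)  # signal the consumer to stop
        print('total rows:', await consumer)

    if __name__ == '__main__':
        asyncio.run(demo())

Note that asyncio.Queue.put() is a coroutine, so the commented-out result_queue.put(result) would also need an await.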
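On the original OFFSET/LIMIT question: OFFSET gets slower the deeper you page, because the database still has to produce and discard every skipped row. For plain ordered scans (not the GROUP BY above, which the modulo trick handles better), keyset pagination seeks past the last-seen primary key and naturally stops when a page comes back empty, so no fixed total_pages is needed. A minimal sketch against the models above; scan_attendance is a hypothetical helper of mine, not part of the answer:

    async def scan_attendance(session, page_size=1000):
        # Keyset pagination: seek past the last id instead of using OFFSET.
        last_id = 0
        while True:
            result = await session.execute(
                select(attendace)
                .where(attendace.id > last_id)
                .order_by(attendace.id)
                .limit(page_size))
            page = result.scalars().all()
            if not page:
                break  # no total_pages: stop when a page comes back empty
            last_id = page[-1].id
            yield page

Usage would be async for page in scan_attendance(session): ...; each iteration issues one indexed range query instead of an increasingly expensive OFFSET.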
Answered By - Ian Wilson