Issue
Edit: The answer to this turned out to be a bit involved. The tl;dr is: make sure you do lazy loading properly. Many of the variables in the code below were declared and initialized globally; instead, your global variables should be set to None and only initialized inside your actual API call!
I'm going bonkers.
Here is my full main.py. It can be run locally via functions-framework --target=api or deployed directly to Google Cloud:
import functions_framework
import sqlalchemy
import threading
from google.cloud.sql.connector import Connector, IPTypes
from sqlalchemy.orm import sessionmaker, scoped_session

Base = sqlalchemy.orm.declarative_base()

class TestUsers(Base):
    __tablename__ = 'TestUsers'
    uuid = sqlalchemy.Column(sqlalchemy.String, primary_key=True)

cloud_sql_connection_name = "myproject-123456:asia-northeast3:tosmedb"
connector = Connector()

def getconn():
    connection = connector.connect(
        cloud_sql_connection_name,
        "pg8000",
        user="postgres",
        password="redacted",
        db="tosme",
        ip_type=IPTypes.PUBLIC,
    )
    return connection

def init_pool():
    engine_url = sqlalchemy.engine.url.URL.create(
        "postgresql+pg8000",
        username="postgres",
        password="redacted",
        host=cloud_sql_connection_name,
        database="tosme"
    )
    engine = sqlalchemy.create_engine(engine_url, creator=getconn)
    # Create tables if they don't exist
    Base.metadata.create_all(engine)
    return engine

engine = init_pool()
# Prepare a thread-safe Session maker
Session = scoped_session(sessionmaker(bind=engine))
print("Database initialized")

def run_concurrency_test():
    def get_user():
        with Session() as session:
            session.query(TestUsers).first()

    print("Simulating concurrent reads...")
    threads = []
    for i in range(2):
        thread = threading.Thread(target=get_user)
        threads.append(thread)
        thread.start()

    # Wait for all threads to complete
    for thread in threads:
        thread.join()
        print(f"Thread {thread.name} completed")

    print("Test passed - Threads all completed!\n")

run_concurrency_test()

@functions_framework.http
def api(request):
    print("API hit - Calling run_concurrency_test()...")
    run_concurrency_test()
    return "Success"
requirements.txt:
functions-framework==3.*
cloud-sql-python-connector[pg8000]==1.5.*
SQLAlchemy==2.*
pg8000==1.*
It's super simple - and it works! As long as you have a PostgreSQL instance, it will create the TestUsers table as needed and query it twice (concurrently, via threads), and it works every time you curl it as well. Here's some example output:
Database initialized
Simulating concurrent reads...
Thread Thread-4 (get_user) completed
Thread Thread-5 (get_user) completed
Test passed - Threads all completed!
API hit - Calling run_concurrency_test()...
Simulating concurrent reads...
Thread Thread-7 (get_user) completed
Thread Thread-8 (get_user) completed
Test passed - Threads all completed!
However, if I comment out the first call to run_concurrency_test() (i.e. the one that's not inside api(request)), run it and curl, I get this:
Database initialized
API hit - Calling run_concurrency_test()...
Simulating concurrent reads...
Thread Thread-4 (get_user) completed
It gets stuck! Specifically, it gets stuck at session.query(TestUsers).first(). It didn't get stuck when I ran the concurrency test outside api() first. To the best of my knowledge, my code is stateless and thread-safe. So what is going on here that makes it suddenly not work?
Solution
Please see this other SO post for the proper detailed usage of the Cloud SQL Python Connector with Cloud Functions.
The error here stems from initializing the Connector as a global variable outside of the Cloud Function request context. Cloud Functions only have access to compute resources while a request is being handled; otherwise they scale down. The Connector runs background tasks in order to make successful connections to Cloud SQL when you eventually try to connect, and because you initialize it globally, while no CPU is allocated to your function those background tasks are throttled, causing your error.
Cloud Functions recommends lazily initializing global variables for exactly this reason, which is what the linked post above showcases.
NOTE: Initializing the Connector inside getconn, as another answer suggests, is not recommended and will introduce more bugs when you attempt to scale traffic. It works because it guarantees the Connector is initialized within the Cloud Function request context, but it creates a new Connector for each database connection. The Connector is meant to be shared across connections to allow for scalable solutions, which is why having it as a lazily initialized global variable is the recommended approach.
Answered By - Jack Wotherspoon