Issue
This may seem strange, but I was curious to know whether it is possible for a block of code to be executed after an INSERT statement runs in a Postgres database.
Specifically, I'm interested in executing Python code after an INSERT statement has occurred in a pg database.
Solution
There are several ways to approach this.
LISTEN/NOTIFY
The simple way to tackle this is to use PostgreSQL notifications.
You can add an after insert/update trigger which sends a notification:
CREATE OR REPLACE FUNCTION on_insert() RETURNS trigger AS
$$
BEGIN
    -- pg_notify() is the dynamic form of NOTIFY; the payload is the new row's id
    PERFORM pg_notify('entity_change', NEW.id::text);
    RETURN NEW;
END
$$
LANGUAGE plpgsql VOLATILE;
CREATE TRIGGER trig_on_insert
AFTER INSERT ON ENTITY
FOR EACH ROW
EXECUTE PROCEDURE on_insert();
entity_change is the identifier of the channel; you can choose any name you like.
Your application should listen on it in a separate thread (or process) and do whatever is needed:
import select

from django.db import connection

# LISTEN/NOTIFY needs the raw psycopg2 connection, with autocommit on
connection.ensure_connection()
connection.set_autocommit(True)
pg_conn = connection.connection

curs = pg_conn.cursor()
curs.execute("LISTEN entity_change;")

finished = False  # set from elsewhere to stop the loop
while not finished:
    if select.select([pg_conn], [], [], 5) == ([], [], []):
        print("Timeout")
    else:
        pg_conn.poll()
        while pg_conn.notifies:
            notify = pg_conn.notifies.pop()
            entity_id = notify.payload
            do_post_save(entity_id)  # your post-save handler
The only caveat is that notification delivery is not durable: a notification can be lost if a catastrophic failure happens. That is, if your application receives a notification but crashes (or is killed) before it finishes processing it, that notification is lost forever.
If you need to guarantee that post-save processing always happens, you have to maintain a table of tasks. The after insert/update trigger should add a task to this table, and some Python process should poll the table and do the required processing. The downside is the polling itself: it issues unnecessary queries while the system is not saving any entities.
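A minimal sketch of that task table and a polling worker. The entity_task table, its columns and process_pending() are illustrative names I'm assuming here, not part of any library; do_post_save() is the same placeholder handler as above:

import time

from django.db import connection, transaction

# Illustrative schema -- the trigger would INSERT INTO entity_task (entity_id)
# VALUES (NEW.id) instead of (or in addition to) calling pg_notify()
SETUP_SQL = """
CREATE TABLE IF NOT EXISTS entity_task (
    id bigserial PRIMARY KEY,
    entity_id bigint NOT NULL,
    processed boolean NOT NULL DEFAULT false
)
"""

def process_pending():
    with transaction.atomic():
        with connection.cursor() as curs:
            # SKIP LOCKED lets several workers poll the same table
            # without picking up each other's rows
            curs.execute(
                "SELECT id, entity_id FROM entity_task "
                "WHERE NOT processed FOR UPDATE SKIP LOCKED"
            )
            for task_id, entity_id in curs.fetchall():
                do_post_save(entity_id)
                curs.execute(
                    "UPDATE entity_task SET processed = true WHERE id = %s",
                    [task_id],
                )

with connection.cursor() as curs:
    curs.execute(SETUP_SQL)

while True:  # plain polling loop
    process_pending()
    time.sleep(5)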
You can combine both approaches to get the best of both worlds: use NOTIFY to kick off processing, but have the processor take its tasks from the task table that the trigger fills. On application startup, the processor should also run once to finish any unprocessed work (see the sketch below).
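A rough sketch of the combined scheme, reusing the hypothetical process_pending() from the previous sketch; the trigger here is assumed to both insert a row into entity_task and call pg_notify():

import select

from django.db import connection

connection.ensure_connection()
connection.set_autocommit(True)
pg_conn = connection.connection

curs = pg_conn.cursor()
curs.execute("LISTEN entity_change;")

process_pending()  # finish any work left over from before startup

while True:
    if select.select([pg_conn], [], [], 5) != ([], [], []):
        pg_conn.poll()
        if pg_conn.notifies:
            # Payloads are not needed: the tasks themselves live in the table
            pg_conn.notifies.clear()
            process_pending()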
The Django pgpubsub library implements exactly this approach and provides a rather simple declarative API that lets you execute callbacks on Django model changes:
from dataclasses import dataclass

import pgpubsub
from pgpubsub.channel import TriggerChannel  # exact import path may vary between versions

# This defines a postgres channel that is used to send notifications
# and a trigger that does NOTIFY on MyModel changes
@dataclass
class MyModelTriggerChannel(TriggerChannel):
    model = MyModel

# This defines a callback to be invoked on MyModel change
@pgpubsub.post_update_listener(MyModelTriggerChannel)
def on_my_model_update(old: MyModel, new: MyModel):
    # use the new argument to access the updated model data
    ...
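The callbacks themselves run in a dedicated listener process, which pgpubsub starts with its listen management command (python manage.py listen).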
Logical Replication
The better and more reliable approach is to use logical replication.
This option reads the transaction log (WAL) directly, and the consumer acknowledges each change notification it receives, so no notifications are missed and delivery can be made reliable.
To demonstrate this, I'm using an image preconfigured for logical replication and with the wal2json plugin installed for WAL decoding:
docker run -d --name "logical" -e POSTGRES_PASSWORD=123 -p 10000:5432 debezium/postgres:14
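The demo also needs a test table. A minimal setup snippet, assuming the table name table1 with the columns i and t that match the wal2json output shown further down:

import psycopg2

conn = psycopg2.connect(
    "dbname='postgres' host='localhost' port='10000' user='postgres' password='123'"
)
conn.autocommit = True
with conn.cursor() as cur:
    cur.execute("CREATE TABLE IF NOT EXISTS table1 (i integer, t text)")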
Here is an example of the consumer:
import psycopg2
from psycopg2.errors import UndefinedObject
from psycopg2.extras import LogicalReplicationConnection

my_connection = psycopg2.connect(
    "dbname='postgres' host='localhost' port='10000' user='postgres' password='123'",
    connection_factory=LogicalReplicationConnection,
)
cur = my_connection.cursor()

# Recreate the replication slot so the example starts from a clean state
try:
    cur.drop_replication_slot("wal2json_test_slot")
except UndefinedObject:
    pass
cur.create_replication_slot("wal2json_test_slot", output_plugin="wal2json")
cur.start_replication(
    slot_name="wal2json_test_slot", options={"pretty-print": 1}, decode=True
)

def consume(msg):
    print(msg.payload)
    # Acknowledge the message so the server can advance the slot's position
    msg.cursor.send_feedback(flush_lsn=msg.data_start)

cur.consume_stream(consume)
Now, executing an insert like insert into table1 values (1, 'hello')
produces this output:
{
  "change": [
    {
      "kind": "insert",
      "schema": "public",
      "table": "table1",
      "columnnames": ["i", "t"],
      "columntypes": ["integer", "text"],
      "columnvalues": [1, "hello"]
    }
  ]
}
The downside of this approach is that you receive every change in the database, so you have to filter and decode the data yourself (I'm not aware of libraries that make this simple for you as a user).
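For illustration, here is a minimal variant of the consume() callback above that does such filtering; the table name table1 and the handle_insert() callback are assumptions for this sketch:

import json

def consume(msg):
    payload = json.loads(msg.payload)
    for change in payload.get("change", []):
        # React only to inserts into the table we care about
        if change["kind"] == "insert" and change["table"] == "table1":
            row = dict(zip(change["columnnames"], change["columnvalues"]))
            handle_insert(row)  # hypothetical application callback
    msg.cursor.send_feedback(flush_lsn=msg.data_start)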
Answered By - Roman-Stop RU aggression in UA