Issue
I am trying to download a SQLite queried table as .csv using Flask, SQLAlchemy (not Flask-SQLAlchemy) and DataTables (plug-in for jQuery) with the server-side processing option.
Basically, I have a query that connects to a SQLite dataset, imports the data and returns it to a DataTable.
A summary of what is already working:
- Drop-down (HTML select element) to choose which table should be displayed.
- Retrieval of data for the selected table, using DataTables' "server-side processing" option, which allows fetching large data sets across multiple partial requests via Ajax (using `offset` and `limit`).
- Case-insensitive text filter for each column, which is passed from the DataTable to the Flask code (server-side) and applied to the query. The user needs to press "Enter" after typing the text to apply the filter.
- Download of the complete displayed table as .csv.
The problem that I am facing is that once I type in a filter for any column, the downloaded .csv still contains the full table instead of the filtered one. For example, when I search for "2" in Column A and press Enter, I see the filtered data as expected:
But when I click the download button, the full table data is downloaded instead of the filtered one. I would like to download only the filtered data. Does anyone know how to fix this issue? Thanks in advance.
Below I provide fully working code with a sample SQLite "test.db" dataset with 2 tables ("table_a" and "table_b") to reproduce the issue. I am using the latest versions of the packages (Flask==3.0.0, pandas==2.1.3 and SQLAlchemy==2.0.22).
/templates/index.html
<!DOCTYPE html>
<html>
<head>
<title>Flask Application</title>
<link rel="stylesheet" href="https://cdn.datatables.net/1.13.7/css/jquery.dataTables.css"/>
<link rel="stylesheet" href="https://cdn.datatables.net/1.13.7/css/dataTables.bootstrap5.min.css"/>
<style>
.text-wrap{
white-space:normal;
}
.width-200{
width:200px;
}
</style>
</head>
<body>
<h1>Table Viewer</h1>
<form id="selector" action="{{ url_for('.database_download') }}" method="POST" enctype="multipart/form-data">
Select table:
<select id="table-selector" name="table-selector">
<!-- <option disabled selected value>Select a Dataset</option> -->
<option value="table_a" selected>Table A</option>
<option value="table_b">Table B</option>
</select>
</form>
<br>
<p>
<input type="checkbox" id="order-columns" value="True" checked> Order columns by date</p>
<p><input type="button" id="button-database-selector" value="Select Table" /></p>
<br>
<br>
<br>
<div id="content-a" class="table table-striped table-hover" style="display:none; width:100%; table-layout:fixed; overflow-x:auto;">
<table id="table-a">
<thead>
<tr>
<th></th>
<th></th>
</tr>
<tr>
<th></th>
<th></th>
</tr>
</thead>
</table>
</div>
<div id="content-b" class="table table-striped table-hover" style="display:none; width:100%; table-layout:fixed; overflow-x:auto;">
<table id="table-b">
<thead>
<tr>
<th></th>
<th></th>
</tr>
<tr>
<th></th>
<th></th>
</tr>
</thead>
</table>
</div>
<!-- Download button: submits the "selector" form (hidden until a table has been selected) -->
<div id="download" style="display:none">
  <p>Download table: <input type="submit" value="Download" form="selector"></p>
</div>
<script type="text/javascript" src="https://code.jquery.com/jquery-3.7.0.js"></script>
<script type="text/javascript" src="https://cdn.datatables.net/1.13.7/js/jquery.dataTables.min.js"></script>
<script type="text/javascript" src="https://cdn.datatables.net/1.13.7/js/dataTables.bootstrap5.min.js"></script>
<script type="text/javascript">
$(document).ready(function () {
    // Per-table settings: container div, <table> element, server-side endpoint and columns.
    var TABLES = {
        'table_a': {
            'content': 'div[id="content-a"]',
            'table': '#table-a',
            'url': "{{ url_for('.table_a') }}",
            'columns': [
                { 'title': 'Column Date', 'data': 'column_date' },
                { 'title': 'Column A', 'data': 'column_a' },
            ],
        },
        'table_b': {
            'content': 'div[id="content-b"]',
            'table': '#table-b',
            'url': "{{ url_for('.table_b') }}",
            'columns': [
                { 'title': 'Column Date', 'data': 'column_date' },
                { 'title': 'Column B', 'data': 'column_b' },
            ],
        },
    };

    // DataTables' built-in text renderer: escapes HTML special characters in strings,
    // so values coming from the database are displayed as text and not parsed as markup.
    var escapeHtml = $.fn.dataTable.render.text().display;

    // Show the container of one table and (re)build its DataTable with per-column filters.
    function showTable(config, orderColumns) {
        // Hide all tables
        $('div[id^="content-"]').hide();
        // Display table
        $(config.content).css('display', 'block');

        // Setup - add a text input to each cell of the second header row.
        // The placeholder uses the configured column title because the header cells
        // in the markup are empty until DataTables has written the titles into them.
        $(config.table + ' thead tr:eq(1) th').each(function (index) {
            var column = config.columns[index];
            var title = column ? column.title : '';
            var input = $('<input type="text" />').attr('placeholder', 'Search ' + title);
            $(this).empty().append(input);
        });

        var table = $(config.table).DataTable({
            'pageLength': 50,
            'ordering': false,
            // 'scrollX': true,
            // 'sScrollX': '100%',
            'searching': true,
            'dom': 'lrtip',
            'orderCellsTop': true,
            'serverSide': true,
            'bDestroy': true,
            'ajax': {
                'url': config.url,
                'type': 'GET',
                'data': {
                    'orderColumns': orderColumns,
                },
            },
            // Hand DataTables copies so the shared configuration objects stay untouched
            'columns': $.map(config.columns, function (column) {
                return $.extend({}, column);
            }),
            'columnDefs': [
                {
                    'render': function (data, type, full, meta) {
                        // Missing values are shown as an empty cell
                        var text = (data === null || data === undefined) ? '' : escapeHtml(data);
                        return "<div class='text-wrap width-200'>" + text + "</div>";
                    },
                    'targets': '_all',
                }
            ],
        });

        // Apply the search of a column when Enter is pressed in its filter input
        $(config.table + ' thead tr:eq(1) th input').on('keydown', function (event) {
            if (event.key === 'Enter') {
                table.column($(this).parent().index() + ':visible')
                    .search(this.value)
                    .draw();
            }
        });
    }

    $('input[id="button-database-selector"]').click(function () {
        // The server treats an empty string as "do not order"
        var order_columns = $('input[id="order-columns"]').prop('checked') ? 'True' : '';
        var database_selected = $('select[id="table-selector"]').val();

        // Display download button
        $('div[id="download"]').css('display', 'block');

        if (Object.prototype.hasOwnProperty.call(TABLES, database_selected)) {
            showTable(TABLES[database_selected], order_columns);
        }
    });
});
</script>
</body>
</html>
/app.py
# Import packages
from datetime import date
from io import BytesIO
from fastapi.encoders import jsonable_encoder
from flask import Flask, render_template, request, jsonify, send_file
import pandas as pd
from sqlalchemy import create_engine, Column, DateTime, Integer, String
from sqlalchemy.orm import declarative_base, sessionmaker
# Flask application object; the routes in this module are registered on it
app = Flask(__name__)

# SQLAlchemy engine for the SQLite file "test.db" (statement echo disabled)
engine = create_engine(url='sqlite:///test.db', echo=False)

# Session factory bound to the engine, and one module-level session created from it
Session = sessionmaker(bind=engine)
session = Session()

# Base class for the declarative ORM models
Base = declarative_base()
# Define "table_a" model
class table_a(Base):
    """ORM model mapped to the "table_a" table."""

    # NOTE(review): the module-level name ``table_a`` is rebound further down by
    # the Flask view function of the same name.
    __tablename__ = 'table_a'

    id = Column(Integer, primary_key=True)
    column_date = Column(DateTime)
    column_a = Column(String)
# Define "table_b" model
class table_b(Base):
    """ORM model mapped to the "table_b" table."""

    # NOTE(review): the module-level name ``table_b`` is rebound further down by
    # the Flask view function of the same name.
    __tablename__ = 'table_b'

    id = Column(Integer, primary_key=True)
    column_date = Column(DateTime)
    column_b = Column(String)
with app.app_context():
    # Drop and recreate every table declared on Base
    Base.metadata.drop_all(bind=engine)
    Base.metadata.create_all(bind=engine)

    # Add sample data to the "table_a" table
    session.add_all(
        table_a(column_date=day, column_a=value)
        for day, value in (
            (date(2023, 11, 10), '1'),
            (date(2023, 10, 31), '1'),
            (date(2023, 10, 31), '2'),
        )
    )

    # Add sample data to the "table_b" table
    session.add_all(
        table_b(column_date=day, column_b=value)
        for day, value in (
            (date(2023, 11, 1), '1'),
            (date(2023, 11, 11), '1'),
        )
    )

    # Persist the sample rows
    session.commit()
def read_table(*, table_name):
    """Build the filtered (and optionally ordered) query for one table.

    The filters are read from the query string of the current Flask request:
    ``columns[0][search][value]`` is matched against ``column_date``,
    ``columns[1][search][value]`` against the table's text column, and a
    truthy ``orderColumns`` value orders the rows by ``column_date`` ascending.
    Matching is a case-insensitive "contains" (``ILIKE '%value%'``).

    Args:
        table_name: ``'table_a'`` or ``'table_b'``.

    Returns:
        A SQLAlchemy ``Query`` over the selected model. It has not been
        executed yet, so callers can add paging or read ``.statement``.

    Raises:
        ValueError: If ``table_name`` is not a supported table.
    """
    # Fail early with a clear error instead of an UnboundLocalError at the end
    supported_tables = ('table_a', 'table_b')
    if table_name not in supported_tables:
        raise ValueError(
            f'Unknown table {table_name!r}; expected one of {supported_tables}',
        )

    # NOTE(review): a new engine and session are created on every call and are
    # never disposed/closed here, because the returned query stays bound to
    # this session. Consider a request-scoped session.
    db_engine = create_engine(url='sqlite:///test.db', echo=False)
    db_session = sessionmaker(bind=db_engine)()

    # The models are declared locally on their own declarative base because the
    # module-level names ``table_a`` / ``table_b`` are rebound to the Flask
    # view functions of the same names.
    LocalBase = declarative_base()

    class table_a(LocalBase):
        __tablename__ = 'table_a'
        id = Column(Integer, primary_key=True)
        column_date = Column(DateTime)
        column_a = Column(String)

    class table_b(LocalBase):
        __tablename__ = 'table_b'
        id = Column(Integer, primary_key=True)
        column_date = Column(DateTime)
        column_b = Column(String)

    if table_name == 'table_a':
        model, text_column = table_a, table_a.column_a
    else:
        model, text_column = table_b, table_b.column_b

    # Filters
    filter_column_date = request.args.get(
        key='columns[0][search][value]',
        default='',
        type=str,
    )
    filter_text_column = request.args.get(
        key='columns[1][search][value]',
        default='',
        type=str,
    )

    # Other parameters. ``type=bool`` would turn any non-empty string (even
    # "False") into True, so the flag is parsed explicitly.
    order_flag = request.args.get(key='orderColumns', default='', type=str)
    order_columns = order_flag.strip().lower() not in ('', 'false', '0', 'no', 'off')

    sql_query = db_session.query(model)
    if filter_column_date != '':
        sql_query = sql_query.filter(
            model.column_date.ilike(f'%{filter_column_date}%'),
        )
    if filter_text_column != '':
        sql_query = sql_query.filter(
            text_column.ilike(f'%{filter_text_column}%'),
        )
    if order_columns:
        sql_query = sql_query.order_by(model.column_date.asc())

    return sql_query
@app.route('/')
def home():
    """Render the table viewer page from the "index.html" template."""
    return render_template('index.html')
@app.route(rule='/table-a', methods=['GET'])
def table_a():
    """Return one page of "table_a" rows as JSON for DataTables server-side mode.

    Reads ``start``, ``length`` and ``draw`` from the query string; the column
    filters are applied by ``read_table``.
    """
    # Get the pagination parameters from the request
    start = max(request.args.get(key='start', default=0, type=int), 0)
    length = request.args.get(key='length', default=10, type=int)

    sql_query = read_table(table_name='table_a')

    # DataTables expects "recordsTotal" to be the row count before filtering
    # and "recordsFiltered" the row count after filtering.
    entity = sql_query.column_descriptions[0]['entity']
    records_total = sql_query.session.query(entity).count()
    records_filtered = sql_query.count()

    response = {
        'draw': request.args.get(key='draw', default=1, type=int),
        'recordsTotal': records_total,
        'recordsFiltered': records_filtered,
        'data': jsonable_encoder(sql_query.offset(start).limit(length).all()),
    }
    return jsonify(response)
@app.route(rule='/table-b', methods=['GET'])
def table_b():
    """Return one page of "table_b" rows as JSON for DataTables server-side mode.

    Reads ``start``, ``length`` and ``draw`` from the query string; the column
    filters are applied by ``read_table``.
    """
    # Get the pagination parameters from the request
    start = max(request.args.get(key='start', default=0, type=int), 0)
    length = request.args.get(key='length', default=10, type=int)

    sql_query = read_table(table_name='table_b')

    # DataTables expects "recordsTotal" to be the row count before filtering
    # and "recordsFiltered" the row count after filtering.
    entity = sql_query.column_descriptions[0]['entity']
    records_total = sql_query.session.query(entity).count()
    records_filtered = sql_query.count()

    response = {
        'draw': request.args.get(key='draw', default=1, type=int),
        'recordsTotal': records_total,
        'recordsFiltered': records_filtered,
        'data': jsonable_encoder(sql_query.offset(start).limit(length).all()),
    }
    return jsonify(response)
# Only POST is routed: the view has no response for GET, which previously made
# Flask fail with "view function did not return a valid response".
@app.route(rule='/database-download', methods=['POST'])
def database_download():
    """Send the table chosen in the "table-selector" form field as a .csv file.

    Returns a 400 response when the posted table name is not supported.
    """
    table = request.form.get('table-selector')

    # The value comes from the client and ends up in the file name, so only
    # the known table names are accepted.
    if table not in ('table_a', 'table_b'):
        return 'Unknown table.', 400

    # NOTE(review): read_table takes its filters from the query string
    # (request.args). The download form posts only "table-selector", so no
    # column filter reaches the query here and the full table is exported.
    sql_query = read_table(table_name=table)

    # Run the query through the module-level engine rather than creating a
    # new engine for every download
    df = pd.read_sql(sql=sql_query.statement, con=engine)

    # DataFrame to an in-memory binary buffer ("utf-8-sig" writes a BOM)
    buffer = BytesIO()
    df.to_csv(
        path_or_buf=buffer,
        sep=',',
        na_rep='',
        header=True,
        index=False,
        encoding='utf-8-sig',
    )

    # Change the stream position to the start of the stream
    buffer.seek(0)

    return send_file(
        path_or_file=buffer,
        mimetype='text/csv',
        download_name=f'{table}.csv',
        as_attachment=True,
    )
if __name__ == '__main__':
    # Start Flask's built-in server on localhost:5011 with debug mode enabled
    app.run(debug=True, port=5011, host='localhost')
Solution
I found the solution: using Flask's flask.session, I stored the request arguments (column filters) in the current session and retrieved them as a werkzeug.datastructures.ImmutableMultiDict object, so that the .csv is downloaded with the applied filters once the download is requested. The final code can be found below:
/templates/index.html
<!DOCTYPE html>
<html>
<head>
<title>Flask Application</title>
<link rel="stylesheet" href="https://cdn.datatables.net/1.13.7/css/jquery.dataTables.css"/>
<link rel="stylesheet" href="https://cdn.datatables.net/1.13.7/css/dataTables.bootstrap5.min.css"/>
<style>
.text-wrap{
white-space:normal;
}
.width-200{
width:200px;
}
</style>
</head>
<body>
<h1>Table Viewer</h1>
<form id="selector" action="{{ url_for('.database_download') }}" method="POST" enctype="multipart/form-data">
Select table:
<select id="table-selector" name="table-selector">
<!-- <option disabled selected value>Select a Dataset</option> -->
<option value="table_a" selected>Table A</option>
<option value="table_b">Table B</option>
</select>
</form>
<br>
<p>
<input type="checkbox" id="order-columns" value="True" checked> Order columns by date</p>
<p><input type="button" id="button-database-selector" value="Select Table" /></p>
<br>
<br>
<br>
<div id="content-a" class="table table-striped table-hover" style="display:none; width:100%; table-layout:fixed; overflow-x:auto;">
<table id="table-a">
<thead>
<tr>
<th></th>
<th></th>
</tr>
<tr>
<th></th>
<th></th>
</tr>
</thead>
</table>
</div>
<div id="content-b" class="table table-striped table-hover" style="display:none; width:100%; table-layout:fixed; overflow-x:auto;">
<table id="table-b">
<thead>
<tr>
<th></th>
<th></th>
</tr>
<tr>
<th></th>
<th></th>
</tr>
</thead>
</table>
</div>
<!-- Download button: submits the "selector" form (hidden until a table has been selected) -->
<div id="download" style="display:none">
  <p>Download table: <input type="submit" value="Download" form="selector"></p>
</div>
<script type="text/javascript" src="https://code.jquery.com/jquery-3.7.0.js"></script>
<script type="text/javascript" src="https://cdn.datatables.net/1.13.7/js/jquery.dataTables.min.js"></script>
<script type="text/javascript" src="https://cdn.datatables.net/1.13.7/js/dataTables.bootstrap5.min.js"></script>
<script type="text/javascript">
$(document).ready(function () {
    // Per-table settings: container div, <table> element, server-side endpoint and columns.
    var TABLES = {
        'table_a': {
            'content': 'div[id="content-a"]',
            'table': '#table-a',
            'url': "{{ url_for('.table_a') }}",
            'columns': [
                { 'title': 'Column Date', 'data': 'column_date' },
                { 'title': 'Column A', 'data': 'column_a' },
            ],
        },
        'table_b': {
            'content': 'div[id="content-b"]',
            'table': '#table-b',
            'url': "{{ url_for('.table_b') }}",
            'columns': [
                { 'title': 'Column Date', 'data': 'column_date' },
                { 'title': 'Column B', 'data': 'column_b' },
            ],
        },
    };

    // DataTables' built-in text renderer: escapes HTML special characters in strings,
    // so values coming from the database are displayed as text and not parsed as markup.
    var escapeHtml = $.fn.dataTable.render.text().display;

    // Show the container of one table and (re)build its DataTable with per-column filters.
    function showTable(config, orderColumns) {
        // Hide all tables
        $('div[id^="content-"]').hide();
        // Display table
        $(config.content).css('display', 'block');

        // Setup - add a text input to each cell of the second header row.
        // The placeholder uses the configured column title because the header cells
        // in the markup are empty until DataTables has written the titles into them.
        $(config.table + ' thead tr:eq(1) th').each(function (index) {
            var column = config.columns[index];
            var title = column ? column.title : '';
            var input = $('<input type="text" />').attr('placeholder', 'Search ' + title);
            $(this).empty().append(input);
        });

        var table = $(config.table).DataTable({
            'pageLength': 50,
            'ordering': false,
            // 'scrollX': true,
            // 'sScrollX': '100%',
            'searching': true,
            'dom': 'lrtip',
            'orderCellsTop': true,
            'serverSide': true,
            'bDestroy': true,
            'ajax': {
                'url': config.url,
                'type': 'GET',
                'data': {
                    'orderColumns': orderColumns,
                },
            },
            // Hand DataTables copies so the shared configuration objects stay untouched
            'columns': $.map(config.columns, function (column) {
                return $.extend({}, column);
            }),
            'columnDefs': [
                {
                    'render': function (data, type, full, meta) {
                        // Missing values are shown as an empty cell
                        var text = (data === null || data === undefined) ? '' : escapeHtml(data);
                        return "<div class='text-wrap width-200'>" + text + "</div>";
                    },
                    'defaultContent': '',
                    'targets': '_all',
                }
            ],
        });

        // Apply the search of a column when Enter is pressed in its filter input
        $(config.table + ' thead tr:eq(1) th input').on('keydown', function (event) {
            if (event.key === 'Enter') {
                table.column($(this).parent().index() + ':visible')
                    .search(this.value)
                    .draw();
            }
        });
    }

    $('input[id="button-database-selector"]').click(function () {
        // The server treats an empty string as "do not order"
        var order_columns = $('input[id="order-columns"]').prop('checked') ? 'True' : '';
        var database_selected = $('select[id="table-selector"]').val();

        // Display download button
        $('div[id="download"]').css('display', 'block');

        if (Object.prototype.hasOwnProperty.call(TABLES, database_selected)) {
            showTable(TABLES[database_selected], order_columns);
        }
    });
});
</script>
</body>
</html>
/app.py
# Import packages
from datetime import date
from io import BytesIO
import secrets
from fastapi.encoders import jsonable_encoder
from flask import Flask, render_template, request, jsonify, send_file, session
import pandas as pd
from sqlalchemy import create_engine, Column, DateTime, Integer, String
from sqlalchemy.orm import declarative_base, sessionmaker
from werkzeug.datastructures import ImmutableMultiDict
# Flask application object; the routes in this module are registered on it
app = Flask(__name__)

# flask.session needs a secret key to sign its cookie. It is set at import time
# so the views also work when the module is loaded without going through the
# ``__main__`` guard (e.g. by a WSGI server).
# NOTE(review): the key is random per process, so stored sessions are lost on
# restart; use a fixed, configured key for anything beyond local use.
app.secret_key = secrets.token_urlsafe(nbytes=16)

# Create a SQLite database engine
engine = create_engine(
    url='sqlite:///test.db',
    echo=False,
)

# Create a session
Session = sessionmaker(bind=engine)
session_db = Session()

# Create a base class for declarative models
Base = declarative_base()
# Define "table_a" model
class table_a(Base):
    """ORM model mapped to the "table_a" table."""

    # NOTE(review): the module-level name ``table_a`` is rebound further down by
    # the Flask view function of the same name.
    __tablename__ = 'table_a'

    id = Column(Integer, primary_key=True)
    column_date = Column(DateTime)
    column_a = Column(String)
# Define "table_b" model
class table_b(Base):
    """ORM model mapped to the "table_b" table."""

    # NOTE(review): the module-level name ``table_b`` is rebound further down by
    # the Flask view function of the same name.
    __tablename__ = 'table_b'

    id = Column(Integer, primary_key=True)
    column_date = Column(DateTime)
    column_b = Column(String)
with app.app_context():
    # Drop and recreate every table declared on Base
    Base.metadata.drop_all(bind=engine)
    Base.metadata.create_all(bind=engine)

    # Add sample data to the "table_a" table
    session_db.add_all(
        table_a(column_date=day, column_a=value)
        for day, value in (
            (date(2023, 11, 10), '1'),
            (date(2023, 10, 31), '1'),
            (date(2023, 10, 31), '2'),
        )
    )

    # Add sample data to the "table_b" table
    session_db.add_all(
        table_b(column_date=day, column_b=value)
        for day, value in (
            (date(2023, 11, 1), '1'),
            (date(2023, 11, 11), '1'),
        )
    )

    # Persist the sample rows
    session_db.commit()
def read_table(*, table_name, request_dict):
    """Build the filtered (and optionally ordered) query for one table.

    The filters are read from ``request_dict``:
    ``columns[0][search][value]`` is matched against ``column_date``,
    ``columns[1][search][value]`` against the table's text column, and a
    truthy ``orderColumns`` value orders the rows by ``column_date`` ascending.
    Matching is a case-insensitive "contains" (``ILIKE '%value%'``).

    Args:
        table_name: ``'table_a'`` or ``'table_b'``.
        request_dict: Mapping of request arguments; a werkzeug multi-dict or a
            plain ``dict`` are both accepted.

    Returns:
        A SQLAlchemy ``Query`` over the selected model. It has not been
        executed yet, so callers can add paging or read ``.statement``.

    Raises:
        ValueError: If ``table_name`` is not a supported table.
    """
    # Fail early with a clear error instead of an UnboundLocalError at the end
    supported_tables = ('table_a', 'table_b')
    if table_name not in supported_tables:
        raise ValueError(
            f'Unknown table {table_name!r}; expected one of {supported_tables}',
        )

    # Normalise to a multi-dict so ``.get(..., type=...)`` is available even
    # when a plain dict is passed in
    params = ImmutableMultiDict(request_dict)

    # NOTE(review): a new engine and session are created on every call and are
    # never disposed/closed here, because the returned query stays bound to
    # this session. Consider a request-scoped session.
    db_engine = create_engine(url='sqlite:///test.db', echo=False)
    db_session = sessionmaker(bind=db_engine)()

    # The models are declared locally on their own declarative base because the
    # module-level names ``table_a`` / ``table_b`` are rebound to the Flask
    # view functions of the same names.
    LocalBase = declarative_base()

    class table_a(LocalBase):
        __tablename__ = 'table_a'
        id = Column(Integer, primary_key=True)
        column_date = Column(DateTime)
        column_a = Column(String)

    class table_b(LocalBase):
        __tablename__ = 'table_b'
        id = Column(Integer, primary_key=True)
        column_date = Column(DateTime)
        column_b = Column(String)

    if table_name == 'table_a':
        model, text_column = table_a, table_a.column_a
    else:
        model, text_column = table_b, table_b.column_b

    # Filters
    filter_column_date = params.get(
        key='columns[0][search][value]',
        default='',
        type=str,
    )
    filter_text_column = params.get(
        key='columns[1][search][value]',
        default='',
        type=str,
    )

    # Other parameters. ``type=bool`` would turn any non-empty string (even
    # "False") into True, so the flag is parsed explicitly.
    order_flag = params.get(key='orderColumns', default='', type=str)
    order_columns = order_flag.strip().lower() not in ('', 'false', '0', 'no', 'off')

    sql_query = db_session.query(model)
    if filter_column_date != '':
        sql_query = sql_query.filter(
            model.column_date.ilike(f'%{filter_column_date}%'),
        )
    if filter_text_column != '':
        sql_query = sql_query.filter(
            text_column.ilike(f'%{filter_text_column}%'),
        )
    if order_columns:
        sql_query = sql_query.order_by(model.column_date.asc())

    return sql_query
@app.route('/')
def home():
    """Render the table viewer page from the "index.html" template."""
    return render_template('index.html')
@app.route(rule='/table-a', methods=['GET'])
def table_a():
    """Return one page of "table_a" rows as JSON for DataTables server-side mode.

    Also stores the filter-related request arguments in the Flask session
    under ``'REQUEST_DICT'``.
    """
    # Get request arguments from the request
    request_dict = request.args

    # Store request arguments in current session. Only the column search values
    # and the ordering flag are kept, so the session cookie stays small
    # however many other parameters DataTables sends.
    session['REQUEST_DICT'] = {
        key: value
        for key, value in request_dict.items()
        if key == 'orderColumns' or key.endswith('[search][value]')
    }

    # Get the pagination arguments from the request
    start = max(request_dict.get(key='start', default=0, type=int), 0)
    length = request_dict.get(key='length', default=10, type=int)

    sql_query = read_table(table_name='table_a', request_dict=request_dict)

    # DataTables expects "recordsTotal" to be the row count before filtering
    # and "recordsFiltered" the row count after filtering.
    entity = sql_query.column_descriptions[0]['entity']
    records_total = sql_query.session.query(entity).count()
    records_filtered = sql_query.count()

    response = {
        'draw': request_dict.get(key='draw', default=1, type=int),
        'recordsTotal': records_total,
        'recordsFiltered': records_filtered,
        'data': jsonable_encoder(
            obj=sql_query.offset(start).limit(length).all(),
            exclude_none=False,
            sqlalchemy_safe=True,
        ),
    }
    return jsonify(response)
@app.route(rule='/table-b', methods=['GET'])
def table_b():
    """Return one page of "table_b" rows as JSON for DataTables server-side mode.

    Also stores the filter-related request arguments in the Flask session
    under ``'REQUEST_DICT'``.
    """
    # Get request arguments from the request
    request_dict = request.args

    # Store request arguments in current session. Only the column search values
    # and the ordering flag are kept, so the session cookie stays small
    # however many other parameters DataTables sends.
    session['REQUEST_DICT'] = {
        key: value
        for key, value in request_dict.items()
        if key == 'orderColumns' or key.endswith('[search][value]')
    }

    # Get the pagination arguments from the request
    start = max(request_dict.get(key='start', default=0, type=int), 0)
    length = request_dict.get(key='length', default=10, type=int)

    sql_query = read_table(table_name='table_b', request_dict=request_dict)

    # DataTables expects "recordsTotal" to be the row count before filtering
    # and "recordsFiltered" the row count after filtering.
    entity = sql_query.column_descriptions[0]['entity']
    records_total = sql_query.session.query(entity).count()
    records_filtered = sql_query.count()

    response = {
        'draw': request_dict.get(key='draw', default=1, type=int),
        'recordsTotal': records_total,
        'recordsFiltered': records_filtered,
        'data': jsonable_encoder(
            obj=sql_query.offset(start).limit(length).all(),
            exclude_none=False,
            sqlalchemy_safe=True,
        ),
    }
    return jsonify(response)
# Only POST is routed: the view has no response for GET, which previously made
# Flask fail with "view function did not return a valid response".
@app.route(rule='/database-download', methods=['POST'])
def database_download():
    """Send the table chosen in the "table-selector" form field as a .csv file.

    The filters stored in the Flask session under ``'REQUEST_DICT'`` are
    applied to the export. Returns a 400 response when the posted table name
    is not supported.
    """
    table = request.form.get('table-selector')

    # The value comes from the client and ends up in the file name, so only
    # the known table names are accepted.
    if table not in ('table_a', 'table_b'):
        return 'Unknown table.', 400

    # Fall back to "no filters" when nothing has been stored in the session
    # yet, instead of failing with a KeyError.
    # NOTE(review): the stored filters are not tied to a table name; if the
    # drop-down was changed after the last table request, they are applied to
    # the newly selected table.
    request_dict = ImmutableMultiDict(session.get('REQUEST_DICT', {}))

    sql_query = read_table(table_name=table, request_dict=request_dict)

    # Run the query through the module-level engine rather than creating a
    # new engine for every download
    df = pd.read_sql(sql=sql_query.statement, con=engine)

    # DataFrame to an in-memory binary buffer ("utf-8-sig" writes a BOM)
    buffer = BytesIO()
    df.to_csv(
        path_or_buf=buffer,
        sep=',',
        na_rep='',
        header=True,
        index=False,
        encoding='utf-8-sig',
    )

    # Change the stream position to the start of the stream
    buffer.seek(0)

    return send_file(
        path_or_file=buffer,
        mimetype='text/csv',
        download_name=f'{table}.csv',
        as_attachment=True,
    )
if __name__ == '__main__':
    # Random key used to sign the flask.session cookie (regenerated on every start)
    app.config['SECRET_KEY'] = secrets.token_urlsafe(nbytes=16)

    # Start Flask's built-in server on localhost:5011 with debug mode enabled
    app.run(debug=True, port=5011, host='localhost')
Answered By - roboes
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.