24. February 2021 12:45
Administrator
Recently I had to deal with a concurrency issue in a legacy Flask application that did not enforce uniqueness. When multiple API clients tried to create a record with the same value in the 'name' column, duplicate entries ended up in the database. Applying a unique constraint to the name column required cleaning up the duplicates first, which was not easy, so I decided to first implement a workaround that prevents further duplicates, and then go about cleaning up the data.
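To make the race concrete, here is a minimal sketch of how a naive check-then-insert produces duplicates when two clients interleave. It uses the standard-library sqlite3 module as a stand-in for Postgres, and the `employee` table and the `exists` helper are made up for illustration; the real application is not shown here.

```python
import sqlite3

# In-memory SQLite database as a stand-in; the table has no unique constraint.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE employee (id INTEGER PRIMARY KEY, name TEXT)")


def exists(name):
    """The 'check' half of a naive check-then-insert (hypothetical helper)."""
    row = conn.execute(
        "SELECT 1 FROM employee WHERE name = ?", (name,)
    ).fetchone()
    return row is not None


# Two clients interleave: both run the check before either one inserts.
client_a_sees_row = exists("alice")
client_b_sees_row = exists("alice")

for sees_row in (client_a_sees_row, client_b_sees_row):
    if not sees_row:
        conn.execute("INSERT INTO employee (name) VALUES (?)", ("alice",))
conn.commit()

# Names that occur more than once are the duplicates that would have to be
# cleaned up before a unique constraint could be added.
duplicates = conn.execute(
    "SELECT name, COUNT(*) FROM employee GROUP BY name HAVING COUNT(*) > 1"
).fetchall()
print(duplicates)  # [('alice', 2)]
```

Both clients saw no row, so both inserted. The same GROUP BY / HAVING query is one way to find the rows that need cleaning up later.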
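The solution was to use the SET TRANSACTION ISOLATION LEVEL SERIALIZABLE statement in Postgres, so that checking whether the record exists and inserting a new one behave as if concurrent transactions ran one at a time. Strictly speaking, Postgres does not lock the table at this isolation level; instead, it aborts a conflicting transaction with a serialization failure, and that transaction can then be retried.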
As you may have encountered, in a Flask-SQLAlchemy application the database session exposed by the extension is shared, which gets in the way of setting the transaction isolation level. I ended up creating a function (in a mixin class) that does the get_or_create operation in a new database session, which is created and destroyed inside the function. When multiple clients try to create the same element in the database at the same time, the conflicting transaction fails with a specific exception (SQLAlchemy's OperationalError), which needs to be caught so the operation can be retried.
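    import time

    import sqlalchemy
    import sqlalchemy.exc

    # db is the application's Flask-SQLAlchemy instance.
    # MAX_RETRY is not defined in the original listing; 5 is an assumed value.
    MAX_RETRY = 5


    class GetOrCreateMixin(object):
        def get_or_create(self, **kwargs):
            # Use a private session so the isolation level can be set
            # without touching the shared db.session.
            session = db.create_session({})()
            retry_count = 0
            model = type(self)
            while True:
                try:
                    # SET TRANSACTION only applies to the current
                    # transaction, so it is issued again on every retry.
                    if db.engine.name == 'postgresql':
                        session.execute(sqlalchemy.text(
                            'SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;'))
                    item = session.query(model).filter_by(**kwargs).first()
                    if not item:
                        item = self
                        session.add(item)
                    session.commit()
                    session.close()
                    break
                except sqlalchemy.exc.OperationalError:
                    # Raised on a serialization failure: roll back, wait, retry.
                    session.rollback()
                    time.sleep(0.1)
                    retry_count += 1
                    if retry_count > MAX_RETRY:
                        session.close()
                        raise
            # Re-read through the shared session so the caller gets an
            # instance attached to db.session.
            return db.session.query(model).filter_by(**kwargs).first()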
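The retry loop inside get_or_create can also be looked at in isolation. The following is a rough sketch of the same bounded-retry pattern using only the standard library; `run_with_retry`, `TransientError` and `flaky_insert` are hypothetical names invented for this example, with `TransientError` standing in for `sqlalchemy.exc.OperationalError`.

```python
import time


class TransientError(Exception):
    """Hypothetical stand-in for sqlalchemy.exc.OperationalError."""


def run_with_retry(operation, max_retry=5, delay=0.1, retry_on=TransientError):
    """Call operation() until it succeeds or max_retry retries are used up."""
    retry_count = 0
    while True:
        try:
            return operation()
        except retry_on:
            retry_count += 1
            if retry_count > max_retry:
                raise  # out of retries: let the caller see the failure
            time.sleep(delay)


# Usage: an operation that fails twice, then succeeds on the third call.
attempts = []


def flaky_insert():
    attempts.append(1)
    if len(attempts) < 3:
        raise TransientError("simulated serialization failure")
    return "created"


result = run_with_retry(flaky_insert, delay=0)
print(result, len(attempts))  # created 3
```

As in the mixin, the exception is re-raised once the retry budget is exhausted, so a persistent failure is not silently swallowed.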
Now you can use this function in your data models:
    class Employee(db.Model, NameMixin, GetOrCreateMixin):
        department_id = db.Column(db.Integer, db.ForeignKey("department.id"),
                                  nullable=False)
        department = db.relationship("Department",
                                     backref=db.backref("employees", cascade="delete"))
And the API which creates the employee can leverage the function to make sure no duplicate employee is created:
    @employee_api.route("/", endpoint="employees")
    class EmployeesResource(Resource):
        @employee_api.expect(parsers.employee)
        def post(self):
            args = parsers.employee.parse_args()
            employee = models.Employee(**args)
            employee = employee.get_or_create(
                name=employee.name,
                department_id=employee.department_id)
            return schemas.Employee().dump(employee)