How do I work correctly with a large number of records in SQLAlchemy?

I have two tables. The first contains 5 million records of the form question_id, view_count, counted. The second table stores the sum of view_count for each unique question_id. Once a record from the first table has been accounted for in the second, counted is set to true.

Now it looks like this:

    def update_most_viewed():
        query = QuestionViewHistory.query.filter_by(counted=False).distinct()
        question_count = query.count()
        frame_size = 1000
        counter = 0
        while counter <= question_count:
            all_questions = query.offset(counter * frame_size).limit(frame_size).all()
            counter = counter + frame_size
            for question in all_questions:
                most_viewed_question = MostViewedQuestion.query.filter_by(
                    question_id=question.question_id).first()
                if most_viewed_question is None:
                    most_viewed_question = MostViewedQuestion(question.question_id, question.view_count)
                    db.session.add(most_viewed_question)
                else:
                    most_viewed_question.view_count += question.view_count
                question.counted = True
            db.session.commit()

I call the function from the console. Initialization:

    from flask import Flask
    from flask_sqlalchemy import SQLAlchemy

    app = Flask(__name__)
    db = SQLAlchemy(app)

The problem is that the time taken by each pass grows rapidly: after the fifth pass everything hangs. If I restart the program, exactly the same thing happens again.

As far as I understand, the problem is that on every commit SQLAlchemy expires all attributes of all objects in the session and has to refresh them, but unfortunately I have not found a way to avoid this.
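A minimal illustration of the behaviour I am describing (this snippet is not part of my actual code): by default commit() expires every instance held in the session, so the next attribute access on each of them triggers a fresh SELECT.

    # Illustration only: default session behaviour (expire_on_commit=True)
    q = QuestionViewHistory.query.first()
    db.session.commit()
    print(q.view_count)  # re-loaded from the database, because commit() expired the instance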

Update

The model classes that appear in the query:

    import datetime


    class MostViewedQuestion(db.Model):
        __tablename__ = 'most_viewed_question'
        id = db.Column(db.Integer, primary_key=True)
        question_id = db.Column(db.Integer)
        view_count = db.Column(db.Integer)
        is_associated = db.Column(db.Boolean)
        can_be_associated = db.Column(db.Boolean)
        title = db.Column(db.String(500))
        body = db.Column(db.String(30000))
        tags = db.Column(db.String(500))
        last_update_date = db.Column(db.DateTime)

        def __init__(self, question_id, view_count, is_associated=False):
            self.question_id = question_id
            self.view_count = view_count
            self.is_associated = is_associated
            self.can_be_associated = True
            self.last_update_date = datetime.datetime.now()

        def __repr__(self):
            return '<MostViewedQuestion %s>' % str(self.id)


    class QuestionViewHistory(db.Model):
        __tablename__ = 'question_view_history'
        id = db.Column(db.Integer, primary_key=True)
        question_id = db.Column(db.Integer)
        view_count = db.Column(db.Integer)
        view_date = db.Column(db.DateTime)
        counted = db.Column(db.Boolean)

        def __init__(self, question_id, view_count, view_date):
            self.question_id = question_id
            self.view_count = view_count
            self.view_date = view_date
            self.counted = False

        def __repr__(self):
            return '<QuestionViewHistory %s>' % str(self.id)

The complete project code is available on GitHub; all models are in the models.py file, and the update_most_viewed function is in the database.py file. The cvs_data_ru folder contains data for tests.

  • For bulk inserts/updates there is a set of bulk_... methods, for example bulk_save_objects. With this method, saving looks roughly like db.session.bulk_save_objects([most_viewed_q1, most_viewed_q2, most_viewed_q3 ... most_viewed_qn]) followed by a commit (see the sketch after these comments). - m9_psy
  • @m9_psy Unfortunately, that does not help, I tried it. Moreover, it seems the problem is not in the insert but in the select. I pre-fill the first table with 5 million records in roughly the same way, but without any reads from the database (I accumulate about 100K records in a session and then commit), and that runs very quickly. - Nicolas Chabanovsky
  • @m9_psy It looks like I found a solution: I create two sessions, one for writing and one for reading (although that apparently is not the important part), and then I read from the first table not whole objects but only id, question_id, view_count. In the second session I record the changes to both tables and send them to the database. That way there is essentially nothing for the ORM to track when it flushes. I am still running tests; if all goes well, I will post the answer. - Nicolas Chabanovsky
  • Still, the models and some code to fill the tables with random data would help; a minimal example, in short. Otherwise there can be many hypotheses. For example, another hypothesis is to use update + insert statements, i.e. hand all the hard work over to the database and thereby remove the loops from the Python code; but since I am too lazy to invent the models and fixtures myself, that hypothesis cannot be verified. Let's not turn the question into a fair of guesses, okay? - m9_psy
  • I completely agree with @m9_psy. It is better to do this on the RDBMS side instead of with loops... - MaxU
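For reference, a minimal sketch of the bulk_save_objects approach suggested in the first comment; the pending_rows iterable, the helper name, and the batch size here are made up for illustration:

    def bulk_insert_most_viewed(pending_rows, batch_size=1000):
        # pending_rows is a hypothetical iterable of (question_id, view_count) pairs
        batch = []
        for question_id, view_count in pending_rows:
            batch.append(MostViewedQuestion(question_id, view_count))
            if len(batch) >= batch_size:
                db.session.bulk_save_objects(batch)  # skips most unit-of-work bookkeeping
                db.session.commit()
                batch = []
        if batch:
            db.session.bulk_save_objects(batch)
            db.session.commit()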

2 answers

It is worth starting with how not to do it. For example, you should not go through database objects one at a time:

    for question in all_questions:
        most_viewed_question = MostViewedQuestion.query.filter_by(
            question_id=question.question_id).first()

This loop is a definite no-no. That pattern should be avoided at all costs: it is MUCH better to request a million rows once than to request one row a million times. If you need all the matching MostViewedQuestion objects, it is better to fetch them with a single query:

    question_ids = [q.question_id for q in all_questions]
    most_viewed_questions = MostViewedQuestion.query.filter(
        MostViewedQuestion.question_id.in_(question_ids)).all()

In that case there is no need for the outer while either, because, again, it is better to request a million rows once than to request a thousand rows a thousand times. After such a query the database returns only those most_viewed_questions for which a matching record exists. The question is what to do with those that have no such record. Queries like this come up in databases all the time and are usually called UPSERT (UPDATE + INSERT): update a record if it exists, otherwise create it. All that is needed is to express this single UPSERT by means of SQLAlchemy. The query will consist of two statements: one updates the existing records (UPDATE), the other creates the new ones (INSERT).

UPDATE is generally pretty straightforward:

    from sqlalchemy import and_, not_, select, exists

    update_query = MostViewedQuestion.__table__.update().values(
        view_count=MostViewedQuestion.view_count + QuestionViewHistory.view_count
    ).where(and_(
        MostViewedQuestion.question_id == QuestionViewHistory.question_id,
        QuestionViewHistory.counted == True
    ))

It generates the following SQL:

    UPDATE most_viewed_question
    SET view_count=(most_viewed_question.view_count + question_view_history.view_count)
    FROM question_view_history
    WHERE most_viewed_question.question_id = question_view_history.question_id
      AND question_view_history.counted = true

I used MostViewedQuestion.__table__ because my models inherit from declarative_base(), and the update(), delete(), insert() methods live on the Table class (the declarative Base does not have them). With declarative_base the underlying table is available in the __table__ attribute.
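In other words, as a quick illustration (assuming a plain declarative model as above):

    # The ORM class and its underlying Table
    MostViewedQuestion.__table__.update()   # OK: Table objects provide update()/insert()/delete()
    MostViewedQuestion.__table__.insert()   # OK
    # MostViewedQuestion.update()           # AttributeError on a plain declarative class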

The INSERT is a bit more involved, but the heart of it is from_select(), which generates an INSERT ... SELECT:

    insert_query = MostViewedQuestion.__table__.insert().from_select(
        [MostViewedQuestion.question_id, MostViewedQuestion.view_count],
        select([QuestionViewHistory.question_id, QuestionViewHistory.view_count]).where(and_(
            not_(exists([MostViewedQuestion.question_id]).where(
                MostViewedQuestion.question_id == QuestionViewHistory.question_id)),
            # WHERE ... AND ...
            QuestionViewHistory.counted == True))
    )

    SESSION.execute(update_query)
    SESSION.execute(insert_query)
    SESSION.commit()

SQL:

    INSERT INTO most_viewed_question (question_id, view_count)
    SELECT question_view_history.question_id, question_view_history.view_count
    FROM question_view_history
    WHERE NOT (EXISTS (
            SELECT most_viewed_question.question_id
            FROM most_viewed_question
            WHERE most_viewed_question.question_id = question_view_history.question_id))
      AND question_view_history.counted = true

I would not claim this query is a model of speed, but the most important thing is that it is the database that does the work. The Python code just waits for the database to respond, so there is no need to optimize the Python side at all, and no need to puzzle over SQLAlchemy's internals. It is worth thinking about optimizing the SQL, but that is somewhat easier, because UPSERT is a typical operation and a great deal has been written about it. That is still no reason to relax: when updating or inserting a large number of records there are nuances (for example table bloat, or indexes/triggers that slow down the process and should be disabled before a bulk insert).
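As a rough sketch only (the index name below is invented, and the syntax is PostgreSQL-specific), dropping a heavy index before the bulk statements and recreating it afterwards could look like this:

    from sqlalchemy import text

    # Hypothetical index name, for illustration only
    SESSION.execute(text("DROP INDEX IF EXISTS ix_most_viewed_question_question_id"))
    SESSION.execute(update_query)
    SESSION.execute(insert_query)
    SESSION.execute(text(
        "CREATE INDEX ix_most_viewed_question_question_id "
        "ON most_viewed_question (question_id)"))
    SESSION.commit()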

  • I get the general idea, thank you so much! I will dig into it, but for now I will trim the data and keep the previous version. The example you gave does not work quite correctly in my case, because: 1) the insert does not aggregate duplicate entries from the first table and inserts them all into the second; 2) as I understand it, the update query only sums entries already present in the second table, not for every question; 3) since the update and the insert do not happen at the same time, it is impossible to track what has been accounted for (to set counted=True) and what has not, and when nothing in the first table needs updating, the query will not work. - Nicolas Chabanovsky
  • As I understand it, people write extensions based on the ON DUPLICATE KEY UPDATE principle. But, again, I need to figure out how to update the first table when inserting into the second. - Nicolas Chabanovsky
  • @NicolasChabanovsky, in your example all the records from QuestionViewHistory are updated, without any conditions. Well, the example queries are deliberately simple, yes. If the queries become more complicated and conditions appear, that is not a problem either: you can first query the ids that need to be changed/inserted, and then change them. PostgreSQL even has a handy RETURNING clause that returns all inserted or modified columns. Also beware of dialect-specific constructs such as ON DUPLICATE: some databases have them, some do not (a sketch follows after these comments). - m9_psy
  • When selecting from QuestionViewHistory I filter by counted=False: QuestionViewHistory.query.filter_by(counted=False). I need to look into how to improve this. - Nicolas Chabanovsky
  • @NicolasChabanovsky, by the way, if raw insert()/update() does not seem appropriate, the only way out is to add or modify model objects from another thread. In your code Python is mostly idle, waiting for a database response and for data transfer (which is also not free), so that waiting could happen in a parallel thread. Also, in your app.config you can try setting SQLALCHEMY_TRACK_MODIFICATIONS=False; it seems like that should help a little as well. But the basic rule is still not to request objects one at a time, or a thousand at a time. - m9_psy
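For completeness, a sketch of the dialect-specific upsert mentioned in the comments above; it assumes PostgreSQL, SQLAlchemy 1.1+, and a unique index on question_id (which the models shown earlier do not actually declare):

    from sqlalchemy.dialects.postgresql import insert as pg_insert

    # Illustrative rows; in practice these would come from question_view_history
    stmt = pg_insert(MostViewedQuestion.__table__).values(
        [{'question_id': 1, 'view_count': 10},
         {'question_id': 2, 'view_count': 5}])
    stmt = stmt.on_conflict_do_update(
        index_elements=[MostViewedQuestion.__table__.c.question_id],
        set_={'view_count': MostViewedQuestion.__table__.c.view_count + stmt.excluded.view_count},
    )
    SESSION.execute(stmt)
    SESSION.commit()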

Here is what I ended up with:

    from sqlalchemy import func

    def update_most_viewed():
        reader_session = db_session()
        question_count = reader_session.query(func.count(QuestionViewHistory.id)).filter_by(counted=False).scalar()
        query = reader_session.query(
            QuestionViewHistory.id,
            QuestionViewHistory.question_id,
            QuestionViewHistory.view_count).filter_by(counted=False)
        frame_size = 1000
        progress_index = 0
        counter = 0
        print "Questions to update: %s, frame size: %s" % (question_count, frame_size)
        while counter <= question_count:
            all_questions = query.offset(0).limit(frame_size).all()
            counter = counter + frame_size
            wiriter_session = db_session()
            for question in all_questions:
                record_id, question_id, view_count = question
                most_viewed_question = wiriter_session.query(MostViewedQuestion).filter_by(question_id=question_id).first()
                if most_viewed_question is None:
                    most_viewed_question = MostViewedQuestion(question_id, view_count)
                    wiriter_session.add(most_viewed_question)
                else:
                    most_viewed_question.view_count += view_count
                qh = wiriter_session.query(QuestionViewHistory).filter_by(id=record_id).first()
                qh.counted = True
                wiriter_session.add(qh)
                print_progress_bar(progress_index, question_count, prefix='Progress:', suffix='Complete')
                progress_index += 1
            wiriter_session.commit()
            wiriter_session.close()
        print "All questions were counted"

What is the difference from the original version

According to the SQLAlchemy documentation, the ORM tracks all objects in the session in order to keep them consistent with the data in the database. As a result, after every commit the ORM marks the session's objects as needing a refresh, which puts a heavy load on the database. As the number of objects in the session grows, so does the time spent refreshing them. The solution is to keep as few objects in the session as possible.

  1. I added two sessions, reader_session and wiriter_session, which, judging by the answer, are in fact the same session.

  2. Next (the main trick): when querying the database I request not whole objects but only the columns I need (id, question_id, view_count), and I count the number of rows with func.count(QuestionViewHistory.id). As a result, as I understand it, there are no objects in the session for the ORM to track (see the sketch after this list).

  3. I add all write operations to wiriter_session.
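A quick way to see the effect of point 2 (just an illustration, not part of the original function): queries that return column tuples leave the session's identity map empty, while full-entity queries fill it.

    rows = reader_session.query(
        QuestionViewHistory.id,
        QuestionViewHistory.question_id,
        QuestionViewHistory.view_count).filter_by(counted=False).limit(1000).all()
    print(len(reader_session.identity_map))   # 0 -- plain tuples are not tracked by the ORM

    entities = reader_session.query(QuestionViewHistory).filter_by(counted=False).limit(1000).all()
    print(len(reader_session.identity_map))   # up to 1000 -- full entities are tracked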

Now the function's speed no longer depends on the amount of data (on the pass number), although it is still low. I will think about improving the algorithm.

P.S. As I understand it, the original code had a mistake in how it handled the "window": inside the loop we modify the table (setting counted), and then we also shift the window, thereby skipping part of the records. As far as I can tell, you have to start from offset zero every time, i.e.:

  all_questions = query.offset(0).limit(frame_size).all()
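An alternative worth noting, as a sketch only, is keyset pagination by primary key: it avoids both OFFSET and the shifting-window problem entirely (the loop body stays the same as above):

    last_id = 0
    while True:
        frame = (reader_session.query(
                     QuestionViewHistory.id,
                     QuestionViewHistory.question_id,
                     QuestionViewHistory.view_count)
                 .filter(QuestionViewHistory.counted == False,
                         QuestionViewHistory.id > last_id)
                 .order_by(QuestionViewHistory.id)
                 .limit(frame_size)
                 .all())
        if not frame:
            break
        last_id = frame[-1][0]
        # ... process the frame exactly as in the loop body above ...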