I am writing a program that scrapes a single site; the pages themselves are parsed with CsQuery. The program has to process a whole range of pages at once: the starting and ending indices are given, and the program walks through every page in the range in several threads, extracting the needed information into a List so it can be saved to a file at the end. The required number of threads is started, and each one in turn takes its page number from a shared counter of the current page and works on it. The threads run a while loop so that they do not exit until the last page has been parsed. After parsing finishes, all the collected data in the List is saved separately. The problem is that I will be parsing large ranges, over a million pages, and in a test run over a range of 10,000 pages the program already takes up more than 1.5 GB of memory. In a separate program I tried filling a List with random data of the same kind that should be extracted: after adding 100,000 strings, the program's RAM usage did not exceed 100 MB. The parsing itself also works correctly and does not add any redundant data. I suspect my threading code is wrong, and that the garbage collector is not freeing the data from previous parsing passes. I have tried different approaches and have not solved the memory leak. Help me find the bug, or suggest a more correct way to work with threads. The code is attached.

    using System;
    using System.Collections.Generic;
    using System.Net;
    using System.Threading;
    using CsQuery;

    class Program
    {
        static int begin_of_post = 2950774;   // starting post index
        static int end_of_post = 2951774;     // ending post index
        static int current_post;              // current post for the threads
        static List<string> list_posts = new List<string>(); // storage for the extracted post data

        static void Main(string[] args)
        {
            ServicePointManager.DefaultConnectionLimit = 1000000000; // number of simultaneous connections
            current_post = begin_of_post;
            Thread my_tr;
            for (int i = 0; i < 10; i++) // start the worker threads
            {
                my_tr = new Thread(parse_site);
                my_tr.Start();
            }
            Console.ReadLine();
            save_to_file();
        }

        static void parse_site()
        {
            while (current_post <= end_of_post)
            {
                int link_to_post = current_post;         // link to the post
                Interlocked.Increment(ref current_post); // increment the counter
                CQ cq;
                try
                {
                    cq = CQ.CreateFromUrl("http://site.ru/" + link_to_post); // download the page
                }
                catch
                {
                    Console.WriteLine("Error " + link_to_post);
                    continue;
                }
                string post_info;
                // ... the actual parsing of the page ...
                int current = link_to_post - begin_of_post;
                int end = end_of_post - begin_of_post;
                Console.WriteLine("Processed link " + current.ToString() + " OF " + end.ToString());
                Thread my_tr_save = new Thread(save_post);
                my_tr_save.Start(post_info);
            }
        }

        static void save_post(object post_info)
        {
            // ... parsing of the post information ...
            lock (list_posts)
            {
                list_posts.Add(post_info.ToString());
            }
        }

        static void save_to_file()
        {
            // ... save the list_posts strings to a file ...
        }
    }
  • Take any memory profiler (even the one in Visual Studio) and compare two snapshots: one at startup, and one once the program has eaten up the memory. - PashaPash
  • And what exactly goes into list_posts? The whole post content? - PashaPash
  • No, list_posts holds strings up to 100 characters long. - Kipik
  • Most of the memory is taken by ConcurrentStack + Node&lt;Object&gt;: about 500 of them per 1,000 iterations, and each object occupies 2 megabytes. But what is that, and where can I look at it? - Kipik
  • "While cycle is written in threads, so that they do not close" - better release the stream as soon as you do not need it. And better take threads from ThreadPool or use Task. - Stack

3 answers

In general, to judge a leak you need to run a profiler, take two snapshots (before and after), and see what is taking up the memory.


Offhand, the problems in this code are:

  1. Inside each of the threads reading the pages, you continuously create new threads in a loop:

     Thread my_tr_save = new Thread(save_post);
     my_tr_save.Start(post_info);

    First, you spawn a lot of threads, and they take up memory. Second, it is redundant, because you are already inside a separate thread. And there is no point in splitting parsing and saving into different threads to begin with.

  2. The current_post variable is accessed from different threads in an unsafe way. In theory a post may end up processed several times, giving you duplicate pages in the final list, which means extra memory. You need to atomically evaluate the condition current_post <= end_of_post together with the subsequent increment, return the current value, and use that value in the body of the loop.

     static readonly object lockObject = new object();

     static bool HasPostsToParse(out int current)
     {
         lock (lockObject)
         {
             // current_post is touched only inside this method
             if (current_post <= end_of_post)
             {
                 current = current_post++; // take the value, then advance the counter
                 return true;
             }
             else
             {
                 current = current_post;
                 return false;
             }
         }
     }

     static void parse_site()
     {
         int current;
         while (HasPostsToParse(out current))
         {
             // use the local variable current here
         }
     }
  • You don't need Volatile.Read - it will not remove the race! You just need to use the return value of Interlocked.Increment. - Pavel Mayorov
  • @PavelMayorov And what would you do with while (current_post <= end_of_post)? Although you are right about the race: properly, the block with that condition and the subsequent increment needs to be synchronized. - andreycha
  • Yes, your new version looks much better. Instead of lock, though, I would get by with a plain Interlocked.Increment: increment, remember the returned value, and only then check it (see the sketch after this comment thread). - Pavel Mayorov
  • New threads are created for saving because the save method performs a lot of operations, and leaving it in the same thread would slow it down a lot and reduce parsing performance. That is why I moved it to a separate thread. - Kipik
  • @PavelMayorov The check has to come before the increment. - andreycha
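A minimal sketch of the Interlocked variant Pavel Mayorov describes above (increment first, remember the returned value, then check it), reusing the question's field names; note that current_post has to start one below the range, since the increment runs before the first read:

    static int current_post = begin_of_post - 1; // one below the range: Increment runs before the first use

    static void parse_site()
    {
        while (true)
        {
            // Atomic increment-and-read: no two threads can ever observe the same value,
            // so there is no lock and no duplicate pages.
            int current = Interlocked.Increment(ref current_post);
            if (current > end_of_post)
                break; // range exhausted - the thread exits and frees its stack

            // ... download and parse page number `current` ...
        }
    }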

In your case you have new threads, each of which gets its own 4 MB stack allocation, and none of the threads can finish its work and free that memory, because each of them spins in while (current_post <= end_of_post) right up to the end of the entire job. So you get constant memory growth from the stacks alone until all the work completes.
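As an aside (my addition, not part of the answer): the Thread constructor has an overload that shrinks the per-thread stack reservation. It only softens the symptom, since the real fix is to not create the threads at all, but for reference:

    // Hypothetical tweak to the question's save_post threads:
    // reserve 256 KB of stack instead of the process default.
    var my_tr_save = new Thread(save_post, maxStackSize: 256 * 1024);
    my_tr_save.Start(post_info);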

A more correct approach is TPL plus a proper understanding of IO-bound work; with that you can plow through the entire million pages using nothing but threads from the pool.

The author asked for examples of how best to handle such cases (of course, the author's parser may not allow this):

    using System;
    using System.Collections.Generic;
    using System.Net;
    using System.Threading;
    using System.Threading.Tasks;

    class SomeNetParser
    {
        private const int ThreadCount = 20;
        private CountdownEvent _countdownEvent;
        private SemaphoreSlim _throttler;

        public async Task Check(IList<string> urls)
        {
            _countdownEvent = new CountdownEvent(urls.Count);
            _throttler = new SemaphoreSlim(ThreadCount);
            foreach (var url in urls)
            {
                await _throttler.WaitAsync(); // at most ThreadCount downloads in flight
                ProccessUrl(url);
            }
            _countdownEvent.Wait(); // block until the last ProccessUrl signals
        }

        private async void ProccessUrl(string url)
        {
            try
            {
                var page = await new WebClient().DownloadStringTaskAsync(new Uri(url));
                ProccessResult(page);
            }
            finally
            {
                _throttler.Release();
                _countdownEvent.Signal();
            }
        }

        private void ProccessResult(string page) { /* ... */ }
    }

Don't forget that Check must not be called on a UI thread. The CountdownEvent is needed to wait for the last tasks after the loop exits. The drawback of this solution is that it keeps one thread blocked.

The CountdownEvent can be discarded and the body of Check replaced with:

    var allTasks = new List<Task>();
    foreach (var url in urls)
    {
        await _throttler.WaitAsync();
        allTasks.Add(ProccessUrl(url));
    }
    await Task.WhenAll(allTasks);

and ProccessUrl should then return Task. In this case allTasks will accumulate a million Task instances, and I don't even know how quickly Task.WhenAll will get through them.

There is another option with LINQ, but it is difficult to understand for beginners.
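The answer doesn't show that LINQ option; one plausible shape for it (my sketch, not the author's code, assuming the Task-returning ProccessUrl mentioned above) is to batch the URLs with GroupBy and await each batch:

    // Split urls into batches of ThreadCount and run one batch at a time.
    var batches = urls
        .Select((url, index) => new { url, index })
        .GroupBy(x => x.index / ThreadCount, x => x.url);

    foreach (var batch in batches)
        await Task.WhenAll(batch.Select(url => ProccessUrl(url)));

Note that a batch only finishes when its slowest download does, so this throttles less smoothly than the semaphore version.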

P.S.: WebClient is a bad fit here. It is written incorrectly: it performs part of its work on the thread that called it, and that was never fixed. HttpClient is better suited.
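For example, the Task-returning ProccessUrl from above moved onto HttpClient might look like this (a sketch; note the single shared client, since creating a new HttpClient per request can exhaust sockets):

    // One shared instance for the whole parser.
    private static readonly HttpClient _http = new HttpClient();

    private async Task ProccessUrl(string url)
    {
        try
        {
            // Unlike WebClient, GetStringAsync does not drag work back onto the calling thread.
            string page = await _http.GetStringAsync(url);
            ProccessResult(page);
        }
        finally
        {
            _throttler.Release();
        }
    }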

  • "allTasks = new List <Task> ();" - it is not necessary to do so, because There is a TaskCreationOptions.AttachedToParent. see UPDATE in my reply - Stack
  • @Stack Not in my example. It is true that a parent task will wait for its attached children to finish, but in my example no such parent task is defined. And you cannot count on attaching to an implicit one (if there is a Task.Run above the calling method), because code can still run after the loop; Task.WhenAll(allTasks) guarantees that all of the loop's tasks have completed at that point, even while the outer task is still unfinished. - vitidev

For parallel processing of data in a given range you can use Parallel.For. Using pieces of your code, it would look something like this:

    using System.Threading.Tasks;
    using System.Collections.Concurrent;

    // ...
    int begin_of_post = 2950774; // starting post index
    int end_of_post = 2951774;   // ending post index
    var list_posts = new BlockingCollection<string>();

    Parallel.For(begin_of_post, end_of_post + 1, (current_post) => // the upper bound is exclusive, hence the +1
    {
        // ...
        var cq = CQ.CreateFromUrl("http://site.ru/" + current_post);
        list_posts.Add(current_post.ToString());
        // ... parsing code etc. ...
        save(file_name);
    });

    save_to_file(list_posts);
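A caveat worth adding: by default Parallel.For picks a degree of parallelism tuned for CPU-bound work; for IO-bound downloads you would likely cap it explicitly, e.g. (a sketch reusing the names above):

    var options = new ParallelOptions { MaxDegreeOfParallelism = 10 }; // ~10 concurrent downloads, as in the question
    Parallel.For(begin_of_post, end_of_post + 1, options, (current_post) =>
    {
        // ... same body as above ...
    });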

UPDATE:

If you need to download many pages, parse them, and save them to files, and also get a log, you can do something like this:

    using System.Collections.Concurrent;
    using System.Linq;
    using System.Threading.Tasks;

    public class App
    {
        BlockingCollection<string> log; // serializes all output to the log

        public void Run(int start, int end)
        {
            log = new BlockingCollection<string>();
            Task.Run(() =>
            {
                foreach (var s in log.GetConsumingEnumerable())
                {
                    // write to log.txt here
                }
            });
            Task.Factory.StartNew(() =>
            {
                // start a separate Task for each request
                foreach (var page in Enumerable.Range(start, end - start + 1))
                    Task.Factory.StartNew(() => Download(page), TaskCreationOptions.AttachedToParent);
            }).Wait(); // wait for all started Tasks to complete
            log.CompleteAdding(); // let the logging Task finish
        }

        void Download(int page) // runs on a separate thread
        {
            var url = "http://...." + page;
            log.Add(url);
            try
            {
                var html = RequestPage(url);
                Task.Factory.StartNew(() => Parse(url, html), TaskCreationOptions.AttachedToParent);
            }
            catch
            {
                log.Add("fail");
            }
        }

        void Parse(string url, string html) // runs on a separate thread
        {
            // parse the html here and save it to a file
        }
    }
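Usage would then look roughly like this (a sketch; Run blocks until every attached download and parse task finishes):

    class Program
    {
        static void Main()
        {
            var app = new App();
            app.Run(2950774, 2951774); // the same range as in the question
        }
    }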