There is a server that accepts connections from terminals and hands each one off to a separate thread. Before creating the thread, I check the global container for an existing connection from that terminal. When the thread finishes, its entry is removed from the container.
Now to the error itself: the first connection works without problems, and I can exchange packets for as long as I like. If I connect a second time as the same terminal, I get an error.
At the same time, I can connect as twenty terminals in parallel and everything is fine. But once I disconnect at least one of them, the error comes back (the shutdown itself completes normally, and erasing from the container returns 1, i.e. the key was unique).
Listings:
1) The function that receives a package, which is where the crash actually happens.
    /*
     * @is_terminal_commands true if we expect terminal commands, false if platform commands.
     * @ret  0 if the real package length doesn't match the length from the package
     *      -1 if socket error
     *      -2 if got unexpected command
     */
    int getPackage(ClockConnection *connection, package *ret, bool is_terminal_commands = false){
        memset(ret->CS,'\0',2);
        memset(ret->imay,'\0',10);
        memset(ret->len,'0',5);// inserting zeros.
        memset(ret->command,'\0',10);
        ret->have_args = false;
        memset(buffer, '\0', size);
        int recv_bytes = 0;
        int multiplier = 0;
        struct timeval t;
        // set the wait time to 1 sec.
        t.tv_sec = 1;
        t.tv_usec = 0;
        fd_set s;
        int select_res;
        /*
         * waiting for package
         */
        pthread_mutex_lock(&(connection->mutex));
        do{
            FD_ZERO(&s);
            FD_SET(connection->tcp_socket, &s);
            select_res = select((connection->tcp_socket)+1,&s, nullptr, nullptr, &t);
            if(select_res < 0){
                puts("socket error, while waited for data");
                return -1;
            }else if(select_res > 0){
                break;
            }
            t.tv_sec = 1;
            t.tv_usec = 0;
            pthread_mutex_unlock(&(connection->mutex));
            sleep(2);
            pthread_mutex_lock(&(connection->mutex));
        }while( select_res == 0);
        /* reading package to buffer.
         * if the buffer is too small and the package cannot be read,
         * the buffer size grows by opt_buffer until the package can be read
         */
        int size = 1;// if I set a big size (for example 1100)
        char *buffer = new char[size];// then the memory corruption happens here (the second time, when the terminal opens a second connection; the previous one is closed, 100%)
        do{
            multiplier++;
            try{
                delete[] buffer;
                size = opt_buffer*multiplier;
                buffer = new char[size];// here is "double-linked list corrupted"
                memset(buffer, '\0', size);
            }catch(std::exception & e){
                puts("sth going wrong");
            }
            recv_bytes = recv( connection->tcp_socket, buffer, opt_buffer * multiplier - 1, 0 | MSG_PEEK);
            if(recv_bytes == 0 || recv_bytes < 0)
                return -1;
        }while( strchr( buffer, (int)']') == nullptr);
        std::string result(buffer);
        if(recv_bytes < 0){
            ret->good = false;
            pthread_mutex_unlock(&(connection->mutex));
            delete[] buffer;
            return -1;
        }
        result = result.substr( 0, result.find_first_of(']')+1 )+'\n';
        if(result.find_first_of(',') != string::npos) {
            puts("arguments detected");
            ret->have_args = true;
            delete[] ret->arguments;
            ret->arguments = new char[recv_bytes];
        }
        if ( opt_debug > 0 ) {
            printf("\nReceived %d bytes on socket.\n%s\n",recv_bytes, result.c_str());
        }
        int res = sscanf(result.c_str(), "[%[^*]*%[^*]*%[^*]*%[^],],%[^]]", ret->CS, ret->imay, ret->len, ret->command, ret->arguments);
        if( res < 4){
            pthread_mutex_lock(&stat_mutex);
            fprintf(statistics, "Bad data in sock %d. Waiting while send again...", connection->tcp_socket);
            printf("Bad data in sock %d. Source data: %s", connection->tcp_socket, result.c_str());
            printf("Parsed:\n CS:%s\n imay:%s\n len:%s\n command:%s\n", ret->CS,ret->imay,ret->len,ret->command);
            fprintf(statistics,"Parsed:\n CS:%s\n imay:%s\n len:%s\n command:%s\n", ret->CS,ret->imay,ret->len,ret->command);
            pthread_mutex_unlock(&stat_mutex);
            ret->good = false;
            pthread_mutex_unlock(&(connection->mutex));
            delete[] buffer;
            return -1;
        }
        unsigned int len_int = stoul(ret->len, nullptr, 16);
        /* len in the package means the length of the package after the last '*' until ']',
         * so to check the full length of the package, the other parts must be included:
         * 2 bytes for CS,
         * 2 for '[' and ']',
         * 10 for imay,
         * 4 for len,
         * 3 for '*'
         */
        ret->pack_leng = result.length() - 1;
        if( ret->pack_leng != len_int + 21 ){
            printf("sizes don't match. ret->pack_leng = %d. len_int=%d", ret->pack_leng,len_int);
            recv( connection->tcp_socket, buffer, recv_bytes, 0 );
            ret->good = false;
            pthread_mutex_unlock(&(connection->mutex));
            delete[] buffer;
            return 0;
        }
        bool terminal_command = false, platform_command = false;
        for( unsigned int i = 0; i < const_arr_length(terminal_commands); i++){
            if( strcmp(ret->command, terminal_commands[i].c_str()) == 0 ){
                terminal_command = true;
            }
        }
        for( unsigned int i = 0; i < const_arr_length(platform_commands); i++){
            if( strcmp(ret->command, platform_commands[i].c_str()) == 0 ){
                platform_command = true;
            }
        }
        if( terminal_command == platform_command){
            // disaster... or unknown (unhandled) command
            ret->good = false;
            recv( connection->tcp_socket, buffer, recv_bytes, 0 );
            puts("pizdec");
            pthread_mutex_unlock(&(connection->mutex));
            delete[] buffer;
            return -2;
        }
        if( (is_terminal_commands && !terminal_command) || (!is_terminal_commands && terminal_command)){
            // unexpected command
            ret->good = true;
            puts("unexpected_command");
            pthread_mutex_unlock(&(connection->mutex));
            delete[] buffer;
            return -2;
        }
        puts("attempt to write statistics");
        pthread_mutex_lock(&stat_mutex);
        fprintf(statistics,"from %s in %.6f sec.: %s\n", inet_ntoa(client_address.sin_addr), time_to_seconds( &time_start, &time_now ), result.c_str());
        if ( fflush(statistics) != 0 ) {
            if ( opt_debug > 0 ) {
                fprintf(stderr, "Cannot flush buffers: %s\n", strerror(errno) );
            } else {
                syslog( LOG_DAEMON | LOG_CRIT, "Cannot flush buffers: %s", strerror(errno) );
            }
        }
        pthread_mutex_unlock(&stat_mutex);
        ret->good = true;
        recv( connection->tcp_socket, buffer, recv_bytes, 0 );
        delete[] buffer;
        pthread_mutex_unlock(&(connection->mutex));
        return recv_bytes;
    }
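To make the length check in the listing easier to follow, here is a minimal standalone sketch of the same arithmetic. It assumes the package layout implied by the comment in getPackage: `[CS*imay*len*command,args]` with a 2-character CS, a 10-character imay and a 4-digit hexadecimal len. The names `kFixedOverhead` and `lengthMatches` are hypothetical and do not exist in the project; this is not the project's parser, only an illustration of where the constant 21 comes from.

```cpp
#include <cassert>
#include <cctype>
#include <cstddef>
#include <string>

// Assumed fixed part of a package, taken from the comment in the listing:
// 2 brackets + 2 (CS) + 10 (imay) + 4 (len) + 3 '*' separators = 21.
constexpr std::size_t kFixedOverhead = 2 + 2 + 10 + 4 + 3;

// Hypothetical helper: returns true if the hex value in the len field equals
// the number of characters between the last '*' and the closing ']'.
bool lengthMatches(const std::string& packet) {
    if (packet.size() < kFixedOverhead) return false;
    if (packet.front() != '[' || packet.back() != ']') return false;
    // With fixed field widths the separators sit at offsets 3, 14 and 19.
    if (packet[3] != '*' || packet[14] != '*' || packet[19] != '*') return false;

    const std::string lenField = packet.substr(15, 4);
    for (char c : lenField) {
        if (!std::isxdigit(static_cast<unsigned char>(c))) return false;
    }
    const std::size_t declared = std::stoul(lenField, nullptr, 16);
    return packet.size() == declared + kFixedOverhead;
}
```

Under these assumptions, `lengthMatches("[AB*1234567890*0005*LK,12]")` returns true, because the payload `LK,12` is 5 characters long and the whole package is 26 = 21 + 5 characters.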
getPackage is called in a loop in the body of the function that the main thread hands off to a child thread. When the connection is broken, the loop ends and I clean up the buffers, including the container (apparently the only doubly linked list I have had to use at all...)
    while ( (recv_bytes = getPackage( connection, &pkg, true )) > -2 ) { // and reading socket
        // analysing
    }
2) Removing a connection from the unordered_map<long long, ClockConnection>:
    if(connections.connections.erase((long long)arg_imay) != 1){
        printf("somehow connection with %lld wasn't erased from container", arg_imay);
    }else{
        printf("connection with %lld was erased from container", arg_imay);
    }
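For reference, the container handling described in this question (check before starting the thread, erase when the thread ends, every access under one global mutex) might be sketched roughly as follows. `Connection` and `ConnectionRegistry` are hypothetical stand-ins, not the project's ClockConnection or its container wrapper, and std::mutex is used here in place of the pthread mutex only to keep the sketch short.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <unordered_map>

// Hypothetical stand-in for the project's ClockConnection.
struct Connection {
    int tcp_socket;
};

// Hypothetical registry: a single mutex guards every access to the map.
class ConnectionRegistry {
public:
    // Called before the worker thread is started.
    // Returns false if this terminal already has a registered connection.
    bool tryAdd(long long imay, int socket) {
        std::lock_guard<std::mutex> lock(mutex_);
        return connections_.emplace(imay, Connection{socket}).second;
    }

    // Called when the worker thread is about to finish.
    // Returns true if exactly one entry was erased.
    bool remove(long long imay) {
        std::lock_guard<std::mutex> lock(mutex_);
        return connections_.erase(imay) == 1;
    }

    std::size_t size() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return connections_.size();
    }

private:
    mutable std::mutex mutex_;
    std::unordered_map<long long, Connection> connections_;
};
```

With such a wrapper, a second `tryAdd` for the same imay fails until `remove` has been called, which matches the "one connection per terminal" rule described above. The point of the design is that no code path touches the map without holding the mutex.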
There is too much code in the project to publish all of it. If this is not enough, I will post an archive of the workspace. I am working on Ubuntu in Eclipse Kepler.
I manage the connection list under a single global mutex. I modify the container in two places: inserting the element before starting the thread, and removing the element before the thread ends. Both happen with the mutex held.
You have a connection->mutex for each connection, but the list of connections is shared. Under which mutex do you modify the connection list? - VladD