There is a server, it accepts and sends messages, with sending I figured out, through the cycle to everyone, but it’s not possible to receive by the cycle, due to the fact that the one who is currently in the cycle will receive, the rest will be in the queue, and this is not the case what do you need. What is the message reception code now?

def Reciver(): global conn while 1: for i in set(conn): try: data = i.recv(1024) if data: print(data.decode()) except: pass 

conn - set with connections. Need every message that came to display on the screen.

  • would try kolbek api from asinkio. more convenient but harder - eri
  • No, you what, I hardly understood socket in my 15) - Mr Lucky Tomas
  • docs.python.org/3/library/asyncio-protocol.html there are examples from below. much easier raw socket - eri
  • Maybe I will throw you my code on the socket and you will redo it under asyncio? I wrote the code for day 4, it would take me to transfer everything there, I don’t know how much more, at the same time you solve the problem with receiving messages and write comments, but I will read and analyze some functions separately ... - Mr Lucky Tomas
  • @eri is easier - it means worse if the task is to learn. - Sergey Gornostaev

1 answer 1

The problem that operations on sockets block the flow of execution can be solved in three ways:

  1. Select for each blocking operation a separate thread of execution, which it can block without harming the other threads;
  2. Translate sockets into non-blocking mode and poll their state in a loop;
  3. Work with sockets in asynchronous mode.

Below are all three options.

Multithreaded server

Perhaps the most frequently used method. You can say classic. For interaction between threads we will use queue.Queue . First, the logic of the queue works for us. Secondly, the queue is thread-safe.

 # -*- coding: utf-8 -*- import socket import threading import queue # Определяем константу содержащую имя ОС # для учёта особенностей данной операционной системы import platform OS_NAME = platform.system() # Константы HOST = 'localhost' PORT = 1080 # Единственная глобальная переменная # доступная всем потокам run = True def shutdown_socket(s): # В Linux'ах просто закрыть заблокированный сокет будет мало, # он так и не выйдет из состояния блокировки. Нужно передать # ему команду на завершение. Но в Windows наоборот, команда # на завершение вызовет зависание, если сокет был заблокирован # вызовом accept(), а простое закрытие сработает. if OS_NAME == 'Linux': s.shutdown(socket.SHUT_RDWR) s.close() def reciver(client, q): while run: try: # Здесь поток блокируется до тех пор # пока не будут считаны все имеющиеся # в сокете данные data = client.recv(1024) if data: # Если есть данные # Отправляем в очередь сообщений кортеж # содержащий сокет отправителя # и принятые данные q.put((client, data)) print('{} отправил: {}'.format(client.getpeername(), data.decode())) except: break # В случае ошибки выходим из цикла client.close() # И закрываем клиентский сокет def sender(q, connections): while run: closed_sockets = set() try: # Получаем из очереди сообщений # сокет отправителя и принятые данные sender, message = q.get(timeout=1) except queue.Empty: pass # Игнорируем отсутствие сообщений в очереди else: # Если же сообщения есть for s in set(connections): # Обходим все сокеты if s != sender: # Кроме сокета отправителя try: s.send(message) # И отправляем им принятое сообщение except: closed_sockets.add(s) if closed_sockets: with threading.Lock(): connections -= closed_sockets print("Подключено:", len(connections)) q.task_done() # Сообщаем, что сообщение обработано def accepter(server, connections, q): while run: try: # Здесь поток блокируется до тех пор, пока кто-то не подключится к серверу client, addr = server.accept() except OSError as e: # Если отловлена не ожидаемая ошибка закрытия серверного сокета, а какая-то другая if (OS_NAME == 'Windows' and e.errno != 10038) or (OS_NAME == 'Linux' and e.errno != 22): raise # то возбуждаем её повторно else: # Если кто-то подключился и создан новый клиентский сокет # Устанавливаем ему таймаут, чтобы считать его сбойным, # если в этот сокет не будут ничего писать более 5 минут client.settimeout(60 * 5) with threading.Lock(): connections.add(client) # Запускаем новый поток, выполняющий функцию receiver # для только что полученного сокета threading.Thread(target=reciver, args=(client, q)).start() print("Подключено:", len(connections)) if __name__ == '__main__': print('Запуск...') # Очередь сообщений, через которую будут общаться потоки q = queue.Queue() # Множество соединений connections = set() server = socket.socket() server.bind((HOST, PORT)) server.listen() print(u'Сервер запущен на {}\n'.format(server.getsockname())) # Поток получающий сообщения из очереди # и отправляющий их всем сокетам в множестве connections threading.Thread(target=sender, args=(q, connections)).start() # Поток принимающий новые соединения threading.Thread(target=accepter, args=(server, connections, q)).start() while True: command = input() if command == 'exit': # Если в консоли введена команда exit run = False # отменяем выполнение циклов во всех потоках break # и выходим из этого цикла for s in connections: shutdown_socket(s) shutdown_socket(server) 

In addition to making the changes necessary for the example to work in your code, I got rid of unnecessary global variables and made stylistic changes.

Non-blocking server

Very old, but not lost its relevance way. It was used by the ancients when working in single-tasking operating systems. To poll the state of sockets, we will use the select function from the module of the same name. It is not the fastest of these, but it works in all operating systems.

 # -*- coding: utf-8 -*- import select import socket import queue server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.setblocking(0) # Неблокирующийся сокет server.bind(('localhost', 1080)) server.listen() sockets = [server] message_queues = {} def close_connection(con): sockets.remove(con) if con in message_queues: del message_queues[con] con.close() # Пока есть хоть один сокет while sockets: # Опрашиваем сокеты на готовность к чтению, записи, ошибки. # С таймаутом в 1 секунду для того, чтобы программа реагировала # на другие события. readable, writable, exceptional = select.select(sockets, sockets, sockets, 1) for s in readable: # Для каждого сокета готового к чтению if s is server: # Если это сокет принимающий соединения connection, client_address = s.accept() connection.setblocking(0) # Этот клиентский сокет тоже будет неблокируемым sockets.append(connection) # Добавляем клиентский сокет в список сокетов message_queues[connection] = queue.Queue() # Создаём очередь сообщений для сокета else: try: data = s.recv(1024) # Читаем без блокировки except: close_connection(s) # В случае ошибки закрываем этот сокет и удаляем else: # Если ошибка не произошла if data: # И данные получены for c in message_queues: # Обходим все очереди сообщений if c != s: # Кроме очереди текущего сокета message_queues[c].put(data) # Отправляем данные в каждую очередь else: # Если данных нет в сокете готовом для чтения # значит он в состоянии закрытия на клиентской # стороне. Закрываем его на стороне сервера. close_connection(s) for s in writable: # Для каждого сокета готового к записи try: next_msg = message_queues[s].get_nowait() # Получаем сообщение из очереди except queue.Empty: pass # Игнорируем пустые очереди except KeyError: pass # Игнорируем очереди удалённые до того, как до них дошла очередь обработки else: s.send(next_msg) # Отправляем без блокировки for s in exceptional: # Для каждого сбойного сокета close_connection(s) # Закрываем сбойный сокет 

Asynchronous server

Relatively new way. Asyncio provides three levels of abstraction for working with sockets - asynchronous wrappers on operations with sockets , streams and protocols . Names can cause some confusion, asyncio threads have nothing to do with threads, and protocols are not the same as network protocols. Other things being equal, one should always choose the highest level of abstraction, so we implement the protocol.

 # -*- coding: utf-8 -*- import asyncio clients = [] class SimpleChatClientProtocol(asyncio.Protocol): def connection_made(self, transport): self.transport = transport self.peername = transport.get_extra_info("peername") print('Подключился: {}'.format(self.peername)) clients.append(self) def data_received(self, data): print('{} отправил: {}'.format(self.peername, data.decode())) for client in clients: if client is not self: client.transport.write(data) def connection_lost(self, ex): print('Отключился: {}'.format(self.peername)) clients.remove(self) # Цикл событий невозможно прервать, если в нём # не происходят события. Чтобы избежать этого # регистрируем в цикле фунцию, которая будет # вызываться раз в секунду. def wakeup(): loop = asyncio.get_event_loop() loop.call_later(1, wakeup) if __name__ == '__main__': print('Запуск...') # Получаем цикл событий loop = asyncio.get_event_loop() # Регистрируем "отлипатель" loop.call_later(1, wakeup) # Создаём асинхронную сопрограмму-протокол coro = loop.create_server(SimpleChatClientProtocol, host='localhost', port=1080) # Регистрируем её в цикле событий на выполнение server = loop.run_until_complete(coro) for socket in server.sockets: print('Сервер запущен на {}'.format(socket.getsockname())) print('Выход по Ctrl+C\n') try: loop.run_forever() # Запускаем бесконечный цикл событий except KeyboardInterrupt: # Программа прервана нажатием Ctrl+C pass finally: server.close() # Закрываем протокол loop.run_until_complete(server.wait_closed()) # Асинхронно ожидаем окончания закрытия loop.close() # Закрываем цикл событий 

The protocol code is so simple and obvious that I didn’t even find the appropriate comments.

With each of these examples, you can work through telnet , and with the help of your client code.

  • Sumptuously! And which of these codes is more productive? - Mr Lucky Tomas
  • I think it depends on the operating conditions. Most likely the asynchronous version will be a bit more productive. - Sergey Gornostaev
  • Okay I will try. Thank! - Mr Lucky Tomas
  • Although, if portability is not needed, and instead of select in the non-blocking version, use epoll , for example, the option on non-blocking sockets can get ahead :) - Sergey Gornostaev