The problem that operations on sockets block the flow of execution can be solved in three ways:
- Select for each blocking operation a separate thread of execution, which it can block without harming the other threads;
- Translate sockets into non-blocking mode and poll their state in a loop;
- Work with sockets in asynchronous mode.
Below are all three options.
Multithreaded server
Perhaps the most frequently used method. You can say classic. For interaction between threads we will use queue.Queue . First, the logic of the queue works for us. Secondly, the queue is thread-safe.
# -*- coding: utf-8 -*- import socket import threading import queue # Определяем константу содержащую имя ОС # для учёта особенностей данной операционной системы import platform OS_NAME = platform.system() # Константы HOST = 'localhost' PORT = 1080 # Единственная глобальная переменная # доступная всем потокам run = True def shutdown_socket(s): # В Linux'ах просто закрыть заблокированный сокет будет мало, # он так и не выйдет из состояния блокировки. Нужно передать # ему команду на завершение. Но в Windows наоборот, команда # на завершение вызовет зависание, если сокет был заблокирован # вызовом accept(), а простое закрытие сработает. if OS_NAME == 'Linux': s.shutdown(socket.SHUT_RDWR) s.close() def reciver(client, q): while run: try: # Здесь поток блокируется до тех пор # пока не будут считаны все имеющиеся # в сокете данные data = client.recv(1024) if data: # Если есть данные # Отправляем в очередь сообщений кортеж # содержащий сокет отправителя # и принятые данные q.put((client, data)) print('{} отправил: {}'.format(client.getpeername(), data.decode())) except: break # В случае ошибки выходим из цикла client.close() # И закрываем клиентский сокет def sender(q, connections): while run: closed_sockets = set() try: # Получаем из очереди сообщений # сокет отправителя и принятые данные sender, message = q.get(timeout=1) except queue.Empty: pass # Игнорируем отсутствие сообщений в очереди else: # Если же сообщения есть for s in set(connections): # Обходим все сокеты if s != sender: # Кроме сокета отправителя try: s.send(message) # И отправляем им принятое сообщение except: closed_sockets.add(s) if closed_sockets: with threading.Lock(): connections -= closed_sockets print("Подключено:", len(connections)) q.task_done() # Сообщаем, что сообщение обработано def accepter(server, connections, q): while run: try: # Здесь поток блокируется до тех пор, пока кто-то не подключится к серверу client, addr = server.accept() except OSError as e: # Если отловлена не ожидаемая ошибка закрытия серверного сокета, а какая-то другая if (OS_NAME == 'Windows' and e.errno != 10038) or (OS_NAME == 'Linux' and e.errno != 22): raise # то возбуждаем её повторно else: # Если кто-то подключился и создан новый клиентский сокет # Устанавливаем ему таймаут, чтобы считать его сбойным, # если в этот сокет не будут ничего писать более 5 минут client.settimeout(60 * 5) with threading.Lock(): connections.add(client) # Запускаем новый поток, выполняющий функцию receiver # для только что полученного сокета threading.Thread(target=reciver, args=(client, q)).start() print("Подключено:", len(connections)) if __name__ == '__main__': print('Запуск...') # Очередь сообщений, через которую будут общаться потоки q = queue.Queue() # Множество соединений connections = set() server = socket.socket() server.bind((HOST, PORT)) server.listen() print(u'Сервер запущен на {}\n'.format(server.getsockname())) # Поток получающий сообщения из очереди # и отправляющий их всем сокетам в множестве connections threading.Thread(target=sender, args=(q, connections)).start() # Поток принимающий новые соединения threading.Thread(target=accepter, args=(server, connections, q)).start() while True: command = input() if command == 'exit': # Если в консоли введена команда exit run = False # отменяем выполнение циклов во всех потоках break # и выходим из этого цикла for s in connections: shutdown_socket(s) shutdown_socket(server)
In addition to making the changes necessary for the example to work in your code, I got rid of unnecessary global variables and made stylistic changes.
Non-blocking server
Very old, but not lost its relevance way. It was used by the ancients when working in single-tasking operating systems. To poll the state of sockets, we will use the select function from the module of the same name. It is not the fastest of these, but it works in all operating systems.
# -*- coding: utf-8 -*- import select import socket import queue server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.setblocking(0) # Неблокирующийся сокет server.bind(('localhost', 1080)) server.listen() sockets = [server] message_queues = {} def close_connection(con): sockets.remove(con) if con in message_queues: del message_queues[con] con.close() # Пока есть хоть один сокет while sockets: # Опрашиваем сокеты на готовность к чтению, записи, ошибки. # С таймаутом в 1 секунду для того, чтобы программа реагировала # на другие события. readable, writable, exceptional = select.select(sockets, sockets, sockets, 1) for s in readable: # Для каждого сокета готового к чтению if s is server: # Если это сокет принимающий соединения connection, client_address = s.accept() connection.setblocking(0) # Этот клиентский сокет тоже будет неблокируемым sockets.append(connection) # Добавляем клиентский сокет в список сокетов message_queues[connection] = queue.Queue() # Создаём очередь сообщений для сокета else: try: data = s.recv(1024) # Читаем без блокировки except: close_connection(s) # В случае ошибки закрываем этот сокет и удаляем else: # Если ошибка не произошла if data: # И данные получены for c in message_queues: # Обходим все очереди сообщений if c != s: # Кроме очереди текущего сокета message_queues[c].put(data) # Отправляем данные в каждую очередь else: # Если данных нет в сокете готовом для чтения # значит он в состоянии закрытия на клиентской # стороне. Закрываем его на стороне сервера. close_connection(s) for s in writable: # Для каждого сокета готового к записи try: next_msg = message_queues[s].get_nowait() # Получаем сообщение из очереди except queue.Empty: pass # Игнорируем пустые очереди except KeyError: pass # Игнорируем очереди удалённые до того, как до них дошла очередь обработки else: s.send(next_msg) # Отправляем без блокировки for s in exceptional: # Для каждого сбойного сокета close_connection(s) # Закрываем сбойный сокет
Asynchronous server
Relatively new way. Asyncio provides three levels of abstraction for working with sockets - asynchronous wrappers on operations with sockets , streams and protocols . Names can cause some confusion, asyncio threads have nothing to do with threads, and protocols are not the same as network protocols. Other things being equal, one should always choose the highest level of abstraction, so we implement the protocol.
# -*- coding: utf-8 -*- import asyncio clients = [] class SimpleChatClientProtocol(asyncio.Protocol): def connection_made(self, transport): self.transport = transport self.peername = transport.get_extra_info("peername") print('Подключился: {}'.format(self.peername)) clients.append(self) def data_received(self, data): print('{} отправил: {}'.format(self.peername, data.decode())) for client in clients: if client is not self: client.transport.write(data) def connection_lost(self, ex): print('Отключился: {}'.format(self.peername)) clients.remove(self) # Цикл событий невозможно прервать, если в нём # не происходят события. Чтобы избежать этого # регистрируем в цикле фунцию, которая будет # вызываться раз в секунду. def wakeup(): loop = asyncio.get_event_loop() loop.call_later(1, wakeup) if __name__ == '__main__': print('Запуск...') # Получаем цикл событий loop = asyncio.get_event_loop() # Регистрируем "отлипатель" loop.call_later(1, wakeup) # Создаём асинхронную сопрограмму-протокол coro = loop.create_server(SimpleChatClientProtocol, host='localhost', port=1080) # Регистрируем её в цикле событий на выполнение server = loop.run_until_complete(coro) for socket in server.sockets: print('Сервер запущен на {}'.format(socket.getsockname())) print('Выход по Ctrl+C\n') try: loop.run_forever() # Запускаем бесконечный цикл событий except KeyboardInterrupt: # Программа прервана нажатием Ctrl+C pass finally: server.close() # Закрываем протокол loop.run_until_complete(server.wait_closed()) # Асинхронно ожидаем окончания закрытия loop.close() # Закрываем цикл событий
The protocol code is so simple and obvious that I didn’t even find the appropriate comments.
With each of these examples, you can work through telnet , and with the help of your client code.