Async Socket I/O
Asynchronous I/O is essential for building high-performance network servers that can handle thousands of concurrent connections efficiently. Traditional blocking I/O requires one thread per connection, which doesn't scale well due to memory overhead and context switching costs. Asynchronous I/O solves this by allowing a single thread to handle multiple connections using I/O multiplexing—the program monitors multiple sockets simultaneously and processes whichever ones are ready for reading or writing.
This section covers the evolution of I/O multiplexing in Python, from the classic select() system call (portable but limited) to modern high-performance mechanisms like epoll (Linux) and kqueue (BSD/macOS) that can efficiently handle tens of thousands of connections. We also cover the selectors module, which provides a high-level, platform-independent interface that automatically uses the best available mechanism. Understanding these primitives is valuable even if you use higher-level frameworks like asyncio, as they build upon these same concepts.
Async TCP Server - select
Learn More
For more examples and detailed explanations, see the Real Python guide on async tcp server - select.
select() is the oldest and most portable I/O multiplexing mechanism, available on virtually all platforms including Windows, Linux, and macOS. It monitors file descriptors for three conditions: readability (data available to read), writability (buffer space available to write), and exceptional conditions (errors). While portable, select() has limitations: it typically supports only up to 1024 file descriptors and has O(n) performance as it must scan all monitored descriptors on each call.
from select import select
import socket
host = ('localhost', 5566)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(host)
sock.listen(5)
read_list = [sock]
write_list = []
messages = {}
try:
while True:
readable, writable, _ = select(read_list, write_list, [])
for s in readable:
if s == sock:
conn, addr = sock.accept()
read_list.append(conn)
else:
msg = s.recv(1024)
if msg:
messages[s.fileno()] = msg
write_list.append(s)
else:
read_list.remove(s)
s.close()
for s in writable:
msg = messages.pop(s.fileno(), None)
if msg:
s.send(msg)
write_list.remove(s)
except KeyboardInterrupt:
sock.close()Async TCP Server - poll
Learn More
For more examples and detailed explanations, see the Real Python guide on async tcp server - poll.
poll() is similar to select() but more efficient for large numbers of file descriptors. Available on Unix systems.
import socket
import select
import contextlib
host = 'localhost'
port = 5566
connections = {}
requests = {}
responses = {}
@contextlib.contextmanager
def create_server(host, port):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.setblocking(False)
s.bind((host, port))
s.listen(10)
try:
yield s
finally:
s.close()
def accept(server, poll):
conn, addr = server.accept()
conn.setblocking(False)
fd = conn.fileno()
poll.register(fd, select.POLLIN)
requests[fd] = conn
connections[fd] = conn
def recv(fd, poll):
conn = requests.pop(fd, None)
if not conn:
return
msg = conn.recv(1024)
if msg:
responses[fd] = msg
poll.modify(fd, select.POLLOUT)
else:
poll.unregister(fd)
conn.close()
connections.pop(fd, None)
def send(fd, poll):
conn = connections.get(fd)
msg = responses.pop(fd, None)
if conn and msg:
conn.send(msg)
requests[fd] = conn
poll.modify(fd, select.POLLIN)
with create_server(host, port) as server:
poll = select.poll()
poll.register(server.fileno(), select.POLLIN)
try:
while True:
events = poll.poll(1000)
for fd, event in events:
if fd == server.fileno():
accept(server, poll)
elif event & (select.POLLIN | select.POLLPRI):
recv(fd, poll)
elif event & select.POLLOUT:
send(fd, poll)
except KeyboardInterrupt:
passAsync TCP Server - epoll
Learn More
For more examples and detailed explanations, see the Real Python guide on async tcp server - epoll.
epoll is Linux-specific and the most efficient for handling thousands of connections. It uses edge-triggered or level-triggered notifications.
import socket
import select
import contextlib
host = 'localhost'
port = 5566
connections = {}
requests = {}
responses = {}
@contextlib.contextmanager
def create_server(host, port):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.setblocking(False)
s.bind((host, port))
s.listen(10)
try:
yield s
finally:
s.close()
def accept(server, epoll):
conn, addr = server.accept()
conn.setblocking(False)
fd = conn.fileno()
epoll.register(fd, select.EPOLLIN)
requests[fd] = conn
connections[fd] = conn
def recv(fd, epoll):
conn = requests.pop(fd, None)
if not conn:
return
msg = conn.recv(1024)
if msg:
responses[fd] = msg
epoll.modify(fd, select.EPOLLOUT)
else:
epoll.unregister(fd)
conn.close()
connections.pop(fd, None)
def send(fd, epoll):
conn = connections.get(fd)
msg = responses.pop(fd, None)
if conn and msg:
conn.send(msg)
requests[fd] = conn
epoll.modify(fd, select.EPOLLIN)
with create_server(host, port) as server:
epoll = select.epoll()
epoll.register(server.fileno(), select.EPOLLIN)
try:
while True:
events = epoll.poll(1)
for fd, event in events:
if fd == server.fileno():
accept(server, epoll)
elif event & select.EPOLLIN:
recv(fd, epoll)
elif event & select.EPOLLOUT:
send(fd, epoll)
except KeyboardInterrupt:
pass
finally:
epoll.close()Async TCP Server - kqueue
Learn More
For more examples and detailed explanations, see the Real Python guide on async tcp server - kqueue.
kqueue is the BSD/macOS equivalent of epoll, providing efficient event notification for large numbers of file descriptors.
import socket
import select
import contextlib
if not hasattr(select, 'kqueue'):
print("kqueue not supported on this platform")
exit(1)
host = 'localhost'
port = 5566
connections = {}
requests = {}
responses = {}
@contextlib.contextmanager
def create_server(host, port):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.setblocking(False)
s.bind((host, port))
s.listen(10)
try:
yield s
finally:
s.close()
def accept(server, kq):
conn, addr = server.accept()
conn.setblocking(False)
fd = conn.fileno()
ke = select.kevent(fd, select.KQ_FILTER_READ, select.KQ_EV_ADD)
kq.control([ke], 0)
requests[fd] = conn
connections[fd] = conn
def recv(fd, kq):
conn = requests.pop(fd, None)
if not conn:
return
msg = conn.recv(1024)
if msg:
responses[fd] = msg
# Switch from read to write
ke_del = select.kevent(fd, select.KQ_FILTER_READ, select.KQ_EV_DELETE)
ke_add = select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_ADD)
kq.control([ke_del, ke_add], 0)
requests[fd] = conn
else:
ke = select.kevent(fd, select.KQ_FILTER_READ, select.KQ_EV_DELETE)
kq.control([ke], 0)
conn.close()
connections.pop(fd, None)
def send(fd, kq):
conn = connections.get(fd)
msg = responses.pop(fd, None)
if conn and msg:
conn.send(msg)
# Switch from write to read
ke_del = select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_DELETE)
ke_add = select.kevent(fd, select.KQ_FILTER_READ, select.KQ_EV_ADD)
kq.control([ke_del, ke_add], 0)
requests[fd] = conn
with create_server(host, port) as server:
kq = select.kqueue()
ke = select.kevent(server.fileno(), select.KQ_FILTER_READ, select.KQ_EV_ADD)
kq.control([ke], 0)
try:
while True:
events = kq.control(None, 1024, 1)
for e in events:
fd = e.ident
if fd == server.fileno():
accept(server, kq)
elif e.filter == select.KQ_FILTER_READ:
recv(fd, kq)
elif e.filter == select.KQ_FILTER_WRITE:
send(fd, kq)
except KeyboardInterrupt:
pass
finally:
kq.close()High-Level API - selectors
Learn More
For more examples and detailed explanations, see the Real Python guide on high-level api - selectors.
The selectors module (Python 3.4+) provides a high-level, platform-independent interface that automatically uses the best available mechanism (epoll, kqueue, etc.).
import selectors
import socket
import contextlib
@contextlib.contextmanager
def create_server(host, port):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind((host, port))
s.listen(10)
sel = selectors.DefaultSelector()
try:
yield s, sel
finally:
s.close()
sel.close()
def accept_handler(sock, sel):
conn, addr = sock.accept()
sel.register(conn, selectors.EVENT_READ, read_handler)
def read_handler(conn, sel):
msg = conn.recv(1024)
if msg:
conn.send(msg)
else:
sel.unregister(conn)
conn.close()
host = 'localhost'
port = 5566
with create_server(host, port) as (sock, sel):
sel.register(sock, selectors.EVENT_READ, accept_handler)
try:
while True:
events = sel.select()
for key, mask in events:
handler = key.data
handler(key.fileobj, sel)
except KeyboardInterrupt:
passComparison of I/O Multiplexing Methods
Learn More
For more examples and detailed explanations, see the Real Python guide on comparison of i o multiplexing methods.
| Method | Platform | Scalability | Notes |
|---|---|---|---|
| select | All | O(n) - Limited | Max ~1024 FDs |
| poll | Unix | O(n) - Better | No FD limit |
| epoll | Linux | O(1) - Excellent | Edge/level triggered |
| kqueue | BSD/macOS | O(1) - Excellent | Similar to epoll |
| selectors | All | Best available | Recommended for new code |
TIP
For new code, use the selectors module or asyncio for async I/O. The low-level APIs (select, poll, epoll, kqueue) are mainly useful for understanding how async I/O works or when you need fine-grained control.