作者: Unmesh Joshi
译者: java达人
通过使用一个TCP连接来维护发送到服务器的请求的顺序。
问题
当我们使用领导者和追随者模式时,我们需要确保领导者和每个追随者之间的信息保持有序,并对丢失的信息进行重试。与此同时保持较低的新连接成本,这样打开新连接不会增加系统的延迟。
解决方案
幸运的是,长期广泛使用的TCP机制提供了所有这些必要的特征。因此,我们可以通过保证一个follower和leader之间的所有通信都通过一个套接字通道来实现我们需要的通信。然后追随者使用一个Singular Update Queue序列化来自leader的更新
节点在连接打开后永远不会关闭它,并持续读取新请求。节点对每个连接使用一个专用线程来读写请求。如果使用了非阻塞io,则不需要每个连接一个线程。一个简单的基于线程的实现如下:
class SocketHandlerThread…
@Override public void run() { try { //Continues to read/write to the socket connection till it is closed. while (true) { handleRequest(); } } catch (Exception e) { getLogger().debug(e); } }
private void handleRequest() { RequestOrResponse request = readRequestFrom(clientSocket); RequestId requestId = RequestId.valueOf(request.getRequestId()); requestConsumer.accept(new Message<>(request, requestId, clientSocket)); }
节点读取请求并将它们提交给一个单一的更新队列进行处理。一旦节点处理了请求,它就将响应写回套接字。
每当节点建立通信时,它就会打开一个套接字连接,用于与另一方的所有请求。
class SingleSocketChannel…
public class SingleSocketChannel implements Closeable { final InetAddressAndPort address; final int heartbeatIntervalMs; private Socket clientSocket; private final OutputStream socketOutputStream; private final InputStream inputStream;
public SingleSocketChannel(InetAddressAndPort address, int heartbeatIntervalMs) throws IOException { this.address = address; this.heartbeatIntervalMs = heartbeatIntervalMs; clientSocket = new Socket(); clientSocket.connect(new InetSocketAddress(address.getAddress(), address.getPort()), heartbeatIntervalMs); clientSocket.setSoTimeout(heartbeatIntervalMs * 10); //set socket read timeout to be more than heartbeat. socketOutputStream = clientSocket.getOutputStream(); inputStream = clientSocket.getInputStream(); }
public synchronized RequestOrResponse blockingSend(RequestOrResponse request) throws IOException { writeRequest(request); byte[] responseBytes = readResponse(); return deserialize(responseBytes); }
private void writeRequest(RequestOrResponse request) throws IOException { var dataStream = new DataOutputStream(socketOutputStream); byte[] messageBytes = serialize(request); dataStream.writeInt(messageBytes.length); dataStream.write(messageBytes); }
在连接上保持一个超时是很重要的,这样在出现错误时它就不会无限期地阻塞。我们使用HeartBeat机制,定期通过套接字通道发送请求,以使其保持活动状态。这个超时时间通常为心跳间隔的倍数,包含网络往返时间和一些可能的网络延迟。将连接超时设置为心跳间隔的10倍是合理的。
class SocketListener…
private void setReadTimeout(Socket clientSocket) throws SocketException { clientSocket.setSoTimeout(config.getHeartBeatIntervalMs() * 10); }
通过单个通道发送请求可能会产生头部阻塞问题。为了避免这些问题,我们可以使用Request Pipeline。
例子
?Zookeeper使用一个套接字通道和每个追随者一个线程来完成所有的通信。?Kafka在follower和leader分区之间使用单个套接字通道来复制消息。?参考Raft共识算法的实现,LogCabin使用单套接字通道在领导者和追随者之间进行通信