TCP半包和粘包

粘包的主要原因

  • 发送方每次写入数据 < 套接字(Socket)缓冲区大小
  • 接收方读取套接字(Socket)缓冲区数据不够及时

半包的主要原因:

  • 发送方每次写入数据 > 套接字(Socket)缓冲区大小
  • 发送的数据大于协议的 MTU (Maximum Transmission Unit,最大传输单元),因此必须拆包

原理

解决方法

解决方法

就像上面说的,UDP 之所以不会产生粘包和半包问题,主要是因为消息有边界,因此,我们也可以采取类似的思路。

改成短连接

将 TCP 连接改成短连接,一个请求一个短连接。这样的话,建立连接到释放连接之间的消息即为传输的信息,消息也就产生了边界。

这样的方法就是十分简单,不需要在我们的应用中做过多修改。但缺点也就很明显了,效率低下,TCP 连接和断开都会涉及三次握手以及四次握手,每个消息都会涉及这些过程,十分浪费性能。

因此,并不推介这种方式。

封装成帧

封装成帧(Framing),也就是原本发送消息的单位是缓冲大小,现在换成了帧,这样我们就可以自定义边界了。一般有4种方式:

固定长度

这种方式下,消息边界也就是固定长度即可。

优点就是实现很简单,缺点就是空间有极大的浪费,如果传递的消息中大部分都比较短,这样就会有很多空间是浪费的。

因此,这种方式一般也是不推介的。

分隔符

这种方式下,消息边界也就是分隔符本身。

优点是空间不再浪费,实现也比较简单。缺点是当内容本身出现分割符时需要转义,所以无论是发送还是接受,都需要进行整个内容的扫描。

因此,这种方式效率也不是很高,但可以尝试使用。

专门的 length 字段

这种方式,就有点类似 Http 请求中的 Content-Length,有一个专门的字段存储消息的长度。作为服务端,接受消息时,先解析固定长度的字段(length字段)获取消息总长度,然后读取后续内容。

优点是精确定位用户数据,内容也不用转义。缺点是长度理论上有限制,需要提前限制可能的最大长度从而定义长度占用字节数。

因此,十分推介用这种方式。

redis半包粘包问题

实验

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package network.tcp;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
*
*
*/

public class NettyServer {
private int port;

public NettyServer(int port) {
this.port = port;
bind();
}

private void bind() {
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();

try {
ServerBootstrap bootstrap = new ServerBootstrap();

bootstrap.group(boss, worker);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.option(ChannelOption.SO_BACKLOG, 1024); // 连接数
bootstrap.option(ChannelOption.TCP_NODELAY, true); // 不延迟,消息立即发送
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); // 长连接
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel)
throws Exception {
ChannelPipeline p = socketChannel.pipeline();
p.addLast(new NettyProblemServerHandler());// 添加NettyServerHandler,用来处理Server端接收和处理消息的逻辑
}
});
ChannelFuture channelFuture = bootstrap.bind(port).sync();
if (channelFuture.isSuccess()) {
System.err.println("启动Netty服务成功,端口号:" + this.port);
}
// 关闭连接
channelFuture.channel().closeFuture().sync();

} catch (Exception e) {
System.err.println("启动Netty服务异常,异常信息:" + e.getMessage());
e.printStackTrace();
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}

public static void main(String[] args) throws InterruptedException {
new NettyServer(10086);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package network.tcp;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class NettyClient {

/*
* 服务器端口号
*/
private int port;

/*
* 服务器IP
*/
private String host;

public NettyClient(int port, String host) throws InterruptedException {
this.port = port;
this.host = host;
start();
}

private void start() throws InterruptedException {

EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

try {

Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.group(eventLoopGroup);
bootstrap.remoteAddress(host, port);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel)
throws Exception {
// socketChannel.pipeline().addLast(new MyProtocolDecoder(1024, 0, 4, 0, 4, true));
socketChannel.pipeline().addLast(new NettyProblemClientHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
if (channelFuture.isSuccess()) {
System.err.println("连接服务器成功");
}
channelFuture.channel().closeFuture().sync();
} finally {
eventLoopGroup.shutdownGracefully();
}
}

public static void main(String[] args) throws InterruptedException {
new NettyClient(10086, "localhost");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package network.tcp;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class NettyProblemServerHandler extends ChannelHandlerAdapter {

private int counter;

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req);
System.out.println("-----start------\n"+ body + "\n------end------");
String content = "receive" + ++counter;
ByteBuf resp = Unpooled.copiedBuffer(content.getBytes());
ctx.writeAndFlush(resp);

}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package network.tcp;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

import java.io.UnsupportedEncodingException;

public class NettyProblemClientHandler extends ChannelHandlerAdapter {

private int counter;

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {

byte[] req = ("我是一条测试消息,快来读我吧,啦啦啦").getBytes();
for (int i = 0; i < 100; i++) {
ByteBuf message = Unpooled.buffer(req.length);
message.writeBytes(req);
ctx.writeAndFlush(message);
}
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req);
System.out.println(body + " count:" + ++counter + "----end----\n");

}

private String getMessage(ByteBuf buf) {
byte[] con = new byte[buf.readableBytes()];
buf.readBytes(con);
try {
return new String(con, "UTF8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
return null;
}
}
}

实验结果

  • server

  • client

-------------本文结束感谢您的阅读-------------
我知道是不会有人点的,但万一有人想不开呢?