
2. 流头由一些内核例程构成,应用进程针对流描述符执行系统调用(例如read、 putmsg、ioctl等)时这些内核例程将被激活。


在创建一个套接字时,套接字函数库把模块sockmod推入流中。向应用进程提供套接字API的正是套接字函数库和sockrnod流模块两者的组合。
在创建一个XTI端点时,XTI函数库把模块timod推入流中。向应用进程提供XTIAPI的正是XTI函数库和timod流模块两者的组合。XTI API的端点相当于套接字API的套接字。
为了针对XTI端点使用read和write访间网络数据,通常必须把模块tirdwr推入流中。图中中间那个使用TCP的进程就是这么做的。推入该模块后XTI函数库中的函数不能继续使用,那个进程这么做也许已经放弃使用XTI,因此我们没给它标上XTI函数库。
所标的三个服务接口定义顺着流上行和下行交换的网络消息的格式。传输提供者接口TIP定义了向它上方的模块提供的接口。网络提供者接口NPI定义了网络提供者向它上方的模块提供的接口。DLPI数据链路层接口。
一个流中的每个部件:流头、所有处理模块和驱动程序包含至少一对队列(queue):一个写队列和一个读队列,如下所示:

流消息可划分为高优先级、优先级带和普通三类。优先级共有 256带,在0- 255之间取值,其中普通消息位于带0。流消息的优先级用于排队和流揽控制。按约定高优先级消息不受流量控制影响。
虽然流系统支持 256个不同的优先级带,网络协议往往只用代表经加速数据的带1和代表普通数据的带0。
TPI不认为TCP带外数据是真正的经加速数据 ,事实上TCP的各通数据和带外数据都使用带0,只有那些让经加边数据(并不是像TCP中的紧急指针而已)先于普通数据发送的协议才使用带1发送经加速数据 。

普通优先级消息和高优先级消息两大类,它们却分别约有 12种和18种。有3种不同类型的消息: M_DATA、M_PROTO和M_PCPROTO (PC表示, 优先级控制,隐指高优先级消息)。下图 说明了这3种消息类型是如何使用write和putmsg这两个函数产生的。

//沿着流上行和下行的数据由消息构成,而且每个消息含有控制或数据亦或两者都有。
//如果在流上使用read和write。 那么所传送的仅仅是数据。
//下面两个函数提供进程能够读写数据和控制两部分信息
#include
int getmsg(int fd,struct strbuf *ctlptr,struct strubf *dataptr,int *flagsp);
//最后一个参数是值-结果参数。
//flagsp指向的整数值为0 ,那么返回的是流中第一个消息(既可能是普通消息,也可能是高优先级消息)。
//该整数值为RS_HIPRI. 那就等待一个高优先级消息到达流头。
//无论哪种情况,存放到如flagsp指向的整数中的值根据所返回消息的类型或为0, 或为RS_HIPRI。
//传递给ctlptr和dataptr参数均为非空指针,如果没有控制信息待返回(也就是即将返回一 个M_ DATA 消息),就在返回时把ctlptr->lenn设置为-1作为指示。
//类似地如果没有数据待返回就把 dataptr->len设投为-1。
int putmsg(int fd,const struct struct strbuf *ctlptr,const struct struct strbuf *dataptr,int flags);
//可以发送控制信息和数据也可以同时发送两者。
//指示缺失控制信息把vtlptr参数指定空指针,也可以ctlptr->len设置为-1。
//同样手段设置dataptr参数用于指示缺失数据。
//flags:0表示普通消息,为RS_HIPRI表示高优先级消息。
struct strbuf{
int maxlen;
int len;
char *buf;
};
//putmsg在成功时返回0 , 在出错时返回- 1。
//getmsg仅在整个消息完整返回给调用者时才返问0 。
//如果控制缓冲区不足以容纳完整的控制信息,那就返回非负的MORECTL。
//类似地如果数据缓冲区太小,那就返回MOREDATA 。
//如果两个缓冲区都太小,那就返回这两个标志的逻缉或。
缺失控制信息,putmsg将产生一个M_DATA信息,如下所示。否则根据flags产生一个M_PROTO或M_PCRROTO消息。

当对于不同优先级带的支持随 SVR 4 被增加到流系统时,以下两个g e tm sg和pu tm sg 的变体函数也被间时引入。
#include
int getpmsg(int fd,struct strbuf *ctlptr,struct strubf *dataptr,int *bandp,int *flagsp);
//bandp和flagsp参数是值-结果参数。
//flagsp指向的整数可以取值MSG_HIPRI(以读入一 个高优先消息)、MSG_BAND (以读入 一个优先级至少为bandp指向的整数值的消息)或MSG_ANY(以读入任一消息)。
//函数返回时,bandp指向的整数含有所读入消息的优先级带。
//flagsp指向的整数含有MSG_HIPRI (如果所读入的是一个商优先级消息)或MSG_BAND (如果所读入的是其他类型消息)。
int putpmsg(int fd,const struct struct strbuf *ctlptr,const struct struct strbuf *dataptr,int band,int flags);
//band参数必须在0-255之间(含)。
//如果flags参数为MSG_BAND,那就产生一个所指定优先级带的消息。
//把flags设置为MSG_BAND并且把band设置设为0等效于调用putmsg。
//如果flags为MSG_HIPRI, ban就必须为0, 所产生的是 一个环优先级消息。(注意,putmsg使用不同名字的RS_HIPRI标志)。
#include
int ioctl(int fd,int request,.../*void *arg*/);
//处理流所必须包含的头文件不一样。
TPI表示为传输层向它上方的模块提供的服务接口。在流环境中套接字和XTI都使用TPI。应用进程跟TCP和UD P交换TPI消息的是套接字函数库和sockmod的组合,或者是XTI函数库和tirnod的组合。
TPI是一 个基于消息的接口。它定义了在应用进程 (例如XTI函数库或套接字函数库)和传输层之间沿看流上行和下行交换的消息,包括消息的格式和每个消息执行的操作。
实例中,应用进程向提供者发出 一 个请求 ( “捆绑这个本地地址"), 提供者则发回一个响应(“成功 ” 或 “出错”)。一些事件在提供者异步地发生 (对某个服务器的连接请求的到达),它们导致沿着流向上发送的消息或信号。我们可以绕过XTI和套接字直接使用TPI。
头文件tpi_daytime.h头文件
#include "unpxti.h"
#include
#include
void tpi_bind(int, const void *, size_t);
void tpi_connect(int, const void *, size_t);
ssize_t tpi_read(int, void *, size_t);
void tpi_close(int);
时间获取客户程序的mian函数:
#include "tpi_daytime.h"
int main(int argc, char **argv)
{
int fd, n;
char recvline[MAXLINE + 1];
struct sockaddr_in myaddr, servaddr;
if (argc != 2)
err_quit("usage: tpi_daytime " );
//打开传输提供者,绑定本地地址
fd = open(XTI_TCP, O_RDWR, 0);
//填写服务器地址,建立连接
/*4bind any local address */
bzero(&myaddr, sizeof(myaddr));
myaddr.sin_family = AF_INET;
myaddr.sin_addr.s_addr = htonl(INADDR_ANY);
myaddr.sin_port = htons(0);
tpi_bind(fd, &myaddr, sizeof(struct sockaddr_in));
/*4fill in server's address */
bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(13); /* daytime server */
inet_pton(AF_INET, argv[1], &servaddr.sin_addr);
tpi_connect(fd, &servaddr, sizeof(struct sockaddr_in));
//从服务器读入数据,复制到标准输出
for ( ; ; ) {
if ( (n = tpi_read(fd, recvline, MAXLINE)) <= 0) {
if (n == 0)
break;
else
err_sys("tpi_read error");
}
recvline[n] = 0; /* null terminate */
fputs(recvline, stdout);
}
tpi_close(fd);
exit(0);
}
tpi_bind函数:
#include "tpi_daytime.h"
void tpi_bind(int fd, const void *addr, size_t addrlen)
{
struct {
struct T_bind_req msg_hdr;
char addr[128];
} bind_req;
/*
struct T_bind_req{
t_scalar_t PRIM_type;
t_scalar_t ADDR_length;
t_scalar_t ADDR_offset;
t_scalar_t CONIND_number;
};*/
struct {
struct T_bind_ack msg_hdr;
char addr[128];
} bind_ack;
struct strbuf ctlbuf;
struct T_error_ack *error_ack;
int flags;
//填写T_bind_req结构
bind_req.msg_hdr.PRIM_type = T_BIND_REQ;//T_BIND_REQ或T_ERROP_ACK
bind_req.msg_hdr.ADDR_length = addrlen;
bind_req.msg_hdr.ADDR_offset = sizeof(struct T_bind_req);
bind_req.msg_hdr.CONIND_number = 0;
memcpy(bind_req.addr, addr, addrlen); /* sockaddr_in{} */
ctlbuf.len = sizeof(struct T_bind_req) + addrlen;
ctlbuf.buf = (char *) &bind_req;
putmsg(fd, &ctlbuf, NULL, 0);
ctlbuf.maxlen = sizeof(bind_ack);
ctlbuf.len = 0;
ctlbuf.buf = (char *) &bind_ack;
flags = RS_HIPRI;
getmsg(fd, &ctlbuf, NULL, &flags);
/* *INDENT-OFF* */
if (ctlbuf.len < (int) sizeof(long))
err_quit("bad length from getmsg");
/* *INDENT-ON* */
switch(bind_ack.msg_hdr.PRIM_type) {
case T_BIND_ACK:
return;
case T_ERROR_ACK:
/* *INDENT-OFF* */
if (ctlbuf.len < (int) sizeof(struct T_error_ack))
err_quit("bad length for T_ERROR_ACK");
error_ack = (struct T_error_ack *) &bind_ack.msg_hdr;
err_quit("T_ERROR_ACK from bind (%d, %d)",
error_ack->TLI_error, error_ack->UNIX_error);
/* *INDENT-ON* */
default:
err_quit("unexpected message type: %d", bind_ack.msg_hdr.PRIM_type);
}
}
/*T_BIND_REQ或T_ERROP_ACK应答消息的结构:
struct T_bind_ack{
t_scalar_t PRIM_type;
t_scalar_t ADDR_length;
t_scalar_t ADDR_offset;
t_scalar_t CONIND_number;
};
struct T_error_ack{
t_scalar_t PRIM_type;
t_scalar_t ADDR_prim;
t_scalar_t ADDR_error;
t_scalar_t UNIX_error;
};
*/
tpi_connect建立与服务器连接
#include "tpi_daytime.h"
void tpi_connect(int fd, const void *addr, size_t addrlen)
{
struct {
struct T_conn_req msg_hdr;
char addr[128];
} conn_req;
struct {
struct T_conn_con msg_hdr;
char addr[128];
} conn_con;
/*T_conn_con存放1连接的协议地址和选项:
struct T_conn_req{
t_scalar_t PRIM_type;
t_scalar_t DEST_length;
t_scalar_t DEST_offset;
t_scalar_t OPT_length;
t_scalar_t OPT_offset;
};
*/
struct strbuf ctlbuf;
union T_primitives rcvbuf;
struct T_error_ack *error_ack;
struct T_discon_ind *discon_ind;
int flags;
//填写请求结构并发送给提供者
conn_req.msg_hdr.PRIM_type = T_CONN_REQ;
conn_req.msg_hdr.DEST_length = addrlen;
conn_req.msg_hdr.DEST_offset = sizeof(struct T_conn_req);
conn_req.msg_hdr.OPT_length = 0;
conn_req.msg_hdr.OPT_offset = 0;
memcpy(conn_req.addr, addr, addrlen); /* sockaddr_in{} */
ctlbuf.len = sizeof(struct T_conn_req) + addrlen;
ctlbuf.buf = (char *) &conn_req;
putmsg(fd, &ctlbuf, NULL, 0);
//读入响应
ctlbuf.maxlen = sizeof(union T_primitives);
ctlbuf.len = 0;
ctlbuf.buf = (char *) &rcvbuf;
flags = RS_HIPRI;
getmsg(fd, &ctlbuf, NULL, &flags);
/*
struct T_ok_ack{
t_scalar_t PRIM_type;
t_scalar_t CORRECT_prim;
};*/
/* *INDENT-OFF* */
if (ctlbuf.len < (int) sizeof(long))
err_quit("tpi_connect: bad length from getmsg");
/* *INDENT-ON* */
switch(rcvbuf.type) {
case T_OK_ACK:
break;
case T_ERROR_ACK:
/* *INDENT-OFF* */
if (ctlbuf.len < (int) sizeof(struct T_error_ack))
err_quit("tpi_connect: bad length for T_ERROR_ACK");
error_ack = (struct T_error_ack *) &rcvbuf;
err_quit("tpi_connect: T_ERROR_ACK from conn (%d, %d)",
error_ack->TLI_error, error_ack->UNIX_error);
/* *INDENT-ON* */
default:
err_quit("tpi_connect: unexpected message type: %d", rcvbuf.type);
}
//等待连接
ctlbuf.maxlen = sizeof(conn_con);
ctlbuf.len = 0;
ctlbuf.buf = (char *) &conn_con;
flags = 0;
getmsg(fd, &ctlbuf, NULL, &flags);
/* *INDENT-OFF* */
if (ctlbuf.len < (int) sizeof(long))
err_quit("tpi_connect2: bad length from getmsg");
/* *INDENT-ON* */
/*
struct T_conn_con{
t_scalar_t PRIM_type;
t_scalar_t RES_length;
t_scalar_t RES_offset;
t_scalar_t OPT_length;
t_scalar_t OPT_offset;
};*/
switch(conn_con.msg_hdr.PRIM_type) {
case T_CONN_CON:
break;
/*
struct T_discon_ind{
t_scalar_t PRIM_type;
t_scalar_t DISCON_reason;
t_scalar_t SEQ_number;
};
case T_DISCON_IND:
/* *INDENT-OFF* */
if (ctlbuf.len < (int) sizeof(struct T_discon_ind))
err_quit("tpi_connect2: bad length for T_DISCON_IND");
discon_ind = (struct T_discon_ind *) &conn_con.msg_hdr;
err_quit("tpi_connect2: T_DISCON_IND from conn (%d)",
discon_ind->DISCON_reason);
/* *INDENT-ON* */
default:
err_quit("tpi_connect2: unexpected message type: %d",
conn_con.msg_hdr.PRIM_type);
}
}
tpi_read从流中读入数据:
#include "tpi_daytime.h"
ssize_t
tpi_read(int fd, void *buf, size_t len)
{
struct strbuf ctlbuf;
struct strbuf datbuf;
union T_primitives rcvbuf;
int flags;
//读取控制信息和数据;处理应答
ctlbuf.maxlen = sizeof(union T_primitives);
ctlbuf.buf = (char *) &rcvbuf;
datbuf.maxlen = len;
datbuf.buf = buf;
datbuf.len = 0;
flags = 0;
getmsg(fd, &ctlbuf, &datbuf, &flags);
if (ctlbuf.len >= (int) sizeof(long)) {
if (rcvbuf.type == T_DATA_IND)
return(datbuf.len);
else if (rcvbuf.type == T_ORDREL_IND)
return(0);
else
err_quit("tpi_read: unexpected type %d", rcvbuf.type);
} else if (ctlbuf.len == -1)
return(datbuf.len);
else
err_quit("tpi_read: bad length from getmsg");
}
#include "tpi_daytime.h"
void tpi_close(int fd)
{
struct T_ordrel_req ordrel_req;
struct strbuf ctlbuf;
//向对端发送顺序释放
ordrel_req.PRIM_type = T_ORDREL_REQ;
ctlbuf.len = sizeof(struct T_ordrel_req);
ctlbuf.buf = (char *) &ordrel_req;
putmsg(fd, &ctlbuf, NULL, 0);
close(fd);