Adf.QueueServer

本文简要介绍Adf.QueueServer的一些常规应用和协议。

1. 协议描述

本服务支持 WebSocket Json/ WebSocket Binary / HTTP Json 三种通信方式
一般使用 WebSocket 做异步push/pull, 使用HTTP做同步push/pull

若同时使用多种格式JSON/BINARY,需要注意消息体(body)是否可以通过UTF8编码于二进制与字符串之间相互转化,否则可能会出现乱码情况。理论上同一队列应尽量保持使用同一种格式传输。

本服务为FIFO QUEUE,消息将保持顺序PULL,在单线程消费下消息与业务均保持顺序性。在多终端或多线程或多PULL下、消息保持顺序PULl但不保证业务执行的顺序性。

本服务支持命令:

rpush 队列右侧插入一项
lpush 队列左侧插入一项
pull 从队列中获取最左侧一项
delete 删除已从队列中pull过的项,未被pull过的项不可被删除
lcancel 恢复已pull过的项至队列左侧
rcancel 恢复已pull过的项至队列右侧
count 查询队列长度
clear 清空队列项

2. JSON格式

字符编码格式: UTF8

//push request:
{
 "action":"lpush/rpush",
 "requestid":"1 ~ 32 chr.  propose use uuid",
 "queue": "/order/new",
 "body":"this is a test message."
}

//push response:
{
 "action":"lpush/rpush",
 "requestid":"88bea088837d4743af67a2d49e6d08d1",
 "queue": "/order/new",
 "result":"ok or failure message",
 "messageid": 1452443242
}

//delete/lcancel/rcancel/createqueue/deletequeue/count/clear/pull request:
{
 "action":"delete/lcancel/rcancel/createqueue/deletequeue/count/clear/pull",
 "requestid":"1 ~ 32 chr.  propose use uuid",
 "queue": "/order/new"
}

//delete/lcancel/rcancel/createqueue/deletequeue response:
{
  "action":"delete/lcancel/rcancel/createqueue/deletequeue",
  "requestid":"88bea088837d4743af67a2d49e6d08d1",
  "queue": "/order/new",
  "result":"ok or failure message"
}

//count/clear response:
{
  "action":"count/clear",
  "requestid":"88bea088837d4743af67a2d49e6d08d1",
  "queue": "/order/new",
  "result":"ok or failure message",
  "count":0
}

//pull response:
{
	"action":"pull",
	"requestid":"88bea088837d4743af67a2d49e6d08d1",
	"queue": "/order/new",
	"result":"ok or failure message",
	"body":"message body.",
	"duplications": 0,
	"messageid": 1452443242
}

2. 二进制传输格式

Endianness : Big-Endian
Body: UTF8

二进制元素内容与JSON结构一致。

//type:
action		: byte
xx length	: uint16 
duplications: uint16
count		: int32
messageid	: uint64


//push request:
+---------------------------------------------------------------+
|	action (1)
+---------------------------------------------------------------+
|	id length (2)
+---------------------------------------------------------------+
|	id  (1 ~ 65535)
+---------------------------------------------------------------+
|	queue length (2)
+---------------------------------------------------------------+
|	queue (1 ~ 65535)
+---------------------------------------------------------------+
|	body length (2)
+---------------------------------------------------------------+
|	body (1 ~ 65535)
+---------------------------------------------------------------+

//push response:
+---------------------------------------------------------------+
|	action (1)
+---------------------------------------------------------------+
|	id length (2)
+---------------------------------------------------------------+
|	id  (1 ~ 65535)
+---------------------------------------------------------------+
|	queue length (2)
+---------------------------------------------------------------+
|	queue (1 ~ 65535)
+---------------------------------------------------------------+
|	result length (2)
+---------------------------------------------------------------+
|	result (1 ~ 65535)
+---------------------------------------------------------------+
| OK | messageid length (8)
+---------------------------------------------------------------+


//delete/lcancel/rcancel/createqueue/deletequeue/count/clear/pull request:
+---------------------------------------------------------------+
|	action (1)
+---------------------------------------------------------------+
|	id length (2)
+---------------------------------------------------------------+
|	id  (1 ~ 65535)
+---------------------------------------------------------------+
|	queue length (2)
+---------------------------------------------------------------+
|	queue (1 ~ 65535)
+---------------------------------------------------------------+

//delete/lcancel/rcancel/createqueue/deletequeue response:
+---------------------------------------------------------------+
|	action (1)
+---------------------------------------------------------------+
|	id length (2)
+---------------------------------------------------------------+
|	id  (1 ~ 65535)
+---------------------------------------------------------------+
|	queue length (2)
+---------------------------------------------------------------+
|	queue (1 ~ 65535)
+---------------------------------------------------------------+
|	result length (2)
+---------------------------------------------------------------+
|	result (1 ~ 65535)
+---------------------------------------------------------------+

//count/clear response:
+---------------------------------------------------------------+
|	action (1)
+---------------------------------------------------------------+
|	id length (2)
+---------------------------------------------------------------+
|	id  (1 ~ 65535)
+---------------------------------------------------------------+
|	queue length (2)
+---------------------------------------------------------------+
|	queue (1 ~ 65535)
+---------------------------------------------------------------+
|	result length (2)
+---------------------------------------------------------------+
|	result (1 ~ 65535)
+---------------------------------------------------------------+
| OK | count (4)
+---------------------------------------------------------------+

//pull response:
+---------------------------------------------------------------+
|	action (1)
+---------------------------------------------------------------+
|	id length (2)
+---------------------------------------------------------------+
|	id  (1 ~ 65535)
+---------------------------------------------------------------+
|	queue length (2)
+---------------------------------------------------------------+
|	queue (1 ~ 65535)
+---------------------------------------------------------------+
|	result length (2)
+---------------------------------------------------------------+
|	result (1 ~ 65535)
+---------------------------------------------------------------+
|	 | body length (2)
+    -----------------------------------------------------------+
|	 | body (1 ~ 65535)
+ OK -----------------------------------------------------------+
|    | duplications (2)
+    -----------------------------------------------------------+
|    | messageid (8)
+---------------------------------------------------------------+


3. HTTP 请求

字符编码: UTF8
元素与JSON结构一致,但在请求时除rpush/lpush时的body以外,其它所有元素均以Queue String方式进行传递。
返回结果与同JSON结构
以.json后缀的接口,返回JSON
以.bin 后缀的接口,返回二进制数据

PATH METHOD REMARKS
/queue/rpush.json?queue=&requestid=
/queue/rpush.bin?queue=&requestid=
POST application/octet-stream
内容为消息实体BODY
/queue/lpush.json?queue=&requestid=
/queue/lpush.bin?queue=&requestid=
POST application/octet-stream
内容为消息实体BODY
/queue/pull.json?queue=&requestid=
/queue/pull.bin?queue=&requestid=
GET 拉取一个消息
/queue/delete.json?queue=&requestid=
/queue/delete.bin?queue=&requestid=
GET 删除一个已拉取消息
/queue/lcancel.json?queue=&requestid=
/queue/lcancel.bin?queue=&requestid=
GET 恢复一个已拉取消息至队列左侧
/queue/rcancel.json?queue=&requestid=
/queue/rcancel.bin?queue=&requestid=
GET 恢复一个已拉取消息至队列右侧
/queue/count.json?queue=&requestid=
/queue/count.bin?queue=&requestid=
GET 获取一个队列元素数
/queue/clear.json?queue=&requestid=
/queue/clear.bin?queue=&requestid=
GET 清空一个队列元素

4. Websocket

Binary ws://{host}:{port}/queue/bin
Json  ws://{host}:{port}/queue/json

同一个ws连接允许同时多个PULL请求。

5. RequestID 描述

协议中 RequestID 为请求标识,不做为消息标识,该标识由使用者自行生成,可以是1-32位字符,除PULL以外的其它所有命令均允许该体重复。

在PULL时若使用websocket则必需保证同一连接中不重复,常在同一连接中使用递增数做为标识。若使用http则需保证http全局不重复。 不能重复是因为PULL的标识需在要delete/lcancel/rcancel时做为识别标识使用。

6. Action 描述

编码如下

const byte LPUSH = 1;
const byte RPUSH = 2;
const byte DELETE = 3;
const byte PULL = 4;
const byte CLEAR = 5;
const byte COUNT = 6;
const byte LCANCEL = 7;
const byte RCANCEL = 8;
const byte CREATEQUEUE = 9;
const byte DELETEQUEUE = 10;
const string OK = "ok";

7. 消息事务模式

本服务原生支持事务模式, 不支持非事务模式的消息消费,所有消息在消费后必需通过delete命令删除;
若终端在处理业务时认为是一个失败的业务消费可以通过lcancel/rcancel将消息退回至队列的左侧或右侧;
面向连接使用方式,若不进行delete操作,则在连接断开后将自动恢复至队列左侧;
消息每被退回后再次PULL一次则duplications将为递增1,通常可以通过该值来判断该消息是否为被初次消费,若不是被初次消费则应检查重复消费所导致的脏数据或重复数据,如:不应产生两次相同的订单或扣款。

 

此条目发表在adf分类目录,贴了, 标签。将固定链接加入收藏夹。