`
tyny
  • 浏览: 74454 次
  • 性别: Icon_minigender_1
  • 来自: 黄冈
社区版块
存档分类
最新评论

Rhino.Queues随笔6 消息队列构造

阅读更多

至此已经对Rhino.Queues队列有一个大概的了解,为了更进一步了解队列,下面模拟一次消息队列的设计过程,如有错误,敬请指教。

“消息”是在两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串;也可以更复杂,可能包含嵌入对象。消息被发送到队列中。“消息队列”是在消息的传输过程中保存消息容器。消息队列管理器在将消息从它的源中继到它的目标时充当中间人。队列的主要目的是提供路由保证消息的传递;如果发送消息时接收者不可用,消息队列会保留消息,直到可以成功地传递它。(引用自百度百科)

无论从消息队列的定义,还是从基本的输入输出,可以简单的得到一个基本的概念,即消息队列让我们只需关注消息的发送和接收,至于消息中间的中继过程、路由及传递,无需关心。至于消息的存储只是为了消息的稳定性,保证消息完整的传递,否则一旦机器退出,所有在内存的消息全部消失。同样类似于数据库等的处理方式,在发送和接收的过程中需要事务的配合,这样基本保证了消息的一致性。

如此一来可以把消息队列当做一个负责消息发送和接收的黑盒,消息发送方只需要指定消息的发送目标发送消息即可,至于消息队列内部什么时候怎样把消息顺利得送到他的目的地就不需要发送方的关心了;同样消息接收方只需要等待接收消息就行,至于消息什么时候怎样路由传递到目的地也无关紧要了(哪怕是经历了多少个不同网络的队列的路由传递)。所以消息的源和他的目的物理位置就显得无关紧要了。这样,消息队列屏蔽了物理(包括机器也可以在同一个机器的不同程序)上位置的差异,同时,他也屏蔽了时间上的差异,即源只需要关注发送消息以及是否发送成功,至于究竟消息队列什么时候把消息发送到目的地、消息接收方什么时候从消息队列接收消息处理,发送方无需关心(当然不是不关注数据)。这种空间和时间上的分离,使得利用消息队列中间消息的路由机制进行消息的信息集成分类和均衡负载相对容易的多(很多EAI就是借助消息队列)。正是消息队列的这种特这,他的可扩展性非常强。一个典型的消息队列结构如下:

可以看到,发送方和接收方完全不用关心消息队列的中间细节,只需知道和消息队列系统交互即可。

单程序消息队列

先从我们数据结构队列说起,例如C#Queue类,如果不考虑跨程序和宕机带来的内存清空问题,我们暂且可以把他当做一个程序内的简单队列来使用,其实很多时候我们这样使用过。因为在内存中的数据处理,事务性和数据一致性都可以通过简单的加锁来保证。

针对上面开始提出的两个问题,跨程序和宕机带来数据丢失问题,我们现在解决数据清空问题。

为了不让内存中的数据丢失,目前通常的方法就是把数据存到硬盘等永久媒介上,咱们也是如此。目前存储数据的方式多种多样,基于自己定义格式的存储,借助第三方数据存储。后者即包括目前流行的数据库存储方式,而其中最流行的莫过关系数据库,近年来的nosql也相当不错。为了不过多的分散精力在存储方面,同时顾及到后面数据的事务控制,我们先选择sqlite,一个嵌入式数据库,可以包括到发布程序内。至此,我们已经解决了存储方式的问题,后续的问题是实现队列的基本操作EnQueueDeQueue,在EnQueue中,我们把消息存到数据库,在DeQueue中,我们删除我们取走的消息,同时使用guid产生每个消息的唯一编号来标识消息。这样某种程度上解决了消息持久化问题。 

 

using (TransactionScope scope = new TransactionScope())

{

    //准备发送或者接收

    //发送或接收操作EnQueue和DeQueue 

    //处理发送和接收数据

}


    实现了存储问题,还有个问题也相应的浮出水面。即EnQueueDeQueue的事务问题,无论MSMQ还是IBM MQ的使用,他们都是在事务内部的,例如

只有外部事务提交时,事务才会提交;或者回滚时,我们内部实现回滚。这个问题在存储队列里面不是特别明显,但是涉及到存储,他的影响才突出出来。即怎样保证保证内部事务和外部事务的一致性,通用的处理方式即为2PC(两阶段提交协议)和SPC(单阶段提交)。针对目前消息队列的简单性,选择资源略低的SPC,即实现ISinglePhaseNotification接口,在实现里面负责事务的提交或者回滚。使用流程可以参考微软的相关说明,这里简单说明一下,在EnQueueDeQueue里面检查是否存在当前上下文事务,如果不存在,抛出异常,如果存在,在开始sqlite事务时,使用sqlite事务实例化ISinglePhaseNotification接口的实现(QueueSendSinglePhaseNotification),然后把他注册到当前上下文事务中,具体的插入或者删除消息后,并不直接提交或者回滚,而是在QueueSinglePhaseNotification里面负责具体事务的提交和回滚。借助数据库的事务管理简单易用,我们不用实现太多的事务控制,但是这样做有个不好的结果,即sqlite数据库的事务很难控制了,如果使用者迟迟没有提交事务或者回滚事务,数据库的事务会越积越多;同时这样消息队列和底层存储耦合太严重。这也是在研究Rhino.Queues时,无意中发现的他的处理似乎很特别。发送消息大概流程如下:

1、检查是否当前上下文是否处于事务中。如果是,进入下一步;如果不是,抛出异常;

2、打开数据库连接准备存储消息;

3、启动连接的数据库事务,同时使用消息的唯一标识符初始化QueueSendSinglePhaseNotification,然后注册到当前的上下文事务中;

4、开始存储消息,略有不同,这里消息的状态为未准备发送;

5、提交事务

 

这里可以发现他并没有使用数据库事务初始化QueueSendSinglePhaseNotification,而是用消息的id来初始化,同时最后他是提交事务了,并没有留到QueueSendSinglePhaseNotification去实现。那么QueueSendSinglePhaseNotification具体做了什么呢,可能大家都注意到了,第四步里面多了一个消息的状态,存储的的时候,他的状态是未准备好发送,根据消息id,提交时,他的状态会更新为准备好发送;如果回滚,删除相应的消息即可。但是最终结果他们别无二致。

对于消息发送暂且谈到这里,我们来看看消息接收过程,同样基于SPC,我们实现一个QueueAcceptSinglePhaseNotification,接收和发送类似,前面是只要接收,先取出消息,然后删除消息,这个步骤在一个事务控制之内。现在由于需要跟上下文事务一致,同样和发送消息一样,我们也可以利用数据库事务初始化QueueAcceptSinglePhaseNotification,然后再他的具体函数里面实现提交和回滚,正如上面提到的,这样的设计很容易失控,即接收操作可能很复杂,这个事务会一直挂在那;同时把事务分在两个地方控制也很危险,万一遗漏,后患无穷。所以仿照发送流程的状态,我们在接收的是否也分成两个步骤,第一个步骤即在一个数据库事务内,取出待接收消息,更新相应的状态为准备接收,然后提交事务。然后在QueueAcceptSinglePhaseNotification的实现里面,根据消息id,提交里面删除相应的消息,或者回滚里面把消息重新置为原始状态。

目前数据库表结构就一个,结构如下:

列名

描述

Id

消息编号

Data

消息数据

Status

消息状态(准备发送,已发送、准备接收、已接收)

这里面只有一个消息队列,发送方和接收方可以多个,但是都是从一个消息队列进出。

为了能区分不同的消息队列,我们可以先加入一个名称字段用于区别不同的消息队列,这样发送方可以根据队列名称发送给指定的队列,接收方可以根据队列名称接收指定队列的消息。

列名

描述

QueueName

队列名称

Id

消息编号

Data

消息数据

Status

消息状态(准备发送,已发送、准备接收、已接收)

暂时单程序队列告一段落。

跨程序消息队列

上面提到的另一个问题即是消息队列在同一个机器的不同程序间的使用。

按照目前的方式,因为底层数据库本身就是多用户操作的,所以,如果没有太高的要求,只要涉及底层的数据存储在同一个数据库即可达到跨程序共享的层次;

跨机器消息队列

跟上面跨程序一样,他们面临着数据传递的问题,如果在一台机器上,公用一个数据库算是可以稍微满足这个要求,但是如果不是在一台机器上呢,可能提到网络数据库mysqlsql serveroracle等,当然不是完全不行,但是这种中心化得存储方式似乎与消息队列一个特点分布式背道而驰。应该是每个消息队列都是相互对立且彼此分离的,这样他完全可以独立运行,即使没有接收者也照样可以发送消息。所以通过网络数据库共享不太合适。如果数据不能通过网络数据库共享,我们得另想办法。

从上面消息队列的概念可以看出,上面的简单的消息队列可以采用同步的方式把消息存在本地等待接收处理,但是如果接收消息队列处于另一台机器上呢。我们可以通过网络把消息发送给目标消息队列。但是前面我们已经提到过,消息的发送和接收是分开的,所以发送尽可能快,一般都是基于异步的方式发送消息,使得发送方可以尽快发送消息。但是消息队列不光是异步就可以搞定的。因为即使目标方不存在,消息也可以发送,他唯一需要确定的是消息发送成功,即消息发送到到中间消息队列内部即可。这就带来一个疑问,消息队列属于分布式存储,所以存储可以存在任何一个机器上,即可以存在任何一个机器上面,即可以存在任何一个流程上存在的机器上,类似于工作流的状态的持久化,而且在中间任何两个节点间的状态转化都是事务性的,了解工作流的可能都了解这一点。所以伴随着消息的传递过程,每个节点都有需要保存,知道转向下一个节点(机器)。

由此,我们发送消息时只是激发一个消息状态,即发送状态,而不用关心后续状态的转化过程的,到达发送状态后,发送状态转向到接收存储状态,即通过网络发送到接收端消息队列,这个过程是事务保证的。Rhino.Queues里面使用的类似于tcp三次握手的方式保证这个事务。然后接收状态转变为已接收,即接收方接收消息,消息即到达终点,这个过程也是事务的。这个消息队列于前面的简单的消息队列的差别在于,发送状态到接收状态的区分,在单机上,基本不存在发送状态,因为消息可以很快直接的进入接收状态,但是一旦基于不同的机器,从发送到接收状态的转换可不那么容易了,极端点就可能永远不能转换到接收状态。但是发送方依然可以发送消息激发发送状态,所以发送消息可以把消息保存到本地,然后合适的时候发送到接收方的消息队列。流程如下:

 

 

发送的消息是存在本地的机器上面的消息发送数据表中的,类似于上面简单消息队列的,发送表的结构类似。

列名

描述

QueueName

队列名称

SendAt

存入时间

SendedAt

成功发送时间

Id

消息编号

Data

消息数据

Status

消息状态(准备发送,已发送)

为了检测消息队列的发送状况,加入了两个时间参数,即存入时间和发送时间,如此一来发送基本好了。

接收端的消息接收也可以存入一个接收消息队列。

列名

描述

QueueName

队列名称

Id

消息编号

AcceptAt

接收时间

AcceptedAt

接收处理时间

Data

消息数据

Status

消息状态(接收、准备已接收、已接收)

发送方和接收方的区别不大,包括事务控制和状态,这里需要特别注意发送队列和接收队列之间状态的转换,包括消息的传递。除了消息的传递,其中很重要的是保证这个过程的完整性,即要么发送成功要么发送失败。流程如下所示:



 

 

可以看到目前大概有三个状态的转换,第一个状态转换在发送端机器上进行,第二个在发送端机器和接收端机器上进行,第三个状态转换在接收端上进行。这几个状态的转换过程都是基于事务。第一个和第三个无需多说,单阶段协议保证,上面提到过,如需进一步了解,可以查询资料,后续文章会接着谈到这问题。需要着重提出的第二个状态转换过程,因为以前没了解过事务底层的具体算法实现,了解起来相当不容易。但是这里同样加入了一些中间状态打到这个目标。流程如下所示:


 

因为对于底层事务实现不是很了解,网上寻找了相关资料,一般的解释是在第一阶段完成复杂的数据处理操作,第二阶段只是简单的负责把状态修改为“提交”(预期的状态)即可。

另外在Rhino.Queues实现通讯时,使用了一个异步辅助框架(Wintellect.Threading),据称微软也出了类似的框架(并行与协调运行时(Concurrency and Coordination RuntimeCCR)),有时间会在接下来的章节探讨。

  • 大小: 18.1 KB
  • 大小: 39.2 KB
  • 大小: 37.5 KB
  • 大小: 86.2 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics