Apache Camel-聚合不同的对象类型

最后发布: 2014-05-28 08:07:12


问题

我有一条路由,启动后会从两个数据源(routeA和routeB)中请求消息,并将它们聚合为一条消息。 每个聚合的消息必须恰好包含一个routeA消息和一个routeB消息,如果没有,则将其丢弃。

此过程必须在特定的时间间隔(即每5分钟)启动。

我的问题是,如何让聚合者知道必须丢弃来自routeA和routeB的所有已处理消息以及未找到它们的消息?

我目前正在使用completionTimeout功能,但是出于明显的原因,我不喜欢此解决方案。

我知道骆驼具有completeFromBatchConsumer功能,但是我不知道如何将其与多个数据集一起使用。

非常感谢任何建议。

这是我现在所拥有的:

<!-- main route -->
<route id="main">
<camel:from uri="timer://timer1?period=20000"/>
<multicast>
  <to uri="direct:startA"/>
  <to uri="direct:startB"/>
</multicast>
</route>

<!-- messages from route A -->
<route id="routeA" />
<from uri="direct:startA" />
<to uri="sql:select * from sampleDB?dataSource=ds"/>
<split>
   <simple>${body}<simple>       
   <marshal ref="ObjectAJsonConverter"/>
   <unmarshal ref="ObjectAJsonConverter"/>
   <to uri="bean:myProcessor?method=addObjectACorrelationKey"/>
   <to uri="seda:myAggregator"/>
</split>

<!-- messages from route B -->
<route id="routeB" />
<from uri="direct:startB"/>
<to uri="ldap:ldapcontext?base=DC=company,DC=net"/>
<split>
   <simple>${body}<simple>       
   <marshal ref="ObjectBJsonConverter"/>
   <unmarshal ref="ObjectBJsonConverter"/>
   <to uri="bean:myProcessor?method=addObjectBCorrelationKey"/>
   <to uri="seda:myAggregator"/>
</split>

<!-- aggregate the messages, create new ObjectC that contains ObjectA and ObjectB -->
<!-- wait 200000 ms for all messages from routeA and routeB to enter the aggregator -->
<route id="aggretatorRoute">
<from uri="seda:myAggregator"/>
<aggregate ref="myEntityAggregator" completionSize="2" completionTimeout="200000" discartOnCompletionTimeout="true" ignoreInvalidCorrelationKeys="true">
  <correlationExpression><simple>${in.header.objectid}</simple></correlationExpression>
  <to uri="bean:myProcessor?method=doSomethingWithObjectC"/>
</aggregate>
apache-camel aggregate
回答

您只能在AggregationStrategy仅聚合ObjectA之一和ObjectB之一。 因此,如果您看到其中任何一个的第二个,则不要将其汇总。 如果您想放弃目前为止所做的一切,则可以通过设置

exchange.setProperty("CamelRouteStop", true);

然后,如果您想立即删除它,则添加一个completementPredicate,它检查是否已设置该停止点。

<completionPredicate><simple>${property.CamelRouteStop} == true</simple></completionPredicate>

对于correlationExpression,您可能只使用<constant>true</constant>因为您似乎只在一组上工作。


回答

谢谢克劳斯。 实际上,我采取了另一种方法。 现在,我不再使用聚合器来连接三个不同的对象,而是查询一个ID列表,并使用这些ID逐步构建我的复杂对象。

<route id="composeObject">
  <from uri="sql:select id from people?oneSource">
   <split><simple>${body}</simple>
    <to uri="direct:getobjectOne"/>;
    <to uri="bean:addToComplexObject"/>

    <to uri="direct:getObjectTwo/>
    <to uri="bean:addToComplexObject"/>

    <to uri="direct:getobjectThree/>
    <to uri="bean:addToComplexObject"/>

    <to uri="seda:outChannel"/>
  </split>
</route>