Flex SDK provides an API for publish/subscribe operations through the Consumer/Producer classes. At the same time MSMQ is a robust messaging system enabling publish/subscribe message exchange paradigms for Windows/.NET environments. Integration between Flex pub/sub and MSMQ is powerful combination, but it is not clear how the two can integrate.
The solution is to use a .NET infrastructure environment which can understand the messages sent by Flex Producer/Consumer APIs and enable message exchange through MSMQ. WebORB for .NET enables such integration. The recipe review the code and the configuration required for the integration.
The recipe reviews a scenario where messages are published by a native Windows (.NET) application into MSMQ. Once a message is published, it is immediately delivered to all Flex consumers connected to the queue. The diagram below demonstrates data flow in the example. The 'Message Queue Publisher' is a native Windows application with access to a MSMQ queue. It publishes a message (.NET complex type) into a queue. WebORB subscribes to the same queue on behalf of the connected Flex clients. As soon as a message is placed in the queue, WebORB delivers it to all Flex clients as an ActionScript object:


When the timer is running, it publishes a message into a queue on each timer interval. Each message is a serializable object (instance of the Car class):
private void publishTimer_Tick( object sender, EventArgs e
)
{
// create a new object instance
Car car = new Car();
// populate instance properties
car.Make = "TurboQ-" + messageCounter;
car.Model = "MSMQ-2007-" + messageCounter;
car.Mileage = messageCounter;
messageCounter++;
// display the number of published message in the label
counterLabel.Text = messageCounter.ToString();
//publish message into the queue
myMessageQueue.Send( car );
}
myMessageQueue object. It is configured as a control
added to the application from the Visual Studio Toolbox:
namespace ORBExamples
{
[Serializable()]
public class Car
{
private string make;
private string model;
private long mileage;
public string Make
{
get { return make; }
set { make = value; }
}
public string Model
{
get { return model; }
set { model = value; }
}
public long Mileage
{
get { return mileage; }
set { mileage = value; }
}
}
}
mx.messaging.Consumer or
weborb.messaging.WeborbConsumer API to receive
messages from a queue. When a message is published in a queue,
WebORB receives it and routes to the client. WebORB exposes
MSMQ queues as messaging destinations. A queue must be
configured as a destination in messaging-config.xml located in
the [WEBORB-INSTALL]/WEB-INF/flex directory. The queue used by
the Windows Publisher application shown above is
.\private$\weborb-mqpush. To configure the queue
as a Flex messaging destination, the following XML must be
added to messaging-config.xml:
<destination id="mqtoflex">
<properties>
<msmq>
<path>.\private$\weborb-mqpush</path>
<!-- zero means 'do not send past messages'.
Set to -1 to receive previously sent messages -->
<deliverPastMessages>0</deliverPastMessages>
<BasePriority>0</BasePriority>
<Category>00000000-0000-0000-0000-000000000000</Category>
<Label>WebORB MessageQueue for Messages from Windows
client</Label>
<MaximumQueueSize>4294967295</MaximumQueueSize>
<UseJournalQueue>false</UseJournalQueue>
<MaximumJournalSize>4294967295</MaximumJournalSize>
</msmq>
</properties>
<channels>
<channel ref="weborb-rtmp"/>
</channels>
</destination>
mx.messaging.Consumer or
weborb.messaging.WeborbConsumer API to
subscribe to the messaging destination.MessageEvent.MESSAGE event:
import
weborb.messaging.WeborbConsumer;
import mx.messaging.events.MessageEvent;
[Bindable]
private var car:Car;
private var consumer:WeborbConsumer;
public function doConnect():void
{
consumer = new WeborbConsumer();
consumer.destination = "mqtoflex";
consumer.addEventListener( MessageEvent.MESSAGE, receivedMessage
);
consumer.subscribe();
}
public
function receivedMessage( event:MessageEvent ):void
{
// store the value in a local 'bindable' variable
// UI will display properties through the data binding
car = Car( event.message.body );
}
//
init is called through the creationComplete event
public function init():void
{
// map server-side to the client-side type
// ORBExamples.Car is a full type name of the class
// of the objects sent by the server
// Car is a client-side class the remote objects
// should be mapped to
flash.net.registerClassAlias( "ORBExamples.Car", Car );
}
package
{
[Bindable]
public class Car
{
public var Make:String;
public var Model:String;
public var Mileage:Number;
}
}
using System;
using System.Collections.Generic;
using System.Text;
using Weborb.V3Types;
namespace Weborb.Messaging.V3
{
public interface IMessageSelector
{
/**
* Passes the selector value from consumer.
* Consumer instances can assign a value to the
* 'selector' property of the Consumer class
*/
void setClientSelectorValue( String selectorValue );
/**
* Process a message originally sent by a Flex publisher.
* Return null to cancel message delivery.
*/
Object processClientMessage( V3Message message );
/**
* Process a message sent from a non-Flex client.
* Return null to cancel message delivery.
*/
Object processServerMessage( Object message );
/**
* Set the id of a client associated with this selector
*/
void setClientID( String clientID );
}
}
using System;
using Weborb.Messaging.V3;
using Weborb.V3Types;
namespace ORBExamples.Messaging
{
public class CustomSelector : IMessageSelector
{
private string clientID;
public void setClientSelectorValue( String selectorValue )
{
}
public object processClientMessage( V3Message message )
{
// no filtering for messages published from Flex
clients
return message;
}
public object processServerMessage( object message )
{
Car carObj = (Car) message;
// rollback a few miles.
// don't tell anyone we did it:
carObj.Mileage /= 2;
return carObj;
}
public void setClientID( string clientID )
{
this.clientID = clientID;
}
}
}
<destination id="mqtoflex">
<properties>
<selector>ORBExamples.Messaging.CustomSelector</selector>
<msmq>
<path>.\private$\weborb-mqpush</path>
<!-- zero means 'do not send past messages'.
Set to -1 to receive previously sent messages -->
<deliverPastMessages>0</deliverPastMessages>
<BasePriority>0</BasePriority>
<Category>00000000-0000-0000-0000-000000000000</Category>
<Label>WebORB MessageQueue for Messages from Windows
client</Label>
<MaximumQueueSize>4294967295</MaximumQueueSize>
<UseJournalQueue>false</UseJournalQueue>
<MaximumJournalSize>4294967295</MaximumJournalSize>
</msmq>
</properties>
<channels>
<channel ref="weborb-rtmp"/>
</channels>
</destination>
+