Event Channeling Mechanism
Event Channeling Mechanism (Channel)
Another utility offered by the MadgikCommons library is the Event Channeling mechanism. In a distributed environment, many scenarios benefit from a mechanism that lets two or more entities open a communication channel and publish information that travels to all of the channel partners.
The Event Channeling mechanism offers this functionality through a distributed event mechanism. A single entity initiates a channel and publishes its existence. This point acts as a sink for the events the other entities emit and as a broker to reproduce the incoming events to the remaining entities. Other entities that receive the identifying object pointing to the created channel connect to it and can from then on receive events that the rest of the entities emit, but also emit their own events.
Channel Events
Because the Event Channeling mechanism is built on the brokerage of events, the ChannelEvent is a fundamental concept. The events declared here define the flow of control that governs the lifetime of a channel: its creation, brokerage, synchronization and destruction. Each channel has an associated ChannelState class, which defines the events the channel can emit and through which an interested party can register for an event. The ChannelState keeps the events in a named collection from which a specific event can be requested.
The event model is a derivative of the Observer – Observable pattern. Each event declared in the package and stored in the ChannelState event collection extends Java's Observable, and Observers register for state updates with that specific instance of the event. Having a single point of update for each event would raise concurrency, synchronization and race condition issues. The framework therefore uses the stored Observables only as the point Observers register with. The actual state update is not carried by the Observable the Observers registered with, but by another instance of the same event that is passed as the argument of the respective update call. The Observable instances originally stored in the ChannelState thus act as the registration point, while a new instance created by the emitting party carries the notification data.
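The split between registration point and carrier instance can be sketched as follows. This is a minimal illustration, not the library's code: `TextEvent`, `State` and `emit` are hypothetical names, and `java.util.Observable` (deprecated since Java 9) is used only because the text describes the events as extending it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Observable;
import java.util.Observer;

@SuppressWarnings("deprecation") // Observable/Observer are deprecated since Java 9
final class EventRegistrationSketch {

    /** Hypothetical event type; the real ChannelStateEvent API may differ. */
    static class TextEvent extends Observable {
        private final String payload;

        TextEvent(String payload) { this.payload = payload; }

        String payload() { return payload; }

        /** Notifies the observers of this registration point, handing them a separate carrier. */
        synchronized void emit(TextEvent carrier) {
            setChanged();              // protected in Observable, so called from the subclass
            notifyObservers(carrier);  // observers receive the carrier as the update argument
        }
    }

    /** Hypothetical stand-in for ChannelState: a named collection of registration points. */
    static class State {
        private final Map<String, TextEvent> events = new HashMap<>();

        State() { events.put("text", new TextEvent(null)); }

        TextEvent event(String name) { return events.get(name); }
    }

    public static void main(String[] args) {
        State state = new State();
        List<String> received = new ArrayList<>();
        Observer observer = (source, arg) -> received.add(((TextEvent) arg).payload());
        state.event("text").addObserver(observer);

        // Each emission creates its own instance; the stored one never holds data.
        state.event("text").emit(new TextEvent("hello"));
        state.event("text").emit(new TextEvent("world"));
        System.out.println(received);
    }
}
```

Because every notification travels in its own instance, two emitters never overwrite each other's data on the shared registration object.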
All events extend ChannelStateEvent, which in turn extends Observable and implements the ISerializable interface. The latter is used during transport to serialize and deserialize an event and is explained in later sections. Events carrying payload information that the client wants to emit to registered entities extend ChannelPayloadStateEvent, which in turn extends the base ChannelStateEvent. The events that are defined and form the basis of the framework are the following:
- DisposeChannelEvent – This event is emitted when the channel is no longer needed and signals the disposal of the channel and all associated state. Since a single channel can have any number of participating entities, not just two, any of them can cause the DisposeChannelEvent.
- BytePayloadChannelEvent – This event can be emitted by any of the channel's nozzles and be carried to all the rest of the participating nozzles. It carries with it a payload of a byte array that can be set on the emitting point.
- StringPayloadChannelEvent – This event can be emitted by any of the channel's nozzles and be carried to all the rest of the participating nozzles. It carries with it a payload of a string that can be set on the emitting point.
- ObjectPayloadChannelEvent – This event can be emitted by any of the channel's nozzles and be carried to all the rest of the participating nozzles. It carries with it a payload of an object that can be set on the emitting point. The object payload must implement the ISerializable interface, which declares how the object is serialized and deserialized in case it needs to be transported over the wire.
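To illustrate what an object payload has to provide, the sketch below round-trips a small object through a byte array. The `WireSerializable` interface and its method names are assumptions made for this example; the actual ISerializable signature is not shown in this section and may differ.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class PayloadSerializationSketch {

    /** Hypothetical stand-in for ISerializable; the library's real methods may differ. */
    interface WireSerializable {
        void writeTo(DataOutputStream out) throws IOException;
        void readFrom(DataInputStream in) throws IOException;
    }

    /** Example object payload: a progress report with a label and a percentage. */
    static class Progress implements WireSerializable {
        String label = "";
        int percent;

        @Override public void writeTo(DataOutputStream out) throws IOException {
            out.writeUTF(label);
            out.writeInt(percent);
        }

        @Override public void readFrom(DataInputStream in) throws IOException {
            label = in.readUTF();   // fields are read in the order they were written
            percent = in.readInt();
        }
    }

    static byte[] toBytes(WireSerializable payload) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            payload.writeTo(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    static void fromBytes(byte[] bytes, WireSerializable target) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            target.readFrom(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Progress sent = new Progress();
        sent.label = "indexing";
        sent.percent = 40;

        Progress received = new Progress();   // would live on the remote side
        fromBytes(toBytes(sent), received);
        System.out.println(received.label + " " + received.percent + "%");
    }
}
```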
Proxies and Registry
The two main actors in an Event Channeling scenario are the entity that creates an event sink / broker (the inlet nozzle) and a number of entities that register with it, consume the events other entities emit and emit their own events (the outlet nozzles). The inlet creates a channel that the outlets need to access. Two main cases can be distinguished. In the first, an outlet is collocated with the inlet in the same host and the same VM, so they share a common address space. In the second, the two actors do not share an address space: the inlet and outlet run in different VMs, either on the same host or on completely different hosts. The inlet and outlet should not be forced to make this distinction in their code. The Event Channeling mechanism frees them from handling the two cases differently and lets their code behave the same way regardless of where the other party is located. This is achieved through the proxy concept.
A proxy is a mediator between the two actors. Different types of proxies can handle different types of communication, can be initialized with different synchronization protocols, and can use different underlying transport technologies. All of this is known to the client only at a declarative level, in case it wants more control over the mechanics of the framework. Different implementations of the proxy semantics can be provided by implementing the IChannelProxy interface. Every proxy implementation must still provide a marshalable identification mechanism that can be sent to the outlet side, which in turn uses it to initialize a new proxy instance that mediates the channel synchronization on its side. This object is an implementation of the IChannelLocator interface. Implementations of this interface must be able to identify a channel uniquely. They must also hold enough information to contact the inlet side proxy that mediates the instantiating side of the mechanism. This implies information on the protocol used and any protocol specific information needed by the outlet side proxy.
Whether inlets and outlets are collocated or remote, a channel always needs to be identified within the address space of the VM in which it was created. Since the identification token needs to be serializable and movable to a remote location, a Java reference is not adequate. A different construct is needed that uniquely identifies the channel and through which the channel itself can be retrieved. In the Event Channeling mechanism this construct is the ChannelRegistry class. The ChannelRegistry is a static class, unique within the context of a VM, in which channels can be registered, assigned a unique id and discovered through it. Whenever an inlet wants to make the channel it is creating available for consumption through a proxy, it registers the channel, together with the proxy through which the channel will be available, with the ChannelRegistry class. The registration procedure produces a registry key consisting of a UUID. The channel and its associated proxy are stored in the registry under this key, and the key is returned to the caller of the registration procedure, which uses it to produce the locator that identifies the channel. The registry also needs to be registered with some of the channel events so that it can perform its own cleanup once the channel is disposed.
The ChannelRegistry is also the place where additional information on the function of each stored channel is kept. This information includes the number of outlets that can be connected to a single channel. Since an outlet must pass through the ChannelRegistry every time it connects to a channel, the registry is also the policy enforcement point for the number of outlets that can be connected to a single channel, as set in the initialization configuration. Additionally, the ChannelRegistry keeps track of the identity of each outlet and the method it uses to connect to each channel. This allows proper cleanup, and in cases of broadcast-like behavior it allows the whole set of outlets connected to a single inlet to be discovered.
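A VM-wide registry of this kind can be sketched with a static concurrent map keyed by UUID. The names below (`RegistrySketch`, `Channel`, `ProxyStub`, `register`, `lookup`, `dispose`) are illustrative assumptions and not the actual ChannelRegistry API.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

final class RegistrySketch {

    /** Hypothetical placeholder for a channel; only its identity matters here. */
    static class Channel { }

    /** Hypothetical placeholder for an IChannelProxy implementation. */
    static class ProxyStub { }

    /** What the registry stores per key: the channel and the proxy that exposes it. */
    static class Entry {
        final Channel channel;
        final ProxyStub proxy;

        Entry(Channel channel, ProxyStub proxy) {
            this.channel = channel;
            this.proxy = proxy;
        }
    }

    // One map per VM (static), safe for concurrent registration and lookup.
    private static final Map<UUID, Entry> ENTRIES = new ConcurrentHashMap<>();

    private RegistrySketch() { }

    /** Stores the channel with its proxy and returns the generated registry key. */
    static UUID register(Channel channel, ProxyStub proxy) {
        UUID key = UUID.randomUUID();
        ENTRIES.put(key, new Entry(channel, proxy));
        return key;
    }

    /** Returns the stored entry, or null if the key is unknown or already disposed. */
    static Entry lookup(UUID key) { return ENTRIES.get(key); }

    /** Cleanup hook, meant to be triggered by the channel's dispose event. */
    static void dispose(UUID key) { ENTRIES.remove(key); }

    public static void main(String[] args) {
        Channel channel = new Channel();
        UUID key = register(channel, new ProxyStub());

        // Unlike a Java reference, the key survives a trip through text.
        UUID travelled = UUID.fromString(key.toString());
        System.out.println(lookup(travelled).channel == channel);

        dispose(key);
        System.out.println(lookup(travelled) == null);
    }
}
```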
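The policy enforcement described here could look roughly like the following per-channel bookkeeping. `ChannelEntry`, `ConnectionMethod` and `connect` are hypothetical names chosen for the sketch, and the treatment of a repeated connect by the same nozzle is an assumption.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

final class OutletPolicySketch {

    /** How an outlet reached the channel; the two values mirror the proxies in the text. */
    enum ConnectionMethod { LOCAL, TCP }

    /** Hypothetical per-channel record kept by the registry. */
    static class ChannelEntry {
        private final int maxOutlets;
        private final Map<String, ConnectionMethod> outlets = new LinkedHashMap<>();

        ChannelEntry(int maxOutlets) { this.maxOutlets = maxOutlets; }

        /** Returns true if the outlet is connected after the call, false if the limit is reached. */
        synchronized boolean connect(String nozzleId, ConnectionMethod method) {
            if (outlets.containsKey(nozzleId)) {
                return true;                       // assumed: reconnecting is not counted twice
            }
            if (outlets.size() >= maxOutlets) {
                return false;                      // policy enforced at the registry
            }
            outlets.put(nozzleId, method);
            return true;
        }

        /** Snapshot of the connected outlets, e.g. for cleanup or broadcast-like delivery. */
        synchronized Map<String, ConnectionMethod> connectedOutlets() {
            return Collections.unmodifiableMap(new LinkedHashMap<>(outlets));
        }
    }

    public static void main(String[] args) {
        ChannelEntry entry = new ChannelEntry(2);
        System.out.println(entry.connect("outlet-1", ConnectionMethod.LOCAL));
        System.out.println(entry.connect("outlet-2", ConnectionMethod.TCP));
        System.out.println(entry.connect("outlet-3", ConnectionMethod.TCP));
        System.out.println(entry.connectedOutlets().keySet());
    }
}
```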
Local Proxy
A common case in a dynamically distributed environment scenario is the one in which the entities that need to communicate are collocated, running within the boundaries of the same VM, possibly in different threads if their execution is concurrent or even serially one after the other. In such a situation it is obvious that for performance and for resource utilization reasons one would prefer to be able to access directly through the shared memory address space the events the inlet is emitting or brokering within the outlet.
The design and operation of the Event Channeling mechanism facilitate its usage as a shared event registration and brokerage point. Synchronizing the inlet and outlet parties through events makes the actual location of their counterpart transparent to both actors. In this environment the only thing missing to enable optimized local consumption of a channel is a way to pass the outlet a reference to the channel handled by the inlet. This is where the mechanism's registration and mediation classes come in: they pass the reference in a manner that hides the actual location of each actor from the other and does not force them to apply any logic to distinguish between the two cases.
Once the inlet is ready to start functioning as an event sink and brokerage point, it registers its channel with the ChannelRegistry class as described in previous sections. For the registration step an instance implementing IChannelProxy must be provided. If the outlets that will contact the created channel are restricted to being collocated with the inlet (this knowledge is assumed to exist somewhere in the system, quite possibly in the component orchestrating the execution), an instance of the LocalChannelProxy class is provided for the registration step. During the registration process the created channel is assigned to the LocalChannelProxy instance. From then on the proxy can be queried to produce an IChannelLocator instance, which for the LocalChannelProxy class is an instance of the LocalChannelLocator class. This locator can be serialized and deserialized to produce a new LocalChannelLocator instance that still identifies the referenced channel through the ChannelRegistryKey that was assigned to the channel and set on the LocalChannelProxy. The locator can in turn be used to create a new LocalChannelProxy instance, which is used to instantiate a new outlet that connects to the inlet's channel. The channel used is a reference to the original one, since the LocalChannelProxy can look up the ChannelRegistry to retrieve the channel stored under the registry key held by the LocalChannelLocator.
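The local case boils down to a locator that carries only the registry key and resolves it back to the very same channel object. The sketch below assumes a textual locator format (`local:` followed by the UUID) and simplified class names; neither is taken from the library.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

final class LocalLocatorSketch {

    /** Hypothetical placeholder for a channel. */
    static class Channel { }

    // Simplified stand-in for the VM-wide ChannelRegistry.
    static final Map<UUID, Channel> REGISTRY = new ConcurrentHashMap<>();

    /** Hypothetical stand-in for LocalChannelLocator: it holds nothing but the registry key. */
    static class LocalLocator {
        private static final String PREFIX = "local:";   // assumed format, for illustration only
        final UUID registryKey;

        LocalLocator(UUID registryKey) { this.registryKey = registryKey; }

        String serialize() { return PREFIX + registryKey; }

        static LocalLocator deserialize(String text) {
            if (!text.startsWith(PREFIX)) {
                throw new IllegalArgumentException("not a local locator: " + text);
            }
            return new LocalLocator(UUID.fromString(text.substring(PREFIX.length())));
        }

        /** What an outlet side proxy would do: look the channel up in the shared registry. */
        Channel resolve() { return REGISTRY.get(registryKey); }
    }

    public static void main(String[] args) {
        // Inlet side: register the channel and hand out a locator.
        Channel channel = new Channel();
        UUID key = UUID.randomUUID();
        REGISTRY.put(key, channel);
        String wire = new LocalLocator(key).serialize();

        // Outlet side, same VM: rebuild the locator and resolve it.
        Channel resolved = LocalLocator.deserialize(wire).resolve();
        System.out.println(resolved == channel);   // same object, no copy is made
    }
}
```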
TCPServer Proxy
The general case in a dynamically distributed environment scenario is to be unaware of the location of the two actors or specifically know that they are located in different hosts or in general different VMs. This means that no in memory reference can be shared. Connected nozzles must communicate externally to be able to synchronize their operation and exchange events.
As already described in the LocalChannelProxy synchronization scenario, the outlets and inlets of a channel are able to synchronize based on events emitted through the channel. The protocol section illustrates how the events and data that pass through a channel can be transferred and mirrored to a remote copy of the channel. The only thing left to enable remote production and consumption is a technology to transfer the protocol packets. Multiple protocols can be used; this section illustrates the case of a TCP connection. Since the inlet and outlets need not be aware of any of the underlying technology details, the proxy and registration classes of the framework manage all technology specific details.
As in the case of the LocalChannelProxy, the inlet at some point needs to share its event aggregation channel with a number of outlets. It again goes through the registration procedure, but this time a TCPServerChannelProxy instance is passed to the registration method instead of a LocalChannelProxy instance. The first time an inlet creates an instance of the TCPServerChannelProxy within the boundaries of a VM, a TCPTransportServer instance is created following the singleton pattern. From then on every instance of the TCPServerChannelProxy is directed to this server to manage its communication needs. The port the server listens on is configured through the already available configuration class of the proxy instance, and once the server is initialized all traffic passes through this port.
Once the inlet has completed the registration process, the inlet side proxy, transport mechanism and protocol are all ready to accept incoming calls. The proxy is in a position to create a valid IChannelLocator instance, which for the TCPServerChannelProxy is a TCPChannelLocator. The TCPChannelLocator contains the RegistryKey of the referenced channel, the hostname the inlet is running on and the port the server is listening on. This locator can be serialized and deserialized into a new TCPChannelLocator that can be used to initialize an outlet to access the remote channel through a newly initialized transport protocol mechanism created from the TCPServerChannelProxy of the outlet.
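The three pieces of information a TCP locator carries can be illustrated with a small value class. The binary layout used here (two UTF strings followed by an int) and the class name `TcpLocator` are assumptions for the example; the real TCPChannelLocator format is not specified in this section.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.UUID;

final class TcpLocatorSketch {

    /** Hypothetical stand-in for TCPChannelLocator. */
    static class TcpLocator {
        final UUID registryKey;   // identifies the channel in the inlet's registry
        final String host;        // hostname the inlet is running on
        final int port;           // port the transport server listens on

        TcpLocator(UUID registryKey, String host, int port) {
            this.registryKey = registryKey;
            this.host = host;
            this.port = port;
        }

        byte[] serialize() {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(buffer)) {
                out.writeUTF(registryKey.toString());
                out.writeUTF(host);
                out.writeInt(port);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            return buffer.toByteArray();
        }

        static TcpLocator deserialize(byte[] bytes) {
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
                UUID key = UUID.fromString(in.readUTF());
                String host = in.readUTF();
                int port = in.readInt();
                return new TcpLocator(key, host, port);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    public static void main(String[] args) {
        // Host and port are made-up example values.
        TcpLocator original = new TcpLocator(UUID.randomUUID(), "inlet.example.org", 4000);
        TcpLocator copy = TcpLocator.deserialize(original.serialize());
        System.out.println(copy.host + ":" + copy.port + " " + copy.registryKey.equals(original.registryKey));
    }
}
```

An outlet holding the deserialized copy has everything it needs to open a connection to the inlet's server and name the channel it wants.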
The server uses the ChannelRegistry to keep and update a mapping between served channels and open sockets. Upon request it can provide interested parties with the sockets that are associated with the channel they are mediating for through the ChannelRegistry. Upon disposal of the channel, the internal state of the ChannelRegistry connected to the specific channel is also purged along with the connected sockets.
To create such a mapping between the channel and the specific socket, the server relies on the outlet sending the UUID of the channel it needs to connect to, as provided through the locator instance the outlet is initialized with. This information is always sent as clear text, along with an identifier that uniquely identifies the connecting nozzle.
A vulnerability of this implementation is the way the server expects to retrieve the referenced channel UUID and nozzle identifier in order to map the opened socket. After opening a socket, the server blocks waiting for the outlet to send its channel UUID and nozzle identifier. Until this information is provided, no other outlet is accepted by the server. This leaves the server open to a denial of service attack. To save resources, it was chosen not to spawn a separate thread to handle this step.
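The opening exchange can be sketched over plain streams. The framing below (two `writeUTF` strings) and the names `writeHello` and `readHello` are assumptions for illustration; the actual wire format is not given here. In-memory streams stand in for the socket so the example runs without a network.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

final class HandshakeSketch {

    /** What the server learns from a newly connected outlet. */
    static class Hello {
        final UUID channelKey;
        final String nozzleId;

        Hello(UUID channelKey, String nozzleId) {
            this.channelKey = channelKey;
            this.nozzleId = nozzleId;
        }
    }

    /** Outlet side: announce the channel and the connecting nozzle, unencrypted. */
    static void writeHello(OutputStream out, UUID channelKey, String nozzleId) {
        try {
            DataOutputStream data = new DataOutputStream(out);
            data.writeUTF(channelKey.toString());
            data.writeUTF(nozzleId);
            data.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Server side: on a real socket these reads block until the outlet has sent both values. */
    static Hello readHello(InputStream in) {
        try {
            DataInputStream data = new DataInputStream(in);
            UUID key = UUID.fromString(data.readUTF());
            String nozzleId = data.readUTF();
            return new Hello(key, nozzleId);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        UUID key = UUID.randomUUID();
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        writeHello(wire, key, "outlet-1");

        Hello hello = readHello(new ByteArrayInputStream(wire.toByteArray()));

        // The server would map the channel to the accepted socket; nozzle ids stand in for sockets.
        Map<UUID, List<String>> connections = new HashMap<>();
        connections.computeIfAbsent(hello.channelKey, k -> new ArrayList<>()).add(hello.nozzleId);
        System.out.println(connections.get(key));
    }
}
```

If `readHello` runs on the accepting thread, a client that connects and then stays silent stalls every later connection, which is the weakness described above. One possible way to bound that wait, not described in the source, would be a read timeout set with `Socket.setSoTimeout`.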
Synchronization protocol
Whenever an inlet and a number of outlets share a channel there needs to be some synchronization between them both to ensure access integrity but also to enable remote consumption. When the inlet and outlets are collocated, the synchronization is done without mediation directly through the events exposed by the shared channel. When the communication is mediated there is a need for a synchronization protocol that will handle the forwarding of locally produced events and their interpretation as well as the marshaling and unmarshaling of events and their payload.
This is the work of the synchronization protocol implementing classes. Specifically there are two classes that participate, the InletProtocol and OutletProtocol, that act on behalf of the inlet and outlet respectively.
Outlet Protocol
First the protocol needs to create a local copy of the channel the inlet is serving. It contacts the InletProtocol, sending the channel identifier and the nozzle identifier of the outlet it acts on behalf of. After this the thread enters the protocol loop, which ends once the local or remote channel is declared disposed. The three steps of the protocol loop are:
- Send any locally produced events that have been produced directly by the mediated nozzle.
- Receive any events that the inlet has to send and forward them to the local channel unless the received event was originally emitted by the mediated nozzle.
- Repeat either when a configurable predefined period has elapsed or the outlet has produced some event.
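The three steps above can be sketched as a single loop iteration. Everything named here (`Event`, `InletLink`, `OutletProtocol.runOnce`) is a hypothetical simplification: the real OutletProtocol works against the transport layer, while this version talks to an in-memory stand-in.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class OutletLoopSketch {

    static final class Event {
        final String origin;   // identifier of the nozzle that first emitted the event
        final String body;

        Event(String origin, String body) { this.origin = origin; this.body = body; }
    }

    /** Hypothetical connection to the InletProtocol. */
    interface InletLink {
        void send(List<Event> events);
        List<Event> receive();
    }

    static final class OutletProtocol {
        private final String nozzleId;
        private final Object lock = new Object();
        private final List<Event> outbox = new ArrayList<>();   // produced by the mediated nozzle
        final List<Event> localChannel = new ArrayList<>();     // delivered to local observers

        OutletProtocol(String nozzleId) { this.nozzleId = nozzleId; }

        /** Called by the mediated nozzle; wakes the protocol thread if it is waiting. */
        void emit(String body) {
            synchronized (lock) {
                outbox.add(new Event(nozzleId, body));
                lock.notifyAll();
            }
        }

        /** One loop iteration; periodMillis must be positive (0 would wait indefinitely). */
        void runOnce(InletLink link, long periodMillis) {
            List<Event> produced;
            synchronized (lock) {
                produced = new ArrayList<>(outbox);
                outbox.clear();
            }
            link.send(produced);                            // step 1: send local events
            for (Event event : link.receive()) {            // step 2: receive and forward
                if (!nozzleId.equals(event.origin)) {       // skip echoes of our own events
                    localChannel.add(event);
                }
            }
            synchronized (lock) {                           // step 3: wait for period or new event
                if (outbox.isEmpty()) {
                    try {
                        lock.wait(periodMillis);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
    }

    public static void main(String[] args) {
        OutletProtocol protocol = new OutletProtocol("outlet-1");
        protocol.emit("ping");
        List<Event> sent = new ArrayList<>();
        InletLink link = new InletLink() {
            @Override public void send(List<Event> events) { sent.addAll(events); }
            @Override public List<Event> receive() {
                return Arrays.asList(new Event("inlet", "hello"), new Event("outlet-1", "ping"));
            }
        };
        protocol.runOnce(link, 10);
        System.out.println(sent.size() + " sent, " + protocol.localChannel.size() + " delivered");
    }
}
```

A full protocol thread would call `runOnce` repeatedly until the local or remote channel is declared disposed.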
Inlet Protocol
The protocol acts on behalf of the inlet nozzle. It is responsible for aggregating the events of all connected outlet nozzles and for forwarding the aggregated events to all connected outlet nozzles as well as to the inlet channel. Once created, the protocol thread enters a protocol loop whose steps are the following:
- Consults the ChannelRegistry to find the connected outlets to the channel it mediates for.
- Waits for all connected outlet nozzles to be ready to send their produced events.
- Receives all the events each outlet nozzle has to send and for each event, if it was not originally emitted by the inlet, it forwards it to the local channel.
- Emits to each outlet nozzle the events that were produced in the local channel since the last iteration as long as the event was not originally emitted by the current outlet nozzle.
The protocol waits for all connected outlet nozzles to be ready before it starts an iteration in order to simplify the procedure. Otherwise it would need to keep a detailed mapping of all the events it has sent to each connected outlet, which in cases of long interactions and selectively large delays could grow into a very large list that consumes a lot of resources. On the other hand, the current approach can delay healthy outlet nozzles because of a single outlet nozzle with bad connectivity. Alternatives will be investigated in later versions.
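One iteration of the inlet side loop, including the origin checks, might look like the sketch below. The types and the `iterate` method are hypothetical. The sketch also assumes that events forwarded from outlets to the local channel count as produced in the local channel and are therefore relayed to the other outlets in the same iteration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class InletLoopSketch {

    static final class Event {
        final String origin;   // identifier of the nozzle that first emitted the event
        final String body;

        Event(String origin, String body) { this.origin = origin; this.body = body; }
    }

    /** Hypothetical per-outlet connection; a real one would sit on top of the transport. */
    interface OutletLink {
        String nozzleId();
        List<Event> receive();            // assumed to block until this outlet is ready
        void send(List<Event> events);
    }

    /** In-memory stand-in for a connected outlet, used only for the demonstration. */
    static final class InMemoryOutlet implements OutletLink {
        private final String id;
        final List<Event> outgoing = new ArrayList<>();
        final List<Event> received = new ArrayList<>();

        InMemoryOutlet(String id) { this.id = id; }

        @Override public String nozzleId() { return id; }

        @Override public List<Event> receive() {
            List<Event> batch = new ArrayList<>(outgoing);
            outgoing.clear();
            return batch;
        }

        @Override public void send(List<Event> events) { received.addAll(events); }
    }

    /**
     * One iteration. sinceLastIteration initially holds the events the inlet nozzle
     * emitted itself; it is cleared once every outlet has been served.
     */
    static void iterate(String inletId, List<? extends OutletLink> outlets,
                        List<Event> sinceLastIteration, List<Event> localChannel) {
        for (OutletLink outlet : outlets) {                 // wait for and receive from every outlet
            for (Event event : outlet.receive()) {
                if (!inletId.equals(event.origin)) {        // not originally emitted by the inlet
                    localChannel.add(event);
                    sinceLastIteration.add(event);
                }
            }
        }
        for (OutletLink outlet : outlets) {                 // fan out, skipping each outlet's own events
            List<Event> batch = new ArrayList<>();
            for (Event event : sinceLastIteration) {
                if (!outlet.nozzleId().equals(event.origin)) {
                    batch.add(event);
                }
            }
            outlet.send(batch);
        }
        sinceLastIteration.clear();
    }

    public static void main(String[] args) {
        InMemoryOutlet a = new InMemoryOutlet("outlet-a");
        InMemoryOutlet b = new InMemoryOutlet("outlet-b");
        a.outgoing.add(new Event("outlet-a", "a1"));

        List<Event> pending = new ArrayList<>();
        pending.add(new Event("inlet", "start"));           // emitted by the inlet nozzle itself
        List<Event> localChannel = new ArrayList<>();

        iterate("inlet", Arrays.asList(a, b), pending, localChannel);
        System.out.println(localChannel.size() + " " + a.received.size() + " " + b.received.size());
    }
}
```

Because every outlet is served from the same batch in one pass, the inlet needs no per-outlet record of what has already been sent, which is the simplification the text describes. The cost is visible in the first loop: a single slow `receive` holds up the whole iteration.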