<!-- CATEGORIES -->
[[Category:Developer's Guide]]
<!-- END CATEGORIES -->
{|align=right
|| __TOC__
|}
== Introduction ==
The goal of the gRS framework is to enable point-to-point producer-consumer communication. To achieve this and connect collocated or remotely located actors, the framework shields the two parties from technology-specific knowledge and limitations. Additionally, it can provide, upon request, value-adding functionalities that enhance the clients' communication capabilities.

In modern systems the situation frequently arises where the producer's output is already available for access through some protocol. The gRS has a highly extensible design with respect to the way the producer serves its payload. A number of protocols can be used (FTP, HTTP, network storage, local filesystem), and many more can be implemented easily and modularly.

In all producer-consumer scenarios the common denominator is the synchronization of the two parties. The gRS handles this synchronization transparently: clients can either use the framework's mechanisms to block while waiting for the desired circumstances, or register for events to be notified when some criterion is met. The whole design of the framework is event based. It is possible to have a producer-consumer scenario that bases all interactions of the two clients solely on events that the two exchange, attaching custom objects and information. Besides making the framework more versatile at its core, events also allow a two-way flow of communication, not only for control events but also as feedback from the consumer to the producer and vice versa, beyond the usual control signals of common producer-consumer scenarios.

The gRS2 component is available in our Maven repositories with the following coordinates:

<source lang="xml">
<groupId>org.gcube.execution</groupId>
<artifactId>grs2library</artifactId>
<version>...</version>
</source>

=== Overview ===
One can distinguish between two common cases in gRS usage, as will be detailed in later sections: the case where the producer and consumer are collocated in the same address space, and the case where they are remotely located.

== Records and Record Pools ==
=== Record Pool ===
The main data store of the gRS framework, and its main synchronization point, is the record pool. Any record pool implementation must conform to a specific interface. Different implementations of the record pool may use different technologies to store their payload. Currently there is one available implementation, which keeps all the records in an in-memory list. The records are consumed in the same order they are produced, although consumers can choose to consume them in a random order. In addition to the list-based storage of the records, a map-based interface is also provided for easy access to records based on their ids.

A record pool is associated with a number of management classes observing, monitoring and facilitating the operations that can be performed on the pool:
* ''DiskRecordBuffer'' – A disk buffer that can be used as temporary storage for records and fields, as will be explained in later sections.
* ''HibernationObserver'' – An observer of events of a specific type that manages temporary storage in, and retrieval from, a disk buffer. More will be explained in later sections.
* ''DefinitionManager'' – A common container class that manages the definitions of records and fields associated with the pool, as will be explained in later sections.
* ''PoolState'' – A class holding the events that can be emitted by the pool or forwarded through the pool. More will be detailed in later sections.
* ''PoolStatistics'' – A class holding and updating usage statistics of the record pool, as will be explained in later sections.
* ''LifeCycleManager'' – Policies that govern the lifetime of the record pool can be defined and implemented through this class, as will be detailed in later sections.

Pool instantiation is handled internally by the framework and is driven by a set of configuration parameters. These parameters include the following information:
* Transport directive – Whenever a pool is transported to a remote location, a directive can be set to describe the preferred transport method for the pool's items. This directive will be used for all its records and their fields, unless some record or field has specifically requested some other type of preferred transport. The available directives are full and partial. The former implies that all record and field payload will be made immediately available to the remote site once transferred. The latter will not initially send any payload for any of the fields, unless the field attributes specify that some prefetching is to be done.
* Pool Type – The pool type property indicates the type of pool to be instantiated. Since pool instantiation is done internally by the framework, the available pools must be known to the framework so that it can instantiate them and handle differently any cases that may require different parametrization or configuration. The only currently available pool type is the memory list pool.
* Pool Synchronization – When a record pool is consumed remotely, a synchronization interval can be set to avoid continuous unnecessary communication between remote locations. This interval is used by the synchronization protocol, as will be explained in later sections. Depending on the protocol used, this interval may or may not be honored, based on the protocol's implementation and synchronization capabilities. The synchronization hint will be available to the protocols that need to use it.
* Incomplete Pool Synchronization factor – During synchronization of remote pools, situations will arise where one of the two parties requests information that the other party cannot immediately respond to. In such cases one may need to alter the synchronization interval hint to require more frequent or less frequent communication. This is performed through this factor.

There are two cases that can be distinguished for record pool consumption. The first is the degenerate one, where the producer and consumer are collocated within the boundaries of the same VM. This means that they share a memory address space and references to shared classes can be used. In this situation, for optimization purposes, the reference to the authored record pool can simply be passed to the consumer. The consumer sees directly the record pool the producer is authoring, accesses the references the producer creates, and no marshaling is performed. The synchronization is performed directly through the events producer and consumer emit, and there is a single instance of all the pool's configuration and management classes. The second case is that of a different address space. This may mean the same physical machine but a different process, or a completely different host altogether. In that case the reference to the authored record pool cannot be shared and a new instance must be created. A synchronization protocol then stands between the two instances of the record pool and, based on the produced events, transparently handles the synchronization of the two instances. The two instances of the record pool are created with the same configuration parameters and therefore have exactly the same behavior. The mediation of the remote pools and their synchronization are detailed in depth in later sections.

=== Buffering ===
The gRS is meant to mediate between two sides: a producing and a consuming side. Either of the two sides can have inherent knowledge of its processing requirements and its production / consumption needs. A producer, for example, may know that in order to produce the next record he will require some processing that takes a certain amount of time. A consumer may likewise know that the processing of its input will require some amount of time. In order to better optimize their communication, they may deem it necessary that some amount of data will always be ready for the other party. The producer may request that at all times, no matter how many records the consumer has actually requested so far, some amount of records produced by him will be readily available in case the consumer requests them. The consumer, respectively, may want to specify that some number of records should already be available on his side, whether or not he has specifically requested them, so that when he does request them he will not have to wait until they are transferred to him.

This is achieved through buffering policies that may be applicable to either, both or none of the two sides. Each one of the actors can specify the buffering policy it wants to have applied. The framework will try to honor the actor's request by always having the specified number of records available for him in the case of the consumer, or already produced in the case of the producer. The buffer policy is configurable through the ''BufferPolicyConfig'' configuration class. There are three configurable values the configuration class exposes, listed below; a configuration sketch follows the list.
* Use buffering – Whether or not the client wants to have buffering enabled. If buffering is disabled, the consumer will have to specifically request when and how many more items he wants delivered to him, and wait until they are produced (depending on the producer's behavior) and then transferred to him (depending on the mediating proxy and the producer's and consumer's relative positions).
* Number of records to have buffered – The number of records the buffer policy will try to keep available for each side. For the Writer side, this means that the buffer policy will emit item-requested events to the client, to always keep him ahead of the number of items the Reader has requested by the specified number. For the Reader side, it means that the policy will request more items from the Writer than the number the consumer has actually requested. Since the producer and consumer remain active while this process takes place, depending on the clients' usage the buffering policy may not be fully satisfiable. This means that at some point the number of items the producer has produced may be greater or less than what the buffering policy needs, and / or the consumer may have more or fewer items available than the ones the policy needs. The policy will make a best-effort attempt to stay within the specified bounds.
* Flexibility Factor – Having an absolute number of items as a buffering policy attribute means that in some cases a lot of traffic can be produced from the Reader to the Writer and from the Writer to the producer client. If, for example, the Reader has requested a buffering of 20 records ahead of what he is consuming, the moment he requests the next record, the request will be served by the buffered records and a request for a new item will be immediately sent to the Writer. This means a new event for each Reader consumption. Analogous examples can be made for the Writer's side. The flexibility factor allows for some softening of the policy. It can specify that some percentage of the buffer is allowed to be emptied without an immediate request to refill it. For example, a flexibility factor of 0.5 will let the buffer become half empty before an event is emitted to cover the missing half of the buffer.
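
As a minimal sketch, the following shows how a consumer-side client might fill in such a policy. The ''BufferPolicyConfig'' class is the one named above, but the accessor names used here are assumptions for illustration and may differ from the actual API:

<source lang="java">
//Hypothetical configuration of a Reader-side buffering policy (setter names are assumptions)
BufferPolicyConfig config=new BufferPolicyConfig();
config.setUseBuffering(true);      //enable buffering for this side
config.setBufferedRecords(20);     //try to keep 20 records available ahead of consumption
config.setFlexibilityFactor(0.5f); //allow the buffer to drain to half before a refill request is emitted
</source>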

Another interesting point is that of event flooding. The buffer becomes half empty and an event is emitted. Before the needed items are supplied, the consumer requests another record. The buffering policy is consulted and decides that half+1 items are now needed. This process continues and, by the time the initially requested items have been delivered, the Reader has already requested a large number of items, quite possibly far from the number actually needed. To protect itself from this situation, whenever a Reader or a Writer is instructed by the ''BufferPolicy'' to request some records, it temporarily suspends the buffer until the requested items have been made available. That way the client works with the number of items still available in the buffer, without producing unnecessary requests, until the initially requested number of records has arrived, at which point the buffer will recheck what is now needed and may produce a new request event.

=== Definition Management ===
All records that are added to a record pool must always contain a definition. A record definition (''RecordDef'') contains information that enables the framework to handle the record in a uniform manner regardless of its actual implementation. The contents of a record definition are discussed in a different section. Similarly, every field that may be contained in a record must also contain a field definition (''FieldDef'') for the same purpose.

The common case, as expected, is that most record pools will contain records that are largely similar in their definitions. Likewise, the fields of each record are expected to be largely similar in their definitions. Even so, having each record and each field of a record define its own definition is an obvious waste of resources and an overhead that can easily be dealt with. Each record pool contains a ''DefinitionManager''. This class is the placeholder of all definitions of records and fields within that pool.

Record definitions and field definitions can be added to and removed from the manager. Every time a definition is added to the manager, the existing definitions are checked. If the definition is found to be already contained in the manager's collection, a reference counter for this definition is incremented and a reference to the already stored definition is returned to the caller. This reference is then stored in the respective record or field, thereby sharing the definition with all other elements that have the same definition. In case a definition must be disposed, instead of destroying the actual definition the operation is passed to the manager, which will locate the referenced definition and decrease its reference counter. Once the reference counter reaches 0, the definition can be disposed.

This approach, although it serves its purpose very well and very efficiently, has two implications. Firstly, after a record has been added to a pool and its definition shared with the rest of the available records, its definition cannot be modified. Although a side effect, this is not really a limitation, as this restriction must be imposed by the framework anyway: after a record is added to a pool it is immediately available to the pool's consumers, and changing any of its properties does not necessarily mean that the consumer will be notified of the change, as he might have already consumed it and moved on. The second implication is the need to be able to identify when two definitions are considered equal. Equality is handled at the level of the ''RecordDef'' and ''FieldDef'' classes and is simply a matter of identifying the distinct values that make each definition unique.
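
To make the sharing mechanism concrete, the following is a minimal sketch of the reference-counting behavior described above; the method names on the ''DefinitionManager'' are assumptions for illustration only:

<source lang="java">
//Hypothetical interaction with the pool's DefinitionManager (method names are assumptions)
RecordDef shared=pool.getDefinitionManager().add(myRecordDef); //returns the stored instance; its reference counter is incremented
//... the returned reference is what the record actually stores ...
pool.getDefinitionManager().remove(shared); //decrements the counter; the definition is disposed when it reaches 0
</source>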

=== Pool Events ===
The whole operation of the gRS is event based. This package is the heart of the framework. The events declared here are the ones that define the flow of control governing the lifetime of a record pool: its creation, consumption, synchronization and destruction. Each record pool has an associated ''PoolState'' class in which the events that the pool can emit are defined, and through which an interested party can register for an event. The ''PoolState'' keeps the events in a named collection through which a specific event can be requested.

The event model used is a derivative of the Observer – Observable pattern. Each one of the events declared in the package and stored in the ''PoolState'' event collection extends java Observable. Observers can register for state updates of the specific instance of the event. This of course means that there is a single point of update for each event, which automatically raises a lot of issues of concurrency, synchronization and race conditions. So the framework uses the Observables for Observers to register with, but the actual state update is not carried through the Observable the Observers have registered with; rather, it is carried by an instance of the same event that is passed as an argument in the respective update call. That way the originally stored Observable instances in the ''PoolState'' act as the registration point, while a new instance of the Observable, created by the emitting party, is the one that actually carries the notification data.
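
A minimal sketch of how a client could register for a pool event under this model follows; the way the event is looked up from the ''PoolState'' is an assumption for illustration:

<source lang="java">
import java.util.Observable;
import java.util.Observer;

//Hypothetical registration with the pool's ItemAddedEvent (the lookup method name is an assumption)
Observable registrationPoint=pool.getPoolState().getEvent("ItemAddedEvent");
registrationPoint.addObserver(new Observer()
{
  public void update(Observable source, Object arg)
  {
    //the notification data travels in a new event instance passed as the argument,
    //not in the Observable the observer registered with
    ItemAddedEvent event=(ItemAddedEvent)arg;
    //... react to the newly added items ...
  }
});
</source>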

All events extend ''PoolStateEvent'', which in turn implements the Observable interface and ''IProxySubPacket''. The latter is used during transport to serialize and deserialize an event, and is explained in later sections. The events defined, which are the basis of the framework, are the following:
* MessageToReaderEvent – This event is sent with a defined recipient. It can be sent either by some component of the framework, to notify the consumer client of some condition such as an error that the consumer may want to be notified of, or by the client of the writer, the producer himself. The message is composed of two parts. One is a simple string payload that can contain any kind of information the sender may want to include. The other part is an attachment: the sender can attach an object implementing the IMarshlableObject interface, and the message will be delivered to the consumer along with the attachment object as submitted.
* MessageToWriterEvent – This event is sent with a defined recipient. It can be sent either by some component of the framework, to notify the producer client of some condition such as an error that the producer may want to be notified of, or by the client of the reader, the consumer himself. The message is composed of two parts. One is a simple string payload that can contain any kind of information the sender may want to include. The other part is an attachment: the sender can attach an object implementing the IMarshlableObject interface, and the message will be delivered to the producer along with the attachment object as submitted.
* ItemAddedEvent – This event is generated by the writers when a number of records are inserted in the record pool. Based on this event the reader can evaluate when there are more records for him to consume.
* ItemRequestedEvent – This event is generated by the readers to request a number of records to be produced. Based on this the writer can evaluate when he should produce more records to be consumed.
* ProductionCompletedEvent – This event is generated by the writers when they finish authoring the record pool. It can be used by the readers to identify whether or not they can continue requesting more records than the ones already available to them. This event does not imply that the producer is disposed. The Writer instance is still available to respond to control signals as needed. The producer can still use the record pool to exchange messages with the consumer.
* ConsumptionCompletedEvent – This event is generated by the readers when they have finished consuming the record pool. It can be used by the producers to stop producing more records and stop authoring the record pool. This event does not imply that the consumer is disposed. The Reader can still be used to access records already made available to it, and the records that have already been accessed can still be used. The producer and consumer can still exchange messages.
* ItemTouchedEvent – This event is emitted whenever a record is accessed in some way: modified, transferred, accessed, some of its payload moved from the producer to the consumer, etc. It is used by the pool life cycle management for pool housekeeping, as explained in a later section.
* ItemNotNeededEvent – This event is emitted by a consumer whenever he wants to declare that he is done accessing a record and all its payload, and that the record is no longer needed. It is used by the pool life cycle management for pool housekeeping, as explained in a later section. After this event is emitted for some record, the specific record is a candidate for disposal at any time the framework sees fit.
* FieldPayloadRequestedEvent – The framework supports multiple transfer functionalities and parametrizations. In all but the simple cases of collocated producers and consumers, or of the full payload being transferred immediately across the wire to the consumer, the payload of a record's field may need to become gradually available to the consumer through partial transfer. For a payload part to be transferred, the consumer will need to request the additional payload explicitly through a call, or implicitly through the use of one of the framework's mediation utilities. This request is done through this event, which requests, for a specific record and a specific field of that record, a number of bytes to be made available. There are two ways this event can be handled by the framework. The first case is when the producer and consumer are connected through a ''LocalProxy''. In this case both actors are collocated and the payload of every field is already available to the consumer. Each writer must then be able to catch these events and immediately respond with a FieldNoMorePayloadEvent, to indicate to the consumer that all the available payload of the field is already there for him to consume. The other case is to have some other type of mediation between the two actors, where there must be some kind of transport between them. In this case the protocol connecting the producer side of the proxy must catch the event, retrieve the needed payload from the respective field, and respond with the appropriate event: FieldPayloadAddedEvent or FieldNoMorePayloadEvent, depending on whether more payload has been made available, or no more payload can be made available because all of it has already been transferred.
* FieldPayloadAddedEvent – This event is sent as a response to a FieldPayloadRequestedEvent when more data has been made available to the consumer, as per the request, as sketched above.
* FieldNoMorePayloadEvent – This event is sent as a response to a FieldPayloadRequestedEvent when no more data can be sent to the consumer, because all the available data is already there, as sketched above.
* LocalPoolItemAddedEvent – This event indicates that a new item has been added to the local pool; local, that is, to the component that catches the event. For example, a reader catching the event will interpret it as a new item created by the writer and made available to the reader side as a response to an ItemRequestedEvent that he sent. This event is used by the reader and writer classes to update their buffer policies.
* LocalPoolItemRemovedEvent – This event indicates that an item has been retrieved from the local pool; local, that is, to the component that catches the event. For example, a writer catching the event will interpret it as meaning that an item previously requested by the reader, and created and added to the pool by the writer, has been transferred to the reader, who consumed it. This event is used by the reader and writer classes to update their buffer policies.
* ItemHibernateEvent – During its lifetime, a record may be considered as needing to be kept for later use by a consumer while not being needed for the time being, in which case it should be somehow hidden to free up resources. For this purpose, any record already contained in a record pool, along with its included fields, can be hibernated. The record is persisted to disk and can subsequently be woken up in its initial state. When a record is marked to be hibernated, this event is emitted, and the pools that may contain it (producer and consumer pool, if the pools are remote and synchronized) will catch the event and hibernate the record. This event processing may be asynchronous.
* ItemAwakeEvent – At some point a hibernated record may be needed again. At this point the record must be awakened on both the consumer and the producer side and restored to the exact state it was in before hibernation. This event is emitted, and both pools will resurrect the record and its fields from the disk buffer.
* DisposePoolEvent – This event is emitted when the record pool and all its records are no longer needed. The dispose method is called either because the producer decided to revoke the produced record pool, or because the reader decided that he is done consuming it. Since the communication is point to point, once the consumer stops consuming, the record pool is fully disposed. The record pool disposal disposes each record and, if a record is hibernated, wakes it up before it is disposed, to make sure each field performs its cleanup. Once this event is emitted, all state associated with the record pool is cleaned up, whether it is data within the record pool itself, registry entries, open sockets, etc.

=== Record Hibernation ===
A record pool may grow to a large size. There is practically nothing to limit the size of a record pool that a producer may need to create. There are, however, limitations imposed by the available resources. The gRS attempts to facilitate the creation and consumption of big sets of records with as few resources as technically possible. One aspect of this approach is the record hibernation facility.

Records that have been consumed but should not yet be disposed of (by marking them as not needed through an ItemNotNeededEvent) can be put in a state of minimal memory requirements. The record and its fields can then, whenever they are needed again, become active once more, restored to their exact previous state and ready to be used again. This state is hibernation.

Each record pool has an associated ''DiskRecordBuffer''. Items of the pool can persist themselves through this instance and put themselves in a state of low resource consumption. When and if they are needed again, they can be woken up, deserialize the state they previously persisted, and make themselves usable again.

As stated in the pool events section, the hibernation and awakening of records is handled through events. Each record pool registers a ''HibernationObserver'' instance to receive the ItemHibernateEvent and ItemAwakeEvent events that are emitted through it. This observer, whenever it receives one of the above events, will synchronously go to the disk buffer and perform the requested action. When a record pool is disposed, or a single record is disposed, if it is hibernated it will first be awakened. This way each field and record can perform its own cleanup.

=== Life Cycle Management ===
During the life cycle of a record pool there may be situations in which the actors need the pool, or part of the pool, to be disposed without necessarily waiting for a dispose pool event to be emitted. One example of this is the ItemNotNeededEvent presented in previous sections. The life cycle of a record pool is separated into two stages. The first stage is considered the pool's active span. Policies can be defined to be applied during this period, and their aggregated decision can keep the pool in this phase or move it to the next one. The next phase of a pool's life cycle is the one in which the pool is regarded as a candidate for disposal. Policies can be applied in this phase to keep the pool active even at this stage, but once the policies' aggregated decision dictates it, the pool is disposed along with all of its records. During both these phases a third check takes place: the policies defined not for the pool as a whole but for each record. An aggregated decision is made for each record and, if this decision dictates it, the record can be disposed even if none of the clients has explicitly requested it through an ItemNotNeededEvent.

A class implementing a life cycle policy must implement the ''IlifeCyclePolicy'' interface. Through this interface the meaning of a decision is conveyed: whether the policy decides that the inspected object should be kept or destroyed, or whether it cannot make a decision. Life cycle policies can be divided into two types: policies that can be applied to a record pool, and policies that can be applied to a record. These are distinguished by two extensions of ''IlifeCyclePolicy'': ''IlifeCyclePoolPolicy'' and ''IlifeCycleRecordPolicy'', which, besides tagging the type of object the policy can be applied to, define the main entry point to the policy's decision making.

In order for the policies to be applied to a pool or to a record, some statistics must be kept for all items of the record pool, so that the decision on each item's life cycle can be informed. Statistics are kept for the record pool, as well as for all the records of the record pool, by monitoring the events exposed by the record pool. The ''PoolStatistics'' class is initialized for each record pool. This class is registered for all the events exposed by the pool. Whenever an event is caught, it is processed and the statistics kept by the instance are updated. The statistics kept for the pool are the following:
* If the pool is being disposed
* The life cycle phase the pool is now in
* The time the pool was created
* The time the consumption of the pool was completed
* The time the production of the pool was completed
* The number of records added to the pool
* The number of records requested from the pool
* The number of records touched in the pool
* The number of messages transferred through the pool
* The time the last record was produced
* The time the last record was requested
* The time the last record was touched
* The time the last message was transferred
* The mean record production time
* The mean record request time
* The mean record touch time
* The mean message transfer time

In addition to the statistics kept for the record pool in general, statistics are kept for each record of the record pool too. These statistics are the following:

* The time the record was created
* The time the record was marked as not needed
* The last time the record was touched

Based on these statistics a number of life cycle policies have been defined, and more can be added. Each policy defines its scope, which can be either Pool or Record. A policy may be applicable to either of the two, or to both. The types of policies currently defined are:

* LastUsed
* UsageInterval
* UsageRate
* ProductionConsumptionCompleted

The first three currently have only Pool scope, while the fourth has both Record and Pool scope. Going into more detail, there are the following five implementations of life cycle policies.

* ''ConsumptionCompletedRecordPolicy'' – This policy has a Record scope and a policy type of ProductionConsumptionCompleted. The policy will respond that a record is a candidate for destruction if the record has been marked as not needed through an ItemNotNeededEvent, or if the time since the record was last touched exceeds some configurable threshold.
* ''ProductionConsumptionCompletedPoolPolicy'' – This policy has a Pool scope and a policy type of ProductionConsumptionCompleted. The policy will respond that a pool has exceeded the policy-defined grace period if the production of the pool has been completed (ProductionCompletedEvent), the consumption of the pool has been completed (ConsumptionCompletedEvent), and a configurable time threshold has been exceeded since both completion times. Depending on the life cycle phase in which the policy is used, its outcome will either contribute to moving the pool to the next life cycle phase or to disposing it.
* ''UsageIntervalLifeCyclePoolPolicy'' – This policy has a Pool scope and a policy type of UsageInterval. The policy will respond that a pool has exceeded the policy-defined grace period if the mean message transfer time, the mean record production time, the mean record touch time and the mean record request time all have values less than some configurable threshold. Depending on the life cycle phase in which the policy is used, its outcome will either contribute to moving the pool to the next life cycle phase or to disposing it.
* ''UsageRateLifeCyclePoolPolicy'' – This policy has a Pool scope and a policy type of UsageRate. The policy will respond that a pool has exceeded the policy-defined grace period if the message rate, the record production rate, the record touch rate and the record request rate all have values less than some configurable threshold. Depending on the life cycle phase in which the policy is used, its outcome will either contribute to moving the pool to the next life cycle phase or to disposing it.
* ''LastUsedLifeCyclePoolPolicy'' – This policy has a Pool scope and a policy type of LastUsed. The policy will respond that a pool has exceeded the policy-defined grace period if the time since the last message, the time since the last record production, the time since the last record touch and the time since the last record request all have values greater than some configurable threshold. Depending on the life cycle phase in which the policy is used, its outcome will either contribute to moving the pool to the next life cycle phase or to disposing it.

In order to apply the life cycle policies, each record pool has an associated ''LifeCycleManager'' instance. The configuration of the manager defines the policies to be used, as well as some manager-specific properties. Whenever a record pool is remotely instantiated at a consumer site, the life cycle manager, along with all its properties and policies, is cloned to the remote pool. The attributes a life cycle manager can be parametrized with are whether some default life cycle policies should be used if none are defined, and the interval at which the policies are evaluated. If no life cycle policies are defined and the default ones are not used either, the pool and its records will only be reclaimed on a DisposePoolEvent. This is also the case for records that are marked as not needed through an ItemNotNeededEvent. The life cycle manager is configured as to which policies to use through a list of policy definition arguments represented by the ''LifeCyclePolicyDefinitionArguments'' class. The policies to be used are declared indirectly by defining the policy name, the use of the policy, and its scope, along with the specific policy configuration. During the manager's initialization the concrete policy instances are created, and the implementor of the policies is instantiated; it evaluates the policies at the specified interval, aggregates the results of each policy, handles the phase of the pool's life cycle, and performs the actual disposal of pools and records.
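
As a minimal sketch, a policy declaration along the lines described above could look as follows; the ''LifeCyclePolicyDefinitionArguments'' class is named in this page, but the constructor, setter names and the configuration holder used here are assumptions for illustration:

<source lang="java">
//Hypothetical declaration of a LastUsed policy with Pool scope (names are assumptions)
LifeCyclePolicyDefinitionArguments args=new LifeCyclePolicyDefinitionArguments();
args.setPolicyName("LastUsed");          //the policy type to instantiate
args.setScope(Scope.Pool);               //apply the policy to the pool as a whole
args.setThresholdMillis(5*60*1000);      //grace period of five minutes of inactivity
lifeCycleManagerConfig.addPolicy(args);  //register the policy with the manager's configuration
</source>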

=== Records and Fields ===
The whole purpose of the gRS is the movement of payload. This is done through records and their fields. Each piece of information stored in and retrieved from the record pool is organized as a record. A record is simply a way to group fields. The fields of a record are the actual constructs that contain the payload that needs to be moved from the producer to the consumer.

A record groups fields. It is the grouping unit that defines the items that can be added to a pool. Each record has a definition, ''RecordDef'', that contains the schema of the record. The schema of the record has some system-level attributes needed by the framework, and different implementations of a record can add their own additional properties. The two basic attributes of a record are its type and a transport directive. The type of the record is the class name of the class that implements the ''IRecord'' interface. This way, when a record is transferred over the wire to a remote location, the framework can unmarshal the record to its originally defined instance. The transport directive is a hint to the framework as to how the payload of the record, its fields, should be moved over the wire, unless some field has specific requirements. The transport directives a record can define are:

* Full – This directive specifies that all the record's fields must be transferred fully to the remote location, unless some field has overridden this in its own definition, specifying that it needs to be transferred partially.
* Partial – This directive specifies that all the record's fields must be transferred partially to the remote location, unless some field has overridden this in its own definition, specifying that it needs to be transferred fully.
* Inherit – This directive does not specify a concrete directive; instead, the directive is inherited from the transport directive the record pool defines.

When a record is added to a record pool it cannot be added to any other record pool, and its definition can no longer be modified.

A record is simply the grouping placeholder that keeps organized the fields that compose the actual record's payload. The payload of the record is the collection of fields. Each field can be of various types, depending both on the field's payload type and on the access methods it provides. Each field has a definition, ''FieldDef'', that contains the schema of the field. The field schema contains some system-level attributes needed by the framework, but each field implementation can define additional ones. The system-level definition attributes are the following:

* FieldType – The class name of the field. This is needed so that when a field is transferred to a remote location, the actual instance of the field can be unmarshaled and recreated.
* SupportsChunking – Whether or not chunking of payload should be used when transferring the field to a remote location. Whenever an amount of data is requested for transport by a remote consumer, and the field is set to support chunking, a multiple of the ChunkSizeInBytes covering the requested amount is sent.
* ChunkSizeInBytes – The amount of data that constitutes an indivisible unit of transfer when the field supports chunking.
* TransportDirective – The field's desired method of transfer. The available options are Full, Partial and Inherit. If the Full transport directive is defined, whenever the field is transferred to a remote consumer the field's full payload is always transferred immediately. On Partial transfer, no payload, or a portion of it as defined by the prefetching and chunking properties, is transferred; additional payload must be explicitly requested. On the Inherit transport directive, the transport directive defined for the containing record is used.
* DoPrefetching – Whether or not prefetching of the field's payload must be applied in case of a partial transport directive. When applicable, the PrefetchSizeInBytes and the chunking properties are consulted and the calculated number of bytes is also transferred to the remote consumer site.
* PrefetchSizeInBytes – The number of bytes that should be moved along with the field definition, in case the field supports prefetching and is being transferred partially.
* MIMEType – This attribute is meant to help the consumer identify the type of payload the field contains.

When a field is added to a record it cannot be added to any other record, and once the record is added to a record pool the field's definition can no longer be modified.
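
As an illustration of the field definition attributes listed above, the following sketch fills in a string field definition. The ''StringFieldDefinition'' class and the setTransportDirective accessor appear in the Custom Records example later in this page; the remaining setter names mirror the attribute names and are assumptions:

<source lang="java">
//Define a string field that is transferred partially, in 4KB chunks, with 8KB prefetched up front
StringFieldDefinition def=new StringFieldDefinition("payload");
def.setTransportDirective(TransportDirective.Partial); //payload moves only when requested
def.setSupportsChunking(true);    //assumed setter: transfer in indivisible chunks
def.setChunkSizeInBytes(4096);    //assumed setter: chunk unit of 4KB
def.setDoPrefetching(true);       //assumed setter: ship some payload along with the definition
def.setPrefetchSizeInBytes(8192); //assumed setter: prefetch 8KB
</source>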

In addition to the ''FieldDef'', each field contains a ''FieldState'' instance. Through this instance the interested client can register for notifications concerning the field's payload and its availability. The field state instance performs lazy initialization of its events, to avoid explosive memory overhead for the fields. The events exposed through this instance are ''FieldPayloadAddedEvent'' and ''FieldNoMorePayloadEvent'', and they are used to handle the partial availability of a field's payload. Whenever a field's payload is partially transferred to the consumer, the consumer needs to explicitly request more payload. The framework will then consult the prefetching attributes and request some specific amount of data to be made available. In the case of a locally consumed pool, the writer that is authoring the pool must respond with a ''FieldNoMorePayloadEvent'', as already explained in another section; this indicates to the consumer that all the data that can be consumed from the field is already available to him. Otherwise, in the case of a mediated, framework-synchronized consumption, the synchronization protocol will intercept the requesting event, consult the referenced field, retrieve the desired amount of data, and send it to the consumer. Additionally, it will emit a ''FieldPayloadAddedEvent'' to let the field consumer know that new payload has been made available to him. In case the referenced field responds that no more payload can be sent, because the full amount of available data has already been sent, a ''FieldNoMorePayloadEvent'' will instead be sent to the field consumer through the respective ''FieldState''.

The process of field payload synchronization sketched above is expected to put stress on a simple consumer that needs to take advantage of partially transferable field payload but wants a more simplified access method. To this end an additional utility is provided to the field consumer: the ''MediatingInputStream'' class can be used, for the fields that support it, to consume their payload. This stream automatically handles the requests for additional payload, handles the responding events, and blocks until more data is available.
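
As a minimal sketch of the simplification this offers, a consumer could read a field's payload as a plain stream. The accessor used to obtain the stream is an assumption, and the sketch assumes ''MediatingInputStream'' behaves as a standard java.io.InputStream:

<source lang="java">
//Hypothetical consumption of a field's payload through a MediatingInputStream
MediatingInputStream in=field.getMediatingInputStream(); //assumed accessor
byte[] chunk=new byte[1024];
int read;
while((read=in.read(chunk))!=-1)
{
  //additional payload requests, response events and blocking are handled inside the stream
  process(chunk, read); //hypothetical client-side processing
}
in.close();
</source>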

Depending on the type of payload a field contains, and on preferences as to the way the payload is accessed, the following ''IField''-extending interfaces have been defined. While the names of the interfaces also define their logic to a large extent, nothing actually stops the implementors of an interface from implementing a different logic than the one expected.

* ''IReferenceField'' – This interface is meant to be implemented by fields whose payload is simply a URI identifying a resource that can be consumed through some protocol through which it is accessible. The protocol may be able to serve the payload in any location, or there may be a need for the framework to transfer the payload. This logic is incorporated in each field implementing the interface.
** ''FTPReferenceField'' – Implementation of the ''IReferenceField'' interface. Denotes a field whose payload is an FTP-accessible resource. The field contains as payload simply the URI of the FTP location and the path to the resource. Whenever a consumer needs to access the resource, the field will open an input stream that reads from the remote location. This means that the field can serve the resource at any location without the framework having to transfer any data other than the URI itself. Mediation for this field is provided in the sense that the mediating input stream returned is simply the input stream that would read the remote resource anyway. Requests for additional payload would not provide any data beyond what is already available through the protocol, so requests for more payload are immediately answered with a no-more-payload event.
** ''HTTPReferenceField'' – Implementation of the ''IReferenceField'' interface. Denotes a field whose payload is an HTTP-accessible resource. The field contains as payload simply the URI of the HTTP location and the path to the resource. Whenever a consumer needs to access the resource, the field will open an input stream that reads from the remote location. This means that the field can serve the resource at any location without the framework having to transfer any data other than the URI itself. Mediation for this field is provided in the sense that the mediating input stream returned is simply the input stream that would read the remote resource anyway. Requests for additional payload would not provide any data beyond what is already available through the protocol, so requests for more payload are immediately answered with a no-more-payload event.
** ''GlobalFSReferenceField'' – Implementation of the ''IReferenceField'' interface. Denotes a field whose payload is a globally accessible filesystem resource. The field contains as payload simply the URI of the filesystem location and the path to the resource. Whenever a consumer needs to access the resource, the field will open an input stream that reads from the filesystem location. This means that the field can serve the resource at any location without the framework having to transfer any data other than the URI itself. Mediation for this field is provided in the sense that the mediating input stream returned is simply the input stream that would read the filesystem resource anyway. Requests for additional payload would not provide any data beyond what is already available through the filesystem, so requests for more payload are immediately answered with a no-more-payload event.
** ''LocalFSReferenceField'' – Implementation of the ''IReferenceField'' interface. Denotes a field whose payload is a resource accessible only through the producer's filesystem. The field contains as payload simply the URI of the filesystem location and the path to the resource. This means that in case the producer resides on the same host as the consumer, and their mediation is only performed through a ''LocalProxy'', the URI is indeed enough for the consumer to access the resource. In case the communication is mediated, the local filesystem resource needs to be transferred as well. The producer-side field keeps track of the amount of data it has already transferred, and the consumer-side field creates a temporary file, deleted on field disposal, to which it appends the data of the original field as they are made available. The consumer accesses the locally available copy and, in case of partial transfer, keeps requesting the amount of data that it needs to consume. Mediation for the field is available by wrapping an input stream over the local partial file, hiding the additional payload requests and blocking whenever a client request would hit the end of the temporary file while more data can still be retrieved.
* ''IStreamField'' – This interface is meant to be implemented by fields whose payload is accessible through a stream. On the producer side, the client will provide the field with an input stream that reads over the available payload. No data will be read unless it needs to be moved to the consumer. If the consumer is collocated with the producer, the consumer will access the payload of the field through the same InputStream the producer provided. Otherwise the mediating proxy will read the needed payload from the producer's input stream, transfer it, and make it available to the consumer through a respective input stream.
* ''IValueField'' – This interface is meant to be implemented by fields whose payload is fully available to the gRS at its instantiation and will become accessible to the consumer directly from the field itself.
** ''IObjectField'' – Extension of the ''IValueField'' interface that is meant to be implemented by fields that want to offer typed java objects as their payload. The object must be able to be marshaled and unmarshaled to and from a byte stream. Implementations of the field must specify whether or not partial transfer is supported for the field.
** ''StringField'' – Implementation of the ''IObjectField'' interface. Denotes a field whose payload is simply a string that can be fully or partially transferred to a remote consumer. This implementation of the field, whenever partially transferred, will create a new instance of its payload on the arrival of new data. Furthermore, since the full payload is kept in memory, this field is not meant to be used for large data payloads.
* ''IThirdPartyTransferField'' – This interface is meant to be implemented by fields whose payload will be transferred by a third-party protocol / application. This field has a lot in common with the ''IReferenceField'' implementations, in the sense that the framework needs to transfer nothing but an identifying object that enables retrieval of an externally available payload. The difference is that while ''IReferenceField'' expects data to be already available at the resource-serving location, implementors of this interface can perform this action upon request, following their own field logic. An example of such a field could be a field that uses a database accessible by both producer and consumer. The field could store its payload to the database when requested to serialize itself for transport, and transfer a row identifier; at the consumer side it could retrieve the data using this identifier.
== Remote and local Pool Synchronization ==
+
  @Override
=== Proxies and Registry ===
+
  public void extendDeflate(DataOutput out) throws GRS2RecordSerializationException
The two main actors in a gRS scenario is the producer and the consumer. The producer authors a record pool that the consumer needs to access. The two main cases one may distinguish between in this scenario is a consumer that is collocated with the producer in the same host and the same VM in which case they share a common address space. The second case is that the two actors do not share the address space. This may mean that the consumer and producer are running in a different VM either in the same or in a completely different host. The producer and consumer should not be forced to make this distinction in their code. The gRS frees them from handling the two cases differently and allows them to have the same behavior in their code regardless of the other party location. This is achieved through the proxy concept.
+
  {
 +
    //nothing to deflate
 +
  }
  
 +
  @Override
 +
  public void extendInflate(DataInput in) throws GRS2RecordSerializationException
 +
  {
 +
    //nothing to inflate
 +
  }
 +
}
 +
</source>
  
[[Image:]]
+
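To make the third party transfer idea concrete, the following sketch outlines a database backed field along the lines of the example above. It is only an illustration: the class name, the table layout, and the method names are hypothetical and not part of the gRS field hierarchy; only standard JDBC calls are used.

<source lang="java">
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.UUID;

//Hypothetical sketch of a third party transfer field backed by a database
//reachable by both producer and consumer. Only the row key travels with the record.
public class DBTransferFieldSketch
{
  private String payload=null;

  public DBTransferFieldSketch(String payload) { this.payload=payload; }

  //Producer side: persist the payload and write only its key to the wire
  public void deflate(DataOutput out, Connection db) throws IOException
  {
    try
    {
      String key=UUID.randomUUID().toString();
      PreparedStatement ps=db.prepareStatement("insert into transfers(id,data) values(?,?)");
      ps.setString(1, key);
      ps.setString(2, this.payload);
      ps.executeUpdate();
      ps.close();
      out.writeUTF(key);
    }catch(Exception ex){ throw new IOException("could not deflate field", ex); }
  }

  //Consumer side: read the key and retrieve the payload from the shared database
  public void inflate(DataInput in, Connection db) throws IOException
  {
    try
    {
      String key=in.readUTF();
      PreparedStatement ps=db.prepareStatement("select data from transfers where id=?");
      ps.setString(1, key);
      ResultSet rs=ps.executeQuery();
      if(rs.next()) this.payload=rs.getString(1);
      ps.close();
    }catch(Exception ex){ throw new IOException("could not inflate field", ex); }
  }
}
</source>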
=== Record ===
<source lang="java">
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import gr.uoa.di.madgik.grs.GRS2Exception;
import gr.uoa.di.madgik.grs.buffer.GRS2BufferException;
import gr.uoa.di.madgik.grs.record.GRS2RecordDefinitionException;
import gr.uoa.di.madgik.grs.record.GRS2RecordSerializationException;
import gr.uoa.di.madgik.grs.record.Record;
import gr.uoa.di.madgik.grs.record.field.Field;
import gr.uoa.di.madgik.grs.record.field.StringField;
</source>
  
<source lang="java">
public class MyRecord extends Record
{
  private String id="none";
  private String collection="none";

  public MyRecord()
  {
    this.setFields(new Field[]{new StringField()});
  }

  public MyRecord(String payload)
  {
    this.setFields(new Field[]{new StringField(payload)});
  }

  public MyRecord(String id,String collection)
  {
    this.setId(id);
    this.setCollection(collection);
    this.setFields(new Field[]{new StringField()});
  }

  public MyRecord(String id,String collection,String payload)
  {
    this.setId(id);
    this.setCollection(collection);
    this.setFields(new Field[]{new StringField(payload)});
  }

  public void setId(String id) { this.id = id; }

  public String getId() { return id; }

  public void setCollection(String collection) { this.collection = collection; }

  public String getCollection() { return collection; }

  public void setPayload(String payload) throws GRS2Exception
  {
    this.getField(MyRecordDefinition.PayloadFieldName).setPayload(payload);
  }

  public String getPayload() throws GRS2Exception
  {
    return this.getField(MyRecordDefinition.PayloadFieldName).getPayload();
  }

  @Override
  public StringField getField(String name) throws GRS2RecordDefinitionException, GRS2BufferException
  {
    return (StringField)super.getField(name);
  }

  @Override
  public void extendSend(DataOutput out) throws GRS2RecordSerializationException
  {
    this.extendDeflate(out);
  }

  @Override
  public void extendReceive(DataInput in) throws GRS2RecordSerializationException
  {
    this.extendInflate(in, false);
  }

  @Override
  public void extendDeflate(DataOutput out) throws GRS2RecordSerializationException
  {
    try
    {
      out.writeUTF(this.id);
      out.writeUTF(this.collection);
    }catch(IOException ex)
    {
      throw new GRS2RecordSerializationException("Could not deflate record", ex);
    }
  }

  @Override
  public void extendInflate(DataInput in, boolean reset) throws GRS2RecordSerializationException
  {
    try
    {
      this.id=in.readUTF();
      this.collection=in.readUTF();
    }catch(IOException ex)
    {
      throw new GRS2RecordSerializationException("Could not inflate record", ex);
    }
  }

  @Override
  public void extendDispose()
  {
    this.id=null;
    this.collection=null;
  }
}
</source>

== Remote and local Pool Synchronization ==

=== Proxies and Registry ===
The two main actors in a gRS scenario are the producer and the consumer. The producer authors a record pool that the consumer needs to access. Two main cases can be distinguished in this scenario: either the consumer is collocated with the producer in the same host and the same VM, in which case they share a common address space, or the two actors do not share an address space, meaning that the consumer and the producer run in different VMs, on the same or on completely different hosts. The producer and consumer should not be forced to make this distinction in their code. The gRS frees them from handling the two cases differently and lets their code behave identically regardless of the other party's location. This is achieved through the proxy concept.

A proxy is in fact a mediator between the two actors. Different types of proxies can handle different types of communication, can be initialized with different synchronization protocols, and can use different underlying transport technologies. All this is known to the client only at a declarative level, in case he wants more control over the mechanics of the framework. Different implementations of the proxy semantics can be provided by implementing the ''IProxy'' interface.

Every proxy implementation still needs to provide a marshalable identification mechanism which can be sent over to the consumer side, which will in turn use it to initialize a new proxy instance that will mediate the pool consumption on its side. This object is an implementation of the ''IProxyLocator'' interface. Implementations of this interface are capable of uniquely identifying a record pool. They must also hold enough information to contact the producer side proxy that mediates the producing side pool. This implies information on the protocol used, and any protocol specific information needed by the consumer side proxy.

A proxy's responsibility is to initialize all needed protocol mechanics and to produce the locator that can be used by the consumer proxy. The actual transport, if needed, is not performed by the proxy itself. Although a proxy may have some knowledge of the underlying technologies that will be used, it does not delve into the protocol details. This is the job of yet another construct, the transporter. Transporters, implementing the ''ITransporter'' interface, hide the mechanics of the underlying communication technology and provide a common way for the synchronization protocol to send and receive the needed data. The transporters are also responsible for any communication level security, as well as for the creation and destruction of the communication link. Depending on the underlying technology, a transporter's work may be extremely simple, or it may need to build its own constructs to provide session like communication over a stateless protocol, to multiplex communication for multiple record pools over a single communication link, or even to chunk protocol packets to comply with communication protocol restrictions. All this is hidden from the layers above it.

Remote synchronization security comes at two levels. The first is the protocol packet authentication headers, discussed in later sections. The second level is the communication channel encryption, which is handled by the transporter. Should the client wish to use communication level security, he must also provide a set of parameters. These include:

* A pair of private and public keys used by the local party for signatures
* The public key of the remote party used for signatures
* A supported algorithm to be used for signatures
* A pair of private and public keys used by the local party for encryption
* The public key of the remote party used for encryption
* A supported algorithm to be used for symmetric encryption

With this information provided, the communication initializing side transporter creates a security header that is sent to the other end of the communication channel. This header contains the symmetric secret key to be used for the communication. The key is encrypted using public key encryption and signed by the sender. The other end decrypts and verifies the key sent. From then on, all data exchanged between the two parties is encrypted with the shared key. The availability of remote keys is not handled by the framework in any way. It is assumed that there is some infrastructure capable of securely storing the communicating parties' keys and making them available. A sketch of this handshake is shown below.
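The exchange described above can be illustrated with the standard Java cryptography APIs. The following is only a sketch of the idea, not the framework's actual code: it assumes the key pairs are already available, wraps a fresh AES session key with the receiver's public key, and signs it with the sender's private key.

<source lang="java">
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.Signature;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;

public class HandshakeSketch
{
  public static void main(String[] args) throws Exception
  {
    //Key material assumed to be provided by some external infrastructure
    KeyPair sender=KeyPairGenerator.getInstance("RSA").generateKeyPair();
    KeyPair receiver=KeyPairGenerator.getInstance("RSA").generateKeyPair();

    //Initializing side: create the symmetric session key
    SecretKey session=KeyGenerator.getInstance("AES").generateKey();

    //Encrypt (wrap) the session key with the receiver's public key
    Cipher wrap=Cipher.getInstance("RSA");
    wrap.init(Cipher.WRAP_MODE, receiver.getPublic());
    byte[] wrappedKey=wrap.wrap(session);

    //Sign the wrapped key with the sender's private key
    Signature sig=Signature.getInstance("SHA256withRSA");
    sig.initSign(sender.getPrivate());
    sig.update(wrappedKey);
    byte[] signature=sig.sign();

    //Receiving side: verify the signature and unwrap the session key
    Signature ver=Signature.getInstance("SHA256withRSA");
    ver.initVerify(sender.getPublic());
    ver.update(wrappedKey);
    if(!ver.verify(signature)) throw new SecurityException("sender could not be verified");
    Cipher unwrap=Cipher.getInstance("RSA");
    unwrap.init(Cipher.UNWRAP_MODE, receiver.getPrivate());
    SecretKey shared=(SecretKey)unwrap.unwrap(wrappedKey, "AES", Cipher.SECRET_KEY);
    //From this point on both sides would encrypt the channel with the shared AES key
    System.out.println("shared key algorithm: "+shared.getAlgorithm());
  }
}
</source>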
  
In both of the above mentioned cases of collocated and remotely located producers and consumers, a record pool always needs to be identified within the address space of the VM in which it was created. Since the identification token needs to be serializable and movable to a remote location, a java reference is not adequate. A different construct is needed, one that uniquely identifies the record pool and through which the record pool itself can be retrieved. This construct in the gRS framework is the ''Registry'' class. The ''Registry'' is a static class, unique within the context of a VM, in which record pools can be registered, assigned a unique id, and discovered through it. Whenever a producer wants to make the record pool it is authoring available for consumption through a proxy, it registers the record pool, along with the proxy through which the record pool will be available, with the ''Registry'' class. The registration procedure produces a registry key consisting of a UUID. The record pool and associated proxy are stored in the registry for future reference through the registry key, and the produced registry key is handed back to the caller of the registration procedure, to be used in producing the locator that will identify the record pool. The registry also needs to register for some of the record pool events, so that it can perform its own cleanup once the record pool is disposed.

=== Synchronization protocol ===
Whenever a producer and a consumer share a record pool, some synchronization is needed between them, both to ensure access integrity and to enable remote consumption. When the producer and consumer are collocated, the synchronization is done without mediation, directly through the events exposed by the shared record pool. When the communication is mediated, a synchronization protocol is needed to handle the forwarding of locally produced events and their interpretation, as well as the marshaling and unmarshaling of events, records, fields and their payload.

This is the work of the classes implementing the ''IProtocol'' interface. Specifically, there are two interfaces extending the ''IProtocol'' interface that need to be implemented in pairs, the ''IConsumerProtocol'' and the ''IProducerProtocol''. Currently there is only a Client Pull implementation, which is detailed in a later section.

==== Protocol packets ====
The communication between the producer and the consumer protocol is done by means of packets. The producer and consumer protocols usually operate in conceptual loops. At every loop, one of the two actors needs to send some information (requests, payload, events, etc.) to the other actor. At every protocol loop, all the information to be sent is aggregated into a single packet. This packet is then sent to the other side, which processes it and possibly creates a new packet in response, sending it back to the party that started the process.

A packet can contain the following types of information:

* The UUID of the record pool assigned to it by the registry. This is used to identify the referenced record pool. The proxy and transporter types implied may not require it if they already base their communication on a session enabled protocol, but the information is provided anyway.
* A flag indicating whether the sending side will close its communication, marking the disposal of all kept information.
* Authentication headers. This is the second level of communication security, the first level being the encryption of the whole communication stream. Even if such a drastic measure is not needed, one can request only the authentication of the packets sent. This leaves the payload sent out in the clear, but the sender and receiver are able to identify one another.
* The current size of the pool on the packet sending side. This can be used to offer the clients a view of the remote pool status. This view is updated at the same rate the packets are exchanged, so it will not always mirror the most up to date information.
* A pool definition sub packet, used during the initial communication initialization. Through this packet, the pool the producer has created is defined, so that the consumer can create a new instance of the same pool at its own location. The packet contains the pool configuration that instantiated the pool in the first place, as well as the life cycle policy definitions.
* An events sub packet that contains events produced by the sending side that need to be sent to the receiving side.
* A record payload sub packet that contains definitions of records sent from the producer to the consumer, along with their field definitions. The transport directive of each field is consulted to determine how to handle the contained payload. In case the transport directive is full, or some prefetching should be employed, additional field payload data is transferred through this sub packet. Field payload is written directly from the field to the transporter to avoid unnecessary resource consumption.
* A field payload sub packet that contains additional payload requested for a single field. Whenever a field is consumed with on demand payload transfer, additional field payload is requested through the events sub packet and transferred through this sub packet. Fields write directly to the transporter to avoid unnecessary resource consumption.

Whenever a record or field is unmarshaled, the attributes from its definition are used to determine the exact type to instantiate. The field or record is instantiated through reflection and passed the provided initialization definition and supplied payload. When a field part is unmarshaled, the record that contains the field is retrieved from the local pool, the respective field is found, and the provided payload is supplied to the field to make it available to its consumer. A sketch of such a packet structure follows this list.
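The following sketch shows one possible in memory shape for such a packet, only to make the enumeration above concrete. The class and its fields are hypothetical stand-ins; the framework's actual packet classes and wire format are internal and may differ.

<source lang="java">
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

//Hypothetical shape of a protocol packet, mirroring the information listed above
public class ProtocolPacketSketch
{
  public UUID poolId;                         //the registry assigned record pool UUID
  public boolean closing;                     //whether the sender is closing the communication
  public byte[] authenticationHeader;         //optional packet authentication data
  public int senderPoolSize;                  //current pool size on the sending side
  public byte[] poolDefinition;               //pool configuration, only during initialization
  public List<byte[]> events=new ArrayList<byte[]>();         //marshaled events
  public List<byte[]> recordPayloads=new ArrayList<byte[]>(); //record definitions and payload
  public List<byte[]> fieldPayloads=new ArrayList<byte[]>();  //additional per field payload
}
</source>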
  
==== Client pull ====
Different communication protocols can be implemented to handle the synchronization between the producer and the consumer. Protocols should be implemented in pairs, for the consumer and the producer side. Currently one protocol is available. The implemented protocol will not transfer any record or field payload unless specifically requested by the consumer. Events and control data are periodically synchronized, but no payload is unnecessarily transferred.

===== Consumer protocol =====
First, the protocol needs to create a local copy of a record pool with the same attributes and characteristics as the one the producer is serving. To do so, it communicates with the producer side protocol to receive the attributes and description of the served record pool.

A communication initialization packet is created and, in response, the consumer receives a packet containing the served pool configuration and all the information needed to create an exact copy of the remote record pool.

After this, the thread enters the protocol loop, which ends once the local or remote pool is declared as disposed. A sketch of this loop is given after the steps below.

# The first step of the protocol loop is to send the local events the consumer has for the remote end and to receive the producer's responses. Events that have already been received by the producer are not sent back, and events that are for local use only are blocked to avoid unnecessary traffic. Other events that support it are grouped together, to send as little data as possible and to minimize object marshaling and unmarshaling. The packet is sent and the protocol waits to receive a packet in response. If needed, the packet is authenticated, and while unmarshaling the packet any provided field payload is also unmarshaled and stored as the specific field defines.
# The second step is processing the received packet. The received events are marked so that they are not sent back, new records provided are added to the local pool, and the received events are forwarded to the local record pool, while if requested field payload is supplied, the respective field is retrieved and an event is emitted through that field too.
# Finally, depending on whether there are still pending synchronization issues for the consumer, an interval is defined to avoid flooding the producer with requests, and the process starts over from step 1.
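A minimal skeleton of such a pull loop, stripped of all framework specifics, is shown below. The ''Packet'', ''Endpoint'' and ''process'' names are hypothetical stand-ins, not framework API; the sketch only mirrors the three steps enumerated above.

<source lang="java">
//Hypothetical skeleton of the consumer side pull loop described above
public class ConsumerLoopSketch
{
  interface Endpoint { Packet sendAndReceive(Packet request); }
  static class Packet { }

  private boolean disposed=false; //flipped once a dispose event is processed

  public void pullLoop(Endpoint producer) throws InterruptedException
  {
    while(!disposed)
    {
      Packet request=collectLocalEvents();    //step 1: aggregate local events and requests
      Packet response=producer.sendAndReceive(request);
      process(response);                      //step 2: apply records, payload and events locally
      Thread.sleep(backoffInterval());        //step 3: throttle to avoid flooding the producer
    }
  }

  private Packet collectLocalEvents() { return new Packet(); }
  private void process(Packet response) { /* apply to the local pool, set disposed if needed */ }
  private long backoffInterval() { return 100; }
}
</source>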
  
===== Producer protocol =====
Once initialized, the producer protocol waits for the communication to be initialized by the consumer. Once the communication initialization packet is received, the protocol retrieves the served record pool configuration and initialization parameters, includes them in a response packet, and sends it to the consumer. Every time a packet is received, an authentication check is made first, if needed.

After the communication is fully initialized, the thread enters the main protocol loop, which ends only when the producer or the consumer disposes the managed record pool.

The first and only step of the protocol loop is to wait for the consumer protocol to send a packet containing its produced events and requests. Once the packet is received, the consumer events are processed for their requests. They are marked so that they will not be sent back and, depending on each event type, the respective action is taken. Records are requested from the producer, existing records are packed into a response packet, additional field payload is packed, and the locally generated events that can be sent are packed and sent to the consumer protocol. The protocol then starts over and waits for the next requesting packet to be received.

=== Local Proxy ===
A common case in producer – consumer scenarios is the one in which the producer and consumer are collocated, running within the boundaries of the same VM, possibly in different threads if their execution is concurrent, or even serially one after the other. In such a situation, it is obvious that for performance and resource utilization reasons one would prefer the consumer to access the products of the producer directly through the shared memory address space.

The record pool design and operation facilitate its usage as a shared buffer of records. The synchronization of producing and consuming parties through events makes the actual location of their counterpart transparent to both actors. In this environment, the only thing missing to enable optimized local consumption of a record pool is a way to pass to the consumer a reference to the record pool authored by the producer. This is where the gRS registration and mediation classes come in, in a manner that hides from the two actors the actual location of each other and does not force them to apply any kind of logic to distinguish between the two cases.

Once the producer is ready to provide its output to any interested consumer, it registers its record pool with the ''Registry'' class as described in previous sections. For the registration step, an ''IProxy'' implementing instance must be provided. Should the consumer be collocated with the producer (this knowledge is assumed to exist somewhere in the system, quite possibly in the component orchestrating the execution), an instance of the ''LocalProxy'' class is provided for the registration step. During the registration process, the authored record pool is assigned to the ''LocalProxy'' instance. From then on, the proxy can be queried to produce an ''IProxyLocator'' instance; the ''LocalProxy'' class produces an instance of the ''LocalProxyLocator'' class. This locator can be serialized and deserialized to produce a new ''LocalProxyLocator'' instance that can still identify the referenced record pool through the ''RegistryKey'' that was assigned to the record pool and set to the ''LocalProxy''. This locator can in turn be used to create a new ''LocalProxy'' instance, which is used to instantiate a new ''Reader'' class that can consume the produced record pool. The record pool used will be a reference to the one produced, since the ''LocalProxy'' can look up the ''Registry'' to retrieve the stored record pool associated with the registry key kept in the ''LocalProxyLocator''.

The current version of the ''LocalProxy'' does not utilize any security specification. These are used only to authenticate and secure remote communication. There is, though, the possibility to utilize them in later versions to also authenticate components running within the same VM, in a manner uniform with remote ones.

=== TCPAdHoc Proxy ===
The general case in producer – consumer scenarios is to be unaware of the location of the two actors, or to specifically know that they are located in different hosts or, in general, in different VMs. This means that no in memory reference can be shared. Producers and consumers must communicate externally to be able to synchronize their operation and exchange data. The synchronization and data exchange protocol has been detailed in previous sections.

As has already been described in the ''LocalProxy'' synchronization scenario, the readers and writers of a record pool are already able to synchronize based on events emitted through the record pool. In the protocol section we illustrated how the events and data that pass through a record pool can be transferred and mirrored to a remote copy of the record pool. The only thing left to enable remote production and consumption is a technology to transfer the protocol packets. Multiple protocols can be used; in this section the case of a TCP connection is illustrated. Since the producer and consumer need not be aware of any of the underlying technology details, the proxy and registration classes of the framework are the ones managing all needed technology specific details.

As in the case of the ''LocalProxy'', the producer at some point needs to share its output with a consumer. It will again go through the registration procedure, but this time, instead of a ''LocalProxy'' instance, a ''TCPAdHocProxy'' instance is passed to the registration method.

For every record pool it is called to mediate, the ad hoc TCP proxy opens a new port on the producer side and waits for the consumer to connect to it. The consumer side proxy receives through the locator the port that the producer proxy is listening to and connects there. After that, the security specifications are consulted to decide whether to create an encoded communication stream, as already described in a different section. Then the ''IProtocol'' implementations chosen in the proxy configuration handle the synchronization. The over the wire communication is done through an implementation of the ''ITransporter'' interface that reads from and writes to the TCP input and output streams, using a ''Cipher'' if needed.

Once the registration process is completed on the producer side, the producer's proxy, transporter and protocol are all ready to accept incoming calls. The proxy is in a position to create a valid ''IProxyLocator'' instance, which for the ''TCPAdHocProxy'' is a ''TCPProxyLocator''. The ''TCPProxyLocator'' contains the referenced pool ''RegistryKey'', the hostname the producer is running on, the port the ''ITransporter'' is listening to, the ''IProtocol'' type used, whether the packet headers should be authenticated, and whether the communication streams should be encrypted. This locator can be serialized and deserialized into a new ''TCPProxyLocator'' that can be used to initialize a ''Reader'' to access the remote record pool, through a newly initialized ''ITransporter'' created by the consumer side ''TCPAdHocProxy'', which instantiates a consumer side ''IProtocol'' of the same type as the one used by the producer.
  
=== TCPServer Proxy ===
The ''TCPAdHocProxy'' described above needs to open a new port for every record pool served by the producer. Though this makes serving each individual pool autonomous and more robust, it can become difficult to maintain in an infrastructure with security restrictions and firewalled communication. In response to this, the ''TCPServerProxy'' can be used to bound the communication ports.

The first time a producer creates an instance of the TCPServer proxy within the boundaries of a VM, a ''TCPTransportServer'' instance is created following the singleton pattern. From then on, any instance of the ''TCPServerProxy'' is directed to this server to manage its communication needs. The port the server listens to is configured through the already available configuration class of the proxy instance and, after initialization, all traffic passes through this port.

The server keeps an internal mapping between served record pools and open sockets. Upon request, it can provide interested transporters with the socket associated with the record pool they are mediating for. Upon disposal of the transporters, the internal state of the server is also purged as far as pool specific information is concerned.

To be able to create such a mapping between the record pool and the specific socket, the server bases its functionality on the record pool UUID sent by the consumer, as provided through the locator instance it is initialized with. This information is always sent out as clear text, regardless of the specified security specifications, as the configured security specifications are always at a per session scope, while the server needs to recognize the pool UUID regardless of the session that will be created between the two protocol implementations and their transporters.

Furthermore, a vulnerability of this implementation is the way the server expects to retrieve the referenced pool UUID to map the opened socket. After opening a socket, the server blocks waiting for the consumer to send its pool UUID. Until this information is provided, no other consumer is accepted by the server. This leaves open ground for a denial of service attack, but it was chosen over spawning a separate handling thread in order to save resources.

== Third-party point of view ==
=== Producing a record pool ===
In order for a producer to create and author a record pool, a record pool writer must be created. All writer classes of a record pool are implementations of the ''IWriter'' interface. A writer is initialized with a set of configuration options. These options define the type of the record pool that will be created, the buffer policy that the producer will use, the intermediate record buffer that will hold records before flushing them to the record pool and, in general, the behavior of the writer.

The writer monitors the record pool it produces for emitted events. Depending on the type of event caught, the writer may need to perform some operation, bubble the event up to its client or, for some events, simply ignore it. The specific handling of each event is detailed in previous sections. When the caught event is something its client might be interested in, the writer in turn has to be able to emit that event to its client. This is done through the ''WriterState'' class that contains the events a writer's client can register for and receive notifications about. These events include the following:

* ''ConsumptionCompletedEvent'' – The consumer of the record pool has declared that he will no longer be accessing records and fields from the record pool.
* ''ItemRequestedEvent'' – Either the consumer or the producer's buffer policy requested a number of additional records.
* ''MessageToWriterEvent'' – A message with the producer as its recipient was emitted by either the consumer or some component of the platform.
* ''DisposePoolEvent'' – The record pool is no longer needed by the consumer. Since the gRS supports point to point communication, this means that the producer's state must also be disposed.

Depending on the type of writer initialized, the client may have to handle and respond to these events, or it can rely on the writer to simplify its usage by offering blocking mechanisms that call upon the client to act on a specific request without having to take any notice of the events. The ''BlockingRecordStreamWriter'' class is such an example, while the ''RecordStreamWriter'' class exposes all the needed events so that a client need only act upon notification, in an asynchronous fashion.

=== Consuming a record pool ===
In order for a consumer to access a record pool, a reader class must be initialized. All record pool readers are implementors of the ''IReader'' interface. Reader classes are initialized with a set of configuration options that do not affect the record pool the readers are accessing, but rather the behavior of the reader while accessing it.

A reader is initialized by providing an ''IProxy'' implementation instantiated through an ''IProxyLocator'' capable of identifying a record pool produced by an ''IWriter'' instance. Through this proxy, the reader gets a reference to the record pool it is going to consume, no matter whether this reference is the actual reference to the producer's record pool or a synchronizable copy.

The consumption of the records contained in the record pool, and of their fields, may vary depending on the reader's desired functionality. Random access may be provided over the pool's records, as the ''RandomRecordReader'' class provides, or stream like access, such as the ''RecordStreamReader'' class provides. The reader subscribes for events produced by the record pool it accesses and, depending on their type, it may have to perform some operation, bubble the event up to its client, or simply ignore it. The specific handling of each event is detailed in previous sections. When the caught event is something its client might be interested in, the reader in turn has to be able to emit that event to its client. This is done through the ''ReaderState'' class that contains the events a reader's client can register for and receive notifications about. These events include the following:

* ''ProductionCompletedEvent'' – The producer of the record pool has declared that he will no longer be producing any more records. This does not mean that the consumption has to stop. The consumer can still access the existing records and their fields.
* ''ItemAddedEvent'' – The producer, in response to a direct consuming client's request or to a buffering policy request, has added some records to the shared record pool.
* ''MessageToReaderEvent'' – A message with the consumer as its recipient was emitted by either the producer or some component of the platform.
* ''DisposePoolEvent'' – The record pool is no longer needed. Since the gRS supports point to point communication, this means that the producer and consumer state must be disposed.

Depending on the type of reader initialized, the client may have to handle and respond to these events, or it can rely on the reader to simplify its usage by offering blocking mechanisms that call upon the client to act on a specific request without having to take any notice of the events. The ''BlockingRecordStreamReader'' class is such an example, while the ''RecordStreamReader'' and ''RandomAccessReader'' classes expose all the needed events so that a client need only act upon notification, in an asynchronous fashion.

== Record Store Utility ==
One of the characteristics of the gRS framework described is that it is point to point. This means that one producer authors an ''IRecordPool'', publishes it through an ''IProxyLocator'', and a single consumer can access it and iterate through it and its payload. There are cases where this behavior needs to be enhanced. One can easily think of a scenario where it is desirable for a single ''IRecordPool'' to be accessed by multiple consumers, either concurrently or at different times. Furthermore, one might want to be able to replay the same input multiple times, in cases of failure for example, so as not to have to reproduce the previously created ''IRecordPool'', or simply because it cannot be reproduced. Additionally, one might want, after producing an ''IRecordPool'', to be able to store it for future use without the overhead of keeping the writer alive and tweaking the lifespan policies in order to do so. These and many more examples come to mind for the need of a utility that can store and reproduce a previously created ''IRecordPool''. This need is covered in the gRS framework through the ''StoreRegistry'' class and related utilities and classes. This functionality is provided externally to the ''IWriter'' and ''IReader'' interfaces for a reason. The gRS was designed as point to point in order to factor out problems such as access reference counting, garbage collection and costly unnecessary disk access operations. As will become obvious in the following paragraphs, the Record Store utility can cause a performance penalty that in simple scenarios would be otherwise unnecessary, and it additionally keeps track of a complex, yet not complicated, resource purging policy which need not be incorporated in point to point usage. A hypothetical sketch of the store idea follows.
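The snippet below is only an illustration of the store concept, kept entirely in memory: a produced sequence of records is kept under a key and can be replayed to multiple consumers at any time. The class and method names are assumptions for the purpose of the sketch; the actual ''StoreRegistry'' works with real record pools and persistent storage, and its API is not reproduced here.

<source lang="java">
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

//Hypothetical, in memory only, illustration of the store concept
public class StoreSketch
{
  private static final Map<UUID, List<String>> store=new ConcurrentHashMap<UUID, List<String>>();

  //Producer side: persist an already produced sequence of records under a new key
  public static UUID store(List<String> records)
  {
    UUID key=UUID.randomUUID();
    store.put(key, new CopyOnWriteArrayList<String>(records));
    return key;
  }

  //Consumer side: any number of consumers can replay the stored records at any time
  public static List<String> resolve(UUID key)
  {
    return store.get(key);
  }
}
</source>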
  
== Simple Example ==
In the following snippets, a simple writer and a respective reader are presented to show the full usage of the presented elements.

=== Writer ===
<source lang="java">
import java.net.URI;
import java.util.concurrent.TimeUnit;
import gr.uoa.di.madgik.grs.buffer.IBuffer.Status;
import gr.uoa.di.madgik.grs.proxy.IWriterProxy;
import gr.uoa.di.madgik.grs.record.GenericRecord;
import gr.uoa.di.madgik.grs.record.GenericRecordDefinition;
import gr.uoa.di.madgik.grs.record.RecordDefinition;
import gr.uoa.di.madgik.grs.record.field.Field;
import gr.uoa.di.madgik.grs.record.field.FieldDefinition;
import gr.uoa.di.madgik.grs.record.field.StringField;
import gr.uoa.di.madgik.grs.record.field.StringFieldDefinition;
import gr.uoa.di.madgik.grs.writer.GRS2WriterException;
import gr.uoa.di.madgik.grs.writer.RecordWriter;

public class StringWriter extends Thread
{
  private RecordWriter<GenericRecord> writer=null;

  public StringWriter(IWriterProxy proxy) throws GRS2WriterException
  {
    //The gRS will contain only one type of records which in turn contains only a single field
    RecordDefinition[] defs=new RecordDefinition[]{          //A gRS can contain a number of different record definitions
        new GenericRecordDefinition((new FieldDefinition[] { //A record can contain a number of different field definitions
        new StringFieldDefinition("ThisIsTheField")          //The definition of the field
      }))
    };
    writer=new RecordWriter<GenericRecord>(
        proxy, //The proxy that defines the way the writer can be accessed
        defs   //The definitions of the records the gRS handles
      );
  }

  public URI getLocator() throws GRS2WriterException
  {
    return writer.getLocator();
  }

  public void run()
  {
    try
    {
      for(int i=0;i<500;i+=1)
      {
        //while the reader hasn't stopped reading
        if(writer.getStatus()!=Status.Open) break;
        GenericRecord rec=new GenericRecord();
        //Only a string field is added to the record as per definition
        rec.setFields(new Field[]{new StringField("Hello world "+i)});
        //if the buffer is at maximum capacity for the specified interval, don't wait any more
        if(!writer.put(rec,60,TimeUnit.SECONDS)) break;
      }
      //if the reader hasn't already disposed the buffer, close to notify the reader that no more records will be provided
      if(writer.getStatus()!=Status.Dispose) writer.close();
    }catch(Exception ex)
    {
      ex.printStackTrace();
    }
  }
}
</source>

=== Reader ===
<source lang="java">
import gr.uoa.di.madgik.grs.reader.ForwardReader;
import gr.uoa.di.madgik.grs.reader.GRS2ReaderException;
import gr.uoa.di.madgik.grs.record.GenericRecord;
import gr.uoa.di.madgik.grs.record.field.StringField;
import java.net.URI;

public class StringReader extends Thread
{
  ForwardReader<GenericRecord> reader=null;

  public StringReader(URI locator) throws GRS2ReaderException
  {
    reader=new ForwardReader<GenericRecord>(locator);
  }

  public void run()
  {
    try
    {
      for(GenericRecord rec : reader)
      {
        //In case a timeout occurs while optimistically waiting for more records from an originally open writer
        if(rec==null) break;
        //Retrieve the required field of the type available in the gRS definitions
        System.out.println(((StringField)rec.getField("ThisIsTheField")).getPayload());
      }
      //Close the reader to release and dispose any resources on both reader and writer sides
      reader.close();
    }catch(Exception ex)
    {
      ex.printStackTrace();
    }
  }
}
</source>
  
=== Main ===
<source lang="java">
import java.util.ArrayList;
import java.util.List;
import gr.uoa.di.madgik.commons.server.PortRange;
import gr.uoa.di.madgik.commons.server.TCPConnectionManager;
import gr.uoa.di.madgik.commons.server.TCPConnectionManagerConfig;
import gr.uoa.di.madgik.grs.proxy.tcp.TCPConnectionHandler;
import gr.uoa.di.madgik.grs.proxy.tcp.TCPStoreConnectionHandler;
import gr.uoa.di.madgik.grs.proxy.tcp.TCPWriterProxy;

public class MainTest
{
  public static void main(String []args) throws Exception
  {
    List<PortRange> ports=new ArrayList<PortRange>(); //The ports that the TCPConnection manager should use
    ports.add(new PortRange(3000, 3050));             //Any in the range between 3000 and 3050
    ports.add(new PortRange(3055, 3055));             //Port 3055
    TCPConnectionManager.Init(
      new TCPConnectionManagerConfig("localhost", //The hostname by which the machine is reachable
        ports,                                    //The ports that can be used by the connection manager
        true                                      //If no port ranges were provided, or none of them could be used, use a random available port
    ));
    TCPConnectionManager.RegisterEntry(new TCPConnectionHandler());      //Register the handler for the gRS2 incoming requests
    TCPConnectionManager.RegisterEntry(new TCPStoreConnectionHandler()); //Register the handler for the gRS2 store incoming requests

    // gr.uoa.di.madgik.grs.proxy.local.LocalWriterProxy could be used instead for local only, all in memory, access
    StringWriter writer=new StringWriter(new TCPWriterProxy());
    StringReader reader=new StringReader(writer.getLocator());

    writer.start();
    reader.start();

    writer.join();
    reader.join();
  }
}
</source>
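As the comment in the listing above notes, for a purely local, in memory scenario the TCP setup is unnecessary and the local proxy variant can be used instead. A minimal variant of the main method under that assumption (reusing the ''StringWriter'' and ''StringReader'' classes defined above):

<source lang="java">
import gr.uoa.di.madgik.grs.proxy.local.LocalWriterProxy;

public class LocalMainTest
{
  public static void main(String[] args) throws Exception
  {
    //No TCPConnectionManager initialization is needed for in memory, same VM access
    StringWriter writer=new StringWriter(new LocalWriterProxy());
    StringReader reader=new StringReader(writer.getLocator());

    writer.start();
    reader.start();

    writer.join();
    reader.join();
  }
}
</source>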
  
== Events ==
As was already mentioned, the gRS2 serves as a bidirectional communication channel through events that can be propagated either from the writer to the reader or from the reader to the writer, along the normal data flow. These events can serve any purpose fitting the application logic, from control signals to full data transport, although in the latter case the payload size should be kept moderate, as it is always handled in memory and cannot use services such as partial transfer, nor be declared using internally managed files.

An example that takes this to the extreme follows. In this example, a writer and a reader use only events to propagate their data. One should notice that the communication protocol is now up to the reader and writer clients to implement. In the example, this has been handled in a quick and dirty way using sleep timeouts. Additionally, it should be noted that, besides the basic key - value event placeholder provided internally, an extendable event "placeholder" is provided to facilitate transport of any type of custom object.

=== Writer ===
<source lang="java">
import java.net.URI;
import gr.uoa.di.madgik.grs.buffer.IBuffer.Status;
import gr.uoa.di.madgik.grs.events.BufferEvent;
import gr.uoa.di.madgik.grs.events.KeyValueEvent;
import gr.uoa.di.madgik.grs.events.ObjectEvent;
import gr.uoa.di.madgik.grs.proxy.IWriterProxy;
import gr.uoa.di.madgik.grs.record.GenericRecord;
import gr.uoa.di.madgik.grs.record.GenericRecordDefinition;
import gr.uoa.di.madgik.grs.record.RecordDefinition;
import gr.uoa.di.madgik.grs.record.field.FieldDefinition;
import gr.uoa.di.madgik.grs.record.field.StringFieldDefinition;
import gr.uoa.di.madgik.grs.writer.GRS2WriterException;
import gr.uoa.di.madgik.grs.writer.RecordWriter;

public class EventWriter extends Thread
{
  private RecordWriter<GenericRecord> writer=null;

  public EventWriter(IWriterProxy proxy) throws GRS2WriterException
  {
    //The gRS will contain only one type of records which in turn contains only a single field
    RecordDefinition[] defs=new RecordDefinition[]{          //A gRS can contain a number of different record definitions
        new GenericRecordDefinition((new FieldDefinition[] { //A record can contain a number of different field definitions
        new StringFieldDefinition("ThisIsTheField")          //The definition of the field
      }))
    };
    writer=new RecordWriter<GenericRecord>(
      proxy, //The proxy that defines the way the writer can be accessed
      defs   //The definitions of the records the gRS handles
    );
  }

  public URI getLocator() throws GRS2WriterException
  {
    return writer.getLocator();
  }

  public void receiveWaitKeyValue() throws GRS2WriterException
  {
    while(true)
    {
      BufferEvent ev=writer.receive();
      if(ev==null) try{ Thread.sleep(100);}catch(Exception ex){}
      else
      {
        System.out.println("Received event from reader : "+((KeyValueEvent)ev).getKey()+" - "+((KeyValueEvent)ev).getValue());
        break;
      }
    }
  }

  public void receiveWaitObject() throws GRS2WriterException
  {
    while(true)
    {
      BufferEvent ev=writer.receive();
      if(ev==null) try{ Thread.sleep(100);}catch(Exception ex){}
      else
      {
        System.out.println("Received event from reader : "+((ObjectEvent)ev).getItem().toString());
        break;
      }
    }
  }

  public void run()
  {
    try
    {
      writer.emit(new KeyValueEvent("init", "Hello"));
      for(int i=0;i<5;i+=1) this.receiveWaitKeyValue();
      this.receiveWaitObject();
      if(writer.getStatus()!=Status.Dispose) writer.close();
    }catch(Exception ex)
    {
      ex.printStackTrace();
    }
  }
}
</source>
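Events and records are not mutually exclusive; a writer can interleave control events with its normal record flow. The following fragment is only a sketch, reusing the definitions and API shown in the Simple Example writer; the "progress" key name is arbitrary.

<source lang="java">
//Sketch: interleave a progress event with the normal record flow
for(int i=0;i<500;i+=1)
{
  if(writer.getStatus()!=Status.Open) break;
  GenericRecord rec=new GenericRecord();
  rec.setFields(new Field[]{new StringField("Hello world "+i)});
  if(!writer.put(rec,60,TimeUnit.SECONDS)) break;
  //every 100 records let the reader know how far the production has gone
  if(i%100==0) writer.emit(new KeyValueEvent("progress", Integer.toString(i)));
}
</source>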
  
=== Reader ===
<source lang="java">
import gr.uoa.di.madgik.grs.events.BufferEvent;
import gr.uoa.di.madgik.grs.events.KeyValueEvent;
import gr.uoa.di.madgik.grs.events.ObjectEvent;
import gr.uoa.di.madgik.grs.reader.ForwardReader;
import gr.uoa.di.madgik.grs.reader.GRS2ReaderException;
import gr.uoa.di.madgik.grs.record.GenericRecord;
import java.net.URI;

public class EventReader extends Thread
{
  ForwardReader<GenericRecord> reader=null;

  public EventReader(URI locator) throws GRS2ReaderException
  {
    reader=new ForwardReader<GenericRecord>(locator);
  }

  public void run()
  {
    try
    {
      BufferEvent ev=reader.receive();
      System.out.println("Received event from writer : "+((KeyValueEvent)ev).getKey()+" - "+((KeyValueEvent)ev).getValue());
      reader.emit(new KeyValueEvent("info", "Hello..."));
      reader.emit(new KeyValueEvent("info", "Hello Again..."));
      reader.emit(new KeyValueEvent("info", "Hello And Again..."));
      reader.emit(new KeyValueEvent("info", "And again..."));
      reader.emit(new KeyValueEvent("info", "Bored already ?"));
      reader.emit(new ObjectEvent(new MyEvent("This is a custom object event", 234)));
      reader.close();
    }catch(Exception ex)
    {
      ex.printStackTrace();
    }
  }
}
</source>
  
=== MyEvent ===
<source lang="java">
import java.io.DataInput;
import java.io.DataOutput;
import gr.uoa.di.madgik.grs.record.GRS2RecordSerializationException;
import gr.uoa.di.madgik.grs.record.IPumpable;

public class MyEvent implements IPumpable
{
  private String message=null;
  private int count=0;

  public MyEvent(String message,int count)
  {
    this.message=message;
    this.count=count;
  }

  public String toString()
  {
    return "event payload message is : '"+this.message+"' with count '"+this.count+"'";
  }

  public void deflate(DataOutput out) throws GRS2RecordSerializationException
  {
    try
    {
      out.writeUTF(message);
      out.writeInt(count);
    }catch(Exception ex)
    {
      throw new GRS2RecordSerializationException("Could not serialize event", ex);
    }
  }

  public void inflate(DataInput in) throws GRS2RecordSerializationException
  {
    try
    {
      this.message=in.readUTF();
      this.count=in.readInt();
    }catch(Exception ex)
    {
      throw new GRS2RecordSerializationException("Could not deserialize event", ex);
    }
  }

  public void inflate(DataInput in, boolean reset) throws GRS2RecordSerializationException
  {
    this.inflate(in);
  }
}
</source>

=== Main ===
As in the [[GRS2#Main | Simple Main]], using the new writer and reader classes:

<source lang="java">
  /*...*/
  EventWriter writer=new EventWriter(new TCPWriterProxy());
  EventReader reader=new EventReader(writer.getLocator());
  /*...*/
</source>
  
== Random Access Reader ==
The examples presented thus far target a forward only reader. An alternative is also provided in the form of a random access reader, which allows random seeks, forward and backward, on the reader side. Since, as was already mentioned, a record is disposed by the writer once it has been accessed by the reader, going back to a previously accessed record, which the writer has already disposed, requires that all past records be maintained locally on the reader side. To avoid bloating the in memory structures of the reader, already read records are persisted. When the client then wants to move back to a record it has already accessed, the desired record, as well as a configurable record window ahead of it, is restored from the persistency store. The window is always in front of the requested record, on the assumption that forward movement is the predominant one. Note that in the case of completely random seeks, a good strategy to achieve maximum performance is to disable the window by setting its size equal to 1. The persistency store in which the past records are kept is internally configured. The only one currently available is a disk backed Random Access File. This of course imposes an overhead that is not negligible for large data. Short term plans include the usage of a different persistency backend to make the operation more performant.

One important detail here is that every record persisted on the reader side must be fully available once restored. That means that all payload previously served by the writer side must now be served directly from the reader side. For this reason, whenever a record is persisted, the makeAvailable method is invoked on it, ensuring that any data that would otherwise be lost when the record is disposed on the writer side is directly accessible from the reader side.

In the following example, we assume a gRS created as in the case of the [[GRS2#Writer | Simple Writer]] previously described. We present two ways to access the gRS in a random fashion, one using a list iterator, and one that provides random seeks.
  
=== Bidirectional Iterator ===
<source lang="java">
import gr.uoa.di.madgik.grs.reader.GRS2ReaderException;
import gr.uoa.di.madgik.grs.reader.RandomReader;
import gr.uoa.di.madgik.grs.record.GenericRecord;
import gr.uoa.di.madgik.grs.record.field.StringField;
import java.net.URI;
import java.util.ListIterator;
 
public class RandomIteratorReader extends Thread
{
  RandomReader<GenericRecord> reader=null;
 
  public RandomIteratorReader(URI locator) throws GRS2ReaderException
  {
    reader=new RandomReader<GenericRecord>(locator);
  }
 
  public void run()
  {
    try
    {
      //Iterate forward over all available records
      ListIterator<GenericRecord> iter=reader.listIterator();
      while(iter.hasNext())
      {
        GenericRecord rec=iter.next();
        if(rec==null) break;
        System.out.println(((StringField)rec.getField("ThisIsTheField")).getPayload());
      }
      //Iterate backwards over the already read records, now restored from the reader side persistency store
      while(iter.hasPrevious())
      {
        GenericRecord rec=iter.previous();
        if(rec==null) break;
        System.out.println(((StringField)rec.getField("ThisIsTheField")).getPayload());
      }
      //Iterate forward once more
      while(iter.hasNext())
      {
        GenericRecord rec=iter.next();
        if(rec==null) break;
        System.out.println(((StringField)rec.getField("ThisIsTheField")).getPayload());
      }
      reader.close();
    }catch(Exception ex)
    {
      ex.printStackTrace();
    }
  }
}
</source>
  
=== Random Access ===
<source lang="java">
import gr.uoa.di.madgik.grs.buffer.GRS2BufferException;
import gr.uoa.di.madgik.grs.buffer.IBuffer.Status;
import gr.uoa.di.madgik.grs.reader.GRS2ReaderException;
import gr.uoa.di.madgik.grs.reader.RandomReader;
import gr.uoa.di.madgik.grs.record.GRS2RecordDefinitionException;
import gr.uoa.di.madgik.grs.record.GenericRecord;
import gr.uoa.di.madgik.grs.record.field.StringField;
import java.net.URI;
import java.util.concurrent.TimeUnit;
 
public class RandomAccessReader extends Thread
{
  RandomReader<GenericRecord> reader=null;
 
  public RandomAccessReader(URI locator) throws GRS2ReaderException
  {
    reader=new RandomReader<GenericRecord>(locator);
  }
 
  //Read up to count records moving forward from the current position
  private void read(int count) throws GRS2ReaderException, GRS2RecordDefinitionException, GRS2BufferException
  {
    for(int i=0;i<count;i+=1)
    {
      if(reader.getStatus()==Status.Dispose || (reader.getStatus()==Status.Close && reader.availableRecords()==0)) break;
      GenericRecord rec=reader.get(60, TimeUnit.SECONDS);
      if(rec==null) break;
      System.out.println(((StringField)rec.getField("ThisIsTheField")).getPayload());
    }
  }
 
  public void run()
  {
    try
    {
      this.read(20);    //Read the first 20 records
      reader.seek(-10); //Seek 10 records backwards
      this.read(9);
      reader.seek(-5);  //Seek 5 records backwards
      this.read(3);
      reader.seek(10);  //Seek 10 records forward
      this.read(5);
      reader.close();
    }catch(Exception ex)
    {
      ex.printStackTrace();
    }
  }
}
</source>
  
=== Main ===
As in [[GRS2#Main | Simple Main]], using the respective new reader classes:

<source lang="java">
  StringWriter writer=new StringWriter(new LocalWriterProxy());
  RandomIteratorReader reader1=new RandomIteratorReader(writer.getLocator());
  RandomAccessReader reader2=new RandomAccessReader(writer.getLocator());
</source>
  
== Store ==
As already mentioned, gRS2 is point to point only, and once something has been read it is disposed. Furthermore, once a reader has been initialized, any additional request from a different reader seeking to utilize the same locator to access the previously accessed gRS2 will fail. However, there are cases where one might want a created gRS2 buffer to be reutilized by multiple readers. This case is different from the one described in [[GRS2#Random_Access_Reader | Random Access Reader]]: in the latter, the gRS2 is read once from its source and persisted in local storage from then on, and it is accessible multiple times, but only by the same reader. In contrast, in the former case, every accessor of the stored gRS2 is a new reader, which can in turn be a random access one or a forward only one.

When creating a gRS2 store, a client can control its behaviour with some parameterization:

* The Store input is provided and is not necessarily limited to a single incoming gRS2. Any number of incoming gRS2 instances can be provided and as a result the content of all of them will be multiplexed into a single new gRS2
* The type of multiplexing that will take place between the available inputs is a matter of parametrization. Currently two policies are available, a FIFO and a First Available policy
* The amount of time that the created gRS2 will be maintained while inactive and not accessed by any reader is also provided at initialization time

The gRS2 Store is meant to be used as a checkpoint utility and should be employed only in appropriate use cases because of the performance penalties and resource consumption it imposes. These can be identified in the following two points:

* A persistency store is utilized by the gRS2 Store in order to have all content of the incoming locators locally available and be able to serve it without relying on the possibly remote or even local input buffers, which will be disposed and become unavailable. For this reason everything is brought under the management of the gRS2 Store and persisted. The only store backend currently available is one based on a Random Access File, which imposes non negligible performance penalties. A new, more efficient backend is planned for the short term evolution of the gRS
* Although the gRS2 aims to minimize unnecessary record creation and movement, keeping it at most equal to the buffer capacity, the gRS2 Store operates on a different hypothesis. As already mentioned, its primary use is as part of a checkpointing operation. To this end, the Store, once initialized, will greedily read its inputs to their end, regardless of whether any reader is accessing the stored buffer
  
As already mentioned, when using the gRS2 Store there is no need to modify anything in either the writer or the reader. The Store is a utility and can thus be used externally, without affecting the implicated components. The following example demonstrates the addition needed to persist the output of a number of writers and have a number of readers access the multiplexed output.

<source lang="java">
  StringWriter writer1=new StringWriter(new TCPWriterProxy());
  StringWriter writer2=new StringWriter(new LocalWriterProxy());
  StringWriter writer3=new StringWriter(new TCPWriterProxy());
  StringWriter writer4=new StringWriter(new LocalWriterProxy());
 
  URI []locators=new URI[]{writer1.getLocator(),writer2.getLocator(),writer3.getLocator(),writer4.getLocator()};
  //The respective TCPStoreWriterProxy.store operation could be used to create a locator with a different usage scope
  URI locator=LocalStoreWriterProxy.store(locators, IBufferStore.MultiplexType.FIFO, 60*60, TimeUnit.SECONDS);
 
  StringReader reader1=new StringReader(locator);
  RandomAccessReader reader2=new RandomAccessReader(locator);
  StringReader reader3=new StringReader(locator);
  RandomIteratorReader reader4=new RandomIteratorReader(locator);
  StringReader reader5=new StringReader(locator);
 
  //Start everything
  writer1.start();
  /*...*/
  reader5.start();
 
  //Join everything
  writer1.join();
  /*...*/
  reader5.join();
</source>
  
Since it is evident that through the gRS2 Store the writer and reader are disconnected, any events emitted by the writer will reach the reader either at the time they are emitted, if the reading is performed while the Store is actively persisting its inputs, or at initialization time of the reader, in which case all the events emitted up to that point are delivered at once. Events emitted by the reader will never reach the writers whose output is persisted.

==Utilities and Convenience tools==
===Record Import===
It is quite common that a consumer might wish to forward a record to a producer using the same record definition, either completely unaltered or with minor modifications.
To avoid explicitly copying the record, thus wasting valuable computational resources, one has the option of importing records directly to a producer.
This can be done with the <code>importRecord</code> methods, which have the same signatures as the <code>put</code> methods used to add entirely new records to a writer.
  
The following snippet demonstrates how one can forward all records read from a reader to a writer without making any modification:

<source lang="java">
for(GenericRecord rec: reader) {
  if(writer.getStatus() == Status.Close || writer.getStatus() == Status.Dispose)
      break;
  if(!writer.importRecord(rec, 60, TimeUnit.SECONDS))
      break;
}
</source>
  
The following snippet demonstrates how one can modify a known record field, leaving all other fields unchanged. In fact, one can be unaware of the existence of any fields other than those one is interested in:

<source lang="java">
for(GenericRecord rec: reader) {
  ((StringField)rec.getField("payloadField")).setPayload("Modified payload");
  if(writer.getStatus() == Status.Close || writer.getStatus() == Status.Dispose)
      break;
  if(!writer.importRecord(rec, 60, TimeUnit.SECONDS))
      break;
}
</source>
  
===Keep-Alive Functionality===
Both forward and random access readers can have keep-alive functionality added to them using a typical decorator pattern. This functionality can be useful if a consumer wishes to delay the consumption of records for long periods of time and the producer has no indication of this behavior. In simple cases, when implementing a transport protocol using events is not desirable, a keep-alive reader can be used to keep the communication channel open by periodically reading records. Keep-alive functionality is transparent to the client: each time a <code>get</code> method is called, the record either originates from the set of prefetched records or is actually read directly from the underlying reader. The order of records is guaranteed to be the same as if the underlying reader were used on its own.

The following snippet demonstrates the creation of a keep-alive reader with a record retrieval period of 30 seconds:

<source lang="java">
IRecordReader<Record> reader = new ForwardReader<Record>(this.locator);
reader = new KeepAliveReader<Record>(reader, 30, TimeUnit.SECONDS);
</source>
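Once decorated, the reader is consumed exactly as before. The following is a minimal sketch of such consumption, using the <code>get</code> method with a timeout as shown in the Random Access example:

<source lang="java">
//Consumption is unchanged: the keep-alive decorator decides internally whether
//the record comes from its prefetched set or from the underlying reader
Record rec = reader.get(60, TimeUnit.SECONDS);
</source>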
  
===Locator Utilities===
The <code>Locators</code> class provides locator-related convenience methods.
Currently, the class provides a Local-to-TCP converter utility, which can be useful when it cannot be known in advance whether a produced resultset should be available only locally or be exposed remotely through TCP. The result producer can then produce its results locally, and the results can be exposed through TCP by converting the locator to a TCP locator if that is deemed necessary.
The exact way of converting a local locator to TCP is shown in the following snippet:

<source lang="java">
URI localLocator = ... //An operation returning a local locator
URI TCPLocator = Locators.localToTCP(localLocator);
</source>
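Putting the pieces together, the following sketch assumes local production as in the earlier examples, and a TCPConnectionManager already initialized in the producer's address space; the result is only exposed over TCP at the point where remote access turns out to be needed:

<source lang="java">
//Produce locally; nothing about the writer changes
StringWriter writer=new StringWriter(new LocalWriterProxy());
URI localLocator=writer.getLocator();
//Later, if a remote consumer shows up, convert the locator instead of re-producing the result
URI tcpLocator=Locators.localToTCP(localLocator);
</source>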
  
== HTTP Protocol ==
gRS can also work on top of HTTP instead of TCP. The transferred messages are in XML format.
There are corresponding ''HTTP'' classes for the ConnectionManager, the Proxies, etc. that can be used instead of the ''TCP'' classes to enable this functionality.

'''Note''' that both ends need to use the same protocol in order to be able to communicate.
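Purely as an illustration of this symmetry, the following sketch assumes HTTP counterparts named by direct analogy to the TCP classes used throughout this page; the class name below is hypothetical and should be verified against the library before use:

<source lang="java">
//Assumption: an HTTP writer proxy mirroring TCPWriterProxy; the name is
//hypothetical and only illustrates that client code is otherwise unchanged
StringWriter writer=new StringWriter(new HTTPWriterProxy());
//The reader resolves the locator as usual; per the note above, it must also use HTTP
StringReader reader=new StringReader(writer.getLocator());
</source>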

== TCP Connection Manager ==
In case the created gRS is to be shared with a remote consumer, or in general one in a different address space than the writer's, a TCPConnectionManager should be used. A TCPConnectionManager is static in the context of the address space it is used in and is initialized through a singleton pattern. Therefore, one can safely initialize it in any context where it is useful, without worrying whether other components are also trying to initialize it. An example of initializing the TCPConnectionManager is shown in the following snippet.

<source lang="java">
List<PortRange> ports=new ArrayList<PortRange>(); //The ports that the TCPConnection manager should use
ports.add(new PortRange(3000, 3050));             //Any in the range between 3000 and 3050
ports.add(new PortRange(3055, 3055));             //Port 3055
TCPConnectionManager.Init(
  new TCPConnectionManagerConfig("localhost", //The hostname by which the machine is reachable 
    ports,                                    //The ports that can be used by the connection manager
    true                                      //If no port ranges were provided, or none of them could be used, use a random available port
));
TCPConnectionManager.RegisterEntry(new TCPConnectionHandler());      //Register the handler for the gRS2 incoming requests
TCPConnectionManager.RegisterEntry(new TCPStoreConnectionHandler()); //Register the handler for the gRS2 store incoming requests
</source>

A gCube service could use the onInit event to make this initialization. The TCPConnectionManager is located in the MadgikCommons package.

== Definitions ==
Every gRS needs to have explicitly defined the definitions of the records it holds. The record definitions in turn contain the definitions of the fields they comprise. Every record that is subsequently handled by the specific gRS must comply with one of the definitions initially provided to it. For example, creating a gRS that handles only one type of record, which in turn contains only a single type of field, is presented in the following snippet:

<source lang="java">
//The gRS will contain only one type of records which in turn contains only a single field 
RecordDefinition[] defs=new RecordDefinition[]{          //A gRS can contain a number of different record definitions
    new GenericRecordDefinition((new FieldDefinition[] { //A record can contain a number of different field definitions
    new StringFieldDefinition("ThisIsTheField")          //The definition of the field
  }))
};
</source>
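Since a gRS can declare several record definitions, the following sketch illustrates a buffer handling two record types; the field names used here are purely illustrative:

<source lang="java">
//A sketch: one definition with a single string field, and one that additionally
//carries a file field, both handled by the same gRS
RecordDefinition[] defs=new RecordDefinition[]{
  new GenericRecordDefinition(new FieldDefinition[]{
    new StringFieldDefinition("title")
  }),
  new GenericRecordDefinition(new FieldDefinition[]{
    new StringFieldDefinition("title"),
    new FileFieldDefinition("content")
  })
};
</source>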

== Record Writer Initialization ==
To initialize a gRS2 writer, one simply creates a new instance of one of the available writers. Depending on the configuration detail one wishes to set, a number of different constructors are available:

<source lang="java">
writer=new RecordWriter<GenericRecord>(
  proxy, //The proxy that defines the way the writer can be accessed
  defs   //The definitions of the records the gRS handles
);
 
writer=new RecordWriter<GenericRecord>(
  proxy, //The proxy that defines the way the writer can be accessed
  defs,  //The definitions of the records the gRS handles
  50,    //The capacity of the underlying synchronization buffer
  2,     //The maximum number of parallel records that can be concurrently accessed on partial transfer  
  0.5f   //The maximum fraction of the buffer that should be transferred during mirroring  
);
 
writer=new RecordWriter<GenericRecord>(
  proxy,           //The proxy that defines the way the writer can be accessed
  defs,            //The definitions of the records the gRS handles
  50,              //The capacity of the underlying synchronization buffer
  2,               //The maximum number of parallel records that can be concurrently accessed on partial transfer  
  0.5f,            //The maximum fraction of the buffer that should be transferred during mirroring 
  60,              //The timeout in time units after which an inactive gRS can be disposed  
  TimeUnit.SECONDS //The time unit in timeout after which an inactive gRS can be disposed
);
</source>

For convenience, if one wishes to create a writer with the same configuration as an already available reader, one can use one of the following constructors:

<source lang="java">
writer=new RecordWriter<GenericRecord>(
  proxy, //The proxy that defines the way the writer can be accessed
  reader //The reader to copy configuration from
);
 
writer=new RecordWriter<GenericRecord>(
  proxy, //The proxy that defines the way the writer can be accessed
  reader, //The reader to copy configuration from
  50,    //The capacity of the underlying synchronization buffer
  2,     //The maximum number of parallel records that can be concurrently accessed on partial transfer  
  0.5f   //The maximum fraction of the buffer that should be transferred during mirroring  
);
 
writer=new RecordWriter<GenericRecord>(
  proxy, //The proxy that defines the way the writer can be accessed
  reader, //The reader to copy configuration from
  50,    //The capacity of the underlying synchronization buffer
  2,     //The maximum number of parallel records that can be concurrently accessed on partial transfer  
  0.5f,   //The maximum fraction of the buffer that should be transferred during mirroring  
  60,              //The timeout in time units after which an inactive gRS can be disposed  
  TimeUnit.SECONDS //The time unit in timeout after which an inactive gRS can be disposed
);
</source>

In the first of the above examples, all configuration is copied from the reader. The extra configuration parameters override the ones of the reader, meaning that in the last example only the record definitions are duplicated from the reader configuration.

== Custom Records ==
All records handled through gRS2 must extend the base class gr.uoa.di.madgik.grs.record.Record. Extending this class means implementing a couple of required methods which are mainly used to properly manage the transport of the custom record and any additional information it might carry. Although the actual payload of the record is meant to be managed through fields, one can also assign additional information to the record itself. In the example that follows, a custom record with its respective definition is created. The record hides the fields that it manages by exposing direct getters and setters over a known definition that it provides and should be used with. Additionally, it defines information that describes the record as a whole and, instead of using additional fields to do this, it defines placeholders within the record itself.

=== Definition ===

<source lang="java">
import java.io.DataInput;
import java.io.DataOutput;
import gr.uoa.di.madgik.grs.buffer.IBuffer.TransportDirective;
import gr.uoa.di.madgik.grs.record.GRS2RecordSerializationException;
import gr.uoa.di.madgik.grs.record.RecordDefinition;
import gr.uoa.di.madgik.grs.record.field.FieldDefinition;
import gr.uoa.di.madgik.grs.record.field.StringFieldDefinition;
 
public class MyRecordDefinition extends RecordDefinition
{
  public static final String PayloadFieldName="payload";
  public static final boolean DefaultCompress=false;
 
  public MyRecordDefinition()
  {
    StringFieldDefinition def=new StringFieldDefinition(MyRecordDefinition.PayloadFieldName);
    def.setTransportDirective(TransportDirective.Full);
    def.setCompress(MyRecordDefinition.DefaultCompress);
    this.Fields = new FieldDefinition[]{def};
  }
 
  public MyRecordDefinition(boolean compress)
  {
    StringFieldDefinition def=new StringFieldDefinition(MyRecordDefinition.PayloadFieldName);
    def.setTransportDirective(TransportDirective.Full);
    def.setCompress(compress);
    this.Fields = new FieldDefinition[]{def};
  }
 
  @Override
  public boolean extendEquals(Object obj)
  {
    if(!(obj instanceof MyRecordDefinition)) return false;
    return true;
  }
 
  @Override
  public void extendDeflate(DataOutput out) throws GRS2RecordSerializationException
  {
    //nothing to deflate
  }
 
  @Override
  public void extendInflate(DataInput in) throws GRS2RecordSerializationException
  {
    //nothing to inflate
  }
}
</source>

=== Record ===

<source lang="java">
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import gr.uoa.di.madgik.grs.GRS2Exception;
import gr.uoa.di.madgik.grs.buffer.GRS2BufferException;
import gr.uoa.di.madgik.grs.record.GRS2RecordDefinitionException;
import gr.uoa.di.madgik.grs.record.GRS2RecordSerializationException;
import gr.uoa.di.madgik.grs.record.Record;
import gr.uoa.di.madgik.grs.record.field.Field;
import gr.uoa.di.madgik.grs.record.field.StringField;
 
public class MyRecord extends Record
{
  private String id="none";
  private String collection="none";
 
  public MyRecord()
  {
    this.setFields(new Field[]{new StringField()});
  }
 
  public MyRecord(String payload)
  {
    this.setFields(new Field[]{new StringField(payload)});
  }
 
  public MyRecord(String id,String collection)
  {
    this.setId(id);
    this.setCollection(collection);
    this.setFields(new Field[]{new StringField()});
  }
 
  public MyRecord(String id,String collection,String payload)
  {
    this.setId(id);
    this.setCollection(collection);
    this.setFields(new Field[]{new StringField(payload)});
  }
 
  public void setId(String id) { this.id = id; }
 
  public String getId() { return id; }
 
  public void setCollection(String collection) { this.collection = collection; }
 
  public String getCollection() { return collection; }
 
  public void setPayload(String payload) throws GRS2Exception
  {
    this.getField(MyRecordDefinition.PayloadFieldName).setPayload(payload);
  }
 
  public String getPayload() throws GRS2Exception
  {
    return this.getField(MyRecordDefinition.PayloadFieldName).getPayload();
  }
 
  @Override
  public StringField getField(String name) throws GRS2RecordDefinitionException, GRS2BufferException
  {
    return (StringField)super.getField(name);
  }
 
  @Override
  public void extendSend(DataOutput out) throws GRS2RecordSerializationException
  {
    this.extendDeflate(out);
  }
 
  @Override
  public void extendReceive(DataInput in) throws GRS2RecordSerializationException
  {
    this.extendInflate(in, false);
  }
 
  @Override
  public void extendDeflate(DataOutput out) throws GRS2RecordSerializationException
  {
    try
    {
      out.writeUTF(this.id);
      out.writeUTF(this.collection);
    }catch(IOException ex)
    {
      throw new GRS2RecordSerializationException("Could not deflate record", ex);
    }
  }
 
  @Override
  public void extendInflate(DataInput in, boolean reset) throws GRS2RecordSerializationException
  {
    try
    {
      this.id=in.readUTF();
      this.collection=in.readUTF();
    }catch(IOException ex)
    {
      throw new GRS2RecordSerializationException("Could not inflate record", ex);
    }
  }
 
  @Override
  public void extendDispose()
  {
    this.id=null;
    this.collection=null;
  }
}
</source>
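For completeness, a minimal sketch of using the custom record with a writer follows; it assumes a TCPConnectionManager initialized as shown earlier, and reuses only constructors and methods presented on this page (the id and collection values are illustrative):

<source lang="java">
//Declare the custom definition and create a writer bound to it
RecordDefinition[] defs=new RecordDefinition[]{new MyRecordDefinition()};
RecordWriter<MyRecord> writer=new RecordWriter<MyRecord>(new TCPWriterProxy(), defs);
//Author a record carrying both record level info and field payload
MyRecord rec=new MyRecord("id-1", "someCollection", "Hello world");
//put returns false if the buffer stays at maximum capacity for the whole interval
if(writer.put(rec, 60, TimeUnit.SECONDS)) writer.close();
</source>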

== Simple Example ==
In the following snippets, a simple writer and its respective reader are presented to show the full usage of the elements presented so far.

=== Writer ===

<source lang="java">
import java.net.URI;
import java.util.concurrent.TimeUnit;
import gr.uoa.di.madgik.grs.buffer.IBuffer.Status;
import gr.uoa.di.madgik.grs.proxy.IWriterProxy;
import gr.uoa.di.madgik.grs.record.GenericRecord;
import gr.uoa.di.madgik.grs.record.GenericRecordDefinition;
import gr.uoa.di.madgik.grs.record.RecordDefinition;
import gr.uoa.di.madgik.grs.record.field.Field;
import gr.uoa.di.madgik.grs.record.field.FieldDefinition;
import gr.uoa.di.madgik.grs.record.field.StringField;
import gr.uoa.di.madgik.grs.record.field.StringFieldDefinition;
import gr.uoa.di.madgik.grs.writer.GRS2WriterException;
import gr.uoa.di.madgik.grs.writer.RecordWriter;
 
public class StringWriter extends Thread
{
  private RecordWriter<GenericRecord> writer=null;
 
  public StringWriter(IWriterProxy proxy) throws GRS2WriterException
  {
    //The gRS will contain only one type of records which in turn contains only a single field 
    RecordDefinition[] defs=new RecordDefinition[]{          //A gRS can contain a number of different record definitions
        new GenericRecordDefinition((new FieldDefinition[] { //A record can contain a number of different field definitions
        new StringFieldDefinition("ThisIsTheField")          //The definition of the field
      }))
    };
    writer=new RecordWriter<GenericRecord>(
        proxy, //The proxy that defines the way the writer can be accessed
        defs   //The definitions of the records the gRS handles
      );
    }
 
    public URI getLocator() throws GRS2WriterException
    {
      return writer.getLocator();
    }
 
    public void run()
    {
      try
      {
        for(int i=0;i<500;i+=1)
        {
          //while the reader hasn't stopped reading
          if(writer.getStatus()!=Status.Open) break;
          GenericRecord rec=new GenericRecord();
          //Only a string field is added to the record as per definition
          rec.setFields(new Field[]{new StringField("Hello world "+i)});
          //if the buffer is at maximum capacity for the specified interval, don't wait any more
          if(!writer.put(rec,60,TimeUnit.SECONDS)) break;
        }
        //if the reader hasn't already disposed the buffer, close to notify the reader that no more records will be provided 
        if(writer.getStatus()!=Status.Dispose) writer.close();
      }catch(Exception ex)
      {
        ex.printStackTrace();
      }
    }
}
</source>

=== Reader ===

<source lang="java">
import gr.uoa.di.madgik.grs.reader.ForwardReader;
import gr.uoa.di.madgik.grs.reader.GRS2ReaderException;
import gr.uoa.di.madgik.grs.record.GenericRecord;
import gr.uoa.di.madgik.grs.record.field.StringField;
import java.net.URI;
 
public class StringReader extends Thread
{
  ForwardReader<GenericRecord> reader=null;
 
  public StringReader(URI locator) throws GRS2ReaderException
  {
    reader=new ForwardReader<GenericRecord>(locator);
  }
 
  public void run()
  {
    try
    {
      for(GenericRecord rec : reader)
      {
        //In case a timeout occurs while optimistically waiting for more records from an originally open writer
        if(rec==null) break;
        //Retrieve the required field of the type available in the gRS definitions
        System.out.println(((StringField)rec.getField("ThisIsTheField")).getPayload());
      }
      //Close the reader to release and dispose any resources on both the reader and writer sides
      reader.close();
    }catch(Exception ex)
    {
      ex.printStackTrace();
    }
  }
}
</source>

=== Main ===

<source lang="java">
import java.util.ArrayList;
import java.util.List;
import gr.uoa.di.madgik.commons.server.PortRange;
import gr.uoa.di.madgik.commons.server.TCPConnectionManager;
import gr.uoa.di.madgik.commons.server.TCPConnectionManagerConfig;
import gr.uoa.di.madgik.grs.proxy.tcp.TCPConnectionHandler;
import gr.uoa.di.madgik.grs.proxy.tcp.TCPStoreConnectionHandler;
import gr.uoa.di.madgik.grs.proxy.tcp.TCPWriterProxy;
 
public class MainTest
{
  public static void main(String []args) throws Exception
  {
    List<PortRange> ports=new ArrayList<PortRange>(); //The ports that the TCPConnection manager should use
    ports.add(new PortRange(3000, 3050));             //Any in the range between 3000 and 3050
    ports.add(new PortRange(3055, 3055));             //Port 3055
    TCPConnectionManager.Init(
      new TCPConnectionManagerConfig("localhost", //The hostname by which the machine is reachable 
        ports,                                    //The ports that can be used by the connection manager
        true                                      //If no port ranges were provided, or none of them could be used, use a random available port
    ));
    TCPConnectionManager.RegisterEntry(new TCPConnectionHandler());      //Register the handler for the gRS2 incoming requests
    TCPConnectionManager.RegisterEntry(new TCPStoreConnectionHandler()); //Register the handler for the gRS2 store incoming requests
 
    // gr.uoa.di.madgik.grs.proxy.local.LocalWriterProxy Could be used instead for local only, all in memory, access
    StringWriter writer=new StringWriter(new TCPWriterProxy());
    StringReader reader=new StringReader(writer.getLocator());
 
    writer.start();
    reader.start();
 
    writer.join();
    reader.join();
  }
}
</source>

== Events ==
As was already mentioned, the gRS2 serves as a bidirectional communication channel through events that can be propagated either from the writer to the reader or from the reader to the writer, along the normal data flow. These events can serve any purpose fitting the application logic, from control signals to full data transport, although in the latter case the payload size should be kept moderate, as it is always handled in memory and cannot use services such as partial transfer, or be declared using internally managed files.

An example that has been taken to the extreme follows. In this example, a writer and a reader use only events to propagate their data. One should notice that the communication protocol is now up to the reader and writer clients to implement. In the example this has been handled in a quick and dirty way using sleep timeouts. Additionally, it should be noted that besides the basic key - value event placeholder provided internally, an extendable event "placeholder" is provided to facilitate the transport of any type of custom object.

=== Writer ===

<source lang="java">
import java.net.URI;
import gr.uoa.di.madgik.grs.buffer.IBuffer.Status;
import gr.uoa.di.madgik.grs.events.BufferEvent;
import gr.uoa.di.madgik.grs.events.KeyValueEvent;
import gr.uoa.di.madgik.grs.events.ObjectEvent;
import gr.uoa.di.madgik.grs.proxy.IWriterProxy;
import gr.uoa.di.madgik.grs.record.GenericRecord;
import gr.uoa.di.madgik.grs.record.GenericRecordDefinition;
import gr.uoa.di.madgik.grs.record.RecordDefinition;
import gr.uoa.di.madgik.grs.record.field.FieldDefinition;
import gr.uoa.di.madgik.grs.record.field.StringFieldDefinition;
import gr.uoa.di.madgik.grs.writer.GRS2WriterException;
import gr.uoa.di.madgik.grs.writer.RecordWriter;
 
public class EventWriter extends Thread
{
  private RecordWriter<GenericRecord> writer=null;
 
  public EventWriter(IWriterProxy proxy) throws GRS2WriterException
  {
    //The gRS will contain only one type of records which in turn contains only a single field 
    RecordDefinition[] defs=new RecordDefinition[]{ //A gRS can contain a number of different record definitions
        new GenericRecordDefinition((new FieldDefinition[] { //A record can contain a number of different field definitions
        new StringFieldDefinition("ThisIsTheField") //The definition of the field
      }))
    };
    writer=new RecordWriter<GenericRecord>(
      proxy, //The proxy that defines the way the writer can be accessed
      defs //The definitions of the records the gRS handles
    );
  }
 
  public URI getLocator() throws GRS2WriterException
  {
    return writer.getLocator();
  }
 
  public void receiveWaitKeyValue() throws GRS2WriterException
  {
    while(true)
    {
      BufferEvent ev=writer.receive();
      if(ev==null) try{ Thread.sleep(100);}catch(Exception ex){}
      else 
      {
        System.out.println("Received event from reader : "+((KeyValueEvent)ev).getKey()+" - "+((KeyValueEvent)ev).getValue());
        break;
      }
    }
  }
 
  public void receiveWaitObject() throws GRS2WriterException
  {
    while(true)
    {
      BufferEvent ev=writer.receive();
      if(ev==null) try{ Thread.sleep(100);}catch(Exception ex){}
      else 
      {
        System.out.println("Reveived event from reader : "+((ObjectEvent)ev).getItem().toString());
        break;
      }
    }
  }
 
  public void run()
  {
    try
    {
      writer.emit(new KeyValueEvent("init", "Hello"));
      for(int i=0;i<5;i+=1) this.receiveWaitKeyValue();
      this.receiveWaitObject();
      if(writer.getStatus()!=Status.Dispose) writer.close();
    }catch(Exception ex)
    {
      ex.printStackTrace();
    }
  }
}
</source>

=== Reader ===

<source lang="java">
import gr.uoa.di.madgik.grs.events.BufferEvent;
import gr.uoa.di.madgik.grs.events.KeyValueEvent;
import gr.uoa.di.madgik.grs.events.ObjectEvent;
import gr.uoa.di.madgik.grs.reader.ForwardReader;
import gr.uoa.di.madgik.grs.reader.GRS2ReaderException;
import gr.uoa.di.madgik.grs.record.GenericRecord;
import java.net.URI;
 
public class EventReader extends Thread
{
  ForwardReader<GenericRecord> reader=null;
 
  public EventReader(URI locator) throws GRS2ReaderException
  {
    reader=new ForwardReader<GenericRecord>(locator);
  }
 
  public void run()
  {
    try
    {
      BufferEvent ev=reader.receive();
      System.out.println("Reveived event from writer : "+((KeyValueEvent)ev).getKey()+" - "+((KeyValueEvent)ev).getValue());
      reader.emit(new KeyValueEvent("info", "Hello..."));
      reader.emit(new KeyValueEvent("info", "Hello Again..."));
      reader.emit(new KeyValueEvent("info", "Hello And Again..."));
      reader.emit(new KeyValueEvent("info", "And again..."));
      reader.emit(new KeyValueEvent("info", "Bored already ?"));
      reader.emit(new ObjectEvent(new MyEvent("This is a custom object event", 234)));
      reader.close();
    }catch(Exception ex)
    {
      ex.printStackTrace();
    }
  }
}
</source>

=== MyEvent ===

import java.io.DataInput;
import java.io.DataOutput;
import gr.uoa.di.madgik.grs.record.GRS2RecordSerializationException;
import gr.uoa.di.madgik.grs.record.IPumpable;
 
public class MyEvent implements IPumpable
{
  private String message=null;
  private int count=0;
 
  public MyEvent(String message,int count)
  {
    this.message=message;
    this.count=count;
  }
 
  public String toString()
  {
    return "event payload message is : '"+this.message+"' with count '"+this.count+"'";
  }
 
  public void deflate(DataOutput out) throws GRS2RecordSerializationException
  {
    try
    {
      out.writeUTF(message);
      out.writeInt(count);
    }catch(Exception ex)
    {
      throw new GRS2RecordSerializationException("Could not serialize event", ex);
    }
  }
 
  public void inflate(DataInput in) throws GRS2RecordSerializationException
  {
    try
    {
      this.message=in.readUTF();
      this.count=in.readInt();
    }catch(Exception ex)
    {
      throw new GRS2RecordSerializationException("Could not deserialize event", ex);
    }
  }
 
  public void inflate(DataInput in, boolean reset) throws GRS2RecordSerializationException
  {
    this.inflate(in);
  }
}

Main

As in Simple Main, using the new writer and reader classes

  /*...*/
  EventWriter writer=new EventWriter(new TCPWriterProxy());
  EventReader reader=new EventReader(writer.getLocator());
  /*...*/
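
For completeness, a minimal sketch of such a Main follows, using the same thread start/join pattern that the Store example later in this section uses; exception handling is elided as in the other snippets:

  EventWriter writer=new EventWriter(new TCPWriterProxy());
  EventReader reader=new EventReader(writer.getLocator());
  //Start both threads and wait for the event exchange to complete
  writer.start();
  reader.start();
  writer.join();
  reader.join();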

Multi Field Records

Writer

import java.io.File;
import java.net.URI;
import java.net.URL;
import java.util.concurrent.TimeUnit;
import gr.uoa.di.madgik.grs.buffer.IBuffer.Status;
import gr.uoa.di.madgik.grs.proxy.IWriterProxy;
import gr.uoa.di.madgik.grs.record.GenericRecord;
import gr.uoa.di.madgik.grs.record.GenericRecordDefinition;
import gr.uoa.di.madgik.grs.record.RecordDefinition;
import gr.uoa.di.madgik.grs.record.field.Field;
import gr.uoa.di.madgik.grs.record.field.FieldDefinition;
import gr.uoa.di.madgik.grs.record.field.FileField;
import gr.uoa.di.madgik.grs.record.field.FileFieldDefinition;
import gr.uoa.di.madgik.grs.record.field.StringField;
import gr.uoa.di.madgik.grs.record.field.StringFieldDefinition;
import gr.uoa.di.madgik.grs.record.field.URLField;
import gr.uoa.di.madgik.grs.record.field.URLFieldDefinition;
import gr.uoa.di.madgik.grs.writer.GRS2WriterException;
import gr.uoa.di.madgik.grs.writer.RecordWriter;
 
public class MultiWriter extends Thread
{
  private RecordWriter<GenericRecord> writer=null;
 
  public MultiWriter(IWriterProxy proxy) throws GRS2WriterException
  {
    //The gRS will contain only one type of record, which in this case contains multiple fields
    RecordDefinition[] defs=new RecordDefinition[]{ //A gRS can contain a number of different record definitions
      new GenericRecordDefinition((new FieldDefinition[] { //A record can contain a number of different field definitions
        new StringFieldDefinition("MyStringField"), //The definition of a string field
        new StringFieldDefinition("MyOtherStringField"), //The definition of a string field
        new URLFieldDefinition("MyHTTPField"), //The definition of an http field
        new FileFieldDefinition("MyFileField") //The definition of a file field
      }))
    };
    writer=new RecordWriter<GenericRecord>(
      proxy, //The proxy that defines the way the writer can be accessed
      defs //The definitions of the records the gRS handles
    );
  }
 
  public URI getLocator() throws GRS2WriterException
  {
    return writer.getLocator();
  }
 
  public void run()
  {
    try
    {
      for(int i=0;i<500;i+=1)
      {
        //while the reader hasn't stopped reading
        if(writer.getStatus()!=Status.Open) break;
        GenericRecord rec=new GenericRecord();
        //Four fields are added to the record as per the definition: two string fields, a URL field, and a file field
        Field[] fs=new Field[4];
        fs[0]=new StringField("Hello world "+i);
        fs[1]=new StringField("Hello world of Diligent");
        fs[2]=new URLField(new URL("http://www.d4science.eu/files/d4science_logo.jpg"));
        fs[3]=new FileField(new File("/bin/ls"));
        rec.setFields(fs);
        //if the buffer remains at maximum capacity for the specified interval, don't wait any longer
        if(!writer.put(rec,60,TimeUnit.SECONDS)) break;
      }
      //if the reader hasn't already disposed the buffer, close to notify reader that no more will be provided 
      if(writer.getStatus()!=Status.Dispose) writer.close();
    }catch(Exception ex)
    {
      ex.printStackTrace();
    }
  }
}

Reader

import gr.uoa.di.madgik.grs.reader.ForwardReader;
import gr.uoa.di.madgik.grs.reader.GRS2ReaderException;
import gr.uoa.di.madgik.grs.record.GenericRecord;
import gr.uoa.di.madgik.grs.record.field.FileField;
import gr.uoa.di.madgik.grs.record.field.StringField;
import gr.uoa.di.madgik.grs.record.field.URLField;
import java.net.URI;
 
public class MultiReader extends Thread
{
  ForwardReader<GenericRecord> reader=null;
 
  public MultiReader(URI locator) throws GRS2ReaderException
  {
    reader=new ForwardReader<GenericRecord>(locator);
  }
 
  public void run()
  {
    try
    {
      for(GenericRecord rec : reader)
      {
        //In case a timeout occurs while optimistically waiting for more records from an originally open writer
        if(rec==null) break;
        //Retrieve the required field of the type available in the gRS definitions
        System.out.println(((StringField)rec.getField("MyStringField")).getPayload());
        System.out.println(((StringField)rec.getField("MyOtherStringField")).getPayload());
        System.out.println(((URLField)rec.getField("MyHTTPField")).getPayload());
        //In case of in memory communication, the target file will be the same as the original. Otherwise,
        //the target file will be a local copy of the original file and, depending on the transport directive
        //set by the writer, it will either be fully available or gradually become available as data are read
        System.out.println(((FileField)rec.getField("MyFileField")).getPayload() +" : "+((FileField)rec.getField("MyFileField")).getOriginalPayload());
      }
      //Close the reader to release and dispose any resources in both reader and writer sides
      reader.close();
    }catch(Exception ex)
    {
      ex.printStackTrace();
    }
  }
}

Main

As in Simple Main, using the new writer and reader classes

  /*...*/
  MultiWriter writer=new MultiWriter(new LocalWriterProxy());
  MultiReader reader=new MultiReader(writer.getLocator());
  /*...*/

Retrieving Field Payload

As mentioned, depending on the transport directive set by the writer and the type of proxy used for the locator creation, the payload of some fields may not be fully available to the reader. However, the reader's clients should not have to deal with the details of this and should be able to access the payload of any field uniformly. Additionally, each field, depending on its characteristics, may support different transport directives, and may carry its payload internally or only keep a reference to it. Therefore, only the field itself can know how to access its own payload.

For example, for a URLField that points to a remotely accessible HTTP location, the field opens an HTTP connection and passes the stream to the client, so that from then on he does not need to know the payload's actual location. For a FileField that is accessed remotely from the original file location, the data requested by the client are fetched remotely, if not already available, and are provided to the client through an intermediate local file. All these actions are performed in the background without the client having to perform any action.

This is achieved through the MediationInputStream, an input stream implementation that wraps around the protocol specific input stream provided by the Field and from then on handles any remote transport that needs to take place. Each field provides its own payload mediating input stream. Using the mediating stream, data are transferred upon request, using the chunk size defined in the field definition, so that the transport is buffered. This of course means that, depending on the size of the payload, a number of round trips need to be made. On the one hand, this allows less data to be transferred in case only a part of the payload is needed, and processing can start on the initial parts while more data is retrieved. On the other hand, if the full payload is needed, the on demand transfer imposes additional overhead. In such cases, a specific field, or in fact all the fields of a record, can be requested to be made available, if supported, in one blocking operation.

In the Multi Field example, if someone wants to read the payload of the MyHTTPField and MyFileField fields, the following snippets in the reader are all that is needed

BufferedInputStream bin=new BufferedInputStream(rec.getField("MyHTTPField").getMediatingInputStream());
byte[] buffer = new byte[1024];
int bytesRead = 0;
while ((bytesRead = bin.read(buffer)) != -1) { /*...*/ }
 
/*...*/
 
BufferedInputStream bin=new BufferedInputStream(rec.getField("MyFileField").getMediatingInputStream());
byte[] buffer = new byte[1024];
int bytesRead = 0;
while ((bytesRead = bin.read(buffer)) != -1) { /*...*/ }

If the use case requires a specific field to be read in its entirety, as previously mentioned, one can request the entire payload to be retrieved beforehand rather than on demand. For this, the following line needs to be added before the client starts reading the field payload as shown above

rec.getField("MyFileField").makeAvailable();
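
The text above also notes that all the fields of a record can be made available in one blocking operation. A record level makeAvailable is indeed what the Random Access Reader described below invokes when persisting records, so the record wide equivalent of the previous line would presumably be:

rec.makeAvailable(); //Blocking call that makes the payload of all the record's fields locally available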

Random Access Reader

The examples presented thus far target a forward only reader. A random access reader is also provided as an alternative. This reader allows random seeks, forward and backward, on the reader side. Since, as was already mentioned, a record is disposed by the writer after it has been accessed by the reader, moving back to a previously accessed record, which the writer has already disposed, requires that all past records be maintained locally on the reader side. To avoid bloating the in memory structures of the reader, already read records are persisted. When the client then wants to move back to a record he has already accessed, the desired record, as well as a configurable record window ahead of it, is restored from the persistency store. The window is always in front of the requested record, on the assumption that forward movement is the predominant one. Note that in the case of completely random seeks, a good strategy to achieve maximum performance is to disable the window by setting its size equal to 1. The persistency store in which the past records are kept is internally configured. The only one currently available is a disk backed Random Access File. This of course imposes an overhead that is not negligible for large data. Short term plans include the usage of a different persistency backend to make the operation more performant.

One important detail here is that every record persisted on the reader side must be fully available once restored. That means that all payload previously served by the writer side must now be served directly from the reader side. For this reason, whenever a record is persisted, the makeAvailable method is invoked on the record, ensuring that any data that would be lost when the record is disposed on the writer side is directly accessible from the reader side.

In the following example, we assume a gRS created as in the case of the Simple Writer previously described. We present two ways to access the gRS in a random fashion, one using a list iterator, and one that provides random seeks.

Bidirectional Iterator

import gr.uoa.di.madgik.grs.reader.GRS2ReaderException;
import gr.uoa.di.madgik.grs.reader.RandomReader;
import gr.uoa.di.madgik.grs.record.GenericRecord;
import gr.uoa.di.madgik.grs.record.field.StringField;
import java.net.URI;
import java.util.ListIterator;
 
public class RandomIteratorReader extends Thread
{
  RandomReader<GenericRecord> reader=null;
 
  public RandomIteratorReader(URI locator) throws GRS2ReaderException
  {
    reader=new RandomReader<GenericRecord>(locator);
  }
 
  public void run()
  {
    try
    {
      ListIterator<GenericRecord> iter= reader.listIterator();
      while(iter.hasNext())
      {
        GenericRecord rec=iter.next();
        if(rec==null) break;
        System.out.println(((StringField)rec.getField("ThisIsTheField")).getPayload());
      }
      while(iter.hasPrevious())
      {
        GenericRecord rec=iter.previous();
        if(rec==null) break;
        System.out.println(((StringField)rec.getField("ThisIsTheField")).getPayload());
      }
      while(iter.hasNext())
      {
        GenericRecord rec=iter.next();
        if(rec==null) break;
        System.out.println(((StringField)rec.getField("ThisIsTheField")).getPayload());
      }
      reader.close();
    }catch(Exception ex)
    {
      ex.printStackTrace();
    }
  }
}

Random Access

import gr.uoa.di.madgik.grs.buffer.GRS2BufferException;
import gr.uoa.di.madgik.grs.buffer.IBuffer.Status;
import gr.uoa.di.madgik.grs.reader.GRS2ReaderException;
import gr.uoa.di.madgik.grs.reader.RandomReader;
import gr.uoa.di.madgik.grs.record.GRS2RecordDefinitionException;
import gr.uoa.di.madgik.grs.record.GenericRecord;
import gr.uoa.di.madgik.grs.record.field.StringField;
import java.net.URI;
import java.util.concurrent.TimeUnit;
 
public class RandomAccessReader extends Thread
{
  RandomReader<GenericRecord> reader=null;
 
  public RandomAccessReader(URI locator) throws GRS2ReaderException
  {
    reader=new RandomReader<GenericRecord>(locator);
  }
 
  private void read(int count) throws GRS2ReaderException, GRS2RecordDefinitionException, GRS2BufferException
  {
    for(int i=0;i<count;i+=1)
    {
      if(reader.getStatus()==Status.Dispose || (reader.getStatus()==Status.Close && reader.availableRecords()==0)) break;
      GenericRecord rec=reader.get(60, TimeUnit.SECONDS);
      if(rec==null) break;
      System.out.println(((StringField)rec.getField("ThisIsTheField")).getPayload());
    }
  }
 
  public void run()
  {
    try
    {
      this.read(20);
      reader.seek(-10);
      this.read(9);
      reader.seek(-5);
      this.read(3);
      reader.seek(10);
      this.read(5);
      reader.close();
    }catch(Exception ex)
    {
      ex.printStackTrace();
    }
  }
}

Main

As in Simple Main, using the respective new reader classes. Note that, as explained in the Store section below, only one reader can be attached to a given locator, so the two readers shown here are alternatives rather than simultaneous consumers of the same writer.

  StringWriter writer=new StringWriter(new LocalWriterProxy());
  RandomIteratorReader reader1=new RandomIteratorReader(writer.getLocator());
  RandomAccessReader reader2=new RandomAccessReader(writer.getLocator());

Store

As already mentioned, the gRS2 is point to point only, and once something has been read it is disposed. Furthermore, once a reader has been initialized, any additional request from a different reader seeking to utilize the same locator to access the previously accessed gRS2 will fail. However, there are cases where one might want a created gRS2 buffer to be reutilized by multiple readers. This case is different from the one described in Random Access Reader: there, the gRS2 is read once from its source, is persisted in local storage from then on, and remains accessible multiple times only by the same reader. With the Store, every accessor of the stored gRS2 is a new reader, and each such reader can be either a random access or a forward only one.

When creating a gRS2 Store, a client can control its behaviour through the following parameters, which are also annotated in the sketch after the list:

  • The Store input is not limited to a single incoming gRS2. Any number of incoming gRS2 locators can be provided, in which case the content of all of them will be multiplexed into a single new gRS2
  • The type of multiplexing that will take place between the available inputs is a matter of parametrization; currently two policies are available, FIFO and First Available
  • The amount of time that the created gRS2 will be maintained while inactive, i.e. not accessed by any reader, is also provided at initialization time
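
The following annotated sketch maps these parameters onto the LocalStoreWriterProxy.store invocation used in the complete example further below; writerA and writerB stand for any writers created as in the previous sections:

  //Multiplex two inputs using the FIFO policy and keep the stored gRS2 for up to one hour of inactivity
  URI locator=LocalStoreWriterProxy.store(
    new URI[]{writerA.getLocator(), writerB.getLocator()}, //any number of incoming gRS2 locators
    IBufferStore.MultiplexType.FIFO, //multiplexing policy, FIFO or First Available
    60*60, TimeUnit.SECONDS //inactivity period after which the stored gRS2 is disposed
  );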

The gRS2 Store is meant to be used as a checkpoint utility and should be employed only in appropriate use cases because of the performance penalties and resource consumption it imposes. These can be identified in the following two points:

  • A persistency store is utilized by the gRS2 Store in order to have all the content of the incoming locators locally available and to be able to serve it without relying on the possibly remote, or even local, input buffers, which will be disposed and become unavailable. For this reason everything is brought under the management of the gRS2 Store and persisted. The only store backend currently available is one based on a Random Access File, which imposes non-negligible performance penalties. A new, more efficient backend is planned for the short term evolution of the gRS
  • Although the gRS2 aims to minimize unnecessary record creation and movement, keeping at most a buffer capacity's worth of records in transit, the gRS2 Store operates on a different hypothesis. As already mentioned, its primary use is as part of a checkpointing operation. To this end, once initialized, the Store will greedily read its inputs to their end, regardless of whether any reader is accessing the stored buffer

As already mentioned, when using the gRS2 Store there is no need to modify anything in either the writer or the reader. The Store is a utility, and it can therefore be used externally without affecting the implicated components. The following example demonstrates the additions needed to persist the output of a number of writers and have a number of readers access the multiplexed output.

  StringWriter writer1=new StringWriter(new TCPWriterProxy());
  StringWriter writer2=new StringWriter(new LocalWriterProxy());
  StringWriter writer3=new StringWriter(new TCPWriterProxy());
  StringWriter writer4=new StringWriter(new LocalWriterProxy());
 
  URI []locators=new URI[]{writer1.getLocator(),writer2.getLocator(),writer3.getLocator(),writer4.getLocator()};
  //The respective TCPStoreWriterProxy.store operation could be used to create a locator with a different usage scope
  URI locator=LocalStoreWriterProxy.store(locators, IBufferStore.MultiplexType.FIFO, 60*60, TimeUnit.SECONDS);
 
  StringReader reader1=new StringReader(locator);
  RandomAccessReader reader2=new RandomAccessReader(locator);
  StringReader reader3=new StringReader(locator);
  RandomIteratorReader reader4=new RandomIteratorReader(locator);
  StringReader reader5=new StringReader(locator);
 
  //Start everything
  writer1.start();
  /*...*/
  reader5.start();
 
  //Join everything
  writer1.join();
  /*...*/
  reader5.join();

Since through the gRS2 Store the writer and reader are disconnected, any event emitted by the writer will reach the reader either at the time it is emitted, if the reading is performed while the Store is actively persisting its inputs, or at the reader's initialization time, when all the events emitted up to that point are delivered at once. Events emitted by the reader will never reach the writers whose output is persisted.

Utilities and Convenience tools

Record Import

It is quite common that a consumer might wish to forward a record to a producer using the same record definition, either completely unaltered or with minor modifications. To avoid explicitly copying the record, thus wasting valuable computational resources, one has the option of importing records directly to a producer. This can be done with the importRecord methods, which have the same signatures as the put methods used to add entirely new records to a writer.

The following snippet demonstrates how one can forward all records read from a reader to a writer without making any modification

for(GenericRecord rec: reader) {
   if(writer.getStatus() == Status.Close || writer.getStatus() == Status.Dispose)
      break;
   if(!writer.importRecord(rec, 60, TimeUnit.SECONDS))
      break;
}

The following snippet demonstrates how one can modify a known record field, leaving all other fields unchanged. In fact, one can be unaware of the existence of fields other than those in which one is interested.

for(GenericRecord rec: reader) {
   ((StringField)rec.getField("payloadField")).setPayload("Modified payload");
   if(writer.getStatus() == Status.Close || writer.getStatus() == Status.Dispose)
      break;
   if(!writer.importRecord(rec, 60, TimeUnit.SECONDS))
      break;
}

Keep-Alive Functionality

Both forward and random access readers can have keep-alive functionality added to them using a typical decorator pattern. This functionality can be useful if a consumer wishes to delay the consumption of records for long periods of time while the producer has no indication of this behavior. In simple cases, when implementing a keep-alive scheme through events is not desirable, a keep-alive reader can be used to keep the communication channel open by periodically reading records. Keep-alive functionality is transparent to the client: each time a get method is called, the record either originates from the set of prefetched records or is actually read directly from the underlying reader. The order of records is guaranteed to be the same as if the underlying reader were used on its own. The following snippet demonstrates the creation of a keep-alive reader with a record retrieval period of 30 seconds:

IRecordReader<Record> reader = new ForwardReader<Record>(this.locator);
reader = new KeepAliveReader<Record>(reader, 30, TimeUnit.SECONDS);
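
From then on, the decorated reader is used exactly like the underlying one. As a sketch, and assuming the timed get variant shown in the Random Access example is also exposed through the IRecordReader interface, consumption could look as follows:

//The record may come from the prefetched set or directly from the underlying reader; this is transparent to the caller
Record rec = reader.get(60, TimeUnit.SECONDS);
if(rec != null) { /*...*/ }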

Locator Utilities

The Locators class provides locator-related convenience methods. Currently, the class offers a Local-to-TCP converter utility, which can be useful when it cannot be known in advance whether a produced resultset should be available only locally or be exposed remotely through TCP. The result producer can then produce its results locally, and the results can later be exposed through TCP by converting the locator to a TCP one, if this is deemed necessary. The exact way of converting a local locator to a TCP one is shown in the following snippet:

URI localLocator = ... //An operation returning a local locator
URI TCPLocator = Locators.localToTCP(localLocator);

HTTP Protocol

gRS can also work on top of HTTP instead of TCP. The transferred messages are in XML format. There are corresponding HTTP classes for the ConnectionManager, the Proxies, etc. that can be used instead of the TCP classes to enable this functionality.

Note that both ends need to use the same protocol in order to be able to communicate.
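
As a minimal sketch of switching to HTTP, and assuming the HTTP proxy class name mirrors the TCP one used earlier (the exact class names should be verified against the library), only the proxy supplied at writer creation changes:

  //HTTPWriterProxy is assumed here as the HTTP counterpart of TCPWriterProxy
  EventWriter writer=new EventWriter(new HTTPWriterProxy());
  EventReader reader=new EventReader(writer.getLocator());
  /*...*/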