GRS2
Contents
Introduction
The goal of the gRS framework is to enable point to point producer consumer communication. For this to be achieved and connect collocated or remotely located actors the framework protects the two parties from technology specific knowledge and limitations. Additionally it can provide upon request value adding functionalities to enhance the clients communication capabilities.
In the process of offering these functionalities to its clients the framework imposes minimal restrictions on the clients design. Providing an interface similar to commonly used programming structures such as a collection and a stream, client programs can easily incorporate it.
Furthermore, since one of the gRS main goals is to provide location independent services, it is modularly build to allow a number of different technologies to be used as transport and communication mediation depending on the scenario used. For communication through trusted channels a clear TCP connection can be used for fast delivery. Authentication of communication parties is build in to the framework and can be utilized on communication packet level. In situations where more secure connections are required, the full communication stream can be fully encrypted. In cases of firewalled communication, an http transport mechanism can be easily incorporated and supplied by the framework without a single change in the clients code. In case of collocated actors, in memory references can be directly passed from producer to consumer without either of them changing their behavior in any of the described cases.
In a disjoint environment where the producer and consumer are not customly made to interact only with each other or even in the case that the needs of the consumer change from invocation to invocation, a situation might appear where the producer generates more data than the consumer needs even at a per record scope. The gRS enables fine grained transport mechanisms at level not just of a record, but also at the level of record field. A record containing more than one fields may be needed entirely by its consumer or at the next interaction only a single field of the record may be needed. In such a situation a consumer can retrieve only the payload he is interested in without having to deal with any additional payload. Even at the level of the single field, the transported data can be further optimized to consume only just the needed bytes instead of having to transfer a large amount of data which will at the end be disposed without being used.
In modern systems the situation frequently appears where the producers output is already available for access through some protocol. The gRS has a very extendable design as to the way the producer will serve its payload. A number of protocols can be used, ftp, http, network storage, local filesystem, and many more can be implemented very easily and modularly.
TCP Connection Manager
In case the gRS created is to be shared with a remote, or in general in a different address space than the one the writer is in, a TCPConnectionManager should be used. A TCPConnectionManager is static in the context of the address space it is used and is initialized though a singleton pattern. Therefore, one can safely initialize it in any context it is usefull for him and do not worry if other components are also trying to initialize it. An example of initializing the TCPConnectionManager is shown in the following snippet.
List<PortRange> ports=new ArrayList<PortRange>(); //The ports that the TCPConnection manager should use ports.add(new PortRange(3000, 3050)); //Any in the range between 3000 and 3050 ports.add(new PortRange(3055, 3055)); //Port 3055 TCPConnectionManager.Init( new TCPConnectionManagerConfig("localhost", //The hostname by which the machine is reachable ports, //The ports that can be used by the connection manager true //If no port ranges were provided, or none of them could be used, use a random available port )); TCPConnectionManager.RegisterEntry(new TCPConnectionHandler()); //Register the handler for the gRS2 incoming requests TCPConnectionManager.RegisterEntry(new TCPStoreConnectionHandler()); //Register the handler for the gRS2 store incoming requests
A gCube service could use the onInit event to make this initialization. The TCPConnectionManager is located in the MadgikCommons package.
Definitions
Every gRS needs to have explicitly defined the definitions of the records it holds. The record definitions in turn contain the field definitions they contain. Every record that is then handled by the specific gRS must comply to one of the definitions initially provided to the gRS. For example, creating a gRS that handles only one type of records that in turn contains only a single type of field is presented in the following snippet
//The gRS will contain only one type of records which in turn contains only a single field RecordDefinition[] defs=new RecordDefinition[]{ //A gRS can contain a number of different record definitions new GenericRecordDefinition((new FieldDefinition[] { //A record can contain a number of different field definitions new StringFieldDefinition("ThisIsTheField") //The definition of the field })) };
Record Writer Initialization
To initialize a gRS2 writer one must simply create a new instance of the available writers. Depending on the configursation detail one wishes to set, a number of different constructors are available
writer=new RecordWriter<GenericRecord>( proxy, //The proxy that defines the way the writer can be accessed defs //The definitions of the records the gRS handles ); writer=new RecordWriter<GenericRecord>( proxy, //The proxy that defines the way the writer can be accessed defs, //The definitions of the records the gRS handles 50, //The capacity of the underlying synchronization buffer 2, //The maximum number of parallel records that can be concurrently accessed on partial transfer 0.5f //The maximum fraction of the buffer that should be transfered during mirroring ); writer=new RecordWriter<GenericRecord>( proxy, //The proxy that defines the way the writer can be accessed defs, //The definitions of the records the gRS handles 50, //The capacity of the underlying synchronization buffer 2, //The maximum number of parallel records that can be concurrently accessed on partial transfer 0.5f, //The maximum fraction of the buffer that should be transfered during mirroring 60, //The timeout in time units after which an inactive gRS can be disposed TimeUnit.SECONDS //The time unit in timeout after which an inactive gRS can be disposed );
Simple Example
In the following snippets, a simple writer and respective reader is presented to show the full usage of the presented elements
Writer
import java.net.URI; import java.util.concurrent.TimeUnit; import gr.uoa.di.madgik.grs.buffer.IBuffer.Status; import gr.uoa.di.madgik.grs.proxy.IWriterProxy; import gr.uoa.di.madgik.grs.record.GenericRecord; import gr.uoa.di.madgik.grs.record.GenericRecordDefinition; import gr.uoa.di.madgik.grs.record.RecordDefinition; import gr.uoa.di.madgik.grs.record.field.Field; import gr.uoa.di.madgik.grs.record.field.FieldDefinition; import gr.uoa.di.madgik.grs.record.field.StringField; import gr.uoa.di.madgik.grs.record.field.StringFieldDefinition; import gr.uoa.di.madgik.grs.writer.GRS2WriterException; import gr.uoa.di.madgik.grs.writer.RecordWriter; public class StringWriter extends Thread { private RecordWriter<GenericRecord> writer=null; public StringWriter(IWriterProxy proxy) throws GRS2WriterException { //The gRS will contain only one type of records which in turn contains only a single field RecordDefinition[] defs=new RecordDefinition[]{ //A gRS can contain a number of different record definitions new GenericRecordDefinition((new FieldDefinition[] { //A record can contain a number of different field definitions new StringFieldDefinition("ThisIsTheField") //The definition of the field })) }; writer=new RecordWriter<GenericRecord>( proxy, //The proxy that defines the way the writer can be accessed defs //The definitions of the records the gRS handles ); } public URI getLocator() throws GRS2WriterException { return writer.getLocator(); } public void run() { try { for(int i=0;i<500;i+=1) { //while the reader hasn't stopped reading if(writer.getStatus()!=Status.Open) break; GenericRecord rec=new GenericRecord(); //Only a string field is added to the record as per definition rec.setFields(new Field[]{new StringField("Hello world "+i)}); //if the buffer is in maximum capacity for the specified interval don;t wait any more if(!writer.put(rec,60,TimeUnit.SECONDS)) break; } //if the reader hasn't already disposed the buffer, close to notify reader that no more will be provided if(writer.getStatus()!=Status.Dispose) writer.close(); }catch(Exception ex) { ex.printStackTrace(); } } }
Reader
import gr.uoa.di.madgik.grs.reader.ForwardReader; import gr.uoa.di.madgik.grs.reader.GRS2ReaderException; import gr.uoa.di.madgik.grs.record.GenericRecord; import gr.uoa.di.madgik.grs.record.field.StringField; import java.net.URI; public class StringReader extends Thread { ForwardReader<GenericRecord> reader=null; public StringReader(URI locator) throws GRS2ReaderException { reader=new ForwardReader<GenericRecord>(locator); } public void run() { try { for(GenericRecord rec : reader) { //In case a timeout occurs while optimistically waiting for more records form an originally open writer if(rec==null) break; //Retrieve the required field of the type available in the gRS definitions System.out.println(((StringField)rec.getField("ThisIsTheField")).getPayload()); } //Close the reader to release and dispose any resources in boith reader and writer sides reader.close(); }catch(Exception ex) { ex.printStackTrace(); } } }
Main
import java.util.ArrayList; import java.util.List; import gr.uoa.di.madgik.commons.server.PortRange; import gr.uoa.di.madgik.commons.server.TCPConnectionManager; import gr.uoa.di.madgik.commons.server.TCPConnectionManagerConfig; import gr.uoa.di.madgik.grs.proxy.tcp.TCPConnectionHandler; import gr.uoa.di.madgik.grs.proxy.tcp.TCPStoreConnectionHandler; import gr.uoa.di.madgik.grs.proxy.tcp.TCPWriterProxy; public class MainTest { public static void main(String []args) throws Exception { List<PortRange> ports=new ArrayList<PortRange>(); //The ports that the TCPConnection manager should use ports.add(new PortRange(3000, 3050)); //Any in the range between 3000 and 3050 ports.add(new PortRange(3055, 3055)); //Port 3055 TCPConnectionManager.Init( new TCPConnectionManagerConfig("localhost", //The hostname by which the machine is reachable ports, //The ports that can be used by the connection manager true //If no port ranges were provided, or none of them could be used, use a random available port )); TCPConnectionManager.RegisterEntry(new TCPConnectionHandler()); //Register the handler for the gRS2 incoming requests TCPConnectionManager.RegisterEntry(new TCPStoreConnectionHandler()); //Register the handler for the gRS2 store incoming requests // gr.uoa.di.madgik.grs.proxy.local.LocalWriterProxy Could be used instead for local only, all in memory, access StringWriter writer=new StringWriter(new TCPWriterProxy()); StringReader reader=new StringReader(writer.getLocator()); writer.start(); reader.start(); writer.join(); reader.join(); } }
Events
As was already mentioned, the gRS2 serves as a bydirectional communication channel through events that can be propagated from either the writer to the reader or form the reader to the writer along the normal data flow. These events can serve any purpose fitting to the application logic, from control signals to full data transport purposes, although in the later case, the payload size should be cept moderate as it is always handled in memory and cannot use services such as partial transfer, or be declared using internally managed files.
An example that has been taken to the extreams follows. In this example, a writer and a reader use only events to propagate their data. One should notice that the communication protocol is now up to the reader and writer clients to be implemented. In the example case this has been handled in a quick and dirty way using sleep timeouts. Additionally, it should be noted that other than the basic key - value internally provided event placeholder, an extendable event "placeholder" has been provided to facilitate any type of custom object transport.
Writer
import java.net.URI; import gr.uoa.di.madgik.grs.buffer.IBuffer.Status; import gr.uoa.di.madgik.grs.events.BufferEvent; import gr.uoa.di.madgik.grs.events.KeyValueEvent; import gr.uoa.di.madgik.grs.events.ObjectEvent; import gr.uoa.di.madgik.grs.proxy.IWriterProxy; import gr.uoa.di.madgik.grs.record.GenericRecord; import gr.uoa.di.madgik.grs.record.GenericRecordDefinition; import gr.uoa.di.madgik.grs.record.RecordDefinition; import gr.uoa.di.madgik.grs.record.field.FieldDefinition; import gr.uoa.di.madgik.grs.record.field.StringFieldDefinition; import gr.uoa.di.madgik.grs.writer.GRS2WriterException; import gr.uoa.di.madgik.grs.writer.RecordWriter; public class EventWriter extends Thread { private RecordWriter<GenericRecord> writer=null; public EventWriter(IWriterProxy proxy) throws GRS2WriterException { //The gRS will contain only one type of records which in turn contains only a single field RecordDefinition[] defs=new RecordDefinition[]{ //A gRS can contain a number of different record definitions new GenericRecordDefinition((new FieldDefinition[] { //A record can contain a number of different field definitions new StringFieldDefinition("ThisIsTheField") //The definition of the field })) }; writer=new RecordWriter<GenericRecord>( proxy, //The proxy that defines the way the writer can be accessed defs //The definitions of the records the gRS handles ); } public URI getLocator() throws GRS2WriterException { return writer.getLocator(); } public void receiveWaitKeyValue() throws GRS2WriterException { while(true) { BufferEvent ev=writer.receive(); if(ev==null) try{ Thread.sleep(100);}catch(Exception ex){} else { System.out.println("Reveived event from reader : "+((KeyValueEvent)ev).getKey()+" - "+((KeyValueEvent)ev).getValue()); break; } } } public void receiveWaitObject() throws GRS2WriterException { while(true) { BufferEvent ev=writer.receive(); if(ev==null) try{ Thread.sleep(100);}catch(Exception ex){} else { System.out.println("Reveived event from reader : "+((ObjectEvent)ev).getItem().toString()); break; } } } public void run() { try { writer.emit(new KeyValueEvent("init", "Hello")); for(int i=0;i<5;i+=1) this.receiveWaitKeyValue(); this.receiveWaitObject(); if(writer.getStatus()!=Status.Dispose) writer.close(); }catch(Exception ex) { ex.printStackTrace(); } } }
Reader
import gr.uoa.di.madgik.grs.events.BufferEvent; import gr.uoa.di.madgik.grs.events.KeyValueEvent; import gr.uoa.di.madgik.grs.events.ObjectEvent; import gr.uoa.di.madgik.grs.reader.ForwardReader; import gr.uoa.di.madgik.grs.reader.GRS2ReaderException; import gr.uoa.di.madgik.grs.record.GenericRecord; import java.net.URI; public class EventReader extends Thread { ForwardReader<GenericRecord> reader=null; public EventReader(URI locator) throws GRS2ReaderException { reader=new ForwardReader<GenericRecord>(locator); } public void run() { try { BufferEvent ev=reader.receive(); System.out.println("Reveived event from writer : "+((KeyValueEvent)ev).getKey()+" - "+((KeyValueEvent)ev).getValue()); reader.emit(new KeyValueEvent("info", "Hello...")); reader.emit(new KeyValueEvent("info", "Hello Again...")); reader.emit(new KeyValueEvent("info", "Hello And Again...")); reader.emit(new KeyValueEvent("info", "And again...")); reader.emit(new KeyValueEvent("info", "Bored already ?")); reader.emit(new ObjectEvent(new MyEvent("This is a custom object event", 234))); reader.close(); }catch(Exception ex) { ex.printStackTrace(); } } }
MyEvent
import java.io.DataInput; import java.io.DataOutput; import gr.uoa.di.madgik.grs.record.GRS2RecordSerializationException; import gr.uoa.di.madgik.grs.record.IPumpable; public class MyEvent implements IPumpable { private String message=null; private int count=0; public MyEvent(String message,int count) { this.message=message; this.count=count; } public String toString() { return "event payload message is : '"+this.message+"' with count '"+this.count+"'"; } public void deflate(DataOutput out) throws GRS2RecordSerializationException { try { out.writeUTF(message); out.writeInt(count); }catch(Exception ex) { throw new GRS2RecordSerializationException("Could not serialize event", ex); } } public void inflate(DataInput in) throws GRS2RecordSerializationException { try { this.message=in.readUTF(); this.count=in.readInt(); }catch(Exception ex) { throw new GRS2RecordSerializationException("Could not deserialize event", ex); } } public void inflate(DataInput in, boolean reset) throws GRS2RecordSerializationException { this.inflate(in); } }
Main
As in Simple Main, using the new writer and reader classes
/*...*/ EventWriter writer=new EventWriter(new TCPWriterProxy()); EventReader reader=new EventReader(writer.getLocator()); /*...*/
Custom Records
All records that are handled throug gRS2 must extend a base class of gr.uoa.di.madgik.grs.record.Record. Extending this class means implementing a couple of required methods which are mainly used to properly manage the transport of the custom record and any additional information it might cary. Although the actual payload of the record is to be managed through fields, one can also assign additional information in the record it self. In the example that follows, a custom record with its respective definition is created. The record hides the fields that it manages by exposing direct getters and setters over a known definition that it provides and should be used with. Additionally, it defines additional information that describes teh record as a whole and instead of using additional fields in order to do this, it defines placeholders within the record it self.
Definition
import java.io.DataInput; import java.io.DataOutput; import gr.uoa.di.madgik.grs.buffer.IBuffer.TransportDirective; import gr.uoa.di.madgik.grs.record.GRS2RecordSerializationException; import gr.uoa.di.madgik.grs.record.RecordDefinition; import gr.uoa.di.madgik.grs.record.field.FieldDefinition; import gr.uoa.di.madgik.grs.record.field.StringFieldDefinition; public class MyRecordDefinition extends RecordDefinition { public static final String PayloadFieldName="payload"; public static final boolean DefaultCompress=false; public MyRecordDefinition() { StringFieldDefinition def=new StringFieldDefinition(MyRecordDefinition.PayloadFieldName); def.setTransportDirective(TransportDirective.Full); def.setCompress(MyRecordDefinition.DefaultCompress); this.Fields = new FieldDefinition[]{def}; } public MyRecordDefinition(boolean compress) { StringFieldDefinition def=new StringFieldDefinition(MyRecordDefinition.PayloadFieldName); def.setTransportDirective(TransportDirective.Full); def.setCompress(compress); this.Fields = new FieldDefinition[]{def}; } @Override public boolean extendEquals(Object obj) { if(!(obj instanceof MyRecordDefinition)) return false; return true; } @Override public void extendDeflate(DataOutput out) throws GRS2RecordSerializationException { //nothing to deflate } @Override public void extendInflate(DataInput in) throws GRS2RecordSerializationException { //nothing to inflate } }
Record
import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import gr.uoa.di.madgik.grs.GRS2Exception; import gr.uoa.di.madgik.grs.buffer.GRS2BufferException; import gr.uoa.di.madgik.grs.record.GRS2RecordDefinitionException; import gr.uoa.di.madgik.grs.record.GRS2RecordSerializationException; import gr.uoa.di.madgik.grs.record.Record; import gr.uoa.di.madgik.grs.record.field.Field; import gr.uoa.di.madgik.grs.record.field.StringField; public class MyRecord extends Record { private String id="none"; private String collection="none"; public MyRecord() { this.setFields(new Field[]{new StringField()}); } public MyRecord(String payload) { this.setFields(new Field[]{new StringField(payload)}); } public MyRecord(String id,String collection) { this.setId(id); this.setCollection(collection); this.setFields(new Field[]{new StringField()}); } public MyRecord(String id,String collection,String payload) { this.setId(id); this.setCollection(collection); this.setFields(new Field[]{new StringField(payload)}); } public void setId(String id) { this.id = id; } public String getId() { return id; } public void setCollection(String collection) { this.collection = collection; } public String getCollection() { return collection; } public void setPayload(String payload) throws GRS2Exception { this.getField(MyRecordDefinition.PayloadFieldName).setPayload(payload); } public String getPayload() throws GRS2Exception { return this.getField(MyRecordDefinition.PayloadFieldName).getPayload(); } @Override public StringField getField(String name) throws GRS2RecordDefinitionException, GRS2BufferException { return (StringField)super.getField(name); } @Override public void extendSend(DataOutput out) throws GRS2RecordSerializationException { this.extendDeflate(out); } @Override public void extendReceive(DataInput in) throws GRS2RecordSerializationException { this.extendInflate(in, false); } @Override public void extendDeflate(DataOutput out) throws GRS2RecordSerializationException { try { out.writeUTF(this.id); out.writeUTF(this.collection); }catch(IOException ex) { throw new GRS2RecordSerializationException("Could not deflate record", ex); } } @Override public void extendInflate(DataInput in, boolean reset) throws GRS2RecordSerializationException { try { this.id=in.readUTF(); this.collection=in.readUTF(); }catch(IOException ex) { throw new GRS2RecordSerializationException("Could not inflate record", ex); } } @Override public void extendDispose() { this.id=null; this.collection=null; } }
Multi Field Records
Writer
import java.io.File; import java.net.URI; import java.net.URL; import java.util.concurrent.TimeUnit; import gr.uoa.di.madgik.grs.buffer.IBuffer.Status; import gr.uoa.di.madgik.grs.proxy.IWriterProxy; import gr.uoa.di.madgik.grs.record.GenericRecord; import gr.uoa.di.madgik.grs.record.GenericRecordDefinition; import gr.uoa.di.madgik.grs.record.RecordDefinition; import gr.uoa.di.madgik.grs.record.field.Field; import gr.uoa.di.madgik.grs.record.field.FieldDefinition; import gr.uoa.di.madgik.grs.record.field.FileField; import gr.uoa.di.madgik.grs.record.field.FileFieldDefinition; import gr.uoa.di.madgik.grs.record.field.StringField; import gr.uoa.di.madgik.grs.record.field.StringFieldDefinition; import gr.uoa.di.madgik.grs.record.field.URLField; import gr.uoa.di.madgik.grs.record.field.URLFieldDefinition; import gr.uoa.di.madgik.grs.writer.GRS2WriterException; import gr.uoa.di.madgik.grs.writer.RecordWriter; public class MultiWriter extends Thread { private RecordWriter<GenericRecord> writer=null; public MultiWriter(IWriterProxy proxy) throws GRS2WriterException { //The gRS will contain only one type of records which in turn contains only a single field RecordDefinition[] defs=new RecordDefinition[]{ //A gRS can contain a number of different record definitions new GenericRecordDefinition((new FieldDefinition[] { //A record can contain a number of different field definitions new StringFieldDefinition("MyStringField"), //The definition of a string field new StringFieldDefinition("MyOtherStringField"), //The definition of a string field new URLFieldDefinition("MyHTTPField"), //The definition of an http field new FileFieldDefinition("MyFileField") //The definition of a file field })) }; writer=new RecordWriter<GenericRecord>( proxy, //The proxy that defines the way the writer can be accessed defs //The definitions of the records the gRS handles ); } public URI getLocator() throws GRS2WriterException { return writer.getLocator(); } public void run() { try { for(int i=0;i<500;i+=1) { //while the reader hasn't stopped reading if(writer.getStatus()!=Status.Open) break; GenericRecord rec=new GenericRecord(); //Only a string field is added to the record as per definition Field[] fs=new Field[4]; fs[0]=new StringField("Hello world "+i); fs[1]=new StringField("Hello world of Diligent"); fs[2]=new URLField(new URL("http://www.d4science.eu/files/d4science_logo.jpg")); fs[3]=new FileField(new File("/bin/ls")); rec.setFields(fs); //if the buffer is in maximum capacity for the specified interval don't wait any more if(!writer.put(rec,60,TimeUnit.SECONDS)) break; } //if the reader hasn't already disposed the buffer, close to notify reader that no more will be provided if(writer.getStatus()!=Status.Dispose) writer.close(); }catch(Exception ex) { ex.printStackTrace(); } } }
Reader
import gr.uoa.di.madgik.grs.reader.ForwardReader; import gr.uoa.di.madgik.grs.reader.GRS2ReaderException; import gr.uoa.di.madgik.grs.record.GenericRecord; import gr.uoa.di.madgik.grs.record.field.FileField; import gr.uoa.di.madgik.grs.record.field.StringField; import gr.uoa.di.madgik.grs.record.field.URLField; import java.net.URI; public class MultiReader extends Thread { ForwardReader<GenericRecord> reader=null; public MultiReader(URI locator) throws GRS2ReaderException { reader=new ForwardReader<GenericRecord>(locator); } public void run() { try { for(GenericRecord rec : reader) { //In case a timeout occurs while optimistically waiting for more records form an originally open writer if(rec==null) break; //Retrieve the required field of the type available in the gRS definitions System.out.println(((StringField)rec.getField("MyStringField")).getPayload()); System.out.println(((StringField)rec.getField("MyOtherStringField")).getPayload()); System.out.println(((URLField)rec.getField("MyHTTPField")).getPayload()); //In case of in memory communication, the target file will be the same as the original. In other case //the target file will be a local copy of the original file, and depending on the transport directive //set by the writer, it will be fully available, or it will gradually become available as data are read System.out.println(((FileField)rec.getField("MyFileField")).getPayload() +" : "+((FileField)rec.getField("MyFileField")).getOriginalPayload()); } //Close the reader to release and dispose any resources in both reader and writer sides reader.close(); }catch(Exception ex) { ex.printStackTrace(); } } }