GRS2

From Gcube Wiki
Jump to: navigation, search

Introduction

The goal of the gRS framework is to enable point to point producer consumer communication. For this to be achieved and connect collocated or remotely located actors the framework protects the two parties from technology specific knowledge and limitations. Additionally it can provide upon request value adding functionalities to enhance the clients communication capabilities.

In the process of offering these functionalities to its clients the framework imposes minimal restrictions on the clients design. Providing an interface similar to commonly used programming structures such as a collection and a stream, client programs can easily incorporate it.

Furthermore, since one of the gRS main goals is to provide location independent services, it is modularly build to allow a number of different technologies to be used as transport and communication mediation depending on the scenario used. For communication through trusted channels a clear TCP connection can be used for fast delivery. Authentication of communication parties is build in to the framework and can be utilized on communication packet level. In situations where more secure connections are required, the full communication stream can be fully encrypted. In cases of firewalled communication, an http transport mechanism can be easily incorporated and supplied by the framework without a single change in the clients code. In case of collocated actors, in memory references can be directly passed from producer to consumer without either of them changing their behavior in any of the described cases.

In a disjoint environment where the producer and consumer are not customly made to interact only with each other or even in the case that the needs of the consumer change from invocation to invocation, a situation might appear where the producer generates more data than the consumer needs even at a per record scope. The gRS enables fine grained transport mechanisms at level not just of a record, but also at the level of record field. A record containing more than one fields may be needed entirely by its consumer or at the next interaction only a single field of the record may be needed. In such a situation a consumer can retrieve only the payload he is interested in without having to deal with any additional payload. Even at the level of the single field, the transported data can be further optimized to consume only just the needed bytes instead of having to transfer a large amount of data which will at the end be disposed without being used.

In modern systems the situation frequently appears where the producers output is already available for access through some protocol. The gRS has a very extendable design as to the way the producer will serve its payload. A number of protocols can be used, ftp, http, network storage, local filesystem, and many more can be implemented very easily and modularly.

gRS2 component is available in our Maven repositories with the following coordinates:

<groupId>org.gcube.execution</groupId>
<artifactId>grs2library</artifactId>
<version>...</version>

TCP Connection Manager

In case the gRS created is to be shared with a remote, or in general in a different address space than the one the writer is in, a TCPConnectionManager should be used. A TCPConnectionManager is static in the context of the address space it is used and is initialized though a singleton pattern. Therefore, one can safely initialize it in any context it is useful for him and do not worry if other components are also trying to initialize it. An example of initializing the TCPConnectionManager is shown in the following snippet.

List<PortRange> ports=new ArrayList<PortRange>(); //The ports that the TCPConnection manager should use
ports.add(new PortRange(3000, 3050));             //Any in the range between 3000 and 3050
ports.add(new PortRange(3055, 3055));             //Port 3055
TCPConnectionManager.Init(
  new TCPConnectionManagerConfig("localhost", //The hostname by which the machine is reachable 
    ports,                                    //The ports that can be used by the connection manager
    true                                      //If no port ranges were provided, or none of them could be used, use a random available port
));
TCPConnectionManager.RegisterEntry(new TCPConnectionHandler());      //Register the handler for the gRS2 incoming requests
TCPConnectionManager.RegisterEntry(new TCPStoreConnectionHandler()); //Register the handler for the gRS2 store incoming requests

A gCube service could use the onInit event to make this initialization. The TCPConnectionManager is located in the MadgikCommons package.

Definitions

Every gRS needs to have explicitly defined the definitions of the records it holds. The record definitions in turn contain the field definitions they contain. Every record that is then handled by the specific gRS must comply to one of the definitions initially provided to the gRS. For example, creating a gRS that handles only one type of records that in turn contains only a single type of field is presented in the following snippet

//The gRS will contain only one type of records which in turn contains only a single field 
RecordDefinition[] defs=new RecordDefinition[]{          //A gRS can contain a number of different record definitions
    new GenericRecordDefinition((new FieldDefinition[] { //A record can contain a number of different field definitions
    new StringFieldDefinition("ThisIsTheField")          //The definition of the field
  }))
};

Record Writer Initialization

To initialize a gRS2 writer one must simply create a new instance of the available writers. Depending on the configuration detail one wishes to set, a number of different constructors are available

writer=new RecordWriter<GenericRecord>(
  proxy, //The proxy that defines the way the writer can be accessed
  defs   //The definitions of the records the gRS handles
);
 
writer=new RecordWriter<GenericRecord>(
  proxy, //The proxy that defines the way the writer can be accessed
  defs,  //The definitions of the records the gRS handles
  50,    //The capacity of the underlying synchronization buffer
  2,     //The maximum number of parallel records that can be concurrently accessed on partial transfer  
  0.5f   //The maximum fraction of the buffer that should be transferred during mirroring  
);
 
writer=new RecordWriter<GenericRecord>(
  proxy,           //The proxy that defines the way the writer can be accessed
  defs,            //The definitions of the records the gRS handles
  50,              //The capacity of the underlying synchronization buffer
  2,               //The maximum number of parallel records that can be concurrently accessed on partial transfer  
  0.5f,            //The maximum fraction of the buffer that should be transferred during mirroring 
  60,              //The timeout in time units after which an inactive gRS can be disposed  
  TimeUnit.SECONDS //The time unit in timeout after which an inactive gRS can be disposed
);

For convenience, if one wishes to create a writer with the same configuration as a already available reader, one can use one of the following constructors

writer=new RecordWriter<GenericRecord>(
  proxy, //The proxy that defines the way the writer can be accessed
  reader //The reader to copy configuration from
);
 
writer=new RecordWriter<GenericRecord>(
  proxy, //The proxy that defines the way the writer can be accessed
  reader, //The reader to copy configuration from
  50,    //The capacity of the underlying synchronization buffer
  2,     //The maximum number of parallel records that can be concurrently accessed on partial transfer  
  0.5f   //The maximum fraction of the buffer that should be transferred during mirroring  
);
 
writer=new RecordWriter<GenericRecord>(
  proxy, //The proxy that defines the way the writer can be accessed
  reader, //The reader to copy configuration from
  50,    //The capacity of the underlying synchronization buffer
  2,     //The maximum number of parallel records that can be concurrently accessed on partial transfer  
  0.5f,   //The maximum fraction of the buffer that should be transferred during mirroring  
  60,              //The timeout in time units after which an inactive gRS can be disposed  
  TimeUnit.SECONDS //The time unit in timeout after which an inactive gRS can be disposed
);

In the first of the above examples, all configuration is copied from the reader. The extra configuration parameters override the ones of the reader, meaning that in the last example only the record definitions are duplicated from the reader configuration.

Custom Records

All records that are handled through gRS2 must extend a base class of gr.uoa.di.madgik.grs.record.Record. Extending this class means implementing a couple of required methods which are mainly used to properly manage the transport of the custom record and any additional information it might cary. Although the actual payload of the record is to be managed through fields, one can also assign additional information in the record itself. In the example that follows, a custom record with its respective definition is created. The record hides the fields that it manages by exposing direct getters and setters over a known definition that it provides and should be used with. Additionally, it defines additional information that describes the record as a whole and instead of using additional fields in order to do this, it defines placeholders within the record itself.

Definition

import java.io.DataInput;
import java.io.DataOutput;
import gr.uoa.di.madgik.grs.buffer.IBuffer.TransportDirective;
import gr.uoa.di.madgik.grs.record.GRS2RecordSerializationException;
import gr.uoa.di.madgik.grs.record.RecordDefinition;
import gr.uoa.di.madgik.grs.record.field.FieldDefinition;
import gr.uoa.di.madgik.grs.record.field.StringFieldDefinition;
 
public class MyRecordDefinition extends RecordDefinition
{
  public static final String PayloadFieldName="payload";
  public static final boolean DefaultCompress=false;
 
  public MyRecordDefinition()
  {
    StringFieldDefinition def=new StringFieldDefinition(MyRecordDefinition.PayloadFieldName);
    def.setTransportDirective(TransportDirective.Full);
    def.setCompress(MyRecordDefinition.DefaultCompress);
    this.Fields = new FieldDefinition[]{def};
  }
 
  public MyRecordDefinition(boolean compress)
  {
    StringFieldDefinition def=new StringFieldDefinition(MyRecordDefinition.PayloadFieldName);
    def.setTransportDirective(TransportDirective.Full);
    def.setCompress(compress);
    this.Fields = new FieldDefinition[]{def};
  }
 
  @Override
  public boolean extendEquals(Object obj)
  {
    if(!(obj instanceof MyRecordDefinition)) return false;
    return true;
  }
 
  @Override
  public void extendDeflate(DataOutput out) throws GRS2RecordSerializationException
  {
    //nothing to deflate
  }
 
  @Override
  public void extendInflate(DataInput in) throws GRS2RecordSerializationException
  {
    //nothing to inflate
  }
}

Record

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import gr.uoa.di.madgik.grs.GRS2Exception;
import gr.uoa.di.madgik.grs.buffer.GRS2BufferException;
import gr.uoa.di.madgik.grs.record.GRS2RecordDefinitionException;
import gr.uoa.di.madgik.grs.record.GRS2RecordSerializationException;
import gr.uoa.di.madgik.grs.record.Record;
import gr.uoa.di.madgik.grs.record.field.Field;
import gr.uoa.di.madgik.grs.record.field.StringField;
 
public class MyRecord extends Record
{
  private String id="none";
  private String collection="none";
 
  public MyRecord()
  {
    this.setFields(new Field[]{new StringField()});
  }
 
  public MyRecord(String payload)
  {
    this.setFields(new Field[]{new StringField(payload)});
  }
 
  public MyRecord(String id,String collection)
  {
    this.setId(id);
    this.setCollection(collection);
    this.setFields(new Field[]{new StringField()});
  }
 
  public MyRecord(String id,String collection,String payload)
  {
    this.setId(id);
    this.setCollection(collection);
    this.setFields(new Field[]{new StringField(payload)});
  }
 
  public void setId(String id) { this.id = id; }
 
  public String getId() { return id; }
 
  public void setCollection(String collection) { this.collection = collection; }
 
  public String getCollection() { return collection; }
 
  public void setPayload(String payload) throws GRS2Exception
  {
    this.getField(MyRecordDefinition.PayloadFieldName).setPayload(payload);
  }
 
  public String getPayload() throws GRS2Exception
  {
    return this.getField(MyRecordDefinition.PayloadFieldName).getPayload();
  }
 
  @Override
  public StringField getField(String name) throws GRS2RecordDefinitionException, GRS2BufferException
  {
    return (StringField)super.getField(name);
  }
 
  @Override
  public void extendSend(DataOutput out) throws GRS2RecordSerializationException
  {
    this.extendDeflate(out);
  }
 
  @Override
  public void extendReceive(DataInput in) throws GRS2RecordSerializationException
  {
    this.extendInflate(in, false);
  }
 
  @Override
  public void extendDeflate(DataOutput out) throws GRS2RecordSerializationException
  {
    try
    {
      out.writeUTF(this.id);
      out.writeUTF(this.collection);
    }catch(IOException ex)
    {
      throw new GRS2RecordSerializationException("Could not deflate record", ex);
    }
  }
 
  @Override
  public void extendInflate(DataInput in, boolean reset) throws GRS2RecordSerializationException
  {
    try
    {
      this.id=in.readUTF();
      this.collection=in.readUTF();
    }catch(IOException ex)
    {
      throw new GRS2RecordSerializationException("Could not inflate record", ex);
    }
  }
 
  @Override
  public void extendDispose()
  {
    this.id=null;
    this.collection=null;
  }
}

Simple Example

In the following snippets, a simple writer and respective reader is presented to show the full usage of the presented elements

Writer

import java.net.URI;
import java.util.concurrent.TimeUnit;
import gr.uoa.di.madgik.grs.buffer.IBuffer.Status;
import gr.uoa.di.madgik.grs.proxy.IWriterProxy;
import gr.uoa.di.madgik.grs.record.GenericRecord;
import gr.uoa.di.madgik.grs.record.GenericRecordDefinition;
import gr.uoa.di.madgik.grs.record.RecordDefinition;
import gr.uoa.di.madgik.grs.record.field.Field;
import gr.uoa.di.madgik.grs.record.field.FieldDefinition;
import gr.uoa.di.madgik.grs.record.field.StringField;
import gr.uoa.di.madgik.grs.record.field.StringFieldDefinition;
import gr.uoa.di.madgik.grs.writer.GRS2WriterException;
import gr.uoa.di.madgik.grs.writer.RecordWriter;
 
public class StringWriter extends Thread
{
  private RecordWriter<GenericRecord> writer=null;
 
  public StringWriter(IWriterProxy proxy) throws GRS2WriterException
  {
    //The gRS will contain only one type of records which in turn contains only a single field 
    RecordDefinition[] defs=new RecordDefinition[]{          //A gRS can contain a number of different record definitions
        new GenericRecordDefinition((new FieldDefinition[] { //A record can contain a number of different field definitions
        new StringFieldDefinition("ThisIsTheField")          //The definition of the field
      }))
    };
    writer=new RecordWriter<GenericRecord>(
        proxy, //The proxy that defines the way the writer can be accessed
        defs   //The definitions of the records the gRS handles
      );
    }
 
    public URI getLocator() throws GRS2WriterException
    {
      return writer.getLocator();
    }
 
    public void run()
    {
      try
      {
        for(int i=0;i<500;i+=1)
        {
          //while the reader hasn't stopped reading
          if(writer.getStatus()!=Status.Open) break;
          GenericRecord rec=new GenericRecord();
          //Only a string field is added to the record as per definition
          rec.setFields(new Field[]{new StringField("Hello world "+i)});
          //if the buffer is in maximum capacity for the specified interval don;t wait any more
          if(!writer.put(rec,60,TimeUnit.SECONDS)) break;
        }
        //if the reader hasn't already disposed the buffer, close to notify reader that no more will be provided 
        if(writer.getStatus()!=Status.Dispose) writer.close();
      }catch(Exception ex)
      {
        ex.printStackTrace();
      }
    }
}

Reader

import gr.uoa.di.madgik.grs.reader.ForwardReader;
import gr.uoa.di.madgik.grs.reader.GRS2ReaderException;
import gr.uoa.di.madgik.grs.record.GenericRecord;
import gr.uoa.di.madgik.grs.record.field.StringField;
import java.net.URI;
 
public class StringReader extends Thread
{
  ForwardReader<GenericRecord> reader=null;
 
  public StringReader(URI locator) throws GRS2ReaderException
  {
    reader=new ForwardReader<GenericRecord>(locator);
  }
 
  public void run()
  {
    try
    {
      for(GenericRecord rec : reader)
      {
        //In case a timeout occurs while optimistically waiting for more records form an originally open writer
        if(rec==null) break;
        //Retrieve the required field of the type available in the gRS definitions
        System.out.println(((StringField)rec.getField("ThisIsTheField")).getPayload());
      }
      //Close the reader to release and dispose any resources in boith reader and writer sides
      reader.close();
    }catch(Exception ex)
    {
      ex.printStackTrace();
    }
  }
}

Main

import java.util.ArrayList;
import java.util.List;
import gr.uoa.di.madgik.commons.server.PortRange;
import gr.uoa.di.madgik.commons.server.TCPConnectionManager;
import gr.uoa.di.madgik.commons.server.TCPConnectionManagerConfig;
import gr.uoa.di.madgik.grs.proxy.tcp.TCPConnectionHandler;
import gr.uoa.di.madgik.grs.proxy.tcp.TCPStoreConnectionHandler;
import gr.uoa.di.madgik.grs.proxy.tcp.TCPWriterProxy;
 
public class MainTest
{
  public static void main(String []args) throws Exception
  {
    List<PortRange> ports=new ArrayList<PortRange>(); //The ports that the TCPConnection manager should use
    ports.add(new PortRange(3000, 3050));             //Any in the range between 3000 and 3050
    ports.add(new PortRange(3055, 3055));             //Port 3055
    TCPConnectionManager.Init(
      new TCPConnectionManagerConfig("localhost", //The hostname by which the machine is reachable 
        ports,                                    //The ports that can be used by the connection manager
        true                                      //If no port ranges were provided, or none of them could be used, use a random available port
    ));
    TCPConnectionManager.RegisterEntry(new TCPConnectionHandler());      //Register the handler for the gRS2 incoming requests
    TCPConnectionManager.RegisterEntry(new TCPStoreConnectionHandler()); //Register the handler for the gRS2 store incoming requests
 
    // gr.uoa.di.madgik.grs.proxy.local.LocalWriterProxy Could be used instead for local only, all in memory, access
    StringWriter writer=new StringWriter(new TCPWriterProxy());
    StringReader reader=new StringReader(writer.getLocator());
 
    writer.start();
    reader.start();
 
    writer.join();
    reader.join();
  }
}

Events

As was already mentioned, the gRS2 serves as a bidirectional communication channel through events that can be propagated from either the writer to the reader or form the reader to the writer along the normal data flow. These events can serve any purpose fitting to the application logic, from control signals to full data transport purposes, although in the latter case, the payload size should be kept moderate as it is always handled in memory and cannot use services such as partial transfer, or be declared using internally managed files.

An example that has been taken to the extremes follows. In this example, a writer and a reader use only events to propagate their data. One should notice that the communication protocol is now up to the reader and writer clients to be implemented. In the example case this has been handled in a quick and dirty way using sleep timeouts. Additionally, it should be noted that other than the basic key - value internally provided event placeholder, an extendable event "placeholder" has been provided to facilitate any type of custom object transport.

Writer

import java.net.URI;
import gr.uoa.di.madgik.grs.buffer.IBuffer.Status;
import gr.uoa.di.madgik.grs.events.BufferEvent;
import gr.uoa.di.madgik.grs.events.KeyValueEvent;
import gr.uoa.di.madgik.grs.events.ObjectEvent;
import gr.uoa.di.madgik.grs.proxy.IWriterProxy;
import gr.uoa.di.madgik.grs.record.GenericRecord;
import gr.uoa.di.madgik.grs.record.GenericRecordDefinition;
import gr.uoa.di.madgik.grs.record.RecordDefinition;
import gr.uoa.di.madgik.grs.record.field.FieldDefinition;
import gr.uoa.di.madgik.grs.record.field.StringFieldDefinition;
import gr.uoa.di.madgik.grs.writer.GRS2WriterException;
import gr.uoa.di.madgik.grs.writer.RecordWriter;
 
public class EventWriter extends Thread
{
  private RecordWriter<GenericRecord> writer=null;
 
  public EventWriter(IWriterProxy proxy) throws GRS2WriterException
  {
    //The gRS will contain only one type of records which in turn contains only a single field 
    RecordDefinition[] defs=new RecordDefinition[]{ //A gRS can contain a number of different record definitions
        new GenericRecordDefinition((new FieldDefinition[] { //A record can contain a number of different field definitions
        new StringFieldDefinition("ThisIsTheField") //The definition of the field
      }))
    };
    writer=new RecordWriter<GenericRecord>(
      proxy, //The proxy that defines the way the writer can be accessed
      defs //The definitions of the records the gRS handles
    );
  }
 
  public URI getLocator() throws GRS2WriterException
  {
    return writer.getLocator();
  }
 
  public void receiveWaitKeyValue() throws GRS2WriterException
  {
    while(true)
    {
      BufferEvent ev=writer.receive();
      if(ev==null) try{ Thread.sleep(100);}catch(Exception ex){}
      else 
      {
        System.out.println("Received event from reader : "+((KeyValueEvent)ev).getKey()+" - "+((KeyValueEvent)ev).getValue());
        break;
      }
    }
  }
 
  public void receiveWaitObject() throws GRS2WriterException
  {
    while(true)
    {
      BufferEvent ev=writer.receive();
      if(ev==null) try{ Thread.sleep(100);}catch(Exception ex){}
      else 
      {
        System.out.println("Reveived event from reader : "+((ObjectEvent)ev).getItem().toString());
        break;
      }
    }
  }
 
  public void run()
  {
    try
    {
      writer.emit(new KeyValueEvent("init", "Hello"));
      for(int i=0;i<5;i+=1) this.receiveWaitKeyValue();
      this.receiveWaitObject();
      if(writer.getStatus()!=Status.Dispose) writer.close();
    }catch(Exception ex)
    {
      ex.printStackTrace();
    }
  }
}

Reader

import gr.uoa.di.madgik.grs.events.BufferEvent;
import gr.uoa.di.madgik.grs.events.KeyValueEvent;
import gr.uoa.di.madgik.grs.events.ObjectEvent;
import gr.uoa.di.madgik.grs.reader.ForwardReader;
import gr.uoa.di.madgik.grs.reader.GRS2ReaderException;
import gr.uoa.di.madgik.grs.record.GenericRecord;
import java.net.URI;
 
public class EventReader extends Thread
{
  ForwardReader<GenericRecord> reader=null;
 
  public EventReader(URI locator) throws GRS2ReaderException
  {
    reader=new ForwardReader<GenericRecord>(locator);
  }
 
  public void run()
  {
    try
    {
      BufferEvent ev=reader.receive();
      System.out.println("Reveived event from writer : "+((KeyValueEvent)ev).getKey()+" - "+((KeyValueEvent)ev).getValue());
      reader.emit(new KeyValueEvent("info", "Hello..."));
      reader.emit(new KeyValueEvent("info", "Hello Again..."));
      reader.emit(new KeyValueEvent("info", "Hello And Again..."));
      reader.emit(new KeyValueEvent("info", "And again..."));
      reader.emit(new KeyValueEvent("info", "Bored already ?"));
      reader.emit(new ObjectEvent(new MyEvent("This is a custom object event", 234)));
      reader.close();
    }catch(Exception ex)
    {
      ex.printStackTrace();
    }
  }
}

MyEvent

import java.io.DataInput;
import java.io.DataOutput;
import gr.uoa.di.madgik.grs.record.GRS2RecordSerializationException;
import gr.uoa.di.madgik.grs.record.IPumpable;
 
public class MyEvent implements IPumpable
{
  private String message=null;
  private int count=0;
 
  public MyEvent(String message,int count)
  {
    this.message=message;
    this.count=count;
  }
 
  public String toString()
  {
    return "event payload message is : '"+this.message+"' with count '"+this.count+"'";
  }
 
  public void deflate(DataOutput out) throws GRS2RecordSerializationException
  {
    try
    {
      out.writeUTF(message);
      out.writeInt(count);
    }catch(Exception ex)
    {
      throw new GRS2RecordSerializationException("Could not serialize event", ex);
    }
  }
 
  public void inflate(DataInput in) throws GRS2RecordSerializationException
  {
    try
    {
      this.message=in.readUTF();
      this.count=in.readInt();
    }catch(Exception ex)
    {
      throw new GRS2RecordSerializationException("Could not deserialize event", ex);
    }
  }
 
  public void inflate(DataInput in, boolean reset) throws GRS2RecordSerializationException
  {
    this.inflate(in);
  }
}

Main

As in Simple Main, using the new writer and reader classes

  /*...*/
  EventWriter writer=new EventWriter(new TCPWriterProxy());
  EventReader reader=new EventReader(writer.getLocator());
  /*...*/

Multi Field Records

Writer

import java.io.File;
import java.net.URI;
import java.net.URL;
import java.util.concurrent.TimeUnit;
import gr.uoa.di.madgik.grs.buffer.IBuffer.Status;
import gr.uoa.di.madgik.grs.proxy.IWriterProxy;
import gr.uoa.di.madgik.grs.record.GenericRecord;
import gr.uoa.di.madgik.grs.record.GenericRecordDefinition;
import gr.uoa.di.madgik.grs.record.RecordDefinition;
import gr.uoa.di.madgik.grs.record.field.Field;
import gr.uoa.di.madgik.grs.record.field.FieldDefinition;
import gr.uoa.di.madgik.grs.record.field.FileField;
import gr.uoa.di.madgik.grs.record.field.FileFieldDefinition;
import gr.uoa.di.madgik.grs.record.field.StringField;
import gr.uoa.di.madgik.grs.record.field.StringFieldDefinition;
import gr.uoa.di.madgik.grs.record.field.URLField;
import gr.uoa.di.madgik.grs.record.field.URLFieldDefinition;
import gr.uoa.di.madgik.grs.writer.GRS2WriterException;
import gr.uoa.di.madgik.grs.writer.RecordWriter;
 
public class MultiWriter extends Thread
{
  private RecordWriter<GenericRecord> writer=null;
 
  public MultiWriter(IWriterProxy proxy) throws GRS2WriterException
  {
    //The gRS will contain only one type of records which in turn contains only a single field 
    RecordDefinition[] defs=new RecordDefinition[]{ //A gRS can contain a number of different record definitions
      new GenericRecordDefinition((new FieldDefinition[] { //A record can contain a number of different field definitions
        new StringFieldDefinition("MyStringField"), //The definition of a string field
        new StringFieldDefinition("MyOtherStringField"), //The definition of a string field
        new URLFieldDefinition("MyHTTPField"), //The definition of an http field
        new FileFieldDefinition("MyFileField") //The definition of a file field
      }))
    };
    writer=new RecordWriter<GenericRecord>(
      proxy, //The proxy that defines the way the writer can be accessed
      defs //The definitions of the records the gRS handles
    );
  }
 
  public URI getLocator() throws GRS2WriterException
  {
    return writer.getLocator();
  }
 
  public void run()
  {
    try
    {
      for(int i=0;i<500;i+=1)
      {
        //while the reader hasn't stopped reading
        if(writer.getStatus()!=Status.Open) break;
        GenericRecord rec=new GenericRecord();
        //Only a string field is added to the record as per definition
        Field[] fs=new Field[4];
        fs[0]=new StringField("Hello world "+i);
        fs[1]=new StringField("Hello world of Diligent");
        fs[2]=new URLField(new URL("http://www.d4science.eu/files/d4science_logo.jpg"));
        fs[3]=new FileField(new File("/bin/ls"));
        rec.setFields(fs);
        //if the buffer is in maximum capacity for the specified interval don't wait any more
        if(!writer.put(rec,60,TimeUnit.SECONDS)) break;
      }
      //if the reader hasn't already disposed the buffer, close to notify reader that no more will be provided 
      if(writer.getStatus()!=Status.Dispose) writer.close();
    }catch(Exception ex)
    {
      ex.printStackTrace();
    }
  }
}

Reader

import gr.uoa.di.madgik.grs.reader.ForwardReader;
import gr.uoa.di.madgik.grs.reader.GRS2ReaderException;
import gr.uoa.di.madgik.grs.record.GenericRecord;
import gr.uoa.di.madgik.grs.record.field.FileField;
import gr.uoa.di.madgik.grs.record.field.StringField;
import gr.uoa.di.madgik.grs.record.field.URLField;
import java.net.URI;
 
public class MultiReader extends Thread
{
  ForwardReader<GenericRecord> reader=null;
 
  public MultiReader(URI locator) throws GRS2ReaderException
  {
    reader=new ForwardReader<GenericRecord>(locator);
  }
 
  public void run()
  {
    try
    {
      for(GenericRecord rec : reader)
      {
        //In case a timeout occurs while optimistically waiting for more records form an originally open writer
        if(rec==null) break;
        //Retrieve the required field of the type available in the gRS definitions
        System.out.println(((StringField)rec.getField("MyStringField")).getPayload());
        System.out.println(((StringField)rec.getField("MyOtherStringField")).getPayload());
        System.out.println(((URLField)rec.getField("MyHTTPField")).getPayload());
        //In case of in memory communication, the target file will be the same as the original. In other case
        //the target file will be a local copy of the original file, and depending on the transport directive
        //set by the writer, it will be fully available, or it will gradually become available as data are read
        System.out.println(((FileField)rec.getField("MyFileField")).getPayload() +" : "+((FileField)rec.getField("MyFileField")).getOriginalPayload());
      }
      //Close the reader to release and dispose any resources in both reader and writer sides
      reader.close();
    }catch(Exception ex)
    {
      ex.printStackTrace();
    }
  }
}

Main

As in Simple Main, using the new writer and reader classes

  /*...*/
  MultiWriter writer=new MultiWriter(new LocalWriterProxy());
  MultiReader reader=new MultiReader(writer.getLocator());
  /*...*/

Retrieving Field Payload

As mentioned, depending on the transport directive set by the writer and the type of proxy used for the locator creation, the payload of some fields may not be fully available to the reader. However, the reader's clients should not have to deal with the details of this and should be able to uniformly access the payload of any field. Additionally, each field, depending on its characteristics, may support different transport directives, and may carry internally its payload, or it may only keep a reference to it. Therefore, only the field itself can be aware of how to access its own payload.

For example, the URLField, that is set to point to a remotely accessible http location, the field opens an HttpConnection and passes the stream to the client so that from then on he does not need to know the payloads actual location. For a FileField that is accessed remotely from the original file location, the data that are requested from the client are fetched remotely if not already available and are provided to the client through an intermediate local file. All these actions are performed in the background without the client having to perform any action.

The way this is achieved is by the MediationInputStream, an input stream implementation that wraps around the Field provided, protocol specific, input stream and from then on handles any remote transport that needs to be made. The field payload mediating input stream is provided by each field. Using the Mediating stream, data are transferred upon request using the field definition's defined chunk size to make the transport buffered. This does of course mean that depending on the size of the payload, a number of round trips need to be made. This will allow for less data been transferred in case only a part of the payload is needed, and also processing can be made on the initial parts while more data is retrieved. On the other hand, if the full payload is needed, the on demand transfer will impose additional overhead. In this cases, a specific field or in fact all the fields of a record can be requested to be made available if supported, in one blocking operation.

In the Multi Field example, if someone wants to read the HttpField and the FileField, the following snippet in the reader is what is needed

BufferedInputStream bin=new BufferedInputStream(rec.getField("MyHTTPField").getMediatingInputStream());
byte[] buffer = new byte[1024];
int bytesRead = 0;
while ((bytesRead = bin.read(buffer)) != -1) { /*...*/ }
 
/*...*/
 
BufferedInputStream bin=new BufferedInputStream(rec.getField("MyFileField").getMediatingInputStream());
byte[] buffer = new byte[1024];
int bytesRead = 0;
while ((bytesRead = bin.read(buffer)) != -1) { /*...*/ }

If the use case is that the a specific field is needed to be read entirely, as previously mentioned, one can request for the entire payload to be retrieved beforehand and not on request. For this, the following line needs to be added before the client starts reading the field payload as shown above

rec.getField("MyFileField").makeAvailable();

Random Access Reader

The example presented this far were targeting a forward only reader. There is also the alternative provided for a random access reader. This reader allows for random seeks forward and backward in the reader side. Since as was already mentioned, a record after it has been accessed by the reader is disposed by the writer, in order to achive going back to a previously accessed record, which is already disposed by the writer, all past records must be locally maintained in the reader side. To avoid bloating the in memory structures of the reader, already read records are persisted. When the client then wants to move back to a record he has already accessed, the desired record as well as a configurable record window ahead of it is restored from the persistency store. The window is always in front of the requested record assuming that forward movement is the predominent one. Note that in case of completely random seeks, a good strategy to achieve maximum performance is to disable the window by setting its size equal to 1. The persistency store that the past records are kept is internally configured. The only currently available one is a disk backed Random Access File. This of course inforces an overhead that for large data is not negligable. Short term plans include the usage of a different persistency backend to make the operation more performant.

One important detail in this, is that every record that is persisted in the reader side must be fully available once restored. That means that all payload that was previously served by the writer side must now be served directly from reader side. This means that whenever a record is persisted, the makeAvailable method is invoked on the record ensuring that any data that would be lost when the record would be disposed in the writer side, will now be directly accessible from the reader side.

In the following example, we assume a gRS created as in the case of the Simple Writer previously described. We present two ways to access the gRS in a random fashion, one using a list iterator, and one that provides random seeks.

Bidirectional Iterator

import gr.uoa.di.madgik.grs.reader.GRS2ReaderException;
import gr.uoa.di.madgik.grs.reader.RandomReader;
import gr.uoa.di.madgik.grs.record.GenericRecord;
import gr.uoa.di.madgik.grs.record.field.StringField;
import java.net.URI;
import java.util.ListIterator;
 
public class RandomIteratorReader extends Thread
{
  RandomReader<GenericRecord> reader=null;
 
  public RandomIteratorReader(URI locator) throws GRS2ReaderException
  {
    reader=new RandomReader<GenericRecord>(locator);
  }
 
  public void run()
  {
    try
    {
      ListIterator<GenericRecord> iter= reader.listIterator();
      while(iter.hasNext())
      {
        GenericRecord rec=iter.next();
        if(rec==null) break;
        System.out.println(((StringField)rec.getField("ThisIsTheField")).getPayload());
      }
      while(iter.hasPrevious())
      {
        GenericRecord rec=iter.previous();
        if(rec==null) break;
        System.out.println(((StringField)rec.getField("ThisIsTheField")).getPayload());
      }
      while(iter.hasNext())
      {
        GenericRecord rec=iter.next();
        if(rec==null) break;
        System.out.println(((StringField)rec.getField("ThisIsTheField")).getPayload());
      }
      reader.close();
    }catch(Exception ex)
    {
      ex.printStackTrace();
    }
  }
}

Random Access

import gr.uoa.di.madgik.grs.buffer.GRS2BufferException;
import gr.uoa.di.madgik.grs.buffer.IBuffer.Status;
import gr.uoa.di.madgik.grs.reader.GRS2ReaderException;
import gr.uoa.di.madgik.grs.reader.RandomReader;
import gr.uoa.di.madgik.grs.record.GRS2RecordDefinitionException;
import gr.uoa.di.madgik.grs.record.GenericRecord;
import gr.uoa.di.madgik.grs.record.field.StringField;
import java.net.URI;
import java.util.concurrent.TimeUnit;
 
public class RandomAccessReader extends Thread
{
  RandomReader<GenericRecord> reader=null;
 
  public RandomAccessReader(URI locator) throws GRS2ReaderException
  {
    reader=new RandomReader<GenericRecord>(locator);
  }
 
  private void read(int count) throws GRS2ReaderException, GRS2RecordDefinitionException, GRS2BufferException
  {
    for(int i=0;i<count;i+=1)
    {
      if(reader.getStatus()==Status.Dispose || (reader.getStatus()==Status.Close && reader.availableRecords()==0)) break;
      GenericRecord rec=reader.get(60, TimeUnit.SECONDS);
      if(rec==null) break;
      System.out.println(((StringField)rec.getField("ThisIsTheField")).getPayload());
    }
  }
 
  public void run()
  {
    try
    {
      this.read(20);
      reader.seek(-10);
      this.read(9);
      reader.seek(-5);
      this.read(3);
      reader.seek(10);
      this.read(5);
      reader.close();
    }catch(Exception ex)
    {
      ex.printStackTrace();
    }
  }
}

Main

As in Simple Main, using the appropriate respective new reader classe

  StringWriter writer=new StringWriter(new LocalWriterProxy());
  RandomIteratorReader reader1=new RandomIteratorReader(writer.getLocator());
  RandomAccessReader reader2=new RandomAccessReader(writer.getLocator());

Store

As already mentioned, the gRS2 is point to point only and once something has been read it is disposed. Furthermore, once a reader has been initialized, any additional request from a different reader seeking to utilize the same locator to access the previously accessed gRS2, will fail. However, there are cases where one might want to have a created gRS2 buffer that is reutilized by multiple readers. This is a case different to the one described in Random Access Reader, as in the latter, the gRS2 is read once from its source and it is persisted in local storage from then on. Also, the gRS2 is accessible by the same reader multiple times. In contrast, in the former case, every accessor of the stored gRS2, is a new reader, who can in turn be read by a random access one or only a forward one.

Creating a gRS2 store, a client can control its behaviour with some parameterization

  • The Store input is provided and it is not necessarily limited to a single incoming gRS2. Any number of incoming gRS2 can be provided and as a result the content of all of them will be multiplexed in a single new gRS2
  • The type of multiplexing that will take place between the number of available inputs, is a matter of parametrization and currently only two ways are available, a FIFO and a First Available policy
  • The amount of time that the created gRS2 will be maintained while inactive and not accessed by any reader is also provided at initialization time

The gRS2 Store is meant to be used as a checkpoint utility and is to be used only at appropriate use cases because of the performance penalties and resource consumptions it imposes. These can be identified in the following two:

  • A persistency store is utilized by the gRS2 Store in order to have all content of the incoming locators locally available and be able to serve them without relying on the possibly remote or even local input buffers which will be disposed and become unavailable. For this reason everything is brought under the management of the gRS2 Store management and persisted. The only store backend currently available is one based on a Random Access File which imposes non neglectable performance penalties. A new more efficient backend is planed for the short term evolution of the gRS
  • Although the gRS2 aims to minimize unnecessary record creation and movement, at most equal to the buffer capacity, the gRS2 Store operates on a different hypothesis. As already mentioned, its primary use is as part of a checkpointing operation. To this end, the Store once initialized will greedily read its inputs until the end no matter if some reader is accessing the stored buffer

As already mentioned, when using the gRS2 Store, there is no need to modify anything in either the writer or the reader. the Store is a utility and thus it can be used externally and without affecting the implicated components. The following example demonstrates the needed addition to persist the output of a number of writers and have a number of readers accessing the multiplexed output.

  StringWriter writer1=new StringWriter(new TCPWriterProxy());
  StringWriter writer2=new StringWriter(new LocalWriterProxy());
  StringWriter writer3=new StringWriter(new TCPWriterProxy());
  StringWriter writer4=new StringWriter(new LocalWriterProxy());
 
  URI []locators=new URI[]{writer1.getLocator(),writer2.getLocator(),writer3.getLocator(),writer4.getLocator()};
  //The respective TCPStoreWriterProxy.store operation could be used to create a locator with a different usage scope
  URI locator=LocalStoreWriterProxy.store(locators, IBufferStore.MultiplexType.FIFO, 60*60, TimeUnit.SECONDS);
 
  StringReader reader1=new StringReader(locator);
  RandomAccessReader reader2=new RandomAccessReader(locator);
  StringReader reader3=new StringReader(locator);
  RandomIteratorReader reader4=new RandomIteratorReader(locator);
  StringReader reader5=new StringReader(locator);
 
  //Start everything
  writer1.start();
  /*...*/
  reader5.start();
 
  //Join everyhting
  writer1.join();
  /*...*/
  reader5.join();

Since it is evident that through the gRS2 Store the writer and reader are disconnected, any events that are emitted by the writer, will reach the reader either the time it is emitted, if the reading is performed wile the store is actively persisting its inputs, or at initialization time of the reader where all the events that have been emitted up until that point are emitted all at once. Events that are emitted by the reader will never reach the writers whose output is persisted.

Utilities and Convenience tools

Record Import

It is quite common that a consumer might wish to forward a record to a producer using the same record definition, either completely unaltered or with minor modifications. To avoid explicitly copying the record, thus wasting valuable computational resources, one has the option of importing records directly to a producer. This can be done with the importRecord methods, which have the same signatures as the put methods used to add entirely new records to a writer.

The following snippet demonstrates how one can forward all records read from a reader to a writer without making any modification

for(GenericRecord rec: reader) {
   if(writer.getStatus() == Status.Close || writer.getStatus() == Status.Dispose)
      break;
   if(!writer.importRecord(rec, 60, TimeUnit.SECONDS))
	break;
}

The following snippet demonstrates how one can modify a known record field, leaving all other fields unchanged. In fact, one can be unaware of the existence of fields other than those which one is interested.

for(GenericRecord rec: reader) {
   ((StringField)rec.getField("payloadField")).setPayload("Modified payload");
   if(writer.getStatus() == Status.Close || writer.getStatus() == Status.Dispose)
      break;
   if(!writer.importRecord(rec, 60, TimeUnit.SECONDS))
	break;
}

Keep-Alive Functionality

Both forward and random access readers can have keep-alive functionality added to them using a typical decorator pattern. This functionality can be useful if a consumer wishes to delay the consumption of records for long periods of time and the producer has no indication of this behavior. In simple cases when the implementation of a transport protocol using events is not desirable, a keep-alive reader can be used in order to keep the communication channel open, by periodically reading records. Keep-alive functionality is transparent to the client, as each time a get method is called, the record either originates from the set of prefetched records, or it is actually read directly from the underlying reader. The order of records is guaranteed to be the same as if the underlying reader was used on its own. The following snippet demonstrates the creation of a keep-alive reader with a period of record retrieval of 30 seconds:

IRecordReader<Record> reader = new ForwardReader<Record>(this.locator);
reader = new KeepAliveReader<Record>(reader, 30, TimeUnit.SECONDS);

Locator Utilities

The Locators class provides locator-related convenience methods. Currently, the class provides a Local-to-TCP converter utility which can be useful if it cannot be known in advance whether a produced resultset should be available only locally or be exposed remotely through TCP. The result producer could then produce its results locally and the results can be exposed through TCP by converting the locator to a TCP locator if it is deemed necessary. The exact way of converting a local locator to TCP is shown in the following snippet:

URI localLocator = ... //An operation returning a local locator
URI TCPLocator = Locators.localToTCP(localLocator);

HTTP Protocol

gRS can also work on top of HTTP instead of TCP. The transferred messages are in XML format. There are corresponding HTTP classes for ConnectionManger, Proxies, etc that can be used instead of the TCP classes to enable this functionality.

Note that a both ends need to use the same protocol in order to be able to communicate.