/* ***************************************************************************
FILE   : DistributedQueue.java
SUBJECT: Lamport's distributed queues.
AUTHOR : (C) Copyright 2011 by Peter C. Chapin

Please send comments or bug reports to

     Peter C. Chapin
     Computer Information Systems
     Vermont Technical College
     Randolph Center, VT 05061
     PChapin@vtc.vsc.edu
*************************************************************************** */

import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.util.ArrayList;

/**
 * This class implements the distributed queue as required by Lamport's distributed mutual
 * exclusion algorithm. This queue contains an array with one element for each node in the
 * distributed system. The methods are synchronized so that multiple threads can access the
 * queue in a coordinated manner.
 */
public class DistributedQueue {

    // These values are used for messageType in class Message.
    public static final int NONE    = 0;
    public static final int REQUEST = 1;
    public static final int REPLY   = 2;
    public static final int RELEASE = 3;


    // This class holds information about messages that are kept in the queue.
    private static class Message {
        int messageType;
        int timeStamp;

        public Message(int type, int time)
        {
            messageType = type;
            timeStamp = time;
        }

        public int getType()
         { return messageType; }

        public int getTime()
          { return timeStamp; }
    }


    private int myNode;
    private int clock = 0;
    private ArrayList<Message> theArray;


    // Constructor sets up the initial array.
    public DistributedQueue(int nodeID)
    {
        myNode = nodeID;

        // I know there are at least n + 1 nodes in the network.
        theArray = new ArrayList<Message>(nodeID + 1);
        for (int i = 0; i < nodeID + 1; ++i) {
            theArray.add(new Message(NONE, 0));
        }
    }


    /**
     * Attempts to acquire the resource. This method returns before the resource has been
     * acquired (it might take a while to acquire the resource so asynchronous acquisition makes
     * sense).
     */
    synchronized void requestResource(DatagramSocket socket)
        throws java.io.IOException
    {
        // If I already have a request pending, do nothing.
        if (theArray.get(myNode).getType() == REQUEST) return;

        // Otherwise make the request.
        clock++;
        Message newMessage = new Message(REQUEST, clock);
        theArray.set(myNode, newMessage);

        String data = new String(Integer.toString(REQUEST) + ":" +
                                 Integer.toString(clock)   + ":" +
                                 Integer.toString(myNode));
        byte[] rawData = data.getBytes();
        DatagramPacket outgoing = new DatagramPacket(
            rawData,
            rawData.length,
            InetAddress.getByName("127.0.0.1"),
            9999);
        socket.send(outgoing);
    }


    // When the network listening thread receives a message it calls this method to update the
    // queue and perhaps send a reply. The time parameter is the timestamp on the message.
    //
    synchronized void messageReceived(
        int            type,
        int            time,
        int            sourceNode,
        DatagramSocket socket,
        InetAddress    sourceIP) throws java.io.IOException
    {
        Message newMessage = new Message(type, time);

        // Advance the clock to be one more than the max(clock, time).
        if (time > clock) clock = time + 1;
          else clock = clock + 1;

        // Expand the array if necessary.
        if (sourceNode >= theArray.size()) {
            int newCount = sourceNode - theArray.size() + 1;
            for (int i = 0; i < newCount; ++i) {
                theArray.add(new Message(NONE, 0));
            }
        }

        // Install the message in the array.
        theArray.set(sourceNode, newMessage);

        // Send a reply if necessary.
        if (type == REQUEST && theArray.get(myNode).getType() != REQUEST) {
            String data = new String(Integer.toString(REPLY) + ":" +
                                     Integer.toString(clock) + ":" +
                                     Integer.toString(myNode));
            byte[] rawData = data.getBytes();
            DatagramPacket outgoing =
                new DatagramPacket(rawData, rawData.length, sourceIP, 9999);
            socket.send(outgoing);
        }
    }


    // If there is an outstanding resource request that can now be acted on, this method will
    // return true. The caller is then free to use the resource. This method should be called
    // after each messageReceived() call that occurs while a request is pending.
    //
    synchronized boolean resourceReady()
    {
        Message myMessage;

        // First check to be sure there is a pending request from me!
        myMessage = theArray.get(myNode);
        if (myMessage.getType() != REQUEST) return false;

        // Now see if my message is the oldest in the array.
        int myTime = myMessage.getTime();
        for (int i = 0; i < theArray.size(); ++i) {
            if (i == myNode) continue;
            Message someMessage = theArray.get(i);
            if (someMessage.getTime() <  myTime) return false;
            if (someMessage.getTime() == myTime && i < myNode) return false;
        }
        return true;
    }


    // Call this method when the resource is no longer needed. It updates the queue and
    // broadcasts a release message to every node in the network. This method can also be called
    // to abort a pending request.
    //
    synchronized void releaseResource(DatagramSocket socket)
        throws java.io.IOException
    {
        // If I don't have a request pending, do nothing.
        if (theArray.get(myNode).getType() != REQUEST) return;

        // Otherwise make the release.
        clock++;
        Message newMessage = new Message(RELEASE, clock);
        theArray.set(myNode, newMessage);

        String data = new String(Integer.toString(RELEASE) + ":" +
                                 Integer.toString(clock)   + ":" +
                                 Integer.toString(myNode));
        byte[] rawData = data.getBytes();
        DatagramPacket outgoing = new DatagramPacket(
            rawData,
            rawData.length,
            InetAddress.getByName("127.0.0.1"),
            9999);
        socket.send(outgoing);
    }
}
