/**
 *
 * EXTERNAL STORAGE DISK BASED BUCKETED HASH TABLE...
 *
 * This uses Object I/O to store and fetch objects from 
 * a file.
 *
 * @author Matthew W. Coan
 * @version Tue Sep  2 11:26:37 EDT 2014
 */
import java.io.*;
import java.util.*;

/**
 * Hash table with separate chaining whose buckets and records live in a
 * single file accessed through {@link RandomAccessFile}. Keys and values
 * are stored with Java object serialization.
 *
 * <p>File layout (all longs are 8 bytes, big-endian as written by
 * {@code RandomAccessFile}):
 * <pre>
 *   offset 0   : number of buckets (tbl_size)
 *   offset 8   : number of stored entries (size)
 *   offset 16  : head of the free list, or NULL
 *   offset 24  : tbl_size bucket slots, each the address of the first
 *                record of the chain, or NULL
 *   record     : next (8) | key length (4) | key bytes |
 *                data length (4) | data bytes
 * </pre>
 *
 * <p>All operations move the one shared file pointer and nothing here is
 * synchronized, so an instance must not be used from several threads at
 * once without external locking.
 *
 * <p>NOTE: records are read back with {@link ObjectInputStream}; only open
 * files that come from a trusted source.
 */
public class DiskHashtable< Key, Data > implements Closeable {
   /** Marker stored in the file for "no address". */
   public static final long NULL = -1L;

   /** Offset of the entry counter in the header. */
   private static final long SIZE_OFFSET = 8L;
   /** Offset of the free-list head in the header. */
   private static final long FREE_LIST_OFFSET = 16L;
   /** Offset of the first bucket slot. */
   private static final long TABLE_OFFSET = 24L;
   /** Offset of the "free" flag inside a free-list node (next, addr, size, flag). */
   private static final long NODE_FLAG_OFFSET = 24L;

   private long size;
   private long tbl_size;
   private RandomAccessFile file;

   /**
    * Opens an existing table file and reads the bucket count and entry
    * count from its header.
    *
    * @param file_name path of the table file
    * @throws IOException if the file cannot be opened, the header cannot
    *         be read, or the stored bucket count is not positive
    */
   public DiskHashtable(String file_name) throws IOException {
      file = new RandomAccessFile(file_name, "rw");
      boolean ok = false;
      try {
         tbl_size = file.readLong();
         size = file.readLong();
         if(tbl_size <= 0L) {
            throw new IOException("Corrupt hash table header in " + file_name
                                  + ": table size " + tbl_size);
         }
         ok = true;
      }
      finally {
         // Do not leak the file handle when the header is unreadable.
         if(!ok) {
            file.close();
         }
      }
   }

   /**
    * Creates a new, empty table file with the given number of buckets.
    * Any previous content of the file is discarded.
    *
    * @param file_name path of the table file
    * @param tbl_size number of buckets; must be positive
    * @throws IllegalArgumentException if {@code tbl_size} is not positive
    * @throws IOException if the file cannot be created or written
    */
   public DiskHashtable(String file_name, long tbl_size) throws IOException {
      if(tbl_size <= 0L) {
         throw new IllegalArgumentException("tbl_size must be positive: " + tbl_size);
      }
      this.tbl_size = tbl_size;
      size = 0L;
      file = new RandomAccessFile(file_name, "rw");
      boolean ok = false;
      try {
         // Drop stale bytes of an older table stored under the same name.
         file.setLength(0L);
         file.writeLong(tbl_size);
         file.writeLong(size);
         file.writeLong(NULL);            // empty free list
         for(long i = 0L; i < tbl_size; i++) {
            file.writeLong(NULL);         // empty bucket
         }
         ok = true;
      }
      finally {
         if(!ok) {
            file.close();
         }
      }
   }

   /** Writes the in-memory entry count to the header, preserving the file pointer. */
   private void save_size() throws IOException {
      long pos = file.getFilePointer();
      file.seek(SIZE_OFFSET);
      file.writeLong(size);
      file.seek(pos);
   }

   /**
    * Returns the file address of the bucket slot for a key.
    * The hash is widened to long before taking the absolute value because
    * Math.abs(Integer.MIN_VALUE) is still negative as an int.
    */
   private long bucket_addr(Key key) {
      long index = Math.abs((long)key.hashCode()) % tbl_size;
      return TABLE_OFFSET + index * 8L;
   }

   /**
    * Stores {@code data} under {@code key}, replacing any entry already
    * stored for an equal key. The new record becomes the head of its
    * bucket chain.
    *
    * @param key the key; must not be null and must be serializable
    * @param data the value; may be null, must be serializable otherwise
    * @throws IOException on file or serialization failure
    * @throws ClassNotFoundException if a stored key met while scanning the
    *         chain cannot be deserialized
    */
   public void put(Key key, Data data) throws IOException,ClassNotFoundException {
      // Serialize before touching the file so a non-serializable argument
      // leaves the table unchanged.
      byte key_bytes[] = serialize(key);
      byte data_bytes[] = serialize(data);

      // Unlinks an existing entry for the key; does nothing when absent.
      remove(key);

      long addr = bucket_addr(key);
      file.seek(addr);
      long head = file.readLong();

      long rec = d_alloc(8L + 4L + key_bytes.length + 4L + data_bytes.length);

      // Write the record first, then publish it in the bucket slot.
      file.seek(rec);
      file.writeLong(head);
      write_blob(key_bytes);
      write_blob(data_bytes);

      file.seek(addr);
      file.writeLong(rec);

      size++;
      save_size();
   }

   /**
    * @param key object to measure
    * @return the number of bytes the serialized form of {@code key} occupies
    * @throws IOException if serialization fails
    */
   public int key_size(Key key) throws IOException {
      return serialize(key).length;
   }

   /**
    * @param data object to measure
    * @return the number of bytes the serialized form of {@code data} occupies
    * @throws IOException if serialization fails
    */
   public int data_size(Data data) throws IOException {
      return serialize(data).length;
   }

   /**
    * Reads a length-prefixed serialized key at the current file position.
    *
    * @return the deserialized key
    * @throws IOException on read failure or a corrupt length prefix
    * @throws ClassNotFoundException if the key's class cannot be loaded
    */
   @SuppressWarnings({"unchecked"})
   public Key read_key() throws IOException,ClassNotFoundException {
      // The stored object was written by write_key/put for this table's Key type.
      return (Key)read_object();
   }

   /**
    * Reads a length-prefixed serialized value at the current file position.
    *
    * @return the deserialized value, possibly null
    * @throws IOException on read failure or a corrupt length prefix
    * @throws ClassNotFoundException if the value's class cannot be loaded
    */
   @SuppressWarnings({"unchecked"})
   public Data read_data() throws IOException,ClassNotFoundException {
      return (Data)read_object();
   }

   /**
    * Writes {@code key} at the current file position as a 4-byte length
    * followed by its serialized bytes.
    *
    * @throws IOException on serialization or write failure
    */
   public void write_key(Key key) throws IOException {
      write_blob(serialize(key));
   }

   /**
    * Writes {@code data} at the current file position as a 4-byte length
    * followed by its serialized bytes.
    *
    * @throws IOException on serialization or write failure
    */
   public void write_data(Data data) throws IOException {
      write_blob(serialize(data));
   }

   /** Serializes one object to a byte array; the stream is closed so every byte is flushed. */
   private static byte[] serialize(Object obj) throws IOException {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      ObjectOutputStream out = new ObjectOutputStream(bytes);
      try {
         out.writeObject(obj);
      }
      finally {
         out.close();
      }
      return bytes.toByteArray();
   }

   /** Writes a 4-byte length followed by the bytes at the current file position. */
   private void write_blob(byte blob[]) throws IOException {
      file.writeInt(blob.length);
      file.write(blob);
   }

   /** Reads a 4-byte length and that many bytes, then deserializes them. */
   private Object read_object() throws IOException,ClassNotFoundException {
      int sz = file.readInt();
      if(sz < 0) {
         throw new IOException("Corrupt record: negative length " + sz);
      }
      byte buf[] = new byte[sz];
      // readFully, because read(byte[]) may return fewer bytes than requested.
      file.readFully(buf);
      ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(buf));
      try {
         return in.readObject();
      }
      finally {
         in.close();
      }
   }

   /**
    * Looks up the value stored for {@code key}.
    *
    * @param key the key; must not be null
    * @return the stored value, or null when no equal key is stored (a key
    *         stored with a null value also yields null)
    * @throws IOException on read failure
    * @throws ClassNotFoundException if a stored object cannot be deserialized
    */
   public Data get(Key key) throws IOException,ClassNotFoundException {
      file.seek(bucket_addr(key));
      long next = file.readLong();
      while(next != NULL) {
         file.seek(next);
         next = file.readLong();
         Key current_key = read_key();
         if(current_key.equals(key)) {
            // The value follows the key directly in the record.
            return read_data();
         }
      }
      return null;
   }

   /**
    * Marks the free-list node describing the block at {@code addr} as free
    * (flag value 1). Blocks without a node are left alone; nothing in this
    * class creates nodes, so for files it produced this is a no-op.
    */
   private void d_free(long addr) throws IOException {
      file.seek(FREE_LIST_OFFSET);
      long node = file.readLong();
      while(node != NULL) {
         file.seek(node);
         long next = file.readLong();
         long block = file.readLong();
         if(block == addr) {
            file.seek(node + NODE_FLAG_OFFSET);
            file.writeLong(1L);
         }
         node = next;
      }
   }

   /**
    * Returns the address at which a record of {@code needed} bytes may be
    * written. The free list is searched for the smallest block flagged free
    * that is large enough; when none exists the end of the file is returned.
    *
    * NOTE(review): nothing in this class adds nodes to the free list, so in
    * files it created the list is empty and every allocation appends. The
    * node layout (next, addr, size, flag) and "1 = free" follow d_free; the
    * flag is reset to 0 here when a block is claimed - confirm if another
    * tool maintains the list.
    */
   private long d_alloc(long needed) throws IOException {
      file.seek(FREE_LIST_OFFSET);
      long node = file.readLong();
      long best_node = NULL;
      long best_addr = NULL;
      long best_size = Long.MAX_VALUE;

      while(node != NULL) {
         file.seek(node);
         long next = file.readLong();
         long block = file.readLong();
         long block_size = file.readLong();
         long free = file.readLong();
         if(free == 1L && block_size >= needed && block_size < best_size) {
            best_node = node;
            best_addr = block;
            best_size = block_size;
         }
         node = next;
      }

      if(best_node != NULL) {
         // Claim the block so that it is not handed out twice.
         file.seek(best_node + NODE_FLAG_OFFSET);
         file.writeLong(0L);
         return best_addr;
      }

      return file.length();
   }

   /**
    * Removes the entry stored for {@code key}, if any, by unlinking its
    * record from the bucket chain and decrementing the entry count.
    *
    * @param key the key; must not be null
    * @throws IOException on file failure
    * @throws ClassNotFoundException if a stored key cannot be deserialized
    */
   public void remove(Key key) throws IOException,ClassNotFoundException {
      // Address of the long that points at the current record: first the
      // bucket slot, afterwards the "next" field at the start of a record.
      long link = bucket_addr(key);
      file.seek(link);
      long current = file.readLong();

      while(current != NULL) {
         file.seek(current);
         long next = file.readLong();
         Key current_key = read_key();
         if(current_key.equals(key)) {
            file.seek(link);
            file.writeLong(next);
            d_free(current);
            size--;
            save_size();
            return;
         }
         link = current;
         current = next;
      }
   }

   /** @return the number of entries currently stored */
   public long size() { 
      return size; 
   }

   /**
    * Closes the underlying file.
    *
    * @throws IOException if closing fails
    */
   public void close() throws IOException {
      file.close();
   }

   /**
    * Small command line driver operating on "hash_data.dat":
    * no arguments creates a new table with 1001 buckets, one argument
    * prints the value stored for that key, two arguments store a
    * key/value pair.
    */
   public static void main(String args[]) throws IOException,ClassNotFoundException {
      if(args.length == 2) {
         DiskHashtable< String, String > hash_tbl = new DiskHashtable< String, String >("hash_data.dat");
         try {
            hash_tbl.put(args[0], args[1]);
         }
         finally {
            hash_tbl.close();
         }
      }
      else if(args.length == 1) {
         DiskHashtable< String, String > hash_tbl = new DiskHashtable< String, String >("hash_data.dat");
         try {
            String data = hash_tbl.get(args[0]);
            System.out.println(args[0] + " -> " + data);
         }
         finally {
            hash_tbl.close();
         }
      }
      else if(args.length == 0) {
         DiskHashtable< String, String > hash_tbl = new DiskHashtable< String, String >("hash_data.dat", 1001);
         hash_tbl.close();
      }
   }
}
