Changeset 1081 for raptor-client


Ignore:
Timestamp:
08/25/11 17:28:01 (9 years ago)
Author:
philsmart
Message:
 
File:
1 edited

Legend:

Unmodified
Added
Removed
  • raptor-client/trunk/src/main/java/uk/ac/cardiff/raptor/store/impl/PersistantEventHandler.java

    r1043 r1081  
    1717 * 
    1818 */ 
     19 
    1920package uk.ac.cardiff.raptor.store.impl; 
    2021 
     
    2627import java.util.Set; 
    2728 
    28 import org.springframework.dao.DataAccessException; 
    2929import org.joda.time.DateTime; 
    3030import org.slf4j.Logger; 
    3131import org.slf4j.LoggerFactory; 
     32import org.springframework.dao.DataAccessException; 
    3233 
    3334import uk.ac.cardiff.model.event.Event; 
     
    3839/** 
    3940 * @author philsmart 
    40  * 
     41 *  
    4142 */ 
    4243public class PersistantEventHandler implements EventHandler { 
    43      
    44         /** class logger. */ 
    45         private final Logger log = LoggerFactory.getLogger(PersistantEventHandler.class); 
    46  
    47         /** data connection used to persist entries. */ 
    48         private RaptorDataConnection dataConnection; 
    49  
    50  
    51         /** Used to hold events temporarily before they are persisted, allows 
    52          * resilience if events can not be immediately stored, for example 
    53          * failure of the underlying persistent store. Also, as its a set, it 
    54          * prevents the addition of duplicate values */ 
    55         private Set<Event> persistQueue; 
    56  
    57         public PersistantEventHandler(RaptorDataConnection dataConnection) { 
    58                 this.setDataConnection(dataConnection); 
    59                 persistQueue = new HashSet<Event>(); 
    60  
    61         } 
    62  
    63         /** 
    64          * Initialises the entry handler. In particular, loads all entries from the 
    65          * main datastore, through the <code>dataConnection</code> instance. 
    66          */ 
    67         public void initialise() { 
    68                 log.info("Persistant entry handler [{}] initialising", this); 
    69                 log.info("Persistent data store has {} entries for [{}]", this.getNumberOfEvents(),this); 
    70                 log.info("Persistant entry handler [{}] started", this); 
    71         } 
    72  
    73  
    74         public List query(String query) { 
    75                  log.trace("SQL query to entry handler [{}]",query); 
    76                 return dataConnection.runQuery(query, null); 
    77         } 
    78  
    79         public Object queryUnique(String query) { 
    80                  log.trace("SQL query to entry handler [{}]",query); 
    81                 return dataConnection.runQueryUnique(query, null); 
    82         } 
    83  
    84         public List query(String query, Object[] parameters) { 
    85                 log.trace("SQL query to entry handler [{}], with parameters [{}]",query,Arrays.asList(parameters)); 
    86                 return dataConnection.runQuery(query, parameters); 
    87         } 
    88  
    89         public Object queryUnique(String query, Object[] parameters) { 
    90                 return dataConnection.runQueryUnique(query, parameters); 
    91         } 
    92  
    93         public void update(String query, Object[] parameters) throws StorageException{ 
    94                 try{ 
    95                         dataConnection.runUpdate(query, parameters); 
    96                 } 
    97                 catch(DataAccessException e){ 
    98                         throw new StorageException("Could not perform entry handler update",e); 
    99                 } 
    100         } 
    101  
    102         public List query(String query, Object[] parameters, int maxNoResults) { 
    103               return dataConnection.runQuery(query, parameters, maxNoResults); 
    104         } 
    105  
    106         public void save(Object object) throws StorageException { 
    107                 try{ 
    108                         dataConnection.save(object); 
    109                 } 
    110                 catch (DataAccessException e){ 
    111                         throw new StorageException("Could not save object",e); 
    112                 } 
    113         } 
    114  
    115         public void saveAll(Collection object) throws StorageException { 
    116                 try{ 
    117                         dataConnection.saveAll(object); 
    118                 } 
    119                 catch (DataAccessException e){ 
    120                         throw new StorageException("Could not save collection",e); 
    121                 } 
    122         } 
    123  
    124         /** 
    125          * The <code>entries</code> are stored in the <code>persistQueue</code> until they are persisted. If an exception is thrown before 
    126          * they are persisted, they remain in the <code>persistQueue</code>. This method then saves this collection in batch, as opposed to the 
    127          * <code>addEntry</code> method which stores events one at a time. In order to detect duplicates, a query is run over the database in order 
    128          * to check duplicates within already stored events, as well as adding new events to a <code>Set</code> <code>persist</code> which 
    129          * prevents duplicates in the incomming <code>List</code> of entries 
    130          * 
    131          * @param entries the list of events that are to be stored 
    132          * @throws 
    133          */ 
    134         public void addEvents(List<Event> entries) throws StorageException{ 
    135                 log.info("Persistent Entry Handler has {} entries, with {} new entries inputted, and {} exist in the queue", 
    136                                 new Object[]{this.getNumberOfEvents(), entries.size(),persistQueue.size()}); 
    137  
    138                 int duplicates = 0; 
    139                 persistQueue.addAll(entries); 
    140  
    141                 Set<Event> persist = new HashSet<Event>(); 
    142                 for (Event event : persistQueue) { 
    143                         String query ="select count(*) from "+event.getClass().getSimpleName()+" where eventTime = ? and eventId =?"; 
    144                         Object[] parameters= new Object[]{event.getEventTime(),event.getEventId()}; 
    145                         long storedDuplicates = ((Long) dataConnection.runQueryUnique(query, parameters)).intValue(); 
    146                         if (storedDuplicates == 0){ 
    147                             persist.add(event); 
    148                         } 
    149                         else{ 
    150                             duplicates++; 
    151                         } 
    152                 } 
    153                 try{ 
    154                     dataConnection.saveAll(persist); 
    155                 } 
    156                 catch(DataAccessException e){ 
    157                     throw new StorageException("Could not persist events",e); 
    158                 } 
    159                 persistQueue.clear(); 
    160                 log.info("Total No. of Entries after addition = {}, finding {} duplicates", this.getNumberOfEvents(), duplicates); 
    161         } 
    162  
    163         public boolean addEvent(Event event) { 
    164  
    165                 String query ="select count(*) from "+event.getClass().getSimpleName()+" where eventTime = ? and eventId =?"; 
    166                 Object[] parameters= new Object[]{event.getEventTime().toDate(),event.getEventId()}; 
    167                 int numberOfDuplicates = ((Integer) dataConnection.runQueryUnique(query, parameters)).intValue(); 
    168  
    169                 if (numberOfDuplicates == 0){ 
    170                    dataConnection.save(event); 
    171                    return true; 
    172                 } 
    173                 else{ 
    174                         log.error("Duplicated event found\n{}", event); 
    175                         return false; 
    176                 } 
    177  
    178         } 
    179  
    180         public List<Event> getEvents() { 
    181                 List<Event> runQuery = dataConnection.runQuery("from Event",null); 
    182                 return runQuery; 
    183         } 
    184  
    185         public void removeAllEvents() { 
    186                 log.error("Method removeAllEntries not yet implemented"); 
    187         } 
    188  
    189         public void setDataConnection(RaptorDataConnection dataConnection) { 
    190                 this.dataConnection = dataConnection; 
    191         } 
    192  
    193         public RaptorDataConnection getDataConnection() { 
    194                 return dataConnection; 
    195         } 
    196  
    197         public void setEvents(Set<Event> entries) { 
    198  
    199         } 
    200  
    201         public long getNumberOfEvents() { 
    202                 Object result = dataConnection.runQueryUnique("select count(*) from Event", null); 
    203                 return (Long) result; 
    204         } 
    205  
    206         public DateTime getLatestEventTime(){ 
    207                 return (DateTime) dataConnection.runQueryUnique("select max(eventTime) from Event", null); 
    208         } 
    209  
    210         //TODO Implementation may not work - PLEASE DO USE 
    211         public void removeEventsBefore(DateTime earliestReleaseTime, Set<Integer> latestEqualEntries) { 
    212                 dataConnection.runQueryUnique("delete from Event where eventTime < ?", new Object[]{earliestReleaseTime.toDate()}); 
    213                 for (Iterator<Integer> entries = latestEqualEntries.iterator();entries.hasNext();){ 
    214                     Integer hash = entries.next(); 
    215                     dataConnection.runQueryUnique("delete from Event where hashCode = ?", new Object[]{hash}); 
     44 
     45    /** class logger. */ 
     46    private final Logger log = LoggerFactory.getLogger(PersistantEventHandler.class); 
     47 
     48    /** data connection used to persist entries. */ 
     49    private RaptorDataConnection dataConnection; 
     50 
     51    /** 
     52     * Used to hold events temporarily before they are persisted, allows resilience if events can not be immediately 
     53     * stored, for example failure of the underlying persistent store. Also, as its a set, it prevents the addition of 
     54     * duplicate values 
     55     */ 
     56    private Set<Event> persistQueue; 
     57 
     58    public PersistantEventHandler(RaptorDataConnection dataConnection) { 
     59        this.setDataConnection(dataConnection); 
     60        persistQueue = new HashSet<Event>(); 
     61 
     62    } 
     63 
     64    /** 
     65     * Initialises the entry handler. In particular, loads all entries from the main datastore, through the 
     66     * <code>dataConnection</code> instance. 
     67     */ 
     68    public void initialise() { 
     69        log.info("Persistant entry handler [{}] initialising", this); 
     70        log.info("Persistent data store has {} entries for [{}]", this.getNumberOfEvents(), this); 
     71        log.info("Persistant entry handler [{}] started", this); 
     72    } 
     73 
     74    public List query(String query) { 
     75        log.trace("SQL query to entry handler [{}]", query); 
     76        return dataConnection.runQuery(query, null); 
     77    } 
     78 
     79    public Object queryUnique(String query) { 
     80        log.trace("SQL query to entry handler [{}]", query); 
     81        return dataConnection.runQueryUnique(query, null); 
     82    } 
     83 
     84    public List query(String query, Object[] parameters) { 
     85        log.trace("SQL query to entry handler [{}], with parameters [{}]", query, Arrays.asList(parameters)); 
     86        return dataConnection.runQuery(query, parameters); 
     87    } 
     88 
     89    public Object queryUnique(String query, Object[] parameters) { 
     90        return dataConnection.runQueryUnique(query, parameters); 
     91    } 
     92 
     93    public void update(String query, Object[] parameters) throws StorageException { 
     94        try { 
     95            dataConnection.runUpdate(query, parameters); 
     96        } catch (DataAccessException e) { 
     97            throw new StorageException("Could not perform entry handler update", e); 
     98        } 
     99    } 
     100 
     101    public List query(String query, Object[] parameters, int maxNoResults) { 
     102        return dataConnection.runQuery(query, parameters, maxNoResults); 
     103    } 
     104 
     105    public void save(Object object) throws StorageException { 
     106        try { 
     107            dataConnection.save(object); 
     108        } catch (DataAccessException e) { 
     109            throw new StorageException("Could not save object", e); 
     110        } 
     111    } 
     112 
     113    public void saveAll(Collection object) throws StorageException { 
     114        try { 
     115            dataConnection.saveAll(object); 
     116        } catch (DataAccessException e) { 
     117            throw new StorageException("Could not save collection", e); 
     118        } 
     119    } 
     120 
     121    /** 
     122     * The <code>entries</code> are stored in the <code>persistQueue</code> until they are persisted. If an exception is 
     123     * thrown before they are persisted, they remain in the <code>persistQueue</code>. This method then saves this 
     124     * collection in batch, as opposed to the <code>addEntry</code> method which stores events one at a time. In order 
     125     * to detect duplicates, a query is run over the database in order to check duplicates within already stored events, 
     126     * as well as adding new events to a <code>Set</code> <code>persist</code> which prevents duplicates in the 
     127     * incomming <code>List</code> of entries 
     128     *  
     129     * @param entries the list of events that are to be stored 
     130     * @throws 
     131     */ 
     132    public void addEvents(final List<Event> entries) throws StorageException { 
     133        log.info("Persistent Entry Handler has {} entries, with {} new entries inputted, and {} exist in the queue", 
     134                new Object[] {this.getNumberOfEvents(), entries.size(), persistQueue.size()}); 
     135 
     136        int duplicates = 0; 
     137        persistQueue.addAll(entries); 
     138 
     139        Set<Event> persist = new HashSet<Event>(); 
     140        for (Event event : persistQueue) { 
     141            String query = 
     142                    "select count(*) from " + event.getClass().getSimpleName() + " where eventTime = ? and eventId =?"; 
     143            Object[] parameters = new Object[] {event.getEventTime(), event.getEventId()}; 
     144            long storedDuplicates = ((Long) dataConnection.runQueryUnique(query, parameters)).intValue(); 
     145            if (storedDuplicates == 0) { 
     146                persist.add(event); 
     147            } else { 
     148                duplicates++; 
     149            } 
     150        } 
     151        try { 
     152            dataConnection.saveAll(persist); 
     153        } catch (DataAccessException e) { 
     154            throw new StorageException("Could not persist events", e); 
     155        } 
     156        persistQueue.clear(); 
     157        log.info("Total No. of Entries after addition = {}, finding {} duplicates", this.getNumberOfEvents(), 
     158                duplicates); 
     159    } 
     160 
     161    public boolean addEvent(final Event event) { 
     162 
     163        String query = 
     164                "select count(*) from " + event.getClass().getSimpleName() + " where eventTime = ? and eventId =?"; 
     165        Object[] parameters = new Object[] {event.getEventTime().toDate(), event.getEventId()}; 
     166        int numberOfDuplicates = ((Integer) dataConnection.runQueryUnique(query, parameters)).intValue(); 
     167 
     168        if (numberOfDuplicates == 0) { 
     169            dataConnection.save(event); 
     170            return true; 
     171        } else { 
     172            log.error("Duplicated event found\n{}", event); 
     173            return false; 
     174        } 
     175 
     176    } 
     177 
     178    public List<Event> getEvents() { 
     179        List<Event> runQuery = dataConnection.runQuery("from Event", null); 
     180        return runQuery; 
     181    } 
     182 
     183    public void removeAllEvents() { 
     184        log.error("Method removeAllEntries not yet implemented"); 
     185    } 
     186 
     187    public void setDataConnection(RaptorDataConnection dataConnection) { 
     188        this.dataConnection = dataConnection; 
     189    } 
     190 
     191    public RaptorDataConnection getDataConnection() { 
     192        return dataConnection; 
     193    } 
     194 
     195    public void setEvents(final Set<Event> entries) { 
     196 
     197    } 
     198 
     199    public long getNumberOfEvents() { 
     200        Object result = dataConnection.runQueryUnique("select count(*) from Event", null); 
     201        return (Long) result; 
     202    } 
     203 
     204    public DateTime getLatestEventTime() { 
     205        return (DateTime) dataConnection.runQueryUnique("select max(eventTime) from Event", null); 
     206    } 
     207 
     208    // TODO Implementation may not work - PLEASE DO USE 
     209    public void removeEventsBefore(final DateTime earliestReleaseTime, final Set<Integer> latestEqualEntries) { 
     210        dataConnection.runQueryUnique("delete from Event where eventTime < ?", 
     211                new Object[] {earliestReleaseTime.toDate()}); 
     212        for (Iterator<Integer> entries = latestEqualEntries.iterator(); entries.hasNext();) { 
     213            Integer hash = entries.next(); 
     214            dataConnection.runQueryUnique("delete from Event where hashCode = ?", new Object[] {hash}); 
    216215        } 
    217216 
Note: See TracChangeset for help on using the changeset viewer.