Changeset 777


Ignore:
Timestamp:
05/10/11 19:02:32 (8 years ago)
Author:
philsmart
Message:
 
Location:
raptor-mua/trunk/src/main
Files:
6 edited

Legend:

Unmodified
Added
Removed
  • raptor-mua/trunk/src/main/config/dao-beans.xml

    r773 r777  
    2424                <property name="hibernateProperties"> 
    2525                        <props> 
    26                                  <prop key="hibernate.dialect"> org.hibernate.dialect.PostgreSQLDialect</prop>  
    27                                 <!--  <prop key="hibernate.dialect"> org.hibernate.dialect.HSQLDialect</prop> --> 
     26                                 <prop key="hibernate.dialect"> org.hibernate.dialect.PostgreSQLDialect</prop> 
     27                                 <!--<prop key="hibernate.dialect"> org.hibernate.dialect.HSQLDialect</prop>--> 
    2828                                <prop key="hibernate.show_sql">false</prop> 
    29                                 <prop key="hibernate.hbm2ddl.auto">update</prop> 
     29                                <prop key="hibernate.hbm2ddl.auto">create</prop> 
    3030 
    3131                        </props> 
     
    8080        </property> 
    8181        <property name="jdbcUrl"> 
    82             <value>jdbc:hsqldb:file:data/mua-second;hsqldb.default_table_type=cached</value> 
     82            <value>jdbc:hsqldb:file:data/mua-third;hsqldb.default_table_type=cached</value> 
    8383        </property> 
    8484        <property name="user"> 
  • raptor-mua/trunk/src/main/config/event-release.xml

    r773 r777  
    2424        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd 
    2525        http://cxf.apache.org/jaxws http://cxf.apache.org/schemas/jaxws.xsd"> 
     26 
     27          <!--  the event release client, for sending events to other MUA's --> 
     28 
     29     <bean id="eventReleaseClient" class="uk.ac.cardiff.raptor.remoting.client.impl.EventReleaseClientImpl"> 
     30        <property name="endpointRegistry"><ref bean="endpointRegistry"/></property> 
     31        <property name="eventReleaseEngine"><ref bean="eventReleaseEngine"></ref></property> 
     32        <property name="enableEventRelease"><value>false</value></property> 
     33    </bean> 
    2634 
    2735         <!-- Configure how the client talks to the MUA, setting appropriate trust and keystore locations  --> 
  • raptor-mua/trunk/src/main/config/mua-core.xml

    r770 r777  
    9898 
    9999 
    100         <!--  the event release client, for sending events to other MUA's --> 
    101  
    102          <bean id="eventReleaseClient" class="uk.ac.cardiff.raptor.remoting.client.impl.EventReleaseClientImpl"> 
    103         <property name="endpointRegistry"><ref bean="endpointRegistry"/></property> 
    104         <property name="eventReleaseEngine"><ref bean="eventReleaseEngine"></ref></property> 
    105         <property name="enableEventRelease"><value>true</value></property> 
    106     </bean> 
    107  
    108100    <bean id="eventReleaseEngine" class="uk.ac.cardiff.raptor.registry.EventReleaseEngine"> 
    109101        <property name="attributeFilterEngine"><ref bean="attributeFilterEngine"/></property> 
  • raptor-mua/trunk/src/main/config/server.properties

    r770 r777  
    1 jetty.port=8444 
     1jetty.port=8443 
    22jetty.webapp.contextPath=/MUA 
    33 
  • raptor-mua/trunk/src/main/java/uk/ac/cardiff/raptormua/engine/MUAEngine.java

    r770 r777  
    8282        private DataAccessRegister dataAccessRegister; 
    8383 
     84        private final int MAX_RELEASE_EVENT_SET_SIZE=15; 
     85 
    8486        public MUAEngine() { 
    8587                log.info("Setup Multi-Unit Aggregator Engine..."); 
     
    135137             } 
    136138 
    137                 List<Event> eventsToSend = storageEngine.getEventsOnOrAfter(earliestReleaseTime); 
     139                List<Event> eventsToSend = storageEngine.getEventsOnOrAfter(earliestReleaseTime,MAX_RELEASE_EVENT_SET_SIZE); 
     140 
     141 
     142//              for (Event e : eventsToSend){ 
     143//                  log.debug("EventTime [{}] - {}",e.getEventTime(),e); 
     144//              } 
    138145 
    139146                boolean success = false; 
  • raptor-mua/trunk/src/main/java/uk/ac/cardiff/raptormua/service/impl/MUAProcessImpl.java

    r770 r777  
    1919package uk.ac.cardiff.raptormua.service.impl; 
    2020 
     21import java.util.ArrayList; 
    2122import java.util.List; 
    2223import java.util.concurrent.locks.Lock; 
     
    4243 
    4344/** 
    44  * All operations should go through this service class, so as to obey locks and 
    45  * synchronisation issues. Locks collisions are thrown use a 
    46  * <code>SoapFault</code>. Fault codes are: Client (if a malformed input e.g. 
    47  * statistic name is wrong) Server (we use for locks, as server side issue) 
    48  * VersionMismatch MustUnderstand 
     45 * All operations should go through this service class, so as to obey locks and synchronisation issues. Locks collisions are thrown use a <code>SoapFault</code> 
     46 * . Fault codes are: Client (if a malformed input e.g. statistic name is wrong) Server (we use for locks, as server side issue) VersionMismatch MustUnderstand 
    4947 * 
    5048 * @author philsmart 
     
    5351public class MUAProcessImpl implements MUAProcess { 
    5452 
    55         /** class logger */ 
    56         private final Logger log = LoggerFactory.getLogger(MUAProcessImpl.class); 
    57  
    58         /** main engine of the MultiUnitAggregator */ 
    59         private MUAEngine engine; 
    60  
    61         /** 
    62          * ReentrantLock to prevent more than one operation at the same time 
     53    /** class logger */ 
     54    private final Logger log = LoggerFactory.getLogger(MUAProcessImpl.class); 
     55 
     56    /** main engine of the MultiUnitAggregator */ 
     57    private MUAEngine engine; 
     58 
     59    /** 
     60     * ReentrantLock to prevent more than one operation at the same time 
     61     */ 
     62    final Lock lockR = new ReentrantLock(); 
     63 
     64    public void setEngine(MUAEngine engine) { 
     65        this.engine = engine; 
     66    } 
     67 
     68    public MUAEngine getEngine() { 
     69        return engine; 
     70    } 
     71 
     72    public AggregatorGraphModel performStatistic(String statisticName) throws SoapFault { 
     73        if (lockR.tryLock()) { 
     74            try { 
     75                log.info("WebSservice call for perform statistic {} ", statisticName); 
     76                return engine.performStatistic(statisticName); 
     77            } catch (Exception e) { 
     78                log.error("{}", e); 
     79            } finally { 
     80                lockR.unlock(); 
     81            } 
     82        } 
     83        log.warn("Lock was hit for method [performStatistic]"); 
     84        throw new SoapFault("lock was hit on method performStatistic", new QName("Server")); 
     85 
     86    } 
     87 
     88    public void release() { 
     89        if (lockR.tryLock()) { 
     90            try { 
     91                engine.release(); 
     92 
     93            } catch (Exception e) { 
     94                log.error(e.getMessage()); 
     95                e.printStackTrace(); 
     96            } finally { 
     97                lockR.unlock(); 
     98            } 
     99        } else { 
     100            log.warn("Lock was hit for method [release]"); 
     101        } 
     102    } 
     103 
     104    public Capabilities getCapabilities() { 
     105        log.info("WebSservice call for get capabilities"); 
     106        return engine.getCapabilities(); 
     107    } 
     108 
     109    @Override 
     110    public void updateStatisticalUnit(StatisticalUnitInformation statisticalUnitInformation) throws SoapFault { 
     111        boolean success = false; 
     112        if (lockR.tryLock()) { 
     113            try { 
     114                log.info("Updating statistical unit {}", statisticalUnitInformation.getStatisticParameters().getUnitName()); 
     115                engine.updateStatisticalUnit(statisticalUnitInformation); 
     116                success = true; 
     117            } catch (Exception e) { 
     118                log.error("{}", e); 
     119            } finally { 
     120                lockR.unlock(); 
     121            } 
     122        } 
     123        if (!success) { 
     124            log.warn("Lock was hit for method [updateStatisticalUnit]"); 
     125            throw new SoapFault("lock was hit on method updateStatisticalUnit", new QName("Server")); 
     126        } 
     127    } 
     128 
     129    @Override 
     130    public boolean performAdministrativeFunction(AdministrativeFunction function) throws SoapFault { 
     131        if (lockR.tryLock()) { 
     132            try { 
     133                log.info("Performing administrative function {}, request orginates from {}", function.getAdministrativeFunction(), function.getRequester()); 
     134                return engine.performAdministrativeFunction(function); 
     135            } catch (Exception e) { 
     136                log.error("{}", e); 
     137                return false; 
     138            } finally { 
     139                lockR.unlock(); 
     140 
     141            } 
     142 
     143        } 
     144        log.warn("Lock was hit for method [performAdministrativeFunction]"); 
     145        throw new SoapFault("lock was hit on method performAdministrativeFunction", new QName("Server")); 
     146    } 
     147 
     148    /** 
     149     * Because this is perform async, the lock is not useful, its the exceptions that are. 
     150     */ 
     151    @Override 
     152    public void addAuthentications(EventPushMessage pushed) throws SoapFault { 
     153        boolean success = false; 
     154        if (lockR.tryLock()) { 
     155            try { 
     156                log.info("MUA has received {} entries from {}", pushed.getEvents().size(), pushed.getClientMetadata().getServiceName()); 
     157                engine.addAuthentications(pushed); 
     158                success = true; 
     159            } catch (Exception e) { 
     160                log.error("Error trying to add authentications to this MUA", e); 
     161 
     162            } finally { 
     163                lockR.unlock(); 
     164 
     165            } 
     166        } else 
     167            log.warn("Lock was hit for method [addAuthentications]"); 
     168        if (!success) { 
     169            log.error("WARNING, technical fault, could not add events to this MUA"); 
     170            throw new SoapFault("Technical fault at the server, could not add events to MUA [" + this.getEngine().getMuaMetadata().getServiceName() + "]", new QName("Server")); 
     171        } 
     172 
     173    } 
     174 
     175    @Override 
     176    public AggregatorGraphModel updateAndInvokeStatisticalUnit(StatisticalUnitInformation statisticalUnitInformation) throws SoapFault { 
     177        if (lockR.tryLock()) { 
     178            try { 
     179                log.info("Webservice call to update and perform statistic {}", statisticalUnitInformation.getStatisticParameters().getUnitName()); 
     180                engine.updateStatisticalUnit(statisticalUnitInformation); 
     181                return engine.performStatistic(statisticalUnitInformation.getStatisticParameters().getUnitName()); 
     182            } catch (Exception e) { 
     183                log.error("{}", e); 
     184            } finally { 
     185                lockR.unlock(); 
     186            } 
     187        } 
     188        log.warn("Lock was hit for method [updateAndInvokeStatisticalUnit]"); 
     189        throw new SoapFault("lock was hit on method updateAndInvokeStatisticalUnit", new QName("Server")); 
     190 
     191    } 
     192 
     193    /** 
     194         * 
    63195         */ 
    64         final Lock lockR = new ReentrantLock(); 
    65  
    66         public void setEngine(MUAEngine engine) { 
    67                 this.engine = engine; 
    68         } 
    69  
    70         public MUAEngine getEngine() { 
    71                 return engine; 
    72         } 
    73  
    74  
    75         public AggregatorGraphModel performStatistic(String statisticName) throws SoapFault { 
    76                 if (lockR.tryLock()) { 
    77                         try { 
    78                                 log.info("WebSservice call for perform statistic {} ", statisticName); 
    79                                 return engine.performStatistic(statisticName); 
    80                         } catch (Exception e) { 
    81                                 log.error("{}",e); 
    82                         } finally { 
    83                                 lockR.unlock(); 
    84                         } 
    85                 } 
    86                 log.warn("Lock was hit for method [performStatistic]"); 
    87                 throw new SoapFault("lock was hit on method performStatistic", new QName("Server")); 
    88  
    89         } 
    90  
    91         public void release()  { 
    92             if (lockR.tryLock()) { 
    93                     try { 
    94                         engine.release(); 
    95  
    96                     } catch (Exception e) { 
    97                             log.error(e.getMessage()); 
    98                             e.printStackTrace(); 
    99                     } finally { 
    100                             lockR.unlock(); 
    101                     } 
    102             } 
    103             else{ 
    104                 log.warn("Lock was hit for method [release]"); 
    105             } 
    106        } 
    107  
    108  
    109         public Capabilities getCapabilities() { 
    110                 log.info("WebSservice call for get capabilities"); 
    111                 return engine.getCapabilities(); 
    112         } 
    113  
    114  
    115         @Override 
    116         public void updateStatisticalUnit(StatisticalUnitInformation statisticalUnitInformation) throws SoapFault { 
    117                 boolean success = false; 
    118                 if (lockR.tryLock()) { 
    119                         try { 
    120                                 log.info("Updating statistical unit {}", statisticalUnitInformation.getStatisticParameters().getUnitName()); 
    121                                 engine.updateStatisticalUnit(statisticalUnitInformation); 
    122                                 success = true; 
    123                         } catch (Exception e) { 
    124                                 log.error("{}",e); 
    125                         } finally { 
    126                                 lockR.unlock(); 
    127                         } 
    128                 } 
    129                 if (!success) { 
    130                         log.warn("Lock was hit for method [updateStatisticalUnit]"); 
    131                         throw new SoapFault("lock was hit on method updateStatisticalUnit", new QName("Server")); 
    132                 } 
    133         } 
    134  
    135  
    136         @Override 
    137         public boolean performAdministrativeFunction(AdministrativeFunction function) throws SoapFault { 
    138                 if (lockR.tryLock()) { 
    139                         try { 
    140                                 log.info("Performing administrative function {}, request orginates from {}", 
    141                                                 function.getAdministrativeFunction(), function.getRequester()); 
    142                                 return engine.performAdministrativeFunction(function); 
    143                         } catch (Exception e) { 
    144                                 log.error("{}",e); 
    145                                 return false; 
    146                         } finally { 
    147                                 lockR.unlock(); 
    148  
    149                         } 
    150  
    151                 } 
    152                 log.warn("Lock was hit for method [performAdministrativeFunction]"); 
    153                 throw new SoapFault("lock was hit on method performAdministrativeFunction", new QName("Server")); 
    154         } 
    155  
    156         /** 
    157          * Because this is perform async, the lock is not useful, its the exceptions that are. 
    158          */ 
    159         @Override 
    160         public void addAuthentications(EventPushMessage pushed) throws SoapFault{ 
    161                 boolean success = false; 
    162                 if (lockR.tryLock()) { 
    163                         try { 
    164                                 log.info("MUA has received {} entries from {}", pushed.getEvents().size(), pushed.getClientMetadata().getServiceName()); 
    165                                 engine.addAuthentications(pushed); 
    166                                 success = true; 
    167                         } catch (Exception e) { 
    168                                 log.error("Error trying to add authentications to this MUA",e); 
    169  
    170                         } finally { 
    171                                 lockR.unlock(); 
    172  
    173                         } 
    174                 } 
    175                 else 
    176                     log.warn("Lock was hit for method [addAuthentications]"); 
    177                 if (!success){ 
    178                     log.error("WARNING, technical fault, could not add events to this MUA"); 
    179                     throw new SoapFault("Technical fault at the server, could not add events to MUA ["+this.getEngine().getMuaMetadata().getServiceName()+"]", new QName("Server")); 
    180                 } 
    181  
    182  
    183         } 
    184  
    185         @Override 
    186         public AggregatorGraphModel updateAndInvokeStatisticalUnit(StatisticalUnitInformation statisticalUnitInformation) 
    187                         throws SoapFault { 
    188                 if (lockR.tryLock()) { 
    189                         try { 
    190                                 log.info("Webservice call to update and perform statistic {}", statisticalUnitInformation 
    191                                                 .getStatisticParameters().getUnitName()); 
    192                                 engine.updateStatisticalUnit(statisticalUnitInformation); 
    193                                 return engine.performStatistic(statisticalUnitInformation.getStatisticParameters().getUnitName()); 
    194                         } catch (Exception e) { 
    195                                 log.error("{}",e); 
    196                         } finally { 
    197                                 lockR.unlock(); 
    198                         } 
    199                 } 
    200                 log.warn("Lock was hit for method [updateAndInvokeStatisticalUnit]"); 
    201                 throw new SoapFault("lock was hit on method updateAndInvokeStatisticalUnit", new QName("Server")); 
    202  
    203         } 
    204  
    205         /** 
    206          * Batch upload does not employ a lock on the MUA 
    207          */ 
    208         @Override 
    209         public List<LogFileUploadResult> batchUpload(List<LogFileUpload> uploadFiles) throws SoapFault { 
    210                 log.info("Webservice call to parse {} batch log file(s)",uploadFiles.size()); 
    211  
    212                 for (LogFileUpload logfile : uploadFiles){ 
    213                         log.debug("Log File details: name [{}], MIME type [{}], Length [{}], ID [{}]",new Object[]{logfile.getName(),logfile.getMime(),logfile.getData().length, logfile.getId()}); 
    214                 } 
    215                 try{ 
    216                     return engine.batchParse(uploadFiles); 
    217                 } 
    218                 catch (TransactionInProgressException e){ 
    219                     log.error("Could not parse and store batch upload {}",e.getMessage()); 
    220                     throw new SoapFault("Could not parse and store batch upload "+e.getMessage(),new QName("Server")); 
    221                 } 
    222  
    223         } 
     196    @Override 
     197    public List<LogFileUploadResult> batchUpload(List<LogFileUpload> uploadFiles) throws SoapFault { 
     198        List<LogFileUploadResult> result= new ArrayList<LogFileUploadResult>(); 
     199        boolean success = false; 
     200        if (lockR.tryLock()) { 
     201            try { 
     202                log.info("Webservice call to parse {} batch log file(s)", uploadFiles.size()); 
     203 
     204                for (LogFileUpload logfile : uploadFiles) { 
     205                    log.debug("Log File details: name [{}], MIME type [{}], Length [{}], ID [{}]", new Object[] { logfile.getName(), logfile.getMime(), logfile.getData().length, logfile.getId() }); 
     206                } 
     207                result = engine.batchParse(uploadFiles); 
     208                success = true; 
     209            } catch (TransactionInProgressException e) { 
     210                log.error("Could not parse and store batch upload {}", e.getMessage()); 
     211 
     212            } finally { 
     213                lockR.unlock(); 
     214            } 
     215        } 
     216        else{ 
     217            log.warn("Lock was hit for method [batchUpload]"); 
     218            throw new SoapFault("lock was hit on method batchUpload", new QName("Server")); 
     219        } 
     220        if (!success){ 
     221            throw new SoapFault("Could not parse and store batch upload", new QName("Server")); 
     222        } 
     223        return result; 
     224 
     225 
     226    } 
    224227 
    225228} 
Note: See TracChangeset for help on using the changeset viewer.