001    package org.LiveGraph.dataCache;
002    
003    import java.io.File;
004    import java.io.FileInputStream;
005    import java.io.FileNotFoundException;
006    import java.io.IOException;
007    import java.util.ArrayList;
008    import java.util.List;
009    
010    import org.LiveGraph.settings.DataFileSettings;
011    import org.LiveGraph.settings.ErrorWhileSettingHasChangedProcessingException;
012    import org.LiveGraph.settings.ObservableSettings;
013    import org.LiveGraph.settings.SettingsObserver;
014    
015    
016    /**
017     * An object of this class is used to trigger updates from a data input stream
018     * into a {@link DataCache} at regular intervals.
019     * 
020     * <p style="font-size:smaller;">This product includes software developed by the
021     *    <strong>LiveGraph</strong> project and its contributors.<br />
022     *    (<a href="http://www.live-graph.org" target="_blank">http://www.live-graph.org</a>)<br />
023     *    Copyright (c) 2007 G. Paperin.<br />
024     *    All rights reserved.
025     * </p>
026     * <p style="font-size:smaller;">File: UpdateInvoker.java</p> 
027     * <p style="font-size:smaller;">Redistribution and use in source and binary forms, with or
028     *    without modification, are permitted provided that the following terms and conditions are met:
029     * </p>
030     * <p style="font-size:smaller;">1. Redistributions of source code must retain the above
031     *    acknowledgement of the LiveGraph project and its web-site, the above copyright notice,
032     *    this list of conditions and the following disclaimer.<br />
033     *    2. Redistributions in binary form must reproduce the above acknowledgement of the
034     *    LiveGraph project and its web-site, the above copyright notice, this list of conditions
035     *    and the following disclaimer in the documentation and/or other materials provided with
036     *    the distribution.<br />
037     *    3. All advertising materials mentioning features or use of this software or any derived
038     *    software must display the following acknowledgement:<br />
039     *    <em>This product includes software developed by the LiveGraph project and its
040     *    contributors.<br />(http://www.live-graph.org)</em><br />
041     *    4. All advertising materials distributed in form of HTML pages or any other technology
042     *    permitting active hyper-links that mention features or use of this software or any
043     *    derived software must display the acknowledgment specified in condition 3 of this
044     *    agreement, and in addition, include a visible and working hyper-link to the LiveGraph
045     *    homepage (http://www.live-graph.org).
046     * </p>
047     * <p style="font-size:smaller;">THIS SOFTWARE IS PROVIDED &quot;AS IS&quot;, WITHOUT WARRANTY
048     *    OF ANY KIND, EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
049     *    MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND  NONINFRINGEMENT. IN NO EVENT SHALL
050     *    THE AUTHORS, CONTRIBUTORS OR COPYRIGHT  HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
051     *    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING  FROM, OUT OF OR
052     *    IN CONNECTION WITH THE SOFTWARE OR THE USE OR  OTHER DEALINGS IN THE SOFTWARE.
053     * </p>
054     * 
055     * @author Greg Paperin (<a href="http://www.paperin.org" target="_blank">http://www.paperin.org</a>)
056     * @version {@value org.LiveGraph.LiveGraph#version}
057     */
058    public class UpdateInvoker implements Runnable, SettingsObserver {
059    
060    /**
061     * How long to sleep for when updates are to be invoked automatically.
062     */
063    private static final long defaultTimeTickLength = 500; // milliseconds 
064    
065    /**
066     * How long to sleep for when updates are to be invoked manually.
067     */
068    private static final long restingTimeTickLength = 2000; // milliseconds
069    
070    
071    /**
072     * The data reader for the input stream.
073     */
074    private DataStreamToCacheReader dataReader = null;
075    
076    /**
077     * Cache to hold the data.
078     */
079    private DataCache dataCache = null;
080    
081    /**
082     * Data file from which to update.
083     */
084    private File dataFile = null;
085    
086    /**
087     * Whether cache must be reset before each read.
088     */
089    private boolean dontCache = false;
090    
091    
092    /**
093     * Interval between updates in milliseconds.
094     */
095    private long interval = -1;
096    
097    /**
098     * Whether an update is currently running. 
099     */
100    private boolean updateInProgress = false;
101    
102    /**
103     * Whether the invoker thread should wind up at the next possibility.
104     */
105    private boolean mustQuit = false;
106    
107    /**
108     * The sleep length betweek ticks at which this invoker can react to events and send
109     * mesages.
110     */
111    private long timeTickLength = restingTimeTickLength;
112    
113    /**
114     * Remaining milliseconds till the next update.
115     */
116    private long remainingMillis = -1;
117    
118    /**
119     * System milliseconds at last update.
120     */
121    private long lastUpdateTime = 0;
122    
123    /**
124     * Milliseconds since last update.
125     */
126    private long sinceUpdateTime = 0;
127    
128    /**
129     * Thread actually performing the update.
130     */
131    private Thread fileReadingThread = null;
132    
133    
134    /**
135     * List of observers.
136     */
137    private List<UpdateInvokerObserver> observers = null;
138    
139    /**
140     * Constructs a new invoker.
141     */
142    public UpdateInvoker() {
143            dataReader = null;
144            dataCache = null;
145            dataFile = null;
146            dontCache = false;
147            
148            interval = -1;
149            updateInProgress = false;
150            mustQuit = false;
151            timeTickLength = restingTimeTickLength;
152            remainingMillis = -1;
153            lastUpdateTime = 0;
154            sinceUpdateTime = 0;
155            fileReadingThread = null;
156            
157            observers = new ArrayList<UpdateInvokerObserver>();
158    }
159    
160    /**
161     * Sets the file from which the next update will be read and resets the data cache.
162     * 
163     * @param file File from which to read the data from now on.
164     * @throws FileNotFoundException If no such file exists.
165     * @throws IllegalStateException If no valid data cache is set.
166     */
167    public synchronized void setDataFile(File file) throws FileNotFoundException {
168            if (null == file)
169                    throw new NullPointerException("Cannot read data from a null file");
170            
171            if (null == dataCache)
172                    throw new IllegalStateException("Must set dataCache before setting an input stream");
173            
174            dataFile = file;
175            createDataReader();     
176    }
177    
178    /**
179     * Sets the file from which the next update will be read and resets the data cache.
180     * 
181     * @param fileName File from which to read the data from now on.
182     * @throws FileNotFoundException If no such file exists.
183     * @throws IllegalStateException If no valid data cache is set.
184     */
185    public synchronized void setDataFile(String fileName) throws FileNotFoundException {
186            
187            if (null == fileName)
188                    throw new NullPointerException("Cannot read data from a null file name");
189            
190            setDataFile(new File(fileName));
191    }
192    
193    /**
194     * Sets whether the cache should be reset before each update.
195     * @param state Indicates whether the cache should be reset before each update.
196     */
197    public synchronized void setDontCacheData(boolean state) {
198            dontCache = state;
199    }
200    
201    /**
202     * Specifies the data cache to which the data read on the next update will be written.
203     * 
204     * @param cache The data cache to use from now on. 
205     */
206    public synchronized void setDataCache(DataCache cache) {
207            if (null == cache)
208                    throw new NullPointerException("Cannot read data into a null cache");
209            dataCache = cache;
210    }
211    
212    /**
213     * Sets the length of the interval between automatic data updates in milliseconds.
214     * If {@code interval <= 0} the update will not be triggered automatically.
215     * 
216     * @param interval The length of the interval between automatic data updates in milliseconds
217     * (if {@code interval <= 0} the update will not be triggered automatically).
218     */
219    public synchronized void setInterval(long interval) {
220            if (this.interval <= 0 && interval > 0)
221                    this.lastUpdateTime = 0;
222            this.interval = interval;
223            this.timeTickLength = interval <= 0 ? restingTimeTickLength : defaultTimeTickLength;
224    }
225    
226    /**
227     * Creates a reader on the currently set data input file and resets the cache.
228     * 
229     * @throws FileNotFoundException If the input file does not exist.
230     */
231    private synchronized void createDataReader() throws FileNotFoundException {
232            
233            if (null == dataFile)
234                    return;
235            
236            if (0 == dataFile.getPath().length())
237                    return;
238            
239            if (!dataFile.exists())
240                    throw new FileNotFoundException("Data source file does not exist (" + dataFile.getPath() + ")");
241            
242            if (null != dataReader)
243                    closeReader();
244            
245            dataCache.resetCache();
246            FileInputStream ins = new FileInputStream(dataFile);    
247            dataReader = new DataStreamToCacheReader(ins, dataCache);
248    }
249    
250    /**
251     * The length of the interval between data updates.
252     * If {@code interval <= 0} the update will not be triggered automatically.
253     * @return The length of the interval between automatic data updates in milliseconds;
254     * a value {@code interval <= 0} indicated that no updates will be triggered automatically.
255     */
256    public long getInterval() {
257            return interval;
258    }
259    
260    /**
261     * Executes the next update from the current input file into the current data cache.
262     * 
263     * @return {@code true} if the update was started successfully, {@code false} if the
264     * update could not be started (possibly, because a previous update was still in progress).
265     */
266    public synchronized boolean update() {
267            
268            if (isUpdateInProgress())
269                    return false;
270            
271            setUpdateInProgress(true);
272            notifyObserversUpdateStarted();
273            
274            fileReadingThread = new Thread(new Runnable() {
275                    public void run() {
276                            
277                            String errorMsg = null;
278                            try {
279                                    
280                                    if (dontCache || null == dataReader)
281                                            createDataReader();
282                                    
283                                    if (null == dataReader) {
284                                            notifyObserversUpdateFinished("Data source not set or data source not valid");
285                                            lastUpdateTime = System.currentTimeMillis();
286                                            setUpdateInProgress(false);
287                                            return;
288                                    }
289                                            
290                                    dataCache.startDelayEvents();
291                                    dataReader.readFromStream();
292                                    dataCache.fireDelayedEvents();
293                                    
294                                    if (dontCache)
295                                            closeReader();
296                                    
297                            } catch(Exception e) {                          
298                                    errorMsg = e.getMessage() + "(" + e.getClass().getSimpleName() + ")";
299                            }
300                            
301                            Thread.yield();
302                            notifyObserversUpdateFinished(errorMsg);
303                            lastUpdateTime = System.currentTimeMillis();
304                            setUpdateInProgress(false);
305                    }
306            }, "File reading thread");
307            
308            fileReadingThread.start();
309            
310            return true;
311    }
312    
313    /**
314     * Add an observer to this invoker.
315     * 
316     * @param observer Observer to add.
317     * @return Whether the observer was really added because it was not there yet.
318     */
319    public boolean addObserver(UpdateInvokerObserver observer) {
320            if (hasObserver(observer))
321                    return false;
322            return observers.add(observer);
323    }
324    
325    /**
326     * Check for the specified observer.
327     * 
328     * @param observer An observer.
329     * @return Whether the specified observer is in the observer list.
330     */
331    public boolean hasObserver(UpdateInvokerObserver observer) {
332            return observers.contains(observer);    
333    }
334    
335    /**
336     * Removes an observer.
337     * 
338     * @param observer An observer.
339     * @return Whether the observer was successfully removed.
340     */
341    public boolean removeObserver(UpdateInvokerObserver observer) {
342            return observers.remove(observer);
343    }
344    
345    /**
346     * Count observers.
347     * 
348     * @return Current number of observers.
349     */
350    public int countObervers() {
351            return observers.size();
352    }
353    
354    /**
355     * Notifies this invoker's observers that this observer has waken up to process events.
356     * This gives listeners displaying various information about this invoker a chance to
357     * update their state.
358     */
359    public void notifyObserversTimerTick() {
360            for (UpdateInvokerObserver observer : observers)
361                    observer.timerTick(this);
362    }
363    
364    /**
365     * Notifies this invoker's observers that an update was triggered.
366     */
367    public void notifyObserversUpdateStarted() {
368            for (UpdateInvokerObserver observer : observers)
369                    observer.updateStarted(this);
370    }
371    
372    /**
373     * Notifies this invoker's observers that an update has finished.
374     *  
375     * @param errorMsg Describes any problems during the update. If there were no problems
376     * {@code errorMsg} will be {@code null}.
377     */
378    public void notifyObserversUpdateFinished(String errorMsg) {
379            for (UpdateInvokerObserver observer : observers)
380                    observer.updateFinished(this, errorMsg);
381    }
382    
383    /**
384     * Used to notify this invoker that is must stop running at the next possibility.
385     * 
386     * @param val Whether this invoker should stop running at the next possibility.
387     */
388    public synchronized void setMustQuit(boolean val) {
389            this.mustQuit = val;
390    }
391    
392    /**
393     * Time to next update.
394     * 
395     * @return Number of milliseconds left until the next update.
396     */
397    public long getRemainingMillis() {
398            return remainingMillis;
399    }
400    
401    /**
402     * Whether an update is currently running.
403     * @return Whether an update is currently running.
404     */
405    public boolean isUpdateInProgress() {
406            return updateInProgress;
407    }
408    
409    /**
410     * Sets the internal {@code updateInProgress} state.
411     * @param state The new state.
412     */
413    private synchronized void setUpdateInProgress(boolean state) {
414            updateInProgress = state;
415    }
416    
417    /**
418     * Closes the current data reader.
419     */
420    private void closeReader() {
421            try {
422                    dataReader.close();
423            } catch (IOException e) {
424                    e.printStackTrace();
425            }
426            dataReader = null;
427    }
428    
429    /**
430     * Winds up the operations by closing the current data reader.
431     */
432    private void tidyUp() {
433            if (null != dataReader) {
434                    closeReader();
435            }
436    }
437    
438    /**
439     * Send the this invoker to sleep for {@code timeTickLength} milliseconds.
440     * When it wakes it, internal time state is updated an the observers notified.
441     */
442    private void timeTick() {
443            Thread.yield();
444            try { Thread.sleep(timeTickLength); } catch (InterruptedException e) {}
445            
446            sinceUpdateTime = System.currentTimeMillis() - lastUpdateTime;
447            remainingMillis = interval <= 0 ? -1 : Math.max(0, interval - sinceUpdateTime);
448            
449            notifyObserversTimerTick();
450            Thread.yield();
451    }
452    
453    /**
454     * Main invoker loop:
455     * call {@link #timeTick()};
456     * if it is time for the next update, call {@link #update()};
457     * call {@link #timeTick()} again and continue the loop until {@link #mustQuit} is set to true;
458     * call {@link #tidyUp()} before quitting. 
459     */
460    public void run() {
461                    
462            while (!mustQuit) {
463                    
464                    timeTick();
465                    
466                    if (sinceUpdateTime >= interval && interval > 0)
467                            update();
468            }
469            
470            tidyUp();
471    }
472    
473    /**
474     * Dispatches settings change events.
475     */
476    public void settingHasChanged(ObservableSettings settings, Object info) {
477            
478            if (null == info || null == settings)
479                    return;
480            
481            if ((settings instanceof DataFileSettings) && (info instanceof String)) {
482                    settingHasChanged((DataFileSettings) settings, (String) info);
483                    return;
484            }
485    }
486    
487    /**
488     * When the application's data file settings change which method is called in order
489     * to update the internal state accordingly.
490     * 
491     * @param settings Application's data file settings.
492     * @param info Describes the change event.
493     */
494    public void settingHasChanged(DataFileSettings settings, String info) {
495            
496            if (null == info || null == settings)
497                    return;
498            
499            if (info.equals("DataFile")) {
500                    DataFileSettings_DataFile(settings);
501                    return;
502            }
503            
504            if (info.equals("UpdateFrequency")) {
505                    DataFileSettings_UpdateFrequency(settings);
506                    return;
507            }
508            
509            if (info.equals("DoNotCacheData")) {
510                    DataFileSettings_DoNotCacheData(settings);
511                    return;
512            }
513            
514            if (info.equals("ShowOnlyTailData")) {
515                    DataFileSettings_ShowOnlyTailData(settings);            
516                    return;
517            }
518            
519            if (info.equals("load")) {
520                    ErrorWhileSettingHasChangedProcessingException ex = null;
521                    
522                    try {
523                            DataFileSettings_DataFile(settings);
524                    } catch (ErrorWhileSettingHasChangedProcessingException e) {
525                            if (null == ex) ex = e;
526                    }
527                    
528                    try {
529                            DataFileSettings_UpdateFrequency(settings);
530                    } catch (ErrorWhileSettingHasChangedProcessingException e) {
531                            if (null == ex) ex = e;
532                    }
533                    
534                    try {
535                            DataFileSettings_DoNotCacheData(settings);
536                    } catch (ErrorWhileSettingHasChangedProcessingException e) {
537                            if (null == ex) ex = e;
538                    }
539                    
540                    try {
541                            DataFileSettings_ShowOnlyTailData(settings);
542                    } catch (ErrorWhileSettingHasChangedProcessingException e) {
543                            if (null == ex) ex = e;
544                    }
545                    
546                    if (null != ex) {
547                            ex.setDontBreakObserverNotification(true);
548                            throw ex;
549                    }
550                    
551                    return;
552            }
553    }
554    
555    /**
556     * Adjusts the file used by this invoker in response to a corresponding change in the
557     * data file settings.
558     * 
559     * @param settings The data file settigs.
560     */
561    private void DataFileSettings_DataFile(DataFileSettings settings) {
562            try {
563                    setDataFile(settings.getDataFile());
564                    update();
565            } catch (FileNotFoundException e1) {
566                    throw new ErrorWhileSettingHasChangedProcessingException(e1);
567            } catch (NullPointerException e2) {
568                    throw new ErrorWhileSettingHasChangedProcessingException(e2);                   
569            }
570    }
571    
572    /**
573     * Adjusts the update frequency used by this invoker in response to a corresponding change in the
574     * data file settings.
575     * 
576     * @param settings The data file settigs.
577     */
578    private void DataFileSettings_UpdateFrequency(DataFileSettings settings) {
579            setInterval(settings.getUpdateFrequency());
580    }
581    
582    /**
583     * Adjusts the cache resetting option used by this invoker in response to a corresponding change in the
584     * data file settings.
585     * 
586     * @param settings The data file settigs.
587     */
588    private void DataFileSettings_DoNotCacheData(DataFileSettings settings) {
589            setDontCacheData(settings.getDoNotCacheData());
590    }
591    
592    /**
593     * Adjusts the cache mode used by this invoker in response to a corresponding change in the
594     * data file settings.
595     * 
596     * @param settings The data file settigs.
597     */
598    private void DataFileSettings_ShowOnlyTailData(DataFileSettings settings) {
599            if (null == dataCache)
600                    return;
601            
602            boolean onlyTale = settings.getShowOnlyTailData();
603            
604            if (dataCache.getCacheMode() == DataCache.CacheMode.CacheAllData && !onlyTale)
605                    return;
606            
607            if (dataCache.getCacheMode() == DataCache.CacheMode.CacheTailData && onlyTale)
608                    return;
609            
610            dataCache.resetData(onlyTale ? DataCache.CacheMode.CacheTailData : DataCache.CacheMode.CacheAllData);
611            try {
612                    createDataReader();
613                    update();
614            } catch (FileNotFoundException e) {
615                    throw new ErrorWhileSettingHasChangedProcessingException(e);
616            }
617    }
618    
619    }