Wednesday, May 18, 2005

ComboTimer from Java to C++

A few weeks back I started work on a port of the ObjectRepository from Java to C++. The ObjectRepository (OR) is a load-balancing, NameService-like CORBA service that OCI developed for a client a couple of years ago. The client open-sourced it, and I decided to create a C++ version of it. One of the first classes that I ported was the ComboTimer. Justin wrote the original Java version, so the design is his. He also made some good suggestions for improvements that I incorporated into the C++ version below.

I welcome comments about the implementation, specifically about any threading issues.

I will attempt to post more about the ObjectRepository in the future.

== File ComboTimer.java ==

/*
* Created on Nov 21, 2003
* Copyright 2003 Object Computing, Inc.
*/
package ObjectRepository;

import java.util.Timer;
import java.util.TimerTask;

/**
* This timer class allows scheduling of two related timeouts, one short and
* one long. The second timeout is used as a maximum. The first call to
* reset() schedules both timeouts, and subsequent calls to reset() will
* cancel the short timeout and reschedule it.
*
* If either timeout is <= 0, it is disabled.
* The max timeout should be >= the short timeout. If a positive max timeout
* smaller than the short timeout is given, the constructor uses the short
* value as the max timeout and disables the short timeout.
* Example:
* ComboTimer ct = new ComboTimer(cb, 50, 5000);
* This creates a timer that will callback 50ms after the first call to
* reset(). If, for example, reset() is called every 40ms, the timer will
* callback 5s after the first call to reset().
* It's expected that this timer will be used to allow files to be written
* during a period of inactivity, but with a maximum age to prevent much
* data loss in the event of a crash.
*
* Any call to cancel() or close() will stop both timers.
*
* Internally a separate thread is used for the timers.
*/
public final class ComboTimer {

/**
* Implement this interface to handle the timer event. There is no way to
* distinguish which timeout occurred.
*/
public interface Callback {
public void run();
}

/**
* Task scheduled on the Timer for both the short and the max timeout.
* When it fires it first cancels both timeouts (returning the ComboTimer
* to its initial state) and then invokes the user callback.
*/
private final class Timeout extends TimerTask {
public void run() {
ComboTimer.this.cancel();
// The constructor rejects a null callback, so this check is purely
// defensive.
if (cb_ != null) {
cb_.run();
}
}
}

// Passing true to the Timer constructor makes the timer thread a daemon
// thread, so it does not keep the application alive.
// Passing false would mean that the timer thread prevents the main
// application from exiting. That is typically desired, but can be a
// problem if the application using the ComboTimer doesn't remember to
// close() it before exiting.
private Timer tmr_ = new Timer(true);
// User callback; never null (enforced by the constructor).
private final Callback cb_;
// Currently scheduled (or most recently cancelled) short timeout task.
private Timeout to_ = new Timeout();
// Currently scheduled (or most recently cancelled) max timeout task.
private Timeout max_ = new Timeout();
// Short timeout in milliseconds; 0 means disabled.
private int ms_ = 0;
// Max timeout in milliseconds; values <= 0 mean disabled.
private int max_ms_ = 0;
// True between the first reset() and the next cancel() or timeout.
private boolean is_set_ = false;

/**
* Schedule the Callback in ms milliseconds, with a maximum timeout
* of max_ms milliseconds.
* Specify 0 for either timer to disable that timer.
* A negative ms is treated as 0. If max_ms is positive but smaller than
* ms, the max timeout is set to ms and the short timeout is disabled.
* Nothing is scheduled until reset() is called.
*
* @throws IllegalArgumentException if cb is null.
*/
public ComboTimer(Callback cb, int ms, int max_ms) {
if (cb == null) {
throw new IllegalArgumentException("Callback == null is not allowed.");
}
cb_ = cb;
ms_ = ms;
max_ms_ = max_ms;
is_set_ = false;
if (ms_ < 0) {
ms_ = 0;
}
if (max_ms_ > 0 && max_ms_ < ms_) {
max_ms_ = ms_;
ms_ = 0;
}
}
/**
* The first time this method is called, both timeouts (if > 0) are
* scheduled. Each subsequent call resets the short timeout.
* Once a timeout event occurs the state of the ComboTimer resets, and
* the next call to this method will act as the first.
* Does nothing once close() has been called.
*/
public synchronized void reset() {
if (tmr_ == null) {
return;
}
if (ms_ > 0) {
// A cancelled TimerTask cannot be rescheduled, so a fresh task is
// created every time the short timeout is restarted.
to_.cancel();
to_ = new Timeout();
tmr_.schedule(to_, ms_);
}
if (!is_set_) {
is_set_ = true;
if (max_ms_ > 0) {
max_.cancel();
max_ = new Timeout();
tmr_.schedule(max_, max_ms_);
}
}
}
/**
* Cancel both timeouts, resetting the ComboTimer to its initial state.
* Does nothing once close() has been called.
*/
public synchronized void cancel() {
if (tmr_ == null) {
return;
}
is_set_ = false;
to_.cancel();
max_.cancel();
}
/**
* Cancel both timeouts, and kill the timer thread. Once this method
* is called, the ComboTimer is no longer useable.
* NOTE(review): is_set_ is not cleared here, so isSet() can still return
* true after close() if the timer was set at the time.
*/
public synchronized void close() {
if (tmr_ == null) {
return;
}
tmr_.cancel();
tmr_ = null;
}
/**
* Check to see if the timeout(s) have been scheduled.
*/
public synchronized boolean isSet() {
return is_set_;
}
}



== File ComboTimer.h ==



// Copyright (c) 2005 by Kevin Heifner. heifner at ociweb dot com.
// Permission is granted to use this code without restriction
// so long as this copyright notice appears in all source files.

#ifndef OR_COMBOTIMER_H
#define OR_COMBOTIMER_H

#include "OR_Export.h"
#include "ace/Timer_Queue.h"
#include "ace/Reactor.h"
#include "ace/Thread_Manager.h"
#include "boost/function.hpp"

namespace OR {

/**
* This timer class allows scheduling of two related timeouts, one short and
* one long. The second timeout is used as a maximum. The first call to
* reset() schedules both timeouts, and subsequent calls to reset() will
* cancel the short timeout and reschedule it.
*
* If either timeout is == 0, it is disabled.
* The max timeout must be >= the short timeout.
* Example:
* ComboTimer ct(cb, ACE_Time_Value(10), ACE_Time_Value(60));
* This creates a timer that will callback 10sec after the first call to
* reset(). If, for example, reset() is called every 5secs, the timer will
* callback 60secs after the first call to reset().
*
* cancel() will stop both timers.
*
* Provide a Callback function object. There is no way to
* distinguish which timeout occurred, although that could be easily
* added.
*
* Internally a separate thread is used for the timers.
*/
class OR_Export ComboTimer {
public:

// pointer to function of form: void foo();
typedef boost::function Callback_t;

/**
* Schedule the Callback.
* Specify 0 for either timer to disable that timer.
* Spawns separate thread, but timer does not start until
* reset() is called.
* @throw std::invalid_argument if maxTime < shortTime.
*/
ComboTimer(
Callback_t callback,
const ACE_Time_Value& shortTime,
const ACE_Time_Value& maxTime = ACE_Time_Value(0)
);

/**
* Calls fini()
*/
~ComboTimer();

/**
* The first time this method is called, both timeouts (if > 0) are
* scheduled. Each subsequent call resets the short timeout.
* Once a timeout event occurs the state of the ComboTimer resets, and
* the next call to this method will act as the first.
*/
void reset();

/**
* Cancel both timeouts, resetting the ComboTimer to its initial state.
*/
void cancel();

/**
* Cancel both timeouts, and kill the timer thread. Once this method
* is called, the ComboTimer is no longer useable.
*/
void fini();

/**
* Check to see if the timeout(s) have been scheduled.
*/
bool scheduled() const;

private:
// Calls run()
static ACE_THR_FUNC_RETURN thr_func(void* arg);
// The worker thread's main loop.
void run();

friend class TimerHandler;
class TimerHandler : public ACE_Event_Handler {
public:
explicit TimerHandler(ComboTimer& ct)
: ct_(ct)
{}
// Method which is called back by the Reactor when timeout occurs.
virtual int handle_timeout(const ACE_Time_Value& tv, const void* arg);
private:
ComboTimer& ct_;
};

typedef ACE_Guard TGuard;

private:
ACE_Reactor reactor_;
Callback_t cb_;
const ACE_Time_Value shortTime_;
const ACE_Time_Value maxTime_;
long shortTimerId_;
long maxTimerId_;
int threadGroupId_;
mutable ACE_Thread_Mutex mtx_;
// reactor_ has to live longer than timerHandler_
// because the timerHandler_ destructor calls back
// on the reactor_.
TimerHandler timerHandler_;

};

} // namespace OR

#endif // OR_COMBOTIMER_H



== File ComboTimer.cpp ==



// Copyright (c) 2005 by Kevin Heifner. heifner at ociweb dot com.
// Permission is granted to use this code without restriction
// so long as this copyright notice appears in all source files.

#include "ObjectRepository_pch.h"

#include "ComboTimer.h"
#include <stdexcept>


/**
 * Store the callback and timeouts, validate them, and spawn the worker
 * thread that will run the reactor event loop. No timer is scheduled until
 * reset() is called.
 *
 * @param cb        Callable invoked when either timeout fires.
 * @param shortTime Short (inactivity) timeout; zero disables it.
 * @param maxTime   Maximum timeout; zero disables it.
 * @throw std::invalid_argument if maxTime is enabled and < shortTime.
 */
OR::ComboTimer::ComboTimer(
  Callback_t cb,
  const ACE_Time_Value& shortTime,
  const ACE_Time_Value& maxTime
)
  : reactor_()
  , cb_(cb)
  , shortTime_(shortTime)
  , maxTime_(maxTime)
  , shortTimerId_(-1)
  , maxTimerId_(-1)
  , threadGroupId_(-1)
  , mtx_()
  , timerHandler_(*this)
{
  // Compare against ACE_Time_Value::zero instead of testing msec() > 0:
  // msec() truncates to whole milliseconds, so a sub-millisecond maxTime
  // would look "disabled" and escape this validation.
  if (maxTime_ > ACE_Time_Value::zero && maxTime_ < shortTime_) {
    throw std::invalid_argument("maxTime < shortTime");
  }

  // Validation is done before spawning, so a throwing constructor never
  // leaves a worker thread behind.
  // NOTE(review): the result of spawn() is stored unchecked; if spawning
  // fails no worker thread exists and no timeout will ever be delivered.
  TGuard g(mtx_);
  threadGroupId_ = ACE_Thread_Manager::instance()->spawn(thr_func, this);
}


// Destructor: all shutdown work is delegated to fini().
OR::ComboTimer::~ComboTimer()
{
  this->fini();
}


void
OR::ComboTimer::fini()
{
mtx_.acquire();
long shortTimerId = shortTimerId_;
long maxTimerId = maxTimerId_;
int threadGroupId = threadGroupId_;
mtx_.release();

reactor_.cancel_timer(shortTimerId);
reactor_.cancel_timer(maxTimerId);
// This should not be necessary because the reactor_
// is destroyed after timerHandler_.
timerHandler_.reactor(0);
// If this is called before run_reactor_event_loop(), which
// is possible if you create and then quickly destroy, then
// end_reactor_event_loop() deactivates the reactor so that
// run_reactor_event_loop() just returns, which is what we want.
reactor_.end_reactor_event_loop();
// Wait for the worker thread to exit.
// Using wait_grp because it does not interact with ACE_Object_Manager.
// We want to wait even if we are at program shutdown.
ACE_Thread_Manager::instance()->wait_grp(threadGroupId);

mtx_.acquire();
maxTimerId_ = shortTimerId_ = threadGroupId_ = -1;
mtx_.release();
}


/**
 * Entry point of the worker thread.
 *
 * @param arg Pointer to the ComboTimer whose run() loop should be executed
 *            (the constructor passes `this`).
 * @return Always 0.
 *
 * No exception is allowed to leave this function: an exception escaping a
 * thread entry point would terminate the process.
 */
ACE_THR_FUNC_RETURN
OR::ComboTimer::thr_func(void* arg)
{
  try {
    OR::ComboTimer* const worker = static_cast<OR::ComboTimer*>(arg);
    if (worker != 0) {
      worker->run();
    }
  } catch (const std::exception&) {
    // todo: log exception
  } catch (...) {
    // todo: log unknown exception
    // Anything not derived from std::exception is swallowed as well.
  }
  return 0;
}


void
OR::ComboTimer::run()
{
// The reactor checks to see that it is the owner thread when calling
// the handle_timeout so we must tell it we are the owning thread.
reactor_.owner(ACE_Thread::self());
// Don't call reactor_.reset_reactor_event_loop() because
// if end_reactor_event_looop is called first then we
// don't want to run.
reactor_.run_reactor_event_loop();
}


/**
 * Called by the reactor when either the short or the max timer expires.
 * Both timers are cancelled first, so the ComboTimer is back in its initial
 * state before the user callback runs.
 *
 * @return Always 0.
 */
int
OR::ComboTimer::TimerHandler::handle_timeout(const ACE_Time_Value& /*tv*/, const void* /*arg*/)
{
  // Make sure user exception does not cause us to die. The callback is
  // arbitrary user code, so exceptions that do not derive from
  // std::exception are contained as well.
  try {
    ct_.cancel();
    ct_.cb_();
  } catch (const std::exception&) {
    // todo: need to log, or somehow handle exception
  } catch (...) {
    // todo: need to log, or somehow handle unknown exception
  }
  return 0;
}


void
OR::ComboTimer::reset()
{
mtx_.acquire();
long shortTimerId = shortTimerId_;
long maxTimerId = maxTimerId_;
mtx_.release();

bool scheduled = maxTimerId != -1;

if (shortTime_.msec() > 0) {
reactor_.cancel_timer(shortTimerId);
shortTimerId =
reactor_.schedule_timer(
&timerHandler_, // ACE_Event_Handler
0, // argument sent to handle_timeout()
shortTime_ // set timer to go off with delay
);
}
if (!scheduled) {
if (maxTime_.msec() > 0) {
reactor_.cancel_timer(maxTimerId);
maxTimerId =
reactor_.schedule_timer(
&timerHandler_, // ACE_Event_Handler
0, // argument sent to handle_timeout()
maxTime_ // set timer to go off with delay
);
}
}

mtx_.acquire();
shortTimerId_ = shortTimerId;
maxTimerId_ = maxTimerId;
mtx_.release();
}


void
OR::ComboTimer::cancel()
{
mtx_.acquire();
long shortTimerId = shortTimerId_;
long maxTimerId = maxTimerId_;
mtx_.release();

reactor_.cancel_timer(shortTimerId);
reactor_.cancel_timer(maxTimerId);

mtx_.acquire();
maxTimerId_ = shortTimerId_ = -1;
mtx_.release();
}


bool
OR::ComboTimer::scheduled() const
{
TGuard g(mtx_);
return maxTimerId_ != -1;
}



== Unit test: ComboTimer_UT.cpp ==


#include "ut_pch.h"

#include "unittester.h"
#include "ObjectRepository/ComboTimer.h"
#include "boost/bind.hpp"
#include <stdexcept>

namespace {

// Global invocation counter shared by every Callback instance.
int callbackCalled = 0;

// Stateless functor: each call bumps the global counter.
struct Callback {
  void operator()() { callbackCalled += 1; }
};

// Functor that counts its own invocations in a per-instance member.
struct CTCallback {
  int called;

  CTCallback() : called(0) {}

  void operator()() { called += 1; }
};

}

namespace UnitTest {


// Unit tests for OR::ComboTimer. The tests are timing based: they poll the
// callback counters while sleeping in small steps.
class ComboTimer_UT : public cppunit::Test {
private:
// Registers every test method under its name. Returns true so that the
// call can be used to initialize the static tests_registered flag.
static bool register_tests() {
add(&ComboTimer_UT::testStartStop, "ComboTimer_UT::testStartStop");
add(&ComboTimer_UT::testShortTimer, "ComboTimer_UT::testShortTimer");
add(&ComboTimer_UT::testMaxTimer, "ComboTimer_UT::testMaxTimer");
add(&ComboTimer_UT::testCancel, "ComboTimer_UT::testCancel");
add(&ComboTimer_UT::testException, "ComboTimer_UT::testException");
add(&ComboTimer_UT::testCallback, "ComboTimer_UT::testCallback");
return true;
}
static bool tests_registered;

public:

// Zero the global callback counter.
virtual void setup() {
callbackCalled = 0;
}
virtual void teardown() {}

// Construct and destroy timers in many combinations (with and without
// fini(), reset() and cancel(), including repeated fini()) and check that
// the callback is never invoked. Where reset() is called the short timeout
// is 2 seconds, and the timer is destroyed right away.
void testStartStop() {
Callback cb;
{
// Create and immediately destroy; reset() is never called.
OR::ComboTimer ct(cb, ACE_Time_Value(0,1), ACE_Time_Value(0,2));
assertEqual(callbackCalled, 0);
}
{
// Explicit fini() followed by the destructor's fini().
OR::ComboTimer ct(cb, ACE_Time_Value(0,1), ACE_Time_Value(0,2));
ct.fini();
assertEqual(callbackCalled, 0);
}
{
// Default maxTime (max timeout disabled).
OR::ComboTimer ct(cb, ACE_Time_Value(0,1));
assertEqual(callbackCalled, 0);
}
{
// Both timeouts zero.
OR::ComboTimer ct(cb, ACE_Time_Value(0,0));
assertEqual(callbackCalled, 0);
}
{
OR::ComboTimer ct(cb, ACE_Time_Value(0,1), ACE_Time_Value(0,2));
ct.fini();
assertEqual(callbackCalled, 0);
}
{
// Destroyed while timers are still scheduled.
OR::ComboTimer ct(cb, ACE_Time_Value(2), ACE_Time_Value(20));
ct.reset();
assertEqual(callbackCalled, 0);
}
{
OR::ComboTimer ct(cb, ACE_Time_Value(2), ACE_Time_Value(20));
ct.reset();
ct.reset();
assertEqual(callbackCalled, 0);
}
{
// cancel() before anything was scheduled.
OR::ComboTimer ct(cb, ACE_Time_Value(2), ACE_Time_Value(20));
ct.cancel();
ct.reset();
assertEqual(callbackCalled, 0);
}
{
// fini() called twice explicitly, then once more by the destructor.
OR::ComboTimer ct(cb, ACE_Time_Value(2), ACE_Time_Value(20));
ct.reset();
ct.cancel();
ct.fini();
ct.fini();
assertEqual(callbackCalled, 0);
}
}

// Short timeout of 1000 usec with a 20 second maximum: after the first
// callback the timer is reset once more, and exactly two callbacks are
// expected in total.
void testShortTimer() {
Callback cb;
{
OR::ComboTimer ct(cb, ACE_Time_Value(0,1000), ACE_Time_Value(20));
ct.reset();
bool reset = false;
for (int i = 0; i < 1000; ++i) {
// Re-arm exactly once, after the first callback has been observed.
if (callbackCalled == 1 && !reset) {
ct.reset();
reset = true;
}
if (callbackCalled == 2) break;
ACE_OS::thr_yield();
ACE_OS::sleep(ACE_Time_Value(0,1000));
}
assertEqual(callbackCalled, 2);
}
}

// Short timeout 3000 usec, max timeout 5000 usec. reset() is called about
// every 1000 usec, which keeps restarting the short timer; a callback is
// expected anyway - presumably delivered by the max timer.
void testMaxTimer() {
Callback cb;
{
OR::ComboTimer ct(cb, ACE_Time_Value(0,3000), ACE_Time_Value(0,5000));
for (int i = 0; i < 1000; ++i) {
ct.reset();
ACE_OS::sleep(ACE_Time_Value(0,1000));
ct.reset();
if (callbackCalled > 0) break;
}
assertTrue(callbackCalled > 0);
}
}

// cancel() right after reset() must suppress the callback; a later reset()
// must make the timer fire again.
void testCancel() {
Callback cb;
{
OR::ComboTimer ct(cb, ACE_Time_Value(0,1000), ACE_Time_Value(0,2000));
ct.reset();
ct.cancel();
ACE_OS::thr_yield();
// Sleep longer than both timeouts to show that nothing fires.
ACE_OS::sleep(ACE_Time_Value(0,5000));
assertEqual(callbackCalled, 0);

ct.reset();
for (int i = 0; i < 1000; ++i) {
ACE_OS::thr_yield();
ACE_OS::sleep(ACE_Time_Value(0,1000));
if (callbackCalled > 0) break;
}
assertTrue(callbackCalled > 0);
}
}

// A max timeout (2 s) smaller than the short timeout (3 s) must make the
// constructor throw std::invalid_argument.
void testException() {
Callback cb;
try {
OR::ComboTimer ct(cb, ACE_Time_Value(3), ACE_Time_Value(2));
failTest("expected exception");
} catch (std::invalid_argument&) {
}
}

// Passes the functor through boost::ref and then checks the count on the
// local cb object itself.
void testCallback() {
CTCallback cb;
OR::ComboTimer ct(boost::ref(cb), ACE_Time_Value(0,1000), ACE_Time_Value(0,2000));
ct.reset();
for (int i = 0; i < 1000; ++i) {
ACE_OS::thr_yield();
ACE_OS::sleep(ACE_Time_Value(0,1000));
if (cb.called > 0) break;
}
assertTrue(cb.called > 0);
}

};

// Initializing this static calls register_tests(), which registers the tests.
bool ComboTimer_UT::tests_registered = ComboTimer_UT::register_tests();

} // namespace UnitTest

1 comment:

Unknown said...

Java 5 adds a new package called java.util.concurrent that would probably make the Java implementation significantly easier to implement. ScheduledExecutorService might do the trick.