Ensure no new wait is accepted by boost::deadline_timer unless previous wait is expired -
i trying achieve synchronization operation hardware devices controlled c++ code.
suppose 2 types of devices there on can perform open/close
. need achieve open 1 type of device specified duration
. same true second type of device.
i have written code boost::deadline_timer
:
#include <boost/date_time/posix_time/posix_time.hpp> #include <boost/thread.hpp> #include <boost/asio.hpp> class test : public std::enable_shared_from_this <test> { public: test() :io_(), timerone_(io_),timertwo_(io_){} void open(int num); void close(int num); void timedopen(int num, int dur); void run(); private: boost::asio::io_service io_; boost::asio::deadline_timer timerone_; boost::asio::deadline_timer timertwo_; }; void test::open(int type) { std::cout << "open number : " << type << std::endl; } void test::close(int type) { std::cout << "close number : " << type << std::endl; } void test::timedopen(int type, int dur) { switch (type) { case 1: { io_.reset(); auto fn = std::bind(&test::open, shared_from_this(), std::placeholders::_1); fn(type); timerone_.expires_from_now(boost::posix_time::seconds(dur)); timerone_.async_wait(std::bind(&test::close, shared_from_this(), type)); run(); std::cout << "function exiting" << std::endl; std::cout << "-----------------------------------------------" << std::endl; return; } case 2: { io_.reset(); auto fn = std::bind(&test::open, shared_from_this(), std::placeholders::_1); fn(type); timertwo_.expires_from_now(boost::posix_time::seconds(dur)); timertwo_.async_wait(std::bind(&test::close, shared_from_this(), type)); run(); std::cout << "function exiting" << std::endl; std::cout << "-----------------------------------------------" << std::endl; return; } } } void test::run() { boost::thread th(boost::bind(&boost::asio::io_service::run, &io_)); } int main() { auto t = std::make_shared<test>(); t->timedopen(1, 60); t->timedopen(2, 30); t->timedopen(1, 5); t->timedopen(2, 2); char line[128]; while (std::cin.getline(line, 128)) { if (strcmp(line, "\n")) break; } return 0; }
the output is:
open number : 1 function exiting ----------------------------------------------- open number : 2 function exiting ----------------------------------------------- open number : 1 close number : 1 function exiting ----------------------------------------------- open number : 2 close number : 2 function exiting ----------------------------------------------- close number : 2 close number : 1
for timerone_
not wait previous wait
expire i.e. t->timedopen(1, 5)
executed previous action t->timedopen(1, 60)
cancelled.
so close number : 1
appears in output without waiting t->timedopen(1, 60)
.
what want achieve if multiple waits encountered
type of timer
, operations should queued i.e.
if type:
t->timedopen(1, 60); t->timedopen(1, 10); t->timedopen(1, 5);
it should timedopen operation
60+10+5
seconds. 5 secs. should non blocking i.e. can not use wait() instead of async_wait()
.
how achieve it?
summary: requirement schedule operations on boost::deadline_timer()
i.e. multiple operations on queued unless previous wait expired.
like mentioned in comment, want have queues per "type".
let's name per-type queue "session".
by chaining async waits single queue on single strand
¹ effective serialization (also avoids synchronization on queue/session).
the tricky bit start async wait when none in flight. invariant async operations in flight iff !queue_.empty()
:
struct session : std::enable_shared_from_this<session> { session(boost::asio::io_service &io, int type) : strand_(io), timer_(io), type(type) {} void enqueue(int duration) { auto = shared_from_this(); strand_.post([this,duration,this] { std::cout << "t0 + " << std::setw(4) << mark() << "ms enqueue number: " << type << " (dur:" << duration << ")\n"; this->queue_.push(duration); if (this->queue_.size() == 1) this->wait(); }); } private: boost::asio::strand strand_; boost::asio::deadline_timer timer_; int type; std::queue<int> queue_; void close() { assert(!queue_.empty()); std::cout << "t0 + " << std::setw(4) << mark() << "ms close number : " << type << " (dur:" << queue_.front() << ") (depth " << queue_.size() << ")\n"; queue_.pop(); wait(); } void wait() { if (!queue_.empty()) { std::cout << "t0 + " << std::setw(4) << mark() << "ms open number : " << type << " (dur:" << queue_.front() << ") (depth " << queue_.size() << ")\n"; timer_.expires_from_now(boost::posix_time::milliseconds(queue_.front())); timer_.async_wait(strand_.wrap(std::bind(&session::close, shared_from_this()))); } } };
now test
class becomes simpler (in fact doesn't need "shared" @ all, i've left detail proverbial exercise reader):
class test : public std::enable_shared_from_this<test> { using guard = boost::lock_guard<boost::mutex>; public: test() : io_(), work_(boost::asio::io_service::work(io_)) { io_thread = boost::thread([this] { io_.run(); }); } void timedopen(int num, int duration); void stop() { { guard lk(mx_); if (work_) work_.reset(); } io_thread.join(); } ~test() { stop(); guard lk(mx_); timers_ex_.clear(); } private: mutable boost::mutex mx_; boost::asio::io_service io_; boost::optional<boost::asio::io_service::work> work_; std::map<int, std::shared_ptr<session> > timers_ex_; boost::thread io_thread; }; void test::timedopen(int type, int duration) { guard lk(mx_); auto &session = timers_ex_[type]; if (!session) session = std::make_shared<session>(io_, type); session->enqueue(duration); }
as can see i've
- extrapolated number of types
- made operations thread-safe
- added relative timestamps in milliseconds since
t0
- fixed broken
io_service
lifetime. now, construction starts service.work_
variable keeps alive when idle. stop()
shuts down (draining session queues first).- destruction calls
stop()
implicitly
here's test run:
int main() { auto t = std::make_shared<test>(); t->timedopen(1, 300); t->timedopen(2, 150); t->timedopen(1, 50); t->timedopen(2, 20); boost::this_thread::sleep_for(boost::chrono::milliseconds(400)); std::cout << "================\n"; t->timedopen(1, 50); t->timedopen(2, 20); t->timedopen(1, 300); t->timedopen(2, 150); t->stop(); }
prints
t0 + 0ms enqueue number: 1 (dur:300) t0 + 0ms open number : 1 (dur:300) (depth 1) t0 + 0ms enqueue number: 2 (dur:150) t0 + 0ms open number : 2 (dur:150) (depth 1) t0 + 0ms enqueue number: 1 (dur:50) t0 + 0ms enqueue number: 2 (dur:20) t0 + 150ms close number : 2 (dur:150) (depth 2) t0 + 150ms open number : 2 (dur:20) (depth 1) t0 + 170ms close number : 2 (dur:20) (depth 1) t0 + 300ms close number : 1 (dur:300) (depth 2) t0 + 300ms open number : 1 (dur:50) (depth 1) t0 + 350ms close number : 1 (dur:50) (depth 1) ================ t0 + 400ms enqueue number: 1 (dur:50) t0 + 400ms open number : 1 (dur:50) (depth 1) t0 + 400ms enqueue number: 2 (dur:20) t0 + 400ms open number : 2 (dur:20) (depth 1) t0 + 400ms enqueue number: 1 (dur:300) t0 + 400ms enqueue number: 2 (dur:150) t0 + 420ms close number : 2 (dur:20) (depth 2) t0 + 420ms open number : 2 (dur:150) (depth 1) t0 + 450ms close number : 1 (dur:50) (depth 2) t0 + 450ms open number : 1 (dur:300) (depth 1) t0 + 570ms close number : 2 (dur:150) (depth 1) t0 + 750ms close number : 1 (dur:300) (depth 1)
Comments
Post a Comment