Ensure no new wait is accepted by boost::deadline_timer unless previous wait is expired -


I am trying to achieve a synchronized operation for hardware devices controlled by C++ code.

Suppose there are two types of devices on which I can perform open/close. I need to keep one type of device open for a specified duration. The same is true for the second type of device.

I have written the following code using boost::deadline_timer:

#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/thread.hpp>
#include <boost/asio.hpp>

// Question's original attempt: one deadline_timer per device type, re-armed on
// every timedopen() call. This is the code that produces the unwanted output
// discussed in the text; it is kept as posted.
// NOTE(review): std::cout, std::bind, std::make_shared and strcmp are used
// without <iostream>, <functional>, <memory> and <cstring> being included here.
class test : public std::enable_shared_from_this <test> {
public:
    test() :io_(), timerone_(io_),timertwo_(io_){}
    void open(int num);
    void close(int num);
    void timedopen(int num, int dur);
    void run();
private:
    boost::asio::io_service io_;
    boost::asio::deadline_timer timerone_;   // timer used for device type 1
    boost::asio::deadline_timer timertwo_;   // timer used for device type 2
};

// Prints the "open" message for the given device type.
void test::open(int type) {
    std::cout << "open number : " << type << std::endl;
}

// Prints the "close" message for the given device type.
void test::close(int type) {
    std::cout << "close number : " << type << std::endl;
}

// Opens device `type` immediately and schedules close(type) `dur` seconds later
// on the timer belonging to that type. Types other than 1 and 2 do nothing.
void test::timedopen(int type, int dur) {
    switch (type)
    {
    case 1:
    {
        io_.reset();
        auto fn = std::bind(&test::open, shared_from_this(), std::placeholders::_1);
        fn(type);
        // Setting a new expiry cancels any wait still pending on this timer
        // (see the Boost.Asio deadline_timer documentation). The cancelled
        // wait's handler still runs; the bound close() takes no error code,
        // so it prints "close" right away - the behaviour the question is about.
        timerone_.expires_from_now(boost::posix_time::seconds(dur));
        timerone_.async_wait(std::bind(&test::close, shared_from_this(), type));
        run();
        std::cout << "function exiting" << std::endl;
        std::cout << "-----------------------------------------------" << std::endl;
        return;
    }

    case 2:
    {
        // Same sequence as case 1, using the second timer.
        io_.reset();
        auto fn = std::bind(&test::open, shared_from_this(), std::placeholders::_1);
        fn(type);
        timertwo_.expires_from_now(boost::posix_time::seconds(dur));
        timertwo_.async_wait(std::bind(&test::close, shared_from_this(), type));
        run();
        std::cout << "function exiting" << std::endl;
        std::cout << "-----------------------------------------------" << std::endl;
        return;
    }

    }

}

// Starts a new thread that calls io_.run(). The local thread object goes out of
// scope at the end of this function without being joined.
// NOTE(review): every timedopen() call spawns another such thread and calls
// io_.reset() while earlier threads may still be inside run() - confirm this is
// acceptable before reusing the pattern.
void test::run() {
    boost::thread th(boost::bind(&boost::asio::io_service::run, &io_));
}

int main() {
    auto t = std::make_shared<test>();
    t->timedopen(1, 60);
    t->timedopen(2, 30);
    t->timedopen(1, 5);
    t->timedopen(2, 2);
    char line[128];
    // Keeps the process alive until a line is read from stdin.
    // NOTE(review): getline() discards the newline, so `line` never equals "\n";
    // strcmp() is therefore non-zero and the loop breaks on the first line read.
    while (std::cin.getline(line, 128))
    {
        if (strcmp(line, "\n")) break;
    }
    return 0;
}

the output is:

open number : 1 function exiting ----------------------------------------------- open number : 2 function exiting ----------------------------------------------- open number : 1 close number : 1 function exiting ----------------------------------------------- open number : 2 close number : 2 function exiting ----------------------------------------------- close number : 2 close number : 1 

timerone_ does not wait for the previous wait to expire, i.e. as soon as t->timedopen(1, 5) is executed, the previous action t->timedopen(1, 60) is cancelled.

So "close number : 1" appears in the output without waiting for t->timedopen(1, 60) to finish.

What I want to achieve is that if multiple waits are encountered for one type of timer, the operations should be queued, i.e.

if I type:

// Three consecutive requests for the same device type (1); the goal is for
// them to run back to back rather than cancel one another.
t->timedopen(1, 60);
t->timedopen(1, 10);
t->timedopen(1, 5);

It should perform the timedopen operation for 60+10+5 seconds. Currently it does so for only 5 secs. It should also be non-blocking, i.e. I cannot use wait() instead of async_wait().

How do I achieve it?

Summary: my requirement is to schedule operations on a boost::deadline_timer, i.e. multiple operations on it are queued until the previous wait has expired.

Like I mentioned in the comment, you'll want to have queues per "type".

Let's name that per-type queue a "session".

By chaining all async waits from a single queue on a single strand¹ you get effective serialization (which also avoids synchronization on the queue/session).

The only tricky bit is to start an async wait when none is in flight. The invariant is that async operations are in flight iff !queue_.empty():

struct session : std::enable_shared_from_this<session> {     session(boost::asio::io_service &io, int type) : strand_(io), timer_(io), type(type) {}      void enqueue(int duration) {         auto = shared_from_this();         strand_.post([this,duration,this] {                  std::cout << "t0 + " << std::setw(4) << mark() << "ms enqueue number: "  << type <<  " (dur:"  << duration       <<  ")\n";                 this->queue_.push(duration);                 if (this->queue_.size() == 1)                     this->wait();             });     }    private:     boost::asio::strand strand_;     boost::asio::deadline_timer timer_;     int type;     std::queue<int> queue_;      void close() {         assert(!queue_.empty());         std::cout << "t0 + " << std::setw(4) << mark() << "ms close number :  "  << type <<  " (dur:"  << queue_.front() <<  ") (depth " << queue_.size() << ")\n";          queue_.pop();         wait();     }     void wait() {         if (!queue_.empty()) {             std::cout << "t0 + " << std::setw(4) << mark() << "ms open number :   "  << type <<  " (dur:"  << queue_.front() <<  ") (depth " << queue_.size() << ")\n";             timer_.expires_from_now(boost::posix_time::milliseconds(queue_.front()));             timer_.async_wait(strand_.wrap(std::bind(&session::close, shared_from_this())));         }     } }; 

Now the test class becomes much simpler (in fact it doesn't need to be "shared" at all, but I've left that detail as the proverbial exercise for the reader):

class test : public std::enable_shared_from_this<test> {     using guard = boost::lock_guard<boost::mutex>; public:     test() : io_(), work_(boost::asio::io_service::work(io_)) {         io_thread = boost::thread([this] { io_.run(); });     }      void timedopen(int num, int duration);      void stop() {         {             guard lk(mx_);             if (work_) work_.reset();         }         io_thread.join();     }      ~test() {         stop();          guard lk(mx_);         timers_ex_.clear();     }  private:     mutable boost::mutex mx_;     boost::asio::io_service io_;     boost::optional<boost::asio::io_service::work> work_;     std::map<int, std::shared_ptr<session> > timers_ex_;     boost::thread io_thread; };  void test::timedopen(int type, int duration) {     guard lk(mx_);      auto &session = timers_ex_[type];     if (!session) session = std::make_shared<session>(io_, type);      session->enqueue(duration); } 

As you can see, I've

  • extrapolated to any number of types
  • made operations thread-safe
  • added relative timestamps in milliseconds since t0
  • fixed the broken io_service lifetime. Now, construction starts the service. The work_ variable keeps it alive when idle.
  • stop() shuts down (draining session queues first).
  • destruction calls stop() implicitly

here's test run:

live on coliru

// Demo driver: submits two batches of timed opens, 400 ms apart, then shuts down.
int main() {
    // Each row is {device type, duration in milliseconds}, in submission order.
    const int first_batch[][2]  = { {1, 300}, {2, 150}, {1,  50}, {2,  20} };
    const int second_batch[][2] = { {1,  50}, {2,  20}, {1, 300}, {2, 150} };

    const auto t = std::make_shared<test>();

    for (const auto &request : first_batch)
        t->timedopen(request[0], request[1]);

    // Pause between the two batches before printing the separator.
    boost::this_thread::sleep_for(boost::chrono::milliseconds(400));
    std::cout << "================\n";

    for (const auto &request : second_batch)
        t->timedopen(request[0], request[1]);

    t->stop();
}

prints

t0 +    0ms enqueue number: 1 (dur:300) t0 +    0ms open number :   1 (dur:300) (depth 1) t0 +    0ms enqueue number: 2 (dur:150) t0 +    0ms open number :   2 (dur:150) (depth 1) t0 +    0ms enqueue number: 1 (dur:50) t0 +    0ms enqueue number: 2 (dur:20) t0 +  150ms close number :  2 (dur:150) (depth 2) t0 +  150ms open number :   2 (dur:20) (depth 1) t0 +  170ms close number :  2 (dur:20) (depth 1) t0 +  300ms close number :  1 (dur:300) (depth 2) t0 +  300ms open number :   1 (dur:50) (depth 1) t0 +  350ms close number :  1 (dur:50) (depth 1) ================ t0 +  400ms enqueue number: 1 (dur:50) t0 +  400ms open number :   1 (dur:50) (depth 1) t0 +  400ms enqueue number: 2 (dur:20) t0 +  400ms open number :   2 (dur:20) (depth 1) t0 +  400ms enqueue number: 1 (dur:300) t0 +  400ms enqueue number: 2 (dur:150) t0 +  420ms close number :  2 (dur:20) (depth 2) t0 +  420ms open number :   2 (dur:150) (depth 1) t0 +  450ms close number :  1 (dur:50) (depth 2) t0 +  450ms open number :   1 (dur:300) (depth 1) t0 +  570ms close number :  2 (dur:150) (depth 1) t0 +  750ms close number :  1 (dur:300) (depth 1) 

¹ Why do I need strand per connection when using boost::asio?


Comments

Popular posts from this blog

Payment information shows nothing in one page checkout page magento -

tcpdump - How to check if server received packet (acknowledged) -