35 char*
buffer =
new char[
sizeof(T)];
41 if(count+msg.size()<
sizeof(T))
44 memcpy(&
buffer[count],msg.data(),msg.size());
50 int oldrem =
sizeof(T)-count;
51 memcpy(&
buffer[count],msg.data(),oldrem);
54 buffer =
new char[
sizeof(T)];
55 memcpy(&
buffer[0],&(((
char*)msg.data())[oldrem]),msg.size()-oldrem);
56 count = msg.size()-oldrem;
75 friend void* pthread_ZMQObjectSink_worker<T>(
void* that);
89 virtual void initialize(zmq::context_t& context,
unsigned int Port)
91 m_socket =
new zmq::socket_t(context, ZMQ_PULL);
92 std::ostringstream address;
93 address<<
"tcp://*:"<<Port;
94 m_socket->bind(address.str().c_str());
95 m_mutex = PTHREAD_MUTEX_INITIALIZER;
98 typedef int ZmqRcvBufSizeType;
99 const ZmqRcvBufSizeType msgsize =
sizeof(T);
101 pthread_create(&
m_thread,NULL,pthread_ZMQObjectSink_worker<T>,
this);
123 bool value =
buffer.size()>0;
124 pthread_mutex_unlock(&
m_mutex);
130 T* value =
buffer.front();
132 pthread_mutex_unlock(&
m_mutex);
Definition: zmqobjectsink.h:74
zmq::socket_t * m_socket
Definition: zmqobjectsink.h:77
T * pop_data()
Definition: zmqobjectsink.h:127
pthread_mutex_t m_mutex
Definition: zmqobjectsink.h:79
bool has_data()
Definition: zmqobjectsink.h:120
virtual void initialize(zmq::context_t &context, unsigned int Port)
Definition: zmqobjectsink.h:89
std::list< T * > buffer
Definition: zmqobjectsink.h:81
ZMQObjectSink(zmq::context_t &context, unsigned int Port)
Definition: zmqobjectsink.h:104
bool terminate
Definition: zmqobjectsink.h:80
pthread_t m_thread
Definition: zmqobjectsink.h:78
ZMQObjectSink(unsigned int Port)
Definition: zmqobjectsink.h:109
zmq::context_t * m_context
Definition: zmqobjectsink.h:82
void push_data(T *value)
Definition: zmqobjectsink.h:83
virtual ~ZMQObjectSink()
Definition: zmqobjectsink.h:114
void * pthread_ZMQObjectSink_worker(void *that)
Definition: zmqobjectsink.h:30