]> pilppa.org Git - libplpbus.git/blob - src/plpbus/OrbServerImpl.cc
memory leak fix
[libplpbus.git] / src / plpbus / OrbServerImpl.cc
1 /*
2  * OrbServerImpl.cc
3  *
4  *  Created on: Aug 11, 2010
5  *      Author: lamikr
6  */
7
8 #include <plp/log.h>
9
10 #include "ServerEventThread.hh"
11 #include "OrbServerImpl.hh"
12 #include "ClientServerCommon.hh"
13 #include "BusMessageInternal.hh"
14
15 using namespace std;
16 using namespace plpbus;
17 using namespace plpbus_orb;
18
19 static omni_mutex       server_thread_mutex;
20 static omni_condition   server_thread_shutdown_signal(&server_thread_mutex);
21
22 OrbServerImpl::OrbServerImpl()
23 {
24         _orb                    = NULL;
25         _poa                    = NULL;
26         _server_thread_count    = 0;
27         _shutdown_pending       = 0;
28         _listener               = NULL;
29 }
30
31 OrbServerImpl::~OrbServerImpl()
32 {
33         _orb->destroy();
34         log_info("done\n");
35 }
36
37 void OrbServerImpl::send_message_and_request_response(OrbClient_ptr response_listener_param,
38                                                 const char* msg_req)
39 {
40         const char *msg_rsp;
41
42         msg_rsp = NULL;
43         if (CORBA::is_nil(response_listener_param) == false) {
44                 if (_listener != NULL) {
45                         log_debug("send_message_and_request_response(): %s, server callback != NULL\n", msg_req);
46                         _listener->request_received(msg_req, &msg_rsp);
47                         log_debug("send_message_and_request_response(), request_received\n");
48                         response_listener_param->receive_response_message(msg_rsp);
49                 }
50                 else {
51                         log_error("send_message_and_request_response() error, server callback == NULL\n");
52                 }
53         }
54         else {
55                 log_error("invalid callback object received.\n");
56         }
57 }
58
59 char *OrbServerImpl::send_message_and_wait_response(const char* msg_req_param, ::CORBA::Long& err_flg) {
60         const char      *msg_rsp;
61         char            *ret_val;
62
63         err_flg = 0;
64         msg_rsp = NULL;
65         if (_listener != NULL) {
66                 log_debug("send_message_and_wait_response(): %s, server_callback != NULL\n", msg_req_param);
67                 _listener->request_received(msg_req_param, &msg_rsp);
68                 //ret_val       = CORBA::string_dup(msg_rsp);
69                 ret_val = CORBA::string_dup(msg_rsp);
70         }
71         else {
72                 log_error("send_message_and_wait_response() error, server callback == NULL\n");
73                 ret_val = CORBA::string_dup(msg_req_param);
74                 err_flg = -1;
75         }
76         return ret_val;
77 }
78
79 void OrbServerImpl::send_dataitem_message_and_request_response(OrbClient_ptr response_listener_param,
80                                                         const ::DataItemSequence& msg_req_param) {
81         BusMessage              *msg_req;
82         BusMessage              *msg_rsp;
83         DataItemSequence        *seq;
84         int                     err_flg;
85
86         if (CORBA::is_nil(response_listener_param) == false) {
87                 if (_listener != NULL) {
88                         msg_req = new BusMessageInternal(msg_req_param);
89                         log_debug("send_dataitem_message_and_request_response(), server_callback != NULL\n");
90                         log_debug("msg_req:\n");
91                         msg_req->printout();
92                         msg_rsp = NULL;
93                         err_flg = 0;
94                         msg_rsp = new BusMessage(msg_req->get_type(&err_flg));
95                         _listener->request_received(msg_req, msg_rsp);
96                         seq     = (DataItemSequence *)msg_rsp->_dataItemSeq;
97                         log_debug("msg_rsp length: %lu\n", (long unsigned int)seq->length());
98                         msg_rsp->printout();
99                         response_listener_param->receive_response_dataitem_sequence(*seq);
100                         delete(msg_req);
101                         delete(msg_rsp);
102                 }
103                 else {
104                         log_error("send_dataitem_message_and_request_response() error, server_callback == NULL\n");
105                 }
106         }
107 }
108
109 void copy( const DataItemSequence& orig_seq )
110 {
111         DataItemSequence new_seq;
112
113         new_seq = orig_seq;
114 }
115
116 CORBA::Long OrbServerImpl::send_dataitem_message_and_wait_response(const DataItemSequence& req_seq_param,
117                                                                 DataItemSequence_out rsp_seq_param) {
118         BusMessage      *msg_req;
119         BusMessage      *msg_rsp;
120         int             err_flg;
121
122         msg_req = new BusMessageInternal(req_seq_param);
123         msg_rsp = new BusMessage(msg_req->get_type(&err_flg));
124         _listener->request_received(msg_req, msg_rsp);
125         rsp_seq_param._data     = new DataItemSequence(*(msg_rsp->_dataItemSeq));
126         delete(msg_req);
127         delete(msg_rsp);
128
129         return 0;
130 }
131
132 void OrbServerImpl::add_event_listener(OrbClient_ptr client_param,
133                         const char *msg,
134                         CORBA::UShort period_secs)
135 {
136         ServerEventThread       *srvr_thread;
137
138         if (CORBA::is_nil(client_param) == false) {
139                 log_debug("add_event_listener()\n");
140                 server_thread_mutex.lock();
141                 _server_thread_count++;
142                 server_thread_mutex.unlock();
143                 srvr_thread     = new ServerEventThread(client_param, msg, period_secs, this);
144                 srvr_thread->start();
145         }
146         else {
147                 log_error("Failed to add event listener, listener NULL.\n");
148         }
149 }
150
151 void OrbServerImpl::shutdown()
152 {
153         omni_mutex_lock sync(server_thread_mutex);
154         if (is_shutdown_pending() == 0) {
155                 log_debug("shutdown request received!\n");
156                 // Tell the servers to exit, and wait for them to do so.
157                 _shutdown_pending       = 1;
158                 while(_server_thread_count > 0) {
159                         server_thread_shutdown_signal.timedwait(5, 0);
160                         log_debug("signal received\n");
161                 }
162                 // Shutdown the ORB (but do not wait for completion).  This also
163                 // causes the main thread to unblock from CORBA::ORB::run().
164                 log_debug("calling orb shutdown\n");
165                 _orb->shutdown(0);
166                 log_debug("orb shutdown called\n");
167         }
168 }
169
170 int OrbServerImpl::init() {
171         int     argc;
172         char    **argv;
173         int     retVal;
174
175         argc    = -1;
176         argv    = NULL;
177         retVal  = 0;
178         log_debug("started\n");
179         _orb    = CORBA::ORB_init(argc, argv);
180         if (_orb != NULL) {
181                 _poa    = create_poa(_orb);
182                 if (CORBA::is_nil(_poa) == false) {
183                         retVal  = 0;
184                         log_info("init success\n");
185                 }
186                 else {
187                         log_info("init failed\n");
188                 }
189         }
190         else {
191                 log_error("init() failed\n");
192         }
193         return retVal;
194 }
195
196 int OrbServerImpl::launch(const char *server_name)
197 {
198         CORBA::Object_var               server_ref;
199         PortableServer::ObjectId_var    server_id;
200         bool                            ok_flg;
201         int                             ret_val;
202         CosNaming::NamingContext_var    naming_cntx;
203
204         ret_val         = -1;
205         POA_plpbus_orb::OrbServer_tie<OrbServerImpl> server_impl(this);
206         server_id       = _poa->activate_object(&server_impl);
207         server_ref      = server_impl._this();
208         //this->_remove_ref();
209         naming_cntx     = get_naming_service_context(CONST_CONTEXT_NAME__PLPBUS, CONST_CONTEXT_KIND__PLPBUS);
210         if (naming_cntx != NULL) {
211                 ok_flg  = bind_naming_context_and_service(naming_cntx,
212                                                         server_ref,
213                                                         server_name,
214                                                         CONST_CONTEXT_KIND__PLPBUS);
215                 if (ok_flg == true) {
216                         log_debug("Registered to naming service: %s\n", server_name);
217                         _orb->run();
218                         log_debug("run stopped: %s\n", server_name);
219                         ret_val = 0;
220                 }
221                 else {
222                         log_error("Failed to register to naming service: %s\n", server_name);
223                 }
224         }
225         return ret_val;
226 }
227
228 PortableServer::POA_var OrbServerImpl::create_poa(CORBA::ORB_var orb) {
229         PortableServer::POA_var         ret_val;
230         CORBA::Object_var               poa_obj;
231         CORBA::PolicyList               policy_list;
232         CORBA::Any                      policyVal;
233         PortableServer::POAManager_var  poa_man;
234         PortableServer::POA_var         rootpoa;
235
236         ret_val = NULL;
237         log_debug("started\n");
238         try {
239                 poa_obj = orb->resolve_initial_references(CONST_ROOT_POA_NAME);
240                 log_debug("received initial reference to %s\n", CONST_ROOT_POA_NAME);
241                 rootpoa = PortableServer::POA::_narrow(poa_obj);
242                 poa_man = rootpoa->the_POAManager();
243                 poa_man->activate();
244                 // bidirectional policy
245                 policy_list.length(1);
246                 policyVal       <<= BiDirPolicy::BOTH;
247                 policy_list[0]  = orb->create_policy(BiDirPolicy::BIDIRECTIONAL_POLICY_TYPE, policyVal);
248                 ret_val         = rootpoa->create_POA(CONST_ROOT_POA_BIDIR_POLICY_NAME,
249                                         poa_man,
250                                         policy_list);
251                 if (CORBA::is_nil(ret_val) == false) {
252                         log_info("Created root poa %s\n", CONST_ROOT_POA_NAME);
253                 }
254                 else {
255                         log_error("Failed to create RootPOA %s\n", CONST_ROOT_POA_NAME);
256                 }
257         }
258         catch(CORBA::SystemException& ex) {
259                 log_error("Failed to create RootPOA, received system exception CORBA::%s\n", ex._name());
260         }
261         catch(CORBA::Exception& ex) {
262                 log_error("Failed to create RootPOA, received exception CORBA::%s\n", ex._name());
263         }
264         catch(omniORB::fatalException& ex) {
265                 log_error("Failed to create RootPOA, received fatal exception CORBA::%s\n", ex.errmsg());
266         }
267         return ret_val;
268 }
269
270 CosNaming::NamingContext_var OrbServerImpl::get_naming_service_context(const char *service_name_param,
271                                                                 const char *service_kind_param)
272 {
273         CosNaming::NamingContext_var    ret_val;
274         CosNaming::NamingContext_var    ns_cntx;
275         CORBA::Object_var               ns_obj;
276         CORBA::Object_var               service_obj;
277         CosNaming::Name                 cntx_dta;
278
279         ret_val = NULL;
280         try {
281                 // get nameservice reference
282                 //log_debug("started\n");
283                 ns_obj  = _orb->resolve_initial_references("NameService");
284                 if (CORBA::is_nil(ns_obj) == false) {
285                         // get naming service context
286                         ns_cntx = CosNaming::NamingContext::_narrow(ns_obj.in());
287                         if (CORBA::is_nil(ns_cntx.in()) == false) {
288                                 cntx_dta.length(1);
289                                 cntx_dta[0].id          = CORBA::string_dup(service_name_param);
290                                 cntx_dta[0].kind        = CORBA::string_dup(service_kind_param);
291                                 try {
292                                         service_obj     = ns_cntx->resolve(cntx_dta);
293                                 }
294                                 catch(CosNaming::NamingContext::NotFound& ex) {
295                                         log_error("Trying to create new naming service context %s.\n", service_name_param);
296                                 }
297                                 try {
298                                         if (CORBA::is_nil(service_obj)) {
299                                                 // not found, try to bind new context from naming service
300                                                 ret_val = ns_cntx->bind_new_context(cntx_dta);
301                                                 if (CORBA::is_nil(ret_val) ) {
302                                                         log_error("Failed to create new context to name service for %s.\n", service_name_param);
303                                                 }
304                                         }
305                                         else {
306                                                 ret_val = CosNaming::NamingContext::_narrow(service_obj);
307                                                 if (CORBA::is_nil(ret_val) ) {
308                                                         log_error("Failed to get naming service context for  %s, narrowing failed for resolved service context.\n", service_name_param);
309                                                 }
310                                         }
311                                 }
312                                 catch(CosNaming::NamingContext::AlreadyBound& ex) {
313                                         log_error("Failed to get naming service context for %s. Context with same name already exist.\n", service_name_param);
314                                 }
315                                 //CORBA::release(ns_cntx);
316                         }
317                         else {
318                                 log_error("Failed to get naming service context for %s. Could not narrow the name service.\n", service_name_param);
319                         }
320                         //CORBA::release(ns_obj);
321                 }
322                 else {
323                         log_error("Failed to get naming service context for %s. Could not get reference to name service.\n", service_name_param);
324                 }
325         }
326         catch (CORBA::ORB::InvalidName&) {
327                 // This should not happen!
328                 log_error("Failed to get naming service context for %s, name service does not exist.\n", service_name_param);
329         }
330         catch(CORBA::TRANSIENT& ex) {
331                 log_error("Failed to get naming service context for %s, verify that name service is running.\n", service_name_param);
332         }
333         catch (CORBA::NO_RESOURCES&) {
334                 log_error("Failed to get context from name service for %s, Naming service is not running or has configuration problem.\n", service_name_param);
335                 log_error("Check-list:\n");
336                 log_error("If you have OMNIORB_CONFIG environment variable defined, verify that omniORB.cfg file exist in that location.\n");
337                 log_error("If you do not have OMNIORB_CONFIG environment variable defined, verify that you have /etc/omniORB.cfg file.\n");
338                 log_error("Verify that InitRef line is defined in /etc/omniORB.cfg file.\n");
339         }
340         catch(CORBA::SystemException& ex) {
341                 log_error("Failed to get naming service context for %s, system error.\n", service_name_param);
342         }
343         //log_debug("done\n");
344         return ret_val;
345 }
346
347 bool OrbServerImpl::bind_naming_context_and_service(CosNaming::NamingContext_var srv_cntx_param,
348                                         CORBA::Object_ptr srv_ref_param,
349                                         const char *srv_name_param,
350                                         const char *srv_kind_param)
351 {
352         bool            retVal;
353         CosNaming::Name cntx_dta;
354
355         retVal = false;
356         try {
357                 cntx_dta.length(1);
358                 cntx_dta[0].id   = CORBA::string_dup(srv_name_param);
359                 cntx_dta[0].kind = CORBA::string_dup(srv_kind_param);
360                 try {
361                         srv_cntx_param->bind(cntx_dta, srv_ref_param);
362                         retVal  = true;
363                 }
364                 catch(CosNaming::NamingContext::AlreadyBound& ex) {
365                         /**
366                          * service existed already for the naming context with similar description.
367                          * Replace the existing one with a new one.
368                          */
369                         log_warning("service %s exist, replacing it\n", srv_name_param);
370                         srv_cntx_param->rebind(cntx_dta, srv_ref_param);
371                         retVal  = true;
372                 }
373         }
374         catch (CosNaming::NamingContext::InvalidName&) {
375                 log_error("Could not register service to name server, invalid service name.\n");
376         }
377         catch (CosNaming::NamingContext::NotFound&) {
378                 log_error("Could not register service to name server, service object reference is invalid.\n");
379         }
380         catch (CosNaming::NamingContext::CannotProceed&) {
381                 // This should not happen!
382                 log_error("Could not register service to name server, unknown error.\n");
383         }
384         catch(CORBA::SystemException& ex) {
385                 log_error("Could not register service to name server, unknown error.\n");
386         }
387         return retVal;
388 }
389
390 int OrbServerImpl::add_server_listener(IServerListener *listener_param)
391 {
392         int     retVal;
393
394         retVal          = 0;
395         log_debug("started\n");
396         _listener       = listener_param;
397         return retVal;
398 }
399
400 int OrbServerImpl::is_shutdown_pending() {
401         return _shutdown_pending;
402 }
403
404 void OrbServerImpl::server_thread_closed() {
405         bool    send_signal;
406
407         log_debug("started\n");
408         send_signal = false;
409         server_thread_mutex.lock();
410         log_debug("server_thread_count: %d\n", _server_thread_count);
411         _server_thread_count--;
412         if (_server_thread_count == 0) {
413                 send_signal     = true;
414         }
415         server_thread_mutex.unlock();
416         if (send_signal == true) {
417                 log_debug("sending signal\n");
418                 server_thread_shutdown_signal.signal();
419                 log_debug("signal send\n");
420         }
421 }