]> pilppa.org Git - libplpbus.git/blob - src/plpbus/OrbServerImpl.cc
57dcac4f0d71fc283f5172c2a1d559662561fb98
[libplpbus.git] / src / plpbus / OrbServerImpl.cc
1 /*
2  * OrbServerImpl.cc
3  *
4  *  Created on: Aug 11, 2010
5  *      Author: lamikr
6  */
7
8 #include <plp/log.h>
9
10 #include "ServerEventThread.hh"
11 #include "OrbServerImpl.hh"
12 #include "ClientServerCommon.hh"
13 #include "BusMessageInternal.hh"
14
15 using namespace std;
16 using namespace plpbus;
17 using namespace plpbus_orb;
18
19 static omni_mutex       server_thread_mutex;
20 static omni_condition   server_thread_shutdown_signal(&server_thread_mutex);
21
22 OrbServerImpl::OrbServerImpl()
23 {
24         _orb                    = NULL;
25         _poa                    = NULL;
26         _server_thread_count    = 0;
27         _shutdown_pending       = 0;
28         _listener               = NULL;
29 }
30
31 OrbServerImpl::~OrbServerImpl()
32 {
33         log_info("OrbServerImpl destroyed.\n");
34 }
35
36 void OrbServerImpl::send_message_and_request_response(OrbClient_ptr response_listener_param,
37                                                 const char* msg_req)
38 {
39         const char *msg_rsp;
40
41         msg_rsp = NULL;
42         if (CORBA::is_nil(response_listener_param) == false) {
43                 if (_listener != NULL) {
44                         log_debug("send_message_and_request_response(): %s, server callback != NULL\n", msg_req);
45                         _listener->request_received(msg_req, &msg_rsp);
46                         log_debug("send_message_and_request_response(), request_received\n");
47                         response_listener_param->receive_response_message(msg_rsp);
48                 }
49                 else {
50                         log_error("send_message_and_request_response() error, server callback == NULL\n");
51                 }
52         }
53         else {
54                 log_error("invalid callback object received.\n");
55         }
56 }
57
58 char *OrbServerImpl::send_message_and_wait_response(const char* msg_req_param, ::CORBA::Long& err_flg) {
59         const char      *msg_rsp;
60         char            *ret_val;
61
62         err_flg = 0;
63         msg_rsp = NULL;
64         if (_listener != NULL) {
65                 log_debug("send_message_and_wait_response(): %s, server_callback != NULL\n", msg_req_param);
66                 _listener->request_received(msg_req_param, &msg_rsp);
67                 //ret_val       = CORBA::string_dup(msg_rsp);
68                 ret_val = CORBA::string_dup(msg_rsp);
69         }
70         else {
71                 log_error("send_message_and_wait_response() error, server callback == NULL\n");
72                 ret_val = CORBA::string_dup(msg_req_param);
73                 err_flg = -1;
74         }
75         return ret_val;
76 }
77
78 void OrbServerImpl::send_dataitem_message_and_request_response(OrbClient_ptr response_listener_param,
79                                                         const ::DataItemSequence& msg_req_param) {
80         BusMessage              *msg_req;
81         BusMessage              *msg_rsp;
82         DataItemSequence        *seq;
83         int                     err_flg;
84
85         if (CORBA::is_nil(response_listener_param) == false) {
86                 if (_listener != NULL) {
87                         msg_req = new BusMessageInternal(msg_req_param);
88                         log_debug("send_dataitem_message_and_request_response(), server_callback != NULL\n");
89                         log_debug("msg_req:\n");
90                         msg_req->printout();
91                         msg_rsp = NULL;
92                         err_flg = 0;
93                         msg_rsp = new BusMessage(msg_req->get_type(&err_flg));
94                         _listener->request_received(msg_req, msg_rsp);
95                         seq     = (DataItemSequence *)msg_rsp->_dataItemSeq;
96                         log_debug("msg_rsp length: %ld\n", seq->length());
97                         msg_rsp->printout();
98                         response_listener_param->receive_response_dataitem_sequence(*seq);
99                         delete(msg_req);
100                         delete(msg_rsp);
101                 }
102                 else {
103                         log_error("send_dataitem_message_and_request_response() error, server_callback == NULL\n");
104                 }
105         }
106 }
107
108 CORBA::Long OrbServerImpl::send_dataitem_message_and_wait_response(const DataItemSequence& req_seq_param,
109                                                                 DataItemSequence_out rsp_seq_param) {
110         BusMessage              *msg_req;
111         BusMessage              *msg_rsp;
112         DataItemSequence        *seq;
113         int                     err_flg;
114
115         seq     = NULL;
116         msg_rsp = NULL;
117         msg_req = new BusMessageInternal(req_seq_param);
118         msg_rsp = new BusMessage(msg_req->get_type(&err_flg));
119         _listener->request_received(msg_req, msg_rsp);
120         if (msg_rsp != NULL) {
121                 seq     = (DataItemSequence *)msg_rsp->_dataItemSeq;
122         }
123         else {
124                 seq     = new DataItemSequence();
125         }
126         rsp_seq_param._data     = seq;
127
128         return 0;
129 }
130
131 void OrbServerImpl::add_event_listener(OrbClient_ptr client_param,
132                         const char *msg,
133                         CORBA::UShort period_secs)
134 {
135         ServerEventThread       *srvr_thread;
136
137         if (CORBA::is_nil(client_param) == false) {
138                 log_debug("add_event_listener()\n");
139                 server_thread_mutex.lock();
140                 _server_thread_count++;
141                 server_thread_mutex.unlock();
142                 srvr_thread     = new ServerEventThread(client_param, msg, period_secs, this);
143                 srvr_thread->start();
144         }
145         else {
146                 log_error("Failed to add event listener, listener NULL.\n");
147         }
148 }
149
150 void OrbServerImpl::shutdown()
151 {
152         omni_mutex_lock sync(server_thread_mutex);
153         if (is_shutdown_pending() == 0) {
154                 log_debug("shutdown request received!\n");
155                 // Tell the servers to exit, and wait for them to do so.
156                 _shutdown_pending       = 1;
157                 while(_server_thread_count > 0) {
158                         server_thread_shutdown_signal.timedwait(5, 0);
159                         log_debug("signal received\n");
160                 }
161                 // Shutdown the ORB (but do not wait for completion).  This also
162                 // causes the main thread to unblock from CORBA::ORB::run().
163                 log_debug("calling orb shutdown\n");
164                 _orb->shutdown(0);
165                 log_debug("orb shutdown called\n");
166         }
167 }
168
169 int OrbServerImpl::init() {
170         int     argc;
171         char    **argv;
172         int     retVal;
173
174         argc    = -1;
175         argv    = NULL;
176         retVal  = 0;
177         log_debug("started\n");
178         _orb    = CORBA::ORB_init(argc, argv);
179         if (_orb != NULL) {
180                 _poa    = create_poa(_orb);
181                 if (CORBA::is_nil(_poa) == false) {
182                         retVal  = 0;
183                         log_info("init success\n");
184                 }
185                 else {
186                         log_info("init failed\n");
187                 }
188         }
189         else {
190                 log_error("init() failed\n");
191         }
192         return retVal;
193 }
194
195 int OrbServerImpl::launch(const char *server_name)
196 {
197         CORBA::Object_var               server_ref;
198         PortableServer::ObjectId_var    server_id;
199         bool                            ok_flg;
200         int                             ret_val;
201         CosNaming::NamingContext_var    naming_cntx;
202
203         ret_val         = -1;
204         POA_plpbus_orb::OrbServer_tie<OrbServerImpl> server_impl(this);
205         server_id       = _poa->activate_object(&server_impl);
206         server_ref      = server_impl._this();
207         //this->_remove_ref();
208         naming_cntx     = get_service_naming_context(CONST_CONTEXT_NAME__PLPBUS, CONST_CONTEXT_KIND__PLPBUS);
209         if (naming_cntx != NULL) {
210                 ok_flg  = bind_naming_context_and_service(naming_cntx,
211                                                         server_ref,
212                                                         server_name,
213                                                         CONST_CONTEXT_KIND__PLPBUS);
214                 if (ok_flg == true) {
215                         log_debug("Registered to naming service: %s\n", server_name);
216                         _orb->run();
217                         log_debug("run stopped: %s\n", server_name);
218                         ret_val = 0;
219                 }
220                 else {
221                         log_error("Failed to register to naming service: %s\n", server_name);
222                 }
223         }
224         return ret_val;
225 }
226
227 PortableServer::POA_var OrbServerImpl::create_poa(CORBA::ORB_var orb) {
228         PortableServer::POA_var         ret_val;
229         CORBA::Object_var               poa_obj;
230         CORBA::PolicyList               policy_list;
231         CORBA::Any                      policyVal;
232         PortableServer::POAManager_var  poa_man;
233         PortableServer::POA_var         rootpoa;
234
235         ret_val = NULL;
236         log_debug("started");
237         try {
238                 poa_obj = orb->resolve_initial_references(CONST_ROOT_POA_NAME);
239                 log_debug("received initial reference to %s\n", CONST_ROOT_POA_NAME);
240                 rootpoa = PortableServer::POA::_narrow(poa_obj);
241                 poa_man = rootpoa->the_POAManager();
242                 poa_man->activate();
243                 // bidirectional policy
244                 policy_list.length(1);
245                 policyVal       <<= BiDirPolicy::BOTH;
246                 policy_list[0]  = orb->create_policy(BiDirPolicy::BIDIRECTIONAL_POLICY_TYPE, policyVal);
247                 ret_val         = rootpoa->create_POA(CONST_ROOT_POA_BIDIR_POLICY_NAME,
248                                         poa_man,
249                                         policy_list);
250                 if (CORBA::is_nil(ret_val) == false) {
251                         log_info("Created root poa %s\n", CONST_ROOT_POA_NAME);
252                 }
253                 else {
254                         log_error("Failed to create RootPOA %s\n", CONST_ROOT_POA_NAME);
255                 }
256         }
257         catch(CORBA::SystemException& ex) {
258                 log_error("Failed to create RootPOA, received system exception CORBA::%s\n", ex._name());
259         }
260         catch(CORBA::Exception& ex) {
261                 log_error("Failed to create RootPOA, received exception CORBA::%s\n", ex._name());
262         }
263         catch(omniORB::fatalException& ex) {
264                 log_error("Failed to create RootPOA, received fatal exception CORBA::%s\n", ex.errmsg());
265         }
266         return ret_val;
267 }
268
269 CosNaming::NamingContext_var OrbServerImpl::get_service_naming_context(const char *service_name_param,
270                                                                 const char *service_kind_param)
271 {
272         CosNaming::NamingContext_var    ret_val;
273         CosNaming::NamingContext_var    ns_cntx;
274         CORBA::Object_var               ns_obj;
275         CORBA::Object_var               service_obj;
276         CosNaming::Name                 cntx_dta;
277
278         ret_val = NULL;
279         try {
280                 // get nameservice reference
281                 log_debug("1\n");
282                 ns_obj  = _orb->resolve_initial_references(CONST_NAME_SERVICE_NAME);
283                 if (CORBA::is_nil(ns_obj) == false) {
284                         log_debug("2\n");
285                         // get name service context
286                         ns_cntx = CosNaming::NamingContext::_narrow(ns_obj);
287                         log_debug("3\n");
288                         if (CORBA::is_nil(ns_cntx) == false) {
289                                 cntx_dta.length(1);
290                                 cntx_dta[0].id          = CORBA::string_dup(service_name_param);
291                                 cntx_dta[0].kind        = CORBA::string_dup(service_kind_param);
292                                 log_debug("4\n");
293                                 try {
294                                         service_obj     = ns_cntx->resolve(cntx_dta);
295                                 }
296                                 catch(CosNaming::NamingContext::NotFound& ex) {
297                                         log_error("Trying to create new name service context %s.\n", service_name_param);
298                                 }
299                                 try {
300                                         log_debug("5\n");
301                                         if (CORBA::is_nil(service_obj)) {
302                                                 // not found, try to bind the new context to name service
303                                                 log_debug("6\n");
304                                                 ret_val = ns_cntx->bind_new_context(cntx_dta);
305                                                 log_debug("7\n");
306                                                 if (CORBA::is_nil(ret_val) ) {
307                                                         log_error("Failed to create new context to name service for %s.\n", service_name_param);
308                                                 }
309                                         }
310                                         else {
311                                                 log_debug("8\n");
312                                                 ret_val = CosNaming::NamingContext::_narrow(service_obj);
313                                                 if (CORBA::is_nil(ret_val) ) {
314                                                         log_error("Failed to get name service context for  %s, narrowing failed for resolved service context.\n", service_name_param);
315                                                 }
316                                                 log_debug("9\n");
317                                         }
318                                 }
319                                 catch(CosNaming::NamingContext::AlreadyBound& ex) {
320                                         log_error("Failed to get name service context for %s. Context with same name already exist.\n", service_name_param);
321                                 }
322                         }
323                         else {
324                                 log_error("Failed to get name service context for %s. Could not narrow the name service.\n", service_name_param);
325                         }
326                 }
327                 else {
328                         log_error("Failed to get name service context for %s. Could not get reference to name service.\n", service_name_param);
329                 }
330         }
331         catch (CORBA::ORB::InvalidName&) {
332                 // This should not happen!
333                 log_error("Failed to get name service context for %s, name service does not exist.\n", service_name_param);
334         }
335         catch(CORBA::TRANSIENT& ex) {
336                 log_error("Failed to get name service context for %s, verify that name service is running.\n", service_name_param);
337         }
338         catch (CORBA::NO_RESOURCES&) {
339                 log_error("Failed to get context from name service for %s, Name service is not running or has configuration problem.\n", service_name_param);
340                 log_error("Check-list:\n");
341                 log_error("If you have OMNIORB_CONFIG environment variable defined, verify that omniORB.cfg file exist in that location.\n");
342                 log_error("If you do not have OMNIORB_CONFIG environment variable defined, verify that you have /etc/omniORB.cfg file\n");
343                 log_error("Verify that InitRef line is defined in omniORB.cfg file.\n");
344         }
345         catch(CORBA::SystemException& ex) {
346                 log_error("Failed to get name service context for %s, system error.\n", service_name_param);
347         }
348         log_debug("done\n");
349         return ret_val;
350 }
351
352 bool OrbServerImpl::bind_naming_context_and_service(CosNaming::NamingContext_var srv_cntx_param,
353                                         CORBA::Object_ptr srv_ref_param,
354                                         const char *srv_name_param,
355                                         const char *srv_kind_param)
356 {
357         bool            retVal;
358         CosNaming::Name cntx_dta;
359
360         retVal = false;
361         try {
362                 cntx_dta.length(1);
363                 cntx_dta[0].id   = CORBA::string_dup(srv_name_param);
364                 cntx_dta[0].kind = CORBA::string_dup(srv_kind_param);
365                 try {
366                         srv_cntx_param->bind(cntx_dta, srv_ref_param);
367                         retVal  = true;
368                 }
369                 catch(CosNaming::NamingContext::AlreadyBound& ex) {
370                         /**
371                          * service existed already for the naming context with similar description.
372                          * Replace the existing one with a new one.
373                          */
374                         log_warning("service %s exist, replacing it\n", srv_name_param);
375                         srv_cntx_param->rebind(cntx_dta, srv_ref_param);
376                         retVal  = true;
377                 }
378         }
379         catch (CosNaming::NamingContext::InvalidName&) {
380                 log_error("Could not register service to name server, invalid service name.\n");
381         }
382         catch (CosNaming::NamingContext::NotFound&) {
383                 log_error("Could not register service to name server, service object reference is invalid.\n");
384         }
385         catch (CosNaming::NamingContext::CannotProceed&) {
386                 // This should not happen!
387                 log_error("Could not register service to name server, unknown error.\n");
388         }
389         catch(CORBA::SystemException& ex) {
390                 log_error("Could not register service to name server, unknown error.\n");
391         }
392         return retVal;
393 }
394
395 int OrbServerImpl::add_server_listener(IServerListener *listener_param)
396 {
397         int     retVal;
398
399         retVal          = 0;
400         log_debug("started\n");
401         _listener       = listener_param;
402         return retVal;
403 }
404
405 int OrbServerImpl::is_shutdown_pending() {
406         return _shutdown_pending;
407 }
408
409 void OrbServerImpl::server_thread_closed() {
410         bool    send_signal;
411
412         log_debug("started\n");
413         send_signal = false;
414         server_thread_mutex.lock();
415         log_debug("server_thread_count: %d\n", _server_thread_count);
416         _server_thread_count--;
417         if (_server_thread_count == 0) {
418                 send_signal     = true;
419         }
420         server_thread_mutex.unlock();
421         if (send_signal == true) {
422                 log_debug("sending signal\n");
423                 server_thread_shutdown_signal.signal();
424                 log_debug("signal send\n");
425         }
426 }