]> pilppa.org Git - libplpbus.git/blob - src/plpbus/OrbServerImpl.cc
6e097486f5e7571effb81d332957bbed22393a3c
[libplpbus.git] / src / plpbus / OrbServerImpl.cc
1 /*
2  * OrbServerImpl.cc
3  *
4  *  Created on: Aug 11, 2010
5  *      Author: lamikr
6  */
7
8 #include <plp/log.h>
9
10 #include "ServerEventThread.hh"
11 #include "OrbServerImpl.hh"
12 #include "ClientServerCommon.hh"
13 #include "BusMessageInternal.hh"
14
15 using namespace std;
16 using namespace plpbus;
17 using namespace plpbus_orb;
18
19 static omni_mutex       server_thread_mutex;
20 static omni_condition   server_thread_shutdown_signal(&server_thread_mutex);
21
22 OrbServerImpl::OrbServerImpl()
23 {
24         _orb                    = NULL;
25         _poa                    = NULL;
26         _server_thread_count    = 0;
27         _shutdown_pending       = 0;
28         _listener               = NULL;
29 }
30
31 OrbServerImpl::~OrbServerImpl()
32 {
33         log_info("OrbServerImpl destroyed.\n");
34 }
35
36 void OrbServerImpl::send_message_and_request_response(OrbClient_ptr response_listener_param,
37                                                 const char* msg_req)
38 {
39         const char *msg_rsp;
40
41         msg_rsp = NULL;
42         if (CORBA::is_nil(response_listener_param) == false) {
43                 if (_listener != NULL) {
44                         log_debug("send_message_and_request_response(): %s, server callback != NULL\n", msg_req);
45                         _listener->request_received(msg_req, &msg_rsp);
46                         log_debug("send_message_and_request_response(), request_received\n");
47                         response_listener_param->receive_response_message(msg_rsp);
48                 }
49                 else {
50                         log_error("send_message_and_request_response() error, server callback == NULL\n");
51                 }
52         }
53         else {
54                 log_error("invalid callback object received.\n");
55         }
56 }
57
58 char *OrbServerImpl::send_message_and_wait_response(const char* msg_req_param, ::CORBA::Long& err_flg) {
59         const char      *msg_rsp;
60         char            *ret_val;
61
62         err_flg = 0;
63         msg_rsp = NULL;
64         if (_listener != NULL) {
65                 log_debug("send_message_and_wait_response(): %s, server_callback != NULL\n", msg_req_param);
66                 _listener->request_received(msg_req_param, &msg_rsp);
67         }
68         else {
69                 log_error("send_message_and_wait_response() error, server callback == NULL\n");
70                 msg_rsp = strdup(msg_req_param);
71                 err_flg = -1;
72         }
73         ret_val = CORBA::string_dup(msg_rsp);
74         return ret_val;
75 }
76
77 void OrbServerImpl::send_dataitem_message_and_request_response(OrbClient_ptr response_listener_param,
78                                                         const ::DataItemSequence& msg_req_param) {
79         BusMessage              *msg_req;
80         BusMessage              *msg_rsp;
81         DataItemSequence        *seq;
82         int                     err_flg;
83
84         if (CORBA::is_nil(response_listener_param) == false) {
85                 if (_listener != NULL) {
86                         msg_req = new BusMessageInternal(msg_req_param);
87                         log_debug("send_dataitem_message_and_request_response(), server_callback != NULL\n");
88                         log_debug("msg_req:\n");
89                         msg_req->printout();
90                         msg_rsp = NULL;
91                         err_flg = 0;
92                         msg_rsp = new BusMessage(msg_req->get_type(&err_flg));
93                         _listener->request_received(msg_req, msg_rsp);
94                         seq     = (DataItemSequence *)msg_rsp->_dataItemSeq;
95                         log_debug("msg_rsp length: %ld\n", seq->length());
96                         msg_rsp->printout();
97                         response_listener_param->receive_response_dataitem_sequence(*seq);
98                         delete(msg_req);
99                         delete(msg_rsp);
100                 }
101                 else {
102                         log_error("send_dataitem_message_and_request_response() error, server_callback == NULL\n");
103                 }
104         }
105 }
106
107 CORBA::Long OrbServerImpl::send_dataitem_message_and_wait_response(const DataItemSequence& req_seq_param,
108                                                                 DataItemSequence_out rsp_seq_param) {
109         BusMessage              *msg_req;
110         BusMessage              *msg_rsp;
111         DataItemSequence        *seq;
112         int                     err_flg;
113
114         seq     = NULL;
115         msg_rsp = NULL;
116         msg_req = new BusMessageInternal(req_seq_param);
117         msg_rsp = new BusMessage(msg_req->get_type(&err_flg));
118         _listener->request_received(msg_req, msg_rsp);
119         if (msg_rsp != NULL) {
120                 seq     = (DataItemSequence *)msg_rsp->_dataItemSeq;
121         }
122         else {
123                 seq     = new DataItemSequence();
124         }
125         rsp_seq_param._data     = seq;
126
127         return 0;
128 }
129
130 void OrbServerImpl::add_event_listener(OrbClient_ptr client_param,
131                         const char *msg,
132                         CORBA::UShort period_secs)
133 {
134         ServerEventThread       *srvr_thread;
135
136         if (CORBA::is_nil(client_param) == false) {
137                 log_debug("add_event_listener()\n");
138                 server_thread_mutex.lock();
139                 _server_thread_count++;
140                 server_thread_mutex.unlock();
141                 srvr_thread     = new ServerEventThread(client_param, msg, period_secs, this);
142                 srvr_thread->start();
143         }
144         else {
145                 log_error("Failed to add event listener, listener NULL.\n");
146         }
147 }
148
149 void OrbServerImpl::shutdown()
150 {
151         omni_mutex_lock sync(server_thread_mutex);
152         if (is_shutdown_pending() == 0) {
153                 log_debug("shutdown request received!\n");
154                 // Tell the servers to exit, and wait for them to do so.
155                 _shutdown_pending       = 1;
156                 while(_server_thread_count > 0) {
157                         server_thread_shutdown_signal.timedwait(5, 0);
158                         log_debug("signal received\n");
159                 }
160                 // Shutdown the ORB (but do not wait for completion).  This also
161                 // causes the main thread to unblock from CORBA::ORB::run().
162                 log_debug("calling orb shutdown\n");
163                 _orb->shutdown(0);
164                 log_debug("orb shutdown called\n");
165         }
166 }
167
168 int OrbServerImpl::init() {
169         int     argc;
170         char    **argv;
171         int     retVal;
172
173         argc    = -1;
174         argv    = NULL;
175         retVal  = 0;
176         _orb    = CORBA::ORB_init(argc, argv);
177         if (_orb != NULL) {
178                 _poa    = create_poa(_orb);
179                 retVal  = 0;
180         }
181         else {
182                 log_error("init() failed\n");
183         }
184         return retVal;
185 }
186
187 int OrbServerImpl::launch(const char *server_name)
188 {
189         CORBA::Object_var               server_ref;
190         PortableServer::ObjectId_var    server_id;
191         bool                            ok_flg;
192         int                             ret_val;
193         CosNaming::NamingContext_var    naming_context;
194
195         ret_val         = -1;
196         POA_plpbus_orb::OrbServer_tie<OrbServerImpl> server_impl(this);
197         server_id       = _poa->activate_object(&server_impl);
198         server_ref      = server_impl._this();
199         //this->_remove_ref();
200         naming_context  = get_service_naming_context(CONST_CONTEXT_NAME__PLPBUS, CONST_CONTEXT_KIND__PLPBUS);
201         if (naming_context != NULL) {
202                 ok_flg  = bind_naming_context_and_service(naming_context,
203                                                         server_ref,
204                                                         server_name,
205                                                         CONST_CONTEXT_KIND__PLPBUS);
206                 if (ok_flg == true) {
207                         log_debug("Registered to naming service: %s\n", server_name);
208                         _orb->run();
209                         ret_val = 0;
210                 }
211                 else {
212                         log_error("Failed to register to naming service: %s\n", server_name);
213                 }
214         }
215         return ret_val;
216 }
217
218 PortableServer::POA_var OrbServerImpl::create_poa(CORBA::ORB_var orb) {
219         PortableServer::POA_var         ret_val;
220         CORBA::Object_var               poa_obj;
221         CORBA::PolicyList               policy_list;
222         CORBA::Any                      policyVal;
223         PortableServer::POAManager_var  poa_man;
224         PortableServer::POA_var         rootpoa;
225
226         ret_val = NULL;
227         poa_obj = orb->resolve_initial_references(CONST_ROOT_POA_NAME);
228         if (poa_obj != NULL) {
229                 rootpoa = PortableServer::POA::_narrow(poa_obj);
230                 if (rootpoa != NULL) {
231                         poa_man = rootpoa->the_POAManager();
232                         if (poa_man != NULL) {
233                                 poa_man->activate();
234                                 // bidirectional policy
235                                 policy_list.length(1);
236                                 policyVal       <<= BiDirPolicy::BOTH;
237                                 policy_list[0] = orb->create_policy(BiDirPolicy::BIDIRECTIONAL_POLICY_TYPE, policyVal);
238                                 ret_val = rootpoa->create_POA(CONST_ROOT_POA_BIDIR_POLICY_NAME,
239                                                         poa_man,
240                                                         policy_list);
241                         }
242                 }
243         }
244         else {
245                 log_error("Failed to create RootPOA.\n");
246         }
247         return ret_val;
248 }
249
250 CosNaming::NamingContext_var OrbServerImpl::get_service_naming_context(const char *service_name_param,
251                                                                 const char *service_kind_param)
252 {
253         CosNaming::NamingContext_var    ret_val;
254         CosNaming::NamingContext_var    ns_context;
255         CORBA::Object_var               ns_obj;
256         CORBA::Object_var               service_obj;
257         CosNaming::Name                 context_data;
258
259         ret_val = NULL;
260         try {
261                 // get nameservice reference
262                 log_debug("get_service_naming_context() 1\n");
263                 ns_obj          = _orb->resolve_initial_references(CONST_NAME_SERVICE_NAME);
264                 log_debug("get_service_naming_context() 2\n");
265                 // get name service context
266                 ns_context      = CosNaming::NamingContext::_narrow(ns_obj);
267                 log_debug("get_service_naming_context() 3\n");
268                 if (CORBA::is_nil(ns_context) == false) {
269                         context_data.length(1);
270                         context_data[0].id      = service_name_param;
271                         context_data[0].kind    = service_kind_param;
272                         log_debug("get_service_naming_context() 4\n");
273                         try {
274                                 service_obj     = ns_context->resolve(context_data);
275                         }
276                         catch(CosNaming::NamingContext::NotFound& ex) {
277                                 log_error("Failed to get context from name service for %s. Context does not yet exist, but will try to create.\n", service_name_param);
278                         }
279                         try {
280                                 log_debug("get_service_naming_context() 5\n");
281                                 if (CORBA::is_nil(service_obj)) {
282                                         // not found, try to bind the new context to name service
283                                         log_debug("get_service_naming_context() 6\n");
284                                         ret_val = ns_context->bind_new_context(context_data);
285                                         log_debug("get_service_naming_context() 7\n");
286                                         if (CORBA::is_nil(ret_val) ) {
287                                                 log_error("Failed to create new context to name service for %s.\n", service_name_param);
288                                         }
289                                 }
290                                 else {
291                                         log_debug("get_service_naming_context() 8\n");
292                                         ret_val = CosNaming::NamingContext::_narrow(service_obj);
293                                         log_debug("get_service_naming_context() 9\n");
294                                         if (CORBA::is_nil(ret_val) ) {
295                                                 log_error("Failed to get existing context from name service for %s, narrowing failed.\n", service_name_param);
296                                         }
297                                 }
298                         }
299                         catch(CosNaming::NamingContext::AlreadyBound& ex) {
300                                 log_error("Failed to get context from name service for %s. Context with same name already exist.\n", service_name_param);
301                         }
302                 }
303         }
304         catch (CORBA::ORB::InvalidName&) {
305                 // This should not happen!
306                 log_error("Failed to get context from name service for %s, name service does not exist.\n", service_name_param);
307         }
308         catch(CORBA::TRANSIENT& ex) {
309                 log_error("Failed to get context from name service for %s, verify that name service is running.\n", service_name_param);
310         }
311         catch (CORBA::NO_RESOURCES&) {
312                 log_error("Failed to get context from name service for %s, Name service is not running or has configuration problem.\n", service_name_param);
313                 log_error("Check-list:\n");
314                 log_error("If you have OMNIORB_CONFIG environment variable defined, verify that omniORB.cfg file exist in that location.\n");
315                 log_error("If you do not have OMNIORB_CONFIG environment variable defined, verify that you have /etc/omniORB.cfg file\n");
316                 log_error("Verify that InitRef line is defined in omniORB.cfg file.\n");
317         }
318         catch(CORBA::SystemException& ex) {
319                 log_error("Failed to get context from name service for %s, system error.\n", service_name_param);
320         }
321         log_debug("get_service_naming_context() done\n");
322         return ret_val;
323 }
324
325 bool OrbServerImpl::bind_naming_context_and_service(CosNaming::NamingContext_var service_context_param,
326                                         CORBA::Object_ptr service_ref_param,
327                                         const char *service_name_param,
328                                         const char *service_kind_param)
329 {
330         bool            retVal;
331         CosNaming::Name context_data;
332
333         retVal = false;
334         try {
335                 context_data.length(1);
336                 context_data[0].id   = service_name_param;
337                 context_data[0].kind = service_kind_param;
338                 try {
339                         service_context_param->bind(context_data, service_ref_param);
340                         retVal  = true;
341                 }
342                 catch(CosNaming::NamingContext::AlreadyBound& ex) {
343                         /**
344                          * service existed already for the naming context with similar description.
345                          * Replace the existing one with a new one.
346                          */
347                         log_warning("service %s exist, replacing it\n", service_name_param);
348                         service_context_param->rebind(context_data, service_ref_param);
349                         retVal  = true;
350                 }
351         }
352         catch (CosNaming::NamingContext::InvalidName&) {
353                 log_error("Could not register service to name server, invalid service name.\n");
354         }
355         catch (CosNaming::NamingContext::NotFound&) {
356                 log_error("Could not register service to name server, service object reference is invalid.\n");
357         }
358         catch (CosNaming::NamingContext::CannotProceed&) {
359                 // This should not happen!
360                 log_error("Could not register service to name server, unknown error.\n");
361         }
362         catch(CORBA::SystemException& ex) {
363                 log_error("Could not register service to name server, unknown error.\n");
364         }
365         return retVal;
366 }
367
368 int OrbServerImpl::add_server_listener(IServerListener *listener_param)
369 {
370         int     retVal;
371
372         retVal          = 0;
373         log_debug("register_request_received_callback() started\n");
374         _listener       = listener_param;
375         return retVal;
376 }
377
378 int OrbServerImpl::is_shutdown_pending() {
379         return _shutdown_pending;
380 }
381
382 void OrbServerImpl::server_thread_closed() {
383         bool    send_signal;
384
385         log_debug("server_thread_closed() started\n");
386         send_signal = false;
387         server_thread_mutex.lock();
388         log_debug("server_thread_closed(), server_thread_count: %d\n", _server_thread_count);
389         _server_thread_count--;
390         if (_server_thread_count == 0) {
391                 send_signal     = true;
392         }
393         server_thread_mutex.unlock();
394         if (send_signal == true) {
395                 log_debug("server_thread_closed(), sending signal\n");
396                 server_thread_shutdown_signal.signal();
397                 log_debug("server_thread_closed(), signal send\n");
398         }
399 }