pilppa.org Git - libplpbus.git/blob - src/plpbus/OrbServerImpl.cc
[http://pilppa.org/home/issues/4] Fix the service creation on new name server instances
[libplpbus.git] / src / plpbus / OrbServerImpl.cc
1 /*
2  * OrbServerImpl.cc
3  *
4  *  Created on: Aug 11, 2010
5  *      Author: lamikr
6  */
7
8 #include <plp/log.h>
9
10 #include "ServerEventThread.hh"
11 #include "OrbServerImpl.hh"
12 #include "ClientServerCommon.hh"
13 #include "BusMessageInternal.hh"
14
15 using namespace std;
16 using namespace plpbus;
17 using namespace plpbus_orb;
18
19 static omni_mutex       server_thread_mutex;
20 static omni_condition   server_thread_shutdown_signal(&server_thread_mutex);
21
22 OrbServerImpl::OrbServerImpl()
23 {
24         _orb                    = NULL;
25         _poa                    = NULL;
26         _server_thread_count    = 0;
27         _shutdown_pending       = 0;
28         _listener               = NULL;
29 }
30
31 OrbServerImpl::~OrbServerImpl()
32 {
33         log_info("OrbServerImpl destroyed.\n");
34 }
35
36 void OrbServerImpl::send_message_and_request_response(OrbClient_ptr response_listener_param,
37                                                 const char* msg_req)
38 {
39         char    *msg_rsp;
40
41         msg_rsp = NULL;
42         if (CORBA::is_nil(response_listener_param) == false) {
43                 if (_listener != NULL) {
44                         log_debug("send_message_and_request_response(): %s, server callback != NULL\n", msg_req);
45                         _listener->request_received(msg_req, &msg_rsp);
46                         response_listener_param->receive_response_message(msg_rsp);
47                 }
48                 else {
49                         log_error("send_message_and_request_response() error, server callback == NULL\n");
50                 }
51         }
52         else {
53                 log_error("invalid callback object received.\n");
54         }
55 }
56
57 char *OrbServerImpl::send_message_and_wait_response(const char* msg_req_param, ::CORBA::Long& err_flg) {
58         char    *msg_rsp;
59         char    *ret_val;
60
61         err_flg = 0;
62         msg_rsp = NULL;
63         if (_listener != NULL) {
64                 log_debug("send_message_and_wait_response(): %s, server_callback != NULL\n", msg_req_param);
65                 _listener->request_received(msg_req_param, &msg_rsp);
66         }
67         else {
68                 log_error("send_message_and_wait_response() error, server callback == NULL\n");
69                 msg_rsp = strdup(msg_req_param);
70                 err_flg = -1;
71         }
72         ret_val = CORBA::string_dup(msg_rsp);
73         return ret_val;
74 }
75
76 void OrbServerImpl::send_dataitem_message_and_request_response(OrbClient_ptr response_listener_param,
77                                                         const ::DataItemSequence& msg_req_param) {
78         BusMessage      *msg_req;
79         BusMessage      *msg_rsp;
80
81         if (CORBA::is_nil(response_listener_param) == false) {
82                 if (_listener != NULL) {
83                         msg_req = new BusMessageInternal(msg_req_param);
84                         log_debug("send_dataitem_message_and_request_response(), server_callback != NULL\n");
85                         log_debug("msg_req:\n");
86                         msg_req->printout();
87                         msg_rsp = NULL;
88                         _listener->request_received(msg_req, &msg_rsp);
89                         log_debug("msg_rsp length: %d\n", msg_rsp->_dataItemSeq.length());
90                         msg_rsp->printout();
91                         response_listener_param->receive_response_dataitem_sequence(msg_rsp->_dataItemSeq);
92                 }
93                 else {
94                         log_error("send_dataitem_message_and_request_response() error, server_callback == NULL\n");
95                 }
96         }
97 }
98
99 CORBA::Long OrbServerImpl::send_dataitem_message_and_wait_response(const DataItemSequence& req_seq_param,
100                 DataItemSequence_out rsp_seq_param) {
101         BusMessage      *msg_req;
102         BusMessage      *msg_rsp;
103
104         msg_req = new BusMessageInternal(req_seq_param);
105         msg_rsp = NULL;
106         _listener->request_received(msg_req, &msg_rsp);
107         //rsp_seq_param = new DataItemSequence_out(msg_rsp._dataItemSeq);
108         rsp_seq_param._data     = &(msg_rsp->_dataItemSeq);
109
110         return 0;
111 }
112
113 void OrbServerImpl::add_event_listener(OrbClient_ptr client_param,
114                         const char *msg,
115                         CORBA::UShort period_secs)
116 {
117         ServerEventThread       *srvr_thread;
118
119         if (CORBA::is_nil(client_param) == false) {
120                 log_debug("add_event_listener()\n");
121                 server_thread_mutex.lock();
122                 _server_thread_count++;
123                 server_thread_mutex.unlock();
124                 srvr_thread     = new ServerEventThread(client_param, msg, period_secs, this);
125                 srvr_thread->start();
126         }
127         else {
128                 cerr << "Failed to add event listener, listener NULL.\n";
129         }
130 }
131
132 void OrbServerImpl::shutdown()
133 {
134         omni_mutex_lock sync(server_thread_mutex);
135         if (is_shutdown_pending() == 0) {
136                 cout << "shutdown request received!" << endl;
137                 // Tell the servers to exit, and wait for them to do so.
138                 _shutdown_pending       = 1;
139                 while(_server_thread_count > 0) {
140                         server_thread_shutdown_signal.wait();
141                 }
142                 // Shutdown the ORB (but do not wait for completion).  This also
143                 // causes the main thread to unblock from CORBA::ORB::run().
144                 _orb->shutdown(0);
145         }
146 }
147
148 int OrbServerImpl::init() {
149         int     argc;
150         char    **argv;
151         int     retVal;
152
153         argc    = -1;
154         argv    = NULL;
155         retVal  = 0;
156         _orb    = CORBA::ORB_init(argc, argv);
157         if (_orb != NULL) {
158                 _poa    = create_poa(_orb);
159                 retVal  = 0;
160         }
161         else {
162                 cout << "init() failed"  << endl;
163         }
164         return retVal;
165 }
166
167 int OrbServerImpl::launch(const char *server_name)
168 {
169         CORBA::Object_var               server_ref;
170         PortableServer::ObjectId_var    server_id;
171         bool                            ok_flg;
172         int                             ret_val;
173         CosNaming::NamingContext_var    naming_context;
174
175         ret_val         = -1;
176         POA_plpbus_orb::OrbServer_tie<OrbServerImpl> server_impl(this);
177         server_id       = _poa->activate_object(&server_impl);
178         server_ref      = server_impl._this();
179         //this->_remove_ref();
180         naming_context  = get_service_naming_context(CONST_CONTEXT_NAME__PLPBUS, CONST_CONTEXT_KIND__PLPBUS);
181         if (naming_context != NULL) {
182                 ok_flg  = bind_naming_context_and_service(naming_context, server_ref, server_name, CONST_CONTEXT_KIND__PLPBUS);
183                 if (ok_flg == true) {
184                         cout << "Registered to naming service: " << server_name << endl;
185                         _orb->run();
186                         ret_val = 0;
187                 }
188                 else {
189                         cout << "Failed to register to naming service: " << server_name << endl;
190                 }
191         }
192         return ret_val;
193 }
194
195 PortableServer::POA_var OrbServerImpl::create_poa(CORBA::ORB_var orb) {
196         PortableServer::POA_var         ret_val;
197         CORBA::Object_var               poa_obj;
198         CORBA::PolicyList               policy_list;
199         CORBA::Any                      policyVal;
200         PortableServer::POAManager_var  poa_man;
201         PortableServer::POA_var         rootpoa;
202
203         ret_val = NULL;
204         poa_obj = orb->resolve_initial_references(CONST_ROOT_POA_NAME);
205         if (poa_obj != NULL) {
206                 rootpoa = PortableServer::POA::_narrow(poa_obj);
207                 if (rootpoa != NULL) {
208                         poa_man = rootpoa->the_POAManager();
209                         if (poa_man != NULL) {
210                                 poa_man->activate();
211                                 // bidirectional policy
212                                 policy_list.length(1);
213                                 policyVal       <<= BiDirPolicy::BOTH;
214                                 policy_list[0] = orb->create_policy(BiDirPolicy::BIDIRECTIONAL_POLICY_TYPE, policyVal);
215                                 ret_val = rootpoa->create_POA(CONST_ROOT_POA_BIDIR_POLICY_NAME,
216                                                         poa_man,
217                                                         policy_list);
218                         }
219                 }
220         }
221         else {
222                 cerr << "Failed to create RootPOA." << endl;
223         }
224         return ret_val;
225 }
226
227 CosNaming::NamingContext_var OrbServerImpl::get_service_naming_context(const char *service_name_param,
228                                                                 const char *service_kind_param)
229 {
230         CosNaming::NamingContext_var    ret_val;
231         CosNaming::NamingContext_var    ns_context;
232         CORBA::Object_var               ns_obj;
233         CORBA::Object_var               service_obj;
234         CosNaming::Name                 context_data;
235
236         ret_val = NULL;
237         try {
238                 // get nameservice reference
239                 log_debug("get_service_naming_context() 1\n");
240                 ns_obj          = _orb->resolve_initial_references(CONST_NAME_SERVICE_NAME);
241                 log_debug("get_service_naming_context() 2\n");
242                 // get name service context
243                 ns_context      = CosNaming::NamingContext::_narrow(ns_obj);
244                 log_debug("get_service_naming_context() 3\n");
245                 if (CORBA::is_nil(ns_context) == false) {
246                         context_data.length(1);
247                         context_data[0].id      = service_name_param;
248                         context_data[0].kind    = service_kind_param;
249                         log_debug("get_service_naming_context() 4\n");
250                         try {
251                                 service_obj     = ns_context->resolve(context_data);
252                         }
253                         catch(CosNaming::NamingContext::NotFound& ex) {
254                                 log_error("Failed to get context from name service for %s. Context does not yet exist, but will try to create.\n", service_name_param);
255                         }
256                         try {
257                                 log_debug("get_service_naming_context() 5\n");
258                                 if (CORBA::is_nil(service_obj)) {
259                                         // not found, try to bind the new context to name service
260                                         log_debug("get_service_naming_context() 6\n");
261                                         ret_val = ns_context->bind_new_context(context_data);
262                                         log_debug("get_service_naming_context() 7\n");
263                                         if (CORBA::is_nil(ret_val) ) {
264                                                 log_error("Failed to create new context to name service for %s.\n", service_name_param);
265                                         }
266                                 }
267                                 else {
268                                         log_debug("get_service_naming_context() 8\n");
269                                         ret_val = CosNaming::NamingContext::_narrow(service_obj);
270                                         log_debug("get_service_naming_context() 9\n");
271                                         if (CORBA::is_nil(ret_val) ) {
272                                                 log_error("Failed to get existing context from name service for %s, narrowing failed.\n", service_name_param);
273                                         }
274                                 }
275                         }
276                         catch(CosNaming::NamingContext::AlreadyBound& ex) {
277                                 log_error("Failed to get context from name service for %s. Context with same name already exist.\n", service_name_param);
278                         }
279                 }
280         }
281         catch (CORBA::ORB::InvalidName&) {
282                 // This should not happen!
283                 log_error("Failed to get context from name service for %s, name service does not exist.\n", service_name_param);
284         }
285         catch(CORBA::TRANSIENT& ex) {
286                 log_error("Failed to get context from name service for %s, verify that name service is running.\n", service_name_param);
287         }
288         catch (CORBA::NO_RESOURCES&) {
289                 log_error("Failed to get context from name service for %s, Name service is not running or has configuration problem.\n", service_name_param);
290                 log_error("Things to check:\n", service_name_param);
291                 log_error("If you have OMNIORB_CONFIG environment variable defined, verify that omniORB.cfg file exist in that location.\n", service_name_param);
292                 log_error("If you do not have OMNIORB_CONFIG environment variable defined, verify that you have /etc/omniORB.cfg file\n", service_name_param);
293                 log_error("Verify that InitRef line is defined in omniORB.cfg file.\n", service_name_param);
294         }
295         catch(CORBA::SystemException& ex) {
296                 log_error("Failed to get context from name service for %s, system error.\n", service_name_param);
297         }
298         log_debug("get_service_naming_context() done\n");
299         return ret_val;
300 }
301
302 bool OrbServerImpl::bind_naming_context_and_service(CosNaming::NamingContext_var service_context_param,
303                                         CORBA::Object_ptr service_ref_param,
304                                         const char *service_name_param,
305                                         const char *service_kind_param)
306 {
307         bool            retVal;
308         CosNaming::Name context_data;
309
310         retVal = false;
311         try {
312                 context_data.length(1);
313                 context_data[0].id   = service_name_param;
314                 context_data[0].kind = service_kind_param;
315                 try {
316                         service_context_param->bind(context_data, service_ref_param);
317                         retVal  = true;
318                 }
319                 catch(CosNaming::NamingContext::AlreadyBound& ex) {
320                         /**
321                          * service existed already for the naming context with similar description.
322                          * Replace the existing one with a new one.
323                          */
324                         cout << "service " << service_name_param << " existed, replacing it." << endl;
325                         service_context_param->rebind(context_data, service_ref_param);
326                         retVal  = true;
327                 }
328         }
329         catch (CosNaming::NamingContext::InvalidName&) {
330                 cerr << "Could not register service to name server, invalid service name." << endl;
331         }
332         catch (CosNaming::NamingContext::NotFound&) {
333                 cerr << "Could not register service to name server, service object reference is invalid." << endl;
334         }
335         catch (CosNaming::NamingContext::CannotProceed&) {
336                 // This should not happen!
337                 cerr << "Could not register service to name server, unknown error." << endl;
338         }
339         catch(CORBA::SystemException& ex) {
340                 cerr << "Could not register service to name server, unknown error." << endl;
341         }
342         return retVal;
343 }
344
345 int OrbServerImpl::add_server_listener(IServerListener *listener_param)
346 {
347         int     retVal;
348
349         retVal          = 0;
350         cout << "register_request_received_callback() started"  << endl;
351         _listener       = listener_param;
352         cout << "register_callback() done"  << endl;
353         return retVal;
354 }
355
356 int OrbServerImpl::is_shutdown_pending() {
357         return _shutdown_pending;
358 }
359
360 void OrbServerImpl::server_thread_closed() {
361         bool send_signal = false;
362
363         server_thread_mutex.lock();
364         _server_thread_count--;
365         if (_server_thread_count == 0) {
366                 send_signal     = true;
367         }
368         server_thread_mutex.unlock();
369         if (send_signal == true) {
370                 server_thread_shutdown_signal.signal();
371         }
372 }