]> pilppa.org Git - libplpbus.git/blob - src/plpbus/OrbServerImpl.cc
cleaned up public headers available for bus client and server
[libplpbus.git] / src / plpbus / OrbServerImpl.cc
1 /*
2  * OrbServerImpl.cc
3  *
4  *  Created on: Aug 11, 2010
5  *      Author: lamikr
6  */
7
8 #include <plp/log.h>
9
10 #include "ServerEventThread.hh"
11 #include "OrbServerImpl.hh"
12 #include "ClientServerCommon.hh"
13 #include "BusMessageInternal.hh"
14
15 using namespace std;
16 using namespace plpbus;
17 using namespace plpbus_orb;
18
19 static omni_mutex       server_thread_mutex;
20 static omni_condition   server_thread_shutdown_signal(&server_thread_mutex);
21
22 OrbServerImpl::OrbServerImpl()
23 {
24         _orb                    = NULL;
25         _poa                    = NULL;
26         _server_thread_count    = 0;
27         _shutdown_pending       = 0;
28         _listener               = NULL;
29 }
30
31 OrbServerImpl::~OrbServerImpl()
32 {
33         log_info("OrbServerImpl destroyed.\n");
34 }
35
36 void OrbServerImpl::send_message_and_request_response(OrbClient_ptr response_listener_param,
37                                                 const char* msg_req)
38 {
39         char    *msg_rsp;
40
41         msg_rsp = NULL;
42         if (CORBA::is_nil(response_listener_param) == false) {
43                 if (_listener != NULL) {
44                         log_debug("send_message_and_request_response(): %s, server callback != NULL\n", msg_req);
45                         _listener->request_received(msg_req, &msg_rsp);
46                         log_debug("send_message_and_request_response(), request_received\n");
47                         response_listener_param->receive_response_message(msg_rsp);
48                 }
49                 else {
50                         log_error("send_message_and_request_response() error, server callback == NULL\n");
51                 }
52         }
53         else {
54                 log_error("invalid callback object received.\n");
55         }
56 }
57
58 char *OrbServerImpl::send_message_and_wait_response(const char* msg_req_param, ::CORBA::Long& err_flg) {
59         char    *msg_rsp;
60         char    *ret_val;
61
62         err_flg = 0;
63         msg_rsp = NULL;
64         if (_listener != NULL) {
65                 log_debug("send_message_and_wait_response(): %s, server_callback != NULL\n", msg_req_param);
66                 _listener->request_received(msg_req_param, &msg_rsp);
67         }
68         else {
69                 log_error("send_message_and_wait_response() error, server callback == NULL\n");
70                 msg_rsp = strdup(msg_req_param);
71                 err_flg = -1;
72         }
73         ret_val = CORBA::string_dup(msg_rsp);
74         return ret_val;
75 }
76
77 void OrbServerImpl::send_dataitem_message_and_request_response(OrbClient_ptr response_listener_param,
78                                                         const ::DataItemSequence& msg_req_param) {
79         BusMessage              *msg_req;
80         BusMessage              *msg_rsp;
81         DataItemSequence        *seq;
82
83         if (CORBA::is_nil(response_listener_param) == false) {
84                 if (_listener != NULL) {
85                         msg_req = new BusMessageInternal(msg_req_param);
86                         log_debug("send_dataitem_message_and_request_response(), server_callback != NULL\n");
87                         log_debug("msg_req:\n");
88                         msg_req->printout();
89                         msg_rsp = NULL;
90                         _listener->request_received(msg_req, &msg_rsp);
91                         seq     = (DataItemSequence *)msg_rsp->_dataItemSeq;
92                         log_debug("msg_rsp length: %ld\n", seq->length());
93                         msg_rsp->printout();
94                         response_listener_param->receive_response_dataitem_sequence(*seq);
95                 }
96                 else {
97                         log_error("send_dataitem_message_and_request_response() error, server_callback == NULL\n");
98                 }
99         }
100 }
101
102 CORBA::Long OrbServerImpl::send_dataitem_message_and_wait_response(const DataItemSequence& req_seq_param,
103                                                                 DataItemSequence_out rsp_seq_param) {
104         BusMessage              *msg_req;
105         BusMessage              *msg_rsp;
106         DataItemSequence        *seq;
107
108         msg_req = new BusMessageInternal(req_seq_param);
109         msg_rsp = NULL;
110         _listener->request_received(msg_req, &msg_rsp);
111         //rsp_seq_param = new DataItemSequence_out(msg_rsp._dataItemSeq);
112         seq                     = (DataItemSequence *)msg_rsp->_dataItemSeq;
113         rsp_seq_param._data     = seq;
114
115         return 0;
116 }
117
118 void OrbServerImpl::add_event_listener(OrbClient_ptr client_param,
119                         const char *msg,
120                         CORBA::UShort period_secs)
121 {
122         ServerEventThread       *srvr_thread;
123
124         if (CORBA::is_nil(client_param) == false) {
125                 log_debug("add_event_listener()\n");
126                 server_thread_mutex.lock();
127                 _server_thread_count++;
128                 server_thread_mutex.unlock();
129                 srvr_thread     = new ServerEventThread(client_param, msg, period_secs, this);
130                 srvr_thread->start();
131         }
132         else {
133                 cerr << "Failed to add event listener, listener NULL.\n";
134         }
135 }
136
137 void OrbServerImpl::shutdown()
138 {
139         omni_mutex_lock sync(server_thread_mutex);
140         if (is_shutdown_pending() == 0) {
141                 cout << "shutdown request received!" << endl;
142                 // Tell the servers to exit, and wait for them to do so.
143                 _shutdown_pending       = 1;
144                 while(_server_thread_count > 0) {
145                         server_thread_shutdown_signal.wait();
146                 }
147                 // Shutdown the ORB (but do not wait for completion).  This also
148                 // causes the main thread to unblock from CORBA::ORB::run().
149                 _orb->shutdown(0);
150         }
151 }
152
153 int OrbServerImpl::init() {
154         int     argc;
155         char    **argv;
156         int     retVal;
157
158         argc    = -1;
159         argv    = NULL;
160         retVal  = 0;
161         _orb    = CORBA::ORB_init(argc, argv);
162         if (_orb != NULL) {
163                 _poa    = create_poa(_orb);
164                 retVal  = 0;
165         }
166         else {
167                 cout << "init() failed"  << endl;
168         }
169         return retVal;
170 }
171
172 int OrbServerImpl::launch(const char *server_name)
173 {
174         CORBA::Object_var               server_ref;
175         PortableServer::ObjectId_var    server_id;
176         bool                            ok_flg;
177         int                             ret_val;
178         CosNaming::NamingContext_var    naming_context;
179
180         ret_val         = -1;
181         POA_plpbus_orb::OrbServer_tie<OrbServerImpl> server_impl(this);
182         server_id       = _poa->activate_object(&server_impl);
183         server_ref      = server_impl._this();
184         //this->_remove_ref();
185         naming_context  = get_service_naming_context(CONST_CONTEXT_NAME__PLPBUS, CONST_CONTEXT_KIND__PLPBUS);
186         if (naming_context != NULL) {
187                 ok_flg  = bind_naming_context_and_service(naming_context,
188                                                         server_ref,
189                                                         server_name,
190                                                         CONST_CONTEXT_KIND__PLPBUS);
191                 if (ok_flg == true) {
192                         cout << "Registered to naming service: " << server_name << endl;
193                         _orb->run();
194                         ret_val = 0;
195                 }
196                 else {
197                         cout << "Failed to register to naming service: " << server_name << endl;
198                 }
199         }
200         return ret_val;
201 }
202
203 PortableServer::POA_var OrbServerImpl::create_poa(CORBA::ORB_var orb) {
204         PortableServer::POA_var         ret_val;
205         CORBA::Object_var               poa_obj;
206         CORBA::PolicyList               policy_list;
207         CORBA::Any                      policyVal;
208         PortableServer::POAManager_var  poa_man;
209         PortableServer::POA_var         rootpoa;
210
211         ret_val = NULL;
212         poa_obj = orb->resolve_initial_references(CONST_ROOT_POA_NAME);
213         if (poa_obj != NULL) {
214                 rootpoa = PortableServer::POA::_narrow(poa_obj);
215                 if (rootpoa != NULL) {
216                         poa_man = rootpoa->the_POAManager();
217                         if (poa_man != NULL) {
218                                 poa_man->activate();
219                                 // bidirectional policy
220                                 policy_list.length(1);
221                                 policyVal       <<= BiDirPolicy::BOTH;
222                                 policy_list[0] = orb->create_policy(BiDirPolicy::BIDIRECTIONAL_POLICY_TYPE, policyVal);
223                                 ret_val = rootpoa->create_POA(CONST_ROOT_POA_BIDIR_POLICY_NAME,
224                                                         poa_man,
225                                                         policy_list);
226                         }
227                 }
228         }
229         else {
230                 cerr << "Failed to create RootPOA." << endl;
231         }
232         return ret_val;
233 }
234
235 CosNaming::NamingContext_var OrbServerImpl::get_service_naming_context(const char *service_name_param,
236                                                                 const char *service_kind_param)
237 {
238         CosNaming::NamingContext_var    ret_val;
239         CosNaming::NamingContext_var    ns_context;
240         CORBA::Object_var               ns_obj;
241         CORBA::Object_var               service_obj;
242         CosNaming::Name                 context_data;
243
244         ret_val = NULL;
245         try {
246                 // get nameservice reference
247                 log_debug("get_service_naming_context() 1\n");
248                 ns_obj          = _orb->resolve_initial_references(CONST_NAME_SERVICE_NAME);
249                 log_debug("get_service_naming_context() 2\n");
250                 // get name service context
251                 ns_context      = CosNaming::NamingContext::_narrow(ns_obj);
252                 log_debug("get_service_naming_context() 3\n");
253                 if (CORBA::is_nil(ns_context) == false) {
254                         context_data.length(1);
255                         context_data[0].id      = service_name_param;
256                         context_data[0].kind    = service_kind_param;
257                         log_debug("get_service_naming_context() 4\n");
258                         try {
259                                 service_obj     = ns_context->resolve(context_data);
260                         }
261                         catch(CosNaming::NamingContext::NotFound& ex) {
262                                 log_error("Failed to get context from name service for %s. Context does not yet exist, but will try to create.\n", service_name_param);
263                         }
264                         try {
265                                 log_debug("get_service_naming_context() 5\n");
266                                 if (CORBA::is_nil(service_obj)) {
267                                         // not found, try to bind the new context to name service
268                                         log_debug("get_service_naming_context() 6\n");
269                                         ret_val = ns_context->bind_new_context(context_data);
270                                         log_debug("get_service_naming_context() 7\n");
271                                         if (CORBA::is_nil(ret_val) ) {
272                                                 log_error("Failed to create new context to name service for %s.\n", service_name_param);
273                                         }
274                                 }
275                                 else {
276                                         log_debug("get_service_naming_context() 8\n");
277                                         ret_val = CosNaming::NamingContext::_narrow(service_obj);
278                                         log_debug("get_service_naming_context() 9\n");
279                                         if (CORBA::is_nil(ret_val) ) {
280                                                 log_error("Failed to get existing context from name service for %s, narrowing failed.\n", service_name_param);
281                                         }
282                                 }
283                         }
284                         catch(CosNaming::NamingContext::AlreadyBound& ex) {
285                                 log_error("Failed to get context from name service for %s. Context with same name already exist.\n", service_name_param);
286                         }
287                 }
288         }
289         catch (CORBA::ORB::InvalidName&) {
290                 // This should not happen!
291                 log_error("Failed to get context from name service for %s, name service does not exist.\n", service_name_param);
292         }
293         catch(CORBA::TRANSIENT& ex) {
294                 log_error("Failed to get context from name service for %s, verify that name service is running.\n", service_name_param);
295         }
296         catch (CORBA::NO_RESOURCES&) {
297                 log_error("Failed to get context from name service for %s, Name service is not running or has configuration problem.\n", service_name_param);
298                 log_error("Check-list:\n");
299                 log_error("If you have OMNIORB_CONFIG environment variable defined, verify that omniORB.cfg file exist in that location.\n");
300                 log_error("If you do not have OMNIORB_CONFIG environment variable defined, verify that you have /etc/omniORB.cfg file\n");
301                 log_error("Verify that InitRef line is defined in omniORB.cfg file.\n");
302         }
303         catch(CORBA::SystemException& ex) {
304                 log_error("Failed to get context from name service for %s, system error.\n", service_name_param);
305         }
306         log_debug("get_service_naming_context() done\n");
307         return ret_val;
308 }
309
310 bool OrbServerImpl::bind_naming_context_and_service(CosNaming::NamingContext_var service_context_param,
311                                         CORBA::Object_ptr service_ref_param,
312                                         const char *service_name_param,
313                                         const char *service_kind_param)
314 {
315         bool            retVal;
316         CosNaming::Name context_data;
317
318         retVal = false;
319         try {
320                 context_data.length(1);
321                 context_data[0].id   = service_name_param;
322                 context_data[0].kind = service_kind_param;
323                 try {
324                         service_context_param->bind(context_data, service_ref_param);
325                         retVal  = true;
326                 }
327                 catch(CosNaming::NamingContext::AlreadyBound& ex) {
328                         /**
329                          * service existed already for the naming context with similar description.
330                          * Replace the existing one with a new one.
331                          */
332                         cout << "service " << service_name_param << " existed, replacing it." << endl;
333                         service_context_param->rebind(context_data, service_ref_param);
334                         retVal  = true;
335                 }
336         }
337         catch (CosNaming::NamingContext::InvalidName&) {
338                 cerr << "Could not register service to name server, invalid service name." << endl;
339         }
340         catch (CosNaming::NamingContext::NotFound&) {
341                 cerr << "Could not register service to name server, service object reference is invalid." << endl;
342         }
343         catch (CosNaming::NamingContext::CannotProceed&) {
344                 // This should not happen!
345                 cerr << "Could not register service to name server, unknown error." << endl;
346         }
347         catch(CORBA::SystemException& ex) {
348                 cerr << "Could not register service to name server, unknown error." << endl;
349         }
350         return retVal;
351 }
352
353 int OrbServerImpl::add_server_listener(IServerListener *listener_param)
354 {
355         int     retVal;
356
357         retVal          = 0;
358         cout << "register_request_received_callback() started"  << endl;
359         _listener       = listener_param;
360         cout << "register_callback() done"  << endl;
361         return retVal;
362 }
363
364 int OrbServerImpl::is_shutdown_pending() {
365         return _shutdown_pending;
366 }
367
368 void OrbServerImpl::server_thread_closed() {
369         bool send_signal = false;
370
371         server_thread_mutex.lock();
372         _server_thread_count--;
373         if (_server_thread_count == 0) {
374                 send_signal     = true;
375         }
376         server_thread_mutex.unlock();
377         if (send_signal == true) {
378                 server_thread_shutdown_signal.signal();
379         }
380 }