]> pilppa.org Git - libplpbus.git/blob - src/plpbus/OrbServerImpl.cc
initial version
[libplpbus.git] / src / plpbus / OrbServerImpl.cc
1 /*
2  * OrbServerImpl.cc
3  *
4  *  Created on: Aug 11, 2010
5  *      Author: lamikr
6  */
7
8 #include "ServerEventThread.hh"
9 #include "OrbServerImpl.hh"
10 #include "ClientServerCommon.hh"
11 #include "BusMessageInternal.hh"
12
13 using namespace std;
14 using namespace plpbus;
15 using namespace plpbus_orb;
16
17 static omni_mutex       server_thread_mutex;
18 static omni_condition   server_thread_shutdown_signal(&server_thread_mutex);
19
20 OrbServerImpl::OrbServerImpl()
21 {
22         _orb                    = NULL;
23         _poa                    = NULL;
24         _server_thread_count    = 0;
25         _shutdown_pending       = 0;
26         _listener               = NULL;
27 }
28
29 OrbServerImpl::~OrbServerImpl()
30 {
31         cout << "OrbServerImpl destroyed." << endl;
32 }
33
34 void OrbServerImpl::send_message_and_request_response(OrbClient_ptr response_listener_param,
35                                                 const char* msg_req)
36 {
37         char    *msg_rsp;
38
39         msg_rsp = NULL;
40         if (CORBA::is_nil(response_listener_param) == false) {
41                 if (_listener != NULL) {
42                         cout << "send_message_and_request_response(): " << msg_req << ", server_callback != NULL" << endl;
43                         _listener->request_received(msg_req, &msg_rsp);
44                         response_listener_param->receive_response_message(msg_rsp);
45                 }
46                 else {
47                         cout << "send_message_and_request_response() error! " << msg_req << ", server_callback == NULL" << endl;
48                 }
49         }
50         else {
51                 cerr << "invalid callback object received.\n";
52         }
53 }
54
55 char *OrbServerImpl::send_message_and_wait_response(const char* msg_req_param, ::CORBA::Long& err_flg) {
56         char    *msg_rsp;
57         char    *ret_val;
58
59         err_flg = 0;
60         msg_rsp = NULL;
61         if (_listener != NULL) {
62                 cout << "send_message_and_wait_response(): " << msg_req_param << ", server_callback != NULL" << endl;
63                 _listener->request_received(msg_req_param, &msg_rsp);
64         }
65         else {
66                 cout << "send_message_and_wait_response() error! " << msg_req_param << ", server_callback == NULL" << endl;
67                 msg_rsp = strdup(msg_req_param);
68                 err_flg = -1;
69         }
70         ret_val = CORBA::string_dup(msg_rsp);
71         return ret_val;
72 }
73
74 void OrbServerImpl::send_dataitem_message_and_request_response(OrbClient_ptr response_listener_param,
75                                                         const ::DataItemSequence& msg_req_param) {
76         BusMessage      *msg_req;
77         BusMessage      *msg_rsp;
78
79         if (CORBA::is_nil(response_listener_param) == false) {
80                 if (_listener != NULL) {
81                         msg_req = new BusMessageInternal(msg_req_param);
82                         cout << "send_dataitem_message_and_request_response(), server_callback != NULL" << endl;
83                         cout << "msg_req:" << endl;
84                         msg_req->printout();
85                         msg_rsp = NULL;
86                         _listener->request_received(msg_req, &msg_rsp);
87                         cout << "msg_rsp length:" << msg_rsp->_dataItemSeq.length() << endl;
88                         msg_rsp->printout();
89                         response_listener_param->receive_response_dataitem_sequence(msg_rsp->_dataItemSeq);
90                 }
91                 else {
92                         cout << "send_dataitem_message_and_request_response() error, server_callback == NULL" << endl;
93                 }
94         }
95 }
96
97 CORBA::Long OrbServerImpl::send_dataitem_message_and_wait_response(const DataItemSequence& req_seq_param,
98                 DataItemSequence_out rsp_seq_param) {
99         BusMessage      *msg_req;
100         BusMessage      *msg_rsp;
101
102         msg_req = new BusMessageInternal(req_seq_param);
103         msg_rsp = NULL;
104         _listener->request_received(msg_req, &msg_rsp);
105         //rsp_seq_param = new DataItemSequence_out(msg_rsp._dataItemSeq);
106         rsp_seq_param._data     = &(msg_rsp->_dataItemSeq);
107
108         return 0;
109 }
110
111 void OrbServerImpl::add_event_listener(OrbClient_ptr client_param,
112                         const char *msg,
113                         CORBA::UShort period_secs)
114 {
115         ServerEventThread       *srvr_thread;
116
117         if (CORBA::is_nil(client_param) == false) {
118                 cout << "add_event_listener()" << endl;
119                 server_thread_mutex.lock();
120                 _server_thread_count++;
121                 server_thread_mutex.unlock();
122                 srvr_thread     = new ServerEventThread(client_param, msg, period_secs, this);
123                 srvr_thread->start();
124         }
125         else {
126                 cerr << "Failed to add event listener, listener NULL.\n";
127         }
128 }
129
130 void OrbServerImpl::shutdown()
131 {
132         omni_mutex_lock sync(server_thread_mutex);
133         if (is_shutdown_pending() == 0) {
134                 cout << "shutdown request received!" << endl;
135                 // Tell the servers to exit, and wait for them to do so.
136                 _shutdown_pending       = 1;
137                 while(_server_thread_count > 0) {
138                         server_thread_shutdown_signal.wait();
139                 }
140                 // Shutdown the ORB (but do not wait for completion).  This also
141                 // causes the main thread to unblock from CORBA::ORB::run().
142                 _orb->shutdown(0);
143         }
144 }
145
146 int OrbServerImpl::init() {
147         int     argc;
148         char    **argv;
149         int     retVal;
150
151         argc    = -1;
152         argv    = NULL;
153         retVal  = 0;
154         _orb    = CORBA::ORB_init(argc, argv);
155         if (_orb != NULL) {
156                 _poa    = create_poa(_orb);
157                 retVal  = 0;
158         }
159         else {
160                 cout << "init() failed"  << endl;
161         }
162         return retVal;
163 }
164
165 int OrbServerImpl::launch(const char *server_name)
166 {
167         CORBA::Object_var               server_ref;
168         PortableServer::ObjectId_var    server_id;
169         bool                            ok_flg;
170         int                             ret_val;
171         CosNaming::NamingContext_var    naming_context;
172
173         ret_val         = -1;
174         POA_plpbus_orb::OrbServer_tie<OrbServerImpl> server_impl(this);
175         server_id       = _poa->activate_object(&server_impl);
176         server_ref      = server_impl._this();
177         //this->_remove_ref();
178         naming_context  = get_service_naming_context(CONST_CONTEXT_NAME__PLPBUS, CONST_CONTEXT_KIND__PLPBUS);
179         if (naming_context != NULL) {
180                 ok_flg  = bind_naming_context_and_service(naming_context, server_ref, server_name, CONST_CONTEXT_KIND__PLPBUS);
181                 if (ok_flg == true) {
182                         cout << "Registered to naming service: " << server_name << endl;
183                         _orb->run();
184                         ret_val = 0;
185                 }
186                 else {
187                         cout << "Failed to register to naming service: " << server_name << endl;
188                 }
189         }
190         return ret_val;
191 }
192
193 PortableServer::POA_var OrbServerImpl::create_poa(CORBA::ORB_var orb) {
194         PortableServer::POA_var         ret_val;
195         CORBA::Object_var               poa_obj;
196         CORBA::PolicyList               policy_list;
197         CORBA::Any                      policyVal;
198         PortableServer::POAManager_var  poa_man;
199         PortableServer::POA_var         rootpoa;
200
201         ret_val = NULL;
202         poa_obj = orb->resolve_initial_references(CONST_ROOT_POA_NAME);
203         if (poa_obj != NULL) {
204                 rootpoa = PortableServer::POA::_narrow(poa_obj);
205                 if (rootpoa != NULL) {
206                         poa_man = rootpoa->the_POAManager();
207                         if (poa_man != NULL) {
208                                 poa_man->activate();
209                                 // bidirectional policy
210                                 policy_list.length(1);
211                                 policyVal       <<= BiDirPolicy::BOTH;
212                                 policy_list[0] = orb->create_policy(BiDirPolicy::BIDIRECTIONAL_POLICY_TYPE, policyVal);
213                                 ret_val = rootpoa->create_POA(CONST_ROOT_POA_BIDIR_POLICY_NAME,
214                                                         poa_man,
215                                                         policy_list);
216                         }
217                 }
218         }
219         else {
220                 cerr << "Failed to create RootPOA." << endl;
221         }
222         return ret_val;
223 }
224
225 CosNaming::NamingContext_var OrbServerImpl::get_service_naming_context(const char *service_name_param,
226                                                                 const char *service_kind_param)
227 {
228         CosNaming::NamingContext_var    ret_val;
229         CosNaming::NamingContext_var    ns_context;
230         CORBA::Object_var               ns_obj;
231         CORBA::Object_var               service_obj;
232         CosNaming::Name                 context_data;
233
234         ret_val = NULL;
235         try {
236                 // get nameservice reference
237                 ns_obj          = _orb->resolve_initial_references(CONST_NAME_SERVICE_NAME);
238                 // get nameservice context
239                 ns_context      = CosNaming::NamingContext::_narrow(ns_obj);
240                 if (CORBA::is_nil(ns_context) == false) {
241                         context_data.length(1);
242                         context_data[0].id   = service_name_param;
243                         context_data[0].kind = service_kind_param;
244                         try {
245                                 service_obj     = ns_context->resolve(context_data);
246                                 if (CORBA::is_nil(service_obj)) {
247                                         // not found, try to bind the new context to name service
248                                         ret_val = ns_context->bind_new_context(context_data);
249                                         if (CORBA::is_nil(ret_val) ) {
250                                                 cerr << "Failed to create new context to name service for " << service_name_param << "." << endl;
251                                         }
252                                 }
253                                 else {
254                                         ret_val = CosNaming::NamingContext::_narrow(service_obj);
255                                         if (CORBA::is_nil(ret_val) ) {
256                                                 cerr << "Failed to get existing context from name service for " << service_name_param << ", narrowing failed." << endl;
257                                         }
258                                 }
259                         }
260                         catch(CosNaming::NamingContext::AlreadyBound& ex) {
261                                 cerr << "Could not get context from nameservice for " << service_name_param << ". Context with same name already existed."<< endl;
262                         }
263                 }
264         }
265         catch (CORBA::ORB::InvalidName&) {
266                 // This should not happen!
267                 cerr << "Could not get context from name service for " << service_name_param << ", name service does not exist." << endl;
268         }
269         catch(CORBA::TRANSIENT& ex) {
270                 cerr << "Could not get context from name service for " << service_name_param << ",  verify that name service is running. " << service_name_param << endl;
271         }
272         catch (CORBA::NO_RESOURCES&) {
273                 cerr << "Could not get context from name service for " << service_name_param << ". Name service is not running or has configuration problem." << endl;
274         }
275         catch(CORBA::SystemException& ex) {
276                 cerr << "Could not get context from name service for " << service_name_param << ", could not determine reason." << endl;
277         }
278         return ret_val;
279 }
280
281 bool OrbServerImpl::bind_naming_context_and_service(CosNaming::NamingContext_var service_context_param,
282                                         CORBA::Object_ptr service_ref_param,
283                                         const char *service_name_param,
284                                         const char *service_kind_param)
285 {
286         bool            retVal;
287         CosNaming::Name context_data;
288
289         retVal = false;
290         try {
291                 context_data.length(1);
292                 context_data[0].id   = service_name_param;
293                 context_data[0].kind = service_kind_param;
294                 try {
295                         service_context_param->bind(context_data, service_ref_param);
296                         retVal  = true;
297                 }
298                 catch(CosNaming::NamingContext::AlreadyBound& ex) {
299                         /**
300                          * service existed already for the naming context with similar description.
301                          * Replace the existing one with a new one.
302                          */
303                         cout << "service " << service_name_param << " existed, replacing it." << endl;
304                         service_context_param->rebind(context_data, service_ref_param);
305                         retVal  = true;
306                 }
307         }
308         catch (CosNaming::NamingContext::InvalidName&) {
309                 cerr << "Could not register service to name server, invalid service name." << endl;
310         }
311         catch (CosNaming::NamingContext::NotFound&) {
312                 cerr << "Could not register service to name server, service object reference is invalid." << endl;
313         }
314         catch (CosNaming::NamingContext::CannotProceed&) {
315                 // This should not happen!
316                 cerr << "Could not register service to name server, unknown error." << endl;
317         }
318         catch(CORBA::SystemException& ex) {
319                 cerr << "Could not register service to name server, unknown error." << endl;
320         }
321         return retVal;
322 }
323
324 int OrbServerImpl::add_server_listener(IServerListener *listener_param)
325 {
326         int     retVal;
327
328         retVal          = 0;
329         cout << "register_request_received_callback() started"  << endl;
330         _listener       = listener_param;
331         cout << "register_callback() done"  << endl;
332         return retVal;
333 }
334
335 int OrbServerImpl::is_shutdown_pending() {
336         return _shutdown_pending;
337 }
338
339 void OrbServerImpl::server_thread_closed() {
340         bool send_signal = false;
341
342         server_thread_mutex.lock();
343         _server_thread_count--;
344         if (_server_thread_count == 0) {
345                 send_signal     = true;
346         }
347         server_thread_mutex.unlock();
348         if (send_signal == true) {
349                 server_thread_shutdown_signal.signal();
350         }
351 }