4 * Created on: Aug 11, 2010
8 #include "ServerEventThread.hh"
9 #include "OrbServerImpl.hh"
10 #include "ClientServerCommon.hh"
11 #include "BusMessageInternal.hh"
14 using namespace plpbus;
15 using namespace plpbus_orb;
// File-scope synchronization primitives used by the OrbServerImpl methods in this file.
// server_thread_mutex is held while _server_thread_count is modified
// (add_event_listener(), server_thread_closed()) and for the whole of shutdown().
17 static omni_mutex server_thread_mutex;
// Condition variable bound to server_thread_mutex: shutdown() waits on it while
// _server_thread_count > 0, and server_thread_closed() signals it.
18 static omni_condition server_thread_shutdown_signal(&server_thread_mutex);
// Constructor: starts with zero running event threads and no shutdown pending.
// NOTE(review): only these two members are visibly initialized in this view; confirm that
// _listener is also initialized, because the request handlers compare it against NULL.
20 OrbServerImpl::OrbServerImpl()
24 _server_thread_count = 0;
25 _shutdown_pending = 0;
// Destructor: writes a trace line to stdout. No other cleanup is visible in this view.
29 OrbServerImpl::~OrbServerImpl()
31 cout << "OrbServerImpl destroyed." << endl;
// Callback-style string request.
// Hands msg_req to the registered server listener (_listener) and delivers the listener's
// response to the client by calling receive_response_message() on response_listener_param.
//   response_listener_param  client callback object; a nil reference is reported on stderr.
// When no listener has been registered the request is only logged as an error on stdout.
// NOTE(review): the allocation and release of msg_rsp are not visible in this view; confirm
// who owns the buffer filled in by request_received().
34 void OrbServerImpl::send_message_and_request_response(OrbClient_ptr response_listener_param,
40 if (CORBA::is_nil(response_listener_param) == false) {
41 if (_listener != NULL) {
42 cout << "send_message_and_request_response(): " << msg_req << ", server_callback != NULL" << endl;
// The listener produces the response through the msg_rsp out-pointer.
43 _listener->request_received(msg_req, &msg_rsp);
44 response_listener_param->receive_response_message(msg_rsp);
47 cout << "send_message_and_request_response() error! " << msg_req << ", server_callback == NULL" << endl;
51 cerr << "invalid callback object received.\n";
// Synchronous string request.
// Passes msg_req_param to the registered listener and copies the resulting response into
// ret_val with CORBA::string_dup(), i.e. into storage allocated by the CORBA string allocator.
//   msg_req_param  request text received from the client.
//   err_flg        error flag reference; how it is set is not visible in this view.
// NOTE(review): the return statement is not visible here; presumably ret_val is returned.
55 char *OrbServerImpl::send_message_and_wait_response(const char* msg_req_param, ::CORBA::Long& err_flg) {
61 if (_listener != NULL) {
62 cout << "send_message_and_wait_response(): " << msg_req_param << ", server_callback != NULL" << endl;
63 _listener->request_received(msg_req_param, &msg_rsp);
66 cout << "send_message_and_wait_response() error! " << msg_req_param << ", server_callback == NULL" << endl;
// Without a listener the request text itself is used as the response.
// NOTE(review): strdup() allocates with malloc and the value is duplicated again below, so the
// strdup'd buffer needs a matching free(); none is visible here - confirm this is not a leak.
67 msg_rsp = strdup(msg_req_param);
70 ret_val = CORBA::string_dup(msg_rsp);
// Callback-style data-item request.
// Wraps the incoming DataItemSequence in a BusMessageInternal, passes it to the registered
// listener and sends the response's _dataItemSeq back to the client through
// receive_response_dataitem_sequence().
//   response_listener_param  client callback object; checked with CORBA::is_nil().
//   msg_req_param            request payload.
// When no listener is registered an error is logged to stdout.
74 void OrbServerImpl::send_dataitem_message_and_request_response(OrbClient_ptr response_listener_param,
75 const ::DataItemSequence& msg_req_param) {
79 if (CORBA::is_nil(response_listener_param) == false) {
80 if (_listener != NULL) {
// NOTE(review): allocated with new; no matching delete is visible in this view - confirm
// whether the listener takes ownership or whether this leaks one message per request.
81 msg_req = new BusMessageInternal(msg_req_param);
82 cout << "send_dataitem_message_and_request_response(), server_callback != NULL" << endl;
83 cout << "msg_req:" << endl;
86 _listener->request_received(msg_req, &msg_rsp);
// NOTE(review): msg_rsp is dereferenced without a visible NULL check; confirm that
// request_received() always sets it.
87 cout << "msg_rsp length:" << msg_rsp->_dataItemSeq.length() << endl;
89 response_listener_param->receive_response_dataitem_sequence(msg_rsp->_dataItemSeq);
92 cout << "send_dataitem_message_and_request_response() error, server_callback == NULL" << endl;
// Synchronous data-item request.
// Wraps req_seq_param in a BusMessageInternal, passes it to the listener and exposes the
// response's _dataItemSeq to the caller through rsp_seq_param.
//   req_seq_param  request payload.
//   rsp_seq_param  CORBA out parameter that receives the response sequence.
// The CORBA::Long return value is produced on lines not visible in this view.
// NOTE(review): unlike the other request handlers, no NULL check on _listener is visible here.
97 CORBA::Long OrbServerImpl::send_dataitem_message_and_wait_response(const DataItemSequence& req_seq_param,
98 DataItemSequence_out rsp_seq_param) {
// NOTE(review): allocated with new; no matching delete is visible in this view.
102 msg_req = new BusMessageInternal(req_seq_param);
104 _listener->request_received(msg_req, &msg_rsp);
105 //rsp_seq_param = new DataItemSequence_out(msg_rsp._dataItemSeq);
// NOTE(review): this points the out parameter at a sequence embedded in msg_rsp. In the CORBA
// C++ mapping a variable-length out parameter is normally a heap-allocated object that the
// caller/ORB releases, so aliasing a member here looks like an invalid or double free -
// confirm, and consider handing over a freshly allocated copy instead.
106 rsp_seq_param._data = &(msg_rsp->_dataItemSeq);
// Registers a client for periodic events by starting a dedicated ServerEventThread.
//   client_param  client callback object; a nil reference is reported on stderr.
//   period_secs   passed to the thread; presumably the event period in seconds.
// (One further parameter, used below as msg, is declared on a line not visible in this view.)
111 void OrbServerImpl::add_event_listener(OrbClient_ptr client_param,
113 CORBA::UShort period_secs)
115 ServerEventThread *srvr_thread;
117 if (CORBA::is_nil(client_param) == false) {
118 cout << "add_event_listener()" << endl;
// The counter is bumped under the mutex before the thread exists; shutdown() waits until
// server_thread_closed() has brought it back to zero.
119 server_thread_mutex.lock();
120 _server_thread_count++;
121 server_thread_mutex.unlock();
// NOTE(review): the thread object is allocated with new and not deleted here; confirm that
// ServerEventThread disposes of itself when it finishes.
122 srvr_thread = new ServerEventThread(client_param, msg, period_secs, this);
123 srvr_thread->start();
126 cerr << "Failed to add event listener, listener NULL.\n";
// Starts server shutdown.
// Takes server_thread_mutex through an omni_mutex_lock object, and on the first call marks
// shutdown as pending and blocks until every event thread has reported back through
// server_thread_closed(). Calls made while shutdown is already pending skip that work.
130 void OrbServerImpl::shutdown()
132 omni_mutex_lock sync(server_thread_mutex);
133 if (is_shutdown_pending() == 0) {
134 cout << "shutdown request received!" << endl;
135 // Tell the servers to exit, and wait for them to do so.
136 _shutdown_pending = 1;
// Loop rather than a single wait, so the counter is re-checked after every wake-up.
137 while(_server_thread_count > 0) {
138 server_thread_shutdown_signal.wait();
140 // Shutdown the ORB (but do not wait for completion). This also
141 // causes the main thread to unblock from CORBA::ORB::run().
// NOTE(review): the ORB shutdown call described above is on a line not visible in this view.
// Initializes the ORB and creates the POA used by this server.
// Stores the ORB in _orb and the POA returned by create_poa() in _poa; a failure path logs
// "init() failed" to stdout. The argc/argv set-up and the int return value are on lines not
// visible in this view.
146 int OrbServerImpl::init() {
154 _orb = CORBA::ORB_init(argc, argv);
156 _poa = create_poa(_orb);
160 cout << "init() failed" << endl;
// Activates this object as a CORBA servant and registers it with the naming service.
//   server_name  name under which the server reference is bound in the PLPBUS naming context.
// Logs the outcome of the registration to stdout; the int return value and any subsequent
// ORB run loop are on lines not visible in this view.
165 int OrbServerImpl::launch(const char *server_name)
167 CORBA::Object_var server_ref;
168 PortableServer::ObjectId_var server_id;
171 CosNaming::NamingContext_var naming_context;
// Tie servant that delegates the OrbServer operations to this object.
// NOTE(review): the tie lives on the stack and wraps `this`; tie templates constructed from a
// pointer commonly take ownership of the tied object by default - confirm that destroying
// server_impl cannot delete this OrbServerImpl while it is still in use.
174 POA_plpbus_orb::OrbServer_tie<OrbServerImpl> server_impl(this);
175 server_id = _poa->activate_object(&server_impl);
176 server_ref = server_impl._this();
177 //this->_remove_ref();
178 naming_context = get_service_naming_context(CONST_CONTEXT_NAME__PLPBUS, CONST_CONTEXT_KIND__PLPBUS);
// NOTE(review): compares a _var with NULL, whereas get_service_naming_context() uses
// CORBA::is_nil() for the same kind of test; consider using is_nil() here as well.
179 if (naming_context != NULL) {
180 ok_flg = bind_naming_context_and_service(naming_context, server_ref, server_name, CONST_CONTEXT_KIND__PLPBUS);
181 if (ok_flg == true) {
182 cout << "Registered to naming service: " << server_name << endl;
187 cout << "Failed to register to naming service: " << server_name << endl;
// Creates a child POA with a bidirectional policy under the root POA.
//   orb  ORB used to resolve the root POA and to create the policy object.
// Returns ret_val, which is assigned only when the root POA and its POA manager were obtained;
// a failure path logs "Failed to create RootPOA." to stderr.
193 PortableServer::POA_var OrbServerImpl::create_poa(CORBA::ORB_var orb) {
194 PortableServer::POA_var ret_val;
195 CORBA::Object_var poa_obj;
196 CORBA::PolicyList policy_list;
197 CORBA::Any policyVal;
198 PortableServer::POAManager_var poa_man;
199 PortableServer::POA_var rootpoa;
202 poa_obj = orb->resolve_initial_references(CONST_ROOT_POA_NAME);
// NOTE(review): the three checks below compare _var objects with NULL; other code in this file
// uses CORBA::is_nil() for object references.
203 if (poa_obj != NULL) {
204 rootpoa = PortableServer::POA::_narrow(poa_obj);
205 if (rootpoa != NULL) {
206 poa_man = rootpoa->the_POAManager();
207 if (poa_man != NULL) {
209 // bidirectional policy
// Single-entry policy list holding the BiDirPolicy::BOTH policy.
210 policy_list.length(1);
211 policyVal <<= BiDirPolicy::BOTH;
212 policy_list[0] = orb->create_policy(BiDirPolicy::BIDIRECTIONAL_POLICY_TYPE, policyVal);
// The remaining create_POA() arguments are on lines not visible in this view.
213 ret_val = rootpoa->create_POA(CONST_ROOT_POA_BIDIR_POLICY_NAME,
220 cerr << "Failed to create RootPOA." << endl;
// Looks up the naming context (service_name_param, service_kind_param) in the CORBA name
// service, and binds a new context when the lookup yields a nil reference.
//   service_name_param  id component of the context name.
//   service_kind_param  kind component of the context name.
// The result is held in ret_val; each failure is reported on stderr by the handlers below.
225 CosNaming::NamingContext_var OrbServerImpl::get_service_naming_context(const char *service_name_param,
226 const char *service_kind_param)
228 CosNaming::NamingContext_var ret_val;
229 CosNaming::NamingContext_var ns_context;
230 CORBA::Object_var ns_obj;
231 CORBA::Object_var service_obj;
232 CosNaming::Name context_data;
236 // get nameservice reference
237 ns_obj = _orb->resolve_initial_references(CONST_NAME_SERVICE_NAME);
238 // get nameservice context
239 ns_context = CosNaming::NamingContext::_narrow(ns_obj);
240 if (CORBA::is_nil(ns_context) == false) {
// One-component name: { id = service name, kind = service kind }.
241 context_data.length(1);
242 context_data[0].id = service_name_param;
243 context_data[0].kind = service_kind_param;
// NOTE(review): CosNaming resolve() reports an unbound name by raising NotFound rather than by
// returning nil; no NotFound handler is visible in this view, so confirm that the
// "create when missing" branch below is actually reachable.
245 service_obj = ns_context->resolve(context_data);
246 if (CORBA::is_nil(service_obj)) {
247 // not found, try to bind the new context to name service
248 ret_val = ns_context->bind_new_context(context_data);
249 if (CORBA::is_nil(ret_val) ) {
250 cerr << "Failed to create new context to name service for " << service_name_param << "." << endl;
// Existing entry: it must narrow to a NamingContext to be usable.
254 ret_val = CosNaming::NamingContext::_narrow(service_obj);
255 if (CORBA::is_nil(ret_val) ) {
256 cerr << "Failed to get existing context from name service for " << service_name_param << ", narrowing failed." << endl;
260 catch(CosNaming::NamingContext::AlreadyBound& ex) {
261 cerr << "Could not get context from nameservice for " << service_name_param << ". Context with same name already existed."<< endl;
265 catch (CORBA::ORB::InvalidName&) {
266 // This should not happen!
267 cerr << "Could not get context from name service for " << service_name_param << ", name service does not exist." << endl;
269 catch(CORBA::TRANSIENT& ex) {
270 cerr << "Could not get context from name service for " << service_name_param << ", verify that name service is running. " << service_name_param << endl;
272 catch (CORBA::NO_RESOURCES&) {
273 cerr << "Could not get context from name service for " << service_name_param << ". Name service is not running or has configuration problem." << endl;
// Catch-all for the remaining CORBA system exceptions.
275 catch(CORBA::SystemException& ex) {
276 cerr << "Could not get context from name service for " << service_name_param << ", could not determine reason." << endl;
// Binds a service object reference into the given naming context, replacing an existing
// binding with the same name.
//   service_context_param  naming context to bind into.
//   service_ref_param      object reference to register.
//   service_name_param     id component of the binding name.
//   service_kind_param     kind component of the binding name.
// The bool result is set on lines not visible in this view; presumably true on success.
281 bool OrbServerImpl::bind_naming_context_and_service(CosNaming::NamingContext_var service_context_param,
282 CORBA::Object_ptr service_ref_param,
283 const char *service_name_param,
284 const char *service_kind_param)
287 CosNaming::Name context_data;
// One-component name: { id = service name, kind = service kind }.
291 context_data.length(1);
292 context_data[0].id = service_name_param;
293 context_data[0].kind = service_kind_param;
295 service_context_param->bind(context_data, service_ref_param);
298 catch(CosNaming::NamingContext::AlreadyBound& ex) {
300 * service existed already for the naming context with similar description.
301 * Replace the existing one with a new one.
303 cout << "service " << service_name_param << " existed, replacing it." << endl;
// NOTE(review): rebind() is called from inside a handler; confirm that an exception raised by
// rebind() itself is handled on the lines not visible here.
304 service_context_param->rebind(context_data, service_ref_param);
308 catch (CosNaming::NamingContext::InvalidName&) {
309 cerr << "Could not register service to name server, invalid service name." << endl;
311 catch (CosNaming::NamingContext::NotFound&) {
312 cerr << "Could not register service to name server, service object reference is invalid." << endl;
314 catch (CosNaming::NamingContext::CannotProceed&) {
315 // This should not happen!
316 cerr << "Could not register service to name server, unknown error." << endl;
318 catch(CORBA::SystemException& ex) {
319 cerr << "Could not register service to name server, unknown error." << endl;
// Registers the listener that handles incoming requests.
//   listener_param  stored in _listener; a previously stored listener is overwritten.
// The int return value is produced on lines not visible in this view.
// NOTE(review): the two trace messages name functions other than add_server_listener();
// they look left over from an earlier name.
324 int OrbServerImpl::add_server_listener(IServerListener *listener_param)
329 cout << "register_request_received_callback() started" << endl;
330 _listener = listener_param;
331 cout << "register_callback() done" << endl;
// Returns the shutdown flag: 0 until shutdown() has set it to 1.
// NOTE(review): this function takes no lock itself; shutdown() calls it while holding
// server_thread_mutex - confirm how other callers synchronize.
335 int OrbServerImpl::is_shutdown_pending() {
336 return _shutdown_pending;
// Records that one event thread has finished.
// Decrements _server_thread_count under server_thread_mutex and, when send_signal is set,
// signals the condition that shutdown() waits on.
339 void OrbServerImpl::server_thread_closed() {
340 bool send_signal = false;
342 server_thread_mutex.lock();
343 _server_thread_count--;
// Last thread gone. The body of this branch is on lines not visible in this view;
// presumably it sets send_signal.
344 if (_server_thread_count == 0) {
347 server_thread_mutex.unlock();
// The signal is sent after the mutex has been released.
348 if (send_signal == true) {
349 server_thread_shutdown_signal.signal();