HTTPSClientPool.cpp 28 KB


  1. /*
  2. EQ2Emu: Everquest II Server Emulator
  3. Copyright (C) 2007-2025 EQ2Emu Development Team (https://www.eq2emu.com)
  4. This file is part of EQ2Emu.
  5. EQ2Emu is free software: you can redistribute it and/or modify
  6. it under the terms of the GNU General Public License as published by
  7. the Free Software Foundation, either version 3 of the License, or
  8. (at your option) any later version.
  9. EQ2Emu is distributed in the hope that it will be useful,
  10. but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. GNU General Public License for more details.
  13. You should have received a copy of the GNU General Public License
  14. along with EQ2Emu. If not, see <http://www.gnu.org/licenses/>.
  15. */
#include "HTTPSClientPool.h"
#include "PeerManager.h"
#include "../net.h"
#include "../World.h"
#include "../LoginServer.h"
#include "../WorldDatabase.h"
#include "../Guilds/Guild.h"
#include "../../common/Log.h"
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <future>
#include <iostream>
#include <regex>
#include <sstream>
#include <utility>
#include <vector>
  29. extern NetConnection net;
  30. extern PeerManager peer_manager;
  31. extern World world;
  32. extern LoginServer loginserver;
  33. extern ZoneList zone_list;
  34. extern WorldDatabase database;
  35. extern GuildList guild_list;
  36. extern HTTPSClientPool peer_https_pool;
  37. // Handler functions for each endpoint
  38. void AddCharAuth(boost::property_tree::ptree tree) {
  39. int32 success = 0;
  40. std::string charName(""), zoneName("");
  41. int32 acct_id = 0;
  42. int32 zoneId = 0, instanceId = 0;
  43. bool firstLogin = false;
  44. std::string worldAddr(""), internalWorldAddr(""), clientIP("");
  45. int16 worldPort = 0;
  46. int32 charID = 0;
  47. int32 worldID = 0, fromID = 0;
  48. int32 loginKey = 0;
  49. if (auto char_name = tree.get_optional<std::string>("character_name")) {
  50. charName = char_name.get();
  51. }
  52. if (auto successful = tree.get_optional<int32>("success")) {
  53. success = successful.get();
  54. }
  55. if (auto account_id = tree.get_optional<int32>("account_id")) {
  56. acct_id = account_id.get();
  57. }
  58. if (auto zone_name = tree.get_optional<std::string>("zone_name")) {
  59. zoneName = zone_name.get();
  60. }
  61. if (auto zone_id = tree.get_optional<int32>("zone_id")) {
  62. zoneId = zone_id.get();
  63. }
  64. if (auto instance_id = tree.get_optional<int32>("instance_id")) {
  65. instanceId = instance_id.get();
  66. }
  67. if (auto first_login = tree.get_optional<bool>("first_login")) {
  68. firstLogin = first_login.get();
  69. }
  70. if (auto peerclientaddr = tree.get_optional<std::string>("peer_client_address")) {
  71. worldAddr = peerclientaddr.get();
  72. }
  73. if (auto peerclient_internaladdr = tree.get_optional<std::string>("peer_client_internal_address")) {
  74. internalWorldAddr = peerclient_internaladdr.get();
  75. }
  76. if (auto peerclientport = tree.get_optional<int16>("peer_client_port")) {
  77. worldPort = peerclientport.get();
  78. }
  79. if (auto clientip = tree.get_optional<std::string>("client_ip")) {
  80. clientIP = clientip.get();
  81. }
  82. if (auto character_id = tree.get_optional<int32>("character_id")) {
  83. charID = character_id.get();
  84. }
  85. if (auto login_key = tree.get_optional<int32>("login_key")) {
  86. loginKey = login_key.get();
  87. }
  88. if (auto world_id = tree.get_optional<int32>("world_id")) {
  89. worldID = world_id.get();
  90. }
  91. if (auto from_id = tree.get_optional<int32>("from_id")) {
  92. fromID = from_id.get();
  93. }
  94. LogWrite(PEERING__INFO, 0, "Peering", "%s: Handling AddCharAuth %s (%u) for zone %u instance %u", __FUNCTION__, charName.c_str(), charID, zoneId, instanceId);
  95. if (firstLogin) {
  96. loginserver.SendCharApprovedLogin(success, worldAddr, internalWorldAddr, clientIP, worldPort, acct_id, charID, loginKey, worldID, fromID);
  97. }
  98. else if (acct_id > 0 && charName.length() > 0) {
  99. world.ClientAuthApproval(success, charName, acct_id, zoneName, zoneId, instanceId, firstLogin);
  100. }
  101. }
  102. void StartZone(boost::property_tree::ptree tree) {
  103. std::string peerWebAddress("");
  104. int16 peerWebPort = 0;
  105. if (auto addr = tree.get_optional<std::string>("peer_web_address")) {
  106. peerWebAddress = addr.get();
  107. }
  108. if (auto port = tree.get_optional<int16>("peer_web_port")) {
  109. peerWebPort = port.get();
  110. }
  111. std::string id = peer_manager.isPeer(peerWebAddress, peerWebPort);
  112. if (id.size() > 0) {
  113. std::string strPort = std::to_string(peerWebPort);
  114. auto client = peer_https_pool.getOrCreateClient(id, peerWebAddress, strPort);
  115. if (client) {
  116. auto responseZones = client->sendRequest(peerWebAddress, strPort, "/zones"); // Assumes HTTPSClient has a get method
  117. // Load the JSON data into the property tree
  118. std::istringstream json_stream(responseZones);
  119. if (json_stream.str().empty()) {
  120. LogWrite(PEERING__ERROR, 0, "Peering", "%s: JSON Stream Empty for %s:%s/zones", __FUNCTION__, peerWebAddress.c_str(), strPort.c_str());
  121. }
  122. else if (json_stream.fail()) {
  123. LogWrite(PEERING__ERROR, 0, "Peering", "%s: JSON Failed State for %s:%s/zones", __FUNCTION__, peerWebAddress.c_str(), strPort.c_str());
  124. }
  125. else if (json_stream.bad()) {
  126. LogWrite(PEERING__ERROR, 0, "Peering", "%s: JSON Stream Bad for %s:%s/zones", __FUNCTION__, peerWebAddress.c_str(), strPort.c_str());
  127. }
  128. else if (json_stream.eof()) {
  129. LogWrite(PEERING__ERROR, 0, "Peering", "%s: JSON Stream EOF for %s:%s/zones", __FUNCTION__, peerWebAddress.c_str(), strPort.c_str());
  130. }
  131. else {
  132. boost::property_tree::ptree pt;
  133. boost::property_tree::read_json(json_stream, pt);
  134. peer_manager.updateZoneTree(id, pt);
  135. LogWrite(PEERING__DEBUG, 5, "Peering", "%s: StartZone Update for %s:%s/zones complete, zone tree updated.", __FUNCTION__, peerWebAddress.c_str(), strPort.c_str());
  136. }
  137. }
  138. }
  139. }
  140. void SendGlobalMessage(boost::property_tree::ptree tree) {
  141. int32 success = 0;
  142. int8 language = 0;
  143. int16 in_channel = 0;
  144. std::string toName(""), fromName(""), msg(""), awaymsg("");
  145. int8 away_language = 0;
  146. int32 group_id = 0;
  147. if (auto successful = tree.get_optional<int32>("success")) {
  148. success = successful.get();
  149. }
  150. if (auto name = tree.get_optional<std::string>("to_name")) {
  151. toName = name.get();
  152. }
  153. if (auto name = tree.get_optional<std::string>("from_name")) {
  154. fromName = name.get();
  155. }
  156. if (auto message = tree.get_optional<std::string>("message")) {
  157. msg = message.get();
  158. }
  159. if (auto from_language = tree.get_optional<int8>("from_language")) {
  160. language = from_language.get();
  161. }
  162. if (auto channel = tree.get_optional<int16>("channel")) {
  163. in_channel = channel.get();
  164. }
  165. if (auto msg = tree.get_optional<std::string>("away_message")) {
  166. awaymsg = msg.get();
  167. }
  168. if (auto away_lang = tree.get_optional<int8>("away_language")) {
  169. away_language = away_lang.get();
  170. }
  171. if (auto group = tree.get_optional<int32>("group_id")) {
  172. group_id = group.get();
  173. }
  174. switch (in_channel) {
  175. case CHANNEL_PRIVATE_TELL: {
  176. Client* from_client = zone_list.GetClientByCharName(fromName.c_str());
  177. if (from_client) {
  178. if (success) {
  179. from_client->HandleTellMessage(from_client->GetPlayer()->GetName(), msg.c_str(), toName.c_str(), language);
  180. if (awaymsg.size() > 0) {
  181. from_client->HandleTellMessage(toName.c_str(), awaymsg.c_str(), from_client->GetPlayer()->GetName(), away_language);
  182. }
  183. }
  184. else {
  185. from_client->SimpleMessage(CHANNEL_COLOR_CHAT_RELATIONSHIP, "That character does not exist.");
  186. }
  187. }
  188. break;
  189. }
  190. }
  191. }
  192. void HandleNewGroup(boost::property_tree::ptree tree) {
  193. int32 success = 0;
  194. if (auto successful = tree.get_optional<int32>("success")) {
  195. success = successful.get();
  196. }
  197. if (!net.is_primary) {
  198. // we are a peer we ask primary for a new group and get the response
  199. GroupOptions options;
  200. std::string leader(""), member("");
  201. int32 group_id = 0;
  202. int32 member_entity_id = 0;
  203. if (auto leader_name = tree.get_optional<std::string>("leader_name")) {
  204. leader = leader_name.get();
  205. }
  206. if (auto member_name = tree.get_optional<std::string>("member_name")) {
  207. member = member_name.get();
  208. }
  209. if (auto lootmethod = tree.get_optional<int8>("loot_method")) {
  210. options.loot_method = lootmethod.get();
  211. }
  212. if (auto itemrarity = tree.get_optional<int8>("loot_item_rarity")) {
  213. options.loot_items_rarity = itemrarity.get();
  214. }
  215. if (auto autosplit = tree.get_optional<int8>("auto_split")) {
  216. options.auto_split = autosplit.get();
  217. }
  218. if (auto defaultyell = tree.get_optional<int8>("default_yell")) {
  219. options.default_yell = defaultyell.get();
  220. }
  221. if (auto grouplockmethod = tree.get_optional<int8>("group_lock_method")) {
  222. options.group_lock_method = grouplockmethod.get();
  223. }
  224. if (auto groupautolock = tree.get_optional<int8>("group_auto_lock")) {
  225. options.group_autolock = groupautolock.get();
  226. }
  227. if (auto soloautolock = tree.get_optional<int8>("solo_auto_lock")) {
  228. options.solo_autolock = soloautolock.get();
  229. }
  230. if (auto autoloot = tree.get_optional<int8>("auto_loot_method")) {
  231. options.auto_loot_method = autoloot.get();
  232. }
  233. if (auto lastlootindex = tree.get_optional<int8>("last_looted_index")) {
  234. options.last_looted_index = lastlootindex.get();
  235. }
  236. if (auto groupid = tree.get_optional<int32>("group_id")) {
  237. group_id = groupid.get();
  238. }
  239. if (auto entityid = tree.get_optional<int32>("member_entity_id")) {
  240. member_entity_id = entityid.get();
  241. }
  242. LogWrite(PEERING__INFO, 0, "Peering", "%s: Add New Group %u with member %s (%u) leader %s", __FUNCTION__, group_id, member.c_str(), member_entity_id, leader.c_str());
  243. Client* member_client = zone_list.GetClientByCharName(member.c_str());
  244. Client* client_leader = zone_list.GetClientByCharName(leader.c_str());
  245. if (success) {
  246. if (!member_client) {
  247. if (client_leader && client_leader->GetPlayer()->GetZone()) {
  248. Spawn* spawn = client_leader->GetPlayer()->GetZone()->GetSpawnByID(member_entity_id);
  249. if (spawn && spawn->IsEntity()) {
  250. world.GetGroupManager()->AcceptInvite((Entity*)spawn, &group_id);
  251. }
  252. else {
  253. LogWrite(PEERING__ERROR, 0, "Peering", "%s: Failed Adding New Group %u, entity %s (%u) did not exist.", __FUNCTION__, group_id, member.c_str(), member_entity_id);
  254. }
  255. }
  256. else {
  257. LogWrite(PEERING__ERROR, 0, "Peering", "%s: Failed Adding New Group %u, member %s (%u) did not exist.", __FUNCTION__, group_id, member.c_str(), member_entity_id);
  258. }
  259. }
  260. else if (!client_leader) {
  261. if (member_client)
  262. member_client->HandleGroupAcceptResponse(2);
  263. LogWrite(PEERING__ERROR, 0, "Peering", "%s: Failed Adding New Group %u, leader %s did not exist.", __FUNCTION__, group_id, leader.c_str());
  264. }
  265. else {
  266. world.GetGroupManager()->AcceptInvite(member_client->GetPlayer(), &group_id);
  267. }
  268. }
  269. }
  270. }
  271. void HandleCreateGuild(boost::property_tree::ptree tree) {
  272. bool success = false;
  273. int32 guildID = 0;
  274. std::string leaderName("");
  275. if (auto successful = tree.get_optional<bool>("success")) {
  276. success = successful.get();
  277. }
  278. if (auto guild_id = tree.get_optional<int32>("guild_id")) {
  279. guildID = guild_id.get();
  280. }
  281. if (auto name = tree.get_optional<std::string>("leader_name")) {
  282. leaderName = name.get();
  283. }
  284. if (net.is_primary) {
  285. // we send out to peers
  286. }
  287. else if (guildID) {
  288. database.LoadGuild(guildID);
  289. Guild* guild = guild_list.GetGuild(guildID);
  290. Client* leader = zone_list.GetClientByCharName(leaderName.c_str());
  291. if (leader && guild && !leader->GetPlayer()->GetGuild()) {
  292. guild->AddNewGuildMember(leader, 0, GUILD_RANK_LEADER);
  293. database.SaveGuildMembers(guild);
  294. if (leader && leader->GetPlayer()->GetGroupMemberInfo()) {
  295. world.GetGroupManager()->GroupLock(__FUNCTION__, __LINE__);
  296. PlayerGroup* group = world.GetGroupManager()->GetGroup(leader->GetPlayer()->GetGroupMemberInfo()->group_id);
  297. if (group)
  298. {
  299. GroupMemberInfo* gmi = nullptr;
  300. deque<GroupMemberInfo*>::iterator itr;
  301. group->MGroupMembers.readlock(__FUNCTION__, __LINE__);
  302. deque<GroupMemberInfo*>* members = group->GetMembers();
  303. for (itr = members->begin(); itr != members->end(); itr++) {
  304. gmi = *itr;
  305. if (gmi->client && gmi->client != leader && !gmi->client->GetPlayer()->GetGuild())
  306. guild->InvitePlayer(gmi->client, leader->GetPlayer()->GetName());
  307. }
  308. group->MGroupMembers.releasereadlock(__FUNCTION__, __LINE__);
  309. }
  310. world.GetGroupManager()->ReleaseGroupLock(__FUNCTION__, __LINE__);
  311. }
  312. }
  313. }
  314. }
  315. // Define a type for handler functions
  316. using HandlerFunction = std::function<void(boost::property_tree::ptree tree)>;
  317. // Set up the endpoint-to-function map
  318. std::unordered_map<std::string, HandlerFunction> endpointHandlers = {
  319. {"/addcharauth", AddCharAuth},
  320. {"/startzone", StartZone},
  321. {"/sendglobalmessage", SendGlobalMessage},
  322. {"/newgroup", HandleNewGroup},
  323. {"/createguild", HandleCreateGuild}
  324. };
  325. HTTPSClientPool::HTTPSClientPool() {}
  326. void HTTPSClientPool::init(const std::string& cert, const std::string& key) {
  327. certFile = cert;
  328. keyFile = key;
  329. pollingInterval = 1000;
  330. int32 numThreads = 2;
  331. // Start worker threads
  332. for (int32 i = 0; i < numThreads; ++i) {
  333. workers.emplace_back(&HTTPSClientPool::workerFunction, this);
  334. }
  335. }
  336. HTTPSClientPool::~HTTPSClientPool() {
  337. {
  338. std::unique_lock<std::mutex> lock(queueMutex);
  339. stop = true;
  340. }
  341. condition.notify_all();
  342. for (std::thread& worker : workers) {
  343. if (worker.joinable()) {
  344. worker.join();
  345. }
  346. }
  347. }
  348. void HTTPSClientPool::addPeerClient(const std::string& peerId, const std::string& server, const std::string& port, const std::string& authEndpoint) {
  349. bool newClient = false;
  350. // Check if client already exists for the specified (server, port) pair
  351. auto it = clients.find(std::make_pair(server, port));
  352. if (it == clients.end()) {
  353. newClient = true;
  354. }
  355. if (newClient) {
  356. auto client = getOrCreateClient(peerId, server, port);
  357. }
  358. }
  359. boost::property_tree::ptree HTTPSClientPool::sendRequestToPeer(const std::string& peerId, const std::string& target) {
  360. auto client = getClient(peerId);
  361. boost::property_tree::ptree pt;
  362. if (!client) {
  363. LogWrite(PEERING__ERROR, 0, "Peering", "%s: Client for peer %s could not be found.", __FUNCTION__, peerId.c_str());
  364. return pt;
  365. }
  366. // Retrieve the response from the client
  367. std::string responseBody = client->sendRequest(client->getServer(), client->getPort(), target);
  368. if (responseBody.size() < 1) {
  369. int16 peer_port = 0;
  370. try {
  371. peer_port = std::stoul(client->getPort());
  372. }
  373. catch (const std::exception& e) {
  374. LogWrite(PEERING__ERROR, 0, "Peering", "%s: Error for convering peer port for %s: %s.", __FUNCTION__, peerId.c_str(), e.what() ? e.what() : "??");
  375. }
  376. HealthStatus curStatus = peer_manager.getPeerStatus(client->getServer(), peer_port);
  377. switch (curStatus) {
  378. case HealthStatus::WARN: {
  379. peer_manager.updateHealth(peerId, HealthStatus::ERROR);
  380. break;
  381. }
  382. case HealthStatus::ERROR: {
  383. peer_manager.updateHealth(peerId, HealthStatus::SHUTDOWN);
  384. break;
  385. }
  386. default: {
  387. peer_manager.updateHealth(peerId, HealthStatus::WARN);
  388. break;
  389. }
  390. }
  391. }
  392. else {
  393. // Parse the response as JSON
  394. try {
  395. std::istringstream responseStream(responseBody);
  396. boost::property_tree::read_json(responseStream, pt);
  397. }
  398. catch (const boost::property_tree::json_parser_error& e) {
  399. LogWrite(PEERING__ERROR, 0, "Peering", "%s: JSON Parsing error for %s: %s.", __FUNCTION__, peerId.c_str(), e.what() ? e.what() : "??");
  400. }
  401. }
  402. return pt;
  403. }
  404. boost::property_tree::ptree HTTPSClientPool::sendPostRequestToPeer(const std::string& peerId, const std::string& target, const std::string& jsonPayload) {
  405. auto client = getClient(peerId);
  406. boost::property_tree::ptree pt;
  407. if (!client) {
  408. LogWrite(PEERING__ERROR, 0, "Peering", "%s: Client for peer %s could not be found.", __FUNCTION__, peerId.c_str());
  409. return pt;
  410. }
  411. // Retrieve the response from the client
  412. std::string responseBody = client->sendPostRequest(client->getServer(), client->getPort(), target, jsonPayload);
  413. // Parse the response as JSON
  414. try {
  415. std::istringstream responseStream(responseBody);
  416. boost::property_tree::read_json(responseStream, pt);
  417. }
  418. catch (const boost::property_tree::json_parser_error& e) {
  419. LogWrite(PEERING__ERROR, 0, "Peering", "%s: Client for peer %s could not be found.", __FUNCTION__, peerId.c_str());
  420. }
  421. return pt;
  422. }
  423. std::shared_ptr<HTTPSClient> HTTPSClientPool::getClient(const std::string& peerId) {
  424. std::unique_lock<std::mutex> lock(queueMutex);
  425. // Lookup the client by ID
  426. auto idIt = clientsById.find(peerId);
  427. if (idIt != clientsById.end()) {
  428. return idIt->second;
  429. }
  430. return nullptr; // Return nullptr if no client exists with the given ID
  431. }
  432. std::shared_ptr<HTTPSClient> HTTPSClientPool::getOrCreateClient(const std::string& id, const std::string& server, const std::string& port) {
  433. std::unique_lock<std::mutex> lock(queueMutex);
  434. // Create a key based on (server, port)
  435. auto clientKey = std::make_pair(server, port);
  436. // Check if client already exists for the specified (server, port) pair
  437. auto it = clients.find(clientKey);
  438. if (it != clients.end()) {
  439. // Client already exists, return the existing client
  440. return it->second;
  441. }
  442. // No existing client for this (server, port), create and store a new HTTPSClient
  443. auto client = std::make_shared<HTTPSClient>(certFile, keyFile);
  444. clients[clientKey] = client;
  445. clientsById[id] = client;
  446. return client;
  447. }
  448. void HTTPSClientPool::pollPeerHealth(const std::string& server, const std::string& port) {
  449. int16 web_worldport = 0;
  450. try {
  451. web_worldport = std::stoul(port);
  452. }
  453. catch (const std::exception& e) {
  454. LogWrite(PEERING__ERROR, 0, "Peering", "%s: Getting port for peer %s:%s could not be complete.", __FUNCTION__, server.c_str(), port.c_str());
  455. }
  456. std::string id = peer_manager.isPeer(server, web_worldport);
  457. if (id.size() < 1) {
  458. LogWrite(PEERING__ERROR, 0, "Peering", "%s: Error finding peer %s:%s.", __FUNCTION__, server.c_str(), port.c_str());
  459. }
  460. else {
  461. auto client = getOrCreateClient(id, port, server + ":" + port);
  462. int16 interval = pollingInterval;
  463. while (running.load()) {
  464. interval++;
  465. if (interval > pollingInterval) {
  466. HealthStatus curStatus = peer_manager.getPeerStatus(server, web_worldport);
  467. id = peer_manager.isPeer(server, web_worldport);
  468. try {
  469. auto response = client->sendRequest(server, port, "/status"); // Assumes HTTPSClient has a get method
  470. //std::cout << "Health check response from " << server << ":" << port << " - " << response << std::endl;
  471. boost::property_tree::ptree json_tree;
  472. std::istringstream json_stream(response);
  473. boost::property_tree::read_json(json_stream, json_tree);
  474. std::string online_status;
  475. int16 peer_priority = 65535;
  476. bool peer_primary = false;
  477. if (auto status = json_tree.get_optional<std::string>("world_status")) {
  478. online_status = status.get();
  479. }
  480. if (auto priority = json_tree.get_optional<int16>("peer_priority")) {
  481. peer_priority = priority.get();
  482. }
  483. if (auto isprimary = json_tree.get_optional<bool>("peer_primary")) {
  484. peer_primary = isprimary.get();
  485. }
  486. peer_manager.updatePriority(id, peer_priority);
  487. if (peer_primary && net.is_primary) {
  488. peer_manager.handlePrimaryConflict(id);
  489. std::shared_ptr<Peer> hasPrimary = peer_manager.getHealthyPrimaryPeerPtr();
  490. if (hasPrimary) { // demote self
  491. LogWrite(PEERING__INFO, 0, "Peering", "%s: Peer %s at %s:%s - HAS PRIMARY status, demoting self.", __FUNCTION__, id.c_str(), server.c_str(), port.c_str());
  492. net.SetPrimary(false);
  493. }
  494. }
  495. switch (curStatus) {
  496. case HealthStatus::STARTUP: {
  497. pollPeerHealthData(client, id, server, port);
  498. if (online_status == "offline") {
  499. std::shared_ptr<Peer> peer = peer_manager.getPeerById(id);
  500. if (peer) {
  501. peer->wasOffline = true;
  502. if (peer->sentInitialPeerData) {
  503. peer->sentInitialPeerData = false;
  504. }
  505. }
  506. }
  507. if (online_status == "online") {
  508. peer_manager.updateHealth(id, HealthStatus::OK);
  509. std::shared_ptr<Peer> peer = peer_manager.getPeerById(id);
  510. if (net.is_primary) {
  511. if (peer) {
  512. if (peer->wasOffline && !peer->sentInitialPeerData) {
  513. world.GetGroupManager()->SendPeerGroupData(id);
  514. }
  515. peer->sentInitialPeerData = true;
  516. }
  517. }
  518. else if (peer) { // set as if we already sent the data since if we take over we don't want the peer trying to resubmit all groups
  519. peer->wasOffline = false;
  520. peer->sentInitialPeerData = true;
  521. }
  522. LogWrite(PEERING__INFO, 0, "Peering", "%s: Peer %s at %s:%s - HAS OK/UP state.", __FUNCTION__, id.c_str(), server.c_str(), port.c_str());
  523. }
  524. if (!net.is_primary && !peer_manager.hasPrimary() && peer_priority < net.GetPeerPriority()) {
  525. LogWrite(PEERING__INFO, 0, "Peering", "%s: Peer %s at %s:%s - HAS PRIMARY.", __FUNCTION__, id.c_str(), server.c_str(), port.c_str());
  526. peer_manager.setPrimary(id);
  527. net.SetPrimary(false);
  528. }
  529. else if (!peer_manager.hasPrimary() && !net.is_primary && net.GetPeerPriority() <= peer_priority) {
  530. LogWrite(PEERING__INFO, 0, "Peering", "%s: I AM PRIMARY!", __FUNCTION__);
  531. net.SetPrimary();
  532. }
  533. break;
  534. }
  535. case HealthStatus::OK: {
  536. pollPeerHealthData(client, id, server, port);
  537. if (!net.is_primary && !peer_manager.hasPrimary() && peer_priority < net.GetPeerPriority()) {
  538. LogWrite(PEERING__INFO, 0, "Peering", "%s: Peer %s at %s:%s - HAS PRIMARY.", __FUNCTION__, id.c_str(), server.c_str(), port.c_str());
  539. peer_manager.setPrimary(id);
  540. net.SetPrimary(false);
  541. }
  542. else if (!peer_manager.hasPrimary() && !net.is_primary && net.GetPeerPriority() <= peer_priority) {
  543. LogWrite(PEERING__INFO, 0, "Peering", "%s: I AM PRIMARY!!", __FUNCTION__);
  544. net.SetPrimary();
  545. }
  546. break;
  547. }
  548. case HealthStatus::WARN:
  549. case HealthStatus::ERROR:
  550. case HealthStatus::SHUTDOWN: {
  551. peer_manager.updateHealth(id, HealthStatus::STARTUP);
  552. LogWrite(PEERING__INFO, 0, "Peering", "%s: Peer %s at %s:%s - HAS ENTERED STARTUP state.", __FUNCTION__, id.c_str(), server.c_str(), port.c_str());
  553. if (net.is_primary) {
  554. std::shared_ptr<Peer> peer = peer_manager.getPeerById(id);
  555. if (peer && peer->sentInitialPeerData == true) {
  556. peer->sentInitialPeerData = false;
  557. }
  558. }
  559. break;
  560. }
  561. }
  562. }
  563. catch (const std::exception& e) {
  564. HealthStatus curStatus = peer_manager.getPeerStatus(server, web_worldport);
  565. switch (curStatus) {
  566. case HealthStatus::WARN: {
  567. peer_manager.updateHealth(id, HealthStatus::ERROR);
  568. break;
  569. }
  570. case HealthStatus::ERROR: {
  571. LogWrite(PEERING__ERROR, 0, "Peering", "%s: Peer %s at %s:%s - HAS ERROR->SHUTDOWN state.", __FUNCTION__, id.c_str(), server.c_str(), port.c_str());
  572. peer_manager.updateHealth(id, HealthStatus::SHUTDOWN);
  573. if (peer_manager.getHealthyPeer() == std::nullopt) {
  574. if (!net.is_primary && world.world_loaded) {
  575. LogWrite(PEERING__INFO, 0, "Peering", "%s: TAKING OVER AS PRIMARY, NO PEERS AVAILABLE TO CHECK", __FUNCTION__);
  576. net.SetPrimary();
  577. }
  578. }
  579. else if (!peer_manager.hasPrimary()) {
  580. std::string newPrimary = peer_manager.getPriorityPeer();
  581. if (newPrimary.size() > 0) {
  582. LogWrite(PEERING__INFO, 0, "Peering", "%s: NEW PRIMARY %s", __FUNCTION__, newPrimary);
  583. peer_manager.setPrimary(newPrimary);
  584. net.SetPrimary(false);
  585. }
  586. else {
  587. LogWrite(PEERING__ERROR, 0, "Peering", "%s: NEW PRIMARY CANNOT BE ESTABLISHED!", __FUNCTION__);
  588. }
  589. }
  590. break;
  591. }
  592. default: {
  593. peer_manager.updateHealth(id, HealthStatus::WARN);
  594. break;
  595. }
  596. }
  597. LogWrite(PEERING__ERROR, 0, "Peering", "%s: ERROR POLLING %s:%s reason: %s", __FUNCTION__, server.c_str(), port.c_str(), e.what() ? e.what() : "??");
  598. }
  599. interval = 0;
  600. }
  601. std::this_thread::sleep_for(std::chrono::milliseconds(1));
  602. }
  603. }
  604. }
  605. void HTTPSClientPool::pollPeerHealthData(auto client, const std::string& id, const std::string& server, const std::string& port) {
  606. if (client == nullptr) {
  607. }
  608. auto responseZones = client->sendRequest(server, port, "/zones"); // Assumes HTTPSClient has a get method
  609. // Load the JSON data into the property tree
  610. std::istringstream json_stream(responseZones);
  611. if (json_stream.str().empty()) {
  612. LogWrite(PEERING__ERROR, 0, "Peering", "%s: Polling JSON Stream Empty for %s:%s/zones", __FUNCTION__, server.c_str(), port.c_str());
  613. }
  614. else if (json_stream.fail()) {
  615. LogWrite(PEERING__ERROR, 0, "Peering", "%s: Polling JSON Failed State for %s:%s/zones", __FUNCTION__, server.c_str(), port.c_str());
  616. }
  617. else if (json_stream.bad()) {
  618. LogWrite(PEERING__ERROR, 0, "Peering", "%s: Polling JSON Stream Bad for %s:%s/zones", __FUNCTION__, server.c_str(), port.c_str());
  619. }
  620. else if (json_stream.eof()) {
  621. LogWrite(PEERING__ERROR, 0, "Peering", "%s: Polling JSON Stream EOF for %s:%s/zones", __FUNCTION__, server.c_str(), port.c_str());
  622. }
  623. else {
  624. boost::property_tree::ptree pt;
  625. boost::property_tree::read_json(json_stream, pt);
  626. peer_manager.updateZoneTree(id, pt);
  627. LogWrite(PEERING__DEBUG, 5, "Peering", "%s: Polling for %s:%s/zones complete, zone tree updated.", __FUNCTION__, server.c_str(), port.c_str());
  628. }
  629. auto responseClients = client->sendRequest(server, port, "/clients"); // Assumes HTTPSClient has a get method
  630. // Load the JSON data into the property tree
  631. std::istringstream json_stream2(responseClients);
  632. if (json_stream2.str().empty()) {
  633. LogWrite(PEERING__ERROR, 0, "Peering", "%s: Polling JSON Stream Empty for %s:%s/clients", __FUNCTION__, server.c_str(), port.c_str());
  634. }
  635. else if (json_stream2.fail()) {
  636. LogWrite(PEERING__ERROR, 0, "Peering", "%s: Polling JSON Failed State for %s:%s/clients", __FUNCTION__, server.c_str(), port.c_str());
  637. }
  638. else if (json_stream2.bad()) {
  639. LogWrite(PEERING__ERROR, 0, "Peering", "%s: Polling JSON Stream Bad for %s:%s/clients", __FUNCTION__, server.c_str(), port.c_str());
  640. }
  641. else if (json_stream2.eof()) {
  642. LogWrite(PEERING__ERROR, 0, "Peering", "%s: Polling JSON Stream EOF for %s:%s/clients", __FUNCTION__, server.c_str(), port.c_str());
  643. }
  644. else {
  645. boost::property_tree::ptree pt2;
  646. boost::property_tree::read_json(json_stream2, pt2);
  647. peer_manager.updateClientTree(id, pt2);
  648. }
  649. }
  650. void HTTPSClientPool::startPolling() {
  651. running.store(true);
  652. for (const auto& clientPair : clients) {
  653. auto server = clientPair.first.first;
  654. auto port = clientPair.first.second;
  655. std::async(std::launch::async, &HTTPSClientPool::pollPeerHealth, this, server, port);
  656. }
  657. }
  658. void HTTPSClientPool::stopPolling() {
  659. running.store(false);
  660. }
  661. void HTTPSClientPool::sendPostRequestToPeerAsync(const std::string& peerId, const std::string& server, const std::string& port, const std::string& target, const std::string& payload) {
  662. {
  663. std::unique_lock<std::mutex> lock(queueMutex);
  664. taskQueue.emplace([this, peerId, server, port, target, payload]() {
  665. std::shared_ptr<HTTPSClient> client = getClient(peerId);
  666. if (client) {
  667. std::string response = client->sendPostRequest(server, port, target, payload);
  668. boost::property_tree::ptree pt;
  669. try {
  670. std::istringstream responseStream(response);
  671. boost::property_tree::read_json(responseStream, pt);
  672. }
  673. catch (const boost::property_tree::json_parser_error& e) {
  674. LogWrite(PEERING__ERROR, 0, "Peering", "%s: JSON Parsing error for %s (%s:%s): %s.", __FUNCTION__, peerId.c_str(), server.c_str(), port.c_str(), e.what() ? e.what() : "??");
  675. }
  676. if (endpointHandlers.find(target) != endpointHandlers.end()) {
  677. endpointHandlers[target](pt); // Call the corresponding handler
  678. }
  679. else {
  680. //std::cout << "No handler for endpoint: " << endpoint << std::endl;
  681. }
  682. }
  683. else {
  684. LogWrite(PEERING__ERROR, 0, "Peering", "%s: Client not found for %s (%s:%s).", __FUNCTION__, peerId.c_str(), server.c_str(), port.c_str());
  685. }
  686. });
  687. }
  688. condition.notify_one();
  689. }
  690. void HTTPSClientPool::workerFunction() {
  691. while (true) {
  692. std::function<void()> task;
  693. {
  694. std::unique_lock<std::mutex> lock(queueMutex);
  695. condition.wait(lock, [this] { return stop || !taskQueue.empty(); });
  696. if (stop && taskQueue.empty()) {
  697. return;
  698. }
  699. task = std::move(taskQueue.front());
  700. taskQueue.pop();
  701. }
  702. task();
  703. }
  704. }