/* EQ2Emu: Everquest II Server Emulator Copyright (C) 2007-2025 EQ2Emu Development Team (https://www.eq2emu.com) This file is part of EQ2Emu. EQ2Emu is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. EQ2Emu is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with EQ2Emu. If not, see . */ #include "HTTPSClientPool.h" #include "PeerManager.h" #include "../net.h" #include "../World.h" #include "../LoginServer.h" #include "../WorldDatabase.h" #include "../Guilds/Guild.h" #include "../../common/Log.h" #include #include #include #include #include extern NetConnection net; extern PeerManager peer_manager; extern World world; extern LoginServer loginserver; extern ZoneList zone_list; extern WorldDatabase database; extern GuildList guild_list; extern HTTPSClientPool peer_https_pool; // Handler functions for each endpoint void AddCharAuth(boost::property_tree::ptree tree) { int32 success = 0; std::string charName(""), zoneName(""); int32 acct_id = 0; int32 zoneId = 0, instanceId = 0; bool firstLogin = false; std::string worldAddr(""), internalWorldAddr(""), clientIP(""); int16 worldPort = 0; int32 charID = 0; int32 worldID = 0, fromID = 0; int32 loginKey = 0; if (auto char_name = tree.get_optional("character_name")) { charName = char_name.get(); } if (auto successful = tree.get_optional("success")) { success = successful.get(); } if (auto account_id = tree.get_optional("account_id")) { acct_id = account_id.get(); } if (auto zone_name = tree.get_optional("zone_name")) { zoneName = zone_name.get(); } if (auto zone_id = tree.get_optional("zone_id")) { zoneId = zone_id.get(); } if (auto instance_id = tree.get_optional("instance_id")) { instanceId = instance_id.get(); } if (auto first_login = tree.get_optional("first_login")) { firstLogin = first_login.get(); } if (auto peerclientaddr = tree.get_optional("peer_client_address")) { worldAddr = peerclientaddr.get(); } if (auto peerclient_internaladdr = tree.get_optional("peer_client_internal_address")) { internalWorldAddr = peerclient_internaladdr.get(); } if (auto peerclientport = tree.get_optional("peer_client_port")) { worldPort = peerclientport.get(); } if (auto clientip = tree.get_optional("client_ip")) { clientIP = clientip.get(); } if (auto character_id = tree.get_optional("character_id")) { charID = character_id.get(); } if (auto login_key = tree.get_optional("login_key")) { loginKey = login_key.get(); } if (auto world_id = tree.get_optional("world_id")) { worldID = world_id.get(); } if (auto from_id = tree.get_optional("from_id")) { fromID = from_id.get(); } LogWrite(PEERING__INFO, 0, "Peering", "%s: Handling AddCharAuth %s (%u) for zone %u instance %u", __FUNCTION__, charName.c_str(), charID, zoneId, instanceId); if (firstLogin) { loginserver.SendCharApprovedLogin(success, worldAddr, internalWorldAddr, clientIP, worldPort, acct_id, charID, loginKey, worldID, fromID); } else if (acct_id > 0 && charName.length() > 0) { world.ClientAuthApproval(success, charName, acct_id, zoneName, zoneId, instanceId, firstLogin); } } void StartZone(boost::property_tree::ptree tree) { std::string peerWebAddress(""); int16 peerWebPort = 0; if (auto addr = tree.get_optional("peer_web_address")) { peerWebAddress = addr.get(); } if (auto port = tree.get_optional("peer_web_port")) { peerWebPort = port.get(); } std::string id = peer_manager.isPeer(peerWebAddress, peerWebPort); if (id.size() > 0) { std::string strPort = std::to_string(peerWebPort); auto client = peer_https_pool.getOrCreateClient(id, peerWebAddress, strPort); if (client) { auto responseZones = client->sendRequest(peerWebAddress, strPort, "/zones"); // Assumes HTTPSClient has a get method // Load the JSON data into the property tree std::istringstream json_stream(responseZones); if (json_stream.str().empty()) { LogWrite(PEERING__ERROR, 0, "Peering", "%s: JSON Stream Empty for %s:%s/zones", __FUNCTION__, peerWebAddress.c_str(), strPort.c_str()); } else if (json_stream.fail()) { LogWrite(PEERING__ERROR, 0, "Peering", "%s: JSON Failed State for %s:%s/zones", __FUNCTION__, peerWebAddress.c_str(), strPort.c_str()); } else if (json_stream.bad()) { LogWrite(PEERING__ERROR, 0, "Peering", "%s: JSON Stream Bad for %s:%s/zones", __FUNCTION__, peerWebAddress.c_str(), strPort.c_str()); } else if (json_stream.eof()) { LogWrite(PEERING__ERROR, 0, "Peering", "%s: JSON Stream EOF for %s:%s/zones", __FUNCTION__, peerWebAddress.c_str(), strPort.c_str()); } else { boost::property_tree::ptree pt; boost::property_tree::read_json(json_stream, pt); peer_manager.updateZoneTree(id, pt); LogWrite(PEERING__DEBUG, 5, "Peering", "%s: StartZone Update for %s:%s/zones complete, zone tree updated.", __FUNCTION__, peerWebAddress.c_str(), strPort.c_str()); } } } } void SendGlobalMessage(boost::property_tree::ptree tree) { int32 success = 0; int8 language = 0; int16 in_channel = 0; std::string toName(""), fromName(""), msg(""), awaymsg(""); int8 away_language = 0; int32 group_id = 0; if (auto successful = tree.get_optional("success")) { success = successful.get(); } if (auto name = tree.get_optional("to_name")) { toName = name.get(); } if (auto name = tree.get_optional("from_name")) { fromName = name.get(); } if (auto message = tree.get_optional("message")) { msg = message.get(); } if (auto from_language = tree.get_optional("from_language")) { language = from_language.get(); } if (auto channel = tree.get_optional("channel")) { in_channel = channel.get(); } if (auto msg = tree.get_optional("away_message")) { awaymsg = msg.get(); } if (auto away_lang = tree.get_optional("away_language")) { away_language = away_lang.get(); } if (auto group = tree.get_optional("group_id")) { group_id = group.get(); } switch (in_channel) { case CHANNEL_PRIVATE_TELL: { Client* from_client = zone_list.GetClientByCharName(fromName.c_str()); if (from_client) { if (success) { from_client->HandleTellMessage(from_client->GetPlayer()->GetName(), msg.c_str(), toName.c_str(), language); if (awaymsg.size() > 0) { from_client->HandleTellMessage(toName.c_str(), awaymsg.c_str(), from_client->GetPlayer()->GetName(), away_language); } } else { from_client->SimpleMessage(CHANNEL_COLOR_CHAT_RELATIONSHIP, "That character does not exist."); } } break; } } } void HandleNewGroup(boost::property_tree::ptree tree) { int32 success = 0; if (auto successful = tree.get_optional("success")) { success = successful.get(); } if (!net.is_primary) { // we are a peer we ask primary for a new group and get the response GroupOptions options; std::string leader(""), member(""); int32 group_id = 0; int32 member_entity_id = 0; if (auto leader_name = tree.get_optional("leader_name")) { leader = leader_name.get(); } if (auto member_name = tree.get_optional("member_name")) { member = member_name.get(); } if (auto lootmethod = tree.get_optional("loot_method")) { options.loot_method = lootmethod.get(); } if (auto itemrarity = tree.get_optional("loot_item_rarity")) { options.loot_items_rarity = itemrarity.get(); } if (auto autosplit = tree.get_optional("auto_split")) { options.auto_split = autosplit.get(); } if (auto defaultyell = tree.get_optional("default_yell")) { options.default_yell = defaultyell.get(); } if (auto grouplockmethod = tree.get_optional("group_lock_method")) { options.group_lock_method = grouplockmethod.get(); } if (auto groupautolock = tree.get_optional("group_auto_lock")) { options.group_autolock = groupautolock.get(); } if (auto soloautolock = tree.get_optional("solo_auto_lock")) { options.solo_autolock = soloautolock.get(); } if (auto autoloot = tree.get_optional("auto_loot_method")) { options.auto_loot_method = autoloot.get(); } if (auto lastlootindex = tree.get_optional("last_looted_index")) { options.last_looted_index = lastlootindex.get(); } if (auto groupid = tree.get_optional("group_id")) { group_id = groupid.get(); } if (auto entityid = tree.get_optional("member_entity_id")) { member_entity_id = entityid.get(); } LogWrite(PEERING__INFO, 0, "Peering", "%s: Add New Group %u with member %s (%u) leader %s", __FUNCTION__, group_id, member.c_str(), member_entity_id, leader.c_str()); Client* member_client = zone_list.GetClientByCharName(member.c_str()); Client* client_leader = zone_list.GetClientByCharName(leader.c_str()); if (success) { if (!member_client) { if (client_leader && client_leader->GetPlayer()->GetZone()) { Spawn* spawn = client_leader->GetPlayer()->GetZone()->GetSpawnByID(member_entity_id); if (spawn && spawn->IsEntity()) { world.GetGroupManager()->AcceptInvite((Entity*)spawn, &group_id); } else { LogWrite(PEERING__ERROR, 0, "Peering", "%s: Failed Adding New Group %u, entity %s (%u) did not exist.", __FUNCTION__, group_id, member.c_str(), member_entity_id); } } else { LogWrite(PEERING__ERROR, 0, "Peering", "%s: Failed Adding New Group %u, member %s (%u) did not exist.", __FUNCTION__, group_id, member.c_str(), member_entity_id); } } else if (!client_leader) { if (member_client) member_client->HandleGroupAcceptResponse(2); LogWrite(PEERING__ERROR, 0, "Peering", "%s: Failed Adding New Group %u, leader %s did not exist.", __FUNCTION__, group_id, leader.c_str()); } else { world.GetGroupManager()->AcceptInvite(member_client->GetPlayer(), &group_id); } } } } void HandleCreateGuild(boost::property_tree::ptree tree) { bool success = false; int32 guildID = 0; std::string leaderName(""); if (auto successful = tree.get_optional("success")) { success = successful.get(); } if (auto guild_id = tree.get_optional("guild_id")) { guildID = guild_id.get(); } if (auto name = tree.get_optional("leader_name")) { leaderName = name.get(); } if (net.is_primary) { // we send out to peers } else if (guildID) { database.LoadGuild(guildID); Guild* guild = guild_list.GetGuild(guildID); Client* leader = zone_list.GetClientByCharName(leaderName.c_str()); if (leader && guild && !leader->GetPlayer()->GetGuild()) { guild->AddNewGuildMember(leader, 0, GUILD_RANK_LEADER); database.SaveGuildMembers(guild); if (leader && leader->GetPlayer()->GetGroupMemberInfo()) { world.GetGroupManager()->GroupLock(__FUNCTION__, __LINE__); PlayerGroup* group = world.GetGroupManager()->GetGroup(leader->GetPlayer()->GetGroupMemberInfo()->group_id); if (group) { GroupMemberInfo* gmi = nullptr; deque::iterator itr; group->MGroupMembers.readlock(__FUNCTION__, __LINE__); deque* members = group->GetMembers(); for (itr = members->begin(); itr != members->end(); itr++) { gmi = *itr; if (gmi->client && gmi->client != leader && !gmi->client->GetPlayer()->GetGuild()) guild->InvitePlayer(gmi->client, leader->GetPlayer()->GetName()); } group->MGroupMembers.releasereadlock(__FUNCTION__, __LINE__); } world.GetGroupManager()->ReleaseGroupLock(__FUNCTION__, __LINE__); } } } } // Define a type for handler functions using HandlerFunction = std::function; // Set up the endpoint-to-function map std::unordered_map endpointHandlers = { {"/addcharauth", AddCharAuth}, {"/startzone", StartZone}, {"/sendglobalmessage", SendGlobalMessage}, {"/newgroup", HandleNewGroup}, {"/createguild", HandleCreateGuild} }; HTTPSClientPool::HTTPSClientPool() {} void HTTPSClientPool::init(const std::string& cert, const std::string& key) { certFile = cert; keyFile = key; pollingInterval = 1000; int32 numThreads = 2; // Start worker threads for (int32 i = 0; i < numThreads; ++i) { workers.emplace_back(&HTTPSClientPool::workerFunction, this); } } HTTPSClientPool::~HTTPSClientPool() { { std::unique_lock lock(queueMutex); stop = true; } condition.notify_all(); for (std::thread& worker : workers) { if (worker.joinable()) { worker.join(); } } } void HTTPSClientPool::addPeerClient(const std::string& peerId, const std::string& server, const std::string& port, const std::string& authEndpoint) { bool newClient = false; // Check if client already exists for the specified (server, port) pair auto it = clients.find(std::make_pair(server, port)); if (it == clients.end()) { newClient = true; } if (newClient) { auto client = getOrCreateClient(peerId, server, port); } } boost::property_tree::ptree HTTPSClientPool::sendRequestToPeer(const std::string& peerId, const std::string& target) { auto client = getClient(peerId); boost::property_tree::ptree pt; if (!client) { LogWrite(PEERING__ERROR, 0, "Peering", "%s: Client for peer %s could not be found.", __FUNCTION__, peerId.c_str()); return pt; } // Retrieve the response from the client std::string responseBody = client->sendRequest(client->getServer(), client->getPort(), target); if (responseBody.size() < 1) { int16 peer_port = 0; try { peer_port = std::stoul(client->getPort()); } catch (const std::exception& e) { LogWrite(PEERING__ERROR, 0, "Peering", "%s: Error for convering peer port for %s: %s.", __FUNCTION__, peerId.c_str(), e.what() ? e.what() : "??"); } HealthStatus curStatus = peer_manager.getPeerStatus(client->getServer(), peer_port); switch (curStatus) { case HealthStatus::WARN: { peer_manager.updateHealth(peerId, HealthStatus::ERROR); break; } case HealthStatus::ERROR: { peer_manager.updateHealth(peerId, HealthStatus::SHUTDOWN); break; } default: { peer_manager.updateHealth(peerId, HealthStatus::WARN); break; } } } else { // Parse the response as JSON try { std::istringstream responseStream(responseBody); boost::property_tree::read_json(responseStream, pt); } catch (const boost::property_tree::json_parser_error& e) { LogWrite(PEERING__ERROR, 0, "Peering", "%s: JSON Parsing error for %s: %s.", __FUNCTION__, peerId.c_str(), e.what() ? e.what() : "??"); } } return pt; } boost::property_tree::ptree HTTPSClientPool::sendPostRequestToPeer(const std::string& peerId, const std::string& target, const std::string& jsonPayload) { auto client = getClient(peerId); boost::property_tree::ptree pt; if (!client) { LogWrite(PEERING__ERROR, 0, "Peering", "%s: Client for peer %s could not be found.", __FUNCTION__, peerId.c_str()); return pt; } // Retrieve the response from the client std::string responseBody = client->sendPostRequest(client->getServer(), client->getPort(), target, jsonPayload); // Parse the response as JSON try { std::istringstream responseStream(responseBody); boost::property_tree::read_json(responseStream, pt); } catch (const boost::property_tree::json_parser_error& e) { LogWrite(PEERING__ERROR, 0, "Peering", "%s: Client for peer %s could not be found.", __FUNCTION__, peerId.c_str()); } return pt; } std::shared_ptr HTTPSClientPool::getClient(const std::string& peerId) { std::unique_lock lock(queueMutex); // Lookup the client by ID auto idIt = clientsById.find(peerId); if (idIt != clientsById.end()) { return idIt->second; } return nullptr; // Return nullptr if no client exists with the given ID } std::shared_ptr HTTPSClientPool::getOrCreateClient(const std::string& id, const std::string& server, const std::string& port) { std::unique_lock lock(queueMutex); // Create a key based on (server, port) auto clientKey = std::make_pair(server, port); // Check if client already exists for the specified (server, port) pair auto it = clients.find(clientKey); if (it != clients.end()) { // Client already exists, return the existing client return it->second; } // No existing client for this (server, port), create and store a new HTTPSClient auto client = std::make_shared(certFile, keyFile); clients[clientKey] = client; clientsById[id] = client; return client; } void HTTPSClientPool::pollPeerHealth(const std::string& server, const std::string& port) { int16 web_worldport = 0; try { web_worldport = std::stoul(port); } catch (const std::exception& e) { LogWrite(PEERING__ERROR, 0, "Peering", "%s: Getting port for peer %s:%s could not be complete.", __FUNCTION__, server.c_str(), port.c_str()); } std::string id = peer_manager.isPeer(server, web_worldport); if (id.size() < 1) { LogWrite(PEERING__ERROR, 0, "Peering", "%s: Error finding peer %s:%s.", __FUNCTION__, server.c_str(), port.c_str()); } else { auto client = getOrCreateClient(id, port, server + ":" + port); int16 interval = pollingInterval; while (running.load()) { interval++; if (interval > pollingInterval) { HealthStatus curStatus = peer_manager.getPeerStatus(server, web_worldport); id = peer_manager.isPeer(server, web_worldport); try { auto response = client->sendRequest(server, port, "/status"); // Assumes HTTPSClient has a get method //std::cout << "Health check response from " << server << ":" << port << " - " << response << std::endl; boost::property_tree::ptree json_tree; std::istringstream json_stream(response); boost::property_tree::read_json(json_stream, json_tree); std::string online_status; int16 peer_priority = 65535; bool peer_primary = false; if (auto status = json_tree.get_optional("world_status")) { online_status = status.get(); } if (auto priority = json_tree.get_optional("peer_priority")) { peer_priority = priority.get(); } if (auto isprimary = json_tree.get_optional("peer_primary")) { peer_primary = isprimary.get(); } peer_manager.updatePriority(id, peer_priority); if (peer_primary && net.is_primary) { peer_manager.handlePrimaryConflict(id); std::shared_ptr hasPrimary = peer_manager.getHealthyPrimaryPeerPtr(); if (hasPrimary) { // demote self LogWrite(PEERING__INFO, 0, "Peering", "%s: Peer %s at %s:%s - HAS PRIMARY status, demoting self.", __FUNCTION__, id.c_str(), server.c_str(), port.c_str()); net.SetPrimary(false); } } switch (curStatus) { case HealthStatus::STARTUP: { pollPeerHealthData(client, id, server, port); if (online_status == "offline") { std::shared_ptr peer = peer_manager.getPeerById(id); if (peer) { peer->wasOffline = true; if (peer->sentInitialPeerData) { peer->sentInitialPeerData = false; } } } if (online_status == "online") { peer_manager.updateHealth(id, HealthStatus::OK); std::shared_ptr peer = peer_manager.getPeerById(id); if (net.is_primary) { if (peer) { if (peer->wasOffline && !peer->sentInitialPeerData) { world.GetGroupManager()->SendPeerGroupData(id); } peer->sentInitialPeerData = true; } } else if (peer) { // set as if we already sent the data since if we take over we don't want the peer trying to resubmit all groups peer->wasOffline = false; peer->sentInitialPeerData = true; } LogWrite(PEERING__INFO, 0, "Peering", "%s: Peer %s at %s:%s - HAS OK/UP state.", __FUNCTION__, id.c_str(), server.c_str(), port.c_str()); } if (!net.is_primary && !peer_manager.hasPrimary() && peer_priority < net.GetPeerPriority()) { LogWrite(PEERING__INFO, 0, "Peering", "%s: Peer %s at %s:%s - HAS PRIMARY.", __FUNCTION__, id.c_str(), server.c_str(), port.c_str()); peer_manager.setPrimary(id); net.SetPrimary(false); } else if (!peer_manager.hasPrimary() && !net.is_primary && net.GetPeerPriority() <= peer_priority) { LogWrite(PEERING__INFO, 0, "Peering", "%s: I AM PRIMARY!", __FUNCTION__); net.SetPrimary(); } break; } case HealthStatus::OK: { pollPeerHealthData(client, id, server, port); if (!net.is_primary && !peer_manager.hasPrimary() && peer_priority < net.GetPeerPriority()) { LogWrite(PEERING__INFO, 0, "Peering", "%s: Peer %s at %s:%s - HAS PRIMARY.", __FUNCTION__, id.c_str(), server.c_str(), port.c_str()); peer_manager.setPrimary(id); net.SetPrimary(false); } else if (!peer_manager.hasPrimary() && !net.is_primary && net.GetPeerPriority() <= peer_priority) { LogWrite(PEERING__INFO, 0, "Peering", "%s: I AM PRIMARY!!", __FUNCTION__); net.SetPrimary(); } break; } case HealthStatus::WARN: case HealthStatus::ERROR: case HealthStatus::SHUTDOWN: { peer_manager.updateHealth(id, HealthStatus::STARTUP); LogWrite(PEERING__INFO, 0, "Peering", "%s: Peer %s at %s:%s - HAS ENTERED STARTUP state.", __FUNCTION__, id.c_str(), server.c_str(), port.c_str()); if (net.is_primary) { std::shared_ptr peer = peer_manager.getPeerById(id); if (peer && peer->sentInitialPeerData == true) { peer->sentInitialPeerData = false; } } break; } } } catch (const std::exception& e) { HealthStatus curStatus = peer_manager.getPeerStatus(server, web_worldport); switch (curStatus) { case HealthStatus::WARN: { peer_manager.updateHealth(id, HealthStatus::ERROR); break; } case HealthStatus::ERROR: { LogWrite(PEERING__ERROR, 0, "Peering", "%s: Peer %s at %s:%s - HAS ERROR->SHUTDOWN state.", __FUNCTION__, id.c_str(), server.c_str(), port.c_str()); peer_manager.updateHealth(id, HealthStatus::SHUTDOWN); if (peer_manager.getHealthyPeer() == std::nullopt) { if (!net.is_primary && world.world_loaded) { LogWrite(PEERING__INFO, 0, "Peering", "%s: TAKING OVER AS PRIMARY, NO PEERS AVAILABLE TO CHECK", __FUNCTION__); net.SetPrimary(); } } else if (!peer_manager.hasPrimary()) { std::string newPrimary = peer_manager.getPriorityPeer(); if (newPrimary.size() > 0) { LogWrite(PEERING__INFO, 0, "Peering", "%s: NEW PRIMARY %s", __FUNCTION__, newPrimary); peer_manager.setPrimary(newPrimary); net.SetPrimary(false); } else { LogWrite(PEERING__ERROR, 0, "Peering", "%s: NEW PRIMARY CANNOT BE ESTABLISHED!", __FUNCTION__); } } break; } default: { peer_manager.updateHealth(id, HealthStatus::WARN); break; } } LogWrite(PEERING__ERROR, 0, "Peering", "%s: ERROR POLLING %s:%s reason: %s", __FUNCTION__, server.c_str(), port.c_str(), e.what() ? e.what() : "??"); } interval = 0; } std::this_thread::sleep_for(std::chrono::milliseconds(1)); } } } void HTTPSClientPool::pollPeerHealthData(auto client, const std::string& id, const std::string& server, const std::string& port) { if (client == nullptr) { } auto responseZones = client->sendRequest(server, port, "/zones"); // Assumes HTTPSClient has a get method // Load the JSON data into the property tree std::istringstream json_stream(responseZones); if (json_stream.str().empty()) { LogWrite(PEERING__ERROR, 0, "Peering", "%s: Polling JSON Stream Empty for %s:%s/zones", __FUNCTION__, server.c_str(), port.c_str()); } else if (json_stream.fail()) { LogWrite(PEERING__ERROR, 0, "Peering", "%s: Polling JSON Failed State for %s:%s/zones", __FUNCTION__, server.c_str(), port.c_str()); } else if (json_stream.bad()) { LogWrite(PEERING__ERROR, 0, "Peering", "%s: Polling JSON Stream Bad for %s:%s/zones", __FUNCTION__, server.c_str(), port.c_str()); } else if (json_stream.eof()) { LogWrite(PEERING__ERROR, 0, "Peering", "%s: Polling JSON Stream EOF for %s:%s/zones", __FUNCTION__, server.c_str(), port.c_str()); } else { boost::property_tree::ptree pt; boost::property_tree::read_json(json_stream, pt); peer_manager.updateZoneTree(id, pt); LogWrite(PEERING__DEBUG, 5, "Peering", "%s: Polling for %s:%s/zones complete, zone tree updated.", __FUNCTION__, server.c_str(), port.c_str()); } auto responseClients = client->sendRequest(server, port, "/clients"); // Assumes HTTPSClient has a get method // Load the JSON data into the property tree std::istringstream json_stream2(responseClients); if (json_stream2.str().empty()) { LogWrite(PEERING__ERROR, 0, "Peering", "%s: Polling JSON Stream Empty for %s:%s/clients", __FUNCTION__, server.c_str(), port.c_str()); } else if (json_stream2.fail()) { LogWrite(PEERING__ERROR, 0, "Peering", "%s: Polling JSON Failed State for %s:%s/clients", __FUNCTION__, server.c_str(), port.c_str()); } else if (json_stream2.bad()) { LogWrite(PEERING__ERROR, 0, "Peering", "%s: Polling JSON Stream Bad for %s:%s/clients", __FUNCTION__, server.c_str(), port.c_str()); } else if (json_stream2.eof()) { LogWrite(PEERING__ERROR, 0, "Peering", "%s: Polling JSON Stream EOF for %s:%s/clients", __FUNCTION__, server.c_str(), port.c_str()); } else { boost::property_tree::ptree pt2; boost::property_tree::read_json(json_stream2, pt2); peer_manager.updateClientTree(id, pt2); } } void HTTPSClientPool::startPolling() { running.store(true); for (const auto& clientPair : clients) { auto server = clientPair.first.first; auto port = clientPair.first.second; std::async(std::launch::async, &HTTPSClientPool::pollPeerHealth, this, server, port); } } void HTTPSClientPool::stopPolling() { running.store(false); } void HTTPSClientPool::sendPostRequestToPeerAsync(const std::string& peerId, const std::string& server, const std::string& port, const std::string& target, const std::string& payload) { { std::unique_lock lock(queueMutex); taskQueue.emplace([this, peerId, server, port, target, payload]() { std::shared_ptr client = getClient(peerId); if (client) { std::string response = client->sendPostRequest(server, port, target, payload); boost::property_tree::ptree pt; try { std::istringstream responseStream(response); boost::property_tree::read_json(responseStream, pt); } catch (const boost::property_tree::json_parser_error& e) { LogWrite(PEERING__ERROR, 0, "Peering", "%s: JSON Parsing error for %s (%s:%s): %s.", __FUNCTION__, peerId.c_str(), server.c_str(), port.c_str(), e.what() ? e.what() : "??"); } if (endpointHandlers.find(target) != endpointHandlers.end()) { endpointHandlers[target](pt); // Call the corresponding handler } else { //std::cout << "No handler for endpoint: " << endpoint << std::endl; } } else { LogWrite(PEERING__ERROR, 0, "Peering", "%s: Client not found for %s (%s:%s).", __FUNCTION__, peerId.c_str(), server.c_str(), port.c_str()); } }); } condition.notify_one(); } void HTTPSClientPool::workerFunction() { while (true) { std::function task; { std::unique_lock lock(queueMutex); condition.wait(lock, [this] { return stop || !taskQueue.empty(); }); if (stop && taskQueue.empty()) { return; } task = std::move(taskQueue.front()); taskQueue.pop(); } task(); } }