1357 lines
39 KiB
C
1357 lines
39 KiB
C
/*
|
|
* Copyright (C) 2007 The Android Open Source Project
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
#define LOG_TAG "mq"
|
|
|
|
#include <assert.h>
|
|
#include <errno.h>
|
|
#include <fcntl.h>
|
|
#include <pthread.h>
|
|
#include <stdlib.h>
|
|
#include <string.h>
|
|
#include <unistd.h>
|
|
|
|
#include <sys/socket.h>
|
|
#include <sys/types.h>
|
|
#include <sys/un.h>
|
|
#include <sys/uio.h>
|
|
|
|
#include <cutils/array.h>
|
|
#include <cutils/hashmap.h>
|
|
#include <cutils/selector.h>
|
|
|
|
#include "loghack.h"
|
|
#include "buffer.h"
|
|
|
|
/** How many recently deceased peer PIDs each local peer keeps on record. */
#define PEER_HISTORY (16)

/* Short aliases for the socket address structures used in this file. */
typedef struct sockaddr SocketAddress;
typedef struct sockaddr_un UnixAddress;
|
|
|
|
/**
 * Identity of a process: its process ID together with its user and group
 * IDs. A private structure is used instead of ucred because ucred is only
 * available on Linux.
 */
typedef struct {
    pid_t pid;
    uid_t uid;
    gid_t gid;
} Credentials;
|
|
|
|
/** Listens for bytes coming from remote peers. */
|
|
typedef void BytesListener(Credentials credentials, char* bytes, size_t size);
|
|
|
|
/** Listens for the deaths of remote peers. */
|
|
typedef void DeathListener(pid_t pid);
|
|
|
|
/** Kinds of packets exchanged between peers. */
typedef enum {
    /** Asks for a connection to another peer. */
    CONNECTION_REQUEST = 0,

    /** Carries a connection to another peer. */
    CONNECTION = 1,

    /** Signals that a connection attempt failed. */
    CONNECTION_ERROR = 2,

    /** Carries an opaque run of bytes. */
    BYTES = 3
} PacketType;
|
|
|
|
/** What a peer proxy expects to receive next on its socket. */
typedef enum {
    /** A packet header is being read. */
    READING_HEADER = 0,

    /** A connection from the master is awaited. */
    ACCEPTING_CONNECTION = 1,

    /** A packet's byte payload is being read. */
    READING_BYTES = 2
} InputState;
|
|
|
|
/** A packet header. */
|
|
// TODO: Use custom headers for master->peer, peer->master, peer->peer.
|
|
typedef struct {
|
|
PacketType type;
|
|
union {
|
|
/** Packet size. Used for BYTES. */
|
|
size_t size;
|
|
|
|
/** Credentials. Used for CONNECTION and CONNECTION_REQUEST. */
|
|
Credentials credentials;
|
|
};
|
|
} Header;
|
|
|
|
/** A packet which will be sent to a peer. */
|
|
typedef struct OutgoingPacket OutgoingPacket;
|
|
struct OutgoingPacket {
|
|
/** Packet header. */
|
|
Header header;
|
|
|
|
union {
|
|
/** Connection to peer. Used with CONNECTION. */
|
|
int socket;
|
|
|
|
/** Buffer of bytes. Used with BYTES. */
|
|
Buffer* bytes;
|
|
};
|
|
|
|
/** Frees all resources associated with this packet. */
|
|
void (*free)(OutgoingPacket* packet);
|
|
|
|
/** Optional context. */
|
|
void* context;
|
|
|
|
/** Next packet in the queue. */
|
|
OutgoingPacket* nextPacket;
|
|
};
|
|
|
|
/** Represents a remote peer. */
|
|
typedef struct PeerProxy PeerProxy;
|
|
|
|
/** Local peer state. You typically have one peer per process. */
|
|
typedef struct {
|
|
/** This peer's PID. */
|
|
pid_t pid;
|
|
|
|
/**
|
|
* Map from pid to peer proxy. The peer has a peer proxy for each remote
|
|
* peer it's connected to.
|
|
*
|
|
* Acquire mutex before use.
|
|
*/
|
|
Hashmap* peerProxies;
|
|
|
|
/** Manages I/O. */
|
|
Selector* selector;
|
|
|
|
/** Used to synchronize operations with the selector thread. */
|
|
pthread_mutex_t mutex;
|
|
|
|
/** Is this peer the master? */
|
|
bool master;
|
|
|
|
/** Peer proxy for the master. */
|
|
PeerProxy* masterProxy;
|
|
|
|
/** Listens for packets from remote peers. */
|
|
BytesListener* onBytes;
|
|
|
|
/** Listens for deaths of remote peers. */
|
|
DeathListener* onDeath;
|
|
|
|
/** Keeps track of recently dead peers. Requires mutex. */
|
|
pid_t deadPeers[PEER_HISTORY];
|
|
size_t deadPeerCursor;
|
|
} Peer;
|
|
|
|
struct PeerProxy {
|
|
/** Credentials of the remote process. */
|
|
Credentials credentials;
|
|
|
|
/** Keeps track of data coming in from the remote peer. */
|
|
InputState inputState;
|
|
Buffer* inputBuffer;
|
|
PeerProxy* connecting;
|
|
|
|
/** File descriptor for this peer. */
|
|
SelectableFd* fd;
|
|
|
|
/**
|
|
* Queue of packets to be written out to the remote peer.
|
|
*
|
|
* Requires mutex.
|
|
*/
|
|
// TODO: Limit queue length.
|
|
OutgoingPacket* currentPacket;
|
|
OutgoingPacket* lastPacket;
|
|
|
|
/** Used to write outgoing header. */
|
|
Buffer outgoingHeader;
|
|
|
|
/** True if this is the master's proxy. */
|
|
bool master;
|
|
|
|
/** Reference back to the local peer. */
|
|
Peer* peer;
|
|
|
|
/**
|
|
* Used in master only. Maps this peer proxy to other peer proxies to
|
|
* which the peer has been connected to. Maps pid to PeerProxy. Helps
|
|
* keep track of which connections we've sent to whom.
|
|
*/
|
|
Hashmap* connections;
|
|
};
|
|
|
|
/** Server socket path. */
|
|
static const char* MASTER_PATH = "/master.peer";
|
|
|
|
/** Credentials of the master peer. */
|
|
static const Credentials MASTER_CREDENTIALS = {0, 0, 0};
|
|
|
|
/** Creates a peer proxy and adds it to the peer proxy map. */
|
|
static PeerProxy* peerProxyCreate(Peer* peer, Credentials credentials);
|
|
|
|
/**
 * Switches a descriptor to non-blocking mode while keeping its other status
 * flags. Any fcntl() failure is fatal.
 */
static void setNonBlocking(int fd) {
    int currentFlags = fcntl(fd, F_GETFL, 0);
    if (currentFlags < 0) {
        LOG_ALWAYS_FATAL("fcntl() error: %s", strerror(errno));
    }

    int status = fcntl(fd, F_SETFL, currentFlags | O_NONBLOCK);
    if (status < 0) {
        LOG_ALWAYS_FATAL("fcntl() error: %s", strerror(errno));
    }
}
|
|
|
|
/** Closes a descriptor; a failed close() is logged as a warning only. */
static void closeWithWarning(int fd) {
    if (close(fd) == -1) {
        LOGW("close() error: %s", strerror(errno));
    }
}
|
|
|
|
/** Hash function for maps keyed by pointer-to-pid_t: the PID itself. */
static int pidHash(void* key) {
    return (int) *((pid_t*) key);
}
|
|
|
|
/** Equality function for maps keyed by pointer-to-pid_t. */
static bool pidEquals(void* keyA, void* keyB) {
    pid_t left = *((pid_t*) keyA);
    pid_t right = *((pid_t*) keyB);
    return left == right;
}
|
|
|
|
/** Gets the master address. Not thread safe. */
|
|
static UnixAddress* getMasterAddress() {
|
|
static UnixAddress masterAddress;
|
|
static bool initialized = false;
|
|
if (initialized == false) {
|
|
masterAddress.sun_family = AF_LOCAL;
|
|
strcpy(masterAddress.sun_path, MASTER_PATH);
|
|
initialized = true;
|
|
}
|
|
return &masterAddress;
|
|
}
|
|
|
|
/** Gets exclusive access to the peer for this thread. */
|
|
static void peerLock(Peer* peer) {
|
|
pthread_mutex_lock(&peer->mutex);
|
|
}
|
|
|
|
/** Releases exclusive access to the peer. */
|
|
static void peerUnlock(Peer* peer) {
|
|
pthread_mutex_unlock(&peer->mutex);
|
|
}
|
|
|
|
/** Frees a simple, i.e. header-only, outgoing packet. */
|
|
static void outgoingPacketFree(OutgoingPacket* packet) {
|
|
LOGD("Freeing outgoing packet.");
|
|
free(packet);
|
|
}
|
|
|
|
/**
|
|
* Prepare to read a new packet from the peer.
|
|
*/
|
|
static void peerProxyExpectHeader(PeerProxy* peerProxy) {
|
|
peerProxy->inputState = READING_HEADER;
|
|
bufferPrepareForRead(peerProxy->inputBuffer, sizeof(Header));
|
|
}
|
|
|
|
/** Sets up the buffer for the outgoing header. */
|
|
static void peerProxyPrepareOutgoingHeader(PeerProxy* peerProxy) {
|
|
peerProxy->outgoingHeader.data
|
|
= (char*) &(peerProxy->currentPacket->header);
|
|
peerProxy->outgoingHeader.size = sizeof(Header);
|
|
bufferPrepareForWrite(&peerProxy->outgoingHeader);
|
|
}
|
|
|
|
/** Adds a packet to the end of the queue. Callers must have the mutex. */
|
|
static void peerProxyEnqueueOutgoingPacket(PeerProxy* peerProxy,
|
|
OutgoingPacket* newPacket) {
|
|
newPacket->nextPacket = NULL; // Just in case.
|
|
if (peerProxy->currentPacket == NULL) {
|
|
// The queue is empty.
|
|
peerProxy->currentPacket = newPacket;
|
|
peerProxy->lastPacket = newPacket;
|
|
|
|
peerProxyPrepareOutgoingHeader(peerProxy);
|
|
} else {
|
|
peerProxy->lastPacket->nextPacket = newPacket;
|
|
}
|
|
}
|
|
|
|
/** Takes the peer lock and enqueues the given packet. */
|
|
static void peerProxyLockAndEnqueueOutgoingPacket(PeerProxy* peerProxy,
|
|
OutgoingPacket* newPacket) {
|
|
Peer* peer = peerProxy->peer;
|
|
peerLock(peer);
|
|
peerProxyEnqueueOutgoingPacket(peerProxy, newPacket);
|
|
peerUnlock(peer);
|
|
}
|
|
|
|
/**
|
|
* Frees current packet and moves to the next one. Returns true if there is
|
|
* a next packet or false if the queue is empty.
|
|
*/
|
|
static bool peerProxyNextPacket(PeerProxy* peerProxy) {
|
|
Peer* peer = peerProxy->peer;
|
|
peerLock(peer);
|
|
|
|
OutgoingPacket* current = peerProxy->currentPacket;
|
|
|
|
if (current == NULL) {
|
|
// The queue is already empty.
|
|
peerUnlock(peer);
|
|
return false;
|
|
}
|
|
|
|
OutgoingPacket* next = current->nextPacket;
|
|
peerProxy->currentPacket = next;
|
|
current->nextPacket = NULL;
|
|
current->free(current);
|
|
if (next == NULL) {
|
|
// The queue is empty.
|
|
peerProxy->lastPacket = NULL;
|
|
peerUnlock(peer);
|
|
return false;
|
|
} else {
|
|
peerUnlock(peer);
|
|
peerProxyPrepareOutgoingHeader(peerProxy);
|
|
|
|
// TODO: Start writing next packet? It would reduce the number of
|
|
// system calls, but we could also starve other peers.
|
|
return true;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Checks whether a peer died recently.
|
|
*/
|
|
static bool peerIsDead(Peer* peer, pid_t pid) {
|
|
size_t i;
|
|
for (i = 0; i < PEER_HISTORY; i++) {
|
|
pid_t deadPeer = peer->deadPeers[i];
|
|
if (deadPeer == 0) {
|
|
return false;
|
|
}
|
|
if (deadPeer == pid) {
|
|
return true;
|
|
}
|
|
}
|
|
return false;
|
|
}
|
|
|
|
/**
|
|
* Cleans up connection information.
|
|
*/
|
|
static bool peerProxyRemoveConnection(void* key, void* value, void* context) {
|
|
PeerProxy* deadPeer = (PeerProxy*) context;
|
|
PeerProxy* otherPeer = (PeerProxy*) value;
|
|
hashmapRemove(otherPeer->connections, &(deadPeer->credentials.pid));
|
|
return true;
|
|
}
|
|
|
|
/**
|
|
* Called when the peer dies.
|
|
*/
|
|
static void peerProxyKill(PeerProxy* peerProxy, bool errnoIsSet) {
|
|
if (errnoIsSet) {
|
|
LOGI("Peer %d died. errno: %s", peerProxy->credentials.pid,
|
|
strerror(errno));
|
|
} else {
|
|
LOGI("Peer %d died.", peerProxy->credentials.pid);
|
|
}
|
|
|
|
// If we lost the master, we're up a creek. We can't let this happen.
|
|
if (peerProxy->master) {
|
|
LOG_ALWAYS_FATAL("Lost connection to master.");
|
|
}
|
|
|
|
Peer* localPeer = peerProxy->peer;
|
|
pid_t pid = peerProxy->credentials.pid;
|
|
|
|
peerLock(localPeer);
|
|
|
|
// Remember for awhile that the peer died.
|
|
localPeer->deadPeers[localPeer->deadPeerCursor]
|
|
= peerProxy->credentials.pid;
|
|
localPeer->deadPeerCursor++;
|
|
if (localPeer->deadPeerCursor == PEER_HISTORY) {
|
|
localPeer->deadPeerCursor = 0;
|
|
}
|
|
|
|
// Remove from peer map.
|
|
hashmapRemove(localPeer->peerProxies, &pid);
|
|
|
|
// External threads can no longer get to this peer proxy, so we don't
|
|
// need the lock anymore.
|
|
peerUnlock(localPeer);
|
|
|
|
// Remove the fd from the selector.
|
|
if (peerProxy->fd != NULL) {
|
|
peerProxy->fd->remove = true;
|
|
}
|
|
|
|
// Clear outgoing packet queue.
|
|
while (peerProxyNextPacket(peerProxy)) {}
|
|
|
|
bufferFree(peerProxy->inputBuffer);
|
|
|
|
// This only applies to the master.
|
|
if (peerProxy->connections != NULL) {
|
|
// We can't leave these other maps pointing to freed memory.
|
|
hashmapForEach(peerProxy->connections, &peerProxyRemoveConnection,
|
|
peerProxy);
|
|
hashmapFree(peerProxy->connections);
|
|
}
|
|
|
|
// Invoke death listener.
|
|
localPeer->onDeath(pid);
|
|
|
|
// Free the peer proxy itself.
|
|
free(peerProxy);
|
|
}
|
|
|
|
/**
 * Reacts to a failed I/O call on a peer's socket, based on errno.
 *
 * EINTR and EAGAIN/EWOULDBLOCK are transient: they are logged and otherwise
 * ignored so the operation is retried later. Any other error kills the peer
 * proxy, after which the proxy must not be used.
 *
 * @param peerProxy    proxy whose I/O call failed
 * @param functionName name of the failed call, used in log messages
 */
static void peerProxyHandleError(PeerProxy* peerProxy,
        const char* functionName) {
    // Capture errno first: the logging calls below may overwrite it.
    int error = errno;

    if (error == EINTR) {
        // Log interruptions but otherwise ignore them.
        LOGW("%s() interrupted.", functionName);
    } else if (error == EAGAIN || error == EWOULDBLOCK) {
        // EWOULDBLOCK may differ from EAGAIN on some systems; both mean
        // "try again later".
        LOGD("EWOULDBLOCK");
    } else {
        LOGW("Error returned by %s().", functionName);

        // peerProxyKill() reports strerror(errno); restore the real cause.
        errno = error;
        peerProxyKill(peerProxy, true);
    }
}
|
|
|
|
/**
|
|
* Buffers output sent to a peer. May be called multiple times until the entire
|
|
* buffer is filled. Returns true when the buffer is empty.
|
|
*/
|
|
static bool peerProxyWriteFromBuffer(PeerProxy* peerProxy, Buffer* outgoing) {
|
|
ssize_t size = bufferWrite(outgoing, peerProxy->fd->fd);
|
|
if (size < 0) {
|
|
peerProxyHandleError(peerProxy, "write");
|
|
return false;
|
|
} else {
|
|
return bufferWriteComplete(outgoing);
|
|
}
|
|
}
|
|
|
|
/** Writes packet bytes to peer. */
|
|
static void peerProxyWriteBytes(PeerProxy* peerProxy) {
|
|
Buffer* buffer = peerProxy->currentPacket->bytes;
|
|
if (peerProxyWriteFromBuffer(peerProxy, buffer)) {
|
|
LOGD("Bytes written.");
|
|
peerProxyNextPacket(peerProxy);
|
|
}
|
|
}
|
|
|
|
/** Sends a socket to the peer. */
|
|
static void peerProxyWriteConnection(PeerProxy* peerProxy) {
|
|
int socket = peerProxy->currentPacket->socket;
|
|
|
|
// Why does sending and receiving fds have to be such a PITA?
|
|
struct msghdr msg;
|
|
struct iovec iov[1];
|
|
|
|
union {
|
|
struct cmsghdr cm;
|
|
char control[CMSG_SPACE(sizeof(int))];
|
|
} control_un;
|
|
|
|
struct cmsghdr *cmptr;
|
|
|
|
msg.msg_control = control_un.control;
|
|
msg.msg_controllen = sizeof(control_un.control);
|
|
cmptr = CMSG_FIRSTHDR(&msg);
|
|
cmptr->cmsg_len = CMSG_LEN(sizeof(int));
|
|
cmptr->cmsg_level = SOL_SOCKET;
|
|
cmptr->cmsg_type = SCM_RIGHTS;
|
|
|
|
// Store the socket in the message.
|
|
*((int *) CMSG_DATA(cmptr)) = peerProxy->currentPacket->socket;
|
|
|
|
msg.msg_name = NULL;
|
|
msg.msg_namelen = 0;
|
|
iov[0].iov_base = "";
|
|
iov[0].iov_len = 1;
|
|
msg.msg_iov = iov;
|
|
msg.msg_iovlen = 1;
|
|
|
|
ssize_t result = sendmsg(peerProxy->fd->fd, &msg, 0);
|
|
|
|
if (result < 0) {
|
|
peerProxyHandleError(peerProxy, "sendmsg");
|
|
} else {
|
|
// Success. Queue up the next packet.
|
|
peerProxyNextPacket(peerProxy);
|
|
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Writes some outgoing data.
|
|
*/
|
|
static void peerProxyWrite(SelectableFd* fd) {
|
|
// TODO: Try to write header and body with one system call.
|
|
|
|
PeerProxy* peerProxy = (PeerProxy*) fd->data;
|
|
OutgoingPacket* current = peerProxy->currentPacket;
|
|
|
|
if (current == NULL) {
|
|
// We have nothing left to write.
|
|
return;
|
|
}
|
|
|
|
// Write the header.
|
|
Buffer* outgoingHeader = &peerProxy->outgoingHeader;
|
|
bool headerWritten = bufferWriteComplete(outgoingHeader);
|
|
if (!headerWritten) {
|
|
LOGD("Writing header...");
|
|
headerWritten = peerProxyWriteFromBuffer(peerProxy, outgoingHeader);
|
|
if (headerWritten) {
|
|
LOGD("Header written.");
|
|
}
|
|
}
|
|
|
|
// Write body.
|
|
if (headerWritten) {
|
|
PacketType type = current->header.type;
|
|
switch (type) {
|
|
case CONNECTION:
|
|
peerProxyWriteConnection(peerProxy);
|
|
break;
|
|
case BYTES:
|
|
peerProxyWriteBytes(peerProxy);
|
|
break;
|
|
case CONNECTION_REQUEST:
|
|
case CONNECTION_ERROR:
|
|
// These packets consist solely of a header.
|
|
peerProxyNextPacket(peerProxy);
|
|
break;
|
|
default:
|
|
LOG_ALWAYS_FATAL("Unknown packet type: %d", type);
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Sets up a peer proxy's fd before we try to select() it.
|
|
*/
|
|
static void peerProxyBeforeSelect(SelectableFd* fd) {
|
|
LOGD("Before select...");
|
|
|
|
PeerProxy* peerProxy = (PeerProxy*) fd->data;
|
|
|
|
peerLock(peerProxy->peer);
|
|
bool hasPackets = peerProxy->currentPacket != NULL;
|
|
peerUnlock(peerProxy->peer);
|
|
|
|
if (hasPackets) {
|
|
LOGD("Packets found. Setting onWritable().");
|
|
|
|
fd->onWritable = &peerProxyWrite;
|
|
} else {
|
|
// We have nothing to write.
|
|
fd->onWritable = NULL;
|
|
}
|
|
}
|
|
|
|
/** Prepare to read bytes from the peer. */
|
|
static void peerProxyExpectBytes(PeerProxy* peerProxy, Header* header) {
|
|
LOGD("Expecting %d bytes.", header->size);
|
|
|
|
peerProxy->inputState = READING_BYTES;
|
|
if (bufferPrepareForRead(peerProxy->inputBuffer, header->size) == -1) {
|
|
LOGW("Couldn't allocate memory for incoming data. Size: %u",
|
|
(unsigned int) header->size);
|
|
|
|
// TODO: Ignore the packet and log a warning?
|
|
peerProxyKill(peerProxy, false);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Gets a peer proxy for the given ID. Creates a peer proxy if necessary.
|
|
* Sends a connection request to the master if desired.
|
|
*
|
|
* Returns NULL if an error occurs. Sets errno to EHOSTDOWN if the peer died
|
|
* or ENOMEM if memory couldn't be allocated.
|
|
*/
|
|
static PeerProxy* peerProxyGetOrCreate(Peer* peer, pid_t pid,
|
|
bool requestConnection) {
|
|
if (pid == peer->pid) {
|
|
errno = EINVAL;
|
|
return NULL;
|
|
}
|
|
|
|
if (peerIsDead(peer, pid)) {
|
|
errno = EHOSTDOWN;
|
|
return NULL;
|
|
}
|
|
|
|
PeerProxy* peerProxy = hashmapGet(peer->peerProxies, &pid);
|
|
if (peerProxy != NULL) {
|
|
return peerProxy;
|
|
}
|
|
|
|
// If this is the master peer, we already know about all peers.
|
|
if (peer->master) {
|
|
errno = EHOSTDOWN;
|
|
return NULL;
|
|
}
|
|
|
|
// Try to create a peer proxy.
|
|
Credentials credentials;
|
|
credentials.pid = pid;
|
|
|
|
// Fake gid and uid until we have the real thing. The real creds are
|
|
// filled in by masterProxyExpectConnection(). These fake creds will
|
|
// never be exposed to the user.
|
|
credentials.uid = 0;
|
|
credentials.gid = 0;
|
|
|
|
// Make sure we can allocate the connection request packet.
|
|
OutgoingPacket* packet = NULL;
|
|
if (requestConnection) {
|
|
packet = calloc(1, sizeof(OutgoingPacket));
|
|
if (packet == NULL) {
|
|
errno = ENOMEM;
|
|
return NULL;
|
|
}
|
|
|
|
packet->header.type = CONNECTION_REQUEST;
|
|
packet->header.credentials = credentials;
|
|
packet->free = &outgoingPacketFree;
|
|
}
|
|
|
|
peerProxy = peerProxyCreate(peer, credentials);
|
|
if (peerProxy == NULL) {
|
|
free(packet);
|
|
errno = ENOMEM;
|
|
return NULL;
|
|
} else {
|
|
// Send a connection request to the master.
|
|
if (requestConnection) {
|
|
PeerProxy* masterProxy = peer->masterProxy;
|
|
peerProxyEnqueueOutgoingPacket(masterProxy, packet);
|
|
}
|
|
|
|
return peerProxy;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Switches the master peer proxy into a state where it's waiting for a
|
|
* connection from the master.
|
|
*/
|
|
static void masterProxyExpectConnection(PeerProxy* masterProxy,
|
|
Header* header) {
|
|
// TODO: Restructure things so we don't need this check.
|
|
// Verify that this really is the master.
|
|
if (!masterProxy->master) {
|
|
LOGW("Non-master process %d tried to send us a connection.",
|
|
masterProxy->credentials.pid);
|
|
// Kill off the evil peer.
|
|
peerProxyKill(masterProxy, false);
|
|
return;
|
|
}
|
|
|
|
masterProxy->inputState = ACCEPTING_CONNECTION;
|
|
Peer* localPeer = masterProxy->peer;
|
|
|
|
// Create a peer proxy so we have somewhere to stash the creds.
|
|
// See if we already have a proxy set up.
|
|
pid_t pid = header->credentials.pid;
|
|
peerLock(localPeer);
|
|
PeerProxy* peerProxy = peerProxyGetOrCreate(localPeer, pid, false);
|
|
if (peerProxy == NULL) {
|
|
LOGW("Peer proxy creation failed: %s", strerror(errno));
|
|
} else {
|
|
// Fill in full credentials.
|
|
peerProxy->credentials = header->credentials;
|
|
}
|
|
peerUnlock(localPeer);
|
|
|
|
// Keep track of which peer proxy we're accepting a connection for.
|
|
masterProxy->connecting = peerProxy;
|
|
}
|
|
|
|
/**
|
|
* Reads input from a peer process.
|
|
*/
|
|
static void peerProxyRead(SelectableFd* fd);
|
|
|
|
/** Sets up fd callbacks. */
|
|
static void peerProxySetFd(PeerProxy* peerProxy, SelectableFd* fd) {
|
|
peerProxy->fd = fd;
|
|
fd->data = peerProxy;
|
|
fd->onReadable = &peerProxyRead;
|
|
fd->beforeSelect = &peerProxyBeforeSelect;
|
|
|
|
// Make the socket non-blocking.
|
|
setNonBlocking(fd->fd);
|
|
}
|
|
|
|
/**
|
|
* Accepts a connection sent by the master proxy.
|
|
*/
|
|
static void masterProxyAcceptConnection(PeerProxy* masterProxy) {
|
|
struct msghdr msg;
|
|
struct iovec iov[1];
|
|
ssize_t size;
|
|
char ignored;
|
|
int incomingFd;
|
|
|
|
// TODO: Reuse code which writes the connection. Who the heck designed
|
|
// this API anyway?
|
|
union {
|
|
struct cmsghdr cm;
|
|
char control[CMSG_SPACE(sizeof(int))];
|
|
} control_un;
|
|
struct cmsghdr *cmptr;
|
|
msg.msg_control = control_un.control;
|
|
msg.msg_controllen = sizeof(control_un.control);
|
|
|
|
msg.msg_name = NULL;
|
|
msg.msg_namelen = 0;
|
|
|
|
// We sent 1 byte of data so we can detect EOF.
|
|
iov[0].iov_base = &ignored;
|
|
iov[0].iov_len = 1;
|
|
msg.msg_iov = iov;
|
|
msg.msg_iovlen = 1;
|
|
|
|
size = recvmsg(masterProxy->fd->fd, &msg, 0);
|
|
if (size < 0) {
|
|
if (errno == EINTR) {
|
|
// Log interruptions but otherwise ignore them.
|
|
LOGW("recvmsg() interrupted.");
|
|
return;
|
|
} else if (errno == EAGAIN) {
|
|
// Keep waiting for the connection.
|
|
return;
|
|
} else {
|
|
LOG_ALWAYS_FATAL("Error reading connection from master: %s",
|
|
strerror(errno));
|
|
}
|
|
} else if (size == 0) {
|
|
// EOF.
|
|
LOG_ALWAYS_FATAL("Received EOF from master.");
|
|
}
|
|
|
|
// Extract fd from message.
|
|
if ((cmptr = CMSG_FIRSTHDR(&msg)) != NULL
|
|
&& cmptr->cmsg_len == CMSG_LEN(sizeof(int))) {
|
|
if (cmptr->cmsg_level != SOL_SOCKET) {
|
|
LOG_ALWAYS_FATAL("Expected SOL_SOCKET.");
|
|
}
|
|
if (cmptr->cmsg_type != SCM_RIGHTS) {
|
|
LOG_ALWAYS_FATAL("Expected SCM_RIGHTS.");
|
|
}
|
|
incomingFd = *((int*) CMSG_DATA(cmptr));
|
|
} else {
|
|
LOG_ALWAYS_FATAL("Expected fd.");
|
|
}
|
|
|
|
// The peer proxy this connection is for.
|
|
PeerProxy* peerProxy = masterProxy->connecting;
|
|
if (peerProxy == NULL) {
|
|
LOGW("Received connection for unknown peer.");
|
|
closeWithWarning(incomingFd);
|
|
} else {
|
|
Peer* peer = masterProxy->peer;
|
|
|
|
SelectableFd* selectableFd = selectorAdd(peer->selector, incomingFd);
|
|
if (selectableFd == NULL) {
|
|
LOGW("Error adding fd to selector for %d.",
|
|
peerProxy->credentials.pid);
|
|
closeWithWarning(incomingFd);
|
|
peerProxyKill(peerProxy, false);
|
|
}
|
|
|
|
peerProxySetFd(peerProxy, selectableFd);
|
|
}
|
|
|
|
peerProxyExpectHeader(masterProxy);
|
|
}
|
|
|
|
/**
|
|
* Frees an outgoing packet containing a connection.
|
|
*/
|
|
static void outgoingPacketFreeSocket(OutgoingPacket* packet) {
|
|
closeWithWarning(packet->socket);
|
|
outgoingPacketFree(packet);
|
|
}
|
|
|
|
/**
|
|
* Connects two known peers.
|
|
*/
|
|
static void masterConnectPeers(PeerProxy* peerA, PeerProxy* peerB) {
|
|
int sockets[2];
|
|
int result = socketpair(AF_LOCAL, SOCK_STREAM, 0, sockets);
|
|
if (result == -1) {
|
|
LOGW("socketpair() error: %s", strerror(errno));
|
|
// TODO: Send CONNECTION_FAILED packets to peers.
|
|
return;
|
|
}
|
|
|
|
OutgoingPacket* packetA = calloc(1, sizeof(OutgoingPacket));
|
|
OutgoingPacket* packetB = calloc(1, sizeof(OutgoingPacket));
|
|
if (packetA == NULL || packetB == NULL) {
|
|
free(packetA);
|
|
free(packetB);
|
|
LOGW("malloc() error. Failed to tell process %d that process %d is"
|
|
" dead.", peerA->credentials.pid, peerB->credentials.pid);
|
|
return;
|
|
}
|
|
|
|
packetA->header.type = CONNECTION;
|
|
packetB->header.type = CONNECTION;
|
|
|
|
packetA->header.credentials = peerB->credentials;
|
|
packetB->header.credentials = peerA->credentials;
|
|
|
|
packetA->socket = sockets[0];
|
|
packetB->socket = sockets[1];
|
|
|
|
packetA->free = &outgoingPacketFreeSocket;
|
|
packetB->free = &outgoingPacketFreeSocket;
|
|
|
|
peerLock(peerA->peer);
|
|
peerProxyEnqueueOutgoingPacket(peerA, packetA);
|
|
peerProxyEnqueueOutgoingPacket(peerB, packetB);
|
|
peerUnlock(peerA->peer);
|
|
}
|
|
|
|
/**
|
|
* Informs a peer that the peer they're trying to connect to couldn't be
|
|
* found.
|
|
*/
|
|
static void masterReportConnectionError(PeerProxy* peerProxy,
|
|
Credentials credentials) {
|
|
OutgoingPacket* packet = calloc(1, sizeof(OutgoingPacket));
|
|
if (packet == NULL) {
|
|
LOGW("malloc() error. Failed to tell process %d that process %d is"
|
|
" dead.", peerProxy->credentials.pid, credentials.pid);
|
|
return;
|
|
}
|
|
|
|
packet->header.type = CONNECTION_ERROR;
|
|
packet->header.credentials = credentials;
|
|
packet->free = &outgoingPacketFree;
|
|
|
|
peerProxyLockAndEnqueueOutgoingPacket(peerProxy, packet);
|
|
}
|
|
|
|
/**
|
|
* Handles a request to be connected to another peer.
|
|
*/
|
|
static void masterHandleConnectionRequest(PeerProxy* peerProxy,
|
|
Header* header) {
|
|
Peer* master = peerProxy->peer;
|
|
pid_t targetPid = header->credentials.pid;
|
|
if (!hashmapContainsKey(peerProxy->connections, &targetPid)) {
|
|
// We haven't connected these peers yet.
|
|
PeerProxy* targetPeer
|
|
= (PeerProxy*) hashmapGet(master->peerProxies, &targetPid);
|
|
if (targetPeer == NULL) {
|
|
// Unknown process.
|
|
masterReportConnectionError(peerProxy, header->credentials);
|
|
} else {
|
|
masterConnectPeers(peerProxy, targetPeer);
|
|
}
|
|
}
|
|
|
|
// This packet is complete. Get ready for the next one.
|
|
peerProxyExpectHeader(peerProxy);
|
|
}
|
|
|
|
/**
|
|
* The master told us this peer is dead.
|
|
*/
|
|
static void masterProxyHandleConnectionError(PeerProxy* masterProxy,
|
|
Header* header) {
|
|
Peer* peer = masterProxy->peer;
|
|
|
|
// Look up the peer proxy.
|
|
pid_t pid = header->credentials.pid;
|
|
PeerProxy* peerProxy = NULL;
|
|
peerLock(peer);
|
|
peerProxy = hashmapGet(peer->peerProxies, &pid);
|
|
peerUnlock(peer);
|
|
|
|
if (peerProxy != NULL) {
|
|
LOGI("Couldn't connect to %d.", pid);
|
|
peerProxyKill(peerProxy, false);
|
|
} else {
|
|
LOGW("Peer proxy for %d not found. This shouldn't happen.", pid);
|
|
}
|
|
|
|
peerProxyExpectHeader(masterProxy);
|
|
}
|
|
|
|
/**
|
|
* Handles a packet header.
|
|
*/
|
|
static void peerProxyHandleHeader(PeerProxy* peerProxy, Header* header) {
|
|
switch (header->type) {
|
|
case CONNECTION_REQUEST:
|
|
masterHandleConnectionRequest(peerProxy, header);
|
|
break;
|
|
case CONNECTION:
|
|
masterProxyExpectConnection(peerProxy, header);
|
|
break;
|
|
case CONNECTION_ERROR:
|
|
masterProxyHandleConnectionError(peerProxy, header);
|
|
break;
|
|
case BYTES:
|
|
peerProxyExpectBytes(peerProxy, header);
|
|
break;
|
|
default:
|
|
LOGW("Invalid packet type from %d: %d", peerProxy->credentials.pid,
|
|
header->type);
|
|
peerProxyKill(peerProxy, false);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Buffers input sent by peer. May be called multiple times until the entire
|
|
* buffer is filled. Returns true when the buffer is full.
|
|
*/
|
|
static bool peerProxyBufferInput(PeerProxy* peerProxy) {
|
|
Buffer* in = peerProxy->inputBuffer;
|
|
ssize_t size = bufferRead(in, peerProxy->fd->fd);
|
|
if (size < 0) {
|
|
peerProxyHandleError(peerProxy, "read");
|
|
return false;
|
|
} else if (size == 0) {
|
|
// EOF.
|
|
LOGI("EOF");
|
|
peerProxyKill(peerProxy, false);
|
|
return false;
|
|
} else if (bufferReadComplete(in)) {
|
|
// We're done!
|
|
return true;
|
|
} else {
|
|
// Continue reading.
|
|
return false;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Reads input from a peer process.
|
|
*/
|
|
static void peerProxyRead(SelectableFd* fd) {
|
|
LOGD("Reading...");
|
|
PeerProxy* peerProxy = (PeerProxy*) fd->data;
|
|
int state = peerProxy->inputState;
|
|
Buffer* in = peerProxy->inputBuffer;
|
|
switch (state) {
|
|
case READING_HEADER:
|
|
if (peerProxyBufferInput(peerProxy)) {
|
|
LOGD("Header read.");
|
|
// We've read the complete header.
|
|
Header* header = (Header*) in->data;
|
|
peerProxyHandleHeader(peerProxy, header);
|
|
}
|
|
break;
|
|
case READING_BYTES:
|
|
LOGD("Reading bytes...");
|
|
if (peerProxyBufferInput(peerProxy)) {
|
|
LOGD("Bytes read.");
|
|
// We have the complete packet. Notify bytes listener.
|
|
peerProxy->peer->onBytes(peerProxy->credentials,
|
|
in->data, in->size);
|
|
|
|
// Get ready for the next packet.
|
|
peerProxyExpectHeader(peerProxy);
|
|
}
|
|
break;
|
|
case ACCEPTING_CONNECTION:
|
|
masterProxyAcceptConnection(peerProxy);
|
|
break;
|
|
default:
|
|
LOG_ALWAYS_FATAL("Unknown state: %d", state);
|
|
}
|
|
}
|
|
|
|
/**
 * Allocates a peer proxy for the given credentials, puts it in the
 * "expecting a header" state and registers it in the peer's proxy map under
 * its PID. Returns NULL if the proxy or its input buffer cannot be
 * allocated.
 */
static PeerProxy* peerProxyCreate(Peer* peer, Credentials credentials) {
    PeerProxy* created = calloc(1, sizeof(PeerProxy));
    if (created == NULL) {
        return NULL;
    }

    Buffer* input = bufferCreate(sizeof(Header));
    if (input == NULL) {
        free(created);
        return NULL;
    }
    created->inputBuffer = input;

    created->credentials = credentials;
    created->peer = peer;

    // A new proxy starts out waiting for a packet header.
    peerProxyExpectHeader(created);

    // Register the proxy. The map key must stay valid for as long as the
    // entry exists, so it points at the PID stored inside the proxy.
    hashmapPut(peer->peerProxies, &(created->credentials.pid), created);
    return created;
}
|
|
|
|
/** Accepts a connection to the master peer. */
|
|
static void masterAcceptConnection(SelectableFd* listenerFd) {
|
|
// Accept connection.
|
|
int socket = accept(listenerFd->fd, NULL, NULL);
|
|
if (socket == -1) {
|
|
LOGW("accept() error: %s", strerror(errno));
|
|
return;
|
|
}
|
|
|
|
LOGD("Accepted connection as fd %d.", socket);
|
|
|
|
// Get credentials.
|
|
Credentials credentials;
|
|
struct ucred ucredentials;
|
|
socklen_t credentialsSize = sizeof(struct ucred);
|
|
int result = getsockopt(socket, SOL_SOCKET, SO_PEERCRED,
|
|
&ucredentials, &credentialsSize);
|
|
// We might want to verify credentialsSize.
|
|
if (result == -1) {
|
|
LOGW("getsockopt() error: %s", strerror(errno));
|
|
closeWithWarning(socket);
|
|
return;
|
|
}
|
|
|
|
// Copy values into our own structure so we know we have the types right.
|
|
credentials.pid = ucredentials.pid;
|
|
credentials.uid = ucredentials.uid;
|
|
credentials.gid = ucredentials.gid;
|
|
|
|
LOGI("Accepted connection from process %d.", credentials.pid);
|
|
|
|
Peer* masterPeer = (Peer*) listenerFd->data;
|
|
|
|
peerLock(masterPeer);
|
|
|
|
// Make sure we don't already have a connection from that process.
|
|
PeerProxy* peerProxy
|
|
= hashmapGet(masterPeer->peerProxies, &credentials.pid);
|
|
if (peerProxy != NULL) {
|
|
peerUnlock(masterPeer);
|
|
LOGW("Alread connected to process %d.", credentials.pid);
|
|
closeWithWarning(socket);
|
|
return;
|
|
}
|
|
|
|
// Add connection to the selector.
|
|
SelectableFd* socketFd = selectorAdd(masterPeer->selector, socket);
|
|
if (socketFd == NULL) {
|
|
peerUnlock(masterPeer);
|
|
LOGW("malloc() failed.");
|
|
closeWithWarning(socket);
|
|
return;
|
|
}
|
|
|
|
// Create a peer proxy.
|
|
peerProxy = peerProxyCreate(masterPeer, credentials);
|
|
peerUnlock(masterPeer);
|
|
if (peerProxy == NULL) {
|
|
LOGW("malloc() failed.");
|
|
socketFd->remove = true;
|
|
closeWithWarning(socket);
|
|
}
|
|
peerProxy->connections = hashmapCreate(10, &pidHash, &pidEquals);
|
|
peerProxySetFd(peerProxy, socketFd);
|
|
}
|
|
|
|
/**
|
|
* Creates the local peer.
|
|
*/
|
|
static Peer* peerCreate() {
|
|
Peer* peer = calloc(1, sizeof(Peer));
|
|
if (peer == NULL) {
|
|
LOG_ALWAYS_FATAL("malloc() error.");
|
|
}
|
|
peer->peerProxies = hashmapCreate(10, &pidHash, &pidEquals);
|
|
peer->selector = selectorCreate();
|
|
|
|
pthread_mutexattr_t attributes;
|
|
if (pthread_mutexattr_init(&attributes) != 0) {
|
|
LOG_ALWAYS_FATAL("pthread_mutexattr_init() error.");
|
|
}
|
|
if (pthread_mutexattr_settype(&attributes, PTHREAD_MUTEX_RECURSIVE) != 0) {
|
|
LOG_ALWAYS_FATAL("pthread_mutexattr_settype() error.");
|
|
}
|
|
if (pthread_mutex_init(&peer->mutex, &attributes) != 0) {
|
|
LOG_ALWAYS_FATAL("pthread_mutex_init() error.");
|
|
}
|
|
|
|
peer->pid = getpid();
|
|
return peer;
|
|
}
|
|
|
|
/** The local peer. */
|
|
static Peer* localPeer;
|
|
|
|
/** Frees a packet of bytes. */
|
|
static void outgoingPacketFreeBytes(OutgoingPacket* packet) {
|
|
LOGD("Freeing outgoing packet.");
|
|
bufferFree(packet->bytes);
|
|
free(packet);
|
|
}
|
|
|
|
/**
|
|
* Sends a packet of bytes to a remote peer. Returns 0 on success.
|
|
*
|
|
* Returns -1 if an error occurs. Sets errno to ENOMEM if memory couldn't be
|
|
* allocated. Sets errno to EHOSTDOWN if the peer died recently. Sets errno
|
|
* to EINVAL if pid is the same as the local pid.
|
|
*/
|
|
int peerSendBytes(pid_t pid, const char* bytes, size_t size) {
|
|
Peer* peer = localPeer;
|
|
assert(peer != NULL);
|
|
|
|
OutgoingPacket* packet = calloc(1, sizeof(OutgoingPacket));
|
|
if (packet == NULL) {
|
|
errno = ENOMEM;
|
|
return -1;
|
|
}
|
|
|
|
Buffer* copy = bufferCreate(size);
|
|
if (copy == NULL) {
|
|
free(packet);
|
|
errno = ENOMEM;
|
|
return -1;
|
|
}
|
|
|
|
// Copy data.
|
|
memcpy(copy->data, bytes, size);
|
|
copy->size = size;
|
|
|
|
packet->bytes = copy;
|
|
packet->header.type = BYTES;
|
|
packet->header.size = size;
|
|
packet->free = outgoingPacketFreeBytes;
|
|
bufferPrepareForWrite(packet->bytes);
|
|
|
|
peerLock(peer);
|
|
|
|
PeerProxy* peerProxy = peerProxyGetOrCreate(peer, pid, true);
|
|
if (peerProxy == NULL) {
|
|
// The peer is already dead or we couldn't alloc memory. Either way,
|
|
// errno is set.
|
|
peerUnlock(peer);
|
|
packet->free(packet);
|
|
return -1;
|
|
} else {
|
|
peerProxyEnqueueOutgoingPacket(peerProxy, packet);
|
|
peerUnlock(peer);
|
|
selectorWakeUp(peer->selector);
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
/**
 * Records how to release bytes that were sent without being copied: a
 * callback plus the argument to call it with.
 */
typedef struct SharedBytesFreer {
    /** Callback invoked to release the shared bytes. */
    void (*free)(void* context);

    /** Opaque value passed to the callback. */
    void* context;
} SharedBytesFreer;
|
|
|
|
/** Frees shared bytes. */
|
|
static void outgoingPacketFreeSharedBytes(OutgoingPacket* packet) {
|
|
SharedBytesFreer* sharedBytesFreer
|
|
= (SharedBytesFreer*) packet->context;
|
|
sharedBytesFreer->free(sharedBytesFreer->context);
|
|
free(sharedBytesFreer);
|
|
free(packet);
|
|
}
|
|
|
|
/**
|
|
* Sends a packet of bytes to a remote peer without copying the bytes. Calls
|
|
* free() with context after the bytes have been sent.
|
|
*
|
|
* Returns -1 if an error occurs. Sets errno to ENOMEM if memory couldn't be
|
|
* allocated. Sets errno to EHOSTDOWN if the peer died recently. Sets errno
|
|
* to EINVAL if pid is the same as the local pid.
|
|
*/
|
|
int peerSendSharedBytes(pid_t pid, char* bytes, size_t size,
|
|
void (*free)(void* context), void* context) {
|
|
Peer* peer = localPeer;
|
|
assert(peer != NULL);
|
|
|
|
OutgoingPacket* packet = calloc(1, sizeof(OutgoingPacket));
|
|
if (packet == NULL) {
|
|
errno = ENOMEM;
|
|
return -1;
|
|
}
|
|
|
|
Buffer* wrapper = bufferWrap(bytes, size, size);
|
|
if (wrapper == NULL) {
|
|
free(packet);
|
|
errno = ENOMEM;
|
|
return -1;
|
|
}
|
|
|
|
SharedBytesFreer* sharedBytesFreer = malloc(sizeof(SharedBytesFreer));
|
|
if (sharedBytesFreer == NULL) {
|
|
free(packet);
|
|
free(wrapper);
|
|
errno = ENOMEM;
|
|
return -1;
|
|
}
|
|
sharedBytesFreer->free = free;
|
|
sharedBytesFreer->context = context;
|
|
|
|
packet->bytes = wrapper;
|
|
packet->context = sharedBytesFreer;
|
|
packet->header.type = BYTES;
|
|
packet->header.size = size;
|
|
packet->free = &outgoingPacketFreeSharedBytes;
|
|
bufferPrepareForWrite(packet->bytes);
|
|
|
|
peerLock(peer);
|
|
|
|
PeerProxy* peerProxy = peerProxyGetOrCreate(peer, pid, true);
|
|
if (peerProxy == NULL) {
|
|
// The peer is already dead or we couldn't alloc memory. Either way,
|
|
// errno is set.
|
|
peerUnlock(peer);
|
|
packet->free(packet);
|
|
return -1;
|
|
} else {
|
|
peerProxyEnqueueOutgoingPacket(peerProxy, packet);
|
|
peerUnlock(peer);
|
|
selectorWakeUp(peer->selector);
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Starts the master peer. The master peer differs from other peers in that
|
|
* it is responsible for connecting the other peers. You can only have one
|
|
* master peer.
|
|
*
|
|
* Goes into an I/O loop and does not return.
|
|
*/
|
|
void masterPeerInitialize(BytesListener* bytesListener,
|
|
DeathListener* deathListener) {
|
|
// Create and bind socket.
|
|
int listenerSocket = socket(AF_LOCAL, SOCK_STREAM, 0);
|
|
if (listenerSocket == -1) {
|
|
LOG_ALWAYS_FATAL("socket() error: %s", strerror(errno));
|
|
}
|
|
unlink(MASTER_PATH);
|
|
int result = bind(listenerSocket, (SocketAddress*) getMasterAddress(),
|
|
sizeof(UnixAddress));
|
|
if (result == -1) {
|
|
LOG_ALWAYS_FATAL("bind() error: %s", strerror(errno));
|
|
}
|
|
|
|
LOGD("Listener socket: %d", listenerSocket);
|
|
|
|
// Queue up to 16 connections.
|
|
result = listen(listenerSocket, 16);
|
|
if (result != 0) {
|
|
LOG_ALWAYS_FATAL("listen() error: %s", strerror(errno));
|
|
}
|
|
|
|
// Make socket non-blocking.
|
|
setNonBlocking(listenerSocket);
|
|
|
|
// Create the peer for this process. Fail if we already have one.
|
|
if (localPeer != NULL) {
|
|
LOG_ALWAYS_FATAL("Peer is already initialized.");
|
|
}
|
|
localPeer = peerCreate();
|
|
if (localPeer == NULL) {
|
|
LOG_ALWAYS_FATAL("malloc() failed.");
|
|
}
|
|
localPeer->master = true;
|
|
localPeer->onBytes = bytesListener;
|
|
localPeer->onDeath = deathListener;
|
|
|
|
// Make listener socket selectable.
|
|
SelectableFd* listenerFd = selectorAdd(localPeer->selector, listenerSocket);
|
|
if (listenerFd == NULL) {
|
|
LOG_ALWAYS_FATAL("malloc() error.");
|
|
}
|
|
listenerFd->data = localPeer;
|
|
listenerFd->onReadable = &masterAcceptConnection;
|
|
}
|
|
|
|
/**
|
|
* Starts a local peer.
|
|
*
|
|
* Goes into an I/O loop and does not return.
|
|
*/
|
|
void peerInitialize(BytesListener* bytesListener,
|
|
DeathListener* deathListener) {
|
|
// Connect to master peer.
|
|
int masterSocket = socket(AF_LOCAL, SOCK_STREAM, 0);
|
|
if (masterSocket == -1) {
|
|
LOG_ALWAYS_FATAL("socket() error: %s", strerror(errno));
|
|
}
|
|
int result = connect(masterSocket, (SocketAddress*) getMasterAddress(),
|
|
sizeof(UnixAddress));
|
|
if (result != 0) {
|
|
LOG_ALWAYS_FATAL("connect() error: %s", strerror(errno));
|
|
}
|
|
|
|
// Create the peer for this process. Fail if we already have one.
|
|
if (localPeer != NULL) {
|
|
LOG_ALWAYS_FATAL("Peer is already initialized.");
|
|
}
|
|
localPeer = peerCreate();
|
|
if (localPeer == NULL) {
|
|
LOG_ALWAYS_FATAL("malloc() failed.");
|
|
}
|
|
localPeer->onBytes = bytesListener;
|
|
localPeer->onDeath = deathListener;
|
|
|
|
// Make connection selectable.
|
|
SelectableFd* masterFd = selectorAdd(localPeer->selector, masterSocket);
|
|
if (masterFd == NULL) {
|
|
LOG_ALWAYS_FATAL("malloc() error.");
|
|
}
|
|
|
|
// Create a peer proxy for the master peer.
|
|
PeerProxy* masterProxy = peerProxyCreate(localPeer, MASTER_CREDENTIALS);
|
|
if (masterProxy == NULL) {
|
|
LOG_ALWAYS_FATAL("malloc() error.");
|
|
}
|
|
peerProxySetFd(masterProxy, masterFd);
|
|
masterProxy->master = true;
|
|
localPeer->masterProxy = masterProxy;
|
|
}
|
|
|
|
/** Starts the master peer I/O loop. Doesn't return. */
|
|
void peerLoop() {
|
|
assert(localPeer != NULL);
|
|
|
|
// Start selector.
|
|
selectorLoop(localPeer->selector);
|
|
}
|
|
|