9. diel - Java server - Event bus
V predchádzajúcom kvíze, Kvíz - Správa klientov a spojení, zapisovacie vlákno v Jave, sme si overili nadobudnuté skúsenosti z predchádzajúcich lekcií.
V minulej lekcii, Kvíz - Správa klientov a spojení, zapisovacie vlákno v Jave , sme navrhli svoj komunikačný protokol a konečne komunikáciu servera s klientom úspešne otestovali. Dnes vytvoríme jednoduchú event bus, pomocou ktorej budeme propagovať udalosti naprieč celým serverom.
Event bus
Ide o mechanizmus, ktorý dovoľuje komunikáciu medzi dvoma rôznym komponentmi, bez toho aby o sebe vedeli. Jedna komponenta vyprodukuje udalosť a je jej jedno, koľko počúvajúcich komponentov túto udalosť zachytí a zareaguje na nej.
Návrh rozhrania
V balíčku core
vytvoríme nový balíček s
názvom event
. Všetky triedy, ktoré teraz
vytvoríme, sa budú nachádzať v tomto balíčku, pokiaľ neuvedu inak.
Najskôr si navrhneme rozhrania a potom ich implementujeme. Rozhranie vytvoríme
nasledovné:
IEvent
- rozhranie predstavujúce vzniknutú udalosťIEventHandler
- rozhranie reagujúci na vzniknutú udalosťIEventBus
- rozhranie umožňujúce správu udalostí
Rozhranie IEvent
bude obsahovať jedinú
metódu getEventType()
, ktorá bude vracať jedinečný
identifikátor vzniknutej udalosti:
public interface IEvent { String getEventType(); }
Rozhranie IEventHandler
tiež bude obsahovať
jednu metódu handleEvent()
, ktorú budú musieť prekryť všetci,
ktorí budú chcieť počúvať danú udalosť:
@FunctionalInterface public interface IEventHandler { void handleEvent(IEvent event); }
Nakoniec rozhranie IEventBus
, ktoré obsahuje
metódy pre prihlásenie / odhlásenie odberu udalostí a metódu pre vyvolanie
udalosti:
public interface IEventBus { void registerEventHandler(String messageType, IEventHandler listener); void unregisterEventHandler(String messageType, IEventHandler listener); void publishEvent(IEvent event); }
Implementácia
Jediné rozhranie, ktoré budeme implementovať v balíčku
core
, je rozhranie IEventBus
, pomocou
rovnomennej triedy EventBus
:
@Singleton public class EventBus implements IEventBus { private final Map<String, List<IEventHandler>> listenerMap = new HashMap<>(); @Override public void registerEventHandler(String messageType, IEventHandler listener) { List<IEventHandler> listeners = listenerMap .computeIfAbsent(messageType, k -> new ArrayList<>()); listeners.add(listener); } @Override public void unregisterEventHandler(String messageType, IEventHandler listener) { final List<IEventHandler> listeners = listenerMap .getOrDefault(messageType, Collections.emptyList()); listeners.remove(listener); } @Override public void publishEvent(IEvent event) { final List<IEventHandler> handlers = listenerMap .getOrDefault(event.getEventType(), Collections.emptyList()); handlers.forEach(handler -> handler.handleEvent(event)); } }
Trieda obsahuje jednu triedny konštantu listenerMap
, do ktorej
sa budú ukladať poslucháča jednotlivých udalostí. Mapa ako kľúč
používa String
, ktorý budeme získavať pomocou
metódy getEventType()
z rozhrania
IEvent
. Hodnota v mape je kolekcia poslucháčov,
aby sa k jednej udalosti mohlo zaregistrovať viac poslucháčov.
Nakoniec zaregistrujeme event bus do modulu pre Guice:
bind(IEventBus.class).to(EventBus.class);
Použitie
Pre zatiaľ sa obmedzíme na publikovanie udalostí, konkrétne:
- užívateľ sa pripojil
- používateľ sa odpojil
- server prijal správu od užívateľa
Konzumovanie týchto udalostí necháme na budúce implementáciu.
Informáciu, že sa užívateľ pripojil / odpojil, môže publikovať trieda
ConnectionManager
, pretože tá jediná sa to
dozvie. Pridáme teda do balíčka connection
dve
triedy predstavujúce udalosť pripojenie a odpojenie:
ClientConnectedEvent
:
public class ClientConnectedEvent implements IEvent { public static final String EVENT_TYPE = "client-connected"; private final Client client; ClientConnectedEvent(Client client) { this.client = client; } public Client getClient() { return client; } @Override public String getEventType() { return EVENT_TYPE; } }
A ClientDisconnectedEvent
:
public class ClientDisconnectedEvent implements IEvent { public static final String EVENT_TYPE = "client-disonnected"; private final Client client; ClientDisconnectedEvent(Client client) { this.client = client; } public Client getClient() { return client; } @Override public String getEventType() { return EVENT_TYPE; } }
Do triedy ConnectionManager
pridáme novú
triednu konštantu typu IEventBus
a necháme ju
inicializovať v konštruktory z parametra:
private final IEventBus eventBus; @Inject public ConnectionManager(IClientDispatcher clientDispatcher, IWriterThread writerThread, IEventBus eventBus, ExecutorService pool, int maxClients) { this.clientDispatcher = clientDispatcher; this.writerThread = writerThread; this.eventBus = eventBus; this.pool = pool; this.maxClients = maxClients; }
Samotné udalosti budeme vyvolávať v metóde
insertClientToListOrQueue()
. Udalosť, že klient sa odpojil,
publikujeme hneď potom, čo odstránime klienta zo zoznamu klientov:
client.setConnectionClosedListener(() -> { clients.remove(client); // Vytvoření nové události eventBus.publishEvent(new ClientDisconnectedEvent(client)); if (clientDispatcher.hasClientInQueue()) { this.insertClientToListOrQueue(clientDispatcher.getClientFromQueue()); } });
Udalosť o pripojenom klientovi publikujeme po vložení klienta do threadpoolu:
pool.submit(client);
eventBus.publishEvent(new ClientConnectedEvent(client));
Posledná udalosť, ktorú budeme publikovať, je udalosť prijaté správy od klienta. Najskôr vytvoríme triedu, ktorú budeme reprezentovať danú udalosť:
public class MessageReceivedEvent implements IEvent { private final IMessage receivedMessage; private final Client client; MessageReceivedEvent(IMessage receivedMessage, Client client) { this.receivedMessage = receivedMessage; this.client = client; } @Override public String getEventType() { return receivedMessage.getType(); } public IMessage getReceivedMessage() { return receivedMessage; } public Client getClient() { return client; } }
Teraz sa presunieme do triedy Client
. Opäť tu
vytvoríme triedny konštantu typu IEventBus
a
inicializujeme ju v konstruktoru pomocou parametra:
private final IEventBus eventBus; Client(Socket socket, IWriterThread writerThread, IEventBus eventBus) throws IOException { this.socket = socket; writer = new ObjectOutputStream(socket.getOutputStream()); this.writerThread = writerThread; this.eventBus = eventBus; }
Nakoniec v metóde run()
keď prijmeme správu, tak ju
publikujeme:
IMessage received; while ((received = (IMessage) reader.readObject()) != null) { eventBus.publishEvent(new MessageReceivedEvent(received, this)); }
Tým by sme mali hotové publikovanie udalostí naprieč celým serverom. V budúcej lekcii, Java server - Systém pluginov , naimplementujeme systém pluginov pre ľahkú rozšíriteľnosť servera o novú funkcionalitu.
Mal si s čímkoľvek problém? Stiahni si vzorovú aplikáciu nižšie a porovnaj ju so svojím projektom, chybu tak ľahko nájdeš.
Stiahnuť
Stiahnutím nasledujúceho súboru súhlasíš s licenčnými podmienkami
Stiahnuté 19x (143.33 kB)
Aplikácia je vrátane zdrojových kódov v jazyku Java