Révision 96454bcd
Ajouté par Sylvain Sauvage il y a plus de 11 ans
IoTa-DiscoveryWS/IoTa-DiscoveryWS/src/main/java/fr/unicaen/iota/discovery/server/util/Publisher.java | ||
---|---|---|
import fr.unicaen.iota.discovery.server.hibernate.Event;
|
||
import fr.unicaen.iota.discovery.server.hibernate.EventToPublish;
|
||
import fr.unicaen.iota.discovery.server.query.QueryOperationsModule;
|
||
import fr.unicaen.iota.discovery.server.util.EPCUtilities.InvalidFormatException;
|
||
import fr.unicaen.iota.nu.EPCUtilities;
|
||
import fr.unicaen.iota.nu.EPCUtilities.InvalidFormatException;
|
||
import fr.unicaen.iota.nu.ONSEntryType;
|
||
import fr.unicaen.iota.nu.ONSOperation;
|
||
import java.rmi.RemoteException;
|
||
import java.util.*;
|
||
import org.apache.commons.logging.Log;
|
||
... | ... | |
public class Publisher extends Thread {
|
||
|
||
private static final Log log = LogFactory.getLog(Publisher.class);
|
||
private QueryOperationsModule queryOperationsModule = new QueryOperationsModule();
|
||
private fr.unicaen.iota.discovery.client.model.Session sessionInfo;
|
||
private QueryOperationsModule queryOperationsModule;
|
||
private ONSOperation ons;
|
||
private transient fr.unicaen.iota.discovery.client.model.Session sessionInfo;
|
||
private transient DsClient dsClient;
|
||
|
||
public Publisher() {
|
||
ons = new ONSOperation(Constants.ONS_HOSTS);
|
||
queryOperationsModule = new QueryOperationsModule();
|
||
}
|
||
|
||
@Override
|
||
public void run() {
|
||
... | ... | |
log.info("The publisher was not used and terminated normally.");
|
||
return;
|
||
}
|
||
// TODO: use a Timer
|
||
while (true) {
|
||
log.debug("publisher [RUNNING]");
|
||
Collection<EventToPublish> events = queryOperationsModule.eventToPublishLookup(Constants.DS_TO_DS_POOL_EVENT);
|
||
... | ... | |
}
|
||
}
|
||
try {
|
||
log.debug("publisher [WAITNING]");
|
||
log.debug("publisher [WAITING]");
|
||
PublisherMonitor.notification();
|
||
Thread.sleep(Constants.PUBLISHER_FREQUENCY);
|
||
} catch (InterruptedException ex) {
|
||
... | ... | |
}
|
||
}
|
||
|
||
private void logout(String dsAddress) throws RemoteException, EnhancedProtocolException {
|
||
private String login(String dsAddress) throws Exception {
|
||
log.debug("LOGIN: " + dsAddress);
|
||
try {
|
||
dsClient = new DsClient(dsAddress);
|
||
sessionInfo = dsClient.userLogin(Configuration.DEFAULT_SESSION, Constants.DS_LOGIN, Constants.DS_PASSWORD);
|
||
} catch (EnhancedProtocolException ex) {
|
||
log.error("BAD LOGIN OR PASSWORD => END (" + ex.getMessage() + ")");
|
||
return null;
|
||
}
|
||
return sessionInfo.getSessionId();
|
||
}
|
||
|
||
private void logout() throws RemoteException, EnhancedProtocolException {
|
||
log.debug("LOGOUT");
|
||
if (sessionInfo == null) {
|
||
return;
|
||
} else {
|
||
new DsClient(dsAddress).userLogout(sessionInfo.getSessionId());
|
||
if (sessionInfo != null) {
|
||
dsClient.userLogout(sessionInfo.getSessionId());
|
||
sessionInfo = null;
|
||
dsClient = null;
|
||
}
|
||
}
|
||
|
||
... | ... | |
|
||
private List<EventToPublish> proceedEvents(Collection<EventToPublish> events) {
|
||
List<EventToPublish> blackList = new ArrayList<EventToPublish>();
|
||
Map<String, List<EventToPublish>> map = sortByEPCBase(events);
|
||
Map<String, List<EventToPublish>> map = splitByEPCBase(events);
|
||
for (String key : map.keySet()) { // sort by epcBase
|
||
Map<String, String> dsAddresses = queryOperationsModule.queryONS(key, Constants.ONS_HOSTS);
|
||
Map<ONSEntryType, String> dsAddresses = ons.queryONS(key);
|
||
List<EventToPublish> toPublishList = map.get(key);
|
||
if (dsAddresses == null) {
|
||
log.error(key + "impossible de récuperer l'addresse du DS référant !");
|
||
log.error(key + " could not get the referent DS address!");
|
||
blackList.addAll(toPublishList);
|
||
continue;
|
||
}
|
||
String dsAddress = dsAddresses.get(Constants.DS_SERVICE_TYPE);
|
||
String dsAddress = dsAddresses.get(ONSEntryType.ds);
|
||
if (dsAddress == null) {
|
||
log.error("The ONS doesn't know the address of the referent DS for the EPC code: " + key);
|
||
blackList.addAll(toPublishList);
|
||
... | ... | |
log.warn("Event not published: already in the corresponding DS.");
|
||
continue;
|
||
} else {
|
||
String SESSION_ID = null;
|
||
String SESSION_ID;
|
||
if ((SESSION_ID = login(dsAddress)).equals(Constants.SESSION_FAILED_ID)) {
|
||
log.warn("Can't connect to the DS with provided login and password" + dsAddress);
|
||
blackList.addAll(toPublishList);
|
||
... | ... | |
if (sessionInfo == null) {
|
||
throw new Exception("not logged user tryed to publish");
|
||
}
|
||
UserInfo userInfo = new DsClient(dsAddress).userInfo(sessionInfo.getSessionId(), Constants.DS_LOGIN);
|
||
UserInfo userInfo = dsClient.userInfo(sessionInfo.getSessionId(), Constants.DS_LOGIN);
|
||
String partnerId = userInfo.getPartnerId();
|
||
log.debug("partner: " + partnerId);
|
||
int tmp = 0;
|
||
DsClient dsClient = new DsClient(dsAddress);
|
||
while (tmp < toPublishList.size()) {
|
||
List<EventInfo> list = new ArrayList<EventInfo>();
|
||
for (int i = tmp; i < tmp + Constants.SIMULTANEOUS_PUBLISH_LIMIT; i++) {
|
||
... | ... | |
tmp += Constants.SIMULTANEOUS_PUBLISH_LIMIT;
|
||
dsClient.multipleEventCreate(sessionInfo.getSessionId(), partnerId, list);
|
||
log.debug(list.size() + " events published to " + dsAddress);
|
||
sleep(1);
|
||
}
|
||
logout(dsAddress);
|
||
logout();
|
||
}
|
||
} catch (RemoteException ex) {
|
||
log.error("Events can't be published; the DS won't answer.", ex);
|
||
... | ... | |
return blackList;
|
||
}
|
||
|
||
private String login(String dsAddress) throws Exception {
|
||
log.debug("LOGIN: " + dsAddress);
|
||
try {
|
||
sessionInfo = new DsClient(dsAddress).userLogin(Configuration.DEFAULT_SESSION, Constants.DS_LOGIN, Constants.DS_PASSWORD);
|
||
} catch (EnhancedProtocolException ex) {
|
||
log.error("BAD LOGIN OR PASSWORD => END (" + ex.getMessage() + ")");
|
||
return null;
|
||
}
|
||
return sessionInfo.getSessionId();
|
||
}
|
||
|
||
private Map<String, List<EventToPublish>> sortByEPCBase(Collection<EventToPublish> list) {
|
||
private Map<String, List<EventToPublish>> splitByEPCBase(Collection<EventToPublish> list) {
|
||
Map<String, List<EventToPublish>> result = new HashMap<String, List<EventToPublish>>();
|
||
EPCUtilities formater = new EPCUtilities();
|
||
for (EventToPublish e : list) {
|
||
String epcBase;
|
||
try {
|
||
epcBase = formater.formatRevertEpc(e.getEvent().getEpc());
|
||
} catch (InvalidFormatException ex) {
|
||
log.warn(ex.getMessage());
|
||
epcBase = e.getEvent().getEpc();
|
||
}
|
||
String epcBase = e.getEvent().getEpc();
|
||
if (result.containsKey(epcBase)) {
|
||
result.get(epcBase).add(e);
|
||
} else {
|
Formats disponibles : Unified diff
Version 1.9-mock
This is version 1.9-mock. 1.9 because the API still have changes pending
(principally relative to the Discovery Services). “mock” because TLS
configuration is not yet available and the signatures (SigMa) are not fully
implemented.
- code cleaned and refactored
- lots of bugs fixed
- dependencies checked and trimmed
- documentation added
- Identity handling added
- code refactored
- new, better APIs
- Identity handling added
- use EPCglobal and DS events (no proxy types anymore)
- ETa-Callback modules are now available as web applications
- filtering rules: if a part of an event is not allowed, now the whole
event is deleted from the result (before only the rejectd part was)
- new temporary User web service
- new Xi module: XACML Interrogation web service (was two modules: TCP and
servlet)
- ETa and its Callback modules
- ActiveMQ
- SigMa
- certificate/signing key
- new simplified figures (sans IoTa and simplified IoTa)
- new figure for ETa modules
- show 3rd party clients
- data flows specified
- TLS and link security added
- IDs and trusted IDs added
- color adjusted for printing
- GREYC logo added