您的位置:首页 > 其它

企业搜索引擎开发之连接器connector(十)

2013-03-19 03:33 357 查看
这里分析一下FeedConnection接口及其实现类GsaFeedConnection相关源码:

FeedConnection接口源码如下:

/**
 * Interface for a feed connection. Implementations send feed data to a feed
 * server and report on that server's ability to accept more feeds. The actual
 * connection to the feed server should be established by the implementation
 * during construction or initialization.
 */
public interface FeedConnection {

  /**
   * Sends the data contained in the given feed data object to the feed
   * server.
   *
   * @param feedData an object that encapsulates the feed data that needs to be
   *        sent by the <code>FeedConnection</code>.
   * @return response from the feed server.
   * @throws FeedException if problem extracting the data or sending it.
   * @throws RepositoryException if problem retrieving data from the Connector.
   */
  public String sendData(FeedData feedData)
      throws FeedException, RepositoryException;

  /**
   * Returns true if the Feed host has large number of unprocessed Feed items.
   * The Feed host may temporarily stop processing Feed items during periodic
   * maintenance, when resetting the index, during system configuration, or
   * due to certain error conditions. If backlogged, the Feed client may choose
   * to throttle back its feeds until the backlog clears.
   *
   * @return true if the Feed host is known to be backlogged processing feeds,
   *         false otherwise.
   */
  public boolean isBacklogged();

  /**
   * Returns a String consisting of a comma-separated list of supported content
   * encodings. For instance: "base64binary,base64compressed".
   *
   * @return supported content encodings.
   */
  public String getContentEncodings();

}


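方法String sendData(FeedData feedData)向应用中心(即GSA feed服务器)发送feed数据,并返回服务器的响应字符串;boolean isBacklogged()检查feed服务器是否积压了过多未处理的feed条目(若积压,调用方可以暂缓发送,直到积压清除);String getContentEncodings()返回feed服务器支持的内容编码类型,格式为逗号分隔的列表,例如"base64binary,base64compressed"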

实现类GsaFeedConnection源码如下:

/**
 * Opens a connection to a url and sends data to it.
 * <p>
 * Feeds are POSTed as {@code multipart/form-data} to the GSA's
 * {@code /xmlfeed} servlet. The GSA's {@code /getdtd} servlet is queried to
 * infer the supported content encodings, and its {@code /getbacklogcount}
 * servlet is polled to decide whether feeding should be throttled.
 * <p>
 * Host configuration, content-encoding and backlog state are accessed while
 * holding this object's monitor. The feed upload in {@link #sendData} is not
 * synchronized; only the lookup of the feed URL is done under the lock.
 */
public class GsaFeedConnection implements FeedConnection {

  /**
   * The GSA's response when it successfully receives a feed.
   */
  public static final String SUCCESS_RESPONSE = "Success";

  /**
   * The GSA's response when the client is not authorized to send feeds.
   */
  public static final String UNAUTHORIZED_RESPONSE =
      "Error - Unauthorized Request";

  /**
   * The GSA's response when it runs out of disk space.
   */
  public static final String DISKFULL_RESPONSE =
      "Feed not accepted due to insufficient disk space.";

  /**
   * The GSA's response when there was an internal error.
   */
  public static final String INTERNAL_ERROR_RESPONSE = "Internal Error";

  // Multipart/form-data uploads require a boundary to delimit controls.
  // Since we XML-escape or base64-encode all data provided by the connector,
  // the feed XML will never contain "<<".
  private static final String BOUNDARY = "<<";

  private static final String CRLF = "\r\n";

  // Content encodings supported by GSA. Computed lazily by
  // getContentEncodings() unless set explicitly; cleared when the host changes.
  private String contentEncodings = null;

  // True if we recently got a feed error of some sort. Written by sendData(),
  // which does not hold the lock, and read by isBacklogged() under the lock;
  // volatile so the write is guaranteed to be visible to the reader.
  private volatile boolean gotFeedError = false;

  // XmlFeed URL
  private URL feedUrl = null;

  // XmlFeed DTD URL
  private URL dtdUrl = null;

  // BacklogCount URL
  private URL backlogUrl = null;

  // BacklogCount Ceiling. Throttle back feed if backlog exceeds the ceiling.
  private int backlogCeiling = 50000;

  // BacklogCount Floor. Stop throttling feed if backlog drops below floor.
  private int backlogFloor = 15000;

  // True if the feed is throttled back due to excessive backlog.
  private boolean isBacklogged = false;

  // Time of last backlog check (epoch millis). Long.MAX_VALUE means the GSA
  // does not support getbacklogcount and it should never be checked again.
  private long lastBacklogCheck;

  // How often to check for backlog (in milliseconds).
  private long backlogCheckInterval = 15 * 60 * 1000L;

  private static final Logger LOGGER =
      Logger.getLogger(GsaFeedConnection.class.getName());

  /**
   * Creates a feed connection targeting the GSA at the given host and port.
   *
   * @param host the feed host name
   * @param port the feed host port
   * @throws MalformedURLException if the servlet URLs cannot be built
   */
  public GsaFeedConnection(String host, int port) throws MalformedURLException {
    this.setFeedHostAndPort(host, port);
  }

  /**
   * Points this connection at a (possibly new) feed host. Rebuilds the
   * xmlfeed, getdtd and getbacklogcount URLs, forgets any cached content
   * encodings, and forces a backlog check on the next
   * {@link #isBacklogged()} call.
   *
   * @param host the feed host name
   * @param port the feed host port
   * @throws MalformedURLException if the servlet URLs cannot be built
   */
  public synchronized void setFeedHostAndPort(String host, int port)
      throws MalformedURLException {
    feedUrl = new URL("http", host, port, "/xmlfeed");
    dtdUrl = new URL("http", host, port, "/getdtd");
    contentEncodings = null;
    backlogUrl = new URL("http", host, port, "/getbacklogcount");
    lastBacklogCheck = 0L;
  }

  /**
   * Set the backlog check parameters. The Feed connection can check to see
   * if the GSA is falling behind processing feeds by calling the GSA's
   * {@code getbacklogcount} servlet. If the number of outstanding feed
   * items exceeds the {@code ceiling}, then the GSA is considered
   * backlogged.  If the number of outstanding feed items then drops below
   * the {@code floor}, it may be considered no longer backlogged.
   * <p>
   * Synchronized because these values are read by the synchronized
   * {@link #isBacklogged()}.
   *
   * @param floor backlog count floor value, below which the GSA is no
   *        longer considered backlogged.
   * @param ceiling backlog count ceiling value, above which the GSA is
   *        considered backlogged.
   * @param interval number of seconds to wait between backlog count checks.
   */
  public synchronized void setBacklogCheck(int floor, int ceiling,
      int interval) {
    backlogFloor = floor;
    backlogCeiling = ceiling;
    backlogCheckInterval = interval * 1000L;
  }

  /**
   * Explicitly sets the supported content encodings, bypassing DTD discovery
   * in {@link #getContentEncodings()}. Synchronized to match that getter.
   *
   * @param contentEncodings comma-separated list of content encodings
   */
  public synchronized void setContentEncodings(String contentEncodings) {
    this.contentEncodings = contentEncodings;
  }

  /**
   * Appends a multipart/form-data control header (boundary line,
   * Content-Disposition, Content-Type and the blank separator line).
   *
   * @param builder the buffer to append to
   * @param name the form control name
   * @param mimetype the control's content type
   */
  private static void controlHeader(StringBuilder builder,
      String name, String mimetype) {
    builder.append("--").append(BOUNDARY).append(CRLF);
    builder.append("Content-Disposition: form-data;");
    builder.append(" name=\"").append(name).append("\"").append(CRLF);
    builder.append("Content-Type: ").append(mimetype).append(CRLF);
    builder.append(CRLF);
  }

  /**
   * Sends the feed to the GSA and records whether it was accepted.
   * Any response other than {@link #SUCCESS_RESPONSE} (case-insensitive), or
   * any {@link FeedException}, marks the connection as having a feed error,
   * which {@link #isBacklogged()} uses to delay further feeding.
   * <p>
   * NOTE(review): {@code feedData} is cast to {@code XmlFeed}; other
   * FeedData implementations will raise ClassCastException.
   *
   * @param feedData the feed to send; must be an {@code XmlFeed}
   * @return the GSA's response text
   * @throws FeedException if the feed could not be sent
   */
  /* @Override */
  public String sendData(FeedData feedData)
      throws FeedException {
    try {
      String response = sendFeedData((XmlFeed) feedData);
      gotFeedError = !response.equalsIgnoreCase(SUCCESS_RESPONSE);
      return response;
    } catch (FeedException fe) {
      gotFeedError = true;
      throw fe;
    }
  }

  /**
   * POSTs the feed as multipart/form-data with three controls: "datasource",
   * "feedtype" and "data" (the feed XML). Uses fixed-length streaming mode,
   * so the total body length (prefix + feed + suffix) is declared up front.
   * The response is always read after writing, even when the write fails,
   * but a failure while reading is only reported if nothing was thrown
   * earlier.
   *
   * @param feed the feed to send
   * @return the concatenated lines of the GSA's response
   * @throws FeedException on I/O failure while connecting, writing, closing
   *         or reading the response
   */
  private String sendFeedData(XmlFeed feed)
      throws FeedException {
    String feedType = feed.getFeedType();
    String dataSource = feed.getDataSource();
    OutputStream outputStream;
    HttpURLConnection uc;
    StringBuilder buf = new StringBuilder();
    byte[] prefix;
    byte[] suffix;
    try {
      // Build prefix.
      controlHeader(buf, "datasource", ServletUtil.MIMETYPE_TEXT_PLAIN);
      buf.append(dataSource).append(CRLF);
      controlHeader(buf, "feedtype", ServletUtil.MIMETYPE_TEXT_PLAIN);
      buf.append(feedType).append(CRLF);
      controlHeader(buf, "data", ServletUtil.MIMETYPE_XML);
      prefix = buf.toString().getBytes("UTF-8");

      // Build suffix.
      buf.setLength(0);
      buf.append(CRLF).append("--").append(BOUNDARY).append("--").append(CRLF);
      suffix = buf.toString().getBytes("UTF-8");

      LOGGER.finest("Opening feed connection.");
      // feedUrl may be replaced concurrently by setFeedHostAndPort().
      synchronized (this) {
        uc = (HttpURLConnection) feedUrl.openConnection();
      }
      uc.setDoInput(true);
      uc.setDoOutput(true);
      uc.setFixedLengthStreamingMode(prefix.length + feed.size()
          + suffix.length);
      uc.setRequestProperty("Content-Type", "multipart/form-data; boundary="
          + BOUNDARY);
      outputStream = uc.getOutputStream();
    } catch (IOException ioe) {
      throw new FeedException(ioe);
    }

    // Tracks whether an exception is already propagating, so that later
    // cleanup failures do not replace it.
    boolean isThrowing = false;
    buf.setLength(0);
    try {
      LOGGER.finest("Writing feed data to feed connection.");
      // If there is an exception during this read/write, we do our
      // best to close the url connection and read the result.
      try {
        outputStream.write(prefix);
        feed.writeTo(outputStream);
        outputStream.write(suffix);
        outputStream.flush();
      } catch (IOException e) {
        LOGGER.log(Level.SEVERE,
            "IOException while posting: will retry later", e);
        isThrowing = true;
        throw new FeedException(e);
      } catch (RuntimeException e) {
        isThrowing = true;
        throw e;
      } catch (Error e) {
        isThrowing = true;
        throw e;
      } finally {
        try {
          outputStream.close();
        } catch (IOException e) {
          LOGGER.log(Level.SEVERE,
              "IOException while closing after post: will retry later", e);
          if (!isThrowing) {
            isThrowing = true;
            throw new FeedException(e);
          }
        }
      }
    } finally {
      BufferedReader br = null;
      try {
        LOGGER.finest("Waiting for response from feed connection.");
        InputStream inputStream = uc.getInputStream();
        br = new BufferedReader(new InputStreamReader(inputStream, "UTF8"));
        String line;
        while ((line = br.readLine()) != null) {
          buf.append(line);
        }
      } catch (IOException ioe) {
        if (!isThrowing) {
          throw new FeedException(ioe);
        }
      } finally {
        try {
          if (br != null) {
            br.close();
          }
        } catch (IOException e) {
          LOGGER.log(Level.SEVERE,
              "IOException while closing after post: continuing", e);
        }
        if (uc != null) {
          uc.disconnect();
        }
        if (LOGGER.isLoggable(Level.FINEST)) {
          LOGGER.finest("Received response from feed connection: "
              + buf.toString());
        }
      }
    }
    return buf.toString();
  }

  /**
   * Returns the supported content encodings, computing and caching them on
   * first use. If the GSA returns a DTD, both base64binary and
   * base64compressed are assumed; otherwise only base64binary.
   *
   * @return comma-separated list of supported content encodings
   */
  /* @Override */
  public synchronized String getContentEncodings() {
    if (contentEncodings == null) {
      String dtd = getDtd();
      if (dtd == null) {
        // Failed to get a DTD. Assume the GSA only supports base64 encoded.
        contentEncodings = "base64binary";
      } else {
        // TODO: Extract the supported content encodings from the DTD.
        // As of GSA 6.2, returning a DTD at all also means compression
        // is supported.
        contentEncodings = "base64binary,base64compressed";
      }
      if (LOGGER.isLoggable(Level.FINE)) {
        LOGGER.fine("GSA supports Content Encodings: " + contentEncodings);
      }
    }
    return contentEncodings;
  }

  /**
   * Reports whether feeding should be throttled. At most once per
   * {@code backlogCheckInterval}, this re-checks feed availability (after a
   * feed error) and the GSA's backlog count, applying ceiling/floor
   * hysteresis. Between checks the last computed state is returned.
   *
   * @return true if the feed host is known to be backlogged or, after a
   *         feed error, still not accepting feeds
   */
  /* @Override */
  public synchronized boolean isBacklogged() {
    if (lastBacklogCheck != Long.MAX_VALUE) {
      long now = System.currentTimeMillis();
      if ((now - lastBacklogCheck) > backlogCheckInterval) {
        lastBacklogCheck = now;
        // If we got a feed error and the feed is still down, delay.
        if (gotFeedError) {
          if (isFeedAvailable()) {
            gotFeedError = false;
          } else {
            // Feed is still unavailable.
            return true;
          }
        }
        try {
          int backlogCount = getBacklogCount();
          if (backlogCount >= 0) {
            if (isBacklogged) {
              // If we were backlogged, but have dropped below the
              // floor value, then we are no longer backlogged.
              if (backlogCount < backlogFloor) {
                isBacklogged = false;
                LOGGER.info("Resuming traversal after feed backlog clears.");
              }
            } else if (backlogCount > backlogCeiling) {
              // If the backlogcount exceeds the ceiling value,
              // then we are definitely backlogged.
              isBacklogged = true;
              LOGGER.info("Pausing traversal due to excessive feed backlog.");
            }
          }
        } catch (UnsupportedOperationException e) {
          // This older GSA does not support getbacklogcount.
          // Assume never backlogged and don't check again.
          isBacklogged = false;
          lastBacklogCheck = Long.MAX_VALUE;
          LOGGER.fine("Older GSA lacks backlogcount support.");
        }
      }
    }
    return isBacklogged;
  }

  /**
   * @return the current feed backlog count of the GSA,
   *         or -1 if the count is unavailable.
   * @throws UnsupportedOperationException if the GSA does
   *         not support getbacklogcount.
   */
  private int getBacklogCount() {
    try {
      HttpResponse response = doGet(backlogUrl, "backlogcount");
      if (response != null && response.content != null) {
        return Integer.parseInt(response.content);
      }
    } catch (NumberFormatException ignored) {
      // Got a non-integer backlog count - probably an error message,
      // which we have already logged (at Finest).  Simply return -1,
      // indicating that the backlogcount is not currently available.
    }
    // If we get here something bad happened.  It is not the case that the
    // GSA doesn't support getbacklogcount, but we still failed to retrieve it.
    return -1;
  }

  /**
   * Tests for feed error conditions such as insufficient disk space,
   * unauthorized clients, etc.  If the /xmlfeed command is sent with no
   * arguments, the server will return an error message and a 200 response
   * code if it can't accept feeds.  If it can continue to accept feeds, then
   * it will return a 400 bad request since it's missing required parameters.
   *
   * @return True if feed host is likely to accept a feed request.
   */
  private boolean isFeedAvailable() {
    try {
      HttpResponse response = doGet(feedUrl, "XmlFeed");
      if (response != null) {
        if (response.responseCode == HttpURLConnection.HTTP_BAD_REQUEST) {
          // The expected responseCode if no error conditions are present.
          LOGGER.finest("XmlFeed connection seems to be accepting new feeds.");
          return true;
        }
        if (response.content != null) {
          // Content is only present for a 200 response, which normally
          // carries an error message; only a success message means the
          // feed host will accept feeds.
          return response.content.contains(SUCCESS_RESPONSE);
        }
      }
    } catch (UnsupportedOperationException ignored) {
      // This GSA does not support feeds?  Return false.
    }
    // If we get here something bad happened.
    return false;
  }

  /**
   * @return the current feed XML DTD for the GSA,
   *         or null if the DTD is unavailable.
   */
  private String getDtd() {
    try {
      HttpResponse response = doGet(dtdUrl, "DTD");
      if (response != null && response.content != null) {
        return response.content;
      }
    } catch (UnsupportedOperationException ignored) {
      // This older GSA does not support getdtd, so return null.
      LOGGER.fine("Older GSA lacks get DTD support.");
    }
    return null;
  }

  /**
   * Get the response to a URL request.  The response is returned
   * as an HttpResponse containing the HTTP ResponseCode and the
   * returned content as a String. The content String is only returned
   * if the response code was OK; it is the concatenation of all response
   * lines, trimmed.
   *
   * @param url the URL to request
   * @param name the name of the feature requested (for logging)
   * @return HttpResponse representing response to an HTTP GET.
   *         or null if the GSA is unavailable.
   * @throws UnsupportedOperationException if the GSA does
   *         not support the requested feature (HTTP 404).
   */
  private HttpResponse doGet(URL url, String name) {
    HttpURLConnection conn = null;
    BufferedReader br = null;
    String str = null;
    StringBuilder buf = new StringBuilder();
    try {
      if (LOGGER.isLoggable(Level.FINEST)) {
        LOGGER.finest("Opening " + name + " connection.");
      }
      conn = (HttpURLConnection) url.openConnection();
      conn.connect();
      int responseCode = conn.getResponseCode();
      if (responseCode == HttpURLConnection.HTTP_OK) {
        br = new BufferedReader(new InputStreamReader(conn.getInputStream(),
            "UTF8"));
        while ((str = br.readLine()) != null) {
          buf.append(str);
        }
        str = buf.toString().trim();
        if (LOGGER.isLoggable(Level.FINEST)) {
          LOGGER.finest("Received " + name + ": " + str);
        }
        return new HttpResponse(responseCode, str);
      } else if (responseCode == HttpURLConnection.HTTP_NOT_FOUND) {
        throw new UnsupportedOperationException(
            "GSA lacks " + name + " support.");
      } else {
        return new HttpResponse(responseCode);
      }
    } catch (IOException ioe) {
      LOGGER.finest("Error while reading " + name + ": " + ioe.getMessage());
    } finally {
      try {
        if (br != null) {
          br.close();
        }
      } catch (IOException e) {
        LOGGER.finest("Error after reading " + name + ": " + e.getMessage());
      }
      if (conn != null) {
        conn.disconnect();
      }
    }
    // If we get here something bad happened. It is not the case that the GSA
    // doesn't support the requested feature, but we failed to retrieve it.
    return null;
  }

  /** Immutable pair of an HTTP response code and optional body text. */
  private static class HttpResponse {
    public final int responseCode;  // The HTTP response code.
    public final String content;    // The returned content as a String.

    public HttpResponse(int responseCode) {
      this(responseCode, null);
    }

    public HttpResponse(int responseCode, String content) {
      this.responseCode = responseCode;
      this.content = content;
    }
  }
}


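在String sendFeedData(XmlFeed feed)方法中是通过HttpURLConnection类来发送数据的:先按multipart/form-data格式构造前缀(datasource、feedtype两个控件及data控件的头部)和后缀(结束边界),并据此设置固定长度的流模式;然后依次向HttpURLConnection的OutputStream输出流写入前缀、xmlfeed数据和后缀;最后从InputStream读取GSA返回的响应字符串。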

---------------------------------------------------------------------------

本系列企业搜索引擎开发之连接器connector系本人原创

转载请注明出处 博客园 刺猬的温驯

本文链接http://www.cnblogs.com/chenying99/archive/2013/03/19/2968427.html
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: