Java 线程池下载网页

《Java Thread Programming》一书中有一个线程池的例子: 它用一个 FIFO 队列保存空闲的工作线程, 提交的任务交给这些线程执行. 利用这个线程池可以并行下载多个网页, 例如要抓取 100 个网站的更新内容, 逐个下载就太慢了. 下面是实现线程池的几个类:

 
/**
 * A bounded, blocking first-in-first-out queue of objects whose state is
 * guarded by this object's monitor.
 *
 * <p>{@link #add} waits while the queue is full and {@link #remove} waits while
 * it is empty. Every mutation calls {@code notifyAll()}, so threads blocked in
 * any of the {@code wait*} methods wake up and re-check their condition.
 *
 * <p>Naming note: {@code head} is the slot the next {@code add} writes to and
 * {@code tail} is the slot the next {@code remove} reads from.
 */
public class ObjectFIFO {
    // Ring buffer holding the elements; allocated once in the constructor.
    private final Object[] queue;
    // Maximum number of elements. final so that the unsynchronized
    // getCapacity() is guaranteed to see the value set by the constructor.
    private final int capacity;
    // Number of elements currently stored.
    private int size;
    // Index of the next slot add() writes to.
    private int head;
    // Index of the next slot remove() reads from.
    private int tail;

    /**
     * Creates an empty queue.
     *
     * @param cap maximum number of elements; values below 1 are treated as 1
     */
    public ObjectFIFO(int cap) {
        capacity = (cap > 0) ? cap : 1;
        queue = new Object[capacity];
        head = 0;
        tail = 0;
        size = 0;
    }

    /** Returns the maximum number of elements this queue can hold. */
    public int getCapacity() {
        return capacity;
    }

    /** Returns the number of elements currently in the queue. */
    public synchronized int getSize() {
        return size;
    }

    /** Returns true if the queue holds no elements. */
    public synchronized boolean isEmpty() {
        return size == 0;
    }

    /** Returns true if the queue holds {@link #getCapacity()} elements. */
    public synchronized boolean isFull() {
        return size == capacity;
    }

    /**
     * Appends obj to the queue, waiting while the queue is full.
     *
     * @param obj element to append (may be null)
     * @throws InterruptedException if interrupted while waiting for space
     */
    public synchronized void add(Object obj) throws InterruptedException {
        waitWhileFull();
        queue[head] = obj;
        head = (head + 1) % capacity;
        size++;
        notifyAll(); // wake threads waiting for an element or for "full"
    }

    /**
     * Appends every element of list in order, waiting for space as needed.
     * Waiting releases the monitor, so when the queue fills up other threads
     * may interleave their own operations between elements.
     *
     * @throws InterruptedException if interrupted while waiting for space
     */
    public synchronized void addEach(Object[] list) throws InterruptedException {
        for (int i = 0; i < list.length; i++) {
            add(list[i]);
        }
    }

    /**
     * Removes and returns the oldest element, waiting while the queue is empty.
     *
     * @throws InterruptedException if interrupted while waiting for an element
     */
    public synchronized Object remove() throws InterruptedException {
        waitWhileEmpty();

        Object obj = queue[tail];
        queue[tail] = null; // drop the reference so it can be garbage-collected
        tail = (tail + 1) % capacity;
        size--;

        notifyAll(); // wake threads waiting for space or for "empty"
        return obj;
    }

    /**
     * Removes and returns all elements currently queued, oldest first. Returns
     * an empty array when the queue is empty; never waits.
     */
    public synchronized Object[] removeAll() throws InterruptedException {
        Object[] list = new Object[size];
        for (int i = 0; i < list.length; i++) {
            // Cannot wait: the monitor is held and at least one element remains.
            list[i] = remove();
        }
        return list;
    }

    /**
     * Waits until at least one element is queued, then removes and returns
     * all queued elements, oldest first.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    public synchronized Object[] removeAtLeastOne() throws InterruptedException {
        waitWhileEmpty(); // wait for at least one element
        return removeAll();
    }

    /**
     * Waits up to msTimeout milliseconds for the queue to become empty.
     *
     * @param msTimeout maximum wait in ms; 0 means wait with no time limit
     * @return true if the queue is empty on return
     * @throws InterruptedException if interrupted while waiting
     */
    public synchronized boolean waitUntilEmpty(long msTimeout) throws InterruptedException {
        if (msTimeout == 0L) {
            waitUntilEmpty(); // no time limit
            return true;
        }

        long endTime = System.currentTimeMillis() + msTimeout;
        long msRemaining = msTimeout;

        // Loop because wait() can return early (notifyAll or spurious wakeup).
        while (!isEmpty() && msRemaining > 0L) {
            wait(msRemaining);
            msRemaining = endTime - System.currentTimeMillis();
        }

        // May have timed out or met the condition; report which.
        return isEmpty();
    }

    /** Waits, with no time limit, until the queue is empty. */
    public synchronized void waitUntilEmpty() throws InterruptedException {
        while (!isEmpty()) {
            wait();
        }
    }

    /** Waits, with no time limit, while the queue is empty. */
    public synchronized void waitWhileEmpty() throws InterruptedException {
        while (isEmpty()) {
            wait();
        }
    }

    /** Waits, with no time limit, until the queue is full. */
    public synchronized void waitUntilFull() throws InterruptedException {
        while (!isFull()) {
            wait();
        }
    }

    /** Waits, with no time limit, while the queue is full. */
    public synchronized void waitWhileFull() throws InterruptedException {
        while (isFull()) {
            // Releases this monitor until another thread calls notifyAll() on it.
            wait();
        }
    }
}
 
 
 
 

Worker执行具体的任务:

 
 
 
public class ThreadPoolWorker extends Object {
 
    private static int nextWorkerID = 0;     
 
    private ObjectFIFO idleWorkers;       
    private int workerID;     
    private ObjectFIFO handoffBox;     
 
    private Thread internalThread;     
    private volatile boolean noStopRequested;     
 
    public ThreadPoolWorker(ObjectFIFO idleWorkers) {     
        this.idleWorkers = idleWorkers;     
 
        workerID = getNextWorkerID();     
        handoffBox = new ObjectFIFO(1); // only one slot     
 
 
        // just before returning, the thread should be created. 
 
 
        noStopRequested = true;     
 
        Runnable r = new Runnable() {     
                public void run() {     
                    try {     
                        runWork();     
                    } catch ( Exception x ) {     
                        // in case ANY exception slips through 
                        x.printStackTrace();     
                    }     
                }     
            };     
 
        internalThread = new Thread(r);     
        internalThread.start();     
    }     
 
    public static synchronized int getNextWorkerID() {     
 
        // notice: sync’d at the class level to ensure uniqueness 
 
 
        int id = nextWorkerID;     
        nextWorkerID++;     
        return id;     
    }     
 
 
    public void process(Runnable target) throws InterruptedException {
 
        handoffBox.add(target);      // construct when ThreadPoolWorker constructed
    }     
 
 
    private void runWork() {
 
 
        while ( noStopRequested ) {     
            try {     
                System.out.println("workerID=" + workerID +     
                        ", ready for work");     
 
                // Worker is ready work. This will never block 
 
 
                // because the idleWorker FIFO queue has     
                // enough capacity for all the workers.      
 
 
                Runnable r = (Runnable) handoffBox.remove();     
 
                System.out.println("workerID=" + workerID + ", starting execution of new Runnable: " + r); 
 
 
                runIt(r); // catches all exceptions     
            } catch ( InterruptedException x ) {     
 
                Thread.currentThread().interrupt(); // re-assert 
            }     
        }     
    }     
 
    private void runIt(Runnable r) {     
        try {     
            r.run();     
        } catch ( Exception runex ) {     
            // catch any and all exceptions     
            System.err.println("Uncaught exception fell through from run()"); 
 
 
            runex.printStackTrace();     
        } finally {     
 
            // Clear the interrupted flag (in case it comes back 
 
 
            // set) so that if the loop goes again, the     
            // handoffBox.remove() does not mistakenly     
            // throw an InterruptedException.     
            Thread.interrupted();     
        }     
    }     
 
    public void stopRequest() {     
        System.out.println("workerID=" + workerID +     
                ", stopRequest() received.");     
        noStopRequested = false;     
        internalThread.interrupt();      //       interrupt is like the pause
    }     
 
    public boolean isAlive() {     
        return internalThread.isAlive();     
    } 
}
 
 

访问线程池的接口.

 
public class ThreadPool extends Object {
    private ObjectFIFO idleWorkers;
    private ThreadPoolWorker[] workerList;
 
    public ThreadPool(int numberOfThreads) {     
          // make sure that it’s at least one     
          numberOfThreads = Math.max(1, numberOfThreads);     
 
           idleWorkers = new ObjectFIFO(numberOfThreads);     
           workerList = new ThreadPoolWorker[numberOfThreads];     
 
           for ( int i = 0; i < workerList.length; i++ ) {     
 
               workerList[i] = new ThreadPoolWorker(idleWorkers); 
 
 
           }     
       }     
 
 
 
       public void execute(Runnable target) throws InterruptedException {  
           // block (forever) until a worker is available     
 
           ThreadPoolWorker worker = (ThreadPoolWorker) idleWorkers.remove(); 
 
 
           worker.process(target);     
       }     
 
       public void stopRequestIdleWorkers() {     
           try {     
               Object[] idle = idleWorkers.removeAll();     
               for ( int i = 0; i < idle.length; i++ ) {     
                   ( (ThreadPoolWorker) idle[i] ).stopRequest();     
               }     
           } catch ( InterruptedException x ) {     
               Thread.currentThread().interrupt(); // re-assert     
           }     
       }     
 
       public void stopRequestAllWorkers() {     
           // Stop the idle one’s first      
           // productive.     
           stopRequestIdleWorkers();     
 
           // give the idle workers a quick chance to die     
 
           try { Thread.sleep(250); } catch ( InterruptedException x ) { } 
 
 
 
           // Step through the list of ALL workers.     
           for ( int i = 0; i < workerList.length; i++ ) {     
               if ( workerList[i].isAlive() ) {     
                   workerList[i].stopRequest();     
               }     
           }     
       }
 
}
 

将需要访问的网址保存在数据库中:

 
-- Sites to crawl, one row per domain. linkSelector / nextPageSelector hold
-- CSS selector strings (presumably for the article links and the "next page"
-- link on that site -- confirm against the crawler).
CREATE TABLE `domainlist` (
    `id` INT(11) NOT NULL AUTO_INCREMENT,
    `domain` VARCHAR(100) COLLATE utf8_unicode_ci NOT NULL DEFAULT '',
    `linkSelector` VARCHAR(100) COLLATE utf8_unicode_ci NOT NULL DEFAULT '',
    `nextPageSelector` VARCHAR(100) COLLATE utf8_unicode_ci NOT NULL DEFAULT '',

    PRIMARY KEY  (`id`)
  ) ENGINE=MyISAM AUTO_INCREMENT=11 DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;

-- Name the columns and let AUTO_INCREMENT assign `id`: inserting '' into an
-- INT column is rejected when MySQL runs in strict SQL mode.
INSERT INTO domainlist (`domain`, `linkSelector`, `nextPageSelector`) VALUES ('mikeperham.com', '.div {}', '.div{}');
INSERT INTO domainlist (`domain`, `linkSelector`, `nextPageSelector`) VALUES ('ocaoimh.ie', '.div {}', '.div{}');
INSERT INTO domainlist (`domain`, `linkSelector`, `nextPageSelector`) VALUES ('codinghorror.com', '.div {}', '.div{}');
 
 

下面的代码遍历数据库中的网址, 把每个网址作为一个任务提交给线程池, 由池中空闲的线程来处理:

 
import java.io.*;
import java.net.*;
import java.sql.*;
import java.util.Iterator;     
import java.util.List;     
import java.util.Map;     
import java.util.Set;  
import java.util.zip.*;
 
import info.monitorenter.cpdetector.io.CodepageDetectorProxy;     
import info.monitorenter.cpdetector.io.HTMLCodepageDetector;     
import info.monitorenter.cpdetector.io.JChardetFacade;  
 
import org.xml.sax.SAXException;
 
 
import cz.vutbr.web.css.*;
 
public class RssGrabTest {
 
    public ThreadPool tp;
    private static CodepageDetectorProxy detector = CodepageDetectorProxy     .getInstance();     
    static {     
        detector.add(new HTMLCodepageDetector(false));     
        detector.add(JChardetFacade.getInstance());     
    }     
 
    public boolean gziped = false;
    public URL verifiedurl = null;        
 
    public Connection libconn = null;    
 
    public static void main ( String[] args ) {
        RssGrabTest nc = new RssGrabTest();
 
        nc.InitConnection();
        nc.tp = new ThreadPool(10);        
        nc.Init();
    }
 
    public void Init () {
        GoThread gt = new GoThread(this);
        gt.start();
    }
    public void msg(String s) {
        System.out.println(s);
    }
 
    /*
     * 设置好到数据库的连接
     * */
    public void  InitConnection() {
        try {
            String dbname = "rss";
            Class.forName("com.mysql.jdbc.Driver");
            String url = "jdbc:mysql://localhost:3306/"+dbname+"?useUnicode=true&characterEncoding=UTF-8&user=root&password=pass";
            libconn = DriverManager.getConnection(url);
 
        } catch ( Exception e ) {
            //
            msg("conn fails");
            e.printStackTrace();
        }
 
    }
 
 
    public void GoWithDomain( DomainObject rs ) {
        String domainName = rs.domain;
        String homeUrl = "http://" + domainName + "/";
 
        String homePage = DownloadPage ( homeUrl );        
 
        String currentPage = homePage;
 
        System.out.print( currentPage );
 
    }
 
 
    public class GoThread extends Thread {
        private RssGrabTest rg ;
        GoThread( RssGrabTest rg ) {
            this.rg = rg;
        }
        public void run() {
            try {
                rg.Go();
            } catch (SAXException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
 
     private Runnable createRunnable(final DomainObject p){
 
        Runnable aRunnable = new Runnable(){
            public void run(){
                GoWithDomain(p);
            }
        };
 
        return aRunnable;
 
    }
 
    // 映射table
    class DomainObject {
        final public static String table = "domainlist";
        public Integer id;
        public String domain; // like xx.com
        public String linkSelector;
        public String nextPageSelector;
    } ;
 
    public DomainObject CreatePOJO( ResultSet rs ) {
        DomainObject po = new DomainObject();
        try {
            po.id = rs.getInt(1);
            po.domain = rs.getString(2);
            po.linkSelector = rs.getString(3);
            po.nextPageSelector = rs.getString(4);
        } catch (SQLException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
 
        return po;
    }
 
    public void Go() throws SAXException, IOException {
 
        try {
            String sql = "select * from domainlist";
            PreparedStatement ps = libconn.prepareStatement(sql);
 
            ResultSet rs = ps.executeQuery();
            while( rs.next() ){
 
                msg("creating pojo");
                Runnable r = createRunnable( CreatePOJO( rs ) );
                try {
                    tp.execute( r );
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
 
            }
        } catch (SQLException e) {
 
            e.printStackTrace();
        }
 
    }
 
    public CombinedSelector CreateSelector ( String selector ) {
        StyleSheet ss;
        RuleSet rs;
        CombinedSelector cs = null;
        CombinedSelector endcs = null;
 
        try {
            ss = CSSFactory.parse(selector);
            rs = (RuleSet)ss.get(0); 
            cs = (CombinedSelector) rs.getSelectors().get(0);
            return cs;
 
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (CSSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        return cs;
    }
 
 
 
    public String DownloadPage(String url) {
        verifiedurl = verifyUrl(url);
 
        try {
            msg("开始下载页面:" + url);
 
            URLConnection con = verifiedurl.openConnection();
            con.setDoOutput(true);
            con.setRequestProperty("User-Agent", "Mozilla/5.0 (X11; U; Linux i686; zh-CN; rv:1.9.1.2) Gecko/20090803 Fedora/3.5");
            //con.setRequestProperty("Accept-Encoding", "gzip,deflate");
            con.connect();
            InputStream ins = con.getInputStream();
            GZIPInputStream gzin = null;
 
            String charset = getCharset(url).toString();
            if(gziped == true)
                gzin = new GZIPInputStream(ins);  
 
            BufferedReader reader = new BufferedReader( 
                    new InputStreamReader( gzin == null ? ins : gzin ,charset));
 
            msg("in DownloadPage: stream is opened");
            String line;
            StringBuffer pageBuffer  = new StringBuffer();
            while((line = reader.readLine()) != null){
                msg( Thread.currentThread().getId()+ url + ": readed: " + line.length() + ":" + line);
                try {
                    Thread.currentThread().sleep(400);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                pageBuffer.append(line+ "\r\n");
            }
 
            return pageBuffer.toString();
 
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        return null;
 
    }
 
 
    public URL verifyUrl( String url ){
        if ( !url.toLowerCase().startsWith("http://") )
            return null;
 
        URL verifiedUrl = null;
        try {
            verifiedUrl = new URL(url);
        } catch ( Exception e ) {
            return null;
        }
        return verifiedUrl;
    }
 
 
    public String getCharset(String strurl) throws IOException {     
 
        URL url = new URL(strurl);     
        HttpURLConnection urlConnection = (HttpURLConnection) url.openConnection();     
        urlConnection.connect();     
        String strencoding = null;     
 
        Map<String, List<String>> map = urlConnection.getHeaderFields();     
        Set<String> keys = map.keySet();     
        Iterator<String> iterator = keys.iterator();
 
 
        String key = null;     
        String tmp = null;     
        while (iterator.hasNext()) {     
            key = iterator.next();     
            tmp = map.get(key).toString().toLowerCase();
 
 
            if (key != null && key.equals("Content-Type")) {     
                int m = tmp.indexOf("charset=");     
                if (m != -1) {     
                    strencoding = tmp.substring(m + 8).replace("]", "");   
                    //return strencoding;     
                }     
            }
 
            if (key != null && key.equals("Content-Encoding")) {
                if(tmp.indexOf("gzip") != -1)
                    gziped = true;
            }
        }     
        if (strencoding != null ) return strencoding;
 
 
        StringBuffer sb = new StringBuffer();     
        String line;
 
        try {     
            BufferedReader in = new BufferedReader(new InputStreamReader(url     
                    .openStream()));     
            while ((line = in.readLine()) != null) {     
 
                sb.append(line);     
            }     
            in.close();     
        } catch (Exception e) { // Report any errors that arise     
            System.err.println(e);     
            System.err     
                    .println("Usage:   java   HttpClient   <URL>   [<filename>]");     
        }     
        String htmlcode = sb.toString();             
        String strbegin = "<meta";     
        String strend = ">";     
        String strtmp;     
        int begin = htmlcode.indexOf(strbegin);     
        int end = -1;     
        int inttmp;             
 
        while (begin > -1) {
 
            end = htmlcode.substring(begin).indexOf(strend);     
            if (begin > -1 && end > -1) {     
                strtmp = htmlcode.substring(begin, begin + end).toLowerCase();     
                inttmp = strtmp.indexOf("charset");     
                if (inttmp > -1) {     
 
                    strencoding = strtmp.substring(inttmp + 7, end).replace(     
                            "=", "").replace("/", "").replace("\"", "")     
                            .replace("\'", "").replace(" ", "");
 
                    return strencoding;     
                }     
            }     
 
            htmlcode = htmlcode.substring(begin+strbegin.length());     
            begin = htmlcode.indexOf(strbegin);     
        }     
 
        strencoding = getFileEncoding(url);     
 
        if (strencoding == null) {     
            strencoding = "UTF-8";     
        }      
 
        return strencoding;     
    }     
 
    public static String getFileEncoding(URL url) {     
 
        java.nio.charset.Charset charset = null;     
        try {      
            charset = detector.detectCodepage(url);      
        } catch (Exception e) {      
 
        }     
        if (charset != null)      
            return charset.name();      
        return null;     
 
    } 
 
}