修订版 | 62695e1a2c94c9dbf6be2fbfd60656b5726cac52 (tree) |
---|---|
时间 | 2007-12-26 02:45:02 |
作者 | eru <eru01@user...> |
Commiter | eru |
生命キャスト対策コード(VPdiff20071223)マージ。
@@ -537,6 +537,7 @@ int Channel::handshakeFetch() | ||
537 | 537 | sock->writeLineF("GET /channel/%s HTTP/1.0",idStr); |
538 | 538 | sock->writeLineF("%s %d",PCX_HS_POS,streamPos); |
539 | 539 | sock->writeLineF("%s %d",PCX_HS_PCP,1); |
540 | + sock->writeLineF("%s %d",PCX_HS_PORT,servMgr->serverHost.port); | |
540 | 541 | |
541 | 542 | sock->writeLine(""); |
542 | 543 |
@@ -1475,8 +1476,11 @@ bool ChannelStream::getStatus(Channel *ch,ChanPacket &pack) | ||
1475 | 1476 | // ----------------------------------- |
1476 | 1477 | bool Channel::checkBump() |
1477 | 1478 | { |
1479 | + unsigned int maxIdleTime = 30; | |
1480 | + if (isIndexTxt(this)) maxIdleTime = 60; | |
1481 | + | |
1478 | 1482 | if (!isBroadcasting() && (!sourceHost.tracker)) |
1479 | - if (rawData.lastWriteTime && ((sys->getTime() - rawData.lastWriteTime) > 30)) | |
1483 | + if (rawData.lastWriteTime && ((sys->getTime() - rawData.lastWriteTime) > maxIdleTime)) | |
1480 | 1484 | { |
1481 | 1485 | LOG_ERROR("Channel Auto bumped"); |
1482 | 1486 | bump = true; |
@@ -1509,6 +1513,9 @@ int Channel::readStream(Stream &in,ChannelStream *source) | ||
1509 | 1513 | |
1510 | 1514 | unsigned int receiveStartTime = 0; |
1511 | 1515 | |
1516 | + unsigned int ptime = 0; | |
1517 | + unsigned int upsize = 0; | |
1518 | + | |
1512 | 1519 | try |
1513 | 1520 | { |
1514 | 1521 | while (thread.active && !peercastInst->isQuitting) |
@@ -1573,7 +1580,14 @@ int Channel::readStream(Stream &in,ChannelStream *source) | ||
1573 | 1580 | } |
1574 | 1581 | } |
1575 | 1582 | |
1576 | - source->flush(in); | |
1583 | + unsigned int t = sys->getTime(); | |
1584 | + if (t != ptime) { | |
1585 | + ptime = t; | |
1586 | + upsize = Servent::MAX_OUTWARD_SIZE; | |
1587 | + } | |
1588 | + | |
1589 | + unsigned int len = source->flushUb(in, upsize); | |
1590 | + upsize -= len; | |
1577 | 1591 | |
1578 | 1592 | sys->sleepIdle(); |
1579 | 1593 | } |
@@ -1733,9 +1747,12 @@ void ChanPacket::init(ChanPacketv &p) | ||
1733 | 1747 | { |
1734 | 1748 | type = p.type; |
1735 | 1749 | len = p.len; |
1750 | + if (len > MAX_DATALEN) | |
1751 | + throw StreamException("Packet data too large"); | |
1736 | 1752 | pos = p.pos; |
1737 | 1753 | sync = p.sync; |
1738 | 1754 | skip = p.skip; |
1755 | + priority = p.priority; | |
1739 | 1756 | memcpy(data, p.data, len); |
1740 | 1757 | } |
1741 | 1758 | // ----------------------------------- |
@@ -1748,6 +1765,7 @@ void ChanPacket::init(TYPE t, const void *p, unsigned int l,unsigned int _pos) | ||
1748 | 1765 | memcpy(data,p,len); |
1749 | 1766 | pos = _pos; |
1750 | 1767 | skip = false; |
1768 | + priority = 0; | |
1751 | 1769 | } |
1752 | 1770 | // ----------------------------------- |
1753 | 1771 | void ChanPacket::writeRaw(Stream &out) |
@@ -1970,10 +1988,34 @@ void ChanPacketBuffer::readPacket(ChanPacket &pack) | ||
1970 | 1988 | pack.init(packets[readPos%MAX_PACKETS]); |
1971 | 1989 | readPos++; |
1972 | 1990 | lock.off(); |
1991 | +} | |
1973 | 1992 | |
1974 | - sys->sleepIdle(); | |
1993 | +// ----------------------------------- | |
1994 | +void ChanPacketBuffer::readPacketPri(ChanPacket &pack) | |
1995 | +{ | |
1996 | + unsigned int tim = sys->getTime(); | |
1997 | + | |
1998 | + if (readPos < firstPos) | |
1999 | + throw StreamException("Read too far behind"); | |
2000 | + | |
2001 | + while (readPos >= writePos) | |
2002 | + { | |
2003 | + sys->sleepIdle(); | |
2004 | + if ((sys->getTime() - tim) > 30) | |
2005 | + throw TimeoutException(); | |
2006 | + } | |
2007 | + lock.on(); | |
2008 | + ChanPacketv *best = &packets[readPos % MAX_PACKETS]; | |
2009 | + for (unsigned int i = readPos + 1; i < writePos; i++) { | |
2010 | + if (packets[i % MAX_PACKETS].priority > best->priority) | |
2011 | + best = &packets[i % MAX_PACKETS]; | |
2012 | + } | |
2013 | + pack.init(*best); | |
2014 | + best->init(packets[readPos % MAX_PACKETS]); | |
2015 | + readPos++; | |
2016 | + lock.off(); | |
2017 | + } | |
1975 | 2018 | |
1976 | -} | |
1977 | 2019 | // ----------------------------------- |
1978 | 2020 | bool ChanPacketBuffer::willSkip() |
1979 | 2021 | { |
@@ -2898,6 +2940,35 @@ ChanHit *ChanMgr::addHit(ChanHit &h) | ||
2898 | 2940 | } |
2899 | 2941 | |
2900 | 2942 | // ----------------------------------- |
2943 | +bool ChanMgr::findParentHit(ChanHit &p) | |
2944 | +{ | |
2945 | + ChanHitList *hl=NULL; | |
2946 | + | |
2947 | + chanMgr->hitlistlock.on(); | |
2948 | + | |
2949 | + hl = findHitListByID(p.chanID); | |
2950 | + | |
2951 | + if (hl) | |
2952 | + { | |
2953 | + ChanHit *ch = hl->hit; | |
2954 | + while (ch) | |
2955 | + { | |
2956 | + if (!ch->dead && (ch->rhost[0].ip == p.uphost.ip) | |
2957 | + && (ch->rhost[0].port == p.uphost.port)) | |
2958 | + { | |
2959 | + chanMgr->hitlistlock.off(); | |
2960 | + return 1; | |
2961 | + } | |
2962 | + ch = ch->next; | |
2963 | + } | |
2964 | + } | |
2965 | + | |
2966 | + chanMgr->hitlistlock.off(); | |
2967 | + | |
2968 | + return 0; | |
2969 | +} | |
2970 | + | |
2971 | +// ----------------------------------- | |
2901 | 2972 | class ChanFindInfo : public ThreadInfo |
2902 | 2973 | { |
2903 | 2974 | public: |
@@ -3064,6 +3135,7 @@ void ChanHit::init() | ||
3064 | 3135 | version_ex_number = 0; |
3065 | 3136 | |
3066 | 3137 | status = 0; |
3138 | + servent_id = 0; | |
3067 | 3139 | |
3068 | 3140 | sessionID.clear(); |
3069 | 3141 | chanID.clear(); |
@@ -4503,6 +4575,22 @@ int ChanHitSearch::getRelayHost(Host host1, Host host2, GnuID exID, ChanHitList | ||
4503 | 4575 | riSequence &= 0xffffff; |
4504 | 4576 | seqLock.off(); |
4505 | 4577 | |
4578 | + Servent *s = servMgr->servents; | |
4579 | + while (s) { | |
4580 | + if (s->serventHit.rhost[0].port && s->type == Servent::T_RELAY | |
4581 | + && s->chanID.isSame(chl->info.id)) { | |
4582 | + int i = index % MAX_RESULTS; | |
4583 | + if (index < MAX_RESULTS | |
4584 | + || tmpHit[i].lastSendSeq > s->serventHit.lastSendSeq) { | |
4585 | + s->serventHit.lastSendSeq = seq; | |
4586 | + tmpHit[i] = s->serventHit; | |
4587 | + tmpHit[i].host = s->serventHit.rhost[0]; | |
4588 | + index++; | |
4589 | + } | |
4590 | + } | |
4591 | + s = s->next; | |
4592 | + } | |
4593 | + | |
4506 | 4594 | ChanHit *hit = chl->hit; |
4507 | 4595 | |
4508 | 4596 | while(hit){ |
@@ -4534,6 +4622,7 @@ int ChanHitSearch::getRelayHost(Host host1, Host host2, GnuID exID, ChanHitList | ||
4534 | 4622 | //rnd = (float)rand() / (float)RAND_MAX; |
4535 | 4623 | rnd = rand() % base; |
4536 | 4624 | if (hit->numHops == 1){ |
4625 | +#if 0 | |
4537 | 4626 | if (tmpHit[index % MAX_RESULTS].numHops == 1){ |
4538 | 4627 | if (rnd < prob){ |
4539 | 4628 | tmpHit[index % MAX_RESULTS] = *hit; |
@@ -4545,8 +4634,9 @@ int ChanHitSearch::getRelayHost(Host host1, Host host2, GnuID exID, ChanHitList | ||
4545 | 4634 | tmpHit[index % MAX_RESULTS].host = hit->rhost[0]; |
4546 | 4635 | index++; |
4547 | 4636 | } |
4637 | +#endif | |
4548 | 4638 | } else { |
4549 | - if ((tmpHit[index % MAX_RESULTS].numHops != 1) && (rnd < prob)){ | |
4639 | + if ((tmpHit[index % MAX_RESULTS].numHops != 1) && (rnd < prob) || rnd == 0){ | |
4550 | 4640 | tmpHit[index % MAX_RESULTS] = *hit; |
4551 | 4641 | tmpHit[index % MAX_RESULTS].host = hit->rhost[0]; |
4552 | 4642 | index++; |
@@ -207,6 +207,8 @@ public: | ||
207 | 207 | |
208 | 208 | char version_ex_prefix[2]; |
209 | 209 | unsigned int version_ex_number; |
210 | + | |
211 | + unsigned int lastSendSeq; | |
210 | 212 | }; |
211 | 213 | // ---------------------------------- |
212 | 214 | class ChanHitList |
@@ -603,7 +605,7 @@ public: | ||
603 | 605 | |
604 | 606 | int pickHits(ChanHitSearch &); |
605 | 607 | |
606 | - | |
608 | + bool findParentHit(ChanHit &p); | |
607 | 609 | |
608 | 610 | Channel *channel; |
609 | 611 | ChanHitList *hitlist; |
@@ -58,6 +58,7 @@ public: | ||
58 | 58 | pos = 0; |
59 | 59 | sync = 0; |
60 | 60 | skip = false; |
61 | + priority = 0; | |
61 | 62 | } |
62 | 63 | void init(ChanPacketv &p); |
63 | 64 | void init(TYPE t, const void *, unsigned int , unsigned int ); |
@@ -74,6 +75,7 @@ public: | ||
74 | 75 | char data[MAX_DATALEN]; |
75 | 76 | bool skip; |
76 | 77 | |
78 | + int priority; | |
77 | 79 | }; |
78 | 80 | // ---------------------------------- |
79 | 81 | class ChanPacketv |
@@ -111,6 +113,7 @@ public: | ||
111 | 113 | skip = false; |
112 | 114 | data = NULL; |
113 | 115 | datasize = 0; |
116 | + priority = 0; | |
114 | 117 | } |
115 | 118 | void init(ChanPacket &p) |
116 | 119 | { |
@@ -124,6 +127,7 @@ public: | ||
124 | 127 | pos = p.pos; |
125 | 128 | sync = p.sync; |
126 | 129 | skip = p.skip; |
130 | + priority = p.priority; | |
127 | 131 | if (!data) { |
128 | 132 | datasize = (len & ~(BSIZE - 1)) + BSIZE; |
129 | 133 | data = new char[datasize]; |
@@ -149,6 +153,7 @@ public: | ||
149 | 153 | unsigned int datasize; |
150 | 154 | bool skip; |
151 | 155 | |
156 | + int priority; | |
152 | 157 | }; |
153 | 158 | // ---------------------------------- |
154 | 159 | class ChanPacketBuffer |
@@ -176,6 +181,7 @@ public: | ||
176 | 181 | |
177 | 182 | bool writePacket(ChanPacket &,bool = false); |
178 | 183 | void readPacket(ChanPacket &); |
184 | + void readPacketPri(ChanPacket &); | |
179 | 185 | |
180 | 186 | bool willSkip(); |
181 | 187 |
@@ -221,6 +227,7 @@ public: | ||
221 | 227 | virtual void kill() {} |
222 | 228 | virtual bool sendPacket(ChanPacket &,GnuID &) {return false;} |
223 | 229 | virtual void flush(Stream &) {} |
230 | + virtual unsigned int flushUb(Stream &, unsigned int) { return 0; } | |
224 | 231 | virtual void readHeader(Stream &,Channel *)=0; |
225 | 232 | virtual int readPacket(Stream &,Channel *)=0; |
226 | 233 | virtual void readEnd(Stream &,Channel *)=0; |
@@ -84,6 +84,30 @@ void PCPStream::flush(Stream &in) | ||
84 | 84 | pack.writeRaw(in); |
85 | 85 | } |
86 | 86 | } |
87 | + | |
88 | +// ------------------------------------------ | |
89 | +unsigned int PCPStream::flushUb(Stream &in, unsigned int size) | |
90 | +{ | |
91 | + ChanPacket pack; | |
92 | + unsigned int len = 0, skip = 0; | |
93 | + | |
94 | + while (outData.numPending()) | |
95 | + { | |
96 | + outData.readPacketPri(pack); | |
97 | + | |
98 | + if (size >= len + pack.len) { | |
99 | + len += pack.len; | |
100 | + pack.writeRaw(in); | |
101 | + } else { | |
102 | + skip++; | |
103 | + } | |
104 | + } | |
105 | + if (skip > 0) | |
106 | + LOG_DEBUG("PCPStream::flushUb: skip %d packets", skip); | |
107 | + | |
108 | + return len; | |
109 | +} | |
110 | + | |
87 | 111 | // ------------------------------------------ |
88 | 112 | int PCPStream::readPacket(Stream &in,Channel *) |
89 | 113 | { |
@@ -433,9 +457,17 @@ void PCPStream::readHostAtoms(AtomStream &atom, int numc, BroadcastState &bcs, C | ||
433 | 457 | ipNum = 1; |
434 | 458 | } |
435 | 459 | else if (id == PCP_HOST_NUML) |
460 | + { | |
436 | 461 | hit.numListeners = atom.readInt(); |
462 | + if (hit.numListeners > 10) | |
463 | + hit.numListeners = 10; | |
464 | + } | |
437 | 465 | else if (id == PCP_HOST_NUMR) |
466 | + { | |
438 | 467 | hit.numRelays = atom.readInt(); |
468 | + if (hit.numRelays > 100) | |
469 | + hit.numRelays = 100; | |
470 | + } | |
439 | 471 | else if (id == PCP_HOST_UPTIME) |
440 | 472 | hit.upTime = atom.readInt(); |
441 | 473 | else if (id == PCP_HOST_OLDPOS) |
@@ -500,9 +532,11 @@ void PCPStream::readHostAtoms(AtomStream &atom, int numc, BroadcastState &bcs, C | ||
500 | 532 | |
501 | 533 | if (hit.numHops == 1){ |
502 | 534 | Servent *sv = servMgr->findServentByServentID(hit.servent_id); |
503 | - if (sv){ | |
535 | + if (sv && sv->getHost().ip == hit.host.ip){ | |
504 | 536 | // LOG_DEBUG("set servent's waitPort = %d", hit.host.port); |
505 | 537 | sv->waitPort = hit.host.port; |
538 | + hit.lastSendSeq = sv->serventHit.lastSendSeq; | |
539 | + sv->serventHit = hit; | |
506 | 540 | } |
507 | 541 | } |
508 | 542 | } |
@@ -696,6 +730,7 @@ int PCPStream::readBroadcastAtoms(AtomStream &atom,int numc,BroadcastState &bcs) | ||
696 | 730 | { |
697 | 731 | ChanHit hit; |
698 | 732 | readHostAtoms(atom,c,bcs,hit,false); |
733 | + Servent *sv = servMgr->findServentByServentID(bcs.servent_id); | |
699 | 734 | if (hit.uphost.ip == 0){ |
700 | 735 | // LOG_DEBUG("bcs servent_id = %d", bcs.servent_id); |
701 | 736 | if (bcs.numHops == 1){ |
@@ -703,7 +738,7 @@ int PCPStream::readBroadcastAtoms(AtomStream &atom,int numc,BroadcastState &bcs) | ||
703 | 738 | hit.uphost.port = servMgr->serverHost.port; |
704 | 739 | hit.uphostHops = 1; |
705 | 740 | } else { |
706 | - Servent *sv = servMgr->findServentByServentID(bcs.servent_id); | |
741 | + //Servent *sv = servMgr->findServentByServentID(bcs.servent_id); | |
707 | 742 | if (sv){ |
708 | 743 | hit.uphost.ip = sv->getHost().ip; |
709 | 744 | hit.uphost.port = sv->waitPort; |
@@ -711,10 +746,21 @@ int PCPStream::readBroadcastAtoms(AtomStream &atom,int numc,BroadcastState &bcs) | ||
711 | 746 | } |
712 | 747 | } |
713 | 748 | } |
714 | - int oldPos = pmem.pos; | |
715 | - hit.writeAtoms(patom, hit.chanID); | |
716 | - pmem.pos = oldPos; | |
717 | - r = readAtom(patom,bcs); | |
749 | + if (sv && | |
750 | + ((hit.numHops == 1 && (hit.rhost[0].ip == sv->getHost().ip | |
751 | + && hit.uphost.ip == servMgr->serverHost.ip && hit.uphost.port == servMgr->serverHost.port) | |
752 | + || (hit.rhost[1].localIP() && hit.rhost[1].ip == sv->getHost().ip)) | |
753 | + || chanMgr->findParentHit(hit))) | |
754 | + { | |
755 | + int oldPos = pmem.pos; | |
756 | + hit.writeAtoms(patom, hit.chanID); | |
757 | + pmem.pos = oldPos; | |
758 | + r = readAtom(patom,bcs); | |
759 | + } else { | |
760 | + LOG_DEBUG("### Invalid bcst: hops=%d, ver=%d(VP%04d), ttl=%d", | |
761 | + bcs.numHops,ver,ver_vp,ttl); | |
762 | + ttl = 0; | |
763 | + } | |
718 | 764 | } else { |
719 | 765 | // copy and process atoms |
720 | 766 | int oldPos = pmem.pos; |
@@ -761,6 +807,7 @@ int PCPStream::readBroadcastAtoms(AtomStream &atom,int numc,BroadcastState &bcs) | ||
761 | 807 | |
762 | 808 | if (bcs.group & (/*PCP_BCST_GROUP_ROOT|*/PCP_BCST_GROUP_TRACKERS|PCP_BCST_GROUP_RELAYS)) |
763 | 809 | { |
810 | + pack.priority = 11 - bcs.numHops; | |
764 | 811 | chanMgr->broadcastPacketUp(pack,bcs.chanID,remoteID,destID); |
765 | 812 | } |
766 | 813 |
@@ -235,6 +235,7 @@ public: | ||
235 | 235 | |
236 | 236 | virtual bool sendPacket(ChanPacket &,GnuID &); |
237 | 237 | virtual void flush(Stream &); |
238 | + virtual unsigned int flushUb(Stream &, unsigned int); | |
238 | 239 | virtual void readHeader(Stream &,Channel *); |
239 | 240 | virtual int readPacket(Stream &,Channel *); |
240 | 241 | virtual void readEnd(Stream &,Channel *); |
@@ -240,6 +240,8 @@ void Servent::reset() | ||
240 | 240 | type = T_NONE; |
241 | 241 | |
242 | 242 | channel_id = 0; |
243 | + | |
244 | + serventHit.init(); | |
243 | 245 | } |
244 | 246 | // ----------------------------------- |
245 | 247 | bool Servent::sendPacket(ChanPacket &pack,GnuID &cid,GnuID &sid,GnuID &did,Servent::TYPE t) |
@@ -807,6 +809,7 @@ bool Servent::handshakeStream(ChanInfo &chanInfo) | ||
807 | 809 | |
808 | 810 | bool gotPCP=false; |
809 | 811 | unsigned int reqPos=0; |
812 | + unsigned short listenPort = 0; | |
810 | 813 | |
811 | 814 | nsSwitchNum=0; |
812 | 815 |
@@ -820,6 +823,8 @@ bool Servent::handshakeStream(ChanInfo &chanInfo) | ||
820 | 823 | gotPCP = atoi(arg)!=0; |
821 | 824 | else if (http.isHeader(PCX_HS_POS)) |
822 | 825 | reqPos = atoi(arg); |
826 | + else if (http.isHeader(PCX_HS_PORT)) | |
827 | + listenPort = (unsigned short)atoi(arg); | |
823 | 828 | else if (http.isHeader("icy-metadata")) |
824 | 829 | addMetadata = atoi(arg) > 0; |
825 | 830 | else if (http.isHeader(HTTP_HS_AGENT)) |
@@ -873,22 +878,23 @@ bool Servent::handshakeStream(ChanInfo &chanInfo) | ||
873 | 878 | } |
874 | 879 | |
875 | 880 | chanID = chanInfo.id; |
881 | + serventHit.rhost[0].ip = getHost().ip; | |
882 | + serventHit.rhost[0].port = listenPort; | |
883 | + serventHit.host = serventHit.rhost[0]; | |
884 | + serventHit.chanID = chanID; | |
885 | + | |
876 | 886 | canStreamLock.on(); |
877 | 887 | chanReady = canStream(ch); |
878 | - if (0 && !chanReady) | |
888 | + if (/*0 && */!chanReady) | |
879 | 889 | { |
880 | 890 | if (servMgr->numStreams(chanID, Servent::T_RELAY, false) == 0) |
881 | 891 | { |
882 | 892 | sourceHit = &ch->sourceHost; // send source host info |
883 | 893 | |
884 | - if (ch->info.getUptime() > 60) // if stable | |
894 | + if (listenPort && ch->info.getUptime() > 60) // if stable | |
885 | 895 | { |
886 | 896 | // connect "this" host later |
887 | - ChanHit nh; | |
888 | - nh.init(); | |
889 | - nh.chanID = chanID; | |
890 | - nh.rhost[0] = getHost(); | |
891 | - chanMgr->addHit(nh); | |
897 | + chanMgr->addHit(serventHit); | |
892 | 898 | } |
893 | 899 | |
894 | 900 | char tmp[50]; |
@@ -896,11 +902,11 @@ bool Servent::handshakeStream(ChanInfo &chanInfo) | ||
896 | 902 | LOG_DEBUG("Bump channel, hand over sourceHost to %s", tmp); |
897 | 903 | ch->bump = true; |
898 | 904 | } |
899 | - else if (servMgr->kickUnrelayableHost(chanID, this) != 0) | |
905 | + else if (servMgr->kickUnrelayableHost(chanID, serventHit) != 0) | |
900 | 906 | { |
901 | 907 | chanReady = canStream(ch); |
902 | 908 | if (!chanReady) |
903 | - LOG_DEBUG("Kicked unrelayable host, but still cannot stream"); | |
909 | + LOG_DEBUG("*** Kicked unrelayable host, but still cannot stream"); | |
904 | 910 | } |
905 | 911 | } |
906 | 912 | if (!chanReady) type = T_INCOMING; |
@@ -1114,8 +1120,8 @@ bool Servent::handshakeStream(ChanInfo &chanInfo) | ||
1114 | 1120 | if (sourceHit) { |
1115 | 1121 | char tmp[50]; |
1116 | 1122 | sourceHit->writeAtoms(atom2, chanInfo.id); |
1117 | - chs.best[i].host.toStr(tmp); | |
1118 | - LOG_DEBUG("relay info: %s hops = %d", tmp, chs.best[i].numHops); | |
1123 | + sourceHit->host.toStr(tmp); | |
1124 | + LOG_DEBUG("relay info(sourceHit): %s", tmp); | |
1119 | 1125 | best.host.ip = sourceHit->host.ip; |
1120 | 1126 | } |
1121 | 1127 |
@@ -2792,6 +2798,8 @@ void Servent::sendPeercastChannel() | ||
2792 | 2798 | void Servent::sendPCPChannel() |
2793 | 2799 | { |
2794 | 2800 | bool skipCheck = false; |
2801 | + unsigned int ptime = 0; | |
2802 | + int npacket = 0, upsize = 0; | |
2795 | 2803 | |
2796 | 2804 | Channel *ch = chanMgr->findChannelByID(chanID); |
2797 | 2805 | if (!ch) |
@@ -2929,11 +2937,23 @@ void Servent::sendPCPChannel() | ||
2929 | 2937 | BroadcastState bcs; |
2930 | 2938 | bcs.servent_id = servent_id; |
2931 | 2939 | // error = pcpStream->readPacket(*sock,bcs); |
2932 | - do { | |
2940 | + | |
2941 | + unsigned int t = sys->getTime(); | |
2942 | + if (t != ptime) { | |
2943 | + ptime = t; | |
2944 | + npacket = MAX_PROC_PACKETS; | |
2945 | + upsize = MAX_OUTWARD_SIZE; | |
2946 | + } | |
2947 | + | |
2948 | + int len = pcpStream->flushUb(*sock, upsize); | |
2949 | + upsize -= len; | |
2950 | + | |
2951 | + while (npacket > 0 && sock->readReady()) { | |
2952 | + npacket--; | |
2933 | 2953 | error = pcpStream->readPacket(*sock,bcs); |
2934 | 2954 | if (error) |
2935 | 2955 | throw StreamException("PCP exception"); |
2936 | - } while (sock->readReady() || pcpStream->outData.numPending()); | |
2956 | + } | |
2937 | 2957 | |
2938 | 2958 | sys->sleepIdle(); |
2939 | 2959 |
@@ -46,6 +46,12 @@ public: | ||
46 | 46 | MAX_OUTPACKETS = 32 // max. output packets per queue (normal/priority) |
47 | 47 | }; |
48 | 48 | |
49 | + enum | |
50 | + { | |
51 | + MAX_PROC_PACKETS = 300, | |
52 | + MAX_OUTWARD_SIZE = 1024 * 10 | |
53 | + }; | |
54 | + | |
49 | 55 | enum TYPE |
50 | 56 | { |
51 | 57 | T_NONE, // Not allocated |
@@ -288,6 +294,8 @@ public: | ||
288 | 294 | unsigned int lastSkipCount; |
289 | 295 | unsigned int waitPort; |
290 | 296 | |
297 | + ChanHit serventHit; | |
298 | + | |
291 | 299 | int channel_id; |
292 | 300 | }; |
293 | 301 |
@@ -2623,6 +2623,7 @@ void ServMgr::banFirewalledHost() | ||
2623 | 2623 | } |
2624 | 2624 | |
2625 | 2625 | // -------------------------------------------------- |
2626 | +#if 0 | |
2626 | 2627 | static ChanHit *findServentHit(Servent *s) |
2627 | 2628 | { |
2628 | 2629 | ChanHitList *chl = chanMgr->findHitListByID(s->chanID); |
@@ -2640,8 +2641,9 @@ static ChanHit *findServentHit(Servent *s) | ||
2640 | 2641 | } |
2641 | 2642 | return NULL; |
2642 | 2643 | } |
2644 | +#endif | |
2643 | 2645 | // -------------------------------------------------- |
2644 | -int ServMgr::kickUnrelayableHost(GnuID &chid, Servent *ns) | |
2646 | +int ServMgr::kickUnrelayableHost(GnuID &chid, ChanHit &sendhit) | |
2645 | 2647 | { |
2646 | 2648 | Servent *ks = NULL; |
2647 | 2649 | Servent *s = servMgr->servents; |
@@ -2652,9 +2654,8 @@ int ServMgr::kickUnrelayableHost(GnuID &chid, Servent *ns) | ||
2652 | 2654 | { |
2653 | 2655 | Host h = s->getHost(); |
2654 | 2656 | |
2655 | - chanMgr->hitlistlock.on(); | |
2656 | - ChanHit *hit = findServentHit(s); | |
2657 | - if (hit && !hit->relay && hit->numRelays == 0) | |
2657 | + ChanHit hit = s->serventHit; | |
2658 | + if (!hit.relay && hit.numRelays == 0) | |
2658 | 2659 | { |
2659 | 2660 | char hostName[256]; |
2660 | 2661 | h.toStr(hostName); |
@@ -2663,25 +2664,18 @@ int ServMgr::kickUnrelayableHost(GnuID &chid, Servent *ns) | ||
2663 | 2664 | if (!ks || s->lastConnect < ks->lastConnect) // elder servent |
2664 | 2665 | ks = s; |
2665 | 2666 | } |
2666 | - chanMgr->hitlistlock.off(); | |
2667 | 2667 | } |
2668 | 2668 | s = s->next; |
2669 | 2669 | } |
2670 | 2670 | |
2671 | 2671 | if (ks) |
2672 | 2672 | { |
2673 | - if (ns) | |
2673 | + if (sendhit.rhost[0].port) | |
2674 | 2674 | { |
2675 | - Host h = ns->getHost(); | |
2676 | - ChanHit nh; | |
2677 | - nh.init(); | |
2678 | - nh.chanID = chid; | |
2679 | - nh.rhost[0] = h; | |
2680 | - | |
2681 | 2675 | ChanPacket pack; |
2682 | 2676 | MemoryStream mem(pack.data,sizeof(pack.data)); |
2683 | 2677 | AtomStream atom(mem); |
2684 | - nh.writeAtoms(atom, chid); | |
2678 | + sendhit.writeAtoms(atom, chid); | |
2685 | 2679 | pack.len = mem.pos; |
2686 | 2680 | pack.type = ChanPacket::T_PCP; |
2687 | 2681 | GnuID noID; |
@@ -402,7 +402,7 @@ public: | ||
402 | 402 | unsigned int kickPushTime; |
403 | 403 | bool isCheckPushStream(); //JP-EX |
404 | 404 | void banFirewalledHost(); //JP-EX |
405 | - int kickUnrelayableHost(GnuID &, Servent * = NULL); | |
405 | + int kickUnrelayableHost(GnuID &, ChanHit &); | |
406 | 406 | |
407 | 407 | bool getModulePath; //JP-EX |
408 | 408 | bool clearPLS; //JP-EX |
@@ -29,22 +29,22 @@ static bool PCP_FORCE_YP = false; | ||
29 | 29 | #endif |
30 | 30 | // ------------------------------------------------ |
31 | 31 | static const int PCP_CLIENT_VERSION = 1218; |
32 | -static const int PCP_CLIENT_VERSION_VP = 25; | |
32 | +static const int PCP_CLIENT_VERSION_VP = 26; | |
33 | 33 | static const int PCP_ROOT_VERSION = 1218; |
34 | 34 | |
35 | 35 | static const int PCP_CLIENT_MINVERSION = 1200; |
36 | 36 | |
37 | 37 | static const char *PCX_AGENT = "PeerCast/0.1218"; |
38 | 38 | static const char *PCX_AGENTJP = "PeerCast/0.1218-J"; |
39 | -static const char *PCX_AGENTVP = "PeerCast/0.1218(VP0025-1)"; | |
40 | -static const char *PCX_VERSTRING = "v0.1218(VP0025-1)"; | |
39 | +static const char *PCX_AGENTVP = "PeerCast/0.1218(VP0026)"; | |
40 | +static const char *PCX_VERSTRING = "v0.1218(VP0026)"; | |
41 | 41 | |
42 | 42 | #if 1 /* for VP extend version */ |
43 | 43 | #define VERSION_EX 1 |
44 | 44 | static const char *PCP_CLIENT_VERSION_EX_PREFIX = "IM"; // 2bytes only |
45 | -static const int PCP_CLIENT_VERSION_EX_NUMBER = 7650; | |
46 | -static const char *PCX_AGENTEX = "PeerCast/0.1218(IM-VP25-1)"; | |
47 | -static const char *PCX_VERSTRING_EX = "v0.1218(IM765-VP25-1)"; | |
45 | +static const int PCP_CLIENT_VERSION_EX_NUMBER = 26; | |
46 | +static const char *PCX_AGENTEX = "PeerCast/0.1218(IM0026-patch071223)"; | |
47 | +static const char *PCX_VERSTRING_EX = "v0.1218(IM0026)"; | |
48 | 48 | #endif |
49 | 49 | |
50 | 50 | // ------------------------------------------------ |
@@ -537,6 +537,7 @@ int Channel::handshakeFetch() | ||
537 | 537 | sock->writeLineF("GET /channel/%s HTTP/1.0",idStr); |
538 | 538 | sock->writeLineF("%s %d",PCX_HS_POS,streamPos); |
539 | 539 | sock->writeLineF("%s %d",PCX_HS_PCP,1); |
540 | + sock->writeLineF("%s %d",PCX_HS_PORT,servMgr->serverHost.port); | |
540 | 541 | |
541 | 542 | sock->writeLine(""); |
542 | 543 |
@@ -1475,8 +1476,11 @@ bool ChannelStream::getStatus(Channel *ch,ChanPacket &pack) | ||
1475 | 1476 | // ----------------------------------- |
1476 | 1477 | bool Channel::checkBump() |
1477 | 1478 | { |
1479 | + unsigned int maxIdleTime = 30; | |
1480 | + if (isIndexTxt(this)) maxIdleTime = 60; | |
1481 | + | |
1478 | 1482 | if (!isBroadcasting() && (!sourceHost.tracker)) |
1479 | - if (rawData.lastWriteTime && ((sys->getTime() - rawData.lastWriteTime) > 30)) | |
1483 | + if (rawData.lastWriteTime && ((sys->getTime() - rawData.lastWriteTime) > maxIdleTime)) | |
1480 | 1484 | { |
1481 | 1485 | LOG_ERROR("Channel Auto bumped"); |
1482 | 1486 | bump = true; |
@@ -1509,6 +1513,9 @@ int Channel::readStream(Stream &in,ChannelStream *source) | ||
1509 | 1513 | |
1510 | 1514 | unsigned int receiveStartTime = 0; |
1511 | 1515 | |
1516 | + unsigned int ptime = 0; | |
1517 | + unsigned int upsize = 0; | |
1518 | + | |
1512 | 1519 | try |
1513 | 1520 | { |
1514 | 1521 | while (thread.active && !peercastInst->isQuitting) |
@@ -1573,7 +1580,14 @@ int Channel::readStream(Stream &in,ChannelStream *source) | ||
1573 | 1580 | } |
1574 | 1581 | } |
1575 | 1582 | |
1576 | - source->flush(in); | |
1583 | + unsigned int t = sys->getTime(); | |
1584 | + if (t != ptime) { | |
1585 | + ptime = t; | |
1586 | + upsize = Servent::MAX_OUTWARD_SIZE; | |
1587 | + } | |
1588 | + | |
1589 | + unsigned int len = source->flushUb(in, upsize); | |
1590 | + upsize -= len; | |
1577 | 1591 | |
1578 | 1592 | sys->sleepIdle(); |
1579 | 1593 | } |
@@ -1733,9 +1747,12 @@ void ChanPacket::init(ChanPacketv &p) | ||
1733 | 1747 | { |
1734 | 1748 | type = p.type; |
1735 | 1749 | len = p.len; |
1750 | + if (len > MAX_DATALEN) | |
1751 | + throw StreamException("Packet data too large"); | |
1736 | 1752 | pos = p.pos; |
1737 | 1753 | sync = p.sync; |
1738 | 1754 | skip = p.skip; |
1755 | + priority = p.priority; | |
1739 | 1756 | memcpy(data, p.data, len); |
1740 | 1757 | } |
1741 | 1758 | // ----------------------------------- |
@@ -1748,6 +1765,7 @@ void ChanPacket::init(TYPE t, const void *p, unsigned int l,unsigned int _pos) | ||
1748 | 1765 | memcpy(data,p,len); |
1749 | 1766 | pos = _pos; |
1750 | 1767 | skip = false; |
1768 | + priority = 0; | |
1751 | 1769 | } |
1752 | 1770 | // ----------------------------------- |
1753 | 1771 | void ChanPacket::writeRaw(Stream &out) |
@@ -1970,10 +1988,34 @@ void ChanPacketBuffer::readPacket(ChanPacket &pack) | ||
1970 | 1988 | pack.init(packets[readPos%MAX_PACKETS]); |
1971 | 1989 | readPos++; |
1972 | 1990 | lock.off(); |
1991 | +} | |
1973 | 1992 | |
1974 | - sys->sleepIdle(); | |
1993 | +// ----------------------------------- | |
1994 | +void ChanPacketBuffer::readPacketPri(ChanPacket &pack) | |
1995 | +{ | |
1996 | + unsigned int tim = sys->getTime(); | |
1997 | + | |
1998 | + if (readPos < firstPos) | |
1999 | + throw StreamException("Read too far behind"); | |
2000 | + | |
2001 | + while (readPos >= writePos) | |
2002 | + { | |
2003 | + sys->sleepIdle(); | |
2004 | + if ((sys->getTime() - tim) > 30) | |
2005 | + throw TimeoutException(); | |
2006 | + } | |
2007 | + lock.on(); | |
2008 | + ChanPacketv *best = &packets[readPos % MAX_PACKETS]; | |
2009 | + for (unsigned int i = readPos + 1; i < writePos; i++) { | |
2010 | + if (packets[i % MAX_PACKETS].priority > best->priority) | |
2011 | + best = &packets[i % MAX_PACKETS]; | |
2012 | + } | |
2013 | + pack.init(*best); | |
2014 | + best->init(packets[readPos % MAX_PACKETS]); | |
2015 | + readPos++; | |
2016 | + lock.off(); | |
2017 | + } | |
1975 | 2018 | |
1976 | -} | |
1977 | 2019 | // ----------------------------------- |
1978 | 2020 | bool ChanPacketBuffer::willSkip() |
1979 | 2021 | { |
@@ -2898,6 +2940,35 @@ ChanHit *ChanMgr::addHit(ChanHit &h) | ||
2898 | 2940 | } |
2899 | 2941 | |
2900 | 2942 | // ----------------------------------- |
2943 | +bool ChanMgr::findParentHit(ChanHit &p) | |
2944 | +{ | |
2945 | + ChanHitList *hl=NULL; | |
2946 | + | |
2947 | + chanMgr->hitlistlock.on(); | |
2948 | + | |
2949 | + hl = findHitListByID(p.chanID); | |
2950 | + | |
2951 | + if (hl) | |
2952 | + { | |
2953 | + ChanHit *ch = hl->hit; | |
2954 | + while (ch) | |
2955 | + { | |
2956 | + if (!ch->dead && (ch->rhost[0].ip == p.uphost.ip) | |
2957 | + && (ch->rhost[0].port == p.uphost.port)) | |
2958 | + { | |
2959 | + chanMgr->hitlistlock.off(); | |
2960 | + return 1; | |
2961 | + } | |
2962 | + ch = ch->next; | |
2963 | + } | |
2964 | + } | |
2965 | + | |
2966 | + chanMgr->hitlistlock.off(); | |
2967 | + | |
2968 | + return 0; | |
2969 | +} | |
2970 | + | |
2971 | +// ----------------------------------- | |
2901 | 2972 | class ChanFindInfo : public ThreadInfo |
2902 | 2973 | { |
2903 | 2974 | public: |
@@ -3064,6 +3135,7 @@ void ChanHit::init() | ||
3064 | 3135 | version_ex_number = 0; |
3065 | 3136 | |
3066 | 3137 | status = 0; |
3138 | + servent_id = 0; | |
3067 | 3139 | |
3068 | 3140 | sessionID.clear(); |
3069 | 3141 | chanID.clear(); |
@@ -4503,6 +4575,22 @@ int ChanHitSearch::getRelayHost(Host host1, Host host2, GnuID exID, ChanHitList | ||
4503 | 4575 | riSequence &= 0xffffff; |
4504 | 4576 | seqLock.off(); |
4505 | 4577 | |
4578 | + Servent *s = servMgr->servents; | |
4579 | + while (s) { | |
4580 | + if (s->serventHit.rhost[0].port && s->type == Servent::T_RELAY | |
4581 | + && s->chanID.isSame(chl->info.id)) { | |
4582 | + int i = index % MAX_RESULTS; | |
4583 | + if (index < MAX_RESULTS | |
4584 | + || tmpHit[i].lastSendSeq > s->serventHit.lastSendSeq) { | |
4585 | + s->serventHit.lastSendSeq = seq; | |
4586 | + tmpHit[i] = s->serventHit; | |
4587 | + tmpHit[i].host = s->serventHit.rhost[0]; | |
4588 | + index++; | |
4589 | + } | |
4590 | + } | |
4591 | + s = s->next; | |
4592 | + } | |
4593 | + | |
4506 | 4594 | ChanHit *hit = chl->hit; |
4507 | 4595 | |
4508 | 4596 | while(hit){ |
@@ -4534,6 +4622,7 @@ int ChanHitSearch::getRelayHost(Host host1, Host host2, GnuID exID, ChanHitList | ||
4534 | 4622 | //rnd = (float)rand() / (float)RAND_MAX; |
4535 | 4623 | rnd = rand() % base; |
4536 | 4624 | if (hit->numHops == 1){ |
4625 | +#if 0 | |
4537 | 4626 | if (tmpHit[index % MAX_RESULTS].numHops == 1){ |
4538 | 4627 | if (rnd < prob){ |
4539 | 4628 | tmpHit[index % MAX_RESULTS] = *hit; |
@@ -4545,8 +4634,9 @@ int ChanHitSearch::getRelayHost(Host host1, Host host2, GnuID exID, ChanHitList | ||
4545 | 4634 | tmpHit[index % MAX_RESULTS].host = hit->rhost[0]; |
4546 | 4635 | index++; |
4547 | 4636 | } |
4637 | +#endif | |
4548 | 4638 | } else { |
4549 | - if ((tmpHit[index % MAX_RESULTS].numHops != 1) && (rnd < prob)){ | |
4639 | + if ((tmpHit[index % MAX_RESULTS].numHops != 1) && (rnd < prob) || rnd == 0){ | |
4550 | 4640 | tmpHit[index % MAX_RESULTS] = *hit; |
4551 | 4641 | tmpHit[index % MAX_RESULTS].host = hit->rhost[0]; |
4552 | 4642 | index++; |
@@ -207,6 +207,8 @@ public: | ||
207 | 207 | |
208 | 208 | char version_ex_prefix[2]; |
209 | 209 | unsigned int version_ex_number; |
210 | + | |
211 | + unsigned int lastSendSeq; | |
210 | 212 | }; |
211 | 213 | // ---------------------------------- |
212 | 214 | class ChanHitList |
@@ -603,7 +605,7 @@ public: | ||
603 | 605 | |
604 | 606 | int pickHits(ChanHitSearch &); |
605 | 607 | |
606 | - | |
608 | + bool findParentHit(ChanHit &p); | |
607 | 609 | |
608 | 610 | Channel *channel; |
609 | 611 | ChanHitList *hitlist; |
@@ -58,6 +58,7 @@ public: | ||
58 | 58 | pos = 0; |
59 | 59 | sync = 0; |
60 | 60 | skip = false; |
61 | + priority = 0; | |
61 | 62 | } |
62 | 63 | void init(ChanPacketv &p); |
63 | 64 | void init(TYPE t, const void *, unsigned int , unsigned int ); |
@@ -74,6 +75,7 @@ public: | ||
74 | 75 | char data[MAX_DATALEN]; |
75 | 76 | bool skip; |
76 | 77 | |
78 | + int priority; | |
77 | 79 | }; |
78 | 80 | // ---------------------------------- |
79 | 81 | class ChanPacketv |
@@ -111,6 +113,7 @@ public: | ||
111 | 113 | skip = false; |
112 | 114 | data = NULL; |
113 | 115 | datasize = 0; |
116 | + priority = 0; | |
114 | 117 | } |
115 | 118 | void init(ChanPacket &p) |
116 | 119 | { |
@@ -124,6 +127,7 @@ public: | ||
124 | 127 | pos = p.pos; |
125 | 128 | sync = p.sync; |
126 | 129 | skip = p.skip; |
130 | + priority = p.priority; | |
127 | 131 | if (!data) { |
128 | 132 | datasize = (len & ~(BSIZE - 1)) + BSIZE; |
129 | 133 | data = new char[datasize]; |
@@ -149,6 +153,7 @@ public: | ||
149 | 153 | unsigned int datasize; |
150 | 154 | bool skip; |
151 | 155 | |
156 | + int priority; | |
152 | 157 | }; |
153 | 158 | // ---------------------------------- |
154 | 159 | class ChanPacketBuffer |
@@ -176,6 +181,7 @@ public: | ||
176 | 181 | |
177 | 182 | bool writePacket(ChanPacket &,bool = false); |
178 | 183 | void readPacket(ChanPacket &); |
184 | + void readPacketPri(ChanPacket &); | |
179 | 185 | |
180 | 186 | bool willSkip(); |
181 | 187 |
@@ -221,6 +227,7 @@ public: | ||
221 | 227 | virtual void kill() {} |
222 | 228 | virtual bool sendPacket(ChanPacket &,GnuID &) {return false;} |
223 | 229 | virtual void flush(Stream &) {} |
230 | + virtual unsigned int flushUb(Stream &, unsigned int) { return 0; } | |
224 | 231 | virtual void readHeader(Stream &,Channel *)=0; |
225 | 232 | virtual int readPacket(Stream &,Channel *)=0; |
226 | 233 | virtual void readEnd(Stream &,Channel *)=0; |
@@ -84,6 +84,30 @@ void PCPStream::flush(Stream &in) | ||
84 | 84 | pack.writeRaw(in); |
85 | 85 | } |
86 | 86 | } |
87 | + | |
88 | +// ------------------------------------------ | |
89 | +unsigned int PCPStream::flushUb(Stream &in, unsigned int size) | |
90 | +{ | |
91 | + ChanPacket pack; | |
92 | + unsigned int len = 0, skip = 0; | |
93 | + | |
94 | + while (outData.numPending()) | |
95 | + { | |
96 | + outData.readPacketPri(pack); | |
97 | + | |
98 | + if (size >= len + pack.len) { | |
99 | + len += pack.len; | |
100 | + pack.writeRaw(in); | |
101 | + } else { | |
102 | + skip++; | |
103 | + } | |
104 | + } | |
105 | + if (skip > 0) | |
106 | + LOG_DEBUG("PCPStream::flushUb: skip %d packets", skip); | |
107 | + | |
108 | + return len; | |
109 | +} | |
110 | + | |
87 | 111 | // ------------------------------------------ |
88 | 112 | int PCPStream::readPacket(Stream &in,Channel *) |
89 | 113 | { |
@@ -433,9 +457,17 @@ void PCPStream::readHostAtoms(AtomStream &atom, int numc, BroadcastState &bcs, C | ||
433 | 457 | ipNum = 1; |
434 | 458 | } |
435 | 459 | else if (id == PCP_HOST_NUML) |
460 | + { | |
436 | 461 | hit.numListeners = atom.readInt(); |
462 | + if (hit.numListeners > 10) | |
463 | + hit.numListeners = 10; | |
464 | + } | |
437 | 465 | else if (id == PCP_HOST_NUMR) |
466 | + { | |
438 | 467 | hit.numRelays = atom.readInt(); |
468 | + if (hit.numRelays > 100) | |
469 | + hit.numRelays = 100; | |
470 | + } | |
439 | 471 | else if (id == PCP_HOST_UPTIME) |
440 | 472 | hit.upTime = atom.readInt(); |
441 | 473 | else if (id == PCP_HOST_OLDPOS) |
@@ -500,9 +532,11 @@ void PCPStream::readHostAtoms(AtomStream &atom, int numc, BroadcastState &bcs, C | ||
500 | 532 | |
501 | 533 | if (hit.numHops == 1){ |
502 | 534 | Servent *sv = servMgr->findServentByServentID(hit.servent_id); |
503 | - if (sv){ | |
535 | + if (sv && sv->getHost().ip == hit.host.ip){ | |
504 | 536 | // LOG_DEBUG("set servent's waitPort = %d", hit.host.port); |
505 | 537 | sv->waitPort = hit.host.port; |
538 | + hit.lastSendSeq = sv->serventHit.lastSendSeq; | |
539 | + sv->serventHit = hit; | |
506 | 540 | } |
507 | 541 | } |
508 | 542 | } |
@@ -696,6 +730,7 @@ int PCPStream::readBroadcastAtoms(AtomStream &atom,int numc,BroadcastState &bcs) | ||
696 | 730 | { |
697 | 731 | ChanHit hit; |
698 | 732 | readHostAtoms(atom,c,bcs,hit,false); |
733 | + Servent *sv = servMgr->findServentByServentID(bcs.servent_id); | |
699 | 734 | if (hit.uphost.ip == 0){ |
700 | 735 | // LOG_DEBUG("bcs servent_id = %d", bcs.servent_id); |
701 | 736 | if (bcs.numHops == 1){ |
@@ -703,7 +738,7 @@ int PCPStream::readBroadcastAtoms(AtomStream &atom,int numc,BroadcastState &bcs) | ||
703 | 738 | hit.uphost.port = servMgr->serverHost.port; |
704 | 739 | hit.uphostHops = 1; |
705 | 740 | } else { |
706 | - Servent *sv = servMgr->findServentByServentID(bcs.servent_id); | |
741 | + //Servent *sv = servMgr->findServentByServentID(bcs.servent_id); | |
707 | 742 | if (sv){ |
708 | 743 | hit.uphost.ip = sv->getHost().ip; |
709 | 744 | hit.uphost.port = sv->waitPort; |
@@ -711,10 +746,21 @@ int PCPStream::readBroadcastAtoms(AtomStream &atom,int numc,BroadcastState &bcs) | ||
711 | 746 | } |
712 | 747 | } |
713 | 748 | } |
714 | - int oldPos = pmem.pos; | |
715 | - hit.writeAtoms(patom, hit.chanID); | |
716 | - pmem.pos = oldPos; | |
717 | - r = readAtom(patom,bcs); | |
749 | + if (sv && | |
750 | + ((hit.numHops == 1 && (hit.rhost[0].ip == sv->getHost().ip | |
751 | + && hit.uphost.ip == servMgr->serverHost.ip && hit.uphost.port == servMgr->serverHost.port) | |
752 | + || (hit.rhost[1].localIP() && hit.rhost[1].ip == sv->getHost().ip)) | |
753 | + || chanMgr->findParentHit(hit))) | |
754 | + { | |
755 | + int oldPos = pmem.pos; | |
756 | + hit.writeAtoms(patom, hit.chanID); | |
757 | + pmem.pos = oldPos; | |
758 | + r = readAtom(patom,bcs); | |
759 | + } else { | |
760 | + LOG_DEBUG("### Invalid bcst: hops=%d, ver=%d(VP%04d), ttl=%d", | |
761 | + bcs.numHops,ver,ver_vp,ttl); | |
762 | + ttl = 0; | |
763 | + } | |
718 | 764 | } else { |
719 | 765 | // copy and process atoms |
720 | 766 | int oldPos = pmem.pos; |
@@ -761,6 +807,7 @@ int PCPStream::readBroadcastAtoms(AtomStream &atom,int numc,BroadcastState &bcs) | ||
761 | 807 | |
762 | 808 | if (bcs.group & (/*PCP_BCST_GROUP_ROOT|*/PCP_BCST_GROUP_TRACKERS|PCP_BCST_GROUP_RELAYS)) |
763 | 809 | { |
810 | + pack.priority = 11 - bcs.numHops; | |
764 | 811 | chanMgr->broadcastPacketUp(pack,bcs.chanID,remoteID,destID); |
765 | 812 | } |
766 | 813 |
@@ -235,6 +235,7 @@ public: | ||
235 | 235 | |
236 | 236 | virtual bool sendPacket(ChanPacket &,GnuID &); |
237 | 237 | virtual void flush(Stream &); |
238 | + virtual unsigned int flushUb(Stream &, unsigned int); | |
238 | 239 | virtual void readHeader(Stream &,Channel *); |
239 | 240 | virtual int readPacket(Stream &,Channel *); |
240 | 241 | virtual void readEnd(Stream &,Channel *); |
@@ -240,6 +240,8 @@ void Servent::reset() | ||
240 | 240 | type = T_NONE; |
241 | 241 | |
242 | 242 | channel_id = 0; |
243 | + | |
244 | + serventHit.init(); | |
243 | 245 | } |
244 | 246 | // ----------------------------------- |
245 | 247 | bool Servent::sendPacket(ChanPacket &pack,GnuID &cid,GnuID &sid,GnuID &did,Servent::TYPE t) |
@@ -807,6 +809,7 @@ bool Servent::handshakeStream(ChanInfo &chanInfo) | ||
807 | 809 | |
808 | 810 | bool gotPCP=false; |
809 | 811 | unsigned int reqPos=0; |
812 | + unsigned short listenPort = 0; | |
810 | 813 | |
811 | 814 | nsSwitchNum=0; |
812 | 815 |
@@ -820,6 +823,8 @@ bool Servent::handshakeStream(ChanInfo &chanInfo) | ||
820 | 823 | gotPCP = atoi(arg)!=0; |
821 | 824 | else if (http.isHeader(PCX_HS_POS)) |
822 | 825 | reqPos = atoi(arg); |
826 | + else if (http.isHeader(PCX_HS_PORT)) | |
827 | + listenPort = (unsigned short)atoi(arg); | |
823 | 828 | else if (http.isHeader("icy-metadata")) |
824 | 829 | addMetadata = atoi(arg) > 0; |
825 | 830 | else if (http.isHeader(HTTP_HS_AGENT)) |
@@ -873,22 +878,23 @@ bool Servent::handshakeStream(ChanInfo &chanInfo) | ||
873 | 878 | } |
874 | 879 | |
875 | 880 | chanID = chanInfo.id; |
881 | + serventHit.rhost[0].ip = getHost().ip; | |
882 | + serventHit.rhost[0].port = listenPort; | |
883 | + serventHit.host = serventHit.rhost[0]; | |
884 | + serventHit.chanID = chanID; | |
885 | + | |
876 | 886 | canStreamLock.on(); |
877 | 887 | chanReady = canStream(ch); |
878 | - if (0 && !chanReady) | |
888 | + if (/*0 && */!chanReady) | |
879 | 889 | { |
880 | 890 | if (servMgr->numStreams(chanID, Servent::T_RELAY, false) == 0) |
881 | 891 | { |
882 | 892 | sourceHit = &ch->sourceHost; // send source host info |
883 | 893 | |
884 | - if (ch->info.getUptime() > 60) // if stable | |
894 | + if (listenPort && ch->info.getUptime() > 60) // if stable | |
885 | 895 | { |
886 | 896 | // connect "this" host later |
887 | - ChanHit nh; | |
888 | - nh.init(); | |
889 | - nh.chanID = chanID; | |
890 | - nh.rhost[0] = getHost(); | |
891 | - chanMgr->addHit(nh); | |
897 | + chanMgr->addHit(serventHit); | |
892 | 898 | } |
893 | 899 | |
894 | 900 | char tmp[50]; |
@@ -896,11 +902,11 @@ bool Servent::handshakeStream(ChanInfo &chanInfo) | ||
896 | 902 | LOG_DEBUG("Bump channel, hand over sourceHost to %s", tmp); |
897 | 903 | ch->bump = true; |
898 | 904 | } |
899 | - else if (servMgr->kickUnrelayableHost(chanID, this) != 0) | |
905 | + else if (servMgr->kickUnrelayableHost(chanID, serventHit) != 0) | |
900 | 906 | { |
901 | 907 | chanReady = canStream(ch); |
902 | 908 | if (!chanReady) |
903 | - LOG_DEBUG("Kicked unrelayable host, but still cannot stream"); | |
909 | + LOG_DEBUG("*** Kicked unrelayable host, but still cannot stream"); | |
904 | 910 | } |
905 | 911 | } |
906 | 912 | if (!chanReady) type = T_INCOMING; |
@@ -1114,8 +1120,8 @@ bool Servent::handshakeStream(ChanInfo &chanInfo) | ||
1114 | 1120 | if (sourceHit) { |
1115 | 1121 | char tmp[50]; |
1116 | 1122 | sourceHit->writeAtoms(atom2, chanInfo.id); |
1117 | - chs.best[i].host.toStr(tmp); | |
1118 | - LOG_DEBUG("relay info: %s hops = %d", tmp, chs.best[i].numHops); | |
1123 | + sourceHit->host.toStr(tmp); | |
1124 | + LOG_DEBUG("relay info(sourceHit): %s", tmp); | |
1119 | 1125 | best.host.ip = sourceHit->host.ip; |
1120 | 1126 | } |
1121 | 1127 |
@@ -2792,6 +2798,8 @@ void Servent::sendPeercastChannel() | ||
2792 | 2798 | void Servent::sendPCPChannel() |
2793 | 2799 | { |
2794 | 2800 | bool skipCheck = false; |
2801 | + unsigned int ptime = 0; | |
2802 | + int npacket = 0, upsize = 0; | |
2795 | 2803 | |
2796 | 2804 | Channel *ch = chanMgr->findChannelByID(chanID); |
2797 | 2805 | if (!ch) |
@@ -2929,11 +2937,23 @@ void Servent::sendPCPChannel() | ||
2929 | 2937 | BroadcastState bcs; |
2930 | 2938 | bcs.servent_id = servent_id; |
2931 | 2939 | // error = pcpStream->readPacket(*sock,bcs); |
2932 | - do { | |
2940 | + | |
2941 | + unsigned int t = sys->getTime(); | |
2942 | + if (t != ptime) { | |
2943 | + ptime = t; | |
2944 | + npacket = MAX_PROC_PACKETS; | |
2945 | + upsize = MAX_OUTWARD_SIZE; | |
2946 | + } | |
2947 | + | |
2948 | + int len = pcpStream->flushUb(*sock, upsize); | |
2949 | + upsize -= len; | |
2950 | + | |
2951 | + while (npacket > 0 && sock->readReady()) { | |
2952 | + npacket--; | |
2933 | 2953 | error = pcpStream->readPacket(*sock,bcs); |
2934 | 2954 | if (error) |
2935 | 2955 | throw StreamException("PCP exception"); |
2936 | - } while (sock->readReady() || pcpStream->outData.numPending()); | |
2956 | + } | |
2937 | 2957 | |
2938 | 2958 | sys->sleepIdle(); |
2939 | 2959 |
@@ -46,6 +46,12 @@ public: | ||
46 | 46 | MAX_OUTPACKETS = 32 // max. output packets per queue (normal/priority) |
47 | 47 | }; |
48 | 48 | |
49 | + enum | |
50 | + { | |
51 | + MAX_PROC_PACKETS = 300, | |
52 | + MAX_OUTWARD_SIZE = 1024 * 10 | |
53 | + }; | |
54 | + | |
49 | 55 | enum TYPE |
50 | 56 | { |
51 | 57 | T_NONE, // Not allocated |
@@ -288,6 +294,8 @@ public: | ||
288 | 294 | unsigned int lastSkipCount; |
289 | 295 | unsigned int waitPort; |
290 | 296 | |
297 | + ChanHit serventHit; | |
298 | + | |
291 | 299 | int channel_id; |
292 | 300 | }; |
293 | 301 |
@@ -2623,6 +2623,7 @@ void ServMgr::banFirewalledHost() | ||
2623 | 2623 | } |
2624 | 2624 | |
2625 | 2625 | // -------------------------------------------------- |
2626 | +#if 0 | |
2626 | 2627 | static ChanHit *findServentHit(Servent *s) |
2627 | 2628 | { |
2628 | 2629 | ChanHitList *chl = chanMgr->findHitListByID(s->chanID); |
@@ -2640,8 +2641,9 @@ static ChanHit *findServentHit(Servent *s) | ||
2640 | 2641 | } |
2641 | 2642 | return NULL; |
2642 | 2643 | } |
2644 | +#endif | |
2643 | 2645 | // -------------------------------------------------- |
2644 | -int ServMgr::kickUnrelayableHost(GnuID &chid, Servent *ns) | |
2646 | +int ServMgr::kickUnrelayableHost(GnuID &chid, ChanHit &sendhit) | |
2645 | 2647 | { |
2646 | 2648 | Servent *ks = NULL; |
2647 | 2649 | Servent *s = servMgr->servents; |
@@ -2652,9 +2654,8 @@ int ServMgr::kickUnrelayableHost(GnuID &chid, Servent *ns) | ||
2652 | 2654 | { |
2653 | 2655 | Host h = s->getHost(); |
2654 | 2656 | |
2655 | - chanMgr->hitlistlock.on(); | |
2656 | - ChanHit *hit = findServentHit(s); | |
2657 | - if (hit && !hit->relay && hit->numRelays == 0) | |
2657 | + ChanHit hit = s->serventHit; | |
2658 | + if (!hit.relay && hit.numRelays == 0) | |
2658 | 2659 | { |
2659 | 2660 | char hostName[256]; |
2660 | 2661 | h.toStr(hostName); |
@@ -2663,25 +2664,18 @@ int ServMgr::kickUnrelayableHost(GnuID &chid, Servent *ns) | ||
2663 | 2664 | if (!ks || s->lastConnect < ks->lastConnect) // elder servent |
2664 | 2665 | ks = s; |
2665 | 2666 | } |
2666 | - chanMgr->hitlistlock.off(); | |
2667 | 2667 | } |
2668 | 2668 | s = s->next; |
2669 | 2669 | } |
2670 | 2670 | |
2671 | 2671 | if (ks) |
2672 | 2672 | { |
2673 | - if (ns) | |
2673 | + if (sendhit.rhost[0].port) | |
2674 | 2674 | { |
2675 | - Host h = ns->getHost(); | |
2676 | - ChanHit nh; | |
2677 | - nh.init(); | |
2678 | - nh.chanID = chid; | |
2679 | - nh.rhost[0] = h; | |
2680 | - | |
2681 | 2675 | ChanPacket pack; |
2682 | 2676 | MemoryStream mem(pack.data,sizeof(pack.data)); |
2683 | 2677 | AtomStream atom(mem); |
2684 | - nh.writeAtoms(atom, chid); | |
2678 | + sendhit.writeAtoms(atom, chid); | |
2685 | 2679 | pack.len = mem.pos; |
2686 | 2680 | pack.type = ChanPacket::T_PCP; |
2687 | 2681 | GnuID noID; |
@@ -402,7 +402,7 @@ public: | ||
402 | 402 | unsigned int kickPushTime; |
403 | 403 | bool isCheckPushStream(); //JP-EX |
404 | 404 | void banFirewalledHost(); //JP-EX |
405 | - int kickUnrelayableHost(GnuID &, Servent * = NULL); | |
405 | + int kickUnrelayableHost(GnuID &, ChanHit &); | |
406 | 406 | |
407 | 407 | bool getModulePath; //JP-EX |
408 | 408 | bool clearPLS; //JP-EX |
@@ -29,22 +29,22 @@ static bool PCP_FORCE_YP = false; | ||
29 | 29 | #endif |
30 | 30 | // ------------------------------------------------ |
31 | 31 | static const int PCP_CLIENT_VERSION = 1218; |
32 | -static const int PCP_CLIENT_VERSION_VP = 25; | |
32 | +static const int PCP_CLIENT_VERSION_VP = 26; | |
33 | 33 | static const int PCP_ROOT_VERSION = 1218; |
34 | 34 | |
35 | 35 | static const int PCP_CLIENT_MINVERSION = 1200; |
36 | 36 | |
37 | 37 | static const char *PCX_AGENT = "PeerCast/0.1218"; |
38 | 38 | static const char *PCX_AGENTJP = "PeerCast/0.1218-J"; |
39 | -static const char *PCX_AGENTVP = "PeerCast/0.1218(VP0025-1)"; | |
40 | -static const char *PCX_VERSTRING = "v0.1218(VP0025-1)"; | |
39 | +static const char *PCX_AGENTVP = "PeerCast/0.1218(VP0026)"; | |
40 | +static const char *PCX_VERSTRING = "v0.1218(VP0026)"; | |
41 | 41 | |
42 | 42 | #if 1 /* for VP extend version */ |
43 | 43 | #define VERSION_EX 1 |
44 | 44 | static const char *PCP_CLIENT_VERSION_EX_PREFIX = "IM"; // 2bytes only |
45 | -static const int PCP_CLIENT_VERSION_EX_NUMBER = 7650; | |
46 | -static const char *PCX_AGENTEX = "PeerCast/0.1218(IM-VP25-1)"; | |
47 | -static const char *PCX_VERSTRING_EX = "v0.1218(IM765-VP25-1)"; | |
45 | +static const int PCP_CLIENT_VERSION_EX_NUMBER = 26; | |
46 | +static const char *PCX_AGENTEX = "PeerCast/0.1218(IM0026-patch071223)"; | |
47 | +static const char *PCX_VERSTRING_EX = "v0.1218(IM0026)"; | |
48 | 48 | #endif |
49 | 49 | |
50 | 50 | // ------------------------------------------------ |