Skip to content

Commit

Permalink
支持单文件的多线程下载
Browse files Browse the repository at this point in the history
  • Loading branch information
asforest committed Jun 29, 2023
1 parent abbcea8 commit 2064c6d
Show file tree
Hide file tree
Showing 7 changed files with 202 additions and 46 deletions.
12 changes: 12 additions & 0 deletions src/main/kotlin/mcpatch/data/GlobalOptions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,16 @@ data class GlobalOptions(
* 是否自动关闭更新记录窗口
*/
val autoCloseChangelogs: Int,

/**
* 多线程下载时使用的线程数,仅对http源有效,且需要服务端支持断点续传功能
*/
val concurrentThreads: Int,

/**
* 多线程下载时每个文件块的大小
*/
val concurrentBlockSize: Int
) {
companion object {
/**
Expand Down Expand Up @@ -108,6 +118,8 @@ data class GlobalOptions(
retryTimes = getOption(map, "retry-times") ?: 5,
autoRestartVersion = getOption(map, "auto-restart-version") ?: true,
autoCloseChangelogs = getOption(map, "changelogs-auto-close") ?: 0,
concurrentThreads = getOption(map, "concurrent-threads") ?: 4,
concurrentBlockSize = getOption(map, "concurrent-block-size") ?: 4194304,
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ package mcpatch.exception

import mcpatch.util.PathUtils

class HttpResponseStatusCodeException(statusCode: Int, url: String, body: String?)
: BaseException("Http状态码($statusCode)不在2xx-3xx之间(${PathUtils.getFileNamePart(url)})\n$url\n" +
if (body?.isNotEmpty() == true) "以下为服务端返回的消息(HttpBody):\n${body}" else "服务端没有返回任何附加的消息(HttpBody)"
class HttpResponseStatusCodeException(statusCode: Int, expected: IntRange, url: String, body: String?)
: BaseException("HTTP状态码 $statusCode 不在 $expected 之间\n" +
"文件: ${PathUtils.getFileNamePart(url)})($url)\n" +
"Body: \n$body"
)
8 changes: 4 additions & 4 deletions src/main/kotlin/mcpatch/server/AbstractServerSource.kt
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,18 @@ abstract class AbstractServerSource : AutoCloseable
protected class ReduceReportingFrequency
{
var lastReport = System.currentTimeMillis()
var accumulation = 0L
var accumulated = 0L

fun feed(bytes: Int): Long
{
accumulation += bytes
accumulated += bytes

val now = System.currentTimeMillis()
if (now - lastReport > 200)
{
lastReport = now
val value = accumulation
accumulation = 0
val value = accumulated
accumulated = 0
return value
}

Expand Down
198 changes: 163 additions & 35 deletions src/main/kotlin/mcpatch/server/impl/HttpSupport.kt
Original file line number Diff line number Diff line change
@@ -1,22 +1,28 @@
package mcpatch.server.impl

import mcpatch.data.GlobalOptions
import mcpatch.exception.ConnectionInterruptedException
import mcpatch.exception.ConnectionRejectedException
import mcpatch.exception.ConnectionTimeoutException
import mcpatch.exception.HttpResponseStatusCodeException
import mcpatch.exception.*
import mcpatch.extension.FileExtension.bufferedOutputStream
import mcpatch.logging.Log
import mcpatch.server.AbstractServerSource
import mcpatch.server.OnDownload
import mcpatch.stream.ExposedByteArrayOutputStream
import mcpatch.util.File2
import mcpatch.util.MiscUtils
import okhttp3.OkHttpClient
import okhttp3.Request
import okhttp3.internal.headersContentLength
import java.io.ByteArrayOutputStream
import java.io.InterruptedIOException
import java.io.RandomAccessFile
import java.net.ConnectException
import java.net.SocketException
import java.net.SocketTimeoutException
import java.util.concurrent.Executors
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong
import kotlin.math.min

class HttpSupport(serverString: String, val options: GlobalOptions)
: AbstractServerSource()
Expand Down Expand Up @@ -44,7 +50,7 @@ class HttpSupport(serverString: String, val options: GlobalOptions)
okClient.newCall(req).execute().use { r ->
if (!r.isSuccessful) {
val body = r.body?.string()?.limitLength()
throw HttpResponseStatusCodeException(r.code, url, body)
throw HttpResponseStatusCodeException(r.code, 200..300, url, body)
}

return@withRetrying r.body!!.string()
Expand All @@ -64,44 +70,32 @@ class HttpSupport(serverString: String, val options: GlobalOptions)
override fun downloadFile(relativePath: String, writeTo: File2, callback: OnDownload)
{
val url = buildURI(relativePath)
Log.debug("http request on $url, write to: ${writeTo.path}")

val link = url.replace("+", "%2B")

writeTo.makeParentDirs()
val req = buildRequest(url)

return withRetrying(retryTimes, 1000) {
try {
okClient.newCall(req).execute().use { r ->
// 测试请求
val testing = buildRequest(link, mapOf("Range" to "bytes=0-0"))
val rangeSupported: Boolean
val length:Long

Log.debug("http range test request on $url")
okClient.newCall(testing).execute().use { r ->
if(!r.isSuccessful)
throw HttpResponseStatusCodeException(r.code, link, r.body?.string()?.limitLength())

val body = r.body!!
val bodyLen = if (body.contentLength() != -1L) body.contentLength() else 1024 * 1024 * 1024
val bufferSize = MiscUtils.chooseBufferSize(bodyLen)

body.source().use { input ->
writeTo.file.bufferedOutputStream(bufferSize).use { output ->
var bytesReceived: Long = 0
var len: Int
val buffer = ByteArray(bufferSize)
val rrf = ReduceReportingFrequency()

while (input.read(buffer).also { len = it } != -1)
{
output.write(buffer, 0, len)
bytesReceived += len

val report = rrf.feed(len)
if (report > 0)
callback(report, bytesReceived, bodyLen)
}
}
}
throw HttpResponseStatusCodeException(r.code, 200..300, link, r.body?.string()?.limitLength())

return@withRetrying
rangeSupported = r.code == 206
length = if (rangeSupported) r.headers["Content-Range"].toString().split("/")[1].toLong() else -1
}

Log.debug("http request on $url, concurrent is ${if (rangeSupported) "on" else "off"}, write to: ${writeTo.path}")

if (rangeSupported)
concurrentDownload(url, link, length, writeTo, callback)
else
normalDownload(url, link, writeTo, callback)
} catch (e: ConnectException) {
throw ConnectionInterruptedException(link, e.message ?: "")
} catch (e: SocketException) {
Expand All @@ -126,16 +120,150 @@ class HttpSupport(serverString: String, val options: GlobalOptions)
return if (length > limit) substring(0, limit) + "\n..." else this
}

private fun buildRequest(url: String): Request
private fun buildRequest(url: String, headers: Map<String, String>? = null): Request
{
val req = Request.Builder().url(url)

if (options.clientUserAgent.isNotEmpty())
req.addHeader("User-Agent", this.options.clientUserAgent)

if (headers != null)
for (header in headers.entries)
req.addHeader(header.key, header.value)

for (header in options.httpHeaders)
req.addHeader(header.key, header.value)

return req.build()
}

private fun normalDownload(url: String, link: String, writeTo: File2, callback: OnDownload)
{
val req = buildRequest(url)

okClient.newCall(req).execute().use { r ->
if(!r.isSuccessful)
throw HttpResponseStatusCodeException(r.code, 200..300, link, r.body?.string()?.limitLength())

val body = r.body!!
val bodyLen = if (body.contentLength() != -1L) body.contentLength() else 1024 * 1024 * 1024
val bufferSize = MiscUtils.chooseBufferSize(bodyLen)

body.source().use { input ->
writeTo.file.bufferedOutputStream(bufferSize).use { output ->
var bytesReceived: Long = 0
var len: Int
val buffer = ByteArray(bufferSize)
val rrf = ReduceReportingFrequency()

while (input.read(buffer).also { len = it } != -1)
{
output.write(buffer, 0, len)
bytesReceived += len

val report = rrf.feed(len)
if (report > 0)
callback(report, bytesReceived, bodyLen)
}
}
}
}
}

private fun concurrentDownload(url: String, link: String, length: Long, writeTo: File2, callback: OnDownload)
{
val blockSize = options.concurrentBlockSize
val taskBlocks = LinkedBlockingQueue<Pair<Int, LongRange>>()
val downloadedBlocks = mutableMapOf<Int, ExposedByteArrayOutputStream>()

for (i in 0 until length / blockSize)
taskBlocks.put(Pair(i.toInt(), i * blockSize until i * blockSize + blockSize))

if (length % blockSize > 0)
{
val i = taskBlocks.size
taskBlocks.put(Pair(i, i * blockSize until length))
}

val totalDownloadedBytes = AtomicLong()
val reporter = ReduceReportingFrequency()
val threads = Integer.max(1, min(options.concurrentThreads, taskBlocks.size))
val pool = Executors.newFixedThreadPool(threads)
var ex: Exception? = null

for (i in 0 until threads)
{
pool.execute {
try {
while (true)
{
val (blockindex, block) = taskBlocks.poll() ?: return@execute

Log.debug("http request on $url part $blockindex (${block.first} to ${block.last}), write to: ${writeTo.path}")

val req = buildRequest(link, mapOf("Range" to "bytes=${block.first}-${block.last}"))

okClient.newCall(req).execute().use { r ->
if(!r.isSuccessful)
throw HttpResponseStatusCodeException(r.code, 200..300, link, r.body?.string()?.limitLength())

if (r.code != 206)
throw HttpResponseStatusCodeException(r.code, 206..206, link, r.body?.string()?.limitLength())

val body = r.body!!
val bodyLen = if (body.contentLength() != -1L) body.contentLength() else blockSize.toLong()
val bufferSize = MiscUtils.chooseBufferSize(bodyLen)

body.source().use { input ->
val buf = ExposedByteArrayOutputStream(bufferSize)

buf.use { output ->
var len: Int
val buffer = ByteArray(bufferSize)

while (input.read(buffer).also { len = it } != -1)
{
output.write(buffer, 0, len)

val total = totalDownloadedBytes.addAndGet(len.toLong())
val report = reporter.feed(len)

if (report > 0)
callback(report, total, length)
}
}

downloadedBlocks[blockindex] = buf
}

return@execute
}
}
} catch (e: Exception) {
if (e !is InterruptedIOException)
{
ex = e
pool.shutdownNow()
}
}
}
}

pool.shutdown()
pool.awaitTermination(1, TimeUnit.DAYS)

if (ex != null)
throw ex!!

RandomAccessFile(writeTo.file, "rw").use { file ->
for (i in 0 until downloadedBlocks.size)
{
val block = downloadedBlocks[i]!!
val buf = block.internalBuffer()
val len = block.size()

file.write(buf, 0, len)
}
}
}
}
8 changes: 4 additions & 4 deletions src/main/kotlin/mcpatch/server/impl/WebdavSupport.kt
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ class WebdavSupport(serverString: String, val options: GlobalOptions)
} catch (e: HttpHostConnectException) {
throw WebdavConnectException(e, url)
} catch (e: McPatchSardineImpl.GetException) {
throw HttpResponseStatusCodeException(e.ex.statusCode, url, e.body)
throw HttpResponseStatusCodeException(e.ex.statusCode, 200..300, url, e.body)
} catch (e: SardineException) {
throw HttpResponseStatusCodeException(e.statusCode, url, "")
throw HttpResponseStatusCodeException(e.statusCode, 200..300, url, "")
} catch (e: SSLPeerUnverifiedException) {
throw SslCertificateUnverifiedException(url, e.toString())
} catch (e: SocketException) {
Expand Down Expand Up @@ -114,9 +114,9 @@ class WebdavSupport(serverString: String, val options: GlobalOptions)
} catch (e: HttpHostConnectException) {
throw WebdavConnectException(e, url)
} catch (e: McPatchSardineImpl.GetException) {
throw HttpResponseStatusCodeException(e.ex.statusCode, url, e.body)
throw HttpResponseStatusCodeException(e.ex.statusCode, 200..300, url, e.body)
} catch (e: SardineException) {
throw HttpResponseStatusCodeException(e.statusCode, url, "")
throw HttpResponseStatusCodeException(e.statusCode, 200..300, url, "")
} catch (e: SSLPeerUnverifiedException) {
throw SslCertificateUnverifiedException(url, e.toString())
} catch (e: SocketException) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package mcpatch.stream

import java.io.ByteArrayOutputStream

class ExposedByteArrayOutputStream(initialSize: Int = 4 * 1024) : ByteArrayOutputStream()
{
fun internalBuffer(): ByteArray = buf
}
7 changes: 7 additions & 0 deletions src/main/resources/mc-patch-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,13 @@ http-response-timeout: 5000
# 出现网络问题时的重试次数(对所有源有效)
retry-times: 5

# 多线程下载时使用的线程数,仅对http源有效,且需要服务端支持断点续传功能
# 自带的MiniHTTPServer不支持断点续传!
concurrent-threads: 4

# 多线程下载时每个文件块的大小
concurrent-block-size: 4194304 # 默认为4mb

# 如果客户端当前版本号在服务端的版本列表里不存在时(即版本号损坏),是否自动重头开始下载所有版本
# true:如果客户端版本号出现问题,会从新下载一遍服务端的所有版本以达到修复文件的目的
# false:如果客户端版本号出现问题,会直接报错而不会进行任何额外动作
Expand Down

0 comments on commit 2064c6d

Please sign in to comment.