如何解决Amazon RDSDataService返回SocketException 日志环境
我当前正在创建一个lambda函数,该函数调用IAmazonRDSDataService方法BatchExecuteStatementAsync(BatchExecuteStatementRequest,CancellationToken),以将多个插入请求批量写入到我的Aurora Serverless数据库集群中。该请求执行大约30秒钟,然后失败,并显示一条错误消息。
我能够成功调用ExecuteStatementAsync,所以我知道我能够连接到我的数据库。如果我只是调用await _dataService.BatchExecuteStatementAsync(request).ConfigureAwait(false);在不将响应返回给我的lambda函数的情况下,lambda函数完成了而没有引发异常,但从未将任何项目插入数据库。每当IAmazonRDSDataService返回某些内容时,我的猜测是这会强制lambda阻塞线程,直到创建响应为止,但是我始终会收到以下异常:无法从传输连接中读取数据:管道损坏。
代码:
public async Task<HttpStatusCode> BatchInsertItemsAsync(IEnumerable<T> items)
{
List<T> itemList = items.ToList();
Console.WriteLine($"Batch inserting items with total count: {itemList.Count()}.");
List<List<SqlParameter>> parameterSets = ConstructSqlParameterSets(itemList);
BatchExecuteStatementRequest request = new BatchExecuteStatementRequest
{
Database = _database,ResourceArn = _resourceArn,ParameterSets = parameterSets,SecretArn = _secretArn,Schema = _dbSchema,Sql = ConstructInsertSqlStatement()
};
try
{
var response = await _dataService.BatchExecuteStatementAsync(request).ConfigureAwait(false);
Console.WriteLine($"Response: {JsonConvert.SerializeObject(response)}");
return response.HttpStatusCode;
}
catch (Exception e)
{
Console.WriteLine(JsonConvert.SerializeObject(e));
while (e.InnerException != null)
{
e = e.InnerException; // if the inner exception exists,use that message - otherwise use the exception message
}
Console.WriteLine(JsonConvert.SerializeObject(e));
throw e;
}
}
public static string ConstructInsertSqlStatement()
{
string insertQuery = $"insert into {_dbSchema} values (";
foreach (PropertyInfo p in typeof(T).GetProperties())
{
insertQuery += ":" + p.Name + ",";
}
insertQuery = insertQuery.TrimEnd(',');
insertQuery += ")";
Console.WriteLine($"Insert statement: {insertQuery}");
return insertQuery;
}
public List<List<SqlParameter>> ConstructSqlParameterSets(IList<T> items)
{
Console.WriteLine($"Constructing parameter set. Item count: {items.Count()}");
List<List<SqlParameter>> parameterSets = new List<List<SqlParameter>>();
foreach (T item in items)
{
List<SqlParameter> parameterSet = new List<SqlParameter>();
foreach (PropertyInfo p in typeof(T).GetProperties())
{
Field f = new Field();
if (p.GetValue(item) == null)
{
f.IsNull = true;
parameterSet.Add(new SqlParameter
{
Name = p.Name,Value = f
});
continue;
}
else if (p.PropertyType == typeof(long) || p.PropertyType == typeof(long?))
{
f.LongValue = Convert.ToInt64(p.GetValue(item));
}
else if (p.PropertyType == typeof(int) || p.PropertyType == typeof(int?))
{
f.LongValue = Convert.ToInt64((int)p.GetValue(item));
}
else if (p.PropertyType == typeof(double) || p.PropertyType == typeof(double?))
{
f.DoubleValue = Convert.ToDouble((p.GetValue(item)));
}
else if (p.PropertyType == typeof(decimal) || p.PropertyType == typeof(decimal?))
{
f.DoubleValue = Decimal.ToDouble((decimal)p.GetValue(item));
}
else if (p.PropertyType == typeof(string))
{
f.StringValue = (string)p.GetValue(item);
}
else if (p.PropertyType == typeof(bool) || p.PropertyType == typeof(bool?))
{
f.BooleanValue = (bool)p.GetValue(item);
}
SqlParameter param = new SqlParameter
{
Name = p.Name,Value = f
};
parameterSet.Add(param);
}
parameterSets.Add(parameterSet);
}
Console.WriteLine($"Parameter set count: {parameterSets.Count()}.");
return parameterSets;
}
{
----------------------------------------
namespace BulkInsertCsvDataTask
{
public class Function
{
try
{
GetObjectRequest request = new GetObjectRequest
{
BucketName = fileUploadEvent.BucketName,Key = fileUploadEvent.Key
};
using GetObjectResponse response = await _s3Client.GetObjectAsync(request).ConfigureAwait(false);
using Stream responseStream = response.ResponseStream;
using (var reader = new StreamReader(responseStream))
using (var csv = new CsvReader(reader,CultureInfo.InvariantCulture))
{
csv.Configuration.PrepareHeaderForMatch = (string header,int index) => header.ToLower();
csv.Configuration.HeaderValidated = null;
csv.Configuration.MissingFieldFound = null;
records = csv.GetRecords<ValuationsUploadRow>();
batchWriteStatusCode = await _dataRepository.BatchInsertItemsAsync(records).ConfigureAwait(false);
}
}
catch (Exception e)
{
while (e.InnerException != null)
{
e = e.InnerException; // if the inner exception exists,use that message - otherwise use the exception message
}
LambdaLogger.Log($"File parse/upload process failed with exception: {e.Message}.");
throw e;
}
}
}
日志
堆栈跟踪:
{"ClassName":"System.IO.IOException","Message":"Unable to read data from the transport connection: Broken pipe.","Data":null,"InnerException":{"ClassName":"System.Net.Sockets.SocketException","Message":"Broken pipe","InnerException":null,"HelpURL":null,"StackTraceString":null,"RemoteStackTraceString":null,"RemoteStackIndex":0,"ExceptionMethod":null,"HResult":-2147467259,"Source":null,"WatsonBuckets":null,"NativeErrorCode":32},"StackTraceString":" at Amazon.Runtime.HttpWebRequestMessage.GetResponseAsync(CancellationToken cancellationToken)\n at Amazon.Runtime.Internal.HttpHandler`1.InvokeAsync[T](IExecutionContext executionContext)\n at Amazon.Runtime.Internal.Unmarshaller.InvokeAsync[T](IExecutionContext executionContext)\n at Amazon.Runtime.Internal.ErrorHandler.InvokeAsync[T](IExecutionContext executionContext)\n at Amazon.Runtime.Internal.CallbackHandler.InvokeAsync[T](IExecutionContext executionContext)\n at Amazon.Runtime.Internal.EndpointDiscoveryHandler.InvokeAsync[T](IExecutionContext executionContext)\n at Amazon.Runtime.Internal.EndpointDiscoveryHandler.InvokeAsync[T](IExecutionContext executionContext)\n at Amazon.Runtime.Internal.CredentialsRetriever.InvokeAsync[T](IExecutionContext executionContext)\n at Amazon.Runtime.Internal.RetryHandler.InvokeAsync[T](IExecutionContext executionContext)\n at Amazon.Runtime.Internal.RetryHandler.InvokeAsync[T](IExecutionContext executionContext)\n at Amazon.Runtime.Internal.CallbackHandler.InvokeAsync[T](IExecutionContext executionContext)\n at Amazon.Runtime.Internal.CallbackHandler.InvokeAsync[T](IExecutionContext executionContext)\n at Amazon.Runtime.Internal.ErrorCallbackHandler.InvokeAsync[T](IExecutionContext executionContext)\n at Amazon.Runtime.Internal.MetricsHandler.InvokeAsync[T](IExecutionContext executionContext)\n at Bx.ServerlessWorkflow.DataRepository.RdsDataRepository`1.BatchInsertItemsAsync(IEnumerable`1 items) in C:\\git\\serverless\\valuations-file-upload-demo\\src\\Bx.ServerlessWorkflow.DataRepository\\RdsDataRepository.cs:line 51","HResult":-2146232800,"Source":"AWSSDK.Core","WatsonBuckets":null}
{"ClassName":"System.Net.Sockets.SocketException","NativeErrorCode":32}
File parse/upload process failed with exception: Broken pipe.Broken pipe: SocketException
环境
- SDK版本:3.3.103.116
- 软件包版本:3.3.103.116
- 操作系统信息:Windows 10
- 构建环境Visual Studio / VSCode
- 目标.NET平台:netcoreapp3.1
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。